Sunday, May 4, 2014

Spatially Enabling In-Memory BigData Stores

I deeply believe that the future of BigData stores and processing will be driven by GPUs and purely based on distributed InMemory engines that is backed by something resilient to hardware failure like HDFS.
HBase, Accumulo, Cassandra depend heavily on their in-memory capabilities for their performance. And when it comes to processing, SQL is still King….MemSQL is combining both - pretty impressive.
However, ALL lack something that is so important in today’s BigData world and that is true spatial storage, index and processing of native points, lines and polygons. SpaceCurve is doing great progress on that front.
A lot of smart people have taken advantage of the native lexicographical indexing of these key value stores and used geohash to save, index, and search spatial elements, and have solved the Z-order range search. Though these are great implementation, I always thought that the end did not justify the means. There is a need for a true and effective BigData spatial capabilities.
I’ve been a big fan of Hazelcast for quite some time now and was always impressed by their technology. In their latest implementation, they have added a MapReduce API, in such that now you can send programs to data - very cool !
But…like the others, they lack the spatial aspect when it comes my world. So…here is a set of small tweaks that truly spatially enables this in-memory BigData engine. I’ve used the MapReduce API and the spatial index in an example to visualize hotspot conflict in Africa.

Like usual, all the source code can be downloaded from here.

Monday, March 3, 2014

Yet Another Temporal/Spatial Storage/Analysis of BigData

At this year’s FedUC, I presented an introduction and a practice session on spatial types in BigData. In these sessions, I demonstrated how to analyze temporal and spatial BigData in the form of Automatic Identification System.  This post discusses an ensemble of projects that made a section of this demo possible.  The storage and subsequence processing of the data is very specific to this project, where data is stored and analyzed in a temporal and then a spatial order. Please note the order; temporal then spatial. However, I see this pattern in a lot of BigData projects that I worked on.  This order enables us to take advantage of the native partitioning scheme of paths in HDFS for temporal indexing that we later augment with a “local” spatial index. So time, or more specifically an hour’s data file can be located by traversing the HDFS file system in the form /YYYY/MM/DD/HH/UUID, where YYYY is the year, MM is the numeric month, DD is the day, HH is the hour and UUID is a file with a unique name that holds all the data for that hour.  You can imagine a process, such as GeoEvent Processor that continuously adds data in this pattern. However, in my case, I received the data as a set of GZIP files and used the Hadoop MapReduce AISImport tool to place the data in the correct folders/files in HDFS. Once an hour file is closed, a spatial index file is created that enables the spatial search of the data at that hour.  This spatial index is based on the QuadTree model for point specific geometry and is initiated using the AISTools.  I rewrote my “ubiquitous” density MapReduce job to take into account this new spatial index, where now I can rapidly ask questions such as “What is the AIS density by unique MMSI and what does it look like at the entry of the harbor every day at 10AM in the month of January ?”  The following is a visualization of the answer in ArcMap from the Miami Port sample data.

Like usual, all the source code can be found here.

Wednesday, January 29, 2014

Cascading Workflow for Spatial Binning of BigData

I just finished a 3 day training on Cascading by Concurrent and it was worth every minute. I always knew about Cascading, but never invested in it but I wish I had, specially last month when I was doing a BigData ETL job in MapReduce. My development time would have been significantly reduced (pun intended :-) if I thought of the problem in terms of a cascading water flow rather than in MapReduce.
So in Cascading, you compose a data flow with set of pipes having operations such as filtering, joining and grouping and it turns that flow into a MapReduce job that you can execute on a Hadoop cluster.
Being spatially aware, I _had_ to add a spatial function to Cascading using our GIS Tools For Hadoop geometry API. The spatial function that I decided to implement bins location data in the same area, in such that at the end of the process, each area has a count of the locations that is covers. This is a nice way to visualize massive data.
So, we start with:

to produce:

Again, rather than thinking in MapReduce, think data water flow:

Here, I have an input tap that accepts text data from HDFS. Each text record is composed of fields separated by a tab character. In Cascading, a tap can define the field names and types. A pipe is created to select the “X” and “Y” fields to be processed by a function. This is a spatial function that utilizes the Esri Geometry API. It loads into an in-memory spatial index a set of polygons defined as an property value, and will be used to perform a point-in-polygon operation on each input X/Y tuple.  The overlapping polygon identifier is emitted as the pipe output. The output polygon identifiers are grouped together and counted by yet another pipe. The tuple set of polygon identifier/count is written to a comma separated HDFS based file using an output tap. The count field is labeled as POPULATION to make it ArcGIS friendly :-)
Like usual, all the source code can be found here.

Friday, January 24, 2014

Hadoop and Shapefiles

Shapefiles are still today the ubiquitous way to share and exchange geospatial data. I’ve been getting a lot of requests lately from BigData Hadoop users to read shapefiles directly off HDFS, I mean after all, the 3rd V (variety) should allow me to do that. Since the format of shapefiles was developed by Esri, there was always an "uneasiness" in me as an Esri employee in using third party open source tools (geotools and JTS) to read these shapefiles when we have just released on Github our geometry API. In addition, I always thought that these powerful libraries were too heavy for my needs, when I just wanted to plow through a shapefile in a map or reduce phase in a job. So, I decide to write my own simple java implementation that for now just reads points and polygons.  This 20% implementation should for now cover my 80% usage. I know that there exist a lot of java implementations on the net that read the shp and dbf format, but I wanted one that is tailored to by BigData needs specially when it comes to writable instances and more importantly, it generates geometry instances based on our geometry model and API. Like usual all the source code can be found here.

Wednesday, January 15, 2014

Apache Spark, Spatial Functions and ArcGIS for Desktop

A while back I watched with great fascination a webinar presented by UC Berkley amp lab on Spark and Shark. I wanted to spatially enable spark and has been on my todo list for a while.
Spark has “graduated” and has joined the real world as databricks and has raise some serious cash to take on map reduce. Even Cloudera is teaming up with databricks to support Spark.
So it was time for me to bring back that project to the front burner and I posted onto github a project that enables me to invoke a spark job from ArcGIS For Desktop to perform a density analysis on data residing in HDFS. The density calculation is based on a honeycomb style layer that I think produces some pretty neat looking maps.  Here is a sample:

Anyway, like usual all the source code can be found here. Have fun and happy new year.

Wednesday, October 30, 2013

BigData Spatial Joins

There has been a lot of research on performing spatial joins using Hadoop MapReduce on BigData, specially when both sets are very big. A notable one is the Spatial Hadoop project at the University of Minnesota. This post is derived from that body of work and uses the Esri Geometry API in the GIS Tools for Hadoop project to perform the spatial operations in the map and reduce phases.
Lexicographical join of native types (numerical,textural) in BigData can be performed in the map phase or in the reduce phase depending on the size of the input data and how much memory the mapper or reducer has access to. Actually, Hadoop provides a join framework package as this is a common pattern. The same can be applied for spatially joining two big data sets. It can be performed in the mapper phase or the reducer phase depending on the size of the data and how much memory each phase has access to.
Let's start with a map phase spatial join. Let's say you have a billion point records and you have the polygon areas of the US zip codes, and the task at hand is to find the count of points per zip code. Since the US zip code feature class is a 'small' set (~41,700), it can be fully loaded into each mapper memory space at start up. In addition, it can be in-memory spatially indexed using the API QuadTree for quick look up based on the envelope of each feature. The GeometryEngine comes with a handy 'contains' method that can further refine the spatial constraint. The source of the zip code feature class can be from the DistributedCache or from HDFS. In addition, it can be in the Esri JSON format or GeoJSON or in the OGC format . The GeometryEngine API can parse all these formats into Geometry POJOs. As the mapper is reading each point feature, it locates the zip code polygon where the point falls into and emits the zip code to the reducer. The reducer task is to sum the emitted values per zip code.
Now, let's say that you have a billion polygons that you want to spatially join with yet another billion polygons and return the intersection set. How do you proceed with doing that in MapReduce when clearly a billion features cannot all fit in the mapper space (at least not in a typical node that I have access to :-). This is where we take advantage of the shuffle and sort phase of Hadoop's MapReduce implementation to partition and group all the "similar" elements, in such that the "reduced" set can be held in memory for processing. In our case that similarity is based on spatial grouping. So, in the map phase, the extent that the two polygon sets occupy is projected onto a grid with predefined cell sizes. The minimum bounding rectangle (MBR) of the input polygon from each set is overlaid onto the grid. The overlapping cells are emitted as keys whose value is a tuple consisting of an input source reference and the polygon shape. So now, all the polygons that overlap a specific cell are sent to the same reducer. In the reduce phase, we know the cell that we are operating over as this is derived from the key, we divide the list of values based on their source into separate lists where we can now perform a spatial cartesian product from one list to the other. Actually, one of the lists can be spatially indexed for fast lookup. Since an input polygon MBR can straddle multiple cells, there is a trick to not dispatch multiple times the spatial join by different reducers. It is best explained using the below picture.

Given two MBRs of polygons A and B, and given a grid partitioning of depth 1 with 4 quads, we can see that A and B overlap the top two quads (1,0) (1,1). So the mapper will emit values (A,B) for Quad (1,0) and (1,1) as keys. In the reducer for key (1,0) we have a reference to A and B and we calculate the lower left corner of the intersection of A and B MBRs (red dot). Because the intersection point falls into the Quad (1,0), we can then proceed with the spatial join and emit the result. Now, in the reducer for key (1,1), we can see that the intersection point does _not_ fall into quad (1,1) indicating that the spatial join was performed or will be performed by a reducer for another quad, thus any further processing is stopped and nothing is emitted.
Neat trick. However, the question at hand is "What is the 'correct' grid size ?" This takes me back to my days when I was an SDE specialist and had to tweak the spatial grid sizes and levels of a layer for best performance based on the layer content and usage. This will be a nice follow on post on writing an MR preprocessing task to scan through the static data and 'recommend' a value. In addition, the current implementation is assuming that you will want to spatially join _all_ the polygons in the full extent. What is missing is a spatial index to enable an initial cookie cutting of a region to operate over. This is what the above project does. However, it assumes the data is static and can be migrated into a new spatially optimized layout. Yet another post on how to do that on dynamic existing data.
Like usual all the source code is available here.

Saturday, September 21, 2013

BigData: Apache Flume, HDFS and HBase

In this post, I will show how to log very large amount of web requests to a BigData storage for traffic analysis. The source code for the project is on github. We will rely on the logging library log4j and the associated Flume NG appender implementation. For storage, we will place the log information into a set of HDFS files or into an HBase table. The HDFS files will be mapped into a Hive table as partitions for query and aggregation.
In this demonstration, all web requests are handled by a very simple web application servlet that is mapped to an access url:
In practice, all logic is handled by the servlet and the logging of the request information will be handled by a servlet filter that uses the log4j logging API.
In this demonstration the path info will define the log level and the query string will define the log message content.
Log4j is configured using the resource to use a Flume NG appender:
The appender will send all log events to a Flume agent instantiated on the host defined by the property log4j.appender.flume.Hostname.
The log event body content is defined by the ConversionPattern property, where each of the string tokens are separated by a tab character. This is how the content will be stored into an HDFS file and parsed back into tokens to be stored as columns into an HBase table.
Before starting the web application, a flume agent has to be instantiated with the predefined configuration:
This agent has an avro source that accepts incoming events on port 51515. The source has an interceptor to ensure that all incoming events have a timestamp.  The events are forwarded to an HDFS sink through a memory channel.  The HDFS sink writes the events to a file where the path of the file is specified by the current year, month, day and hour. Each file is closed on the hour and all events are "rolled" into a newly created file.  The content of the file is in a pure text format in this demonstration.   For efficiency, I could have used a binary compressed format.
You can start an agent from the command line as follows:
$ flume-ng agent -n agent -c conf -f conf/
Or you can use a GUI like the Cloudera Manager:

The HDFS files are mapped onto a Hive partitioned table for query as follows:
And as file partitions are added onto the file system, the table can be altered to account for that new partition.  Here is an example:
The following is an agent configuration that will place all log events into an HBase table where each of the log event elements populates a column:
Note: Make sure to first create the HBase table using the shell:
create 'log4j',{NAME=>'info',VERSIONS=>1}
This agent has an avro source that listens for incoming event on port 61616 and forwards these event s to an HBase sink though a memory channel.  The Hbase sink uses a regular expression parser to tokenize the event body content and each token is respectively placed into a column in a table.
Like usual, all the source code is available here.