Wednesday, April 3, 2013

BigData: DataRush Workflow in ArcMap


At this year's DevSummit, we announced the GIS Tools for Hadoop project. Included in that project is a low-level geometry Java API which enables spatial operations in MapReduce jobs or the construction of higher-level functions such as Hive User Defined Functions. However, this geometry library is not restricted to Hadoop MapReduce. It is used in Geo Event Processor, and can be used in Storm bolts or other parallel workflows. One such parallel workflow processing engine is Pervasive DataRush, which I demoed at the DevSummit. Using a KNIME visual workflow, I was able to process 90 million records (BTW, small in the BigData world) stored in the Hadoop File System into a heatmap visualization in ArcMap.


A DataRush workflow engine can run on a local machine or remotely on a cluster of machines, and is fully extensible with custom operators. An operator is a node in a workflow graph whose input and output can be linked to other operators. So, I wrote my own spatial operators that utilize the geometry API.
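To make the operator idea concrete, here is a tiny conceptual sketch in plain Java. This is not the DataRush API, just the shape of the abstraction:

```java
// Conceptual sketch only -- NOT the DataRush API; it just illustrates
// an operator as a graph node with linkable input and output ports.
interface Port<T> {
    void push(T record); // receive one record from the upstream operator
}

interface Operator<IN, OUT> extends Port<IN> {
    void linkOutput(Port<OUT> downstream); // wire this node's output to the next node
}
```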

The first operator is a simple spatial filter that passes through records that contain LAT/LON fields and fall within a user-defined distance from a given point. It uses the "GeometryEngine.geodesicDistanceOnWGS84" function and augments the input record fields with a distance "METERS" output field.
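Stripped of the DataRush plumbing, the core of that filter is a few lines. A minimal sketch (the class and field names are illustrative; only the GeometryEngine call is the real API):

```java
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;

/**
 * Core logic of the distance filter: pass through a record only if its
 * LAT/LON location is within a given distance of a center point, and
 * compute the METERS output field. The operator wiring is omitted.
 */
public final class DistanceFilter {
    private final Point center;
    private final double maxMeters;

    public DistanceFilter(final double lon, final double lat, final double maxMeters) {
        this.center = new Point(lon, lat);
        this.maxMeters = maxMeters;
    }

    /** Returns the geodesic distance in meters, or -1 if the record is filtered out. */
    public double meters(final double lon, final double lat) {
        final double meters = GeometryEngine.geodesicDistanceOnWGS84(center, new Point(lon, lat));
        return meters <= maxMeters ? meters : -1.0;
    }
}
```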

In the demo application, a workflow graph reads a tab-delimited file and sends its output to the new distance operator, which sends its output to a console log operator. Simple and easy, but not simplistic: when I execute the application without any arguments, the graph runs on the local machine, taking full advantage of all the local processors. However, if I specify a dummy argument, the graph takes its input from HDFS and executes remotely on a DataRush cluster. This simple toggle is pretty impressive when switching from development to production mode.

The next step is to add the operator to the KNIME visual workflow.
The Pervasive folks made that really easy with a plugin for the Eclipse environment.

Using the Node Extension wizard, you select an operator and it generates template code for the operator's XML descriptor, the dialog pane, the model factory, and the node settings.

This gets you 80% to 90% of the way there. The rest is completing the XML node descriptor and laying out a nice dialog pane.
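For reference, the generated node factory boils down to a skeleton like the one below. The class names here are illustrative stand-ins for what the wizard generates; DistanceNodeModel and DistanceNodeDialog are the pieces you flesh out:

```java
import org.knime.core.node.NodeDialogPane;
import org.knime.core.node.NodeFactory;
import org.knime.core.node.NodeView;

/**
 * Skeleton of a wizard-generated KNIME node factory: it ties together
 * the node model, the dialog pane, and the (optional) views. The XML
 * node descriptor lives next to this class.
 */
public class DistanceNodeFactory extends NodeFactory<DistanceNodeModel> {

    @Override
    public DistanceNodeModel createNodeModel() {
        return new DistanceNodeModel();
    }

    @Override
    protected int getNrNodeViews() {
        return 0; // no custom views for this node
    }

    @Override
    public NodeView<DistanceNodeModel> createNodeView(final int viewIndex,
            final DistanceNodeModel nodeModel) {
        return null;
    }

    @Override
    protected boolean hasDialog() {
        return true; // the node has a settings dialog
    }

    @Override
    protected NodeDialogPane createNodeDialogPane() {
        return new DistanceNodeDialog();
    }
}
```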


The final step is to export the compiled code into KNIME as a plugin (BTW, KNIME is based on Eclipse). Within KNIME, you are in what I call PHD (Push Here Dummy) mode, where you can drag, drop, and link operators on the workbench.

A slightly more complicated operator is the composite operator which, as its name suggests, is composed of two or more operators, with the whole exposed as a single operator.
So, the next spatial operator extension to write is a composite one that performs a spatial join between a point input source and a polygon input source, where the output is a flow indicating which polygon each input point is contained in.
The Spatial Join composite operator contains a non-parallelizable operator that blocks on the first pass to read the polygons into an in-memory model. The polygon input format is the Esri JSON text format, and the "GeometryEngine.jsonToGeometry" function converts each record into a Polygon POJO. The second embedded operator is parallel-enabled; it walks the input points and uses the in-memory model of polygons built in the first phase to find the containing one via the "GeometryEngine.contains" function. This "build and walk" pattern is used whenever a join or lookup needs to be performed.
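Here is a minimal sketch of the build-and-walk core, with the DataRush wiring stripped away. The GeometryEngine calls are the real API (jsonToGeometry takes a Jackson parser in this version of the library); the surrounding class is illustrative:

```java
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;
import com.esri.core.geometry.Polygon;
import com.esri.core.geometry.SpatialReference;
import org.codehaus.jackson.JsonFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * "Build and walk" sketch: build an in-memory list of polygons from
 * Esri JSON text, then walk the points and report the containing polygon.
 */
public final class SpatialJoinSketch {
    private static final SpatialReference WGS84 = SpatialReference.create(4326);
    private final List<Polygon> polygons = new ArrayList<Polygon>();

    /** Build phase: parse one Esri JSON polygon and add it to the model. */
    public void addPolygon(final String esriJson) throws Exception {
        polygons.add((Polygon) GeometryEngine.jsonToGeometry(
                new JsonFactory().createJsonParser(esriJson)).getGeometry());
    }

    /** Walk phase: return the index of the polygon containing the point, or -1. */
    public int findContainer(final double lon, final double lat) {
        final Point point = new Point(lon, lat);
        for (int i = 0; i < polygons.size(); i++) {
            if (GeometryEngine.contains(polygons.get(i), point, WGS84)) {
                return i;
            }
        }
        return -1;
    }
}
```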

The full graph adds one extra built-in operator (group by), where the output is the number of points that each input polygon contains. The result of such a flow can be used to thematically color-code the polygons in ArcMap.

Displaying thousands or millions of points on a map is, well... not very smart and does not make sense. But if all these points are binned, and each bin is weighted proportionally to the number of points in it, then a map displaying the bins color-coded by weight is much more appealing and conveys much more information, such as in the map above, where I rendered a heatmap of locations of businesses with more than 100,000 employees.
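The binning itself is just arithmetic. A minimal sketch, assuming a fixed-size lat/lon grid (the cell size and names are arbitrary choices for illustration):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: bin points into a regular lat/lon grid and weight each bin
 * by the number of points that fall into it.
 */
public final class PointBinner {
    private final double cellSize;
    private final Map<Long, Integer> binWeights = new HashMap<Long, Integer>();

    public PointBinner(final double cellSize) {
        this.cellSize = cellSize;
    }

    /** Map a point to its bin (packed row/col key) and bump the bin's weight. */
    public void add(final double lon, final double lat) {
        final long col = (long) Math.floor(lon / cellSize);
        final long row = (long) Math.floor(lat / cellSize);
        final long key = (row << 32) | (col & 0xFFFFFFFFL);
        final Integer weight = binWeights.get(key);
        binWeights.put(key, weight == null ? 1 : weight + 1);
    }
}
```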

So I wrote a terminal operator, the raster operator, that writes bin information as an Esri raster file in either float or ASCII format. The interesting thing about this operator is that it is a "fan in" operator: despite being part of a graph that is executed on a remote cluster, the raster operator itself executes on the client machine, the client being the machine that submitted the graph to the cluster.
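The ASCII flavor of that output is the plain Esri ASCII grid format, which is simple enough to emit by hand. A minimal sketch (the binary float .flt/.hdr variant is omitted):

```java
import java.io.PrintWriter;

/**
 * Write a grid of bin weights as an Esri ASCII raster (.asc), which
 * ArcMap's raster conversion tooling can consume. Rows are written
 * top-down, as the format requires.
 */
public final class AsciiGridWriter {
    public static void write(final PrintWriter out, final float[][] grid,
            final double xllCorner, final double yllCorner, final double cellSize) {
        final int nrows = grid.length;
        final int ncols = grid[0].length;
        out.println("ncols " + ncols);
        out.println("nrows " + nrows);
        out.println("xllcorner " + xllCorner);
        out.println("yllcorner " + yllCorner);
        out.println("cellsize " + cellSize);
        out.println("NODATA_value -9999");
        for (int r = nrows - 1; r >= 0; r--) { // top row first; grid[0] is the bottom row
            final StringBuilder line = new StringBuilder();
            for (int c = 0; c < ncols; c++) {
                if (c > 0) {
                    line.append(' ');
                }
                line.append(grid[r][c]);
            }
            out.println(line);
        }
    }
}
```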

One last thing. A KNIME workflow can be exported into an external representation that can be imported into a logical graph for local or remote execution.

This means I can turn that code into an ArcMap GeoProcessing extension, enabling me to invoke a DataRush workflow as a Tool and combine it with other existing tools for further analysis and visualization.

In my workflow example, rather than manually calling the Float To Raster GP task after executing the DataRush tool, I can combine the two into one ArcPy script tool.

So to recap: you can store data in HDFS, or use the new GeoProcessing tools for Hadoop to do so. You can compose DataRush workflows with spatial operators, export them for execution on a cluster from ArcMap, and have the result consumed back by ArcMap for further local GeoProcessing or visualization.

This is very cool!

As usual, all the source code can be downloaded from here and here.
