Wednesday, January 29, 2014

Cascading Workflow for Spatial Binning of BigData

I just finished a 3-day training on Cascading by Concurrent, and it was worth every minute. I always knew about Cascading but never invested in it, and I wish I had, especially last month when I was doing a BigData ETL job in MapReduce. My development time would have been significantly reduced (pun intended :-) had I thought of the problem in terms of a cascading water flow rather than in MapReduce.
So in Cascading, you compose a data flow with a set of pipes that perform operations such as filtering, joining, and grouping, and Cascading turns that flow into a MapReduce job that you can execute on a Hadoop cluster.
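Before getting to the spatial example, here is a minimal, non-spatial sketch of that mindset, assuming the same Cascading 2.x Hadoop API used below; the class name, field names, and tap paths are placeholders for illustration. It reads lines of text, splits them into words, groups by word, and counts each group:

package com.esri;

import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class WordCountSketch
{
    public static void main(final String[] args)
    {
        // Source tap: raw text lines from HDFS; sink tap: tab-delimited word/count pairs.
        final Tap inTap = new Hfs(new TextLine(new Fields("line")), args[0]);
        final Tap outTap = new Hfs(new TextDelimited(true, "\t"), args[1], SinkMode.REPLACE);

        // The data flow: split each line on whitespace, group the words, count each group.
        Pipe pipe = new Each("lines", new Fields("line"),
                new RegexSplitGenerator(new Fields("word"), "\\s+"));
        pipe = new GroupBy(pipe, new Fields("word"));
        pipe = new Every(pipe, Fields.GROUP, new Count(new Fields("count")));

        // Turn the pipe assembly into a MapReduce flow and run it.
        new HadoopFlowConnector().connect(inTap, outTap, pipe).complete();
    }
}

The same tap/pipe/flow shape carries over directly to the spatial job below.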
Being spatially aware, I _had_ to add a spatial function to Cascading using our GIS Tools For Hadoop geometry API. The spatial function that I decided to implement bins location data by area, such that at the end of the process each area has a count of the locations it covers. This is a nice way to visualize massive data.
So, we start with:

to produce:

Again, rather than thinking in MapReduce, think data water flow:
package com.esri;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.net.URISyntaxException;
import java.util.Properties;

/**
 * Hello spatial cascading !
 */
public class App
{
    public static void main(final String[] args) throws URISyntaxException
    {
        if (args.length != 3)
        {
            System.err.println("Arguments: [data.tsv] [polygon.shp] [output]");
            return;
        }

        // Source tap: tab-delimited HDFS text with typed ID/X/Y fields.
        final Fields inFields = new Fields("ID", "X", "Y").
                applyTypes(long.class, double.class, double.class);
        final Tap inTap = new Hfs(new TextDelimited(inFields, false, "\t"), args[0]);

        // Sink tap: comma-delimited HDFS text with a header line.
        final Tap outTap = new Hfs(new TextDelimited(true, ","), args[2], SinkMode.REPLACE);

        // Pipe assembly: point-in-polygon on each X/Y, group by polygon id, count per group.
        final SpatialDensity spatialDensity = new SpatialDensity();
        Pipe pipe = new Each("start", new Fields("X", "Y"), spatialDensity);
        pipe = new GroupBy(pipe, spatialDensity.getFieldDeclaration());
        pipe = new Every(pipe, Fields.GROUP, new Count(new Fields("POPULATION")));

        final Properties properties = AppProps.appProps().
                setJarClass(App.class).
                buildProperties();
        properties.put(SpatialDensity.KEY_SHP, args[1]);

        final FlowConnector connector = new HadoopFlowConnector(properties);
        final Flow flow = connector.connect(inTap, outTap, pipe);
        flow.complete();
    }
}

Here, I have an input tap that accepts text data from HDFS. Each text record is composed of fields separated by a tab character. In Cascading, a tap can define the field names and types. A pipe is created to select the “X” and “Y” fields to be processed by a function. This spatial function utilizes the Esri Geometry API: it loads into an in-memory spatial index a set of polygons referenced by a property value, and performs a point-in-polygon operation on each input X/Y tuple. The identifier of the overlapping polygon is emitted as the pipe output. The output polygon identifiers are grouped together and counted by yet another pipe. The resulting polygon identifier/count tuples are written to a comma-separated HDFS file using an output tap. The count field is labeled POPULATION to make it ArcGIS friendly :-)
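The SpatialDensity function itself is not listed above, but here is a minimal sketch of what such a Cascading function could look like, assuming the Esri Geometry API (com.esri.core.geometry) and the Cascading 2.x operation interfaces. The class name SpatialDensitySketch, the output field name AREA_ID, the property key value, and the loadPolygons helper are hypothetical stand-ins, and the polygon loading and spatial indexing details are intentionally left out:

package com.esri;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;
import com.esri.core.geometry.Polygon;
import com.esri.core.geometry.SpatialReference;

import java.util.Map;

public class SpatialDensitySketch extends BaseOperation<Void> implements Function<Void>
{
    // Hypothetical property key; the flow puts the polygon path under it.
    public static final String KEY_SHP = "spatial.density.shp";

    private transient Map<String, Polygon> polygons; // polygon id -> geometry
    private transient SpatialReference spatialReference;

    public SpatialDensitySketch()
    {
        super(2, new Fields("AREA_ID")); // expects X and Y, declares the emitted polygon id field
    }

    @Override
    public void prepare(final FlowProcess flowProcess, final OperationCall<Void> operationCall)
    {
        spatialReference = SpatialReference.create(4326); // WGS84 lon/lat assumed
        // loadPolygons() is a hypothetical helper that would read the path stored
        // under KEY_SHP and build the id -> polygon lookup (details not shown here).
        polygons = loadPolygons(String.valueOf(flowProcess.getProperty(KEY_SHP)));
    }

    @Override
    public void operate(final FlowProcess flowProcess, final FunctionCall<Void> functionCall)
    {
        final TupleEntry args = functionCall.getArguments();
        final Point point = new Point(args.getDouble("X"), args.getDouble("Y"));
        // Point-in-polygon test; a real implementation would consult a spatial index
        // rather than scanning every polygon.
        for (final Map.Entry<String, Polygon> entry : polygons.entrySet())
        {
            if (GeometryEngine.contains(entry.getValue(), point, spatialReference))
            {
                functionCall.getOutputCollector().add(new Tuple(entry.getKey()));
                break;
            }
        }
    }

    private Map<String, Polygon> loadPolygons(final String path)
    {
        // Polygon parsing (shapefile or Esri JSON) is outside the scope of this sketch.
        throw new UnsupportedOperationException("polygon loading not shown");
    }
}

A production version would replace the linear scan with the geometry API's QuadTree so that point-in-polygon lookups stay fast as the polygon set grows.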
As usual, all the source code can be found here.
 
