I just finished a 3-day training on Cascading by Concurrent, and it was worth every minute. I always knew about Cascading but never invested in it. I wish I had, especially last month when I was doing a BigData ETL job in MapReduce. My development time would have been significantly reduced (pun intended :-) had I thought of the problem in terms of a cascading water flow rather than in MapReduce.
So in Cascading, you compose a data flow from a set of pipes that apply operations such as filtering, joining and grouping, and Cascading turns that flow into a MapReduce job that you can execute on a Hadoop cluster.
Being spatially aware, I _had_ to add a spatial function to Cascading using our GIS Tools For Hadoop geometry API. The spatial function that I decided to implement bins location data by area, such that at the end of the process each area has a count of the locations it covers. This is a nice way to visualize massive data sets.
So, we start with:
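(a hypothetical tab-separated sample matching the ID, X and Y fields the flow expects; the values are made up purely for illustration)

1	-122.4194	37.7749
2	-118.2437	34.0522
3	-122.4083	37.7879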
to produce:
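(again a made-up illustration; the grouping field name comes from the SpatialDensity function, so POLYGON_ID here is only a placeholder)

POLYGON_ID,POPULATION
06075,2
06037,1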
Again, rather than thinking in MapReduce, think data water flow:
package com.esri;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.net.URISyntaxException;
import java.util.Properties;

/**
 * Hello spatial cascading !
 */
public class App
{
    public static void main(final String[] args) throws URISyntaxException
    {
        if (args.length != 3)
        {
            System.err.println("Arguments: [data.tsv] [polygon.shp] [output]");
            return;
        }

        // Input tap: tab-delimited text on HDFS with typed ID, X and Y fields.
        final Fields inFields = new Fields("ID", "X", "Y").
                applyTypes(long.class, double.class, double.class);
        final Tap inTap = new Hfs(new TextDelimited(inFields, false, "\t"), args[0]);

        // Output tap: comma-delimited text on HDFS with a header row.
        final Tap outTap = new Hfs(new TextDelimited(true, ","), args[2], SinkMode.REPLACE);

        // Point-in-polygon function, then group and count by polygon identifier.
        final SpatialDensity spatialDensity = new SpatialDensity();
        Pipe pipe = new Each("start", new Fields("X", "Y"), spatialDensity);
        pipe = new GroupBy(pipe, spatialDensity.getFieldDeclaration());
        pipe = new Every(pipe, Fields.GROUP, new Count(new Fields("POPULATION")));

        // Pass the polygon source to the function and run the flow on Hadoop.
        final Properties properties = AppProps.appProps().
                setJarClass(App.class).
                buildProperties();
        properties.put(SpatialDensity.KEY_SHP, args[1]);
        final FlowConnector connector = new HadoopFlowConnector(properties);
        final Flow flow = connector.connect(inTap, outTap, pipe);
        flow.complete();
    }
}
Here, I have an input tap that accepts text data from HDFS. Each text record is composed of fields separated by a tab character. In Cascading, a tap can define the field names and types. A pipe is created to select the “X” and “Y” fields to be processed by a function. This is a spatial function that utilizes the Esri Geometry API. It loads a set of polygons, defined as a property value, into an in-memory spatial index, which is used to perform a point-in-polygon operation on each input X/Y tuple. The overlapping polygon identifier is emitted as the pipe output. The output polygon identifiers are grouped together and counted by yet another pipe. The tuple set of polygon identifier/count pairs is written to a comma-separated, HDFS-based file using an output tap. The count field is labeled POPULATION to make it ArcGIS friendly :-)
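The actual SpatialDensity function is part of the source code linked below, but here is a minimal sketch of what such a Cascading function can look like. The output field name POLYGON_ID, the KEY_SHP property value and the linear polygon scan are placeholders for illustration only; the real implementation reads the shapefile passed as the second argument and loads the polygons into an in-memory spatial index.

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;
import com.esri.core.geometry.Polygon;
import com.esri.core.geometry.SpatialReference;

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch only -- the real SpatialDensity lives in the linked repo.
 * Performs a point-in-polygon test on each X/Y tuple and emits the identifier
 * of the containing polygon.
 */
public class SpatialDensity extends BaseOperation<Void> implements Function<Void>
{
    public static final String KEY_SHP = "spatial.density.shp"; // property key value assumed

    private transient List<Polygon> polygons;   // polygon geometries
    private transient List<String> polygonIds;  // matching identifiers
    private transient SpatialReference spatialReference;

    public SpatialDensity()
    {
        super(2, new Fields("POLYGON_ID")); // two args (X, Y); output field name assumed
    }

    @Override
    public void prepare(final FlowProcess flowProcess, final OperationCall<Void> operationCall)
    {
        // Look up the polygon source path from the job properties and load the
        // polygons into memory.  Reading the shapefile (and building a real
        // spatial index) is omitted from this sketch.
        final Object shapePath = flowProcess.getProperty(KEY_SHP);
        polygons = new ArrayList<Polygon>();
        polygonIds = new ArrayList<String>();
        spatialReference = SpatialReference.create(4326);
        // ... load polygons and their identifiers from shapePath ...
    }

    @Override
    public void operate(final FlowProcess flowProcess, final FunctionCall<Void> functionCall)
    {
        final TupleEntry arguments = functionCall.getArguments();
        final Point point = new Point(arguments.getDouble("X"), arguments.getDouble("Y"));
        // Emit the identifier of the first polygon that contains the point.
        for (int i = 0; i < polygons.size(); i++)
        {
            if (GeometryEngine.contains(polygons.get(i), point, spatialReference))
            {
                functionCall.getOutputCollector().add(new Tuple(polygonIds.get(i)));
                break;
            }
        }
    }
}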
As usual, all the source code can be found here.