Tuesday, August 28, 2012

Big Data, Spatial Pig, Threaded Visualization

This post is PACKED with goodies. One of the ways to analyze large sets of data in the Hadoop File System without writing MapReduce jobs is to use Apache Pig. I highly recommend that you read Programming Pig, in addition to the online documentation. Pig Latin, the scripting language of Pig, is easy to understand and write and, more importantly, easy to extend. Since we do spatial stuff, the first goodie extends Pig Latin with a spatial function for analyzing data from HDFS. Here is the problem I posed to myself: given a very large set of records containing an X and a Y field, and given a set of polygons, I want to produce a set of tuples containing the polygon id and the number of X/Y records in that polygon. Nothing that a GP/Py task cannot do, but I needed the exercise, and BTW, you can call Pig from Python (a post for another day).
Pig, when executing in MapReduce mode, converts the Pig Latin script that you give it into a MapReduce job jar that it submits to Hadoop. You can register additional jars that get packaged into the job jar and define UDFs (User Defined Functions) to extend Pig Latin with new capabilities. The UDF that I wrote returns the identifier of the polygon that contains a given X and Y value. The polygon set is read by the UDF at startup from a shapefile that is loaded into the Distributed Cache. The polygons are spatially indexed for fast query and PIP (Point In Polygon) operations during the evaluation of each (x,y) tuple. The evaluator extends the org.apache.pig.EvalFunc class and is constructed with a reference to the shapefile path in the Hadoop file system.

    public PIPShapefile(final String path)
    {
        m_path = path; // Save reference as field variable.
    }

A mapping of the referenced shapefile to an internal name should be defined, so that the file placed in the Distributed Cache can be opened by that name from inside the UDF. The book Programming Pig covers this in detail.

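    @Override
    public List<String> getCacheFiles()
    {
        // Each entry is "<HDFS path>#<internal name>"; the UDF later opens "file.shp".
        final List<String> list = new ArrayList<String>(1);
        list.add(m_path + ".shp#file.shp"); // Note the #
        return list;
    }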
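
The # in the cache entry works like a URI fragment: the part before it is the HDFS path and the part after it is the internal name. Here is a minimal sketch of that split in plain Java. CacheEntry is a name I made up for illustration; it is not part of Pig or Hadoop, and the real bookkeeping is done for you by the Distributed Cache.

```java
import java.net.URI;

// Sketch only: splits a "path#name" cache entry the way a URI fragment is split.
// CacheEntry is a hypothetical helper, not a Pig or Hadoop class.
final class CacheEntry
{
    final String hdfsPath;  // Part before '#': where the file lives in HDFS.
    final String localName; // Part after '#': the internal name used to open the file.

    CacheEntry(final String spec)
    {
        final URI uri = URI.create(spec);
        hdfsPath = uri.getPath();
        localName = uri.getFragment();
    }
}
```

With the entry built in getCacheFiles, the path part is the .shp file under geo and the internal name is file.shp, which is the name the shapefile reader opens later on.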

Since the evaluator returns the polygon identifier, we tell the function to return an integer.

    @Override
    public Schema outputSchema(Schema input)
    {
        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
    }

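Finally, we implement the exec method that will be executed on every input tuple. This is a quick and dirty PIP implementation using JTS: it only tests the point against the bounding envelope of each candidate and keeps the smallest envelope that covers it (expanding it into a true point-in-polygon test is left as an exercise for the reader :-)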

    @Override
    public Integer exec(final Tuple tuple) throws IOException
    {
        m_init.init();
        int rc = -1;
        if (tuple != null && tuple.size() > 1)
        {
            double minArea = Double.POSITIVE_INFINITY;
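            final Double x = (Double) tuple.get(0);
            final Double y = (Double) tuple.get(1);
            if (x == null || y == null)
            {
                return rc; // Missing coordinate: report "not found" instead of failing on unboxing.
            }
            // log.info(x + " " + y);
            // Query the index with a small window around the point.
            m_envelope.init(x - OFFSET, x + OFFSET, y - OFFSET, y + OFFSET);
            // STRtree.query returns a raw List; every item is a Feature inserted at init time.
            final List<Feature> list = m_spatialIndex.query(m_envelope);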
            for (final Feature feature : list)
            {
                if (feature.envelope.covers(x, y))
                {
                    final double area = feature.envelope.getArea();
                    if (area < minArea)
                    {
                        minArea = area;
                        rc = feature.number;
                    }
                }
            }
        }
        return rc;
    }
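
For the "exercise for the reader" part, here is one way the envelope test in exec could be tightened into an exact test. This is a minimal sketch of the classic even-odd (ray casting) rule in plain Java, not the code from this post and not a JTS call. RingPIP and its array-based signature are my own assumptions, and it handles a single ring only (no holes, no multi-part polygons).

```java
// Sketch only: even-odd (ray casting) point-in-polygon test for one ring.
// RingPIP is a hypothetical helper; holes and multi-part polygons are not handled.
final class RingPIP
{
    /**
     * xs[i], ys[i] are the ring vertices in order. Repeating the first vertex
     * at the end is harmless because a zero-length edge is never crossed.
     */
    static boolean contains(final double[] xs, final double[] ys, final double px, final double py)
    {
        boolean inside = false;
        for (int i = 0, j = xs.length - 1; i < xs.length; j = i++)
        {
            // Does the edge (j -> i) straddle the horizontal line through the point?
            final boolean straddles = (ys[i] > py) != (ys[j] > py);
            if (straddles)
            {
                // X coordinate where the edge meets that horizontal line.
                final double xAtPy = xs[j] + (py - ys[j]) * (xs[i] - xs[j]) / (ys[i] - ys[j]);
                if (px < xAtPy)
                {
                    inside = !inside; // One more crossing to the right of the point.
                }
            }
        }
        return inside;
    }
}
```

The envelope check would stay as a cheap first filter, and a test like this would only run on the candidates whose envelope covers the point. Points that sit exactly on an edge are not treated consistently by this rule, so do not rely on it for boundary cases.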

Note the m_init.init() invocation at the top of exec. It is there because there is no initialization entry point in the lifecycle of a UDF. So m_init is a reference to an implementation that, after the first time through, swaps itself from a shapefile loader to an empty function. I used the GeoTools library to load and spatially index the features for later processing.

    private interface IInit
    {
        void init() throws IOException;
    }
    
    private final class FTTInit implements IInit
    {
        public void init() throws IOException
        {
            m_spatialIndex = new STRtree();
            final ShapefileReader shapefileReader = new ShapefileReader(new ShpFiles("file.shp"), true, true, m_geometryFactory);
            try
            {
                shapefileReader.disableShxUsage();
                while (shapefileReader.hasNext())
                {
                    final ShapefileReader.Record record = shapefileReader.nextRecord();
                    final Envelope envelope = new Envelope(record.envelope());
                    final Feature feature = new Feature();
                    feature.envelope = envelope;
                    feature.number = record.number;
                    m_spatialIndex.insert(envelope, feature);
                }
            }
            finally
            {
                shapefileReader.close();
            }
            m_init = new NoopInit(); // NOOP after first time through !!
        }
    }

    private final class NoopInit implements IInit
    {
        public void init()
        {
        }
    }
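
If the swap trick looks odd in the middle of the UDF, here it is in isolation. This is a stand-alone sketch, assuming Java 8 or later (which postdates this post) so that a method reference and a lambda can stand in for the two IInit classes. LazyLoader, loadOnce and the fake "feature-0" entry are made up for the demo, and the sketch is not thread-safe: it assumes exec is only ever called from one thread per instance.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: an initializer that replaces itself with a no-op after its first call.
// LazyLoader is a hypothetical stand-in for the UDF; it is not thread-safe.
final class LazyLoader
{
    final List<String> loaded = new ArrayList<>(); // Stands in for the spatial index.
    int loadCount = 0;                              // How many times the expensive load ran.

    private Runnable m_init = this::loadOnce;       // First call goes to the loader.

    private void loadOnce()
    {
        loaded.add("feature-0"); // Hypothetical expensive load.
        loadCount++;
        m_init = () -> { };      // NOOP after the first time through.
    }

    int exec(final int value)
    {
        m_init.run(); // Loads on the first call, does nothing afterwards.
        return value + loaded.size();
    }
}
```

Calling exec any number of times runs the load exactly once, and later calls pay only for an empty method call rather than an "is it loaded yet?" check.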

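Compile the class against the following jars, then package the class and every jar marked (include) into a single jar to be registered with Pig. The jar marked (exclude) is needed for compilation only.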

gt-api-9-SNAPSHOT.jar (include)
gt-data-9-SNAPSHOT.jar (include)
gt-main-9-SNAPSHOT.jar (include)
gt-metadata-9-SNAPSHOT.jar (include)
gt-opengis-9-SNAPSHOT.jar (include)
gt-shapefile-9-SNAPSHOT.jar (include)
jts-1.13.jar (include)
pig-0.10.0.jar (exclude)

You can download the source code for PIPShapefile.java from here.

Before executing the script, I loaded the shapefile into HDFS using hadoop CLI commands:

$ hadoop fs -mkdir geo
$ hadoop fs -put cntry06.shp geo
$ hadoop fs -put cntry06.dbf geo

I generated a million random points using AWK:

$ cat gendata.awk
BEGIN{
        OFS="\t"
        for(I=0;I < 1000000;I++){
                X=-180+360*rand()
                Y=-90+180*rand()
                print "L-"I,X,Y
        }
        exit
}

$ awk -f gendata.awk > data.tsv
$ hadoop fs -mkdir data
$ hadoop fs -put data.tsv data

The following is the Pig Latin script that solves the problem stated above. As you can see, it is pretty self-explanatory:

$ cat script.pig

register '/Users/mraad_admin/JavaWorkspace/GeomX/out/GeomX.jar'
define pip com.esri.PIPShapefile('/user/mraad_admin/geo/cntry06');
A = LOAD 'data/data.tsv' AS (name:chararray,x:double,y:double);
B = FOREACH A GENERATE pip(x,y) AS P:int;
F = FILTER B BY P != -1;
G = GROUP F BY P;
C = FOREACH G GENERATE group,COUNT(F);
dump C;

Here is what it does, step by step. Register the jar so that it is included in the MapReduce job jar. Define the pip UDF with a reference to the shapefile in HDFS. Load data.tsv from HDFS with the described schema. Iterate over the data and generate a polygon id for each x and y value. Keep all the "found" polygons (id != -1), group them by the id value, and finally dump each id and its count. Pretty neat, no? To execute the script:

$ pig -f script.pig
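
If Pig Latin is new to you, the FILTER, GROUP and COUNT steps of the script map onto something like the following plain Java. This is only an in-memory sketch of the logic, assuming Java 8 streams. PolygonCounts is a hypothetical name, and the list of ids stands in for what the pip UDF would produce; Pig runs the same steps as MapReduce over the whole data set.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Sketch only: the FILTER / GROUP / COUNT steps of the script, done in memory.
// PolygonCounts is a hypothetical helper, not part of Pig.
final class PolygonCounts
{
    static Map<Integer, Long> count(final List<Integer> polygonIds)
    {
        return polygonIds.stream()
            .filter(id -> id != -1) // F = FILTER B BY P != -1;
            .collect(Collectors.groupingBy( // G = GROUP F BY P;
                id -> id,
                TreeMap::new,               // Sorted by polygon id for readable output.
                Collectors.counting()));    // C = FOREACH G GENERATE group,COUNT(F);
    }
}
```

Each entry of the returned map corresponds to one tuple dumped by the script: the polygon id and the number of points that fell in it.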

Now, what to do with all these output tuples? Goodie #2: execute Pig on the server side and send the output to a client-side app for visualization. Using Spring Data Hadoop, we can invoke a Pig script through a PigServer instance, and using Spring Flex we can expose it as a service for RemoteObject invocation.
Here is the PigService implementation:

package com.esri;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import flex.messaging.io.amf.ASObject;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.pig.PigServerFactoryBean;
import org.springframework.flex.remoting.RemotingDestination;
import org.springframework.stereotype.Service;

@Service("pigService")
@RemotingDestination
public final class PigService
{
    @Autowired
    public PigServerFactoryBean m_pigServerFactoryBean;

    public List<ASObject> run(final String load, final String funcArg) throws Exception
    {
        final List<ASObject> list = new ArrayList<ASObject>();
        final PigServer pigServer = m_pigServerFactoryBean.getObject();
        try
        {
            pigServer.registerJar("/Users/mraad_admin/JavaWorkspace/GeomX/out/GeomX.jar");

            pigServer.registerFunction("pip", new FuncSpec("com.esri.PIPShapefile", funcArg));

            pigServer.registerQuery("A = load '" + load + "' as (name:chararray,x:double,y:double);");
            pigServer.registerQuery("B = foreach A generate pip(x,y) as P:int;");
            pigServer.registerQuery("F = FILTER B BY P != -1;");
            pigServer.registerQuery("G = GROUP F BY P;");
            pigServer.registerQuery("C = FOREACH G GENERATE group,COUNT(F);");

            // Nothing runs until the iterator is opened; this triggers the job.
            final Iterator<Tuple> tupleIterator = pigServer.openIterator("C");
            while (tupleIterator.hasNext())
            {
                final Tuple tuple = tupleIterator.next();
                final ASObject asObject = new ASObject();
                asObject.put("index", tuple.get(0));
                asObject.put("count", tuple.get(1));
                list.add(asObject);
            }
        }
        finally
        {
            pigServer.shutdown();
        }
        return list;
    }
}

Notice how the Pig script is broken down into registering the jar, the function and the queries. Nothing is executed until openIterator is invoked; it returns an iterator that enables me to convert the tuples into ASObject instances ready for AMF transfer to the client.

Final step: invoke, fuse and visualize. The Flex API for ArcGIS is a great tool. In this post's implementation, I am taking advantage of one of the enhancements in the latest Flash Player (11.4): Workers. Finally, we have "threads" in the Flash Player! Take a look at Lee Brimelow's video tutorial for a great description of worker usage.

The application enables the user to load a local shapefile. I am using a worker to parse the shapefile that was loaded into the distributed cache and convert each shape into a Graphic instance for visualization. The shapefile is a bit over 3.5MB, and in the "old" days parsing a shapefile of this size would have spun up the "beach ball of death" on my Mac. Now... no beach ball, and the UI stays responsive. That was goodie #3. I had to play a bit of "games" by creating separate projects to enable the creation of the worker swf and the common code. In theory, all this should be resolved with the release of Flash Builder 4.7.

The Flex application enables the user to invoke the PigService, and the resulting tuples are merged into the attributes of the loaded features for thematic rendering. Goodie #4: ThematicFillSymbol is a custom symbol with full 180 wrap-around polygon rendering. It accepts as parameters the min/max count values and the min/max color values, and fills each polygon proportionally to its count attribute value.
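
To give an idea of what "proportionally" can mean for the fill, here is a rough sketch of a linear color ramp between the min and max colors. The real ThematicFillSymbol is ActionScript and lives in the client source, so treat this Java version as an assumption about the approach, not a port: ThematicColor, the per-channel linear interpolation and the clamping are all mine.

```java
// Sketch only: linear RGB ramp between two colors, driven by a count value.
// ThematicColor is a hypothetical helper; the actual symbol may compute its fill differently.
final class ThematicColor
{
    /** Returns a 0xRRGGBB color between minColor and maxColor for the given count. */
    static int interpolate(final long count, final long minCount, final long maxCount,
                           final int minColor, final int maxColor)
    {
        if (maxCount <= minCount)
        {
            return minColor; // Degenerate range: nothing to interpolate.
        }
        // Clamp the count into [minCount, maxCount] and normalize it to [0, 1].
        final double clamped = Math.max(minCount, Math.min(maxCount, count));
        final double t = (clamped - minCount) / (double) (maxCount - minCount);
        final int r = channel(minColor >> 16, maxColor >> 16, t);
        final int g = channel(minColor >> 8, maxColor >> 8, t);
        final int b = channel(minColor, maxColor, t);
        return (r << 16) | (g << 8) | b;
    }

    // Interpolates the low 8 bits of the two inputs.
    private static int channel(final int from, final int to, final double t)
    {
        final int a = from & 0xFF;
        final int b = to & 0xFF;
        return (int) Math.round(a + (b - a) * t);
    }
}
```

The min/max counts would come from the tuples returned by the PigService, and each polygon would be filled with the color computed from its own count.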

Well, that was a lot of stuff. If you stayed with me all the way till here... thank you. As usual, you can download all the source code of the server from here and the client from here.
