Tuesday, August 28, 2012

Big Data, Spatial Pig, Threaded Visualization

This post is PACKED with goodies - One of the ways to analyze large sets of data in the Hadoop File System without writing MapReduce jobs is to use Apache Pig. I highly recommend that you read Programming Pig in addition to the online documentation. Pig Latin, the scripting language of Pig, is easy to understand, write and, more importantly, to extend. Since we do spatial stuff, the first goodie extends Pig Latin with a spatial function for analyzing data from HDFS. Here is the problem I posed to myself: given a very large set of records containing an X and a Y field, and given a set of polygons, produce a set of tuples containing each polygon id and the number of X/Y records that fall in that polygon. Nothing that a GP/Py task cannot do, but I needed the exercise, and BTW, you can call Pig from Python (a post for another day).
Pig, when executing in MapReduce mode, converts the Pig Latin script that you give it into a MapReduce job jar that it submits to Hadoop. You can register additional jars that get packaged into the job jar and define UDFs (User Defined Functions) to extend Pig Latin with new capabilities. The UDF that I wrote returns the identifier of the polygon that contains given X and Y values. The polygon set is read by the UDF at startup from a shapefile that is loaded into the Distributed Cache. The polygons are spatially indexed for fast querying and PIP (Point In Polygon) operations during tuple (x,y) evaluation. The evaluator extends the org.apache.pig.EvalFunc class and is constructed with a reference to the shapefile path in the Hadoop file system.

    public PIPShapefile(final String path)
    {
        m_path = path; // Save reference as field variable.
    }

A mapping of the referenced shapefile to an internal name should be defined. The book Programming Pig covers this in detail.

    @Override
    public List<String> getCacheFiles()
    {
        final List<String> list = new ArrayList<String>(1);
        list.add(m_path + ".shp#file.shp"); // Note the # - the shapefile is symlinked locally as file.shp
        return list;
    }

Since the evaluator returns the polygon identifier, we tell Pig that the function returns an integer.

    @Override
    public Schema outputSchema(Schema input)
    {
        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
    }

Finally, we implement the exec method that is executed on every input tuple. This is a quick and dirty PIP implementation using JTS (expanding and elaborating on it is left to the reader as an exercise :-)

    @Override
    public Integer exec(final Tuple tuple) throws IOException
    {
        m_init.init();
        int rc = -1;
        if (tuple != null && tuple.size() > 1)
        {
            double minArea = Double.POSITIVE_INFINITY;
            final Double x = (Double) tuple.get(0);
            final Double y = (Double) tuple.get(1);
            // log.info(x + " " + y);
            m_envelope.init(x - OFFSET, x + OFFSET, y - OFFSET, y + OFFSET);
            final List<Feature> list = (List<Feature>) m_spatialIndex.query(m_envelope); // STRtree returns a raw List
            for (final Feature feature : list)
            {
                if (feature.envelope.covers(x, y))
                {
                    final double area = feature.envelope.getArea();
                    if (area < minArea)
                    {
                        minArea = area;
                        rc = feature.number;
                    }
                }
            }
        }
        return rc;
    }

Note the m_init.init() invocation: there is no initialization entry point in the lifecycle of a UDF, so m_init is a reference to an implementation that mutates after the first pass from a shapefile loader into an empty function. I used the GeoTools library to load and spatially index the features for later processing.

    private interface IInit
    {
        void init() throws IOException;
    }
    
    private final class FTTInit implements IInit
    {
        public void init() throws IOException
        {
            m_spatialIndex = new STRtree();
            final ShapefileReader shapefileReader = new ShapefileReader(new ShpFiles("file.shp"), true, true, m_geometryFactory);
            try
            {
                shapefileReader.disableShxUsage();
                while (shapefileReader.hasNext())
                {
                    final ShapefileReader.Record record = shapefileReader.nextRecord();
                    final Envelope envelope = new Envelope(record.envelope());
                    final Feature feature = new Feature();
                    feature.envelope = envelope;
                    feature.number = record.number;
                    m_spatialIndex.insert(envelope, feature);
                }
            }
            finally
            {
                shapefileReader.close();
            }
            m_init = new NoopInit(); // NOOP after first time through !!
        }
    }

    private final class NoopInit implements IInit
    {
        public void init()
        {
        }
    }
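
For reference, the snippets in this post use a handful of fields and a small Feature holder that are not shown; here is a hedged sketch of how they might be declared (the names come from the snippets, while the package imports, the OFFSET value and the initial m_init assignment are assumptions):

    import java.util.ArrayList;
    import java.util.List;

    import com.vividsolutions.jts.geom.Envelope;
    import com.vividsolutions.jts.geom.GeometryFactory;
    import com.vividsolutions.jts.index.strtree.STRtree;

    import org.apache.pig.EvalFunc;

    public class PIPShapefile extends EvalFunc<Integer>
    {
        private static final double OFFSET = 1e-6;   // assumed tiny padding around the query point

        private final String m_path;                 // HDFS path to the shapefile, minus the .shp extension
        private final Envelope m_envelope = new Envelope();
        private final GeometryFactory m_geometryFactory = new GeometryFactory();

        private STRtree m_spatialIndex;
        private IInit m_init = new FTTInit();        // "first time through" loader, swapped for a no-op after init

        // Simple holder for what we keep from each shapefile record.
        private static final class Feature
        {
            Envelope envelope;
            int number;
        }

        // Constructor, getCacheFiles, outputSchema, exec, IInit, FTTInit and NoopInit as shown above.
        // (GeoTools shapefile reader imports omitted.)
    }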

Compile the class against the following jars and package everything into a jar to be registered with Pig.

gt-api-9-SNAPSHOT.jar (include)
gt-data-9-SNAPSHOT.jar (include)
gt-main-9-SNAPSHOT.jar (include)
gt-metadata-9-SNAPSHOT.jar (include)
gt-opengis-9-SNAPSHOT.jar (include)
gt-shapefile-9-SNAPSHOT.jar (include)
jts-1.13.jar (include)
pig-0.10.0.jar (exclude)

You can download the source code for PIPShapefile.java from here.

Before executing the script, I loaded the shapefile into HDFS using hadoop CLI commands:

$ hadoop fs -mkdir geo
$ hadoop fs -put cntry06.shp geo
$ hadoop fs -put cntry06.dbf geo

I generated a million random points using AWK:

$ cat gendata.awk
BEGIN{
        OFS="\t"
        for(I=0;I < 1000000;I++){
                X=-180+360*rand()
                Y=-90+180*rand()
                print "L-"I,X,Y
        }
        exit
}

$ awk -f gendata.awk > data.tsv
$ hadoop fs -mkdir data
$ hadoop fs -put data.tsv data

The following is the Pig Latin script that solves the problem stated above; as you can see, it is pretty self-explanatory:

$ cat script.pig

register '/Users/mraad_admin/JavaWorkspace/GeomX/out/GeomX.jar'
define pip com.esri.PIPShapefile('/user/mraad_admin/geo/cntry06');
A = LOAD 'data/data.tsv' AS (name:chararray,x:double,y:double);
B = FOREACH A GENERATE pip(x,y) AS P:int;
F = FILTER B BY P != -1;
G = GROUP F BY P;
C = FOREACH G GENERATE group,COUNT(F);
dump C;

Register the jar to be included in the MapReduce job jar. Define the pip UDF with a reference to the shapefile in HDFS. Load data.tsv from HDFS with the described schema. Iterate over the data and generate a list of polygon ids based on the x and y values. Keep all "found" polygons (id != -1), group them by the id value, and finally dump each id and its count. Pretty neat, no? To execute the script:

$ pig -f script.pig

Now, what to do with all these output tuples? Goodie #2: execute Pig on the server side and send the output to a client side app for visualization. Using Spring Data Hadoop, we can invoke a Pig script through a PigServer instance and, using Spring Flex, expose it as a service for RemoteObject invocation.
Here is the PigService implementation:

package com.esri;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import flex.messaging.io.amf.ASObject;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.pig.PigServerFactoryBean;
import org.springframework.flex.remoting.RemotingDestination;
import org.springframework.stereotype.Service;

@Service("pigService")
@RemotingDestination
public final class PigService
{
    @Autowired
    public PigServerFactoryBean m_pigServerFactoryBean;

    public List<ASObject> run(final String load, final String funcArg) throws Exception
    {
        final List<ASObject> list = new ArrayList<ASObject>();
        final PigServer pigServer = m_pigServerFactoryBean.getObject();
        try
        {
            pigServer.registerJar("/Users/mraad_admin/JavaWorkspace/GeomX/out/GeomX.jar");

            pigServer.registerFunction("pip", new FuncSpec("com.esri.PIPShapefile", funcArg));

            pigServer.registerQuery("A = load '" + load + "' as (name:chararray,x:double,y:double);");
            pigServer.registerQuery("B = foreach A generate pip(x,y) as P:int;");
            pigServer.registerQuery("F = FILTER B BY P != -1;");
            pigServer.registerQuery("G = GROUP F BY P;");
            pigServer.registerQuery("C = FOREACH G GENERATE group,COUNT(F);");

            final Iterator<Tuple> tupleIterator = pigServer.openIterator("C");
            while (tupleIterator.hasNext())
            {
                final Tuple tuple = tupleIterator.next();
                final ASObject asObject = new ASObject();
                asObject.put("index", tuple.get(0));
                asObject.put("count", tuple.get(1));
                list.add(asObject);
            }
        }
        finally
        {
            pigServer.shutdown();
        }
        return list;
    }
}

Notice how the Pig script is broken down by registering the jar, the function, and then the queries; everything is executed when openIterator is invoked, resulting in an iterator that enables me to convert the tuples into ASObjects ready for AMF transfer to the client.

Final step: invoke, fuse and visualize. The Flex API for ArcGIS is a great tool. In this post's implementation, I am taking advantage of one of the enhancements in the latest Flash Player (11.4): Workers. Finally, we have "threads" in the Flash Player! Take a look at Lee Brimelow's video tutorial for a great description of worker usage. The application enables the user to load a local shapefile. I am using a worker to parse the shapefile (the same one that was loaded into the distributed cache) and convert each shape into a Graphic instance for visualization. The shapefile is a bit over 3.5MB, and in the "old" days parsing a file of that size would have spun up the "beach ball of death" on my Mac. Now... no beach ball, and the UI stays responsive. That was goodie #3. I had to play a bit of "games" by creating separate projects to enable the creation of the worker swf and the common code; all this should, in theory, be resolved with the release of Flash Builder 4.7. The Flex application enables the user to invoke the PigService, and the resulting tuples are merged into the loaded feature attributes for thematic rendering. Goodie #4: ThematicFillSymbol is a custom symbol with full 180 wrap-around polygon rendering. It accepts as parameters the min/max count values and the min/max color values, and fills each polygon proportionally to its count attribute value.

Well, that was a lot of stuff. If you stayed with me all the way to here... thank you. As usual, you can download all the source code of the server from here and the client from here.

Thursday, August 23, 2012

MongoDB + Spring + Mobile Flex API for ArcGIS = Harmonie


I've used MongoDB on a project for the City of Chicago with great success. I was impressed by the fact that you can store JSON documents in one giant collection, scale horizontally by just adding new nodes, talk to it from a plethora of language APIs (Java, AS3), run MapReduce tasks, and, my favorite, create a true spatial index on a document property. This is not some BTree index on compounded x/y properties, but a true spatial index that can be used to perform spatial operations such as near and within. Today, Mongo only supports point geometries, but I understand that they are working on storing and spatially indexing lines and polygons and enabling other spatial operations.

I've experimented with great success in an intranet with consuming BSON objects directly from Mongo into a Flex based application, but the direct socket connection is a problem in a web or mobile environment. In addition, what I wanted was a middle tier that can turn my BSON documents into full-fledged Flex AGS Graphic instances with as little impedance as possible. When we wrote the Flex API for ArcGIS, we annotated key classes with "RemoteClass" to enable direct AMF streaming from ArcGIS Server at the SoC level. What that means is that there is no impedance mismatch between the object types created on the server and the object types consumed on the client. Some of these annotated classes are FeatureSet, Graphic, MapPoint and SpatialReference.

Whenever the words middle tier and AMF come together, you have to think BlazeDS with its RemoteObject capabilities on POJOs, and whenever you say POJO, the Spring Framework is by far the best at handling POJOs. Lucky for me, under the Spring Framework umbrella exist two projects whose marriage creates beautiful harmony: Spring BlazeDS Integration and Spring Data for MongoDB. The BlazeDS Integration handles the exposure of annotated POJOs as remote services that can be consumed from Flex, and using IoC, these POJOs are injected with MongoDB templates for data access.

In today's world, client applications have to be mobile in nature, and my experiment to fuse all these entities is no exception. So, I've written a simple Flex mobile application that declares a RemoteObject with an http based endpoint to a server side registered POJO that exposes a "near" operation. On the client, the user can tap a location on the map. The location is an argument to the "near" server side function, which invokes the "near" operation on a MongoDB collection of tweet documents. The found tweet documents are converted to Graphic instances and returned to the Flex application as part of a FeatureSet. The features in the FeatureSet are assigned to the graphicProvider property of a GraphicLayer, thus rendering these features on the map. Just for coolness, I placed a switch toggle as a map control to enable/disable the clustering of the returned features. The key glue to this fusion is the mongo mapping converter implementation that converts BSON objects to AGS features.


public class FeatureReadConverter implements Converter<DBObject, Feature>
{
    public Feature convert(final DBObject dbo)
    {
        final List<Double> shape = (List<Double>) dbo.removeField("shape");
        final Double lon = shape.get(0);
        final Double lat = shape.get(1);
        final MapPoint mapPoint = new MapPoint(
           WebMercator.longitudeToX(lon),
           WebMercator.latitudeToY(lat));
        final ASObject attributes = new ASObject();
        for (final String key : dbo.keySet())
        {
            attributes.put(key, dbo.get(key));
        }
        return new Feature(mapPoint, attributes);
    }
}
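
For context, here is a hedged sketch (not the author's actual service) of how the "near" lookup against the tweet collection might be issued with the plain MongoDB Java driver and the FeatureReadConverter above; the database and collection names, the "shape" field layout and the result limit are assumptions:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

// Feature and FeatureReadConverter are the classes shown in this post.
public final class NearQuerySketch
{
    public List<Feature> near(final double lon, final double lat) throws Exception
    {
        final Mongo mongo = new Mongo("localhost");
        try
        {
            final DB db = mongo.getDB("geodb");                     // assumed database name
            final DBCollection tweets = db.getCollection("tweets"); // assumed collection name
            // $near relies on the 2d index created on the "shape" [lon,lat] property.
            final DBObject query = new BasicDBObject("shape",
                    new BasicDBObject("$near", Arrays.asList(lon, lat)));
            final List<Feature> features = new ArrayList<Feature>();
            final FeatureReadConverter converter = new FeatureReadConverter();
            final DBCursor cursor = tweets.find(query).limit(100);  // assumed limit
            try
            {
                while (cursor.hasNext())
                {
                    features.add(converter.convert(cursor.next()));
                }
            }
            finally
            {
                cursor.close();
            }
            return features;
        }
        finally
        {
            mongo.close();
        }
    }
}

In the actual service, the returned features would be wrapped in a FeatureSet for AMF transfer, as described above.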


Here is a snapshot of the application in action from my iPhone!


And as usual, you can download the client source code from here, and the server side code from here.
Disclosure - the current flaring does not work on mobile using the released API unless you include the mx swc :-( This has been fixed for the next release (don't ask.... soon :-)

Wednesday, August 1, 2012

Big Data, Small Data, Big Visualization


Ever since I became a Cloudera Certified Developer for Apache Hadoop, I've been walking around with a hammer that has "MapReduce" written on it, looking for Big Data nails to pound. Finally, a real world problem from a customer came to my attention where a Hadoop implementation would solve their dilemma: given a 250GB (I know, I know, this is _not_ big) CSV data set of demographic data consisting of gender, race, age, income and, of course, location, and given a set of Point of Interest locations, generate a 50 mile heatmap for each demographic attribute for each of the POI locations.
Using the "traditional" GeoProcessing with Python would take way more than a couple of days to run and would generate over 850GB of raster data. What do I mean by the "traditional" way ? You load the CSV data into a GeoDatabase and then you write an ArcPy script that; for each location, generate a 50 mile buffer.  Cookie cut the demographic data based on an attribute using the buffer and pass that feature set to the statistical package for density analysis which generates a raster file. Sure, you can manually partition the process onto acquired high CPU machines, but as I said, all that has to be done manually and will still take days.

There's gotta be a better way!

Enter Hadoop and a "different" way to process the data by taking advantage of:
- Hadoop File System fast splittable input streaming
- Distributed nature of Hadoop map reduce parts
- Distributed cache for "join" data
- An external, fast Java-based computational geometry library
- Producing vector data rather than raster images

The last advantage is very important. This is something I call "cooperative processing". See, people forget that there is a CPU/GPU on their client machines. If the server can produce vector data and we let the client render that data based on its capabilities, we get a far more expressive application, and the size of the transferred data is much smaller. I will explain that in a bit.

Let me go back to the data processing. Actually, there is nothing to do: there is no need for a transform-and-load process, as the CSV data can be placed directly onto an HDFS folder. The Hadoop job takes that HDFS folder as its input.

The Mapper Task - After instantiation, the 'configure' method is invoked to load the POI locations from the distributed cache, and a 50 mile buffer is generated for each POI location using the fast computational geometry library, whereupon the buffer polygons are stored in a memory-based spatial index for fast intersection lookup. The 'map' method is invoked on every record in the 250GB CSV input, where each record is tokenized for its coordinates and demographic values. Using the coordinates and the prebuilt spatial index, we can find the associated POI locations. Each 50 mile buffer is logically divided into kernel cells, and knowing the POI location, we can determine mathematically the relative kernel cell. We emit the combination of the POI location and the demographic value as the map key, and the relative kernel cell as the map value (a hedged sketch follows the key/value listing below).

map(k1,v1) -> list(k2,v2)
where:
k1 = lineno
v1 = csv text
k2 = POI location,demographic
v2 = cellx,celly,1
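
The original mapper code is not shown in the post, so here is a minimal sketch of what the map side might look like, assuming the old-style mapred API; the class name, the comma-delimited column order, the cell size and the POI lookup are all illustrative assumptions, not the author's actual code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch only - emits (POI id,demographic) -> (cellx,celly,1).
public class DensityMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text>
{
    private static final double CELL_SIZE = 0.01; // assumed kernel cell size in decimal degrees

    // Stand-in for the 50 mile buffers loaded from the distributed cache and spatially indexed.
    private final List<POI> m_poiList = new ArrayList<POI>();

    @Override
    public void configure(final JobConf jobConf)
    {
        // In the real job: read the POI locations from the distributed cache, buffer each
        // by 50 miles with the geometry library and insert the buffers into a spatial index.
    }

    public void map(final LongWritable key, final Text value,
                    final OutputCollector<Text, Text> output,
                    final Reporter reporter) throws IOException
    {
        final String[] tokens = value.toString().split(",");
        final double x = Double.parseDouble(tokens[0]); // assumed column order
        final double y = Double.parseDouble(tokens[1]);
        final String demographic = tokens[2];
        for (final POI poi : m_poiList) // the real job queries a spatial index instead
        {
            if (poi.bufferContains(x, y))
            {
                // Kernel cell relative to the POI location.
                final long cellx = (long) Math.floor((x - poi.x) / CELL_SIZE);
                final long celly = (long) Math.floor((y - poi.y) / CELL_SIZE);
                output.collect(new Text(poi.id + "," + demographic),
                        new Text(cellx + "," + celly + ",1"));
            }
        }
    }

    // Minimal POI holder for the sketch.
    static final class POI
    {
        String id;
        double x, y;

        boolean bufferContains(final double px, final double py)
        {
            final double dx = px - x, dy = py - y;
            return Math.sqrt(dx * dx + dy * dy) <= 0.72; // ~50 miles in degrees, a rough approximation
        }
    }
}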

Again, taking advantage of the powerful shuffle and sort capability of Hadoop on the POI Location/demographic key, I am ensuring that a reduce task will receive all the cells for a POI location/demographic combination.

The Reduce Task - For each POI location/demographic key, the reduce method is invoked with the list of its associated cells. Cells with the same cellx,celly values are aggregated to produce a new list. We compose a JSON document from the new list and emit the string representation of that JSON document through a custom output formatter, in which we override the 'generateFileNameForKeyValue' method to return something of the form "poi-location_demographic.json" (a hedged sketch follows the listing below).

reduce(k2,list(v2)) -> list(k3,v3)
where:
k2: POI location, demographic
v2: cellx, celly, 1
k3: POI location, demographic
v3: JSON Text
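
Again, the reduce code is not shown either; the following is a minimal sketch under the same assumptions as the mapper above - the JSON layout and the output format subclass are illustrative, not the author's implementation:

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Illustrative sketch only - aggregates identical cellx,celly pairs and emits a JSON document per key.
public class DensityReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text>
{
    public void reduce(final Text key, final Iterator<Text> values,
                       final OutputCollector<Text, Text> output,
                       final Reporter reporter) throws IOException
    {
        // Aggregate the weights of identical cellx,celly pairs.
        final Map<String, Integer> cellCount = new HashMap<String, Integer>();
        while (values.hasNext())
        {
            final String[] tokens = values.next().toString().split(",");
            final String cell = tokens[0] + "," + tokens[1];
            final int weight = Integer.parseInt(tokens[2]);
            final Integer count = cellCount.get(cell);
            cellCount.put(cell, count == null ? weight : count + weight);
        }
        // Compose a simple JSON document from the aggregated cells.
        final StringBuilder json = new StringBuilder("{\"cells\":[");
        boolean first = true;
        for (final Map.Entry<String, Integer> entry : cellCount.entrySet())
        {
            if (!first)
            {
                json.append(',');
            }
            first = false;
            final String[] xy = entry.getKey().split(",");
            json.append("{\"x\":").append(xy[0])
                .append(",\"y\":").append(xy[1])
                .append(",\"w\":").append(entry.getValue()).append('}');
        }
        json.append("]}");
        output.collect(key, new Text(json.toString()));
    }
}

// Custom output format so each POI location/demographic key lands in its own file.
class DensityOutputFormat extends MultipleTextOutputFormat<Text, Text>
{
    @Override
    protected String generateFileNameForKeyValue(final Text key, final Text value, final String name)
    {
        return key.toString().replace(',', '_') + ".json";
    }
}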

I was able to validate my progress by running MRUnit tests against my codebase to ensure the soundness of my logic.
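
As an illustration only, using the hypothetical DensityMapper sketched above and assuming MRUnit's old-API MapDriver, such a test might look like this (the input record and expectations are assumptions):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.MapDriver;
import org.junit.Test;

// Illustrative sketch only - exercises the hypothetical DensityMapper from the earlier sketch.
public class DensityMapperTest
{
    @Test
    public void testMap() throws IOException
    {
        final MapDriver<LongWritable, Text, Text, Text> mapDriver =
                new MapDriver<LongWritable, Text, Text, Text>(new DensityMapper());
        mapDriver.withInput(new LongWritable(0L), new Text("-87.62,41.88,income"));
        // With no POI buffers loaded in the sketch, no key/value pairs are expected.
        mapDriver.runTest();
    }
}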

I packaged my map/reduce code and the geometry library into a jar, and I was ready to test it on the 250GB CSV.

But where to run this?

Enter Amazon Elastic MapReduce. With a virtual swipe of a credit card, I was able to spin up 10 large instances, passing them a reference to my data and my jar in S3. Thirty minutes later, a set of JSON files was produced in S3, occupying 238MB of space! Pretty cool, eh? Compare that to days of execution time and 850GB of rasters. What is even more exciting: after a set of trials and errors and density kernel adjustments, I looked up my account balance and I owed Amazon $37.67 (it would cost more to process a reimbursement request)!

Next comes the fun part: how to represent this JSON data for a particular POI location/demographic? Enter the Flex API for ArcGIS with its amazing extensibility, and the Flash Player with its vector graphics and GPU-enhanced bitmap capabilities. See, by using the gradient filling of a drawn circle and the screen blending mode when placing that circle onto a bitmap, a set of close points will dissolve into a heatpoint. So, by taking advantage of this collaborative process between the server and the client, where the server generates a weighted point and the client rasterizes that point based on its weight, you get an expressive, dynamic application.

Let's push the visualization further to the coolest platform... the iPad. Flex code can be cross-compiled to run natively on an iOS device. Let's push a bit more... 3D. Taking advantage of the Stage3D capability, the heatmap vector data can be downloaded at runtime and dynamically morphed into a heightmap and a texture that can be draped over that heightmap. And here is the result... I call it "Heatmap in the Cloud". You can download the pdf of this Esri UC 2012 presentation from here. Have fun.