Sunday, November 25, 2012

BigData: HDFS FeatureClass ETL and MapReduce GPTool


This post is dedicated to my Esri colleagues Ajit D. and Philip H. for their invaluable help.

This is work in progress - but I've put a good dent in it that I would like to share it with you.  In this post, we will go through a complete cycle, where from ArcMap, we will:

  • Export a FeatureClass to an HDFS folder
  • Register that folder as a Hive table
  • Run command line Hive queries
  • Execute Hive queries from ArcPy and show the results in ArcMap
  • Execute a MapReduce Job as a GP Tool
  • Import an HDFS Folder (result of MapReduce Job) as a FeatureClass

This post brings everything that I have been blogging about so far into a nice story, so here we go:

BTW - I am assuming that you have a Hadoop instance running somewhere and are familiar with ArcMap. You can download a Hadoop demo VM for local testing.

Download the ArcMap extension in this file and unzip its content into your ArcGIS\Desktop10.1\java\lib\ext folder - The jars have to be children of the ext folder.

Make sure to adjust the ArcGIS JVM using the JavaConfigTool.exe location in ArcGIS\Desktop10.1\bin:

Start ArcMap, create a new toolbox and add to it the Hadoop Tools - Check out this help for detailed information on managing toolboxes:

Add the world cities to ArcMap:

Let's export the world cities to HDFS:

This tool iterates over the input FeatureClass features and stores each feature in the specified HDFS output path. The output path content is text formatted and each feature is stored as a line in an Esri JSON text representation followed by a carriage return.
This enables us to continuously add new records from for example a streaming process such as Esri GeoEvent Server.

The metadata for that HDFS based FeatureClass is stored in an HDFS based 'metastore' for other processes to inspect - A better place would have been ZooKeeper - but that is a post for another day.

Here is a sample metadata:

{
"wkid": 4326,
"geometryType": 1,
"fields": [{
"name": "ObjectID",
"alias": "ObjectID",
"type": 6,
"length": 4
}, {
"name": "Shape",
"alias": "Shape",
"type": 7,
"length": 0
}, {
"name": "CITY_NAME",
"alias": "CITY_NAME",
"type": 4,
"length": 30
}, {
...
}, {
"name": "LABEL_FLAG",
"alias": "LABEL_FLAG",
"type": 1,
"length": 4
}]
}

The metastore contains a set of files where by convention the file name is the imported FeatureClass name followed by ".json". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.json

The GP import tool adds one more file to the metastore, a Hive script that you can execute from the Hive command line to create an external table referencing the HDFS FeatureClass. Again, by convention the script name is the name of the imported FeatureClass followed by ".hql". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.hql

You can "cat" the content of the script and you will notice the usage of the collection data type for the feature geometry and attribute representation. In addition, the serialization and deserialization (SerDe) from JSON is based on a Cloudera library found in the article 'Analyzing Twitter Data Using CDH'. You can download the jar from here.


ADD JAR hive-serdes-1.0-SNAPSHOT.jar;
CREATE EXTERNAL TABLE IF NOT EXISTS worldcities (
geometry STRUCT <x:DOUBLE,y:DOUBLE,spatialReference:STRUCT <wkid:INT>>,
attributes STRUCT <
CITY_NAME:STRING,
GMI_ADMIN:STRING,
ADMIN_NAME:STRING,
FIPS_CNTRY:STRING,
CNTRY_NAME:STRING,
STATUS:STRING,
POP_RANK:INT,
POP_CLASS:STRING,
PORT_ID:INT,
LABEL_FLAG:INT
>) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION 'hdfs://localhadoop:9000/user/mraad_admin/worldcities'
TBLPROPERTIES ('wkid'='4326','type'='point');


Please note how tables can have properties - is this case, I added the wkid and geometry type.

Upon the execution of the above commands, you can now query the table. Here are some sample queries:

hive> select * from worldcities limit 10;
hive> select attributes.city_name,geometry.x,geometry.y from world cities where attributes.cntry_name='Lebanon';

Hive can be accessed using ArcPy through the thrift protocol - Here is a Toolbox that enables the user to draw a polygon as input and invoke Hive spatial UDF constraining the resulting FeatureSet to the world cities within the drawn polygon. Download the UDF jar from here, and place it and the hive-serde jar in the same location where you will start the hive server as follows:

$> hive --service hiveserver

Next, I wanted to demo the capability to run a MapReduce Job as a GP Tool.

Quick MapReduce recap, For the unix geeks:

$> cat input.txt | map | sort | reduce > output.txt

And for the "acedemics":

map(K1,V1) emit list(K2,V2)
shuffle/sort K2
reduce(K2,list(V2)) emit list(K3,V3)

This is fairly low level and requires explicit writing of the Mapper and Reducer Java classes.  This is not for your average GIS user.  But I can see a time when advanced users will write parameter driven MapReduce tools and share them with the community. This is all based on 'How to build custom geoprocessing tools'.

This simple MR tool takes as input the world cities HDFS FeatureClass and finds the "centroids" by country of all the cities with a specific population rank.

BTW, this can easily be written in HQL as follows:

select attributes.cntry_name as name,
avg(geometry.x) as x,
avg(geometry.y) as y,
count(attributes.cntry_name) as cnt
from worldcities
where attributes.pop_rank < 6
group by attributes.cntry_name
having cnt > 10;

The JobRunnerTool accepts as input:

  • A set of Hadoop properties
  • HDFS FeatureClass input path
  • HDFS FeatureClass output path
  • Metastore location
The mapper converts a JSON formatter input text line (V1) into a PointFeature and will emit its point geometry (V2) if it meets a filter criteria - in this case, a population rank that is less than a user defined value. The mapper output key (K2) is the country name. BTW, K1 is the line number.

The suffle/sort portion will ensure that each reducer will receive a country name (K2) as an input key and a list of geometry points (V2) as input values.

The reducer averages the coordinates, creates a PointFeature whose geometry is a point with values based on the averaged calculations.  The attributes will include the country name and the number of points used in the averaging. The reducer key output (K3) will be the JSON formatted text representation of the PointFeature and the output value (V3) will be NULL. thus producing an HDFS FeatureClass with its metadata in the metastore for further processing and inspection.

Lastly, we close the cycle by importing the HDFS Feature class.

The tool accepts as input an HDFS FeatureClass, its metadata and an output location. When executed within ArcMap, the output is automatically added to the display.

Things to work on next:
  • UDF that accepts as input an ArcMap generated FeatureSet into the DistributedCache - I already blogged about this, but as standalone.
  • MapReduceTool that accepts external jar containing mapper/reducer classes - I think this will pave the way for the advanced users.
Stay tuned for more things to come. And like usual, all the source code can be downloaded from here.

Friday, November 23, 2012

BigData: Cloudera Impala and ArcPy

So at the last Strata+Hadoop World, Cloudera introduces Impala- I downloaded the demo VM, and install the TPC-DS data set (read the impala_readme.txt once the VM starts up) and tested some of the queries. Was pretty fast and cool - As of this writing, UDFs and SerDes are missing from this beta release, so I cannot do my Spatial UDF operators, nor can I read JSON formatted HDFS record :-(
One of the setup tables was a customer_address table, though it was missing a lat/lon field, it included the standard address field. Will be cool to invoke an Impala query on that table and mach up the result in ArcMap using ArcPy. So, I downloaded and installed onto my Windows VM (now remember I work on a real machine, a MacBookPro :-) the ODBC driver and downloaded and installed pyodbc. Small thing, the documentation keeps talking about 'You must use the 32-bit version'. A bit of googling revealed that they are referring to the Odbcad32.exe file is located in the %systemdrive%\Windows\SysWoW64 folder.  The following is a simple GeoProcessing Python Toolbox that queries Impala, and the result set is converted into an in memory table that is appended to ArcMap's table of content.  You can join the table with a state layer and symbolize using quantile breaks the state polygons based on Impala's aggregated response.  I think this combination of BigData in HDFS converted instantly into "SmallData" for rendering and further processing in ArcMap is a great marriage - Looking forward to the next release of Impala with UDFs.

import arcpy
import pyodbc

class Toolbox(object):
    def __init__(self):
        self.label = "Toolbox"
        self.alias = "Toolbox"
        self.tools = [QueryImpala]

class QueryImpala(object):
    def __init__(self):
        self.label = "Query Impala"
        self.description = "Query Impala"
        self.canRunInBackground = False
    
    def getParameterInfo(self):
        paramtab = arcpy.Parameter("ImpalaTable", "Impala Table", "Output", "Table", "Derived")
        return [paramtab]
    
    def isLicensed(self):
        return True
    
    def updateParameters(self, parameters):
        return
    
    def updateMessages(self, parameters):
        return
    
    def execute(self, parameters, messages):
        tab = "in_memory/ImpalaTable"
        if arcpy.Exists(tab): arcpy.management.Delete(tab)
        arcpy.management.CreateTable("in_memory","ImpalaTable")
        arcpy.management.AddField(tab, "STATE", "TEXT")
        arcpy.management.AddField(tab, "COUNT", "LONG")
        
        connection = pyodbc.connect('DSN=HiveODBC')
        cursor = connection.cursor()
        cursor.execute("""
select ca_state as state,count(ca_state) as cnt
from customer_address
group by ca_state
""")
        with arcpy.da.InsertCursor(tab, ['STATE', 'COUNT']) as insert:
            rows = cursor.fetchall()
            for row in rows:
                insert.insertRow([row.state,row.cnt])
            del row
            del rows
        del insert
        del cursor
        del connection
        parameters[0].value = tab
        return

Monday, October 8, 2012

Streaming Big Data For Heatmap Visualization using Storm


A record number of tweets was set during the 2012 Presidential debate. If you wondered how technologically this happened, then Event Stream Processing is your answer.
Actually, Twitter open sourced such an implementation called Storm. Pretty impressive piece of technology! So, I wanted to try it out with a "geo" twist.
To get started, I recommend that you read "Getting Started with Storm".  Here is the challenge at hand, simulate a stream of aircraft flight track targets, in such that a heatmap is generated based on the density of targets at "near" locations.
The topology is very simple in this challenge, a spout reading the targets from an external source (to be defined later) is linked to a bolt that overlays a virtual grid on top of an area of interest.  Each emitted target tuple is mapped to a cell in the grid and the cell weight is decreased or increased when the target leaves or enters a cell.
Storm is a distributed system, that means that spouts and bolts can run on different physical nodes. The grid in the bolt is implemented as a map and since the bolt can to be distributed, then the map implementation has to be distributed too. Enter Hazelcast, it is a "clustering and highly scalable data distribution platform for Java". In addition to maps, a distributed queue implementation is also available.  The spouts (note the plural) use the latter to poll the offered targets and emit them to bolts (plural again :-) to update the density map grid. Here is the source code of the spout and the bolt.
The continuously running topology in this challenge is fronted with a web application container that "feeds" the spout and periodically accumulates the set of non-zero weighted cells from the grid for web client visualization.
The Spring web container is used in this implementation due to the Flex Integration and many many other reasons that I will not go into this post.
The Spring application context, defines:

  •  A bean component scanner
  •  A task scheduler
  •  A Hazelcast instance
  •  A flex message broker
  •  A flex message destination
  •  A Spring message template

Flight tracks are simulated by time stepping the parameters of cubic hermite spline that is bound by starting and ending locations and vectors. A scheduled bean iterates over the set of track parameters and offers to the Hazelcast queue the current position along that track at the current timestamp in the form of a Target instance. When the target reaches the end of the track, the latter is removed from the simulation set.  In addition, that scheduled bean, gets from the Hazelcast grid map the set of non-zero weighted cells and accumulates them in the form of a list that is sent back to the client using the injected message template thought the a message broker.
Onto the client. Upon startup completion, the application subscribes to the message destination and adds a message listener to handle the list of weighed cells. This list is bound to a DensityMapLayer instance that converts the cells in the list into gradient fillings on a bitmap. This enables a very fast and fluid rendering of a large set of data. The overlay of gradient fillings is what generates a heatmap. In addition, a UI Button is added to the stage enabling the user to define random starting and ending locations and vectors that are sent to the Spring container as seed to the Scheduled bean track/target simulator.
I know that this is very technical and I am assuming that you have some background in the above mentioned technologies - I just wanted to show how they can be integrated together seamlessly for BigData streaming - like usual, all the source code is available here.

Monday, September 24, 2012

Processing Big Data with Apache Hive and Esri ArcPy


Data Scientists, if you are processing and analyzing spatial data and are using Python, then ArcPy should be included in your arsenal of tools and ArcMap should be utilized for geo spatial data visualization.  Following the last post where I extended Apache Hive with spatial User Defined Functions (UDFs), in this post I will demonstrate the usage of the "extended" Hive within Python and how to save the output into a feature class for rendering within ArcMap or any ArcWeb client using ArcGIS Server.

Given a running Hadoop instance and assuming that you have installed Hive and have created a Hive table as described in the last post, start the Hive Thrift server as follows:

$ hive --service hiveserver

When ArcGIS for Desktop is installed on a host, Python is optionally installed and is enabled with GeoProcessing capabilities. Install Hive on your desktop and set the environment variable HIVE_HOME to the location where Hive is residing. To access the Hive python libraries, export the environment variable PYTHONPATH with its value set to $HIVE_HOME/lib/py.

With the setup behind us, let's tackle a simple use case; Given a polygon feature class on the desktop and a set of points stored in the Hadoop File System and are exposed through a Hive table, I want to perform a point in polygon operation on Hadoop and update the local feature class polygon attributes with the return results.

Here is the Python script:

import arcpy;
import sys

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:    
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()

    client.execute("add file countries.shp")
    client.execute("add file countries.shx")
    client.execute("add jar GeomX.jar")
    client.execute("create temporary function pip as 'com.esri.GenericUDFPip'")

    client.execute("""
    select t.p as fid,count(t.p) as val
    from (select pip(x,y,'./countries.shp') as p from cities) t
    where p!=-1 group by t.p
    """)
    rows = client.fetchAll()
    transport.close()
    
    keyval = dict()

    for row in rows:
        tokens = row.split()
        key = int(tokens[0])
        val = int(tokens[1])
        keyval[key] = val
    del row
    del rows

    rows = arcpy.UpdateCursor("countries.shp")
    for row in rows:
        key = row.FID
        if key in keyval:
            row.HADOOP = keyval[key]
            rows.updateRow(row)
    del row
    del rows

except Thrift.TException, tx:
    print '%s' % (tx.message)

The script imports the Thrift Hive client and the ArcPy library. It then connects to the Thrift Hive server on the localhost and executes a set of setup operations. The first two add the countries shapefile geometry and spatial index files into the distributed cache.  The next setup adds the jar file containing the spatial UDF functions. The last setup defines the pip function with a reference to the class in the loaded jar. The select statement is executed to retrieve the country identifier and the number of cities in that country based on a nest select who uses the pip function to identify which city point falls into which country polygon. An fid with a value of -1 is returned if a pip result is not found and is excluded from the final group count.  The fetchAll function returns a list of text items, where each text item is an fid value followed by a tab then a count value.  A dictionary is populated by tokenizing the list where the dictionary key is the fid and the value is the count.  An arcpy update cursor is opened on the local countries feature class and a row iterator is executed.  for each row, the FID value is retrieved and checked if it exists as a dictionary key. If found, the attribute HADOOP field is updated with the dictionary value.

Upon a successful execution (and remember, this might take a while as Hive is a batch process), open ArcMap, load that feature class and symbolize it with a class break qualifier based on the HADOOP field values.

Pretty cool, no?  This is a very very simple example of the marriage of a BigData tool and a GIS tool using Python.  There is so much more that can be done using this combination of tools in the same thought process. Expect more posts along the same vein with more arcpy usage. I just wanted to plant a small seed in your mind.

Update: This is another example that calculates the average lat/lon values of cities per country in Hive and the result set in used to create a point feature class:


import arcpy, sys, os

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:
  prjFile = os.path.join(arcpy.GetInstallInfo()["InstallDir"],
        r"Coordinate Systems\Geographic Coordinate Systems\World\WGS 1984.prj")
  spatialRef = arcpy.SpatialReference(prjFile)

  tempWS = "in_memory"
  tempFS = "XY_FeatureClass"

  arcpy.CreateFeatureclass_management(tempWS, tempFS , "POINT", "","","", spatialRef)

  tempFC = os.path.join(tempWS, tempFS)

  arcpy.AddField_management(tempFC, "country", "TEXT", 0, 0, 8)
  
  transport = TSocket.TSocket('10.128.249.8', 10000)
  transport = TTransport.TBufferedTransport(transport)
  protocol = TBinaryProtocol.TBinaryProtocol(transport)
  client = ThriftHive.Client(protocol)
  transport.open()

  client.execute("add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar")

  client.execute("""
    select country,avg(x),avg(y)
    from cities
    group by country
    """)
  rows = client.fetchAll()
  transport.close()
  
  inCur = arcpy.InsertCursor(tempFC)
  for row in rows:
    tokens = row.split()

    country = tokens[0]
    avgx = float(tokens[1])
    avgy = float(tokens[2])

    feat = inCur.newRow()
    feat.Shape = arcpy.Point(avgx, avgy)
    feat.country = country
    inCur.insertRow(feat)

  del inCur, rows, row

  arcpy.CopyFeatures_management(tempFC, r"Z:\Sites\XYpoints")
  
except Thrift.TException, tx:
  print '%s' % (tx.message)


Monday, September 17, 2012

Big Data, Spatial Hive, Sequence Files


Following the last post, where we used Pig to analyze data stored in HDFS, in this post we will be using Hive and spatially enabling it for geo analysis. Hive enable you to write SQL like statements in a language called HiveQL that Hive converts to a MapReduce job that is submitted to Hadoop for execution. Again, if you know SQL, then learning HiveQL is very easy and intuitive.  Hive is not intended for OLTP and real-time analysis. Like Pig, Hive is extensible via User Defined Functions (UDFs), so we will be using almost the same functions as in the previous post to find things that are near and/or within some criteria.

There are several ways to store data in HDFS, one of them is in the SequenceFile format. This is a key/value binary format with compression capabilities. For this post, I will be transforming an input into a well know binary format to be stored onto HDFS for later query and analysis.

An object that required persistence onto a SequenceFile has to implement the Writable interface. So, here we go, since we deal with spatial features, let's declare a Feature class that implements the Writable interface:


public class Feature implements Writable
{
    public IGeometry geometry;
    public ISymbol symbol = NoopSymbol.NOOP;

    public Feature()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        geometry.write(dataOutput);
        symbol.write(dataOutput);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        geometry.readFields(dataInput);
        symbol.readFields(dataInput);
    }
}

A Feature has a geometry and a symbol. A geometry is also Writable:


public interface IGeometry extends Writable
{
}

An implementation of the geometry interface is a two dimensional MapPoint:


public class MapPoint implements IGeometry
{
    public double x;
    public double y;

    public MapPoint()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        dataOutput.writeDouble(x);
        dataOutput.writeDouble(y);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        x = dataInput.readDouble();
        y = dataInput.readDouble();
    }
}

For now, feature will have an no operation (noop) symbol associated with them:


public class NoopSymbol implements ISymbol
{
    public final static ISymbol NOOP = new NoopSymbol();

    public NoopSymbol()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
    }
}

The input source that I wanted to test in my ETL is a set of cities (cities1000) in TSV format downloaded from geonames. The Writable object to read and write from a SequenceFile is a City class that extends a Feature and is augmented with attributes.


public class City extends Feature
{
    public int cityId;
    public String name;
    public String country;
    public int population;
    public int elevation;
    public String timeZone;

    @Override
    public void write(final DataOutput dataOutput) throws IOException
    {
        super.write(dataOutput);
        dataOutput.writeInt(cityId);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(country);
        dataOutput.writeInt(population);
        dataOutput.writeInt(elevation);
        dataOutput.writeUTF(timeZone);
    }

    @Override
    public void readFields(final DataInput dataInput) throws IOException
    {
        geometry = MapPoint.newMapPoint(dataInput);
        symbol = NoopSymbol.NOOP;
        cityId = dataInput.readInt();
        name = dataInput.readUTF();
        country = dataInput.readUTF();
        population = dataInput.readInt();
        elevation = dataInput.readInt();
        timeZone = dataInput.readUTF();
    }
}

Using Hadoop's command line interface, I prepared a working directory to load the cities into HDFS:


$ hadoop fs -mkdir cities

Next, I wrote and executed a Java program using the opencsv library to extract, transform and load the TSV into SequenceFile records onto HDFS.

I highly recommend that you read Hadoop In Action. It has a nice introduction to installing and running Hive. Remember, Hive operates on SQL-like statements, so to operate on the loaded City data, we create a table that maps to the City object. From the Hive command line interface, we execute the following command:


hive> create external table cities(
 x double,
 y double,
 cityid int,
 name  string,
 country string,
 population int,
 elevation int,
 timezone string
 ) row format serde 'com.esri.CitySerDe'
 stored as sequencefile location '/user/mraad_admin/cities';

If you know SQL, this should be familiar. But note the last two lines;  It instructs Hive to read the data in a SequenceFile format from an HDFS location that we previously prepared and since the data is in a binary format, each row is serialized and deserialized using a helper SerDe class.
The CitySerDe class knows how to serialize and deserialize a writable object from the input and output stream into and from a concrete City class instance. In addition, it provides column metadata such as the column name and the type to Hive. The SerDe is compiled and packaged into a jar that is added to the hive runtime for usage:


hive> add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar;

Added /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar to class path
Added resource: /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar

hive> show tables;
OK
cities
Time taken: 3.98 seconds

hive> describe cities;
OK
x double from deserializer
y double from deserializer
cityid int from deserializer
name string from deserializer
country string from deserializer
population int from deserializer
elevation int from deserializer
timezone string from deserializer
Time taken: 0.434 seconds

hive> select * from cities limit 5;
OK
1.49129 42.46372 3039163 Sant Julia de Loria AD 8022 0 Europe/Andorra
1.73361 42.54277 3039604 Pas de la Casa AD 2363 2050 Europe/Andorra
1.53319 42.55623 3039678 Ordino AD 3066 0 Europe/Andorra
1.53414 42.50729 3040051 les Escaldes AD 15853 0 Europe/Andorra
1.51483 42.54499 3040132 la Massana AD 7211 0 Europe/Andorra
Time taken: 0.108 seconds

Like I said, If you know SQL, you can find the top 5 countries with the most cities by issuing the following statement - no need to write MR jobs:


hive> select country,count(country) as c from cities group by country order by c desc limit 5;

Onto spatial.  Hive is extensible via User Defined Functions (UDFs). So I wanted to find all the cities that are near a specific location, I extend hive with a 'near' function that was packaged in the added jar and defined it as follows:


hive> create temporary function near as 'com.esri.UDFNear';

I can now use the 'near' function to locate cities within 5 miles of a specific location:


hive> select name from cities where near(x,y,-84.20299,39.43534,5);

The UDFNear function extends the UDF class and implements in this case the Haversine distance calculation between two geographical locations.


public class UDFNear extends UDF
{
    private final static BooleanWritable TRUE = new BooleanWritable(true);
    private final static BooleanWritable FALSE = new BooleanWritable(false);

    public BooleanWritable evaluate(
            DoubleWritable x1, DoubleWritable y1,
            DoubleWritable x2, DoubleWritable y2,
            DoubleWritable distance
    )
    {
        return HaversineMiles.distance(y1.get(), x1.get(), y2.get(), x1.get()) < distance.get() ? TRUE : FALSE;
    }

    public boolean evaluate(
            double x1, double y1,
            double x2, double y2,
            double distance
    )
    {
        return HaversineMiles.distance(y1, x1, y2, x2) < distance;
    }
}

Let's assume that the field 'country' was not defined in the table, and I want to find the count of cities per country where I only have the spatial boundaries of the countries.   This is a perfect spatial join and where UDF, DistributedCache, and spatial libraries like JTS and geotools come to the rescue.

I extended the GenericUDF class with a GenericUDFPip class that performs a 'naive' point-in-polygon (pip) based on geometries loaded into the distributed cache.  This enabled me to write a spatial query as follows:


hive> add file borders.shp;
hive> add file borders.shx;
hive> create temporary function pip as 'com.esri.GenericUDFPip';
hive> select t.p,count(t.p) from (select pip(x,y,'./borders.shp') as p from cities) t where t.p != -1 group by t.p;

The first two lines load into the distributed cache the countries borders shape file and its spatial index - these will be used by the pip function first time through to create an in-memory spatial index for fast searching. The pip function is defined as a class in the previously added jar file. The pip function expects 3 arguments; the first is a longitude, the second is a latitude and the third is the shape file location in the distributed cache. Based on these arguments, it will return the country border record identifier where the longitude and latitude arguments fall into or a -1 if there is no intersection.  For the query, a nested table is first created based on the pip result, which is then grouped and aggregated based on a non-negative border identifier.
Pretty cool - no ? So imagine what else you could do with HQL and these libraries...Say find me the top 10 cities with the most surrounding cities in a 25 mile radius (exercise for the reader. Hint, use join and look at the source code for UDFDist :-)

The fun is not about to stop. Since this is SQL-Like, Hive comes with a JDBC driver. Using my favorite Java framework, Spring-Hadoop integrates with Hive to become a JDBC client.

First make sure to start Hive as a service:


$ hive --service hiverserver

Next, define a Spring application context as follows:



   
          destroy-method="close"
          p:driverClassName="org.apache.hadoop.hive.jdbc.HiveDriver"
          p:url="jdbc:hive://localhost:10000/default"
          p:connectionInitSqls-ref="initSqls">
   

   
        add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar
        create temporary function near as 'com.esri.UDFNear'
        create temporary function dist as 'com.esri.UDFDist'
   

   
          c:dataSource-ref="hive-ds"/>


A Hive data source bean is defined using the Apache commons database connection pool library. The data source driver class property is set to the Hive JDBC driver and a set of SQL statements are executed upon start up.  These statements add the jar containing the UDF classes to the distributed cache and declares a reference to the 'near' and 'dist' UDFs.  Finally, a JDBC Spring template is defined with a reference to the aforementioned data source. This template will be injected into a service bean to enable SQL query execution and row mapping.

The see physically the result of the query on a world map, the Flex API for ArcGIS Server is utilized. Bridging the server side and the client side is the Spring Flex Integration project. This enables a Flex client application to execute functions on Spring based Remote Objects.


@Service("hiveService")
@RemotingDestination
public class HiveService
{
    @Autowired
    public JdbcTemplate jdbcTemplate;

    public FeatureSet query(final String where)
    {
        final List list = jdbcTemplate.query("SELECT X,Y,NAME FROM CITIES WHERE " + where, new RowMapper()
        {
            public Feature mapRow(final ResultSet resultSet, final int i) throws SQLException
            {
                final double x = WebMercator.longitudeToX(resultSet.getDouble(1));
                final double y = WebMercator.latitudeToY(resultSet.getDouble(2));
                final MapPoint mapPoint = new MapPoint(x, y);

                final String name = resultSet.getString(3);
                final ASObject attributes = new ASObject();
                attributes.put("name", name);

                return new Feature(mapPoint, attributes);
            }
        });
        final Feature[] features = new Feature[list.size()];
        list.toArray(features);
        return new FeatureSet(features);
    }
}

I've talked extensively in my previous posts about the beauty of the no-impedance mismatch between the server side and client side transfer objects such as in the case of FeatureSet, MapPoint and Feature instances. This HiveService is injected with the Spring defined JDBC template and exposes a 'query' function that expects a 'where' clause string. Upon a successful execution, each result set is transformed into a Feature instance that is appended to a list who this transferred back to the client in a FeatureSet instance.

Onto the client. This is a simple Flex implementation where the map and a data grid are stacked on top of each other. A user can enter a where clause that is sent to the server 'query' function using the RemoteObject capabilities.  Upon success execution, a FeatureSet is retuned and the features are rendered on the map in a GraphicLayer instance and the same features are rendered in a DataGrid instance as data rows. A user can click on a row in the data grid to highlight the feature on the map. Vice versa, a user can click on a feature on the map to highlight a row in the data grid.

I know that this is a lot of information.  Thanks for reading it through. Like usual you can find all the source code from here.

Tuesday, August 28, 2012

Big Data,Spatial Pig,Threaded Visualization

This post is PACKED with goodies - One of the ways to analyze large sets of data in the Hadoop File System without writing MapReduce jobs is to use Apache Pig. I highly recommend that you read Programming Pig, in addition to the online documentation. Pig Latin, the scripting language of Pig, is easy to understand, write and more importantly to extend. Since we do spatial stuff, the first goodie extends Pig Latin with a spatial function when analyzing data from HDFS. Here is the problem I posed to myself, given a very large set of records containing an X and Y field, and given a set of polygons, I want to produce a set of tuples containing the polygon id and the number of X/Y records in that polygon. Nothing that a GP/Py task cannot do, but needed the exercise and BTW, you can call Pig from Python (post for another day).
Pig, when executing in MapReduce mode, converts the Pig Latin script that you give it into a MapReduce job jar that it submits to Hadoop. You can register additional jars that get packaged into the job jar and define UDFs (User Defined Function) to extend Pig Latin with new capabilities. The UDF that I wrote returns the polygon identifier that contains a given X and Y values. The polygon set is read by the UDF at startup from a shapefile that is loaded into the Distributed Cache. The polygons are spatially indexed for fast query and PIP (Point In Polygon) operation during tuple (x,y) evaluation. The Evaluator extends the org.apache.pig.EvalFunc class and is constructed with a reference to the shapefile path in the Hadoop file system.

    public PIPShapefile(final String path)
    {
        m_path = path; // Save reference as field variable.
    }

A mapping of the referenced shapefile to an internal name should be defined. The book Programming Pig dwells in detail on this.

    @Override
    public List getCacheFiles()
    {
        final List list = new ArrayList(1);
        list.add(m_path + ".shp#file.shp"); // Note the #
        return list;
    }

Since the evaluator return the polygon identifier, we tell the function to return an integer.

    @Override
    public Schema outputSchema(Schema input)
    {
        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
    }

Finally, we implement the exec method that will be executed on every input tuple. This is a quick and dirty PIP implementation using JTS (leaving to the reader as an exercise to expand and elaborate :-)

    @Override
    public Integer exec(final Tuple tuple) throws IOException
    {
        m_init.init();
        int rc = -1;
        if (tuple != null && tuple.size() > 1)
        {
            double minArea = Double.POSITIVE_INFINITY;
            final Double x = (Double) tuple.get(0);
            final Double y = (Double) tuple.get(1);
            // log.info(x + " " + y);
            m_envelope.init(x - OFFSET, x + OFFSET, y - OFFSET, y + OFFSET);
            final List list = m_spatialIndex.query(m_envelope);
            for (final Feature feature : list)
            {
                if (feature.envelope.covers(x, y))
                {
                    final double area = feature.envelope.getArea();
                    if (area < minArea)
                    {
                        minArea = area;
                        rc = feature.number;
                    }
                }
            }
        }
        return rc;
    }

Note the m_init.init() invocation, this is because there is no initialization entry point in the lifecycle of a UDF. So the m_init is a reference to an implementation that mutates after the first time through from a shape file loader to an empty function. I used the geotools library to load and spatially index the features for later processing.

    private interface IInit
    {
        void init() throws IOException;
    }
    
    private final class FTTInit implements IInit
    {
        public void init() throws IOException
        {
            m_spatialIndex = new STRtree();
            final ShapefileReader shapefileReader = new ShapefileReader(new ShpFiles("file.shp"), true, true, m_geometryFactory);
            try
            {
                shapefileReader.disableShxUsage();
                while (shapefileReader.hasNext())
                {
                    final ShapefileReader.Record record = shapefileReader.nextRecord();
                    final Envelope envelope = new Envelope(record.envelope());
                    final Feature feature = new Feature();
                    feature.envelope = envelope;
                    feature.number = record.number;
                    m_spatialIndex.insert(envelope, feature);
                }
            }
            finally
            {
                shapefileReader.close();
            }
            m_init = new NoopInit(); // NOOP after first time through !!
        }
    }

    private final class NoopInit implements IInit
    {
        public void init()
        {
        }
    }

Compile the class using the following jars and package all into a jar to be registered with Pig.

gt-api-9-SNAPSHOT.jar (include)
gt-data-9-SNAPSHOT.jar (include)
gt-main-9-SNAPSHOT.jar (include)
gt-metadata-9-SNAPSHOT.jar (include)
gt-opengis-9-SNAPSHOT.jar (include)
gt-shapefile-9-SNAPSHOT.jar (include)
jts-1.13.jar (include)
pig-0.10.0.jar (exclude)

You can download the source code for PIPShapefile.java from here.

Before executing the script, I loaded the shapefile into HDFS using hadoop CLI commands:

$ hadoop fs -mkdir geo
$ hadoop fs -put cntry06.shp geo
$ haddop fs -put cntry06.dbf geo

I generated a million random points using AWK:

$ cat gendata.awk
BEGIN{
        OFS="\t"
        for(I=0;I < 1000000;I++){
                X=-180+360*rand()
                Y=-90+180*rand()
                print "L-"I,X,Y
        }
        exit
}

$ awk -f gendata.awk > data.tsv
$ hadoop fs -mkdir data
$ hadoop fs -put data.tsv data

The following is the Pig Latin script that solves the above stated problem; As you can see it is pretty self-explanatory:

$ cat script.pig

register '/Users/mraad_admin/JavaWorkspace/GeomX/out/GeomX.jar'
define pip com.esri.PIPShapefile('/user/mraad_admin/geo/cntry06');
A = LOAD 'data/data.tsv' AS (name:chararray,x:double,y:double);
B = FOREACH A GENERATE pip(x,y) AS P:int;
F = FILTER B BY P != -1;
G = GROUP F BY P;
C = FOREACH G GENERATE group,COUNT(F);
dump C;

Register the jar to be included in the MapReduce job jar. Define pip UDF with reference to the shape file in the HDFS. Load the data.tsv from HDFS with the described schema. Iterate over the data and generate a list of polygon ids based on x and y values. Keep all "found" polygon (id != -1), group them by the id value, and finally dump each id and its count. Pretty neat, no ?  To execute the script:

$ pig -f script.pig

Now, what to do with all these outputted tuples ? Goodie #2; Execute Pig on the server side and send the output to a client side app for visualization. Using Spring Data Hadoop, we can invoke a Pig script using a Pig Server instance and using Spring Flex expose it as a service for Remote Object invocation.
Here is the PigService implementation:

package com.esri;

@Service("pigService")
@RemotingDestination
public final class PigService
{
    @Autowired
    public PigServerFactoryBean m_pigServerFactoryBean;

    public List run(final String load, final String funcArg) throws Exception
    {
        final List list = new ArrayList();
        final PigServer pigServer = m_pigServerFactoryBean.getObject();
        try
        {
            pigServer.registerJar("/Users/mraad_admin/JavaWorkspace/GeomX/out/GeomX.jar");

            pigServer.registerFunction("pip", new FuncSpec("com.esri.PIPShapefile", funcArg));

            pigServer.registerQuery("A = load '" + load + "' as (name:chararray,x:double,y:double);");
            pigServer.registerQuery("B = foreach A generate pip(x,y) as P:int;");
            pigServer.registerQuery("F = FILTER B BY P != -1;");
            pigServer.registerQuery("G = GROUP F BY P;");
            pigServer.registerQuery("C = FOREACH G GENERATE group,COUNT(F);");

            final Iterator tupleIterator = pigServer.openIterator("C");
            while (tupleIterator.hasNext())
            {
                final Tuple tuple = tupleIterator.next();
                final ASObject asObject = new ASObject();
                asObject.put("index", tuple.get(0));
                asObject.put("count", tuple.get(1));
                list.add(asObject);
            }
        }
        finally
        {
            pigServer.shutdown();
        }
        return list;
    }
}

Notice how the Pig Script is broken down by registering the jar, the function, the queries and all are executed when openIterator is invoked, resulting in an iterator that enables me to convert the tuples into AS3Objects ready for AMF transfer to the client.

Final step; Invoke, fuse and visualize. The Flex API for ArcGIS is a great tool. In this post's implementation, I am taking advantage of one of the enhancements in the latest Flash Player (11.4); Workers. Finally, we have "threads" in the Flash Player! Take a look at Lee Brimelow video tutorial for a great description of worker usage. The application enables the user to load a local shapefile.  I am using a worker to parse the shapefile that was loaded into the distributed cache and convert each shape into a Graphic instance for visualization. The shapefile is a bit over 3.5MB, and in the "old" days that would have spun up the "beach ball of death" on my mac while parsing this size shapefile.  Now....no beach ball and the UI is still responsive. That was goodie #3. I had to play a bit of "games" by creating separate projects to enable the creation of the worker swf and the common code.  All this should be resolved in theory with the release of the Flash Builder 4.7. The Flex application enables the user to invoke the PigService and the resulting tuples are merged into the loaded feature attribute for thematic rendering. Goodie #4, ThematicFillSymbol is a custom symbol with full 180 warp-around polygon rendering.  It accepts as parameters the min/max count value and the min/max color value and fills each polygon proportionally to the count attribute value.

Well, that was a lot of stuff.  If you stayed with me all the way till here....thank you. Like usual, you can download all the source code of the server from here and the client from here.

Thursday, August 23, 2012

MongoDB + Spring + Mobile Flex API for ArcGIS = Harmonie


I've used MongoDB on a project for the City of Chicago with great success.  I was impressed with the fact that we can store JSON documents in one giant collection, scale horizontally by just adding new nodes, the plethora of language APIs (Java,AS3) that can talk to it, run MapReduce tasks, and my favorite is that you can create a true spatial index on a document property.  This is not some BTree index on a compounded x/y properties, but a true spatial index that can be used to perform spatial operation such as near and within.  Today, Mongo only supports point geometries, but I understand that they are working on storing and spatially indexing lines and polygons and enabling other spatial operations. I've experimented with great success in an intranet in consuming BSON object directly from Mongo into a Flex based application, but the direct socket connection is a problem in a web or mobile environment. In addition, what I wanted was some middle tier that can turn my BSON documents into a full fledge Flex AGS Graphic instance with as little impedance as possible. When we wrote the Flex API for ArcGIS, we annotated key classes with "RemoteClass" to enable direct AMF streaming from ArcGIS Server at the SoC level.  What that means, is that there is no impedance mismatch between the object types created on the server with the object types consumed on the client. Some of these annotated classes are, FeatureSet, Graphic, MapPoint, SpatialReference. Whenever the words middle tier and AMF are together, you have to think BlazeDS with its RemoteObject capabilities on POJO and whenever you say POJO, the SpringFramework is by far the best at handling POJOs. Lucky for me, Under the Spring Framework umbrella exist two projects that the marriage of the two will create beautiful harmony; the Spring BlazeDS Integration and the Spring Data for MongoDB.  The BlazeDS Integration will handle the exposure of annotated POJOs as remote services that can be consumed from Flex and using IoC, these POJOs are injected with MongoDB templates for data access.  In today's world, client applications have to be of mobile nature and my experiment to fuze all these entities is no exception. So, I've written a simple Flex mobile application, that declares a RemoteObject with an http based endpoint to a server side registered POJO that exposes a "near" operation. On the client, the user can tap a location on the map.  The location is an argument to the "near" server side function, that invokes on MongoDB the "near" operation on a collection of tweet documents. The found tweet documents are converted to Graphic instances and returned back to the Flex application as part of a FeatureSet. The features in the FeatureSet are assigned to the graphicProvider property of a GraphicLayer, thus rendering these features on the map.  Just for coolness, I placed a switch toggle as a map control to enable/disable the clustering of the returned features. The key glue to this fusion is the mongo mapping converter implementation that converts BSON object to AGS features.


public class FeatureReadConverter implements Converter
{
    public Feature convert(final DBObject dbo)
    {
        final List shape = (List) dbo.removeField("shape");
        final Double lon = shape.get(0);
        final Double lat = shape.get(1);
        final MapPoint mapPoint = new MapPoint(
           WebMercator.longitudeToX(lon),
           WebMercator.latitudeToY(lat));
        final ASObject attributes = new ASObject();
        for (final String key : dbo.keySet())
        {
            attributes.put(key, dbo.get(key));
        }
        return new Feature(mapPoint, attributes);
    }
}


Here is a snapshot of the application in action from my iPhone !


And like usual, you can download the client source code from here, and the server side code from here.
Disclosure - The current flaring does not work on mobile using the released API unless you include the mx swc :-( this has been fix for the next release (don't ask....soon :-)