Monday, September 24, 2012

Processing Big Data with Apache Hive and Esri ArcPy


Data scientists, if you are processing and analyzing spatial data with Python, then ArcPy belongs in your arsenal of tools, and ArcMap is a good fit for geospatial data visualization. Following the last post, where I extended Apache Hive with spatial User Defined Functions (UDFs), in this post I will demonstrate how to use the "extended" Hive from Python and how to save the output into a feature class for rendering within ArcMap or any ArcWeb client using ArcGIS Server.

Given a running Hadoop instance and assuming that you have installed Hive and have created a Hive table as described in the last post, start the Hive Thrift server as follows:

$ hive --service hiveserver

When ArcGIS for Desktop is installed on a host, Python is optionally installed and is enabled with GeoProcessing capabilities. Install Hive on your desktop and set the environment variable HIVE_HOME to the location where Hive is residing. To access the Hive python libraries, export the environment variable PYTHONPATH with its value set to $HIVE_HOME/lib/py.
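
If you would rather not export PYTHONPATH globally, the same effect can be achieved at run time. The following is a minimal sketch, assuming HIVE_HOME is set as described above; the helper names `hive_python_path` and `add_hive_to_sys_path` are mine and are not part of Hive or ArcPy.

```python
import os
import sys


def hive_python_path(hive_home):
    """Return the directory that holds Hive's bundled Python client libraries."""
    return os.path.join(hive_home, "lib", "py")


def add_hive_to_sys_path(environ=os.environ):
    """Put $HIVE_HOME/lib/py on sys.path.

    Returns the path that was added, or None when HIVE_HOME is not set.
    """
    hive_home = environ.get("HIVE_HOME")
    if not hive_home:
        return None
    path = hive_python_path(hive_home)
    if path not in sys.path:
        sys.path.insert(0, path)
    return path
```

Calling `add_hive_to_sys_path()` before the `hive_service` and `thrift` imports lets a script fail early with a clear message when the variable is missing.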

With the setup behind us, let's tackle a simple use case: given a polygon feature class on the desktop and a set of points stored in the Hadoop File System and exposed through a Hive table, I want to perform a point-in-polygon operation on Hadoop and update the attributes of the local polygon feature class with the returned results.

Here is the Python script:

import arcpy
import sys

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:    
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()

    client.execute("add file countries.shp")
    client.execute("add file countries.shx")
    client.execute("add jar GeomX.jar")
    client.execute("create temporary function pip as 'com.esri.GenericUDFPip'")

    client.execute("""
    select t.p as fid,count(t.p) as val
    from (select pip(x,y,'./countries.shp') as p from cities) t
    where p!=-1 group by t.p
    """)
    rows = client.fetchAll()
    transport.close()
    
    keyval = dict()

    for row in rows:
        tokens = row.split()
        key = int(tokens[0])
        val = int(tokens[1])
        keyval[key] = val
    del row
    del rows

    rows = arcpy.UpdateCursor("countries.shp")
    for row in rows:
        key = row.FID
        if key in keyval:
            row.HADOOP = keyval[key]
            rows.updateRow(row)
    del row
    del rows

except Thrift.TException as tx:
    print('%s' % tx.message)

The script imports the Thrift Hive client and the ArcPy library. It then connects to the Thrift Hive server on the localhost and executes a set of setup operations. The first two add the countries shapefile geometry (.shp) and shape index (.shx) files into the distributed cache. The next one adds the jar file containing the spatial UDF functions. The last one defines the pip function with a reference to the class in the loaded jar.

The select statement is executed to retrieve the country identifier and the number of cities in that country, based on a nested select that uses the pip function to identify which city point falls into which country polygon. An fid with a value of -1 is returned if a pip result is not found, and it is excluded from the final group count. The fetchAll function returns a list of text items, where each text item is an fid value followed by a tab and then a count value. A dictionary is populated by tokenizing the list, where the dictionary key is the fid and the value is the count. An arcpy update cursor is opened on the local countries feature class and its rows are iterated. For each row, the FID value is retrieved and checked against the dictionary keys. If it is found, the HADOOP attribute field is updated with the dictionary value.
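
The tokenizing step can be tried without a running Hive server. Here is a small sketch, assuming each fetched item is a string of the form fid, tab, count as described above; the function name `rows_to_counts` and the sample rows are made up for illustration.

```python
def rows_to_counts(rows):
    """Turn 'fid<TAB>count' strings into a {fid: count} dictionary."""
    counts = {}
    for row in rows:
        tokens = row.split("\t")
        if len(tokens) != 2:
            continue  # skip malformed lines instead of failing the whole run
        counts[int(tokens[0])] = int(tokens[1])
    return counts


# Made-up sample in the same shape that fetchAll is described as returning.
sample = ["12\t340", "7\t15", "garbage"]
counts = rows_to_counts(sample)
```

Splitting on the tab character explicitly, rather than on any whitespace, keeps the parsing correct if a later query returns text columns that contain spaces.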

Upon a successful execution (and remember, this might take a while as Hive is a batch process), open ArcMap, load that feature class and symbolize it with a class break qualifier based on the HADOOP field values.

Pretty cool, no?  This is a very simple example of the marriage of a BigData tool and a GIS tool using Python.  There is so much more that can be done by combining these tools along the same lines. Expect more posts in the same vein with more arcpy usage. I just wanted to plant a small seed in your mind.

Update: This is another example that calculates the average lat/lon values of cities per country in Hive; the result set is then used to create a point feature class:


import arcpy, sys, os

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:
  prjFile = os.path.join(arcpy.GetInstallInfo()["InstallDir"],
        r"Coordinate Systems\Geographic Coordinate Systems\World\WGS 1984.prj")
  spatialRef = arcpy.SpatialReference(prjFile)

  tempWS = "in_memory"
  tempFS = "XY_FeatureClass"

  arcpy.CreateFeatureclass_management(tempWS, tempFS , "POINT", "","","", spatialRef)

  tempFC = os.path.join(tempWS, tempFS)

  arcpy.AddField_management(tempFC, "country", "TEXT", 0, 0, 8)
  
  transport = TSocket.TSocket('10.128.249.8', 10000)
  transport = TTransport.TBufferedTransport(transport)
  protocol = TBinaryProtocol.TBinaryProtocol(transport)
  client = ThriftHive.Client(protocol)
  transport.open()

  client.execute("add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar")

  client.execute("""
    select country,avg(x),avg(y)
    from cities
    group by country
    """)
  rows = client.fetchAll()
  transport.close()
  
  inCur = arcpy.InsertCursor(tempFC)
  for row in rows:
    tokens = row.split()

    country = tokens[0]
    avgx = float(tokens[1])
    avgy = float(tokens[2])

    feat = inCur.newRow()
    feat.Shape = arcpy.Point(avgx, avgy)
    feat.country = country
    inCur.insertRow(feat)

  del inCur, rows, row

  arcpy.CopyFeatures_management(tempFC, r"Z:\Sites\XYpoints")
  
except Thrift.TException as tx:
  print('%s' % tx.message)
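
To check the Hive output on a small sample, the aggregation in the query above can be reproduced in memory. This is only a sketch of what `select country,avg(x),avg(y) ... group by country` computes; the function name and the sample tuples are assumptions for illustration. Note that a plain average of longitudes is a naive centroid and misbehaves for countries that straddle the antimeridian.

```python
from collections import defaultdict


def average_xy_by_country(cities):
    """Average x and y per country for an iterable of (country, x, y) tuples."""
    sums = defaultdict(lambda: [0.0, 0.0, 0])  # country -> [sum_x, sum_y, count]
    for country, x, y in cities:
        acc = sums[country]
        acc[0] += x
        acc[1] += y
        acc[2] += 1
    return {c: (sx / n, sy / n) for c, (sx, sy, n) in sums.items()}


# Made-up sample points, not real city coordinates.
sample = [("AD", 1.0, 42.0), ("AD", 2.0, 43.0), ("US", -84.0, 39.0)]
averages = average_xy_by_country(sample)
```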


Monday, September 17, 2012

Big Data, Spatial Hive, Sequence Files


Following the last post, where we used Pig to analyze data stored in HDFS, in this post we will use Hive and spatially enable it for geo analysis. Hive enables you to write SQL-like statements in a language called HiveQL, which Hive converts to MapReduce jobs that are submitted to Hadoop for execution. Again, if you know SQL, then learning HiveQL is easy and intuitive.  Hive is not intended for OLTP or real-time analysis. Like Pig, Hive is extensible via User Defined Functions (UDFs), so we will be using almost the same functions as in the previous post to find things that are near and/or within some criteria.

There are several ways to store data in HDFS; one of them is the SequenceFile format. This is a key/value binary format with compression capabilities. For this post, I will transform an input into this well-known binary format and store it in HDFS for later query and analysis.

An object that requires persistence in a SequenceFile has to implement the Writable interface. Since we deal with spatial features, let's declare a Feature class that implements the Writable interface:


public class Feature implements Writable
{
    public IGeometry geometry;
    public ISymbol symbol = NoopSymbol.NOOP;

    public Feature()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        geometry.write(dataOutput);
        symbol.write(dataOutput);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        geometry.readFields(dataInput);
        symbol.readFields(dataInput);
    }
}

A Feature has a geometry and a symbol. A geometry is also Writable:


public interface IGeometry extends Writable
{
}

An implementation of the geometry interface is a two dimensional MapPoint:


public class MapPoint implements IGeometry
{
    public double x;
    public double y;

    public MapPoint()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        dataOutput.writeDouble(x);
        dataOutput.writeDouble(y);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        x = dataInput.readDouble();
        y = dataInput.readDouble();
    }
}

For now, features will have a no-operation (noop) symbol associated with them:


public class NoopSymbol implements ISymbol
{
    public final static ISymbol NOOP = new NoopSymbol();

    public NoopSymbol()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
    }
}

The input source that I wanted to test in my ETL is a set of cities (cities1000) in TSV format downloaded from geonames. The Writable object to read and write from a SequenceFile is a City class that extends a Feature and is augmented with attributes.


public class City extends Feature
{
    public int cityId;
    public String name;
    public String country;
    public int population;
    public int elevation;
    public String timeZone;

    @Override
    public void write(final DataOutput dataOutput) throws IOException
    {
        super.write(dataOutput);
        dataOutput.writeInt(cityId);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(country);
        dataOutput.writeInt(population);
        dataOutput.writeInt(elevation);
        dataOutput.writeUTF(timeZone);
    }

    @Override
    public void readFields(final DataInput dataInput) throws IOException
    {
        geometry = MapPoint.newMapPoint(dataInput);
        symbol = NoopSymbol.NOOP;
        cityId = dataInput.readInt();
        name = dataInput.readUTF();
        country = dataInput.readUTF();
        population = dataInput.readInt();
        elevation = dataInput.readInt();
        timeZone = dataInput.readUTF();
    }
}
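
To make the on-disk layout concrete, here is a Python sketch of the bytes that the City write method above produces for one value: two big-endian doubles for the point, nothing for the noop symbol, then the attributes in declaration order. It is an illustration under stated assumptions, not the ETL code. Java's writeUTF emits a 2-byte length followed by modified UTF-8, which matches standard UTF-8 only for text without NUL or supplementary characters, so the sketch assumes plain place names. The SequenceFile's own header and record framing are not covered, and the function names are mine.

```python
import struct


def _pack_utf(text):
    """Mimic DataOutput.writeUTF for plain text: 2-byte length, then the bytes."""
    data = text.encode("utf-8")
    return struct.pack(">H", len(data)) + data


def _unpack_utf(buf, offset):
    """Read one length-prefixed string; return it with the next offset."""
    (length,) = struct.unpack_from(">H", buf, offset)
    start = offset + 2
    return buf[start:start + length].decode("utf-8"), start + length


def pack_city(x, y, city_id, name, country, population, elevation, time_zone):
    """Serialize the fields in the same order as City.write."""
    return (struct.pack(">ddi", x, y, city_id)
            + _pack_utf(name) + _pack_utf(country)
            + struct.pack(">ii", population, elevation)
            + _pack_utf(time_zone))


def unpack_city(buf):
    """Deserialize the fields in the same order as City.readFields."""
    x, y, city_id = struct.unpack_from(">ddi", buf, 0)
    offset = struct.calcsize(">ddi")
    name, offset = _unpack_utf(buf, offset)
    country, offset = _unpack_utf(buf, offset)
    population, elevation = struct.unpack_from(">ii", buf, offset)
    offset += struct.calcsize(">ii")
    time_zone, offset = _unpack_utf(buf, offset)
    return (x, y, city_id, name, country, population, elevation, time_zone)


record = pack_city(1.49129, 42.46372, 3039163, "Sant Julia de Loria",
                   "AD", 8022, 0, "Europe/Andorra")
```

The read side has to consume the fields in exactly the order the write side emitted them, because the format carries no field names or delimiters.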

Using Hadoop's command line interface, I prepared a working directory to load the cities into HDFS:


$ hadoop fs -mkdir cities

Next, I wrote and executed a Java program using the opencsv library to extract, transform and load the TSV into SequenceFile records onto HDFS.

I highly recommend that you read Hadoop In Action. It has a nice introduction to installing and running Hive. Remember, Hive operates on SQL-like statements, so to operate on the loaded City data, we create a table that maps to the City object. From the Hive command line interface, we execute the following command:


hive> create external table cities(
 x double,
 y double,
 cityid int,
 name  string,
 country string,
 population int,
 elevation int,
 timezone string
 ) row format serde 'com.esri.CitySerDe'
 stored as sequencefile location '/user/mraad_admin/cities';

If you know SQL, this should be familiar. But note the last two lines: they instruct Hive to read the data in SequenceFile format from the HDFS location that we previously prepared and, since the data is in a binary format, to serialize and deserialize each row using a helper SerDe class.
The CitySerDe class knows how to deserialize a Writable object from the input stream into a concrete City instance, and how to serialize such an instance back to the output stream. In addition, it provides column metadata, such as the column names and types, to Hive. The SerDe is compiled and packaged into a jar that is added to the Hive runtime for usage:


hive> add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar;

Added /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar to class path
Added resource: /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar

hive> show tables;
OK
cities
Time taken: 3.98 seconds

hive> describe cities;
OK
x double from deserializer
y double from deserializer
cityid int from deserializer
name string from deserializer
country string from deserializer
population int from deserializer
elevation int from deserializer
timezone string from deserializer
Time taken: 0.434 seconds

hive> select * from cities limit 5;
OK
1.49129 42.46372 3039163 Sant Julia de Loria AD 8022 0 Europe/Andorra
1.73361 42.54277 3039604 Pas de la Casa AD 2363 2050 Europe/Andorra
1.53319 42.55623 3039678 Ordino AD 3066 0 Europe/Andorra
1.53414 42.50729 3040051 les Escaldes AD 15853 0 Europe/Andorra
1.51483 42.54499 3040132 la Massana AD 7211 0 Europe/Andorra
Time taken: 0.108 seconds

As I said, if you know SQL, you can find the top 5 countries with the most cities by issuing the following statement, with no need to write MapReduce jobs:


hive> select country,count(country) as c from cities group by country order by c desc limit 5;

On to spatial.  Hive is extensible via User Defined Functions (UDFs). Since I wanted to find all the cities that are near a specific location, I extended Hive with a 'near' function that was packaged in the added jar and defined it as follows:


hive> create temporary function near as 'com.esri.UDFNear';

I can now use the 'near' function to locate cities within 5 miles of a specific location:


hive> select name from cities where near(x,y,-84.20299,39.43534,5);

The UDFNear function extends the UDF class and implements in this case the Haversine distance calculation between two geographical locations.


public class UDFNear extends UDF
{
    private final static BooleanWritable TRUE = new BooleanWritable(true);
    private final static BooleanWritable FALSE = new BooleanWritable(false);

    public BooleanWritable evaluate(
            DoubleWritable x1, DoubleWritable y1,
            DoubleWritable x2, DoubleWritable y2,
            DoubleWritable distance
    )
    {
        return HaversineMiles.distance(y1.get(), x1.get(), y2.get(), x2.get()) < distance.get() ? TRUE : FALSE;
    }

    public boolean evaluate(
            double x1, double y1,
            double x2, double y2,
            double distance
    )
    {
        return HaversineMiles.distance(y1, x1, y2, x2) < distance;
    }
}
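
The source of HaversineMiles is not shown here, so as an illustration only, this is what a Haversine distance in miles and the 'near' predicate look like in Python. The Earth radius constant (3958.8 miles) and the function names are my assumptions; the argument order mirrors the UDF, with x as longitude and y as latitude.

```python
import math

EARTH_RADIUS_MILES = 3958.8  # assumed mean Earth radius


def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two lat/lon points in degrees."""
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    # Clamp to 1.0 to guard against rounding just above the asin domain.
    return 2 * EARTH_RADIUS_MILES * math.asin(min(1.0, math.sqrt(a)))


def near(x1, y1, x2, y2, distance):
    """True when (x1, y1) is within `distance` miles of (x2, y2)."""
    return haversine_miles(y1, x1, y2, x2) < distance
```

One degree of latitude comes out at roughly 69 miles with this radius, which is a handy sanity check when picking a search distance.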

Let's assume that the field 'country' was not defined in the table, and I want to find the count of cities per country where I only have the spatial boundaries of the countries.   This is a classic spatial join, and it is where UDFs, the DistributedCache, and spatial libraries like JTS and GeoTools come to the rescue.

I extended the GenericUDF class with a GenericUDFPip class that performs a 'naive' point-in-polygon (pip) based on geometries loaded into the distributed cache.  This enabled me to write a spatial query as follows:


hive> add file borders.shp;
hive> add file borders.shx;
hive> create temporary function pip as 'com.esri.GenericUDFPip';
hive> select t.p,count(t.p) from (select pip(x,y,'./borders.shp') as p from cities) t where t.p != -1 group by t.p;

The first two lines load the country borders shapefile and its index file into the distributed cache. These are used by the pip function the first time through to create an in-memory spatial index for fast searching. The pip function is defined as a class in the previously added jar file. It expects 3 arguments: the first is a longitude, the second is a latitude, and the third is the shapefile location in the distributed cache. Based on these arguments, it returns the identifier of the country border record that the longitude and latitude fall into, or -1 if there is no intersection.  For the query, a nested table is first created based on the pip result, which is then grouped and aggregated based on a non-negative border identifier.
Pretty cool, no? So imagine what else you could do with HiveQL and these libraries. Say, find the top 10 cities with the most surrounding cities in a 25 mile radius (an exercise for the reader. Hint: use a join and look at the source code for UDFDist.) :-)
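
For readers who have not met the technique, here is a sketch of a naive point-in-polygon lookup in Python. It uses ray casting over simple rings and a linear scan, and returns the index of the first matching polygon or -1. This is an assumption about the general idea only: the GenericUDFPip source is not shown here, and it relies on an in-memory spatial index rather than a scan. The function names and the square test polygons are made up.

```python
def point_in_ring(x, y, ring):
    """Ray casting: toggle on each edge crossed by a ray going right from (x, y)."""
    inside = False
    j = len(ring) - 1  # index of the previous vertex, starting with the last
    for i in range(len(ring)):
        xi, yi = ring[i]
        xj, yj = ring[j]
        if (yi > y) != (yj > y):  # edge straddles the horizontal line through y
            x_cross = xi + (y - yi) * (xj - xi) / float(yj - yi)
            if x < x_cross:
                inside = not inside
        j = i
    return inside


def pip(x, y, polygons):
    """Return the index of the first polygon containing (x, y), or -1."""
    for fid, ring in enumerate(polygons):
        if point_in_ring(x, y, ring):
            return fid
    return -1


# Two made-up square "countries" for illustration.
borders = [
    [(0, 0), (10, 0), (10, 10), (0, 10)],
    [(20, 0), (30, 0), (30, 10), (20, 10)],
]
```

With these squares, `pip(5, 5, borders)` lands in the first polygon and `pip(15, 5, borders)` falls between the two, so it is the -1 case that the query filters out.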

The fun is not about to stop. Since this is SQL-like, Hive comes with a JDBC driver. Using my favorite Java framework and its Spring-Hadoop project, an application can integrate with Hive as a JDBC client.

First make sure to start Hive as a service:


$ hive --service hiveserver

Next, define a Spring application context as follows:



   
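<bean id="hive-ds" class="org.apache.commons.dbcp.BasicDataSource"
      destroy-method="close"
      p:driverClassName="org.apache.hadoop.hive.jdbc.HiveDriver"
      p:url="jdbc:hive://localhost:10000/default"
      p:connectionInitSqls-ref="initSqls">
</bean>

<util:list id="initSqls">
    <value>add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar</value>
    <value>create temporary function near as 'com.esri.UDFNear'</value>
    <value>create temporary function dist as 'com.esri.UDFDist'</value>
</util:list>

<bean class="org.springframework.jdbc.core.JdbcTemplate"
      c:dataSource-ref="hive-ds"/>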


A Hive data source bean is defined using the Apache Commons database connection pool library. The data source driver class property is set to the Hive JDBC driver, and a set of SQL statements is executed upon startup.  These statements add the jar containing the UDF classes to the distributed cache and declare references to the 'near' and 'dist' UDFs.  Finally, a Spring JDBC template is defined with a reference to the aforementioned data source. This template will be injected into a service bean to enable SQL query execution and row mapping.

To see the result of the query on a world map, the Flex API for ArcGIS Server is utilized. Bridging the server side and the client side is the Spring Flex Integration project. This enables a Flex client application to execute functions on Spring-based Remote Objects.


@Service("hiveService")
@RemotingDestination
public class HiveService
{
    @Autowired
    public JdbcTemplate jdbcTemplate;

    public FeatureSet query(final String where)
    {
        final List<Feature> list = jdbcTemplate.query("SELECT X,Y,NAME FROM CITIES WHERE " + where, new RowMapper<Feature>()
        {
            public Feature mapRow(final ResultSet resultSet, final int i) throws SQLException
            {
                final double x = WebMercator.longitudeToX(resultSet.getDouble(1));
                final double y = WebMercator.latitudeToY(resultSet.getDouble(2));
                final MapPoint mapPoint = new MapPoint(x, y);

                final String name = resultSet.getString(3);
                final ASObject attributes = new ASObject();
                attributes.put("name", name);

                return new Feature(mapPoint, attributes);
            }
        });
        final Feature[] features = new Feature[list.size()];
        list.toArray(features);
        return new FeatureSet(features);
    }
}

I've talked extensively in my previous posts about the beauty of having no impedance mismatch between the server-side and client-side transfer objects, as in the case of FeatureSet, MapPoint and Feature instances. This HiveService is injected with the Spring-defined JDBC template and exposes a 'query' function that expects a 'where' clause string. Upon successful execution, each result set row is transformed into a Feature instance that is appended to a list, which is then transferred back to the client in a FeatureSet instance.
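
The service converts each longitude and latitude to map coordinates through a WebMercator helper whose source is not shown here. As a hedged illustration, these are the standard spherical Web Mercator forward formulas in Python; the function names and the use of the WGS84 semi-major axis as the sphere radius are my assumptions about what such a helper does.

```python
import math

EARTH_RADIUS_METERS = 6378137.0  # WGS84 semi-major axis, used as the sphere radius


def longitude_to_x(lon):
    """Longitude in degrees to Web Mercator x in meters."""
    return EARTH_RADIUS_METERS * math.radians(lon)


def latitude_to_y(lat):
    """Latitude in degrees to Web Mercator y in meters (undefined at the poles)."""
    return EARTH_RADIUS_METERS * math.log(
        math.tan(math.pi / 4 + math.radians(lat) / 2))
```

The projection stretches toward the poles, which is why web maps usually clip latitudes at about 85.05 degrees, where y reaches the same extent as x at 180 degrees of longitude.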

On to the client. This is a simple Flex implementation where the map and a data grid are stacked on top of each other. A user can enter a where clause that is sent to the server 'query' function using the RemoteObject capabilities.  Upon successful execution, a FeatureSet is returned, and the features are rendered on the map in a GraphicsLayer instance and in a DataGrid instance as data rows. A user can click on a row in the data grid to highlight the feature on the map. Vice versa, a user can click on a feature on the map to highlight a row in the data grid.

I know that this is a lot of information.  Thanks for reading it through. As usual, you can find all the source code here.