Monday, September 24, 2012

Processing Big Data with Apache Hive and Esri ArcPy


Data scientists: if you are processing and analyzing spatial data with Python, then ArcPy should be included in your arsenal of tools, and ArcMap should be utilized for geospatial data visualization.  Following the last post, where I extended Apache Hive with spatial User Defined Functions (UDFs), in this post I will demonstrate the usage of the "extended" Hive from within Python, and how to save the output into a feature class for rendering within ArcMap or any web client backed by ArcGIS Server.

Given a running Hadoop instance and assuming that you have installed Hive and have created a Hive table as described in the last post, start the Hive Thrift server as follows:

$ hive --service hiveserver

When ArcGIS for Desktop is installed on a host, Python can optionally be installed along with it, with geoprocessing capabilities enabled. Install Hive on your desktop and set the HIVE_HOME environment variable to the location where Hive resides. To access the Hive Python client libraries, export the PYTHONPATH environment variable with its value set to $HIVE_HOME/lib/py.
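
If you would rather not modify the environment globally, the Python path can also be adjusted at the top of the script itself. Here is a minimal sketch, assuming HIVE_HOME is already set; the fallback path is hypothetical:

import os
import sys

# make the Hive Thrift client modules importable
# (assumes HIVE_HOME points at the Hive installation; the fallback is hypothetical)
hiveHome = os.environ.get("HIVE_HOME", "/usr/local/hive")
sys.path.append(os.path.join(hiveHome, "lib", "py"))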

With the setup behind us, let's tackle a simple use case: given a polygon feature class on the desktop and a set of points that are stored in the Hadoop Distributed File System and exposed through a Hive table, I want to perform a point-in-polygon operation on Hadoop and update the local polygon feature class attributes with the returned results.

Here is the Python script:

import arcpy
import sys

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:    
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()

    client.execute("add file countries.shp")
    client.execute("add file countries.shx")
    client.execute("add jar GeomX.jar")
    client.execute("create temporary function pip as 'com.esri.GenericUDFPip'")

    client.execute("""
    select t.p as fid,count(t.p) as val
    from (select pip(x,y,'./countries.shp') as p from cities) t
    where p!=-1 group by t.p
    """)
    rows = client.fetchAll()
    transport.close()
    
    keyval = dict()

    for row in rows:
        tokens = row.split()
        key = int(tokens[0])
        val = int(tokens[1])
        keyval[key] = val
    del row
    del rows

    rows = arcpy.UpdateCursor("countries.shp")
    for row in rows:
        key = row.FID
        if key in keyval:
            row.HADOOP = keyval[key]
            rows.updateRow(row)
    del row
    del rows

except Thrift.TException, tx:
    print '%s' % (tx.message)

The script imports the Thrift Hive client and the ArcPy library. It connects to the Thrift Hive server on localhost and executes a set of setup operations: the first two add the countries shapefile geometry and spatial index files to the distributed cache, the next adds the jar file containing the spatial UDFs, and the last defines the pip function with a reference to its class in the loaded jar. The select statement then retrieves each country identifier and the number of cities in that country, based on a nested select that uses the pip function to determine which city point falls into which country polygon. A pip result of -1 means no containing polygon was found, and such rows are excluded from the final group count.

The fetchAll function returns a list of text items, where each item is an fid value followed by a tab and a count value. A dictionary is populated by tokenizing each item, with the fid as key and the count as value. Finally, an ArcPy update cursor is opened on the local countries feature class and its rows are iterated; for each row, the FID value is looked up in the dictionary, and if found, the row's HADOOP field is updated with the dictionary value.

Upon a successful execution (and remember, this might take a while, as Hive is a batch process), open ArcMap, load that feature class, and symbolize it with class breaks based on the HADOOP field values.
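
That last step can itself be scripted with the arcpy.mapping module. Here is a minimal sketch, assuming a map document that already contains the countries layer and a layer file carrying the desired class-break symbology; both paths are hypothetical:

import arcpy.mapping as mapping

# open a map document that already contains the countries layer
mxd = mapping.MapDocument(r"C:\maps\world.mxd")
df = mapping.ListDataFrames(mxd)[0]
lyr = mapping.ListLayers(mxd, "countries", df)[0]

# borrow the class-break renderer from a prepared layer file
srcLyr = mapping.Layer(r"C:\maps\hadoop_breaks.lyr")
mapping.UpdateLayer(df, lyr, srcLyr, symbology_only=True)
mxd.save()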

Pretty cool, no?  This is a very simple example of the marriage of a Big Data tool and a GIS tool using Python.  There is so much more that can be done with this combination of tools. Expect more posts in the same vein with more ArcPy usage; I just wanted to plant a small seed in your mind.

Update: Here is another example that calculates the average lat/lon values of cities per country in Hive; the result set is used to create a point feature class:


import arcpy, sys, os

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:
  prjFile = os.path.join(arcpy.GetInstallInfo()["InstallDir"],
        r"Coordinate Systems\Geographic Coordinate Systems\World\WGS 1984.prj")
  spatialRef = arcpy.SpatialReference(prjFile)

  tempWS = "in_memory"
  tempFS = "XY_FeatureClass"

  arcpy.CreateFeatureclass_management(tempWS, tempFS, "POINT", "", "", "", spatialRef)

  tempFC = os.path.join(tempWS, tempFS)

  arcpy.AddField_management(tempFC, "country", "TEXT", 0, 0, 8)
  
  transport = TSocket.TSocket('10.128.249.8', 10000)
  transport = TTransport.TBufferedTransport(transport)
  protocol = TBinaryProtocol.TBinaryProtocol(transport)
  client = ThriftHive.Client(protocol)
  transport.open()

  client.execute("add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar")

  client.execute("""
    select country,avg(x),avg(y)
    from cities
    group by country
    """)
  rows = client.fetchAll()
  transport.close()
  
  inCur = arcpy.InsertCursor(tempFC)
  for row in rows:
    tokens = row.split()

    country = tokens[0]
    avgx = float(tokens[1])
    avgy = float(tokens[2])

    feat = inCur.newRow()
    feat.Shape = arcpy.Point(avgx, avgy)
    feat.country = country
    inCur.insertRow(feat)

  del inCur, rows, row

  arcpy.CopyFeatures_management(tempFC, r"Z:\Sites\XYpoints")
  
except Thrift.TException, tx:
  print '%s' % (tx.message)
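
As a quick sanity check (assuming CopyFeatures wrote a shapefile at Z:\Sites\XYpoints.shp), the new feature class can be read back with a search cursor:

import arcpy

# print each inserted point; the .shp path below assumes the copy above succeeded
rows = arcpy.SearchCursor(r"Z:\Sites\XYpoints.shp")
for row in rows:
    pnt = row.Shape.firstPoint
    print row.country, pnt.X, pnt.Y
del rows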

