Sunday, November 25, 2012

BigData: HDFS FeatureClass ETL and MapReduce GPTool


This post is dedicated to my Esri colleagues Ajit D. and Philip H. for their invaluable help.

This is a work in progress, but I've put a good enough dent in it that I would like to share it with you.  In this post, we will go through a complete cycle where, from ArcMap, we will:

  • Export a FeatureClass to an HDFS folder
  • Register that folder as a Hive table
  • Run command line Hive queries
  • Execute Hive queries from ArcPy and show the results in ArcMap
  • Execute a MapReduce Job as a GP Tool
  • Import an HDFS Folder (result of MapReduce Job) as a FeatureClass

This post brings everything that I have been blogging about so far into a nice story, so here we go:

BTW - I am assuming that you have a Hadoop instance running somewhere and are familiar with ArcMap. You can download a Hadoop demo VM for local testing.

Download the ArcMap extension in this file and unzip its content into your ArcGIS\Desktop10.1\java\lib\ext folder - The jars have to be children of the ext folder.

Make sure to adjust the ArcGIS JVM using JavaConfigTool.exe located in ArcGIS\Desktop10.1\bin:

Start ArcMap, create a new toolbox, and add the Hadoop Tools to it - Check out this help for detailed information on managing toolboxes:

Add the world cities to ArcMap:

Let's export the world cities to HDFS:

This tool iterates over the input FeatureClass features and stores each feature in the specified HDFS output path. The output path content is plain text, and each feature is stored as a line of Esri JSON text followed by a carriage return.
This enables us to continuously append new records from, for example, a streaming process such as the Esri GeoEvent Server.
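
To make the ETL side concrete, here is a minimal sketch of what the export boils down to, assuming the plain Hadoop FileSystem API - the ArcObjects feature cursor and the Esri JSON serialization are reduced to a single hard-coded sample line, and the output path and file name are placeholders, not what the GP tool actually writes:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: write features as one line of Esri JSON text per feature.
public class ExportToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhadoop:9000"); // pre-2.x property key
        FileSystem fs = FileSystem.get(conf);

        // Stand-in for iterating the input FeatureClass and converting each feature to Esri JSON.
        List<String> features = Arrays.asList(
                "{\"geometry\":{\"x\":35.5,\"y\":33.9,\"spatialReference\":{\"wkid\":4326}}," +
                "\"attributes\":{\"CITY_NAME\":\"Beirut\",\"CNTRY_NAME\":\"Lebanon\",\"POP_RANK\":5}}");

        Path path = new Path("/user/mraad/worldcities/part-0"); // placeholder output path
        FSDataOutputStream out = fs.create(path, true); // overwrite if it already exists
        try {
            for (String feature : features) {
                out.writeBytes(feature);
                out.writeBytes("\n"); // one feature per line
            }
        } finally {
            out.close();
        }
        fs.close();
    }
}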

The metadata for that HDFS based FeatureClass is stored in an HDFS based 'metastore' for other processes to inspect - A better place would have been ZooKeeper - but that is a post for another day.

Here is a sample metadata document:

{
"wkid": 4326,
"geometryType": 1,
"fields": [{
"name": "ObjectID",
"alias": "ObjectID",
"type": 6,
"length": 4
}, {
"name": "Shape",
"alias": "Shape",
"type": 7,
"length": 0
}, {
"name": "CITY_NAME",
"alias": "CITY_NAME",
"type": 4,
"length": 30
}, {
...
}, {
"name": "LABEL_FLAG",
"alias": "LABEL_FLAG",
"type": 1,
"length": 4
}]
}

The metastore contains a set of files where by convention the file name is the imported FeatureClass name followed by ".json". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.json

The GP import tool adds one more file to the metastore, a Hive script that you can execute from the Hive command line to create an external table referencing the HDFS FeatureClass. Again, by convention the script name is the name of the imported FeatureClass followed by ".hql". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.hql

You can "cat" the content of the script and you will notice the usage of the collection data type for the feature geometry and attribute representation. In addition, the serialization and deserialization (SerDe) from JSON is based on a Cloudera library found in the article 'Analyzing Twitter Data Using CDH'. You can download the jar from here.


ADD JAR hive-serdes-1.0-SNAPSHOT.jar;
CREATE EXTERNAL TABLE IF NOT EXISTS worldcities (
geometry STRUCT <x:DOUBLE,y:DOUBLE,spatialReference:STRUCT <wkid:INT>>,
attributes STRUCT <
CITY_NAME:STRING,
GMI_ADMIN:STRING,
ADMIN_NAME:STRING,
FIPS_CNTRY:STRING,
CNTRY_NAME:STRING,
STATUS:STRING,
POP_RANK:INT,
POP_CLASS:STRING,
PORT_ID:INT,
LABEL_FLAG:INT
>) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION 'hdfs://localhadoop:9000/user/mraad_admin/worldcities'
TBLPROPERTIES ('wkid'='4326','type'='point');


Please note how tables can have properties - in this case, I added the wkid and the geometry type.

Once you have executed the above commands, you can query the table. Here are some sample queries:

hive> select * from worldcities limit 10;
hive> select attributes.city_name,geometry.x,geometry.y from worldcities where attributes.cntry_name='Lebanon';

Hive can be accessed from ArcPy through the Thrift protocol - Here is a Toolbox that enables the user to draw a polygon as input and invoke a Hive spatial UDF that constrains the resulting FeatureSet to the world cities within the drawn polygon. Download the UDF jar from here, and place it and the hive-serdes jar in the same location from which you will start the Hive server as follows:

$> hive --service hiveserver
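
To give you a feel for what such a spatial UDF looks like, here is a minimal point-in-polygon sketch using the old-style Hive UDF API - the class name and the way the polygon is passed in (a plain "x1 y1,x2 y2,..." coordinate list rather than an Esri JSON geometry) are illustrative, not the actual UDF in the jar above:

import org.apache.hadoop.hive.ql.exec.UDF;

// Sketch of a point-in-polygon predicate as a Hive UDF.
public class PointInPolygon extends UDF {
    // Hive calls evaluate() once per row.
    public Boolean evaluate(Double px, Double py, String polygon) {
        if (px == null || py == null || polygon == null) {
            return null;
        }
        // Parse the "x1 y1,x2 y2,..." ring into coordinate arrays.
        String[] pairs = polygon.split(",");
        int n = pairs.length;
        double[] xs = new double[n];
        double[] ys = new double[n];
        for (int i = 0; i < n; i++) {
            String[] xy = pairs[i].trim().split("\\s+");
            xs[i] = Double.parseDouble(xy[0]);
            ys[i] = Double.parseDouble(xy[1]);
        }
        // Ray casting: count how many polygon edges a horizontal ray from the point crosses.
        boolean inside = false;
        for (int i = 0, j = n - 1; i < n; j = i++) {
            if ((ys[i] > py) != (ys[j] > py)
                    && px < (xs[j] - xs[i]) * (py - ys[i]) / (ys[j] - ys[i]) + xs[i]) {
                inside = !inside;
            }
        }
        return inside;
    }
}

Once the jar is added, such a function could be registered and used along these lines (the function name and polygon literal are again illustrative):

hive> CREATE TEMPORARY FUNCTION point_in_polygon AS 'PointInPolygon';
hive> select attributes.city_name from worldcities where point_in_polygon(geometry.x, geometry.y, '35 33,37 33,37 35,35 35');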

Next, I wanted to demo the capability to run a MapReduce Job as a GP Tool.

A quick MapReduce recap, for the Unix geeks:

$> cat input.txt | map | sort | reduce > output.txt

And for the "academics":

map(K1,V1) emit list(K2,V2)
shuffle/sort K2
reduce(K2,list(V2)) emit list(K3,V3)

This is fairly low level and requires explicitly writing the Mapper and Reducer Java classes.  This is not for your average GIS user, but I can see a time when advanced users will write parameter-driven MapReduce tools and share them with the community. This is all based on 'How to build custom geoprocessing tools'.

This simple MR tool takes as input the world cities HDFS FeatureClass and finds the "centroids" by country of all the cities with a specific population rank.

BTW, this can easily be written in HQL as follows:

select attributes.cntry_name as name,
avg(geometry.x) as x,
avg(geometry.y) as y,
count(attributes.cntry_name) as cnt
from worldcities
where attributes.pop_rank < 6
group by attributes.cntry_name
having cnt > 10;

The JobRunnerTool accepts as input:

  • A set of Hadoop properties
  • HDFS FeatureClass input path
  • HDFS FeatureClass output path
  • Metastore location

The mapper converts a JSON formatted input text line (V1) into a PointFeature and emits its point geometry (V2) if it meets a filter criterion - in this case, a population rank that is less than a user defined value. The mapper output key (K2) is the country name. BTW, K1 is the line number.

The shuffle/sort phase ensures that each reducer receives a country name (K2) as its input key and a list of point geometries (V2) as its input values.

The reducer averages the coordinates and creates a PointFeature whose geometry is a point at the averaged location.  The attributes include the country name and the number of points used in the averaging. The reducer output key (K3) is the JSON formatted text representation of the PointFeature and the output value (V3) is NULL, thus producing an HDFS FeatureClass, with its metadata in the metastore, for further processing and inspection.
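
Stripped of the GP plumbing, the mapper/reducer pair might look roughly like the sketch below - this is a simplification and not the tool's actual code: it uses Jackson (which ships with Hadoop) to parse the JSON lines, shuttles the coordinates between the mapper and the reducer as plain "x y" text rather than a geometry writable, and the 'centroid.pop.rank' property name is made up for illustration:

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.codehaus.jackson.map.ObjectMapper;

public class CountryCentroid {

    public static class CentroidMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final ObjectMapper json = new ObjectMapper();
        private int popRankMax;

        @Override
        protected void setup(Context context) {
            // User-defined filter value passed in as a Hadoop property.
            popRankMax = context.getConfiguration().getInt("centroid.pop.rank", 6);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // V1 is one feature as a line of Esri JSON; K1 (the line offset) is ignored.
            Map<?, ?> feature = json.readValue(value.toString(), Map.class);
            Map<?, ?> attributes = (Map<?, ?>) feature.get("attributes");
            Map<?, ?> geometry = (Map<?, ?>) feature.get("geometry");
            int popRank = ((Number) attributes.get("POP_RANK")).intValue();
            if (popRank < popRankMax) {
                double x = ((Number) geometry.get("x")).doubleValue();
                double y = ((Number) geometry.get("y")).doubleValue();
                // K2 = country name, V2 = point coordinates.
                context.write(new Text(attributes.get("CNTRY_NAME").toString()),
                        new Text(x + " " + y));
            }
        }
    }

    public static class CentroidReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text country, Iterable<Text> points, Context context)
                throws IOException, InterruptedException {
            double sumX = 0.0, sumY = 0.0;
            long count = 0;
            for (Text point : points) {
                String[] xy = point.toString().split(" ");
                sumX += Double.parseDouble(xy[0]);
                sumY += Double.parseDouble(xy[1]);
                count++;
            }
            // K3 = the centroid feature as a line of Esri JSON, V3 = NULL.
            String feature = String.format(
                    "{\"geometry\":{\"x\":%f,\"y\":%f},\"attributes\":{\"CNTRY_NAME\":\"%s\",\"COUNT\":%d}}",
                    sumX / count, sumY / count, country.toString(), count);
            context.write(new Text(feature), NullWritable.get());
        }
    }
}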

Lastly, we close the cycle by importing the HDFS FeatureClass.

The tool accepts as input an HDFS FeatureClass, its metadata and an output location. When executed within ArcMap, the output is automatically added to the display.
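
Under the hood, the import is essentially the export in reverse: list the part files that the job wrote, read them line by line, and turn each Esri JSON line back into a feature. Here is a minimal sketch of the HDFS reading half only - the geodatabase insert cursor half is omitted and the folder path is a placeholder:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: iterate the part files produced by the MapReduce job and
// hand each Esri JSON line over for conversion into a feature.
public class ImportFromHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhadoop:9000");
        FileSystem fs = FileSystem.get(conf);

        Path folder = new Path("/user/mraad/centroids"); // placeholder job output folder
        for (FileStatus status : fs.listStatus(folder)) {
            if (status.isDir() || status.getPath().getName().startsWith("_")) {
                continue; // skip sub folders, _SUCCESS and _logs
            }
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(status.getPath())));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Each line is one feature in Esri JSON - this is where the
                    // tool would parse it and insert it into the output FeatureClass.
                    System.out.println(line);
                }
            } finally {
                reader.close();
            }
        }
        fs.close();
    }
}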

Things to work on next:
  • A UDF that accepts as input an ArcMap generated FeatureSet and loads it into the DistributedCache - I already blogged about this, but as a standalone tool.
  • A MapReduceTool that accepts an external jar containing mapper/reducer classes - I think this will pave the way for advanced users.

Stay tuned for more things to come. And as usual, all the source code can be downloaded from here.

Friday, November 23, 2012

BigData: Cloudera Impala and ArcPy

So at the last Strata+Hadoop World, Cloudera introduced Impala. I downloaded the demo VM, installed the TPC-DS data set (read the impala_readme.txt once the VM starts up), and tested some of the queries. It was pretty fast and cool. As of this writing, UDFs and SerDes are missing from this beta release, so I cannot use my spatial UDF operators, nor can I read JSON formatted HDFS records :-(
One of the setup tables was a customer_address table; though it was missing a lat/lon field, it included the standard address field. It would be cool to invoke an Impala query on that table and mash up the result in ArcMap using ArcPy. So, I downloaded and installed onto my Windows VM (now remember, I work on a real machine, a MacBookPro :-) the ODBC driver, and downloaded and installed pyodbc. Small thing: the documentation keeps talking about 'You must use the 32-bit version'. A bit of googling revealed that they are referring to the Odbcad32.exe file located in the %systemdrive%\Windows\SysWoW64 folder.

The following is a simple geoprocessing Python Toolbox that queries Impala, and the result set is converted into an in-memory table that is appended to ArcMap's table of contents.  You can join the table with a state layer and symbolize the state polygons using quantile breaks based on Impala's aggregated response.  I think this combination of BigData in HDFS converted instantly into "SmallData" for rendering and further processing in ArcMap is a great marriage - Looking forward to the next release of Impala with UDFs.

import arcpy
import pyodbc

class Toolbox(object):
    def __init__(self):
        self.label = "Toolbox"
        self.alias = "Toolbox"
        self.tools = [QueryImpala]

class QueryImpala(object):
    def __init__(self):
        self.label = "Query Impala"
        self.description = "Query Impala"
        self.canRunInBackground = False
    
    def getParameterInfo(self):
        paramtab = arcpy.Parameter("ImpalaTable", "Impala Table", "Output", "Table", "Derived")
        return [paramtab]
    
    def isLicensed(self):
        return True
    
    def updateParameters(self, parameters):
        return
    
    def updateMessages(self, parameters):
        return
    
    def execute(self, parameters, messages):
        # Re-create the in-memory table that will hold the Impala result set.
        tab = "in_memory/ImpalaTable"
        if arcpy.Exists(tab):
            arcpy.management.Delete(tab)
        arcpy.management.CreateTable("in_memory", "ImpalaTable")
        arcpy.management.AddField(tab, "STATE", "TEXT")
        arcpy.management.AddField(tab, "COUNT", "LONG")
        
        # Connect to Impala through the ODBC DSN and run the aggregation query.
        connection = pyodbc.connect('DSN=HiveODBC')
        cursor = connection.cursor()
        cursor.execute("""
select ca_state as state,count(ca_state) as cnt
from customer_address
group by ca_state
""")
        # Copy each (state, count) row into the in-memory table.
        with arcpy.da.InsertCursor(tab, ['STATE', 'COUNT']) as insert:
            for row in cursor.fetchall():
                insert.insertRow([row.state, row.cnt])
        cursor.close()
        connection.close()
        
        # Hand the table back as the derived output so ArcMap adds it to the TOC.
        parameters[0].value = tab
        return