Friday, November 23, 2012

BigData: Cloudera Impala and ArcPy

So at the last Strata+Hadoop World, Cloudera introduced Impala. I downloaded the demo VM, installed the TPC-DS data set (read impala_readme.txt once the VM starts up) and tested some of the queries. It was pretty fast and cool. As of this writing, UDFs and SerDes are missing from this beta release, so I cannot use my Spatial UDF operators, nor can I read JSON-formatted HDFS records :-(
One of the setup tables was a customer_address table. Though it was missing a lat/lon field, it included the standard address fields. It would be cool to invoke an Impala query on that table and mash up the result in ArcMap using ArcPy. So I downloaded and installed the ODBC driver and pyodbc onto my Windows VM (now remember, I work on a real machine, a MacBook Pro :-). A small thing: the documentation keeps saying 'You must use the 32-bit version'. A bit of googling revealed that this refers to the 32-bit ODBC Data Source Administrator, Odbcad32.exe, which is located in the %systemdrive%\Windows\SysWOW64 folder.

Below is a simple geoprocessing Python Toolbox that queries Impala and converts the result set into an in-memory table that is added to ArcMap's table of contents. You can join that table with a states layer and symbolize the state polygons using quantile breaks on Impala's aggregated counts. I think this combination of BigData in HDFS converted instantly into "SmallData" for rendering and further processing in ArcMap is a great marriage. Looking forward to the next release of Impala with UDFs.
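Why 32-bit? An ODBC DSN is only visible to processes of the same bitness, and ArcMap 10.x runs a 32-bit Python, so the DSN has to be defined with the 32-bit administrator. A quick way to confirm which one you need is a minimal sketch like this, assuming 64-bit Windows (where the 32-bit administrator lives under SysWOW64 and the 64-bit one under System32). The helper names `python_bitness` and `odbc_admin_hint` are hypothetical, not part of any library.

```python
import struct


def python_bitness():
    """Return 32 or 64: the pointer size of the running interpreter, in bits."""
    return struct.calcsize("P") * 8


def odbc_admin_hint(bits):
    """Return the ODBC Administrator that should hold the DSN for this bitness.

    Assumes 64-bit Windows: the 32-bit odbcad32.exe is in SysWOW64 and the
    64-bit one is in System32. A process only sees DSNs of its own bitness.
    """
    if bits == 32:
        return r"%systemdrive%\Windows\SysWOW64\odbcad32.exe"
    return r"%systemdrive%\Windows\System32\odbcad32.exe"


if __name__ == "__main__":
    # Run this from ArcMap's Python window to see which administrator to use.
    bits = python_bitness()
    print("Python %d-bit: define the DSN with %s" % (bits, odbc_admin_hint(bits)))
```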

import arcpy
import pyodbc

class Toolbox(object):
    def __init__(self):
        self.label = "Toolbox"
        self.alias = "Toolbox"
        self.tools = [QueryImpala]

class QueryImpala(object):
    def __init__(self):
        self.label = "Query Impala"
        self.description = "Query Impala"
        self.canRunInBackground = False
    
    def getParameterInfo(self):
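        # Derived output: the tool fills it in, and ArcMap adds it to the table of contents.
        paramtab = arcpy.Parameter(name="ImpalaTable", displayName="Impala Table",
                                   direction="Output", datatype="Table",
                                   parameterType="Derived")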
        return [paramtab]
    
    def isLicensed(self):
        return True
    
    def updateParameters(self, parameters):
        return
    
    def updateMessages(self, parameters):
        return
    
    def execute(self, parameters, messages):
        # Recreate an empty in-memory table to hold the query result.
        tab = "in_memory/ImpalaTable"
        if arcpy.Exists(tab):
            arcpy.management.Delete(tab)
        arcpy.management.CreateTable("in_memory", "ImpalaTable")
        arcpy.management.AddField(tab, "STATE", "TEXT")
        arcpy.management.AddField(tab, "COUNT", "LONG")

        # Connect through the (32-bit) ODBC DSN and count customers per state.
        connection = pyodbc.connect('DSN=HiveODBC')
        try:
            cursor = connection.cursor()
            cursor.execute("""
select ca_state as state, count(ca_state) as cnt
from customer_address
group by ca_state
""")
            # Copy each result row into the in-memory table; leaving the
            # with block releases the insert cursor (safe even for 0 rows).
            with arcpy.da.InsertCursor(tab, ['STATE', 'COUNT']) as insert:
                for row in cursor.fetchall():
                    insert.insertRow([row.state, row.cnt])
            cursor.close()
        finally:
            connection.close()

        # Return the table as the derived output parameter.
        parameters[0].value = tab
        return
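To play with the aggregation and the quantile symbolization idea without an Impala cluster or ArcMap, here is a stdlib-only sketch under some assumptions: SQLite's in-memory database stands in for Impala (the same GROUP BY runs unchanged), the sample states are made-up input rather than TPC-DS output, and `aggregate`, `quantile_breaks` and `classify` are hypothetical helpers. The quantile scheme is a simple rank-based one that puts roughly equal numbers of states in each class; ArcMap's own Quantile classifier may place its breaks differently.

```python
import bisect
import sqlite3

# The toolbox's aggregation query; SQLite stands in for Impala here.
QUERY = """
select ca_state as state, count(ca_state) as cnt
from customer_address
group by ca_state
"""


def aggregate(states):
    """Load hypothetical ca_state values into SQLite and return {state: count}."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("create table customer_address (ca_state text)")
        conn.executemany("insert into customer_address values (?)",
                         [(s,) for s in states])
        return dict(conn.execute(QUERY).fetchall())
    finally:
        conn.close()


def quantile_breaks(values, k):
    """Return k upper break values splitting the values into ~equal-count classes."""
    ordered = sorted(values)
    n = len(ordered)
    # Class j (1-based) ends at rank ceil(n*j/k) - 1 in the sorted list.
    return [ordered[(n * j + k - 1) // k - 1] for j in range(1, k + 1)]


def classify(value, breaks):
    """Index of the first class whose upper break is >= value (ties share a class)."""
    return min(bisect.bisect_left(breaks, value), len(breaks) - 1)


if __name__ == "__main__":
    # Hypothetical sample data, not real TPC-DS results.
    counts = aggregate(["CA", "CA", "CA", "TX", "TX", "NY", "NY", "NY", "NY", "WA"])
    breaks = quantile_breaks(list(counts.values()), 2)
    for state in sorted(counts):
        print("%s %d class %d" % (state, counts[state], classify(counts[state], breaks)))
```

Classifying by break value rather than by rank keeps states with equal counts in the same class, which is what you want when the class index drives the polygon color.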
