Monday, March 4, 2013

BigData meet Raspberry Pi

The Raspberry Pi is a very neat little machine - I bootstrapped it with Soft-Float Debian "Wheezy" so I could run Java Embedded on it - I downloaded and installed ejre-7u10-fcs-b18-linux-arm-vfp-client_headless-28_nov_2012.tar, and can now run Java programs on the Pi. I wrote the ubiquitous "Hello World" to make sure it works (I should not have doubted the write-once-run-anywhere motto) and jumped directly to running a Spring container with beans. So, the next thing to do was to run the Hadoop client from my previous post on it.  Now remember, there is not a lot of RAM on the Model B Rev 2 Pi (512MB) and the post-action runs on the client machine, so I reduced the cell size and the search radius, and... it worked like a charm. Now, why did I do that? Because... I can :-)
Next is to come up with something to do with the generated data.

BigData: Kernel Density Analysis on Hadoop MapReduce

The storage of BigData has been democratized by Hadoop. And with that, rather than bringing the data to the program for analysis, we send the program to the data. That twist comes with a challenge: taking existing serial algorithms and parallelizing them. This post is about such a process. I want to perform a Kernel Density analysis on billions of records loaded into HDFS. The data is fairly simple: a set of records separated by carriage returns, with tab-separated fields in each record. The fields' values contain a latitude, a longitude and a weight.
If you do not know how Kernel Density works, check out this ArcGIS reference.
To take advantage of the distributed HDFS datanodes, I have to transform the traditional stateful sequential implementation into a model that supports stateless parallelism. MapReduce is such a model, and Hadoop can distribute a MapReduce implementation to read data from the "local" HDFS datanodes and reduce the results back into HDFS.
A Kernel Density analysis expects 3 parameters when processing spatial data: an output extent, a cell size and a search radius. From the above reference description, an input value influences the cells in its search area, and the level of influence is proportional to the input weight and inversely proportional to the distance of the cell from the input. That inverse proportionality is computed using a kernel function.
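The kernel function itself is not spelled out above; one common choice is the quartic (biweight) kernel, sketched below in Python. Treat the exact function as an assumption - check the ArcGIS reference for the one your tool uses.

```python
def kernel_weight(distance, radius, weight=1.0):
    """Quartic (biweight) kernel: influence falls off smoothly from the
    full input weight at distance 0 to zero at the search radius.
    This is one common kernel choice, assumed here for illustration."""
    if distance >= radius:
        return 0.0
    ratio = distance / radius
    return weight * (1.0 - ratio * ratio) ** 2
```

So a cell right under the input gets the full weight, and a cell at the edge of the search radius gets none.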
So, in the MapReduce programming paradigm, a map function expects a key(k1),value(v1) tuple and emits zero or more new key(k2),value(v2) tuples.

map(k1,v1) -> List(k2,v2)

The map portion of the parallel Kernel Density analysis implementation will emit all the influenced cells and their associated weights for a given input. A cell is defined as a string concatenating its row and column values with a slash character.
After a shuffle and sort intermediate step, the reduce function expects a key(k2),List(v2) tuple and emits zero or more new key(k3),value(v3) tuples.

reduce(k2,list(v2)) -> List(k3,v3)

The reduce portion of the implementation will emit the sum of all the weights of a cell.
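The map and reduce steps above can be sketched in plain Python - the actual implementation is Java on Hadoop, and the cell size, search radius and quartic kernel here are illustrative assumptions:

```python
import math
from collections import defaultdict

CELL_SIZE = 1.0  # illustrative values; the real job takes these as parameters
RADIUS = 2.0

def kd_map(line):
    """Map: parse one tab-separated 'lon lat weight' line and emit a
    (cell, partial weight) pair for every cell within the search radius.
    Cell keys concatenate row and column with a slash, as in the post."""
    lon, lat, weight = (float(t) for t in line.split("\t"))
    col0, row0 = int(lon // CELL_SIZE), int(lat // CELL_SIZE)
    reach = int(math.ceil(RADIUS / CELL_SIZE))
    for row in range(row0 - reach, row0 + reach + 1):
        for col in range(col0 - reach, col0 + reach + 1):
            cx = (col + 0.5) * CELL_SIZE  # cell center
            cy = (row + 0.5) * CELL_SIZE
            dist = math.hypot(cx - lon, cy - lat)
            if dist < RADIUS:
                ratio = dist / RADIUS
                yield "%d/%d" % (row, col), weight * (1.0 - ratio * ratio) ** 2

def kd_reduce(cell, weights):
    """Reduce: sum the partial weights accumulated for a cell."""
    return cell, sum(weights)
```

The shuffle/sort between the two functions (grouping values by cell key) is what Hadoop provides for free.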
To execute the kernel density map and reduce functions, I decided to use the newly released Spring Data - Apache Hadoop project. As you know, I love the Spring Framework and have been tracking the Spring Data Hadoop project for a while now, and I am glad that it is finally released.
The following Spring application context bootstraps an application and runs the defined job. The job declaratively references a mapper class, a reducer class and a combiner class. In addition, the job runner defines a pre-action and a post-action. The pre-action tests for the existence of the HDFS output path and removes it if it exists. The post-action references a class that converts the content of the output path into a local float raster file. You can use the Float to Raster conversion tool in ArcMap to visualize the kernel density output. And like usual, all the source code can be found here.

Sunday, November 25, 2012

BigData: HDFS FeatureClass ETL and MapReduce GPTool


This post is dedicated to my Esri colleagues Ajit D. and Philip H. for their invaluable help.

This is a work in progress - but I've put a good enough dent in it that I would like to share it with you.  In this post, we will go through a complete cycle, where from ArcMap, we will:

  • Export a FeatureClass to an HDFS folder
  • Register that folder as a Hive table
  • Run command line Hive queries
  • Execute Hive queries from ArcPy and show the results in ArcMap
  • Execute a MapReduce Job as a GP Tool
  • Import an HDFS Folder (result of MapReduce Job) as a FeatureClass

This post brings everything that I have been blogging about so far into a nice story, so here we go:

BTW - I am assuming that you have a Hadoop instance running somewhere and are familiar with ArcMap. You can download a Hadoop demo VM for local testing.

Download the ArcMap extension in this file and unzip its content into your ArcGIS\Desktop10.1\java\lib\ext folder - The jars have to be children of the ext folder.

Make sure to adjust the ArcGIS JVM using JavaConfigTool.exe located in ArcGIS\Desktop10.1\bin:

Start ArcMap, create a new toolbox and add to it the Hadoop Tools - Check out this help for detailed information on managing toolboxes:

Add the world cities to ArcMap:

Let's export the world cities to HDFS:

This tool iterates over the input FeatureClass features and stores each feature in the specified HDFS output path. The output path content is text formatted; each feature is stored as one line in an Esri JSON text representation, followed by a carriage return.
This enables us to continuously add new records from, for example, a streaming process such as Esri GeoEvent Server.
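For illustration, here is a hedged sketch of how one such exported line might be produced; the geometry/attributes layout mirrors the Esri JSON structure that the Hive external table later in this post expects, and the sample attribute values are made up:

```python
import json

def feature_to_json_line(x, y, attributes, wkid=4326):
    """Serialize one point feature as a single Esri JSON text line,
    with a geometry part and an attributes part."""
    record = {
        "geometry": {"x": x, "y": y, "spatialReference": {"wkid": wkid}},
        "attributes": attributes,
    }
    return json.dumps(record)

# Hypothetical record for demonstration:
line = feature_to_json_line(35.5, 33.9, {"CITY_NAME": "Beirut"})
```

Each call yields one line of the HDFS text file, so appending new features is just appending new lines.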

The metadata for that HDFS based FeatureClass is stored in an HDFS based 'metastore' for other processes to inspect - A better place would have been ZooKeeper - but that is a post for another day.

Here is a sample metadata:

{
    "wkid": 4326,
    "geometryType": 1,
    "fields": [{
        "name": "ObjectID",
        "alias": "ObjectID",
        "type": 6,
        "length": 4
    }, {
        "name": "Shape",
        "alias": "Shape",
        "type": 7,
        "length": 0
    }, {
        "name": "CITY_NAME",
        "alias": "CITY_NAME",
        "type": 4,
        "length": 30
    }, {
        ...
    }, {
        "name": "LABEL_FLAG",
        "alias": "LABEL_FLAG",
        "type": 1,
        "length": 4
    }]
}

The metastore contains a set of files where by convention the file name is the imported FeatureClass name followed by ".json". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.json

The GP import tool adds one more file to the metastore, a Hive script that you can execute from the Hive command line to create an external table referencing the HDFS FeatureClass. Again, by convention the script name is the name of the imported FeatureClass followed by ".hql". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.hql

You can "cat" the content of the script and you will notice the usage of the collection data type for the feature geometry and attribute representation. In addition, the serialization and deserialization (SerDe) from JSON is based on a Cloudera library found in the article 'Analyzing Twitter Data Using CDH'. You can download the jar from here.


ADD JAR hive-serdes-1.0-SNAPSHOT.jar;
CREATE EXTERNAL TABLE IF NOT EXISTS worldcities (
geometry STRUCT <x:DOUBLE,y:DOUBLE,spatialReference:STRUCT <wkid:INT>>,
attributes STRUCT <
CITY_NAME:STRING,
GMI_ADMIN:STRING,
ADMIN_NAME:STRING,
FIPS_CNTRY:STRING,
CNTRY_NAME:STRING,
STATUS:STRING,
POP_RANK:INT,
POP_CLASS:STRING,
PORT_ID:INT,
LABEL_FLAG:INT
>) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION 'hdfs://localhadoop:9000/user/mraad_admin/worldcities'
TBLPROPERTIES ('wkid'='4326','type'='point');


Please note how tables can have properties - in this case, I added the wkid and the geometry type.

Upon the execution of the above commands, you can now query the table. Here are some sample queries:

hive> select * from worldcities limit 10;
hive> select attributes.city_name,geometry.x,geometry.y from worldcities where attributes.cntry_name='Lebanon';

Hive can be accessed from ArcPy through the Thrift protocol - Here is a Toolbox that enables the user to draw a polygon as input and invoke a Hive spatial UDF, constraining the resulting FeatureSet to the world cities within the drawn polygon. Download the UDF jar from here, and place it and the hive-serdes jar in the location from which you will start the Hive server as follows:

$> hive --service hiveserver

Next, I wanted to demo the capability to run a MapReduce Job as a GP Tool.

Quick MapReduce recap. For the Unix geeks:

$> cat input.txt | map | sort | reduce > output.txt

And for the "academics":

map(K1,V1) emit list(K2,V2)
shuffle/sort K2
reduce(K2,list(V2)) emit list(K3,V3)

This is fairly low level and requires explicitly writing the Mapper and Reducer Java classes.  This is not for your average GIS user.  But I can see a time when advanced users will write parameter-driven MapReduce tools and share them with the community. This is all based on 'How to build custom geoprocessing tools'.

This simple MR tool takes as input the world cities HDFS FeatureClass and finds the "centroids" by country of all the cities with a specific population rank.

BTW, this can easily be written in HQL as follows:

select attributes.cntry_name as name,
avg(geometry.x) as x,
avg(geometry.y) as y,
count(attributes.cntry_name) as cnt
from worldcities
where attributes.pop_rank < 6
group by attributes.cntry_name
having cnt > 10;

The JobRunnerTool accepts as input:

  • A set of Hadoop properties
  • HDFS FeatureClass input path
  • HDFS FeatureClass output path
  • Metastore location
The mapper converts a JSON formatted input text line (V1) into a PointFeature and will emit its point geometry (V2) if it meets a filter criterion - in this case, a population rank that is less than a user-defined value. The mapper output key (K2) is the country name. BTW, K1 is the line number.

The shuffle/sort portion will ensure that each reducer receives a country name (K2) as an input key and a list of geometry points (V2) as input values.

The reducer creates a PointFeature whose geometry is a point at the averaged coordinates.  The attributes include the country name and the number of points used in the averaging. The reducer output key (K3) is the JSON formatted text representation of the PointFeature and the output value (V3) is NULL, thus producing an HDFS FeatureClass with its metadata in the metastore for further processing and inspection.
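In plain Python, the mapper and reducer logic just described can be sketched like this - it stands in for the actual Java classes, and the field names follow the worldcities metadata shown earlier:

```python
import json
from collections import defaultdict

def centroid_map(line, max_pop_rank):
    """Map: parse one Esri JSON line (V1); emit (country, (x, y)) when
    the feature passes the population-rank filter."""
    feature = json.loads(line)
    attributes = feature["attributes"]
    if attributes["POP_RANK"] < max_pop_rank:
        geometry = feature["geometry"]
        yield attributes["CNTRY_NAME"], (geometry["x"], geometry["y"])

def centroid_reduce(country, points):
    """Reduce: average the coordinates and keep the point count,
    producing one output feature per country."""
    n = len(points)
    x = sum(p[0] for p in points) / n
    y = sum(p[1] for p in points) / n
    return {"geometry": {"x": x, "y": y},
            "attributes": {"CNTRY_NAME": country, "COUNT": n}}
```

The shuffle/sort step between the two functions is simulated here by grouping the mapper output by country key.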

Lastly, we close the cycle by importing the HDFS FeatureClass.

The tool accepts as input an HDFS FeatureClass, its metadata and an output location. When executed within ArcMap, the output is automatically added to the display.

Things to work on next:
  • UDF that accepts as input an ArcMap generated FeatureSet into the DistributedCache - I already blogged about this, but as a standalone tool.
  • MapReduceTool that accepts an external jar containing mapper/reducer classes - I think this will pave the way for the advanced users.
Stay tuned for more things to come. And like usual, all the source code can be downloaded from here.

Friday, November 23, 2012

BigData: Cloudera Impala and ArcPy

So at the last Strata+Hadoop World, Cloudera introduced Impala. I downloaded the demo VM, installed the TPC-DS data set (read the impala_readme.txt once the VM starts up) and tested some of the queries. It was pretty fast and cool - As of this writing, UDFs and SerDes are missing from this beta release, so I can neither run my spatial UDF operators, nor read JSON formatted HDFS records :-(
One of the setup tables was a customer_address table; though it was missing a lat/lon field, it included the standard address fields. It would be cool to invoke an Impala query on that table and mash up the result in ArcMap using ArcPy. So, I downloaded and installed onto my Windows VM (now remember, I work on a real machine, a MacBookPro :-) the ODBC driver, then downloaded and installed pyodbc. Small thing: the documentation keeps talking about 'You must use the 32-bit version'. A bit of googling revealed that they are referring to the Odbcad32.exe file located in the %systemdrive%\Windows\SysWoW64 folder.  The following is a simple GeoProcessing Python Toolbox that queries Impala; the result set is converted into an in-memory table that is appended to ArcMap's table of contents.  You can join the table with a state layer and symbolize the state polygons with quantile breaks based on Impala's aggregated response.  I think this combination of BigData in HDFS converted instantly into "SmallData" for rendering and further processing in ArcMap is a great marriage - Looking forward to the next release of Impala with UDFs.

import arcpy
import pyodbc

class Toolbox(object):
    def __init__(self):
        self.label = "Toolbox"
        self.alias = "Toolbox"
        self.tools = [QueryImpala]

class QueryImpala(object):
    def __init__(self):
        self.label = "Query Impala"
        self.description = "Query Impala"
        self.canRunInBackground = False
    
    def getParameterInfo(self):
        paramtab = arcpy.Parameter("ImpalaTable", "Impala Table", "Output", "Table", "Derived")
        return [paramtab]
    
    def isLicensed(self):
        return True
    
    def updateParameters(self, parameters):
        return
    
    def updateMessages(self, parameters):
        return
    
    def execute(self, parameters, messages):
        tab = "in_memory/ImpalaTable"
        if arcpy.Exists(tab): arcpy.management.Delete(tab)
        arcpy.management.CreateTable("in_memory","ImpalaTable")
        arcpy.management.AddField(tab, "STATE", "TEXT")
        arcpy.management.AddField(tab, "COUNT", "LONG")
        
        connection = pyodbc.connect('DSN=HiveODBC')
        cursor = connection.cursor()
        cursor.execute("""
select ca_state as state,count(ca_state) as cnt
from customer_address
group by ca_state
""")
        with arcpy.da.InsertCursor(tab, ['STATE', 'COUNT']) as insert:
            for row in cursor.fetchall():
                insert.insertRow([row.state, row.cnt])
        cursor.close()
        connection.close()
        parameters[0].value = tab
        return

Monday, October 8, 2012

Streaming Big Data For Heatmap Visualization using Storm


A record number of tweets was set during the 2012 Presidential debate. If you wondered how technologically this happened, then Event Stream Processing is your answer.
Actually, Twitter open sourced such an implementation called Storm. Pretty impressive piece of technology! So, I wanted to try it out with a "geo" twist.
To get started, I recommend that you read "Getting Started with Storm".  Here is the challenge at hand: simulate a stream of aircraft flight track targets, such that a heatmap is generated based on the density of targets at "near" locations.
The topology is very simple in this challenge: a spout reading the targets from an external source (to be defined later) is linked to a bolt that overlays a virtual grid on top of an area of interest.  Each emitted target tuple is mapped to a cell in the grid, and the cell weight is decreased or increased when the target leaves or enters that cell.
Storm is a distributed system, which means that spouts and bolts can run on different physical nodes. The grid in the bolt is implemented as a map, and since the bolt can be distributed, the map implementation has to be distributed too. Enter Hazelcast, a "clustering and highly scalable data distribution platform for Java". In addition to maps, a distributed queue implementation is also available.  The spouts (note the plural) use the latter to poll the offered targets and emit them to bolts (plural again :-) to update the density map grid. Here is the source code of the spout and the bolt.
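The bolt's bookkeeping can be sketched as follows, with a plain dict standing in for the distributed Hazelcast map; the cell size and the enter/leave accounting are illustrative assumptions:

```python
CELL_SIZE = 0.1  # grid resolution in map units (illustrative)

def cell_of(x, y):
    """Map a target position to a grid cell key."""
    return int(x // CELL_SIZE), int(y // CELL_SIZE)

def on_target(grid, last_cells, target_id, x, y):
    """Bolt logic sketch: when a target moves to a new cell, decrement
    the weight of the cell it left and increment the cell it entered.
    `grid` stands in for the distributed Hazelcast map, and
    `last_cells` remembers each target's previous cell."""
    cell = cell_of(x, y)
    previous = last_cells.get(target_id)
    if previous == cell:
        return  # still in the same cell, nothing to do
    if previous is not None:
        grid[previous] = grid.get(previous, 0) - 1
        if grid[previous] == 0:
            del grid[previous]  # keep only non-zero weighted cells
    grid[cell] = grid.get(cell, 0) + 1
    last_cells[target_id] = cell
```

Keeping only non-zero cells in the map is what makes the periodic "collect the weighted cells" step cheap.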
The continuously running topology in this challenge is fronted with a web application container that "feeds" the spout and periodically accumulates the set of non-zero weighted cells from the grid for web client visualization.
The Spring web container is used in this implementation due to the Flex integration and many other reasons that I will not go into in this post.
The Spring application context, defines:

  •  A bean component scanner
  •  A task scheduler
  •  A Hazelcast instance
  •  A flex message broker
  •  A flex message destination
  •  A Spring message template

Flight tracks are simulated by time stepping the parameters of a cubic Hermite spline that is bound by starting and ending locations and vectors. A scheduled bean iterates over the set of track parameters and offers to the Hazelcast queue the current position along each track at the current timestamp, in the form of a Target instance. When the target reaches the end of the track, the latter is removed from the simulation set.  In addition, that scheduled bean gets from the Hazelcast grid map the set of non-zero weighted cells and accumulates them into a list that is sent back to the client using the injected message template through the message broker.
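The track simulation boils down to evaluating a cubic Hermite spline per coordinate; a minimal sketch (Python instead of the actual Java bean, with the standard Hermite basis functions):

```python
def hermite(p0, p1, m0, m1, t):
    """Cubic Hermite interpolation between p0 and p1 with tangent
    vectors m0 and m1, for t in [0, 1]. Applied per coordinate."""
    t2, t3 = t * t, t * t * t
    h00 = 2 * t3 - 3 * t2 + 1   # weights p0
    h10 = t3 - 2 * t2 + t       # weights the start tangent m0
    h01 = -2 * t3 + 3 * t2      # weights p1
    h11 = t3 - t2               # weights the end tangent m1
    return h00 * p0 + h10 * m0 + h01 * p1 + h11 * m1

def track_position(start, end, v0, v1, t):
    """Simulated target position along a track at parameter t in [0, 1]."""
    return (hermite(start[0], end[0], v0[0], v1[0], t),
            hermite(start[1], end[1], v0[1], v1[1], t))
```

At t = 0 the target is at the starting location, at t = 1 it reaches the end and is dropped from the simulation set.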
Onto the client. Upon startup completion, the application subscribes to the message destination and adds a message listener to handle the list of weighted cells. This list is bound to a DensityMapLayer instance that converts the cells in the list into gradient fills on a bitmap. This enables very fast and fluid rendering of a large set of data. The overlay of gradient fills is what generates a heatmap. In addition, a UI Button is added to the stage, enabling the user to define random starting and ending locations and vectors that are sent to the Spring container as seeds for the scheduled bean track/target simulator.
I know that this is very technical and I am assuming that you have some background in the above mentioned technologies - I just wanted to show how they can be integrated together seamlessly for BigData streaming - like usual, all the source code is available here.

Monday, September 24, 2012

Processing Big Data with Apache Hive and Esri ArcPy


Data scientists: if you are processing and analyzing spatial data and are using Python, then ArcPy should be included in your arsenal of tools, and ArcMap should be utilized for geospatial data visualization.  Following the last post, where I extended Apache Hive with spatial User Defined Functions (UDFs), in this post I will demonstrate the usage of the "extended" Hive within Python and how to save the output into a feature class for rendering within ArcMap or any ArcWeb client using ArcGIS Server.

Given a running Hadoop instance and assuming that you have installed Hive and have created a Hive table as described in the last post, start the Hive Thrift server as follows:

$ hive --service hiveserver

When ArcGIS for Desktop is installed on a host, Python is optionally installed and is enabled with GeoProcessing capabilities. Install Hive on your desktop and set the environment variable HIVE_HOME to the location where Hive is residing. To access the Hive python libraries, export the environment variable PYTHONPATH with its value set to $HIVE_HOME/lib/py.

With the setup behind us, let's tackle a simple use case: given a polygon feature class on the desktop and a set of points stored in the Hadoop File System and exposed through a Hive table, I want to perform a point-in-polygon operation on Hadoop and update the local polygon feature class attributes with the returned results.

Here is the Python script:

import arcpy
import sys

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:    
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()

    client.execute("add file countries.shp")
    client.execute("add file countries.shx")
    client.execute("add jar GeomX.jar")
    client.execute("create temporary function pip as 'com.esri.GenericUDFPip'")

    client.execute("""
    select t.p as fid,count(t.p) as val
    from (select pip(x,y,'./countries.shp') as p from cities) t
    where p!=-1 group by t.p
    """)
    rows = client.fetchAll()
    transport.close()
    
    keyval = dict()

    for row in rows:
        tokens = row.split()
        key = int(tokens[0])
        val = int(tokens[1])
        keyval[key] = val
    del row
    del rows

    rows = arcpy.UpdateCursor("countries.shp")
    for row in rows:
        key = row.FID
        if key in keyval:
            row.HADOOP = keyval[key]
            rows.updateRow(row)
    del row
    del rows

except Thrift.TException, tx:
    print '%s' % (tx.message)

The script imports the Thrift Hive client and the ArcPy library. It then connects to the Thrift Hive server on the localhost and executes a set of setup operations. The first two add the countries shapefile geometry and spatial index files to the distributed cache.  The next setup adds the jar file containing the spatial UDF functions. The last setup defines the pip function with a reference to the class in the loaded jar. The select statement is executed to retrieve the country identifier and the number of cities in that country, based on a nested select that uses the pip function to identify which city point falls into which country polygon. An fid with a value of -1 is returned if a pip result is not found and is excluded from the final group count.  The fetchAll function returns a list of text items, where each text item is an fid value followed by a tab, then a count value.  A dictionary is populated by tokenizing the list, where the dictionary key is the fid and the value is the count.  An arcpy update cursor is opened on the local countries feature class and a row iterator is executed.  For each row, the FID value is retrieved and checked against the dictionary keys. If found, the HADOOP attribute field is updated with the dictionary value.

Upon a successful execution (and remember, this might take a while as Hive is a batch process), open ArcMap, load that feature class and symbolize it with a class break qualifier based on the HADOOP field values.

Pretty cool, no?  This is a very simple example of the marriage of a BigData tool and a GIS tool using Python.  There is so much more that can be done using this combination of tools in the same thought process. Expect more posts along the same vein with more arcpy usage. I just wanted to plant a small seed in your mind.

Update: Here is another example that calculates the average lat/lon values of cities per country in Hive; the result set is used to create a point feature class:


import arcpy, sys, os

from arcpy import env

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

env.overwriteOutput = True

try:
  prjFile = os.path.join(arcpy.GetInstallInfo()["InstallDir"],
        r"Coordinate Systems\Geographic Coordinate Systems\World\WGS 1984.prj")
  spatialRef = arcpy.SpatialReference(prjFile)

  tempWS = "in_memory"
  tempFS = "XY_FeatureClass"

  arcpy.CreateFeatureclass_management(tempWS, tempFS , "POINT", "","","", spatialRef)

  tempFC = os.path.join(tempWS, tempFS)

  arcpy.AddField_management(tempFC, "country", "TEXT", 0, 0, 8)
  
  transport = TSocket.TSocket('10.128.249.8', 10000)
  transport = TTransport.TBufferedTransport(transport)
  protocol = TBinaryProtocol.TBinaryProtocol(transport)
  client = ThriftHive.Client(protocol)
  transport.open()

  client.execute("add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar")

  client.execute("""
    select country,avg(x),avg(y)
    from cities
    group by country
    """)
  rows = client.fetchAll()
  transport.close()
  
  inCur = arcpy.InsertCursor(tempFC)
  for row in rows:
    tokens = row.split()

    country = tokens[0]
    avgx = float(tokens[1])
    avgy = float(tokens[2])

    feat = inCur.newRow()
    feat.Shape = arcpy.Point(avgx, avgy)
    feat.country = country
    inCur.insertRow(feat)

  del inCur, rows, row

  arcpy.CopyFeatures_management(tempFC, r"Z:\Sites\XYpoints")
  
except Thrift.TException, tx:
  print '%s' % (tx.message)


Monday, September 17, 2012

Big Data, Spatial Hive, Sequence Files


Following the last post, where we used Pig to analyze data stored in HDFS, in this post we will be using Hive and spatially enabling it for geo analysis. Hive enables you to write SQL-like statements in a language called HiveQL, which Hive converts to a MapReduce job that is submitted to Hadoop for execution. Again, if you know SQL, then learning HiveQL is very easy and intuitive.  Hive is not intended for OLTP or real-time analysis. Like Pig, Hive is extensible via User Defined Functions (UDFs), so we will be using almost the same functions as in the previous post to find things that are near and/or within some criteria.

There are several ways to store data in HDFS; one of them is the SequenceFile format. This is a key/value binary format with compression capabilities. For this post, I will be transforming an input into this well-known binary format to be stored in HDFS for later query and analysis.

An object that requires persistence in a SequenceFile has to implement the Writable interface. So here we go: since we deal with spatial features, let's declare a Feature class that implements the Writable interface:


public class Feature implements Writable
{
    public IGeometry geometry;
    public ISymbol symbol = NoopSymbol.NOOP;

    public Feature()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        geometry.write(dataOutput);
        symbol.write(dataOutput);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        geometry.readFields(dataInput);
        symbol.readFields(dataInput);
    }
}

A Feature has a geometry and a symbol. A geometry is also Writable:


public interface IGeometry extends Writable
{
}

An implementation of the geometry interface is a two dimensional MapPoint:


public class MapPoint implements IGeometry
{
    public double x;
    public double y;

    public MapPoint()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        dataOutput.writeDouble(x);
        dataOutput.writeDouble(y);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        x = dataInput.readDouble();
        y = dataInput.readDouble();
    }
}

For now, features will have a no-operation (noop) symbol associated with them:


public class NoopSymbol implements ISymbol
{
    public final static ISymbol NOOP = new NoopSymbol();

    public NoopSymbol()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
    }
}

The input source that I wanted to test in my ETL is a set of cities (cities1000) in TSV format downloaded from geonames. The Writable object to read and write from a SequenceFile is a City class that extends a Feature and is augmented with attributes.


public class City extends Feature
{
    public int cityId;
    public String name;
    public String country;
    public int population;
    public int elevation;
    public String timeZone;

    @Override
    public void write(final DataOutput dataOutput) throws IOException
    {
        super.write(dataOutput);
        dataOutput.writeInt(cityId);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(country);
        dataOutput.writeInt(population);
        dataOutput.writeInt(elevation);
        dataOutput.writeUTF(timeZone);
    }

    @Override
    public void readFields(final DataInput dataInput) throws IOException
    {
        geometry = MapPoint.newMapPoint(dataInput);
        symbol = NoopSymbol.NOOP;
        cityId = dataInput.readInt();
        name = dataInput.readUTF();
        country = dataInput.readUTF();
        population = dataInput.readInt();
        elevation = dataInput.readInt();
        timeZone = dataInput.readUTF();
    }
}

Using Hadoop's command line interface, I prepared a working directory to load the cities into HDFS:


$ hadoop fs -mkdir cities

Next, I wrote and executed a Java program using the opencsv library to extract, transform and load the TSV into SequenceFile records onto HDFS.
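That ETL program is not shown; here is a hedged sketch of its transform step in Python. The geonames column positions are assumptions based on the cities1000 readme, so verify them against your download:

```python
def parse_city(line):
    """Transform one geonames TSV line into a dict matching the City
    fields. Column positions (id=0, name=1, lat=4, lon=5, country=8,
    population=14, elevation=15, timezone=17) are assumptions taken
    from the geonames readme."""
    row = line.rstrip("\n").split("\t")
    return {
        "cityId": int(row[0]),
        "name": row[1],
        "x": float(row[5]),        # longitude
        "y": float(row[4]),        # latitude
        "country": row[8],
        "population": int(row[14] or 0),
        "elevation": int(row[15] or 0),  # may be empty in the source
        "timeZone": row[17],
    }
```

The load step would then write each dict out as a City Writable into the SequenceFile via a SequenceFile.Writer.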

I highly recommend that you read Hadoop In Action. It has a nice introduction to installing and running Hive. Remember, Hive operates on SQL-like statements, so to operate on the loaded City data, we create a table that maps to the City object. From the Hive command line interface, we execute the following command:


hive> create external table cities(
 x double,
 y double,
 cityid int,
 name  string,
 country string,
 population int,
 elevation int,
 timezone string
 ) row format serde 'com.esri.CitySerDe'
 stored as sequencefile location '/user/mraad_admin/cities';

If you know SQL, this should be familiar. But note the last two lines: they instruct Hive to read the data in a SequenceFile format from the HDFS location that we previously prepared, and since the data is in a binary format, each row is serialized and deserialized using a helper SerDe class.
The CitySerDe class knows how to serialize and deserialize a writable object from the input and output stream into and from a concrete City class instance. In addition, it provides column metadata such as the column name and the type to Hive. The SerDe is compiled and packaged into a jar that is added to the hive runtime for usage:


hive> add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar;

Added /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar to class path
Added resource: /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar

hive> show tables;
OK
cities
Time taken: 3.98 seconds

hive> describe cities;
OK
x double from deserializer
y double from deserializer
cityid int from deserializer
name string from deserializer
country string from deserializer
population int from deserializer
elevation int from deserializer
timezone string from deserializer
Time taken: 0.434 seconds

hive> select * from cities limit 5;
OK
1.49129 42.46372 3039163 Sant Julia de Loria AD 8022 0 Europe/Andorra
1.73361 42.54277 3039604 Pas de la Casa AD 2363 2050 Europe/Andorra
1.53319 42.55623 3039678 Ordino AD 3066 0 Europe/Andorra
1.53414 42.50729 3040051 les Escaldes AD 15853 0 Europe/Andorra
1.51483 42.54499 3040132 la Massana AD 7211 0 Europe/Andorra
Time taken: 0.108 seconds

Like I said, if you know SQL, you can find the top 5 countries with the most cities by issuing the following statement - no need to write MR jobs:


hive> select country,count(country) as c from cities group by country order by c desc limit 5;

Onto spatial.  Hive is extensible via User Defined Functions (UDFs). I wanted to find all the cities that are near a specific location, so I extended Hive with a 'near' function that was packaged in the added jar and defined it as follows:


hive> create temporary function near as 'com.esri.UDFNear';

I can now use the 'near' function to locate cities within 5 miles of a specific location:


hive> select name from cities where near(x,y,-84.20299,39.43534,5);

The UDFNear function extends the UDF class and implements in this case the Haversine distance calculation between two geographical locations.


public class UDFNear extends UDF
{
    private final static BooleanWritable TRUE = new BooleanWritable(true);
    private final static BooleanWritable FALSE = new BooleanWritable(false);

    public BooleanWritable evaluate(
            DoubleWritable x1, DoubleWritable y1,
            DoubleWritable x2, DoubleWritable y2,
            DoubleWritable distance
    )
    {
        return HaversineMiles.distance(y1.get(), x1.get(), y2.get(), x2.get()) < distance.get() ? TRUE : FALSE;
    }

    public boolean evaluate(
            double x1, double y1,
            double x2, double y2,
            double distance
    )
    {
        return HaversineMiles.distance(y1, x1, y2, x2) < distance;
    }
}
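For reference, the HaversineMiles helper invoked above computes the great-circle distance between two geographic locations. The class and method names match the calls in UDFNear, but the body below is my assumed implementation of the standard haversine formula, not the actual source:

```java
// Assumed implementation of the HaversineMiles helper used by UDFNear.
// Computes the great-circle distance in miles between two lat/lon pairs
// using the standard haversine formula on a spherical Earth.
public final class HaversineMiles
{
    private static final double EARTH_RADIUS_MILES = 3958.76;

    public static double distance(
            final double lat1, final double lon1,
            final double lat2, final double lon2)
    {
        final double dLat = Math.toRadians(lat2 - lat1);
        final double dLon = Math.toRadians(lon2 - lon1);
        final double a = Math.sin(dLat / 2.0) * Math.sin(dLat / 2.0)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2.0) * Math.sin(dLon / 2.0);
        // 2 * R * asin(sqrt(a)) is the haversine arc length.
        return 2.0 * EARTH_RADIUS_MILES * Math.asin(Math.sqrt(a));
    }
}
```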

Let's assume that the field 'country' was not defined in the table, and I want to find the count of cities per country when all I have are the spatial boundaries of the countries. This is a perfect spatial join, and it is where UDFs, the DistributedCache, and spatial libraries like JTS and GeoTools come to the rescue.

I extended the GenericUDF class with a GenericUDFPip class that performs a 'naive' point-in-polygon (pip) test based on geometries loaded into the distributed cache.  This enabled me to write a spatial query as follows:


hive> add file borders.shp;
hive> add file borders.shx;
hive> create temporary function pip as 'com.esri.GenericUDFPip';
hive> select t.p,count(t.p) from (select pip(x,y,'./borders.shp') as p from cities) t where t.p != -1 group by t.p;

The first two lines load the country borders shapefile and its spatial index into the distributed cache - the pip function uses these the first time through to create an in-memory spatial index for fast searching. The pip function is defined as a class in the previously added jar file and expects 3 arguments: the first is a longitude, the second is a latitude, and the third is the shapefile location in the distributed cache. Based on these arguments, it returns the identifier of the country border record that the longitude and latitude fall into, or -1 if there is no intersection.  The query first creates a nested table based on the pip result, which is then grouped and aggregated on the non-negative border identifiers.
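At its core, the pip test is a point-in-polygon check. In the actual UDF this is delegated to JTS geometries read from the cached shapefile, but the essential idea can be sketched with a self-contained ray-casting test - the class below is illustrative only, not the GenericUDFPip source:

```java
// Self-contained ray-casting point-in-polygon sketch. GenericUDFPip
// delegates this test to JTS geometries loaded from the shapefile in
// the distributed cache; this stand-alone version shows the core check.
public final class PointInPolygon
{
    // xs/ys hold the polygon vertices in order (ring closed implicitly).
    public static boolean contains(
            final double[] xs, final double[] ys,
            final double px, final double py)
    {
        boolean inside = false;
        for (int i = 0, j = xs.length - 1; i < xs.length; j = i++)
        {
            // Toggle on each edge crossed by a horizontal ray
            // shot to the right of (px, py).
            if ((ys[i] > py) != (ys[j] > py)
                    && px < (xs[j] - xs[i]) * (py - ys[i]) / (ys[j] - ys[i]) + xs[i])
            {
                inside = !inside;
            }
        }
        return inside;
    }
}
```

The in-memory spatial index mentioned above simply avoids running this check against every country's polygon for every city.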
Pretty cool - no? So imagine what else you could do with HQL and these libraries... Say, find the top 10 cities with the most surrounding cities within a 25 mile radius (an exercise for the reader - hint: use a join and look at the source code for UDFDist :-)

The fun does not stop there. Since this is SQL-like, Hive comes with a JDBC driver, and using my favorite Java framework, Spring-Hadoop integrates with Hive to become a JDBC client.

First make sure to start Hive as a service:


$ hive --service hiveserver

Next, define a Spring application context as follows:



   
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:c="http://www.springframework.org/schema/c"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <bean id="hive-ds" class="org.apache.commons.dbcp.BasicDataSource"
          destroy-method="close"
          p:driverClassName="org.apache.hadoop.hive.jdbc.HiveDriver"
          p:url="jdbc:hive://localhost:10000/default"
          p:connectionInitSqls-ref="initSqls"/>

    <util:list id="initSqls">
        <value>add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar</value>
        <value>create temporary function near as 'com.esri.UDFNear'</value>
        <value>create temporary function dist as 'com.esri.UDFDist'</value>
    </util:list>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"
          c:dataSource-ref="hive-ds"/>

</beans>


A Hive data source bean is defined using the Apache Commons database connection pool library. The data source driver class property is set to the Hive JDBC driver, and a set of SQL statements is executed upon startup.  These statements add the jar containing the UDF classes to the distributed cache and declare references to the 'near' and 'dist' UDFs.  Finally, a Spring JDBC template is defined with a reference to the aforementioned data source. This template will be injected into a service bean to enable SQL query execution and row mapping.

To physically see the result of a query on a world map, the Flex API for ArcGIS Server is utilized. Bridging the server side and the client side is the Spring Flex Integration project, which enables a Flex client application to execute functions on Spring-based remote objects.


@Service("hiveService")
@RemotingDestination
public class HiveService
{
    @Autowired
    public JdbcTemplate jdbcTemplate;

    public FeatureSet query(final String where)
    {
        final List<Feature> list = jdbcTemplate.query("SELECT X,Y,NAME FROM CITIES WHERE " + where, new RowMapper<Feature>()
        {
            public Feature mapRow(final ResultSet resultSet, final int i) throws SQLException
            {
                final double x = WebMercator.longitudeToX(resultSet.getDouble(1));
                final double y = WebMercator.latitudeToY(resultSet.getDouble(2));
                final MapPoint mapPoint = new MapPoint(x, y);

                final String name = resultSet.getString(3);
                final ASObject attributes = new ASObject();
                attributes.put("name", name);

                return new Feature(mapPoint, attributes);
            }
        });
        final Feature[] features = new Feature[list.size()];
        list.toArray(features);
        return new FeatureSet(features);
    }
}

I've talked extensively in my previous posts about the beauty of the lack of impedance mismatch between the server side and client side transfer objects, as in the case of the FeatureSet, MapPoint and Feature instances. This HiveService is injected with the Spring-defined JDBC template and exposes a 'query' function that expects a 'where' clause string. Upon successful execution, each result set row is transformed into a Feature instance that is appended to a list, which is then transferred back to the client in a FeatureSet instance.
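The WebMercator helper in the row mapper projects geographic coordinates onto the spherical-mercator plane that the map uses. A plausible sketch of those two methods follows - the formulas are the standard spherical web-mercator projection, but the class body itself is assumed, not the actual source:

```java
// Assumed implementation of the WebMercator helper used in the row
// mapper. Projects WGS84 longitude/latitude onto the spherical
// web-mercator plane (meters) used by the map client.
public final class WebMercator
{
    private static final double RADIUS = 6378137.0; // WGS84 equatorial radius in meters

    public static double longitudeToX(final double longitude)
    {
        // x is simply the longitude arc length along the equator.
        return Math.toRadians(longitude) * RADIUS;
    }

    public static double latitudeToY(final double latitude)
    {
        // y stretches toward infinity at the poles (mercator).
        final double rad = Math.toRadians(latitude);
        return RADIUS * Math.log(Math.tan(Math.PI / 4.0 + rad / 2.0));
    }
}
```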

Onto the client. This is a simple Flex implementation where the map and a data grid are stacked on top of each other. A user can enter a where clause that is sent to the server-side 'query' function using the RemoteObject capabilities.  Upon successful execution, a FeatureSet is returned; the features are rendered on the map in a GraphicsLayer instance, and the same features are rendered in a DataGrid instance as data rows. A user can click on a row in the data grid to highlight the feature on the map and, vice versa, click on a feature on the map to highlight a row in the data grid.

I know that this is a lot of information - thanks for reading it through. As usual, you can find all the source code here.