Monday, May 20, 2013

Export FeatureClass to Hadoop, Run MapReduce, Visualize in ArcMap

In the previous post we launched a CDH cluster on EC2 in under 5 minutes. In this post, we will use that cluster to perform geospatial analytics in the form of MapReduce and visualize the result in ArcMap. ArcMap is one of the desktop tools that a GeoData Scientist will use when working with and visualizing spatial data.  The use case in my mind is something like the following:  Point data is streamed, for example through GeoEventProcessor, into Amazon S3. The user has a set of polygons in ArcGIS that needs to be spatially joined with that big data point content. The result of the big data join is linked back to the polygon set for symbol classification and visualization.

After editing the polygons in ArcMap, the user exports the feature class into HDFS using the ExportToHDFSTool.

Using the new Esri Geometry API for Java, a MapReduce job is written as a GeoProcessing extension, so that it can be executed directly from within ArcMap. The result of the job is converted directly back into a feature class.

The tool expects the following parameters:

  • A Hadoop configuration in the form of a properties file.
  • A user name that Hadoop will use as credentials and its privileges when executing the job.
  • The big data input to use as the source - in the above use case that will be the S3 data.
  • The small data polygon set to spatially join and aggregate.
  • The remote output folder - where the MapReduce job will put its results.
  • A list of jar files that will be used by the DistributedCache to augment the MapReduce classpath. That is because the Esri Geometry API for Java jar is not part of the Hadoop distribution, and we use the distributed cache mechanism to "push" it to each node.
  • The output feature class to create when returning back the MapReduce job result from HDFS.
This code borrows resources from Spatial Framework for Hadoop for the JSON serialization.

For the ArcGIS Java GeoProcessing Extension developers out there - I would like to point you to a couple tricks that will make your development a bit easier and hopefully will make you bang your head a little bit less when dealing with ArcObjects - LOL !

I strongly recommend that you use Apache Maven as your build process. In addition to a good code structure and unified repository, it comes with a great set of plugins to assist in the deployment process.  The first plugin is the maven-dependency-plugin. It copies all the runtime dependencies into a specified folder.  I learned the hard way that when ArcMap starts up, it introspects all the classes in all the jars in the extension folder for the @ArcGISExtension annotation declaration.  Now this is fine if you are writing a nice "hello world" GP extension, but in our case, where we are depending on 30+ jars, well….ArcMap will never start. So, the solution is to put all the jars in a separate subfolder and declare a Class-Path entry in the main jar manifest file that references all the dependencies. This is where the second plugin comes to the rescue. The maven-jar-plugin can be configured to automatically generate a manifest file containing a Class-Path entry that references all the dependencies declared in the Maven pom.
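
To make the manifest trick concrete, here is a minimal sketch that builds the kind of manifest described above with the JDK's own java.util.jar classes. It is not the plugin configuration itself; the class name, the "libs" subfolder and the jar file names are hypothetical and only illustrate what the generated Class-Path entry looks like.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

final class ManifestClassPathSketch {

    // Builds a manifest whose Class-Path lists the dependency jars.
    // Entries are space separated and resolved relative to the main jar.
    static Manifest withClassPath(String... relativeJars) {
        Manifest manifest = new Manifest();
        Attributes main = manifest.getMainAttributes();
        // Manifest-Version must be present or the attributes are not written out.
        main.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        main.put(Attributes.Name.CLASS_PATH, String.join(" ", relativeJars));
        return manifest;
    }

    // Renders the manifest the way it would appear in META-INF/MANIFEST.MF.
    static String render(Manifest manifest) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            manifest.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical subfolder and jar names, for illustration only.
        System.out.print(render(withClassPath("libs/dependency-a.jar", "libs/dependency-b.jar")));
    }
}
```

With only the main jar left in the extension folder, ArcMap has a single jar to introspect, and the JVM resolves the rest through the Class-Path entry.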

Talk about classpath. So you think that all is well and good by using the above setup. There is one more thing that you have to do to force the correct classpath when executing the GP task and that is to set the current thread context class loader to the system class loader as follows:

Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());

It took me 3 weeks to figure this out - so hopefully somebody will find this useful.
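
One possible refinement, sketched below under my own assumptions, is to restore the previous context class loader once the work is done, so the change does not leak into whatever else runs on that thread. The class and method names are hypothetical; only the setContextClassLoader call comes from the line above.

```java
import java.util.function.Supplier;

final class ContextClassLoaderSketch {

    // Runs the body with the system class loader as the thread context
    // class loader, then puts the previous loader back.
    static <T> T withSystemClassLoader(Supplier<T> body) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(ClassLoader.getSystemClassLoader());
        try {
            return body.get();
        } finally {
            // Restore even if the body throws.
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader seen = withSystemClassLoader(
                () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == ClassLoader.getSystemClassLoader());
    }
}
```

Whether restoring the loader is safe inside a GP tool depends on how ArcObjects reuses its threads, so treat this as a starting point rather than a rule.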

Like usual, all the source code can be found here.

Saturday, May 18, 2013

BigData: Launch CDH on EC2 from ArcMap in under 5 minutes

well....after you get all the necessary software, certificates and... setup everything correctly :-)

Update: Regarding the above comment - you can download a zip file containing all the necessary jars and the toolbox so that you do not have to package the project from scratch.

The idea here is that I would like an ArcGIS user to just push a button from within ArcMap and have a Cloudera based Hadoop cluster started on Amazon EC2. From there on, a user can edit features in ArcMap that can be exported into that cluster to be used as an input to a MapReduce job. The output of the MapReduce job is imported back into ArcMap for further analysis. This combination of SmallData (GeoDatabase) and BigData (Hadoop) is a great fusion in a geo-data scientist's arsenal. When done with the analysis, the user again pushes a button and destroys the cluster, thus paying only for what he/she used while having access to elastic resources.

The following is a sequence of prerequisite steps that you need to execute to get going:

Create an Amazon Web Services account and set up AWS security credentials. You need to remember your 'Access Key ID' and 'Secret Access Key'. They are ugly long strings that we will need later to create our Hadoop cluster in EC2.

Download the PuTTY Windows installer from http://www.chiark.greenend.org.uk/~sgtatham/putty/download.html and run it. This will add two additional programs to your system: putty.exe and puttygen.exe. We will use puttygen.exe to generate an OpenSSH formatted public and private key pair to enable secure tunneled communication with the soon to be created AWS instances.

Here are the steps to create the keys:

  • Launch PuTTYGen.
  • Set the number of bits in generated keys to 2048.
  • Click the 'Generate' button and move your mouse randomly in the 'blank' area.
  • To save your private key, click the 'Conversions' menu option and select the 'Export OpenSSH key' menu item. An alert will be displayed, warning you that you are exporting this key without a passphrase. Press 'Yes' to save it.
  • To save your public key, select and copy the content of the text area. Open notepad, paste the content and save the file.
  • Finally, save this key in a PuTTY format by clicking on the 'Save private key' button.

Run as administrator the 'JavaConfigTool' located in C:\Program Files (x86)\ArcGIS\Desktop10.1\bin and adjust the Minimum heap size to 512, Maximum heap size to 999, and the Thread stack size to 512.

The deployment of the Cloudera Distribution of Hadoop (CDH) on Amazon EC2 is executed by Apache Whirr. You can find great documentation here on how to run it from the command line. However in my case, I wanted to execute the process from within ArcMap. So, I wrote a GeoProcessing extension that you can add to ArcCatalog enabling the invocation of the deployment tool from within ArcMap.

Whirr depends on a set of configurations to launch the cluster. The values of some of the parameters depend on the previous two steps. Here is a snippet of my 'cluster.properties' file.

whirr.provider=aws-ec2
whirr.identity=my-aws-access-key-id
whirr.credential=my-aws-secret-access-key
whirr.cluster-name=hadoopcluster
whirr.instance-templates=1 hadoop-jobtracker+hadoop-namenode,3 hadoop-datanode+hadoop-tasktracker
whirr.public-key-file=/Users/mraad/mypublickey
whirr.private-key-file=/Users/mraad/myprivatekey
whirr.env.repo=cdh4
whirr.hadoop.install-function=install_cdh_hadoop
whirr.hadoop-configure-function=configure_cdh_hadoop
whirr.hardware-id=m1.large
whirr.image-id=us-east-1/ami-ccb35ea5
whirr.location-id=us-east-1
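
Before pushing the launch button, it can help to sanity check the configuration file. The following is a minimal sketch using java.util.Properties; the choice of which keys count as required is my own assumption based on the snippet above, not something Whirr enforces in this exact form, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class WhirrConfigCheck {

    // Assumption: the keys this sketch treats as mandatory.
    static final String[] REQUIRED = {
        "whirr.provider",
        "whirr.identity",
        "whirr.credential",
        "whirr.cluster-name",
        "whirr.instance-templates"
    };

    // Parses the text of a 'cluster.properties' style file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = parse("whirr.provider=aws-ec2\nwhirr.cluster-name=hadoopcluster\n");
        System.out.println("Missing: " + missingKeys(props));
    }
}
```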

Please follow the instructions on Github to install the WhirrToolbox in ArcMap.

To launch the cluster, start ArcMap, show the catalog window, expand the 'WhirrToolbox' and double click the 'LaunchClusterTool'. Select a cluster configuration file and click the 'Ok' button.

Now, this will take a little bit less than 5 minutes.

 To see the Whirr progress, disable the background processing in the GeoProcessing Options by unchecking the 'Enable' checkbox:

Once the cluster is launched successfully, Whirr will create a folder named '.whirr' in your home folder. In addition, underneath that folder, a subfolder will be created and will be named after the value of 'whirr.cluster-name' property in the configuration file.

The file 'hadoop-site.xml' contains the Hadoop cluster properties.

I've written yet another GP tool (ClusterPropertiesTool) that will convert this xml file into a properties file. This properties file will be used by the subsequent tools when exporting and importing feature classes to and from the cluster and when running MapReduce jobs.
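
The conversion itself is simple. Here is a minimal sketch of the idea, assuming the standard Hadoop configuration layout of property elements with name and value children. It is not the ClusterPropertiesTool source; the class name is hypothetical and the property values in main are made up.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class SiteXmlToProperties {

    // Copies every <property><name/><value/></property> pair into Properties.
    static Properties convert(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Properties props = new Properties();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element property = (Element) nodes.item(i);
                String name = childText(property, "name");
                String value = childText(property, "value");
                if (name != null && value != null) {
                    props.setProperty(name, value);
                }
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse configuration XML", e);
        }
    }

    // Returns the trimmed text of the first child with the given tag, or null.
    private static String childText(Element parent, String tag) {
        NodeList list = parent.getElementsByTagName(tag);
        return list.getLength() == 0 ? null : list.item(0).getTextContent().trim();
    }

    public static void main(String[] args) {
        // Made-up host and port, for illustration only.
        String xml = "<configuration><property><name>fs.default.name</name>"
                + "<value>hdfs://namenode:8020/</value></property></configuration>";
        convert(xml).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```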

Now, all the communication with the cluster is secure and should be tunneled through an ssh connection. To test this, we will use PuTTY. But before starting PuTTY, we need the name node IP. This can be found in the 'instance' file in the '.whirr' subfolder. Make sure to get the first IP in the 'namenode' row, not the one that starts with '10.xxx'.

Start PuTTY. Put in the 'Host Name (or IP Address)' field the value of the name node IP.

In the Connection / SSH / Auth form, browse for the PuTTY formatted private key that we saved previously.

In the Connection / SSH / Tunnels form, select the 'Dynamic' radio button, enter '6666' in the 'Source port' field and click the 'Add' button.

Click the 'Open' button, and that should connect you to your cluster.

Enter your username (this is your Windows login username) and hit enter, and you should see a welcome banner and a bash shell prompt.
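
A 'Dynamic' tunnel in PuTTY behaves as a local SOCKS proxy, so any client that can speak SOCKS can reach the cluster through port 6666 while the session is open. The sketch below shows, with plain JDK classes, how a Java client could describe that proxy. The class and method names are hypothetical, and this is not necessarily how the GP tools connect; it only illustrates what the tunnel provides.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;

final class SocksTunnelSketch {

    // Describes the SOCKS proxy that the PuTTY dynamic tunnel exposes locally.
    static Proxy localSocksProxy(int port) {
        return new Proxy(Proxy.Type.SOCKS, new InetSocketAddress("localhost", port));
    }

    // Creates a socket that will route through the proxy once connect() is called.
    static Socket unconnectedSocket(Proxy proxy) {
        return new Socket(proxy);
    }

    public static void main(String[] args) {
        Proxy proxy = localSocksProxy(6666);
        System.out.println(proxy.type() + " via " + proxy.address());
    }
}
```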

Once you are done with the cluster, the DestroyClusterTool GP tool will destroy the cluster.

In the next post, we will use that cluster to export, import feature classes and run map reduce jobs.

Like usual, all the source code can be found here.

Wednesday, April 24, 2013

BigData: Terracotta BigMemory and ArcGIS Webmaps

I was asked to put a Proof of Concept implementation of a very fast interactive dynamic density map generation on 11 million records for a webmap application, where a user can specify dynamically a query definition (a where clause in SQL terms), a ramp color and the service implementation will return back a density representation of the records on a map.

This is typically done via a GeoProcessing task where the data is queried and stored into an intermediate FeatureClass that is further processed by a Kernel Density tool that produces a raster layer that is finally visualized. As you can tell, this is neither interactive nor particularly fast.

Since the traditional means of retrieving the data from a relational database is not fast enough and 11 million records is not such a big set after all, I decided to put the whole thing in memory. BTW, this is a meme that has been trending for a while now, and the most vocal about it is SAP HANA.
I decided to use Terracotta's BigMemory to hold the data in the "off-heap" and use its EHCache query capability to aggregate and fetch the data.  Now despite the name, I used the cache "eternal" capabilities to forever hold the data elements.

I was given the data in CSV format, so I wrote a simple CSV reader/tokenizer that bulk loads the data into BigMemory using multiple threads. The inter thread communication was handled by the LMAX disruptor, letting me focus on the loading, parsing and putting into BigMemory.

To access this BigMemory from a webmap as a layer, I decided to write a middleware that implements the ArcGIS Rest JSON interface for an ArcGISDynamicLayer. Now, I did not write the whole interface, just the layer metadata and the export image endpoints, so that you can access the layer as:

http://host/arcgis/rest/services/Service/MapServer

The implementation of the Rest endpoint was easily achievable using the Spring MVC framework which enables me to export POJO methods as ArcGIS Rest endpoints:
The final step is to convert the aggregated features from BigMemory into an image that is returned as a response to an export Rest request. This reminded me of the days when I implemented the first version of ArcIMS in Java back in 1996/7, where I used AWT for exactly that same purpose.  What is old is new again: I used the same technique to create an in-memory image buffer and used the Graphics2D API to draw and color code the aggregated cells. I used ImageIO to convert the image buffer into a PNG output stream.  Now ImageIO is new and if this PoC becomes reality, I will use JAI with its GPU enhancements to do the same task.
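
The drawing technique can be sketched in a few lines. This is a minimal illustration and not the PoC code: the class name, the grid-of-counts input and the way the ramp color is scaled by opacity are all my own assumptions.

```java
import java.awt.Color;
import java.awt.Graphics2D;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import javax.imageio.ImageIO;

final class DensityImageSketch {

    // counts[row][col] holds the aggregated value of each cell.
    // Each cell is drawn as a square whose opacity grows with its count.
    static byte[] toPng(int[][] counts, int cellPixels, Color ramp) {
        int rows = counts.length;
        int cols = rows == 0 ? 0 : counts[0].length;
        int max = 1;
        for (int[] row : counts) {
            for (int count : row) {
                max = Math.max(max, count);
            }
        }
        BufferedImage image = new BufferedImage(
                Math.max(1, cols * cellPixels),
                Math.max(1, rows * cellPixels),
                BufferedImage.TYPE_INT_ARGB);
        Graphics2D g = image.createGraphics();
        try {
            for (int r = 0; r < rows; r++) {
                for (int c = 0; c < cols; c++) {
                    if (counts[r][c] <= 0) {
                        continue; // leave empty cells transparent
                    }
                    int alpha = Math.round(255f * counts[r][c] / max);
                    g.setColor(new Color(ramp.getRed(), ramp.getGreen(), ramp.getBlue(), alpha));
                    g.fillRect(c * cellPixels, r * cellPixels, cellPixels, cellPixels);
                }
            }
        } finally {
            g.dispose();
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            ImageIO.write(image, "png", out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] png = toPng(new int[][] {{0, 1}, {2, 4}}, 16, Color.RED);
        System.out.println(png.length + " bytes");
    }
}
```

In a Rest endpoint, the returned bytes would be written to the response with an image/png content type.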

To run this PoC, I started 2 extra large Amazon EC2 instances to run BigMemory with 15GB of off-heap RAM (just because I can :-) and started a medium instance to run Tomcat with the Spring MVC Rest interface that communicated with the two extra large instances.

The PoC was very well received and I extended it to make the layer time aware, enabling the client to slide through time aware features.

Like usual, you can download all the source code from here, here and here.

Wednesday, April 3, 2013

BigData: DataRush Workflow in ArcMap


At this year's DevSummit, we announced the GIS Tools for Hadoop project. Included in that project is a low level geometry java API which enables spatial operations in MapReduce jobs or the construction of higher level functions such as Hive User Defined Functions. However, this geometry library is not restricted to Hadoop MapReduce. It is used in Geo Event Processor, and can be used in Storm bolts or other parallel workflows. One such parallel workflow processing engine is Pervasive DataRush that I demoed at the DevSummit. Using the KNIME visual workflow, I was able to process 90 million records (BTW, small in the BigData world) in Hadoop File System, for heatmap visualization in ArcMap.


A DataRush workflow engine can run on a local machine or remotely on a cluster of machines and is fully extensible with custom operators.  An operator is a node in a workflow graph whose input and output can be linked to other operators. So, I wrote my own spatial operators that utilize the geometry API.

The first operator is a simple spatial filter that passes through records that contain LAT/LON fields and are within a user defined distance from a given point. This uses the "GeometryEngine.geodesicDistanceOnWGS84" function and augments the input record fields with a distance "METERS" output field.
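
The filter logic can be sketched without the DataRush or Esri libraries. Note that the sketch below swaps in the haversine great-circle formula on a sphere as a stand-in for "GeometryEngine.geodesicDistanceOnWGS84", so the distances will differ slightly from the real operator. The class and method names are hypothetical.

```java
final class DistanceFilterSketch {

    // Mean Earth radius in meters; a spherical approximation, not WGS84.
    static final double EARTH_RADIUS_METERS = 6371008.8;

    // Haversine great-circle distance between two lat/lon pairs, in meters.
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dPhi = Math.toRadians(lat2 - lat1);
        double dLambda = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
                + Math.cos(phi1) * Math.cos(phi2)
                * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    // Returns the distance in meters (the extra output field) when the
    // record passes the filter, or a negative value when it is dropped.
    static double filter(double lat, double lon,
                         double centerLat, double centerLon, double maxMeters) {
        double meters = haversineMeters(lat, lon, centerLat, centerLon);
        return meters <= maxMeters ? meters : -1.0;
    }

    public static void main(String[] args) {
        System.out.println(haversineMeters(0, 0, 0, 1));
    }
}
```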

In the above application, a graph workflow reads a tab delimited file and sends its output to the new distance operator, which sends its output to a console log operator. Simple and easy, but not simplistic: when I execute the application without any arguments, the graph runs on the local machine taking full advantage of all the local processors. However, if I specify a dummy argument, the graph takes its input from HDFS and executes remotely on a DataRush cluster. This simple toggle is pretty impressive when switching from development to production mode.

Next is to add the operator to the KNIME visual workflow.
The Pervasive folks made that really easy using a plugin for the Eclipse environment.

Using the Node Extension wizard, you can select an operator and it generates template code for the operator xml descriptor, the dialog pane, the model factory and the node settings.

This gets you 80% to 90% there. The rest is completing the XML node descriptor and laying out a nice dialog pane.


The final step is to export the compiled code into KNIME as a plugin - BTW, KNIME is based on Eclipse.  Within KNIME, you are in what I call a PHD (Push Here Dummy) mode, where you can drag and drop and link operators on the workbench.

A slightly more complicated operator is a composite operator that, as its name suggests, is composed of two or more operators, but the whole is exposed as one operator.
So, the next spatial operator extension to write is a composite one that performs a spatial join between a point input source and a polygon input source, where the output is a flow indicating which polygon contains each input point.
The Spatial Join composite operator contains a non-parallelizable operator that blocks while it reads the polygons, the first time through, into an in-memory model.  The polygon input is in the Esri JSON text format and we use "GeometryEngine.jsonToGeometry" to convert it into a Polygon POJO. The second embedded operator is parallel enabled: it walks the input points and uses the built in-memory model of polygons to find the containing one using the "GeometryEngine.contains" function.  This "build and walk" pattern is used whenever a join or lookup needs to be performed.
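
Here is a minimal sketch of the "build and walk" pattern in plain Java. To keep it self-contained, it uses java.awt.geom.Path2D for the point-in-polygon test as a stand-in for "GeometryEngine.contains", and a linear scan instead of a spatial index. The class and method names are hypothetical.

```java
import java.awt.geom.Path2D;
import java.util.LinkedHashMap;
import java.util.Map;

final class BuildAndWalkSketch {

    // The in-memory model filled by the build phase.
    private final Map<String, Path2D> polygons = new LinkedHashMap<>();

    // Build phase: runs once, before any point is processed.
    // ring is a list of [x, y] vertices; the ring is closed automatically.
    void addPolygon(String id, double[][] ring) {
        Path2D.Double path = new Path2D.Double();
        path.moveTo(ring[0][0], ring[0][1]);
        for (int i = 1; i < ring.length; i++) {
            path.lineTo(ring[i][0], ring[i][1]);
        }
        path.closePath();
        polygons.put(id, path);
    }

    // Walk phase: called per point, safe to run in parallel once the
    // build phase is complete because the model is only read.
    String findContaining(double x, double y) {
        for (Map.Entry<String, Path2D> entry : polygons.entrySet()) {
            if (entry.getValue().contains(x, y)) {
                return entry.getKey();
            }
        }
        return null; // the point falls outside every polygon
    }

    public static void main(String[] args) {
        BuildAndWalkSketch join = new BuildAndWalkSketch();
        join.addPolygon("A", new double[][] {{0, 0}, {10, 0}, {10, 10}, {0, 10}});
        System.out.println(join.findContaining(5, 5));
    }
}
```

With thousands of polygons, the linear scan in the walk phase would be the first thing to replace, for example with a grid or tree index built during the build phase.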

The above graph adds one extra built-in operator (group by) where the output is the number of points that each input polygon contains.  The result of such a flow, can be used to thematically color code the polygons in ArcMap.

Displaying thousands or millions of points on a map is, well....not very smart and does not make sense. But if all these points are binned and each bin is weighted proportionally to the points in that bin, then a map displaying the color coded bins with respect to their weight is much more appealing and conveys much more information, such as in the above map, where I was rendering a heatmap of locations with businesses with more than 100,000 employees.

So I wrote a terminal operator, the raster operator, that writes bin information as an Esri raster file in either float or ASCII format. The interesting thing about that operator is that it is a "fan in" operator and despite being part of a graph that is executed on a remote cluster, the raster operator is executed on the client machine. The client being the machine that submitted the graph to the cluster.
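
For reference, the ASCII flavor of the Esri raster format is just a six line header followed by the cell values, written from the northernmost row down. The sketch below produces that text from an in-memory grid; it is not the raster operator itself, and the class and method names are hypothetical.

```java
final class AsciiGridSketch {

    // cells[0] is the northernmost row; values within a row run west to east.
    static String toAsciiGrid(float[][] cells, double xllCorner, double yllCorner,
                              double cellSize, float noData) {
        int nrows = cells.length;
        int ncols = nrows == 0 ? 0 : cells[0].length;
        StringBuilder sb = new StringBuilder();
        sb.append("ncols ").append(ncols).append('\n');
        sb.append("nrows ").append(nrows).append('\n');
        sb.append("xllcorner ").append(xllCorner).append('\n');
        sb.append("yllcorner ").append(yllCorner).append('\n');
        sb.append("cellsize ").append(cellSize).append('\n');
        sb.append("NODATA_value ").append(noData).append('\n');
        for (float[] row : cells) {
            for (int c = 0; c < row.length; c++) {
                if (c > 0) {
                    sb.append(' ');
                }
                sb.append(row[c]);
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toAsciiGrid(new float[][] {{1f, 2.5f}}, -180, -90, 1, -9999f));
    }
}
```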

One last thing. A KNIME workflow can be exported into an external representation that can be imported into a logical graph for local or remote execution.

This means, I can turn that code into an ArcMap GeoProcessing extension enabling me to invoke a DataRush workflow as a Tool and combine it with other existing tools for further analysis and visualization.

In my workflow example, rather than me manually calling the Float To Raster GP Task after Executing the DataRush Tool, I can combine that into one ArcPy Script Tool:

So to recap, you can store data into HDFS, or use the new GeoProcessing tools for Hadoop. Compose DataRush workflows with spatial operators that you export to be executed on a cluster from ArcMap, whose result is consumed back by ArcMap for further local GeoProcessing or visualization.

This is very cool!

Like usual, all the source code can be downloaded from here and here.

Monday, March 4, 2013

BigData meet Raspberry Pi

The Raspberry Pi is a very neat little machine - I bootstrapped it with Soft-Float Debian "Wheezy" so I can run Java Embedded on it - I downloaded and installed ejre-7u10-fcs-b18-linux-arm-vfp-client_headless-28_nov_2012.tar, and can now run Java programs on the Pi. I wrote the ubiquitous "Hello World" to make sure it works (I should not have doubted the write once run anywhere motto) and jumped directly to running a Spring container with beans. So, the next thing to do is to run the Hadoop client from my previous post on it.  Now remember, there is not a lot of RAM on the Model B Rev 2 Pi (512MB) and the post-action runs on the client machine, so I reduced the cell size and the search radius, and....it worked like a charm. Now, why did I do that? Because... I can :-)
Next is to come up with something to do with the generated data.

BigData: Kernel Density Analysis on Hadoop MapReduce

The storage of BigData has been democratized by Hadoop. And with that, rather than bringing the data to the program for analysis, we send the program to the data. That twist comes with a challenge: take existing serial algorithms and parallelize them. This post is about such a process. I want to perform a Kernel Density analysis on billions of records loaded into HDFS. The data is fairly simple; a set of records separated by carriage return, and the fields in each record are tab separated. The fields' values contain a latitude, a longitude and a weight.
If you do not know how Kernel Density works, check out this ArcGIS reference.
To take advantage of the distributed HDFS datanodes, I have to transform the traditional stateful sequential implementation into a model that supports stateless parallelism. MapReduce is such a model, and Hadoop can distribute a MapReduce implementation to read data from the "local" HDFS datanodes and reduce the result back into HDFS.
A Kernel Density analysis expects 3 parameters when processing spatial data: an output extent, a cell size and a search radius. From the above reference description, an input value influences the cells in its search area, and the level of influence is proportional to the input weight and inversely proportional to the distance of the cell from the input. That inverse proportionality is computed using a Kernel function.
So, in the MapReduce programming paradigm, a map function expects a key(k1),value(v1) tuple and emits zero or more new key(k2),value(v2) tuples.

map(k1,v1) -> List(k2,v2)

The map portion of the parallel Kernel Density analysis implementation will emit all the influenced cells and their associated weights for a given input. A cell is defined as a string concatenating its row and column values with a slash character.
After a shuffle and sort intermediate step, the reduce function expects a key(k2),List(v2) tuple and emits zero or more new key(k3),value(v3) tuples.

reduce(k2,list(v2)) -> List(k3,v3)

The implementation will emit the sum of all the weights of a cell.
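
To make the two functions concrete, here is a minimal sketch that simulates them locally in plain Java, with no Hadoop dependency. The quartic kernel used here is an assumption on my part (it is left unnormalized for brevity), the class name is hypothetical, and the in-memory TreeMap only stands in for the shuffle and sort step.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KernelDensitySketch {
    final double xmin, ymin, cellSize, radius;
    final int nrows, ncols;

    KernelDensitySketch(double xmin, double ymin, double cellSize, double radius,
                        int nrows, int ncols) {
        this.xmin = xmin;
        this.ymin = ymin;
        this.cellSize = cellSize;
        this.radius = radius;
        this.nrows = nrows;
        this.ncols = ncols;
    }

    // map(k1,v1) -> List(k2,v2): emits every influenced cell as "row/col"
    // together with the weight contributed by this input point.
    List<Map.Entry<String, Double>> map(double x, double y, double weight) {
        List<Map.Entry<String, Double>> out = new ArrayList<>();
        int rowMin = Math.max(0, (int) Math.floor((y - radius - ymin) / cellSize));
        int rowMax = Math.min(nrows - 1, (int) Math.floor((y + radius - ymin) / cellSize));
        int colMin = Math.max(0, (int) Math.floor((x - radius - xmin) / cellSize));
        int colMax = Math.min(ncols - 1, (int) Math.floor((x + radius - xmin) / cellSize));
        for (int r = rowMin; r <= rowMax; r++) {
            for (int c = colMin; c <= colMax; c++) {
                // Distance is measured from the point to the cell center.
                double dx = xmin + (c + 0.5) * cellSize - x;
                double dy = ymin + (r + 0.5) * cellSize - y;
                double t = (dx * dx + dy * dy) / (radius * radius);
                if (t >= 1.0) {
                    continue; // cell center is outside the search radius
                }
                double kernel = (1.0 - t) * (1.0 - t); // assumed quartic kernel
                out.add(new SimpleEntry<>(r + "/" + c, weight * kernel));
            }
        }
        return out;
    }

    // reduce(k2,list(v2)) -> (k3,v3): the sum of all the weights of a cell.
    static double reduce(List<Double> values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum;
    }

    // Local stand-in for the job: each record is {x, y, weight}.
    Map<String, Double> run(double[][] records) {
        Map<String, List<Double>> shuffled = new TreeMap<>();
        for (double[] rec : records) {
            for (Map.Entry<String, Double> kv : map(rec[0], rec[1], rec[2])) {
                shuffled.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, List<Double>> e : shuffled.entrySet()) {
            result.put(e.getKey(), reduce(e.getValue()));
        }
        return result;
    }
}
```

Because the reduce step is a plain sum, the same function can also serve as a combiner to cut down the data shuffled between nodes.
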
To execute the kernel density map and reduce functions, I decided to use the newly released Spring Data - Apache Hadoop project. As you know, I love the Spring Framework and have been tracking the Spring Data Hadoop project for a while now, and I am glad that it is finally released.
The following Spring application context bootstraps an application and runs the defined job. The job declaratively references a mapper class, a reducer class and a combiner class. In addition, the job runner defines a pre-action and a post-action. The pre-action tests the existence of the HDFS output path and removes it if it exists. The post-action references a class that converts the content of the output path into a local float raster file. You can use the Float to Raster conversion tool in ArcMap to visualize the kernel density output. And like usual, all the source code can be found here.
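
For readers unfamiliar with the float raster format, it is a pair of files: a binary file of 32-bit floats written row by row from the north, and a small text header. The following is a minimal sketch of how both could be produced in memory. It is not the post-action class; the class and method names are hypothetical, and the little-endian byte order is a choice that the header must then declare as LSBFIRST.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class FloatRasterSketch {

    // Content of the binary .flt file: cells[0] is the northernmost row.
    static byte[] toFlt(float[][] cells) {
        int nrows = cells.length;
        int ncols = nrows == 0 ? 0 : cells[0].length;
        ByteBuffer buffer = ByteBuffer.allocate(nrows * ncols * 4)
                .order(ByteOrder.LITTLE_ENDIAN);
        for (float[] row : cells) {
            for (float value : row) {
                buffer.putFloat(value);
            }
        }
        return buffer.array();
    }

    // Content of the companion .hdr text file.
    static String toHdr(int ncols, int nrows, double xllCorner, double yllCorner,
                        double cellSize, float noData) {
        return "ncols " + ncols + "\n"
                + "nrows " + nrows + "\n"
                + "xllcorner " + xllCorner + "\n"
                + "yllcorner " + yllCorner + "\n"
                + "cellsize " + cellSize + "\n"
                + "NODATA_value " + noData + "\n"
                + "byteorder LSBFIRST\n";
    }

    public static void main(String[] args) {
        System.out.println(toFlt(new float[][] {{1f, 2f}}).length + " bytes");
        System.out.print(toHdr(2, 1, -180, -90, 1, -9999f));
    }
}
```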

Sunday, November 25, 2012

BigData: HDFS FeatureClass ETL and MapReduce GPTool


This post is dedicated to my Esri colleagues Ajit D. and Philip H. for their invaluable help.

This is work in progress - but I've put a good dent in it that I would like to share with you.  In this post, we will go through a complete cycle, where from ArcMap, we will:

  • Export a FeatureClass to an HDFS folder
  • Register that folder as a Hive table
  • Run command line Hive queries
  • Execute Hive queries from ArcPy and show the results in ArcMap
  • Execute a MapReduce Job as a GP Tool
  • Import an HDFS Folder (result of MapReduce Job) as a FeatureClass

This post brings everything that I have been blogging about so far into a nice story, so here we go:

BTW - I am assuming that you have a Hadoop instance running somewhere and are familiar with ArcMap. You can download a Hadoop demo VM for local testing.

Download the ArcMap extension in this file and unzip its content into your ArcGIS\Desktop10.1\java\lib\ext folder - The jars have to be children of the ext folder.

Make sure to adjust the ArcGIS JVM using the JavaConfigTool.exe located in ArcGIS\Desktop10.1\bin:

Start ArcMap, create a new toolbox and add to it the Hadoop Tools - Check out this help for detailed information on managing toolboxes:

Add the world cities to ArcMap:

Let's export the world cities to HDFS:

This tool iterates over the input FeatureClass features and stores each feature in the specified HDFS output path. The output path content is text formatted and each feature is stored as a line in an Esri JSON text representation followed by a carriage return.
This enables us to continuously add new records from, for example, a streaming process such as Esri GeoEvent Server.
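
As an illustration of the one-feature-per-line layout, here is a minimal sketch that formats a point feature as a single Esri JSON text line. The field layout mirrors the geometry and attributes structure used by the Hive table later in this post. The class name is hypothetical, the string escaping only covers backslashes and double quotes, and the real tool relies on a proper JSON serializer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class EsriJsonLineSketch {

    // Formats one point feature as a single line of Esri JSON text.
    static String toLine(double x, double y, int wkid, Map<String, Object> attributes) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"geometry\":{\"x\":").append(x)
          .append(",\"y\":").append(y)
          .append(",\"spatialReference\":{\"wkid\":").append(wkid)
          .append("}},\"attributes\":{");
        boolean first = true;
        for (Map.Entry<String, Object> entry : attributes.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(entry.getKey())).append("\":");
            Object value = entry.getValue();
            if (value == null) {
                sb.append("null");
            } else if (value instanceof Number || value instanceof Boolean) {
                sb.append(value);
            } else {
                sb.append('"').append(escape(value.toString())).append('"');
            }
        }
        return sb.append("}}").toString();
    }

    // Minimal escaping: backslashes and double quotes only.
    private static String escape(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("CITY_NAME", "Beirut");
        attributes.put("POP_RANK", 2);
        // Made-up coordinates and rank, for illustration only.
        System.out.println(toLine(35.5, 33.9, 4326, attributes));
    }
}
```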

The metadata for that HDFS based FeatureClass is stored in an HDFS based 'metastore' for other processes to inspect - A better place would have been ZooKeeper - but that is a post for another day.

Here is a sample metadata:

{
"wkid": 4326,
"geometryType": 1,
"fields": [{
"name": "ObjectID",
"alias": "ObjectID",
"type": 6,
"length": 4
}, {
"name": "Shape",
"alias": "Shape",
"type": 7,
"length": 0
}, {
"name": "CITY_NAME",
"alias": "CITY_NAME",
"type": 4,
"length": 30
}, {
...
}, {
"name": "LABEL_FLAG",
"alias": "LABEL_FLAG",
"type": 1,
"length": 4
}]
}

The metastore contains a set of files where by convention the file name is the imported FeatureClass name followed by ".json". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.json

The GP import tool adds one more file to the metastore, a Hive script that you can execute from the Hive command line to create an external table referencing the HDFS FeatureClass. Again, by convention the script name is the name of the imported FeatureClass followed by ".hql". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.hql

You can "cat" the content of the script and you will notice the usage of the collection data type for the feature geometry and attribute representation. In addition, the serialization and deserialization (SerDe) from JSON is based on a Cloudera library found in the article 'Analyzing Twitter Data Using CDH'. You can download the jar from here.


ADD JAR hive-serdes-1.0-SNAPSHOT.jar;
CREATE EXTERNAL TABLE IF NOT EXISTS worldcities (
geometry STRUCT <x:DOUBLE,y:DOUBLE,spatialReference:STRUCT <wkid:INT>>,
attributes STRUCT <
CITY_NAME:STRING,
GMI_ADMIN:STRING,
ADMIN_NAME:STRING,
FIPS_CNTRY:STRING,
CNTRY_NAME:STRING,
STATUS:STRING,
POP_RANK:INT,
POP_CLASS:STRING,
PORT_ID:INT,
LABEL_FLAG:INT
>) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION 'hdfs://localhadoop:9000/user/mraad_admin/worldcities'
TBLPROPERTIES ('wkid'='4326','type'='point');


Please note how tables can have properties - in this case, I added the wkid and geometry type.

Upon the execution of the above commands, you can now query the table. Here are some sample queries:

hive> select * from worldcities limit 10;
hive> select attributes.city_name,geometry.x,geometry.y from worldcities where attributes.cntry_name='Lebanon';

Hive can be accessed using ArcPy through the thrift protocol - Here is a Toolbox that enables the user to draw a polygon as input and invoke Hive spatial UDF constraining the resulting FeatureSet to the world cities within the drawn polygon. Download the UDF jar from here, and place it and the hive-serde jar in the same location where you will start the hive server as follows:

$> hive --service hiveserver

Next, I wanted to demo the capability to run a MapReduce Job as a GP Tool.

Quick MapReduce recap. For the unix geeks:

$> cat input.txt | map | sort | reduce > output.txt

And for the "academics":

map(K1,V1) emit list(K2,V2)
shuffle/sort K2
reduce(K2,list(V2)) emit list(K3,V3)

This is fairly low level and requires explicit writing of the Mapper and Reducer Java classes.  This is not for your average GIS user.  But I can see a time when advanced users will write parameter driven MapReduce tools and share them with the community. This is all based on 'How to build custom geoprocessing tools'.

This simple MR tool takes as input the world cities HDFS FeatureClass and finds the "centroids" by country of all the cities with a specific population rank.

BTW, this can easily be written in HQL as follows:

select attributes.cntry_name as name,
avg(geometry.x) as x,
avg(geometry.y) as y,
count(attributes.cntry_name) as cnt
from worldcities
where attributes.pop_rank < 6
group by attributes.cntry_name
having cnt > 10;

The JobRunnerTool accepts as input:

  • A set of Hadoop properties
  • HDFS FeatureClass input path
  • HDFS FeatureClass output path
  • Metastore location
The mapper converts a JSON formatted input text line (V1) into a PointFeature and will emit its point geometry (V2) if it meets a filter criterion - in this case, a population rank that is less than a user defined value. The mapper output key (K2) is the country name. BTW, K1 is the line number.

The shuffle/sort portion will ensure that each reducer will receive a country name (K2) as an input key and a list of geometry points (V2) as input values.

The reducer averages the coordinates and creates a PointFeature whose geometry is a point with values based on the averaged calculations.  The attributes will include the country name and the number of points used in the averaging. The reducer key output (K3) will be the JSON formatted text representation of the PointFeature and the output value (V3) will be NULL, thus producing an HDFS FeatureClass with its metadata in the metastore for further processing and inspection.
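
The mapper, shuffle/sort and reducer roles described above can be sketched locally in plain Java. This is an illustration of the logic only and not the tool's Hadoop classes: the class names are hypothetical, a TreeMap stands in for the shuffle and sort step, and the result is kept as numbers rather than serialized back to JSON.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CentroidByCountrySketch {

    // Stand-in for the parsed PointFeature of one input line.
    static final class City {
        final String country;
        final double x;
        final double y;
        final int popRank;

        City(String country, double x, double y, int popRank) {
            this.country = country;
            this.x = x;
            this.y = y;
            this.popRank = popRank;
        }
    }

    // Returns country -> {average x, average y, number of points}.
    static Map<String, double[]> run(List<City> cities, int maxRankExclusive) {
        // Map phase: keep cities below the rank threshold, keyed by country (K2).
        Map<String, List<double[]>> shuffled = new TreeMap<>();
        for (City city : cities) {
            if (city.popRank < maxRankExclusive) {
                shuffled.computeIfAbsent(city.country, k -> new ArrayList<>())
                        .add(new double[] {city.x, city.y});
            }
        }
        // Reduce phase: average the coordinates of each country.
        Map<String, double[]> centroids = new TreeMap<>();
        for (Map.Entry<String, List<double[]>> entry : shuffled.entrySet()) {
            double sumX = 0;
            double sumY = 0;
            for (double[] point : entry.getValue()) {
                sumX += point[0];
                sumY += point[1];
            }
            int n = entry.getValue().size();
            centroids.put(entry.getKey(), new double[] {sumX / n, sumY / n, n});
        }
        return centroids;
    }
}
```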

Lastly, we close the cycle by importing the HDFS Feature class.

The tool accepts as input an HDFS FeatureClass, its metadata and an output location. When executed within ArcMap, the output is automatically added to the display.

Things to work on next:
  • UDF that accepts as input an ArcMap generated FeatureSet into the DistributedCache - I already blogged about this, but as standalone.
  • MapReduceTool that accepts external jar containing mapper/reducer classes - I think this will pave the way for the advanced users.
Stay tuned for more things to come. And like usual, all the source code can be downloaded from here.