I was asked to put together a Proof of Concept implementation of very fast, interactive, dynamic density map generation on 11 million records for a webmap application. A user dynamically specifies a query definition (a where clause in SQL terms) and a color ramp, and the service implementation returns a density representation of the matching records on a map.
This is typically done via a GeoProcessing task where the data is queried and stored into an intermediate FeatureClass that is further processed by a Kernel Density tool that produces a raster layer, which is finally visualized. As you can tell, this is neither interactive nor particularly fast.
Since the traditional means of retrieving the data from a relational database is not fast enough and 11 million records is not such a big set after all, I decided to put the whole thing in memory. BTW, this is a meme that has been trending for a while now, and the most vocal about it is SAP HANA.
I decided to use Terracotta's BigMemory to hold the data "off-heap" and use its EHCache query capability to aggregate and fetch the data. Despite the "cache" in the name, I used its "eternal" capability to hold the data elements forever.
I was given the data in CSV format, so I wrote a simple CSV reader/tokenizer that bulk loads the data into BigMemory using multiple threads. The inter-thread communication was handled by the LMAX Disruptor, letting me focus on the loading, parsing and putting into BigMemory.
To access this BigMemory from a webmap as a layer, I decided to write a middleware that implements the ArcGIS Rest JSON interface for an ArcGISDynamicLayer. Now, I did not write the whole interface, just the layer metadata and the export image endpoints, so that you can access the layer as:
http://host/arcgis/rest/services/Service/MapServer
The implementation of the Rest endpoint was easily achievable using the Spring MVC framework which enables me to export POJO methods as ArcGIS Rest endpoints:
The final step is to convert the aggregated features from BigMemory into an image that is returned as a response to an export Rest request. This reminded me of the days when I implemented the first version of ArcIMS in Java back in 1996/7, where I used AWT for exactly that same purpose. What is old is new again, and I used the same technique to create an in-memory image buffer and used the Graphics2D API to draw and color code the aggregated cells. I used ImageIO to convert the image buffer into a PNG output stream. Now ImageIO is new and if this PoC becomes reality, I will use JAI with its GPU enhancements to do the same task.
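To make the color coding step concrete, here is a minimal sketch in Python rather than the Java/Graphics2D of the PoC. The function name `ramp_color`, the two-color linear ramp and the sample cell counts are all assumptions made for illustration; the PoC's actual ramp logic is not shown in this post.

```python
def ramp_color(weight, max_weight, start=(255, 255, 0), end=(255, 0, 0)):
    """Linearly interpolate an RGB color between start and end for a cell weight."""
    if max_weight <= 0:
        return start
    t = min(max(weight / max_weight, 0.0), 1.0)  # clamp the ratio to [0, 1]
    return tuple(round(s + (e - s) * t) for s, e in zip(start, end))


# Hypothetical aggregated cells: (row, col) -> record count.
cells = {(0, 0): 5, (0, 1): 20, (3, 2): 10}
max_weight = max(cells.values())
colors = {cell: ramp_color(count, max_weight) for cell, count in cells.items()}
```

Each cell would then be filled with its color on the image buffer before the buffer is encoded as PNG.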
To run this PoC, I started 2 extra large Amazon EC2 instances to run BigMemory with 15GB of off-heap RAM (just because I can :-) and started a medium instance to run Tomcat with the Spring MVC Rest interface that communicated with the two extra large instances.
The PoC was very well received, and I extended it to make the layer time aware, enabling the client to slide through time aware features.
Like usual, you can download all the source code from here, here and here.
Wednesday, April 24, 2013
Wednesday, April 3, 2013
BigData: DataRush Workflow in ArcMap
At this year's DevSummit, we announced the GIS Tools for Hadoop project. Included in that project is a low-level geometry Java API that enables spatial operations in MapReduce jobs or the construction of higher-level functions such as Hive User Defined Functions. However, this geometry library is not restricted to Hadoop MapReduce. It is used in Geo Event Processor, and can be used in Storm bolts or other parallel workflows. One such parallel workflow processing engine is Pervasive DataRush, which I demoed at the DevSummit. Using the KNIME visual workflow, I was able to process 90 million records (BTW, small in the BigData world) in the Hadoop File System for heatmap visualization in ArcMap.
A DataRush workflow engine can run on a local machine or remotely on a cluster of machines and is fully extensible with custom operators. An operator is a node in a workflow graph whose input and output can be linked to other operators. So, I wrote my own spatial operators that utilize the geometry API.
The first operator is a simple spatial filter that passes through records that contain LAT/LON fields and are within a user defined distance from a given point. This uses the "GeometryEngine.geodesicDistanceOnWGS84" function and augments the input record fields with a distance "METERS" output field.
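The filter logic can be sketched in a few lines of Python. This is not the DataRush operator: it is a stand-in that uses the spherical haversine formula instead of the ellipsoidal "GeometryEngine.geodesicDistanceOnWGS84" call, and the function names and the dictionary record layout are assumptions for illustration.

```python
import math

EARTH_RADIUS_METERS = 6371008.8  # mean Earth radius; a spherical approximation


def haversine_meters(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two lat/lon points on a sphere."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_METERS * math.asin(min(1.0, math.sqrt(a)))


def distance_filter(records, center_lat, center_lon, max_meters):
    """Yield copies of the records within max_meters, with a METERS field added."""
    for record in records:
        if "LAT" not in record or "LON" not in record:
            continue  # records without coordinates are dropped
        meters = haversine_meters(center_lat, center_lon, record["LAT"], record["LON"])
        if meters <= max_meters:
            yield dict(record, METERS=meters)
```

The spherical result can differ from the WGS84 geodesic distance by a fraction of a percent, which is why it is only a stand-in here.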
In the sample application for this operator, a graph workflow reads a tab delimited file and sends its output to the new distance operator, which sends its output to a console log operator. Simple and easy, but not simplistic - notice that when I execute the application without any arguments, the graph runs on the local machine, taking full advantage of all the local processors. However, if I specify a dummy argument, the graph takes its input from HDFS and executes remotely on a DataRush cluster. This simple toggle is pretty impressive when switching from development to production mode.
Next is to add the operator to the KNIME visual workflow.
The Pervasive folks made that really easy using a plugin for the Eclipse environment.
Using the Node Extension wizard, you can select an operator and it generates template code for the operator xml descriptor, the dialog pane, the model factory and the node settings.
This gets you 80% to 90% there. The rest is completing the XML node descriptor and laying out a nice dialog pane.
The final step is to export the compiled code into KNIME as a plugin - BTW, KNIME is based on Eclipse. Within KNIME, you are in what I call a PHD (Push Here Dummy) mode, where you can drag and drop and link operators on the workbench.
A slightly more complicated operator is a composite operator that, as its name suggests, is composed of two or more operators, but the whole is exposed as one operator.
So, the next spatial operator extension to write is a composite one that performs a spatial join between a point input source and a polygon input source, where the output is a flow indicating which polygon contains each input point.
The Spatial Join composite operator contains a non-parallelizable operator that blocks while it reads the polygons into an in-memory model on the first pass. The polygon input is in the Esri JSON text format and we use "GeometryEngine.jsonToGeometry" to convert it into a Polygon POJO. The second embedded operator is parallel enabled; it walks the input points and uses the built in-memory model of polygons to find the containing one using the "GeometryEngine.contains" function. This "build and walk" pattern is used whenever a join or lookup needs to be performed.
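The "build and walk" pattern can be sketched in Python as follows. This is an illustration only, not the DataRush operator: an even-odd ray casting test stands in for "GeometryEngine.contains", the polygon model is a plain list scanned linearly, and the "NAME" attribute is a hypothetical field. Rings are assumed to be closed, two-dimensional Esri JSON rings.

```python
import json


def point_in_rings(x, y, rings):
    """Even-odd ray casting test against all rings of an Esri JSON polygon."""
    inside = False
    for ring in rings:
        for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
            if (y1 > y) != (y2 > y):  # the edge straddles the horizontal ray
                x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
                if x < x_cross:
                    inside = not inside
    return inside


def build(polygon_lines):
    """Build phase: read every polygon once into an in-memory list."""
    model = []
    for line in polygon_lines:
        feature = json.loads(line)
        model.append((feature["attributes"]["NAME"], feature["geometry"]["rings"]))
    return model


def walk(points, model):
    """Walk phase: emit (x, y, polygon name) for each point that falls in a polygon."""
    for x, y in points:
        for name, rings in model:
            if point_in_rings(x, y, rings):
                yield (x, y, name)
                break
```

Only the walk phase is safe to run in parallel, since it reads the model without modifying it; the build phase has to complete first.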
The above graph adds one extra built-in operator (group by) where the output is the number of points that each input polygon contains. The result of such a flow can be used to thematically color code the polygons in ArcMap.
Displaying thousands or millions of points on a map is, well....not very smart and does not make sense. But if all these points are binned and each bin is weighted proportionally to the points in that bin, then a map displaying the color coded bins with respect to their weight is much more appealing and conveys much more information, such as in the above map, where I was rendering a heatmap of locations with businesses with more than 100,000 employees.
So I wrote a terminal operator, the raster operator, that writes bin information as an Esri raster file in either float or ASCII format. The interesting thing about that operator is that it is a "fan in" operator and despite being part of a graph that is executed on a remote cluster, the raster operator is executed on the client machine. The client being the machine that submitted the graph to the cluster.
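For reference, writing bins in the Esri ASCII raster layout takes very little code. The sketch below is in Python and is not the DataRush raster operator; the function name, the `{(row, col): weight}` input and the convention that row 0 is the bottom row are assumptions for illustration.

```python
import io


def write_ascii_grid(out, cells, ncols, nrows, xll, yll, cellsize, nodata=-9999):
    """Write {(row, col): weight} as an Esri ASCII grid; row 0 is the bottom row here."""
    out.write(f"ncols {ncols}\n")
    out.write(f"nrows {nrows}\n")
    out.write(f"xllcorner {xll}\n")
    out.write(f"yllcorner {yll}\n")
    out.write(f"cellsize {cellsize}\n")
    out.write(f"NODATA_value {nodata}\n")
    for row in range(nrows - 1, -1, -1):  # the format lists the top row first
        values = (cells.get((row, col), nodata) for col in range(ncols))
        out.write(" ".join(str(v) for v in values) + "\n")


buffer = io.StringIO()
write_ascii_grid(buffer, {(0, 0): 1.5, (1, 1): 2}, ncols=2, nrows=2, xll=-180, yll=-90, cellsize=1)
```

Because every bin has to land in one file, a writer like this naturally sits at the "fan in" end of the graph.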
One last thing. A KNIME workflow can be exported into an external representation that can be imported into a logical graph for local or remote execution.
This means, I can turn that code into an ArcMap GeoProcessing extension enabling me to invoke a DataRush workflow as a Tool and combine it with other existing tools for further analysis and visualization.
In my workflow example, rather than me manually calling the Float To Raster GP Task after Executing the DataRush Tool, I can combine that into one ArcPy Script Tool:
So to recap, you can store data into HDFS, or use the new GeoProcessing tools for Hadoop. Compose DataRush workflows with spatial operators that you export to be executed on a cluster from ArcMap, whose result is consumed back by ArcMap for further local GeoProcessing or visualization.
This is very cool!
Like usual, all the source code can be downloaded from here and here.
Monday, March 4, 2013
BigData meet Raspberry Pi
The Raspberry Pi is a very neat little machine - I bootstrapped it with Soft-Float Debian "Wheezy" so I can run Java Embedded on it. I downloaded and installed ejre-7u10-fcs-b18-linux-arm-vfp-client_headless-28_nov_2012.tar, and can now run Java programs on the Pi. I wrote the ubiquitous "Hello World" to make sure it works (I should not have doubted the write once, run anywhere motto) and jumped directly to running a Spring container with beans. So, the next thing to do was to run the Hadoop client from my previous post on it. Now remember, there is not a lot of RAM on the Model B Rev 2 Pi (512MB) and the post-action runs on the client machine, so I reduced the cell size and the search radius, and... it worked like a charm. Now, why did I do that? Because... I can :-)
Next is to come up with something to do with the generated data.
BigData: Kernel Density Analysis on Hadoop MapReduce
The storage of BigData has been democratized by Hadoop. And with that, rather than bringing the data to the program for analysis, we send the program to the data. That twist comes with a challenge: to take existing serial algorithms and parallelize them. This post is about such a process. I want to perform a Kernel Density analysis on billions of records loaded into HDFS. The data is fairly simple: a set of records separated by carriage returns, where the fields in each record are tab separated. The fields' values contain a latitude, a longitude and a weight.
If you do not know how Kernel Density works, check out this ArcGIS reference.
To take advantage of the distributed HDFS datanodes, I have to transform the traditional stateful sequential implementation into a model that supports stateless parallelism. MapReduce is such a model, and Hadoop can distribute a MapReduce implementation to read data from the "local" HDFS datanodes and reduce the result back into HDFS.
A Kernel Density analysis expects 3 parameters when processing spatial data: an output extent, a cell size and a search radius. From the above reference description, an input value influences the cells in its search area, and the level of influence is proportional to the input weight and decreases with the distance of the cell from the input. That falloff with distance is computed using a Kernel function.
So, in the MapReduce programming paradigm, a map function expects a key(k1),value(v1) tuple and emits zero or more new key(k2),value(v2) tuples.
map(k1,v1) -> List(k2,v2)
The map portion of the parallel Kernel Density analysis implementation will emit all the influenced cells and their associated weights for a given input. A cell is defined as a string concatenating its row and column values with a slash character.
After a shuffle and sort intermediate step, the reduce function expects a key(k2),List(v2) tuple and emits zero or more new key(k3),value(v3) tuples.
reduce(k2,list(v2)) -> List(k3,v3)
The reduce implementation will emit the sum of all the weights of a cell.
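The map and reduce functions described above can be sketched in plain Python, outside of Hadoop. This is an illustration under stated assumptions, not the Java code of the job: the quartic kernel shape `(1 - d²/r²)²` is assumed (and left unnormalized), distances are planar in the input units, the output extent is reduced to its lower-left corner with no clipping, and the function names are made up.

```python
import math
from collections import defaultdict


def kd_map(lon, lat, weight, xmin, ymin, cell_size, radius):
    """Emit ("row/col", value) for every cell whose center lies within radius of the input."""
    col_min = math.floor((lon - radius - xmin) / cell_size)
    col_max = math.floor((lon + radius - xmin) / cell_size)
    row_min = math.floor((lat - radius - ymin) / cell_size)
    row_max = math.floor((lat + radius - ymin) / cell_size)
    r2 = radius * radius
    for row in range(row_min, row_max + 1):
        cy = ymin + (row + 0.5) * cell_size  # cell center y
        for col in range(col_min, col_max + 1):
            cx = xmin + (col + 0.5) * cell_size  # cell center x
            d2 = (cx - lon) ** 2 + (cy - lat) ** 2
            if d2 < r2:
                kernel = (1 - d2 / r2) ** 2  # assumed quartic falloff
                yield f"{row}/{col}", weight * kernel


def kd_reduce(cell, values):
    """Sum all the weights emitted for one cell."""
    return cell, sum(values)


def run(records, xmin, ymin, cell_size, radius):
    """Local stand-in for the shuffle and sort step between map and reduce."""
    shuffled = defaultdict(list)
    for lon, lat, weight in records:
        for cell, value in kd_map(lon, lat, weight, xmin, ymin, cell_size, radius):
            shuffled[cell].append(value)
    return dict(kd_reduce(cell, values) for cell, values in sorted(shuffled.items()))
```

Since the reduce step is a plain sum, the same function can also serve as a combiner to cut down the data shuffled between nodes.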
To execute the kernel density map and reduce functions, I decided to use the newly released Spring Data - Apache Hadoop project. As you know, I love the Spring Framework and have been tracking the Spring Data Hadoop project for a while now, and I am glad that it is finally released.
The following Spring application context bootstraps an application and runs the defined job. The job declaratively references a mapper class, a reducer class and a combiner class. In addition, the job runner defines a pre-action and a post-action. The pre-action tests the existence of the HDFS output path and removes it if it exists. The post-action references a class that converts the content of the output path into a local float raster file. You can use the Float to Raster conversion tool in ArcMap to visualize the kernel density output.
And like usual, all the source code can be found here.
Sunday, November 25, 2012
BigData: HDFS FeatureClass ETL and MapReduce GPTool
This post is dedicated to my Esri colleagues Ajit D. and Philip H. for their invaluable help.
This is a work in progress - but I've put a good enough dent in it that I would like to share it with you. In this post, we will go through a complete cycle, where from ArcMap, we will:
- Export a FeatureClass to an HDFS folder
- Register that folder as a Hive table
- Run command line Hive queries
- Execute Hive queries from ArcPy and show the results in ArcMap
- Execute a MapReduce Job as a GP Tool
- Import an HDFS Folder (result of MapReduce Job) as a FeatureClass
This post brings everything that I have been blogging about so far into a nice story, so here we go:
BTW - I am assuming that you have a Hadoop instance running somewhere and are familiar with ArcMap. You can download a Hadoop demo VM for local testing.
Download the ArcMap extension in this file and unzip its content into your ArcGIS\Desktop10.1\java\lib\ext folder - The jars have to be children of the ext folder.
Make sure to adjust the ArcGIS JVM using the JavaConfigTool.exe located in ArcGIS\Desktop10.1\bin:
Add the world cities to ArcMap:
Let's export the world cities to HDFS:
This tool iterates over the input FeatureClass features and stores each feature in the specified HDFS output path. The output path content is text formatted, and each feature is stored as one line in an Esri JSON text representation followed by a carriage return.
This enables us to continuously append new records from, for example, a streaming process such as Esri GeoEvent Server.
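To illustrate the one-feature-per-line layout, here is a small Python sketch that serializes a point feature. It is not the export tool's code: the function name is made up, the geometry/attributes structure is taken from the Hive table definition later in this post, a newline is assumed as the line terminator, and the sample coordinates are illustrative only.

```python
import json


def feature_line(x, y, wkid, attributes):
    """Serialize one point feature as a single line of Esri JSON text."""
    feature = {
        "geometry": {"x": x, "y": y, "spatialReference": {"wkid": wkid}},
        "attributes": attributes,
    }
    # Compact separators keep the record on one line with no extra whitespace.
    return json.dumps(feature, separators=(",", ":")) + "\n"


# Illustrative record; the values are sample data, not taken from the dataset.
line = feature_line(35.5, 33.9, 4326, {"CITY_NAME": "Beirut", "CNTRY_NAME": "Lebanon", "POP_RANK": 2})
```

Because each line is a complete record, new features can be appended to the folder without rewriting the existing files.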
The metadata for that HDFS based FeatureClass is stored in an HDFS based 'metastore' for other processes to inspect - A better place would have been ZooKeeper - but that is a post for another day.
Here is a sample metadata:
{
  "wkid": 4326,
  "geometryType": 1,
  "fields": [{
    "name": "ObjectID",
    "alias": "ObjectID",
    "type": 6,
    "length": 4
  }, {
    "name": "Shape",
    "alias": "Shape",
    "type": 7,
    "length": 0
  }, {
    "name": "CITY_NAME",
    "alias": "CITY_NAME",
    "type": 4,
    "length": 30
  }, {
    ...
  }, {
    "name": "LABEL_FLAG",
    "alias": "LABEL_FLAG",
    "type": 1,
    "length": 4
  }]
}
The metastore contains a set of files where by convention the file name is the imported FeatureClass name followed by ".json". For example:
$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.json
The GP import tool adds one more file to the metastore, a Hive script that you can execute from the Hive command line to create an external table referencing the HDFS FeatureClass. Again, by convention the script name is the name of the imported FeatureClass followed by ".hql". For example:
$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.hql
You can "cat" the content of the script and you will notice the usage of the collection data type for the feature geometry and attribute representation. In addition, the serialization and deserialization (SerDe) from JSON is based on a Cloudera library found in the article 'Analyzing Twitter Data Using CDH'. You can download the jar from here.
ADD JAR hive-serdes-1.0-SNAPSHOT.jar;
CREATE EXTERNAL TABLE IF NOT EXISTS worldcities (
geometry STRUCT <x:DOUBLE,y:DOUBLE,spatialReference:STRUCT <wkid:INT>>,
attributes STRUCT <
CITY_NAME:STRING,
GMI_ADMIN:STRING,
ADMIN_NAME:STRING,
FIPS_CNTRY:STRING,
CNTRY_NAME:STRING,
STATUS:STRING,
POP_RANK:INT,
POP_CLASS:STRING,
PORT_ID:INT,
LABEL_FLAG:INT
>) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION 'hdfs://localhadoop:9000/user/mraad_admin/worldcities'
TBLPROPERTIES ('wkid'='4326','type'='point');
Please note how tables can have properties - in this case, I added the wkid and geometry type.
Upon the execution of the above commands, you can now query the table. Here are some sample queries:
hive> select * from worldcities limit 10;
hive> select attributes.city_name,geometry.x,geometry.y from worldcities where attributes.cntry_name='Lebanon';
Hive can be accessed from ArcPy through the Thrift protocol - Here is a Toolbox that enables the user to draw a polygon as input and invokes a Hive spatial UDF that constrains the resulting FeatureSet to the world cities within the drawn polygon. Download the UDF jar from here, and place it and the hive-serde jar in the same location where you will start the hive server as follows:
$> hive --service hiveserver
Next, I wanted to demo the capability to run a MapReduce Job as a GP Tool.
A quick MapReduce recap. For the Unix geeks:
$> cat input.txt | map | sort | reduce > output.txt
And for the "academics":
map(K1,V1) emit list(K2,V2)
shuffle/sort K2
reduce(K2,list(V2)) emit list(K3,V3)
This is fairly low level and requires explicit writing of the Mapper and Reducer Java classes. This is not for your average GIS user. But I can see a time when advanced users will write parameter driven MapReduce tools and share them with the community. This is all based on 'How to build custom geoprocessing tools'.
This simple MR tool takes as input the world cities HDFS FeatureClass and finds the "centroids" by country of all the cities with a specific population rank.
BTW, this can easily be written in HQL as follows:
select attributes.cntry_name as name,
avg(geometry.x) as x,
avg(geometry.y) as y,
count(attributes.cntry_name) as cnt
from worldcities
where attributes.pop_rank < 6
group by attributes.cntry_name
having cnt > 10;
The JobRunnerTool accepts as input:
- A set of Hadoop properties
- HDFS FeatureClass input path
- HDFS FeatureClass output path
- Metastore location
The shuffle/sort portion will ensure that each reducer receives a country name (K2) as an input key and a list of geometry points (V2) as input values.
The reducer averages the coordinates and creates a PointFeature whose geometry is a point based on the averaged values. The attributes include the country name and the number of points used in the averaging. The reducer key output (K3) is the JSON formatted text representation of the PointFeature and the output value (V3) is NULL, thus producing an HDFS FeatureClass with its metadata in the metastore for further processing and inspection.
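The flow of keys and values in this tool can be mimicked locally in Python. This is a sketch, not the Java Mapper and Reducer classes: the mapper's behavior is inferred from the HQL above (keep cities whose POP_RANK is below a threshold and key them by country), the "COUNT" attribute name and the hard-coded wkid 4326 are assumptions, and a sort plus `groupby` stands in for the shuffle/sort step.

```python
import json
from itertools import groupby
from operator import itemgetter


def centroid_map(line, max_pop_rank):
    """Emit (country, (x, y)) for each city whose population rank is below the threshold."""
    feature = json.loads(line)
    attrs = feature["attributes"]
    if attrs["POP_RANK"] < max_pop_rank:
        geom = feature["geometry"]
        yield attrs["CNTRY_NAME"], (geom["x"], geom["y"])


def centroid_reduce(country, points):
    """Average the points of one country; the key is the JSON text, the value is None."""
    points = list(points)
    n = len(points)
    feature = {
        "geometry": {
            "x": sum(p[0] for p in points) / n,
            "y": sum(p[1] for p in points) / n,
            "spatialReference": {"wkid": 4326},
        },
        "attributes": {"CNTRY_NAME": country, "COUNT": n},
    }
    return json.dumps(feature), None


def run(lines, max_pop_rank):
    """Local stand-in for map, shuffle/sort and reduce."""
    pairs = sorted(
        (kv for line in lines for kv in centroid_map(line, max_pop_rank)),
        key=itemgetter(0),
    )
    return [
        centroid_reduce(country, (point for _, point in group))
        for country, group in groupby(pairs, key=itemgetter(0))
    ]
```

Emitting the whole feature as the key with a null value means the job output is already in the one-JSON-feature-per-line layout of an HDFS FeatureClass.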
Lastly, we close the cycle by importing the HDFS FeatureClass.
The tool accepts as input an HDFS FeatureClass, its metadata and an output location. When executed within ArcMap, the output is automatically added to the display.
Things to work on next:
- UDF that accepts as input an ArcMap generated FeatureSet into the DistributedCache - I already blogged about this, but as standalone.
- MapReduceTool that accepts external jar containing mapper/reducer classes - I think this will pave the way for the advanced users.
Friday, November 23, 2012
BigData: Cloudera Impala and ArcPy
So at the last Strata+Hadoop World, Cloudera introduced Impala - I downloaded the demo VM, installed the TPC-DS data set (read the impala_readme.txt once the VM starts up) and tested some of the queries. It was pretty fast and cool - As of this writing, UDFs and SerDes are missing from this beta release, so I cannot do my Spatial UDF operators, nor can I read JSON formatted HDFS records :-(
One of the setup tables was a customer_address table; though it was missing a lat/lon field, it included the standard address fields. It would be cool to invoke an Impala query on that table and mash up the result in ArcMap using ArcPy. So, I downloaded and installed the ODBC driver onto my Windows VM (now remember I work on a real machine, a MacBookPro :-) and then downloaded and installed pyodbc. Small thing: the documentation keeps talking about 'You must use the 32-bit version'. A bit of googling revealed that they are referring to the Odbcad32.exe file located in the %systemdrive%\Windows\SysWoW64 folder. The following is a simple GeoProcessing Python Toolbox that queries Impala; the result set is converted into an in-memory table that is appended to ArcMap's table of contents. You can join the table with a state layer and symbolize the state polygons using quantile breaks based on Impala's aggregated response. I think this combination of BigData in HDFS converted instantly into "SmallData" for rendering and further processing in ArcMap is a great marriage - Looking forward to the next release of Impala with UDFs.
import arcpy
import pyodbc


class Toolbox(object):
    def __init__(self):
        self.label = "Toolbox"
        self.alias = "Toolbox"
        self.tools = [QueryImpala]


class QueryImpala(object):
    def __init__(self):
        self.label = "Query Impala"
        self.description = "Query Impala"
        self.canRunInBackground = False

    def getParameterInfo(self):
        # Single derived output: the in-memory table added to the map.
        paramtab = arcpy.Parameter("ImpalaTable", "Impala Table", "Output", "Table", "Derived")
        return [paramtab]

    def isLicensed(self):
        return True

    def updateParameters(self, parameters):
        return

    def updateMessages(self, parameters):
        return

    def execute(self, parameters, messages):
        # Recreate the in-memory table that receives the Impala result set.
        tab = "in_memory/ImpalaTable"
        if arcpy.Exists(tab):
            arcpy.management.Delete(tab)
        arcpy.management.CreateTable("in_memory", "ImpalaTable")
        arcpy.management.AddField(tab, "STATE", "TEXT")
        arcpy.management.AddField(tab, "COUNT", "LONG")

        # Query Impala through the ODBC data source and count addresses per state.
        connection = pyodbc.connect('DSN=HiveODBC')
        cursor = connection.cursor()
        cursor.execute("""
            select ca_state as state,count(ca_state) as cnt
            from customer_address
            group by ca_state
            """)
        # Copy each result row into the in-memory table.
        with arcpy.da.InsertCursor(tab, ['STATE', 'COUNT']) as insert:
            for row in cursor.fetchall():
                insert.insertRow([row.state, row.cnt])
        cursor.close()
        connection.close()

        parameters[0].value = tab
        return
Monday, October 8, 2012
Streaming Big Data For Heatmap Visualization using Storm
A record number of tweets was set during the 2012 Presidential debate. If you wondered how this was technologically possible, then Event Stream Processing is your answer.
Actually, Twitter open sourced such an implementation called Storm. Pretty impressive piece of technology! So, I wanted to try it out with a "geo" twist.
To get started, I recommend that you read "Getting Started with Storm". Here is the challenge at hand: simulate a stream of aircraft flight track targets, such that a heatmap is generated based on the density of targets at "near" locations.
The topology is very simple in this challenge: a spout reading the targets from an external source (to be defined later) is linked to a bolt that overlays a virtual grid on top of an area of interest. Each emitted target tuple is mapped to a cell in the grid, and the cell weight is decreased or increased when the target leaves or enters a cell.
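The cell bookkeeping done by the bolt can be sketched in Python. This is an illustration only, not the Storm bolt: local dictionaries stand in for the distributed map, and the class name, the target ids and the grid origin parameters are assumptions.

```python
import math
from collections import defaultdict


class DensityGrid:
    """Tracks how many targets currently sit in each cell of a virtual grid."""

    def __init__(self, xmin, ymin, cell_size):
        self.xmin = xmin
        self.ymin = ymin
        self.cell_size = cell_size
        self.weights = defaultdict(int)  # (row, col) -> number of targets inside
        self.last_cell = {}              # target id -> cell it was last seen in

    def cell_of(self, x, y):
        row = math.floor((y - self.ymin) / self.cell_size)
        col = math.floor((x - self.xmin) / self.cell_size)
        return (row, col)

    def update(self, target_id, x, y):
        """Move a target: decrement the cell it left and increment the cell it entered."""
        new_cell = self.cell_of(x, y)
        old_cell = self.last_cell.get(target_id)
        if old_cell == new_cell:
            return
        if old_cell is not None:
            self.weights[old_cell] -= 1
            if self.weights[old_cell] == 0:
                del self.weights[old_cell]  # keep only non-zero weighted cells
        self.weights[new_cell] += 1
        self.last_cell[target_id] = new_cell
```

In a distributed run, both dictionaries would have to live in a shared store, and the decrement and increment would need to be atomic per cell.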
Storm is a distributed system, which means that spouts and bolts can run on different physical nodes. The grid in the bolt is implemented as a map, and since the bolt can be distributed, the map implementation has to be distributed too. Enter Hazelcast, a "clustering and highly scalable data distribution platform for Java". In addition to maps, a distributed queue implementation is also available. The spouts (note the plural) use the latter to poll the offered targets and emit them to bolts (plural again :-) to update the density map grid. Here is the source code of the spout and the bolt.
The continuously running topology in this challenge is fronted with a web application container that "feeds" the spout and periodically accumulates the set of non-zero weighted cells from the grid for web client visualization.
The Spring web container is used in this implementation due to the Flex Integration and many, many other reasons that I will not go into in this post.
The Spring application context defines:
- A bean component scanner
- A task scheduler
- A Hazelcast instance
- A flex message broker
- A flex message destination
- A Spring message template
Flight tracks are simulated by time stepping the parameters of a cubic Hermite spline that is bound by starting and ending locations and vectors. A scheduled bean iterates over the set of track parameters and offers to the Hazelcast queue the current position along that track at the current timestamp in the form of a Target instance. When the target reaches the end of the track, the latter is removed from the simulation set. In addition, that scheduled bean gets from the Hazelcast grid map the set of non-zero weighted cells and accumulates them in the form of a list that is sent back to the client using the injected message template through a message broker.
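The time stepping of a cubic Hermite spline is compact enough to show in full. The Python sketch below uses the standard Hermite basis functions; the function names and the fixed number of steps are assumptions, and it is not the scheduled bean's code.

```python
def hermite(p0, m0, p1, m1, t):
    """Position at parameter t in [0, 1] on a cubic Hermite spline.

    p0 and p1 are the starting and ending locations, m0 and m1 the
    starting and ending vectors (tangents).
    """
    t2 = t * t
    t3 = t2 * t
    h00 = 2 * t3 - 3 * t2 + 1
    h10 = t3 - 2 * t2 + t
    h01 = -2 * t3 + 3 * t2
    h11 = t3 - t2
    return tuple(
        h00 * a + h10 * b + h01 * c + h11 * d
        for a, b, c, d in zip(p0, m0, p1, m1)
    )


def track_positions(p0, m0, p1, m1, steps):
    """Yield the positions along the track for steps equal parameter increments."""
    for i in range(steps + 1):
        yield hermite(p0, m0, p1, m1, i / steps)
```

Each yielded position would correspond to one Target offered to the queue, with the track dropped from the simulation once t reaches 1.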
Onto the client. Upon startup completion, the application subscribes to the message destination and adds a message listener to handle the list of weighted cells. This list is bound to a DensityMapLayer instance that converts the cells in the list into gradient fillings on a bitmap. This enables a very fast and fluid rendering of a large set of data. The overlay of gradient fillings is what generates a heatmap. In addition, a UI Button is added to the stage, enabling the user to define random starting and ending locations and vectors that are sent to the Spring container as seeds for the scheduled bean track/target simulator.
I know that this is very technical and I am assuming that you have some background in the above mentioned technologies - I just wanted to show how they can be integrated together seamlessly for BigData streaming - like usual, all the source code is available here.