Wednesday, October 30, 2013

BigData Spatial Joins

There has been a lot of research on performing spatial joins using Hadoop MapReduce on BigData, especially when both sets are very big. A notable one is the Spatial Hadoop project at the University of Minnesota. This post is derived from that body of work and uses the Esri Geometry API in the GIS Tools for Hadoop project to perform the spatial operations in the map and reduce phases.
A lexicographical join of native types (numerical, textual) in BigData can be performed in the map phase or in the reduce phase, depending on the size of the input data and how much memory the mapper or reducer has access to. Actually, Hadoop provides a join framework package as this is a common pattern. The same applies to spatially joining two big data sets: the join can run in the map phase or the reduce phase, depending on the size of the data and how much memory each phase has access to.
Let's start with a map phase spatial join. Let's say you have a billion point records and you have the polygon areas of the US zip codes, and the task at hand is to find the count of points per zip code. Since the US zip code feature class is a 'small' set (~41,700), it can be fully loaded into each mapper's memory space at start up. In addition, it can be spatially indexed in memory using the API QuadTree for quick lookup based on the envelope of each feature. The GeometryEngine comes with a handy 'contains' method that can further refine the spatial constraint. The source of the zip code feature class can be the DistributedCache or HDFS. In addition, it can be in the Esri JSON format, in GeoJSON or in the OGC format. The GeometryEngine API can parse all these formats into Geometry POJOs. As the mapper reads each point feature, it locates the zip code polygon that the point falls into and emits the zip code to the reducer. The reducer task is to sum the emitted values per zip code.
Now, let's say that you have a billion polygons that you want to spatially join with yet another billion polygons and return the intersection set. How do you proceed with doing that in MapReduce when clearly a billion features cannot all fit in the mapper space (at least not in a typical node that I have access to :-)? This is where we take advantage of the shuffle and sort phase of Hadoop's MapReduce implementation to partition and group all the "similar" elements, such that the "reduced" set can be held in memory for processing. In our case that similarity is based on spatial grouping. So, in the map phase, the extent that the two polygon sets occupy is projected onto a grid with predefined cell sizes. The minimum bounding rectangle (MBR) of the input polygon from each set is overlaid onto the grid. The overlapping cells are emitted as keys whose value is a tuple consisting of an input source reference and the polygon shape. So now, all the polygons that overlap a specific cell are sent to the same reducer. In the reduce phase, we know the cell that we are operating over as this is derived from the key, and we divide the list of values based on their source into separate lists where we can now perform a spatial cartesian product from one list to the other. Actually, one of the lists can be spatially indexed for fast lookup. Since an input polygon MBR can straddle multiple cells, there is a trick to keep different reducers from performing the same spatial join multiple times. It is best explained using the below picture.
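To make the map phase join concrete, here is a minimal sketch in plain Java. It is not the code from the project: the class and method names (MapSideJoinSketch, Zip, countPerZip) are hypothetical, a linear scan with an envelope test stands in for the QuadTree, and an even-odd ray casting test stands in for the GeometryEngine 'contains' call. The map and reduce steps are collapsed into one process just to show the flow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MapSideJoinSketch {

    /** A zip code polygon: one ring of x/y pairs plus its envelope. */
    static final class Zip {
        final String code;
        final double[] xs;
        final double[] ys;
        final double xmin, ymin, xmax, ymax;

        Zip(String code, double[] xs, double[] ys) {
            this.code = code;
            this.xs = xs;
            this.ys = ys;
            double x0 = xs[0], y0 = ys[0], x1 = xs[0], y1 = ys[0];
            for (int i = 1; i < xs.length; i++) {
                x0 = Math.min(x0, xs[i]);
                x1 = Math.max(x1, xs[i]);
                y0 = Math.min(y0, ys[i]);
                y1 = Math.max(y1, ys[i]);
            }
            xmin = x0; ymin = y0; xmax = x1; ymax = y1;
        }

        /** Cheap envelope test first, then an even-odd ray casting test. */
        boolean contains(double x, double y) {
            if (x < xmin || x > xmax || y < ymin || y > ymax) {
                return false;
            }
            boolean inside = false;
            for (int i = 0, j = xs.length - 1; i < xs.length; j = i++) {
                if ((ys[i] > y) != (ys[j] > y)
                        && x < (xs[j] - xs[i]) * (y - ys[i]) / (ys[j] - ys[i]) + xs[i]) {
                    inside = !inside;
                }
            }
            return inside;
        }
    }

    /** "Map" step: return the zip code that contains the point, or null. */
    static String map(List<Zip> zips, double x, double y) {
        for (Zip zip : zips) { // a spatial index would replace this linear scan
            if (zip.contains(x, y)) {
                return zip.code;
            }
        }
        return null;
    }

    /** "Reduce" step, run in-process here: sum the emitted ones per zip code. */
    static Map<String, Long> countPerZip(List<Zip> zips, double[][] points) {
        Map<String, Long> counts = new HashMap<>();
        for (double[] p : points) {
            String code = map(zips, p[0], p[1]);
            if (code != null) {
                counts.merge(code, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Zip> zips = new ArrayList<>();
        zips.add(new Zip("00001", new double[]{0, 10, 10, 0}, new double[]{0, 0, 10, 10}));
        zips.add(new Zip("00002", new double[]{10, 20, 20, 10}, new double[]{0, 0, 10, 10}));
        double[][] points = {{1, 1}, {5, 5}, {15, 5}, {50, 50}};
        System.out.println(countPerZip(zips, points));
    }
}
```

In a real job the polygon list would be loaded once in the mapper setup, and the count would happen in the reducer after the shuffle.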

Given two MBRs of polygons A and B, and given a grid partitioning of depth 1 with 4 quads, we can see that A and B overlap the top two quads (1,0) (1,1). So the mapper will emit values (A,B) for Quad (1,0) and (1,1) as keys. In the reducer for key (1,0) we have a reference to A and B and we calculate the lower left corner of the intersection of A and B MBRs (red dot). Because the intersection point falls into the Quad (1,0), we can then proceed with the spatial join and emit the result. Now, in the reducer for key (1,1), we can see that the intersection point does _not_ fall into quad (1,1) indicating that the spatial join was performed or will be performed by a reducer for another quad, thus any further processing is stopped and nothing is emitted.
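Here is a small sketch of that trick in plain Java, under a few assumptions of mine: the grid origin is at (0,0), cells are squares of a given size, and cells are addressed as (row, col) to match the quad labels above. The names (ReferencePointSketch, Mbr, cellsFor, ownsPair) are made up for the illustration. cellsFor is what a mapper would use to pick the keys, and ownsPair is the test a reducer would apply before joining a pair.

```java
import java.util.ArrayList;
import java.util.List;

final class ReferencePointSketch {

    /** Minimum bounding rectangle of a polygon. */
    static final class Mbr {
        final double xmin, ymin, xmax, ymax;

        Mbr(double xmin, double ymin, double xmax, double ymax) {
            this.xmin = xmin;
            this.ymin = ymin;
            this.xmax = xmax;
            this.ymax = ymax;
        }
    }

    /** Map side: every grid cell {row, col} that the MBR overlaps. */
    static List<long[]> cellsFor(Mbr m, double cellSize) {
        List<long[]> cells = new ArrayList<>();
        long c0 = (long) Math.floor(m.xmin / cellSize);
        long c1 = (long) Math.floor(m.xmax / cellSize);
        long r0 = (long) Math.floor(m.ymin / cellSize);
        long r1 = (long) Math.floor(m.ymax / cellSize);
        for (long r = r0; r <= r1; r++) {
            for (long c = c0; c <= c1; c++) {
                cells.add(new long[]{r, c});
            }
        }
        return cells;
    }

    /**
     * Reduce side: true only for the cell that holds the lower left corner
     * of the intersection of the two MBRs, so exactly one reducer joins the pair.
     */
    static boolean ownsPair(Mbr a, Mbr b, long row, long col, double cellSize) {
        double x = Math.max(a.xmin, b.xmin);
        double y = Math.max(a.ymin, b.ymin);
        if (x > Math.min(a.xmax, b.xmax) || y > Math.min(a.ymax, b.ymax)) {
            return false; // the MBRs do not intersect at all
        }
        return (long) Math.floor(x / cellSize) == col
                && (long) Math.floor(y / cellSize) == row;
    }

    public static void main(String[] args) {
        // Two MBRs that both straddle the top two quads of a 2 x 2 grid.
        Mbr a = new Mbr(0.2, 1.2, 1.4, 1.8);
        Mbr b = new Mbr(0.6, 1.4, 1.7, 1.9);
        System.out.println(ownsPair(a, b, 1, 0, 1.0)); // true: join here
        System.out.println(ownsPair(a, b, 1, 1, 1.0)); // false: skip
    }
}
```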
Neat trick. However, the question at hand is "What is the 'correct' grid size?" This takes me back to my days when I was an SDE specialist and had to tweak the spatial grid sizes and levels of a layer for best performance based on the layer content and usage. Writing an MR preprocessing task that scans through the static data and 'recommends' a value will make a nice follow-on post. In addition, the current implementation assumes that you want to spatially join _all_ the polygons in the full extent. What is missing is a spatial index to enable an initial cookie cutting of a region to operate over. This is what the above project does. However, it assumes the data is static and can be migrated into a new spatially optimized layout. How to do that on dynamic existing data is yet another post.
Like usual all the source code is available here.

Saturday, September 21, 2013

BigData: Apache Flume, HDFS and HBase

In this post, I will show how to log a very large number of web requests to a BigData storage for traffic analysis. The source code for the project is on github. We will rely on the logging library log4j and the associated Flume NG appender implementation. For storage, we will place the log information into a set of HDFS files or into an HBase table. The HDFS files will be mapped into a Hive table as partitions for query and aggregation.
In this demonstration, all web requests are handled by a very simple web application servlet that is mapped to an access url:
In practice, all logic is handled by the servlet and the logging of the request information will be handled by a servlet filter that uses the log4j logging API.
In this demonstration the path info will define the log level and the query string will define the log message content.
Log4j is configured using the resource log4j.properties to use a Flume NG appender:
The appender will send all log events to a Flume agent instantiated on the host defined by the property log4j.appender.flume.Hostname.
The log event body content is defined by the ConversionPattern property, where each of the string tokens are separated by a tab character. This is how the content will be stored into an HDFS file and parsed back into tokens to be stored as columns into an HBase table.
Before starting the web application, a flume agent has to be instantiated with the predefined configuration:
This agent has an avro source that accepts incoming events on port 51515. The source has an interceptor to ensure that all incoming events have a timestamp.  The events are forwarded to an HDFS sink through a memory channel.  The HDFS sink writes the events to a file where the path of the file is specified by the current year, month, day and hour. Each file is closed on the hour and all events are "rolled" into a newly created file.  The content of the file is in a pure text format in this demonstration.   For efficiency, I could have used a binary compressed format.
You can start an agent from the command line as follows:
$ flume-ng agent -n agent -c conf -f conf/agent.properties
Or you can use a GUI like the Cloudera Manager:

The HDFS files are mapped onto a Hive partitioned table for query as follows:
And as file partitions are added onto the file system, the table can be altered to account for that new partition.  Here is an example:
The following is an agent configuration that will place all log events into an HBase table where each of the log event elements populates a column:
Note: Make sure to first create the HBase table using the shell:
create 'log4j',{NAME=>'info',VERSIONS=>1}
This agent has an avro source that listens for incoming events on port 61616 and forwards these events to an HBase sink through a memory channel.  The HBase sink uses a regular expression parser to tokenize the event body content and each token is respectively placed into a column in a table.
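To illustrate the tokenizing step, here is a minimal sketch in plain Java of splitting a tab separated event body with a regular expression and pairing each token with a column name. This is not the sink's own code. The four token layout and the column names (time, level, thread, message) are assumptions of mine, since the real layout is whatever the ConversionPattern defines.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EventBodyParser {

    // Hypothetical layout: four tokens separated by tab characters.
    private static final Pattern BODY =
            Pattern.compile("([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t(.*)");

    // Hypothetical column names, one per capturing group.
    private static final String[] COLUMNS = {"time", "level", "thread", "message"};

    /** Returns column name to token, or an empty map if the body does not match. */
    static Map<String, String> parse(String body) {
        Map<String, String> row = new LinkedHashMap<>();
        Matcher matcher = BODY.matcher(body);
        if (matcher.matches()) {
            for (int i = 0; i < COLUMNS.length; i++) {
                row.put(COLUMNS[i], matcher.group(i + 1));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        System.out.println(parse("2013-09-21 10:00:00\tINFO\tmain\thello world"));
    }
}
```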
Like usual, all the source code is available here.

Friday, September 6, 2013

BigData GeoEnrichment

What is GeoEnrichment? An example would best describe it. Given a big set of customer location records, I would like each location to be GeoEnriched with the average income of the zip code where that location falls into and with the number of people between the age of 25 and 30 that live in that zip code.

Before GeoEnrichment:
CustId,Lat,Lon

After GeoEnrichment:
CustId,Lat,Lon,AverageIncome,Age25To30

Of course the key to this whole thing is the spatial reference data :-) and there are a lot of search options, such as Point-In-Polygon, Nearest Neighbor and enrichment based on a Drive Time Polygon from each location.

I've implemented two search methods:
  • Point-In-Polygon method
  • Nearest Neighbor Weighted method
The Point-In-Polygon (PiP) method is fairly simple. Given a point, find the polygon it falls into and pull from the polygon feature the selected attributes and add them to the original point.

The Nearest Neighbor Weighted (NNW) method finds all the reference points within a specified distance and weights each point based on its distance. The GeoEnrichment value is the sum of the weighted attribute value.
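As an illustration of the NNW idea, here is a minimal sketch in plain Java. The weighting function is an assumption of mine (a linear falloff from 1 at the location to 0 at the search distance), as is the use of planar distance. The names (NearestNeighborWeighted, enrich) are hypothetical, and the project may weight differently.

```java
final class NearestNeighborWeighted {

    /**
     * Sums the weighted attribute value of all reference points within maxDist
     * of (x, y). Each reference point is {x, y, value}.
     * Assumed weight: 1 - distance / maxDist (linear falloff).
     */
    static double enrich(double x, double y, double[][] refs, double maxDist) {
        double sum = 0.0;
        for (double[] ref : refs) {
            double dist = Math.hypot(ref[0] - x, ref[1] - y); // planar distance
            if (dist <= maxDist) {
                sum += (1.0 - dist / maxDist) * ref[2];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        double[][] refs = {{0, 0, 100}, {5, 0, 100}, {20, 0, 100}};
        // The third reference point is beyond the search distance and is ignored.
        System.out.println(enrich(0, 0, refs, 10));
    }
}
```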

You can find more details about this here, where I've used HBase and the Geometry API for Java to perform the GeoEnrichment.

Tuesday, August 27, 2013

BigData Spatial Indexes and HBase Toolbox

This is a set of experiments with HBase. From within ArcMap, you can create an HTable and export the content of a feature class into that table. The HTable is mapped into a Hive table to perform SQL-like queries on it, and of course the Spatial Framework UDFs can be applied for spatial operations. If Hive is not fast enough for you, try Impala. Finally, by taking advantage of the RowKey design, a spatial index is derived using geohash, to be used later in a MapReduce big spatial join job.
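For readers who have not met geohash before, here is a minimal sketch of the standard encoding in plain Java. It is only an illustration of why the hash works as a row key prefix (nearby locations tend to share leading characters), not the toolbox code, and the class name GeoHash is mine.

```java
final class GeoHash {

    private static final String BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz";

    /** Encodes a latitude/longitude pair into a geohash of the given length. */
    static String encode(double lat, double lon, int precision) {
        double latMin = -90, latMax = 90, lonMin = -180, lonMax = 180;
        StringBuilder hash = new StringBuilder(precision);
        boolean evenBit = true; // even bits refine longitude, odd bits latitude
        int bits = 0;
        int ch = 0;
        while (hash.length() < precision) {
            if (evenBit) {
                double mid = (lonMin + lonMax) / 2;
                if (lon >= mid) {
                    ch = (ch << 1) | 1;
                    lonMin = mid;
                } else {
                    ch <<= 1;
                    lonMax = mid;
                }
            } else {
                double mid = (latMin + latMax) / 2;
                if (lat >= mid) {
                    ch = (ch << 1) | 1;
                    latMin = mid;
                } else {
                    ch <<= 1;
                    latMax = mid;
                }
            }
            evenBit = !evenBit;
            if (++bits == 5) { // every 5 bits become one base 32 character
                hash.append(BASE32.charAt(ch));
                bits = 0;
                ch = 0;
            }
        }
        return hash.toString();
    }

    public static void main(String[] args) {
        System.out.println(encode(0, 0, 5)); // s0000
    }
}
```

Using the hash as the leading part of a row key means a scan over a key prefix returns features from the same area.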

Like usual all the source code can be found here.

Wednesday, August 14, 2013

BigData: Experiments with Apache Avro and Parquet

 

In the GIS Tools for Hadoop, we store and retrieve feature classes in Esri JSON or GeoJSON formats to and from HDFS. This post is about a set of experiments with different storage and serialization techniques for feature classes. The two environments I evaluated are Apache Avro and Parquet. All the source code can be found here.

Experiment I - Simple Feature Class Schema

Even though Avro is capable of dynamic typing, and at the risk of being old-fashioned, I wanted to generate a set of POJOs for a simple feature class schema.  In my mind, a feature has a geometry and a set of attributes.   A geometry has a spatial reference and can be either a point, a line or a polygon.  A point is made up of one coordinate. A coordinate is simply made up of an x and a y value. Lines are made up of paths, where a path is a set of coordinates. Polygons are made up of rings, where a ring is a set of coordinates. Based on the Avro specification, I wrote two schemas: one for a generic feature and one for a point feature. The feature attributes are a set of key values, where the key is in a text format and the values can be either numeric or textual.

Using maven, a set of concrete classes can be generated to be used in static programming:

$ mvn avro:schema

Experiment II - Generate Data

Once the classes are generated, the project is compiled and the set of features with random geographic coordinates is generated and placed into HDFS:

$ mvn exec:java -q -Dexec.mainClass=com.esri.RandomPoints -Dexec.args="cloudera hdfs://localhost.localdomain:8020/user/cloudera/points/points.avro"

Experiment II - View Avro Data

Since the Avro data is in a binary format, a "cat" of the data will render gibberish.  So I borrowed an idea from the Avro tool project to dump the content to the console in JSON text format:

$ mvn exec:java -q -Dexec.mainClass=com.esri.AvroToJson -Dexec.args="hdfs://localhost.localdomain:8020/user/cloudera/points/points.avro 10"

Experiment III - Run MapReduce Job on Avro Data

The MapReduce job in mind is a simple bin clustering, where the output is the number of points per bin - think of a bin as a cell in a grid.  Here are the mapper and the reducer.

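import java.io.IOException;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;

// Emits (bin key, 1) for each point feature.
// XMIN and YMIN are constants holding the lower left corner of the grid.
private static final class FeatureMapper
        extends AvroMapper<AvroPointFeature, Pair<Long, Integer>>
{
    @Override
    public void map(
            final AvroPointFeature feature,
            final AvroCollector<Pair<Long, Integer>> collector,
            final Reporter reporter)
            throws IOException
    {
        final AvroPoint avroPoint = feature.getGeometry();
        final AvroCoord coord = avroPoint.getCoord();
        // Bin column and row; each bin is one coordinate unit wide.
        final long x = (long) Math.floor(coord.getX() - XMIN);
        final long y = (long) Math.floor(coord.getY() - YMIN);
        // Pack the row into the upper 32 bits and the column into the lower 32 bits.
        // This assumes x is not negative.
        final long g = (y << 32) | x;
        collector.collect(new Pair<Long, Integer>(g, 1));
    }
}

// Sums the emitted ones per bin key.
private static final class FeatureReducer
        extends AvroReducer<Long, Integer, Pair<Long, Integer>>
{
    @Override
    public void reduce(
            final Long key,
            final Iterable<Integer> values,
            final AvroCollector<Pair<Long, Integer>> collector,
            final Reporter reporter)
            throws IOException
    {
        int sum = 0;
        for (final Integer value : values)
        {
            sum += value;
        }
        collector.collect(new Pair<Long, Integer>(key, sum));
    }
}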

Of course the Avro output can be viewed using the Avro to JSON tool from the second experiment :-)
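When reading the output, the bin key has to be unpacked back into a column and a row. Here is a minimal sketch of that in plain Java; the class name BinKey is hypothetical. Unlike the mapper's `(y << 32) | x`, the pack method here masks x to 32 bits, which is my addition to guard against a negative column smearing into the row bits.

```java
final class BinKey {

    private static final long LOW_32_BITS = 0xFFFFFFFFL;

    /** Row in the upper 32 bits, column in the lower 32 bits. */
    static long pack(long x, long y) {
        return (y << 32) | (x & LOW_32_BITS);
    }

    /** Column of the bin. */
    static long x(long key) {
        return key & LOW_32_BITS;
    }

    /** Row of the bin. */
    static long y(long key) {
        return key >>> 32;
    }

    public static void main(String[] args) {
        long key = pack(3, 7);
        System.out.println(x(key) + "," + y(key)); // 3,7
    }
}
```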

Experiment IV - GeoProcessing Tool To Generate Schema.

I wanted to generate an Avro schema that closely resembles a specific simple feature class metadata. So, I wrote the SchemaTool as an ArcMap extension that writes to HDFS the specific Avro schema of a given feature class.

Experiment V - Export Feature Class Based on Specific Schema

Now that we have a specific schema, I wrote yet another extension (ExportToGenericAvro) to export the content of a feature class into an Avro formatted HDFS file. Again, you can validate the content using the Avro to JSON tool :-)

Experiment VI - Use Hive To Query Avro Data

Hive takes a SQL-like statement and converts it to a MapReduce job that operates on a table mapped to an HDFS path. Hive maps binary content to a table using a concept called SerDe. In my experiments, I am using the Cloudera VM as my sandbox and it comes preinstalled with Hive and all kinds of SerDes. An external Hive table can be defined as follows:

CREATE EXTERNAL TABLE points
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/cloudera/points/'
TBLPROPERTIES ('avro.schema.url'='hdfs://localhost.localdomain/user/cloudera/points.avsc');

Once the table is registered, you can start querying it.  Here is a sample query:

select geometry.coord,attributes['id'] from points limit 10;

Experiment VII - Using Spatial UDFs

The GIS Tools for Hadoop comes with a set of Spatial Hive User Defined Functions. This enables you to "augment" HQL with spatial operations. Make sure to first add the geometry and spatial framework jars to Hive and register the spatial UDFs. In my case, I cloned the projects, compiled them and copied the resulting jars to my local home folder from where I launched Hive.

hive> add jar /home/cloudera/esri-geometry-api-1.2-SNAPSHOT.jar;
hive> add jar /home/cloudera/spatial-sdk-hive-1.0-SNAPSHOT.jar;
hive> source function-ddl.sql;

Now, I can perform spatial queries on my points table. Here is a sample query that counts the number of points that are within a certain distance from a specified point:

hive> select count(1) from points
where ST_DISTANCE(ST_POINT(geometry.coord.x,geometry.coord.y),ST_POINT(75,24)) < 5;

Experiment VIII - Parquet Format.

So, all the above experiments place the data in a row format in HDFS.  Parquet is a columnar storage for Hadoop. Typically the column schema is defined in Apache Thrift. However, there is a sub project that enables you to define your column schema in Avro - cool, no ? So I wrote yet another tool (ExportToAvroParquetTool) to export the content of a feature class into Parquet based format and based on a specific schema defined in Avro.

Final Thoughts…

It is time to evaluate performance in capacity and speed - that will be an interesting post. Per my readings, Twitter is so impressed with Parquet that they are already moving it into production. I like Avro, as it enables future-proof schema changes - after all, you cannot "readjust" your data to fit a new schema when it is in the petabytes :-)

I do all my development on a mac and have a Windows VM running ArcMap and the Cloudera VM running Hadoop all on the same machine (Yea, it is a beefy machine :-) I had to use Putty to SSH tunnel all the ArcMap communication with Hadoop - check out this post for some hints.

Rather than using Hive, I desperately wanted to use Impala. However, as of this writing, Impala does not support structs, maps or UDFs. But when it does….it will be AMAZING, as in the past, when still in beta, it saved my "derrière", where I could query 4.5 billion records in under 20 seconds live on a stage in front of thousands of people !

Again, like usual, all the source code can be downloaded from here.

Monday, July 29, 2013

Minecraft: The Gamification of GIS BigData

So in this short post I am integrating 4 things that I love the most - GIS, Hadoop, BigMemory and Minecraft. The idea is the following: I would like to visualize in Minecraft the result of a Kernel Density calculation performed by Hadoop.  The job output is placed into distributed memory rather than back into HDFS, such that a Minecraft server can read that memory and populate a world that I have teleported into.

Here is a short video that showcases the last steps after I started the BigMemory server and ran the Hadoop Job.
Like usual all the source code is available here.

Monday, July 15, 2013

Minecraft: The Gamification of GIS

At this year's User Conference, I presented "The Gamification of GIS" in the Esri Labs booth, where I showcased how a Minecraft player can teleport into and interact with GIS worlds. It was a simple proof of concept, but I believe the ramifications can be significant, especially after listening to Amber Case talking about STEM in the plenary.
My boys and their friends can play Minecraft for hours. They build collaboratively amazing online worlds and pixel art. And despite the "blockiness" of the worlds and the art, the game is pretty immersive.
One day my son tells me "Hey dad, I think I can build one of your worlds in Minecraft". What he meant by my world is a GIS world that I render using ArcMap. So I started thinking about this and wondered if I too can do this, but programmatically. A bit of googling, and I found out that I can program the Minecraft server using Java.  My boys play Minecraft on their XBox and have a desktop version too. The latter has additional features such as connecting to your own server that is extensible with plugins.
So, I set up my own server, and started programming my own plugin to build my GIS based Minecraft worlds.
Since Minecraft is based on blocks, I needed to convert the world vector features into "blocks". The raster format fits that requirement. Using ArcMap's built-in conversion tools, I can convert features to raster, where the raster values are based on the features' attribute values.  The idea behind using the attribute values is to create worlds with heights that are proportional to the attribute values. To enable the plugin to read these raster values, I converted the raster to float values.
To know where I am in these generated worlds, I need sign posts. In GIS parlance, a sign post is a symbol to render point features. So I used the built-in data management feature to point tool, followed by the execution of a Python based GeoProcessing task (thanks @pheede) to convert the point feature class to a CSV file that can be read by the plugin to create sign post block types. The text on the sign post is based on a feature attribute value.
My UC presentation was based on navigating the countries of the world, where the height of each country is proportional to its population and the sign posts in each country are annotated with the country name.

Here is the sequence of steps that I followed to enable that navigation:
After installing and setting up a Bukkit server (as of this writing, I am using version 1.6.2 Build #2815 to work with the latest 1.6.2 Desktop version), a plugins folder will be created.
Stop the server and download into the plugins folder the Multiverse-Core-2.5.jar and the RasterPlugin-1.0-SNAPSHOT.jar.
Download and unzip the content of the minecraft-cntry.zip file into your home folder.  You should see 3 files; cntry.flt, cntry.hdr and cntry.csv
Start the server and issue the following commands at the server command line prompt:
mv create cntry normal -g RasterPlugin:/path/to/your/home/folder/cntry.flt -t FLAT
This command uses the Multiverse plugin to create a normal flat world named 'cntry' using the RasterPlugin generator.
To place the sign posts in the generated world, type the following command:
label cntry /path/to/your/home/folder/cntry.csv
Now start the Minecraft desktop application. Click the 'Multiplayer' button. Click the 'Direct Connect' button. Enter localhost in the Server Address text input, then click the 'Join Server' button.
Make sure the keyboard focus is in the Minecraft application by clicking on it. To teleport to the newly generated 'cntry' world, type the following command:
/mv tp cntry
Place yourself in 'creative' mode:
/gamemode 1
To teleport yourself to the highest location in that world, issue the following command:
/pos max
Start walking or flying around and explore.
Hope you find this fun and educational - like usual all the source code can be downloaded from here.
This post is dedicated to my son Coln who is my inspiration. He and his Minecraft friends raised some serious cash at this year's St Baldrick's Foundation event.

Sunday, May 26, 2013

Creating Spatial Crunch Pipelines

Josh Wills (@Josh_wills) introduced me to Apache Crunch which is now a top-level project within the Apache Foundation. Crunch simplifies the coding of data aggregation in MapReduce.

Here is a proof-of-concept project that spatially enables a crunch pipeline with a Point-In-Polygon function from a very large set of static point data with a small set of dynamic polygons.

Crunch has simplified the process so much that it came down to a one line syntax:

final PTable<Long, Long> counts = pipeline
    .readTextFile(args[0])
    .parallelDo(new PointInPolygon(), Writables.longs())
    .count();

Crunch's strength is in processing BigData that cannot be stored by "traditional means", such as time series and graphs. It will be interesting to perform some kind of spatial and temporal analysis with it in a followup post.

Like usual, all the source code can be found here.

Monday, May 20, 2013

Export FeatureClass to Hadoop, Run MapReduce, Visualize in ArcMap

In the previous post we launched a CDH cluster on EC2 in under 5 minutes. In this post, we will use that cluster to perform geospatial analytics in the form of MapReduce and visualize the result in ArcMap. See, ArcMap is one of the desktop tools that a GeoData Scientist will use when working with and visualizing spatial data.  The use case in my mind is something like the following:  Point data is streamed through, for example, GeoEventProcessor into Amazon S3. The user has a set of polygons in ArcGIS that needs to be spatially joined with that big data point content. The result of the big data join is linked back to the polygon set for symbol classification and visualization.

After editing the polygons in ArcMap, the user exports the feature class into HDFS using the ExportToHDFSTool.

Using the new Esri Geometry API for Java, a MapReduce job is written as a GeoProcessing extension, such that it can be directly executed from within ArcMap. The result of the job is converted directly back into a feature class.

The tool expects the following parameters:

  • A Hadoop configuration in the form of a properties file.
  • A user name that Hadoop will use as credentials and its privileges when executing the job.
  • The big data input to use as the source - in the above use case that will be the S3 data.
  • The small data polygon set to spatially join and aggregate.
  • The remote output folder - where the MapReduce job will put its results.
  • A list of jar files that will be used by the DistributedCache to augment the MapReduce classpath. That is because the Esri Geometry API for Java jar is not part of the Hadoop distribution, and we use the distributed cache mechanism to "push" it to each node.
  • The output feature class to create when returning back the MapReduce job result from HDFS.
This code borrows resources from Spatial Framework for Hadoop for the JSON serialization.

For the ArcGIS Java GeoProcessing Extension developers out there - I would like to point you to a couple of tricks that will make your development a bit easier and hopefully will make you bang your head a little bit less when dealing with ArcObjects - LOL !

I strongly recommend that you use Apache Maven as your build process. In addition to a good code structure and a unified repository, it comes with a great set of plugins to assist in the deployment process.  The first plugin is the maven-dependency-plugin. It copies all the runtime dependencies into a specified folder.  I learned the hard way that when ArcMap starts up, it introspects all the classes in all the jars in the extension folder for the @ArcGISExtension annotation declaration.  Now this is fine if you are writing a nice "hello world" GP extension, but in our case, where we are depending on 30+ jars, well….ArcMap will never start. So, the solution is to put all the jars in a separate subfolder and declare a Class-Path entry in the main jar manifest file that references all the dependencies. This is where the second plugin comes to the rescue. The maven-jar-plugin can be configured to automatically generate a manifest file containing a Class-Path entry that references all the dependencies declared in the maven pom.

Talk about classpath. So you think that all is well and good by using the above setup. There is one more thing that you have to do to force the correct classpath when executing the GP task and that is to set the current thread context class loader to the system class loader as follows:

Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());

It took me 3 weeks to figure this out - so hopefully somebody will find this useful.
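If you would rather not leave the thread's class loader changed after the task runs, the same call can be wrapped so the previous loader is restored. The following is a minimal sketch of that variation in plain Java. The helper name (ContextClassLoaders.withSystemClassLoader) is hypothetical, and restoring the previous loader is my addition, not something the GP framework requires as far as I know.

```java
import java.util.function.Supplier;

final class ContextClassLoaders {

    /**
     * Runs the task with the system class loader as the thread context
     * class loader, then puts the previous loader back.
     */
    static <T> T withSystemClassLoader(Supplier<T> task) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(ClassLoader.getSystemClassLoader());
        try {
            return task.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader seen = withSystemClassLoader(
                () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == ClassLoader.getSystemClassLoader()); // true
    }
}
```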

Like usual, all the source code can be found here.

Saturday, May 18, 2013

BigData: Launch CDH on EC2 from ArcMap in under 5 minutes

well....after you get all the necessary software, certificates and... setup everything correctly :-)

Update: Regarding the above comment - you can download a zip file containing all the necessary jars and the toolbox, so that you do not have to package the project from scratch.

The idea here is that I would like an ArcGIS user to just push a button from within ArcMap and have a Cloudera based Hadoop cluster started on Amazon EC2. From there on, a user can edit features in ArcMap that can be exported into that cluster to be used as an input to a MapReduce job. The output of the MapReduce job is imported back into ArcMap for further analysis. This combination of SmallData (GeoDatabase) and BigData (Hadoop) is a great fusion in a geo-data scientist's arsenal. When done with the analysis, the user again pushes a button and destroys the cluster, thus paying only for what he/she used while having access to elastic resources.

The following is a sequence of prerequisite steps that you need to execute to get going:

Create an Amazon Web Service account and setup AWS security credentials. You need to remember your 'Access Key ID' and 'Secret Access Key'. They are ugly long strings that we will need later to create our Hadoop cluster in EC2.

Download the PuTTY Windows installer from http://www.chiark.greenend.org.uk/~sgtatham/putty/download.html and run it. This will add two additional programs to your system: putty.exe and puttygen.exe. We will use puttygen.exe to generate an OpenSSH formatted public and private key to enable a secure tunneled communication with the soon to be created AWS instances.

Here are the steps to create the keys:

  • Launch PuTTYGen.
  • Set the number of bits in generated keys to 2048.
  • Click the 'Generate' button and move your mouse randomly in the 'blank' area.
  • To save your private key, click the 'Conversions' menu option and select the 'Export OpenSSH key' menu item. An alert will be displayed, warning you that you are exporting this key without a passphrase. Press 'Yes' to save it.
  • To save your public key, select and copy the content of the text area. Open notepad, paste the content and save the file.
  • Finally, save this key in a PuTTY format by clicking on the 'Save private key' button.

Run as administrator the 'JavaConfigTool' located in C:\Program Files (x86)\ArcGIS\Desktop10.1\bin and adjust the Minimum heap size to 512, Maximum heap size to 999, and the Thread stack size to 512.

The deployment of the Cloudera Distribution of Hadoop (CDH) on Amazon EC2 is executed by Apache Whirr. You can find great documentation here on how to run it from the command line. However in my case, I wanted to execute the process from within ArcMap. So, I wrote a GeoProcessing extension that you can add to ArcCatalog enabling the invocation of the deployment tool from within ArcMap.

Whirr depends on a set of configurations to launch the cluster. The values of some of the parameters depend on the previous two steps. Here is a snippet of my 'cluster.properties' file.

whirr.provider=aws-ec2
whirr.identity=my-aws-access-key-id
whirr.credential=my-aws-secret-access-key
whirr.cluster-name=hadoopcluster
whirr.instance-templates=1 hadoop-jobtracker+hadoop-namenode,3 hadoop-datanode+hadoop-tasktracker
whirr.public-key-file=/Users/mraad/mypublickey
whirr.private-key-file=/Users/mraad/myprivatekey
whirr.env.repo=cdh4
whirr.hadoop.install-function=install_cdh_hadoop
whirr.hadoop-configure-function=configure_cdh_hadoop
whirr.hardware-id=m1.large
whirr.image-id=us-east-1/ami-ccb35ea5
whirr.location-id=us-east-1

Please follow the instructions on Github to install the WhirrToolbox in ArcMap.

To launch the cluster, start ArcMap, show the catalog window, expand the 'WhirrToolbox' and double click the 'LaunchClusterTool'. Select a cluster configuration file and click the 'Ok' button.

Now, this will take a little bit less than 5 minutes.

 To see the Whirr progress, disable the background processing in the GeoProcessing Options by unchecking the 'Enable' checkbox:

Once the cluster is launched successfully, Whirr will create a folder named '.whirr' in your home folder. In addition, underneath that folder, a subfolder will be created and will be named after the value of 'whirr.cluster-name' property in the configuration file.

The file 'hadoop-site.xml' contains the Hadoop cluster properties.

I've written yet another GP tool (ClusterPropertiesTool) that will convert this xml file into a properties file. This properties file will be used by the subsequent tools when exporting and importing feature classes to and from the cluster and when running MapReduce jobs.
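The conversion itself is straightforward. Here is a minimal sketch of the idea in plain Java using the JDK's DOM parser; it is not the ClusterPropertiesTool source. It assumes the usual Hadoop layout of configuration / property / name and value elements, the class name SiteXmlToProperties is mine, and the host name in the sample is made up.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class SiteXmlToProperties {

    /** Copies every property name/value pair of the XML into a Properties object. */
    static Properties convert(InputStream xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(xml);
            Properties props = new Properties();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element property = (Element) nodes.item(i);
                props.setProperty(text(property, "name"), text(property, "value"));
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse Hadoop configuration XML", e);
        }
    }

    /** Text of the first child element with the given tag, or an empty string. */
    private static String text(Element parent, String tag) {
        NodeList list = parent.getElementsByTagName(tag);
        return list.getLength() == 0 ? "" : list.item(0).getTextContent().trim();
    }

    public static void main(String[] args) throws Exception {
        // Sample content; the host name is made up for the illustration.
        String xml = "<configuration><property>"
                + "<name>fs.default.name</name>"
                + "<value>hdfs://namenode.example.com:8020/</value>"
                + "</property></configuration>";
        Properties props = convert(
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        props.store(System.out, "cluster properties");
    }
}
```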

Now, all the communication with the cluster is secure and should be tunneled through an ssh connection. To test this, we will use PuTTY. But before starting PuTTY, we need the name node IP. This can be found in the 'instance' file in the '.whirr' subfolder. Make sure to get the first IP in the 'namenode' row, not the one that starts with '10.xxx'.

Start PuTTY. Put in the 'Host Name (or IP Address)' field the value of the name node IP.

In the Connection / SSH / Auth form, browse for the PuTTY formatted private key that we save previously.

In the Connection / SSH / Tunnels form, select the 'Dynamic' radio button, enter '6666' in the 'Source port' field and click the 'Add' button.

Click the 'Open' button, and that should connect you to your cluster.

Enter your username (this is your Windows login username) and hit enter, and you should see a welcome banner and a bash shell prompt.
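A 'Dynamic' tunnel in PuTTY behaves as a local SOCKS proxy, so any client that can talk SOCKS can reach the cluster through port 6666. The sketch below shows the idea with the standard `java.net.Proxy` class. The class name `SocksTunnel`, the sample address and port 8020 are assumptions for illustration. Hadoop clients are normally pointed at such a proxy through the `hadoop.socks.server` and `hadoop.rpc.socket.factory.class.default` properties rather than through code like this.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;

// Hypothetical example class; it only demonstrates using the SSH dynamic tunnel as a SOCKS proxy.
class SocksTunnel {

    // The port must match the 'Source port' entered in the PuTTY Tunnels form (6666 above).
    static Proxy localSocksProxy(int port) {
        return new Proxy(Proxy.Type.SOCKS, new InetSocketAddress("localhost", port));
    }

    public static void main(String[] args) throws Exception {
        // args[0]: an address inside the cluster; the default and port 8020 are assumptions.
        String host = args.length > 0 ? args[0] : "10.0.0.1";
        try (Socket socket = new Socket(localSocksProxy(6666))) {
            // An unresolved address lets the proxy (the cluster side) resolve the host name.
            socket.connect(InetSocketAddress.createUnresolved(host, 8020), 5000);
            System.out.println("Connected through the tunnel: " + socket.isConnected());
        }
    }
}
```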

Once you are done with the cluster, the DestroyClusterTool GP tool will destroy the cluster.

In the next post, we will use that cluster to export and import feature classes and run MapReduce jobs.

Like usual, all the source code can be found here.

Wednesday, April 24, 2013

BigData: Terracotta BigMemory and ArcGIS Webmaps

I was asked to put together a Proof of Concept implementation of very fast, interactive, dynamic density map generation over 11 million records for a webmap application. A user can dynamically specify a query definition (a where clause in SQL terms) and a color ramp, and the service implementation returns a density representation of the matching records on a map.

This is typically done via a GeoProcessing task, where the data is queried and stored into an intermediate FeatureClass that is further processed by a Kernel Density tool that produces a raster layer, which is finally visualized. As you can tell, this is neither interactive nor particularly fast.

Since the traditional means of retrieving the data from a relational database is not fast enough and 11 million records is not such a big set after all, I decided to put the whole thing in memory. BTW, this is a meme that has been trending for a while now, and the most vocal about it is SAP HANA.
I decided to use Terracotta's BigMemory to hold the data "off-heap" and use its EHCache query capability to aggregate and fetch the data. Now, despite the "cache" in the name, I used the "eternal" capability to hold the data elements forever.

I was given the data in CSV format, so I wrote a simple CSV reader/tokenizer that bulk loads the data into BigMemory using multiple threads. The inter thread communication was handled by the LMAX disruptor, letting me focus on the loading, parsing and putting into BigMemory.

To access this BigMemory from a webmap as a layer, I decided to write a middleware that implements the ArcGIS Rest JSON interface for an ArcGISDynamicLayer. Now, I did not write the whole interface, just the layer metadata and the export image endpoints, such that you can access the layer as:

http://host/arcgis/rest/services/Service/MapServer

The implementation of the Rest endpoint was easily achievable using the Spring MVC framework, which enables me to expose POJO methods as ArcGIS Rest endpoints.
The final step is to convert the aggregated features from BigMemory into an image that is returned as the response to an export Rest request. This reminded me of the days when I implemented the first version of ArcIMS in Java back in 1996/7, where I used AWT for exactly that same purpose. What is old is new again: I used the same technique to create an in-memory image buffer and used the Graphics2D API to draw and color code the aggregated cells. I used ImageIO to convert the image buffer into a PNG output stream. Now ImageIO is new and if this PoC becomes reality, I will use JAI with its GPU enhancements to do the same task.
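To make the image step concrete, here is a minimal sketch of drawing aggregated cells into an in-memory buffer with Graphics2D and encoding it as PNG with ImageIO. The class name `DensityImage`, the grid of counts and the red alpha ramp are assumptions for illustration. The real service would take its colors from the requested color ramp and its cells from the BigMemory query.

```java
import java.awt.Color;
import java.awt.Graphics2D;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import javax.imageio.ImageIO;

// Hypothetical sketch; counts are assumed to be non-negative and the grid non-empty.
class DensityImage {

    // Draws each cell as a red square whose opacity grows with its count, then encodes the image as PNG.
    static byte[] toPng(int[][] counts, int cellPixels) {
        int rows = counts.length;
        int cols = counts[0].length;
        int max = 1;
        for (int[] row : counts) {
            for (int c : row) {
                max = Math.max(max, c);
            }
        }
        BufferedImage image = new BufferedImage(cols * cellPixels, rows * cellPixels, BufferedImage.TYPE_INT_ARGB);
        Graphics2D g = image.createGraphics();
        try {
            for (int r = 0; r < rows; r++) {
                for (int c = 0; c < cols; c++) {
                    if (counts[r][c] == 0) {
                        continue; // leave empty cells transparent so the basemap shows through
                    }
                    int alpha = 55 + 200 * counts[r][c] / max;
                    g.setColor(new Color(255, 0, 0, alpha));
                    g.fillRect(c * cellPixels, r * cellPixels, cellPixels, cellPixels);
                }
            }
        } finally {
            g.dispose();
        }
        ImageIO.setUseCache(false); // keep the encoding fully in memory
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            ImageIO.write(image, "png", out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] png = toPng(new int[][] {{0, 1, 4}, {2, 8, 3}}, 16);
        System.out.println("PNG bytes: " + png.length);
    }
}
```

The returned byte array can be written directly to the HTTP response of the export endpoint.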

To run this PoC, I started 2 extra large Amazon EC2 instances to run BigMemory with 15GB of off-heap RAM (just because I can :-) and started a medium instance to run Tomcat with the Spring MVC Rest interface that communicated with the two extra large instances.

The PoC was very well received, and I extended it to make the layer time aware, enabling the client to slide through time aware features.

Like usual, you can download all the source code from here, here and here.

Wednesday, April 3, 2013

BigData: DataRush Workflow in ArcMap


At this year's DevSummit, we announced the GIS Tools for Hadoop project. Included in that project is a low level geometry Java API which enables spatial operations in MapReduce jobs or the construction of higher level functions such as Hive User Defined Functions. However, this geometry library is not restricted to Hadoop MapReduce. It is used in Geo Event Processor, and can be used in Storm bolts or other parallel workflows. One such parallel workflow processing engine is Pervasive DataRush, which I demoed at the DevSummit. Using the KNIME visual workflow, I was able to process 90 million records (BTW, small in the BigData world) in the Hadoop File System, for heatmap visualization in ArcMap.


A DataRush workflow engine can run on a local machine or remotely on a cluster of machines and is fully extensible with custom operators. An operator is a node in a workflow graph whose input and output can be linked to other operators. So, I wrote my own spatial operators that utilize the geometry API.

The first operator is a simple spatial filter that passes through records that contain LAT/LON fields and are within a user defined distance from a given point. This uses the "GeometryEngine.geodesicDistanceOnWGS84" function and augments the input record fields with a distance "METERS" output field.
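The filter logic can be sketched in plain Java as follows. Note the swap: instead of calling "GeometryEngine.geodesicDistanceOnWGS84", this sketch uses the haversine formula on a sphere so that it runs without the geometry API, which makes the distances approximate. The class name `DistanceFilter` and the array-based record layout are assumptions, and the DataRush operator plumbing is left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the distance operator; haversine replaces the geodesic WGS84 call.
class DistanceFilter {

    static final double EARTH_RADIUS_METERS = 6371008.8; // mean Earth radius, spherical assumption

    // Great-circle distance in meters between two LAT/LON pairs given in degrees.
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double sinLat = Math.sin(Math.toRadians(lat2 - lat1) / 2);
        double sinLon = Math.sin(Math.toRadians(lon2 - lon1) / 2);
        double a = sinLat * sinLat
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * sinLon * sinLon;
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    // Passes through the records within maxMeters of the given point, augmented with a METERS value.
    static List<double[]> within(List<double[]> latLons, double lat, double lon, double maxMeters) {
        List<double[]> out = new ArrayList<>();
        for (double[] p : latLons) {
            double meters = haversineMeters(lat, lon, p[0], p[1]);
            if (meters <= maxMeters) {
                out.add(new double[] {p[0], p[1], meters}); // LAT, LON, METERS
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<double[]> records = Arrays.asList(new double[] {0, 0.5}, new double[] {0, 2});
        for (double[] r : within(records, 0, 0, 100000)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```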

In the above application, a graph workflow reads a tab delimited file and sends its output to the new distance operator, which sends its output to a console log operator. Simple and easy, but not simplistic: notice that when I execute the application without any arguments, the graph runs on the local machine, taking full advantage of all the local processors. However, if I specify a dummy argument, the graph takes its input from HDFS and executes remotely on a DataRush cluster. This simple toggle is pretty impressive when switching from development to production mode.

Next is to add the operator to the KNIME visual workflow.
The Pervasive folks made that really easy using a plugin for the Eclipse environment.

Using the Node Extension wizard, you can select an operator and it generates template code for the operator xml descriptor, the dialog pane, the model factory and the node settings.

This gets you 80% to 90% there. The rest is completing the XML node descriptor and laying out a nice dialog pane.


The final step is to export the compiled code into KNIME as a plugin - BTW, KNIME is based on Eclipse.  Within KNIME, you are in what I call a PHD (Push Here Dummy) mode, where you can drag and drop and link operators on the workbench.

A slightly more complicated operator is a composite operator that, as its name suggests, is composed of two or more operators, but the whole is exposed as one operator.
So, the next spatial operator extension to write is a composite one that performs a spatial join between a point input source and a polygon input source, where the output is a flow indicating which polygon contains each input point.
The Spatial Join composite operator contains a non-parallelizable operator that blocks while it reads the polygons, the first time through, into an in-memory model. The polygon input is in the Esri JSON text format and we use "GeometryEngine.jsonToGeometry" to convert it into a Polygon POJO. The second embedded operator is parallel enabled; it walks the input points and uses the built in-memory model of polygons to find the containing one using the "GeometryEngine.contains" function. This "build and walk" pattern is used whenever a join or lookup needs to be performed.
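The "build and walk" pattern can be sketched without DataRush or the geometry API. In this sketch, `java.awt.geom.Path2D` stands in for the Polygon POJO and its `contains` method stands in for "GeometryEngine.contains". The class name `BuildAndWalk` and the coordinate arrays are assumptions, and the polygons are scanned linearly rather than through a spatial index.

```java
import java.awt.geom.Path2D;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the pattern; Path2D replaces the Esri geometry types.
class BuildAndWalk {

    private final Map<String, Path2D> polygons = new LinkedHashMap<>();

    // Build phase: load every polygon into the in-memory model before any point is processed.
    void addPolygon(String id, double[][] ring) {
        Path2D.Double path = new Path2D.Double();
        path.moveTo(ring[0][0], ring[0][1]);
        for (int i = 1; i < ring.length; i++) {
            path.lineTo(ring[i][0], ring[i][1]);
        }
        path.closePath();
        polygons.put(id, path);
    }

    // Walk phase: look up the polygon that contains the point, or null if none does.
    String findContaining(double x, double y) {
        for (Map.Entry<String, Path2D> entry : polygons.entrySet()) {
            if (entry.getValue().contains(x, y)) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        BuildAndWalk join = new BuildAndWalk();
        join.addPolygon("A", new double[][] {{0, 0}, {10, 0}, {10, 10}, {0, 10}});
        join.addPolygon("B", new double[][] {{20, 0}, {30, 0}, {30, 10}, {20, 10}});
        double[][] points = {{5, 5}, {25, 5}, {15, 5}};
        for (double[] p : points) {
            System.out.println(p[0] + "," + p[1] + " -> " + join.findContaining(p[0], p[1]));
        }
    }
}
```

Because the walk phase only reads the model, it can be split across threads or nodes once the build phase has finished, which is what makes the second operator parallelizable.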

The above graph adds one extra built-in operator (group by), where the output is the number of points that each input polygon contains. The result of such a flow can be used to thematically color code the polygons in ArcMap.

Displaying thousands or millions of points on a map is, well... not very smart and does not make sense. But if all these points are binned and each bin is weighted proportionally to the number of points in that bin, then a map displaying the color coded bins with respect to their weight is much more appealing and conveys much more information. An example is the above map, where I rendered a heatmap of the locations of businesses with more than 100,000 employees.

So I wrote a terminal operator, the raster operator, that writes bin information as an Esri raster file in either float or ASCII format. The interesting thing about that operator is that it is a "fan in" operator: despite being part of a graph that is executed on a remote cluster, the raster operator is executed on the client machine, that is, the machine that submitted the graph to the cluster.
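For reference, the ASCII flavor of an Esri raster is a small text header followed by the cell values, with the northernmost row first. Here is a minimal sketch of such a writer. The class name `AsciiGridWriter` and the in-memory `float[][]` input are assumptions, and the actual raster operator receives its bins from the workflow rather than from an array.

```java
// Hypothetical sketch of an Esri ASCII grid formatter; cells[0] is assumed to be the northernmost row.
class AsciiGridWriter {

    static final int NODATA = -9999;

    // Formats the header (lower-left corner and cell size in map units) followed by one text line per row.
    static String format(float[][] cells, double xll, double yll, double cellSize) {
        StringBuilder sb = new StringBuilder();
        sb.append("ncols ").append(cells[0].length).append('\n');
        sb.append("nrows ").append(cells.length).append('\n');
        sb.append("xllcorner ").append(xll).append('\n');
        sb.append("yllcorner ").append(yll).append('\n');
        sb.append("cellsize ").append(cellSize).append('\n');
        sb.append("NODATA_value ").append(NODATA).append('\n');
        for (float[] row : cells) {
            for (int c = 0; c < row.length; c++) {
                if (c > 0) {
                    sb.append(' ');
                }
                sb.append(row[c]);
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        float[][] bins = {{1f, 2f}, {3f, 4f}};
        System.out.print(format(bins, 0, 0, 10));
    }
}
```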

One last thing. A KNIME workflow can be exported into an external representation that can be imported into a logical graph for local or remote execution.

This means, I can turn that code into an ArcMap GeoProcessing extension enabling me to invoke a DataRush workflow as a Tool and combine it with other existing tools for further analysis and visualization.

In my workflow example, rather than manually calling the Float To Raster GP Task after executing the DataRush Tool, I can combine both into one ArcPy Script Tool.

So to recap: you can store data into HDFS, or use the new GeoProcessing tools for Hadoop, then compose DataRush workflows with spatial operators that you export to be executed on a cluster from ArcMap, with the result consumed back by ArcMap for further local GeoProcessing or visualization.

This is very cool!

Like usual, all the source code can be downloaded from here and here.

Monday, March 4, 2013

BigData meet Raspberry Pi

The Raspberry Pi is a very neat little machine. I bootstrapped it with Soft-Float Debian "Wheezy" so I can run Java Embedded on it. I downloaded and installed ejre-7u10-fcs-b18-linux-arm-vfp-client_headless-28_nov_2012.tar, and now I can run Java programs on the Pi. I wrote the ubiquitous "Hello World" to make sure it works (I should not have doubted the write once run anywhere motto) and jumped directly to running a Spring container with beans. So, the next thing to do is to run the Hadoop client from my previous post on it. Now remember, there is not a lot of RAM on the Model B Rev 2 Pi (512MB) and the post-action runs on the client machine, so I reduced the cell size and the search radius, and... it worked like a charm. Now, why did I do that? Because... I can :-)
Next is to come up with something to do with the generated data.

BigData: Kernel Density Analysis on Hadoop MapReduce

The storage of BigData has been democratized by Hadoop. And with that, rather than bringing the data to the program for analysis, we send the program to the data. That twist comes with a challenge: taking existing serial algorithms and parallelizing them. This post is about such a process. I want to perform a Kernel Density analysis on billions of records loaded into HDFS. The data is fairly simple: a set of records separated by carriage returns, where the fields in each record are tab separated. The field values contain a latitude, a longitude and a weight.
If you do not know how Kernel Density works, check out this ArcGIS reference.
To take advantage of the distributed HDFS datanodes, I have to transform the traditional stateful sequential implementation into a model that supports stateless parallelism. MapReduce is such a model, and Hadoop can distribute a MapReduce implementation to read data from the "local" HDFS datanodes and reduce the result back into HDFS.
A Kernel Density analysis expects 3 parameters when processing spatial data: an output extent, a cell size and a search radius. From the above reference description, an input value influences the cells in its search area, and the level of influence is proportional to the input weight and decreases with the distance of the cell from the input. That falloff with distance is computed using a Kernel function.
So, in the MapReduce programming paradigm, a map function expects a key(k1),value(v1) tuple and emits zero or more new key(k2),value(v2) tuples.

map(k1,v1) -> List(k2,v2)

The map portion of the parallel Kernel Density analysis implementation will emit all the influenced cells and their associated weights for a given input. A cell is defined as a string concatenating its row and column values with a slash character.
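A minimal sketch of that map step in plain Java follows. A real Hadoop mapper would extend `org.apache.hadoop.mapreduce.Mapper` and write to its context instead of returning a map. The class name `KernelDensityMapper`, the quartic kernel without its normalization constant, and rows counted upward from the bottom of the extent are all assumptions made for illustration, not necessarily what the original implementation does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the map step; kernel choice and row ordering are assumptions.
class KernelDensityMapper {

    private final double xmin, ymin, cellSize, radius;
    private final int rows, cols;

    KernelDensityMapper(double xmin, double ymin, double cellSize, double radius, int rows, int cols) {
        this.xmin = xmin;
        this.ymin = ymin;
        this.cellSize = cellSize;
        this.radius = radius;
        this.rows = rows;
        this.cols = cols;
    }

    // Emits "row/col" -> weighted influence for every cell whose center lies inside the search radius.
    Map<String, Double> map(double x, double y, double weight) {
        Map<String, Double> emitted = new LinkedHashMap<>();
        // Bounding box of the search area in cell coordinates, clipped to the output extent.
        int minCol = Math.max(0, (int) Math.floor((x - radius - xmin) / cellSize));
        int maxCol = Math.min(cols - 1, (int) Math.floor((x + radius - xmin) / cellSize));
        int minRow = Math.max(0, (int) Math.floor((y - radius - ymin) / cellSize));
        int maxRow = Math.min(rows - 1, (int) Math.floor((y + radius - ymin) / cellSize));
        double r2 = radius * radius;
        for (int row = minRow; row <= maxRow; row++) {
            for (int col = minCol; col <= maxCol; col++) {
                double dx = xmin + (col + 0.5) * cellSize - x;
                double dy = ymin + (row + 0.5) * cellSize - y;
                double d2 = dx * dx + dy * dy;
                if (d2 < r2) {
                    double t = 1.0 - d2 / r2; // quartic kernel shape: (1 - (d/r)^2)^2
                    emitted.put(row + "/" + col, weight * t * t);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        KernelDensityMapper mapper = new KernelDensityMapper(0, 0, 1, 1.5, 10, 10);
        mapper.map(5.5, 5.5, 2.0).forEach((cell, w) -> System.out.println(cell + "\t" + w));
    }
}
```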
After a shuffle and sort intermediate step, the reduce function expects a key(k2),List(v2) tuple and emits zero or more new key(k3),value(v3) tuples.

reduce(k2,list(v2)) -> List(k3,v3)

The implementation will emit the sum of all the weights of a cell.
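The reduce step is small enough to sketch in plain Java, together with a stand-in for the shuffle and sort phase that groups the emitted values by cell key. The class name `KernelDensityReducer` and the `shuffleAndReduce` helper are hypothetical. On a cluster, Hadoop performs the grouping and calls the reducer once per key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the reduce step; the grouping below only imitates Hadoop's shuffle and sort.
class KernelDensityReducer {

    // reduce(k2, list(v2)) -> (k3, v3): the key is the cell, the value is the sum of its weights.
    static double reduce(String cell, Iterable<Double> weights) {
        double sum = 0.0;
        for (double w : weights) {
            sum += w;
        }
        return sum;
    }

    // Groups emitted (cell, weight) pairs by cell, then reduces each group.
    static SortedMap<String, Double> shuffleAndReduce(Iterable<? extends Map.Entry<String, Double>> emitted) {
        SortedMap<String, List<Double>> grouped = new TreeMap<>();
        for (Map.Entry<String, Double> e : emitted) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        SortedMap<String, Double> result = new TreeMap<>();
        grouped.forEach((cell, weights) -> result.put(cell, reduce(cell, weights)));
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> emitted = new ArrayList<>();
        emitted.add(new SimpleEntry<>("1/1", 0.5));
        emitted.add(new SimpleEntry<>("1/2", 1.0));
        emitted.add(new SimpleEntry<>("1/1", 0.25));
        shuffleAndReduce(emitted).forEach((cell, sum) -> System.out.println(cell + "\t" + sum));
    }
}
```

Since addition is associative and commutative, the same summing logic can also run as a combiner on the map side to cut down the data sent over the network.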
To execute the kernel density map and reduce functions, I decided to use the newly released Spring Data - Apache Hadoop project. As you know, I love the Spring Framework and have been tracking the Spring Data Hadoop project for a while now, and I am glad that it is finally released.
The following Spring application context bootstraps an application and runs the defined job. The job declaratively references a mapper class, a reducer class and a combiner class. In addition, the job runner defines a pre-action and a post-action. The pre-action tests the existence of the HDFS output path and removes it if it exists. The post-action references a class that converts the content of the output path into a local float raster file. You can use the Float to Raster conversion tool in ArcMap to visualize the kernel density output. And like usual, all the source code can be found here.