Saturday, September 21, 2013

BigData: Apache Flume, HDFS and HBase

In this post, I will show how to log a very large number of web requests to BigData storage for traffic analysis. The source code for the project is on GitHub. We will rely on the logging library log4j and the associated Flume NG appender implementation. For storage, we will place the log information into a set of HDFS files or into an HBase table. The HDFS files will be mapped into a Hive table as partitions for query and aggregation.
In this demonstration, all web requests are handled by a very simple web application servlet that is mapped to an access URL:
In practice, all logic is handled by the servlet and the logging of the request information will be handled by a servlet filter that uses the log4j logging API.
In this demonstration the path info will define the log level and the query string will define the log message content.
Log4j is configured using the resource log4j.properties to use a Flume NG appender:
The appender will send all log events to a Flume agent instantiated on the host defined by the property log4j.appender.flume.Hostname.
The log event body content is defined by the ConversionPattern property, where the string tokens are separated by tab characters. This is how the content will be stored into an HDFS file and parsed back into tokens to be stored as columns in an HBase table.
Before starting the web application, a Flume agent has to be instantiated with the predefined configuration:
This agent has an Avro source that accepts incoming events on port 51515. The source has an interceptor to ensure that all incoming events have a timestamp. The events are forwarded to an HDFS sink through a memory channel. The HDFS sink writes the events to a file whose path is specified by the current year, month, day and hour. Each file is closed on the hour and subsequent events are "rolled" into a newly created file. In this demonstration, the content of the file is in plain text format. For efficiency, I could have used a compressed binary format.
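To make the hourly bucketing concrete, here is a minimal Java sketch of how a timestamp can be turned into a year/month/day/hour path. It is not the Flume sink itself, just an illustration. The `HourlyPath` class, the `year=/month=/day=/hour=` directory layout and the use of UTC are all assumptions of mine, since the agent configuration is not reproduced here.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives an hourly bucket path from an event timestamp.
final class HourlyPath {
    // Assumed Hive-style layout; the real layout is whatever the sink path specifies.
    private static final DateTimeFormatter HOUR_BUCKET =
        DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd'/hour='HH")
            .withZone(ZoneOffset.UTC);

    private HourlyPath() {
    }

    // All events whose timestamps fall within the same hour map to the same directory.
    static String forTimestamp(final String baseDir, final long epochMillis) {
        return baseDir + "/" + HOUR_BUCKET.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

Two events logged at 14:05 and 14:55 on the same day resolve to the same path, while an event at 15:00 lands in a new one, which is the "roll" described above.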
You can start an agent from the command line as follows:
$ flume-ng agent -n agent -c conf -f conf/agent.properties
Or you can use a GUI like the Cloudera Manager:

The HDFS files are mapped onto a Hive partitioned table for query as follows:
As file partitions are added to the file system, the table can be altered to account for each new partition. Here is an example:
The following is an agent configuration that will place all log events into an HBase table where each of the log event elements populates a column:
Note: Make sure to first create the HBase table using the shell:
create 'log4j',{NAME=>'info',VERSIONS=>1}
This agent has an Avro source that listens for incoming events on port 61616 and forwards these events to an HBase sink through a memory channel. The HBase sink uses a regular expression parser to tokenize the event body content, and each token is placed into its respective column in the table.
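The tokenizing step can be illustrated in plain Java, outside of Flume. The sketch below assumes a body made of three tab-separated tokens. The `EventBodyTokenizer` class and the column names `time`, `level` and `message` are hypothetical, because the actual ConversionPattern and sink configuration are not reproduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of regex-based tokenizing of a tab-separated event body.
final class EventBodyTokenizer {
    // Assumed layout: two tab-free tokens followed by the remainder of the line.
    private static final Pattern BODY = Pattern.compile("^([^\\t]*)\\t([^\\t]*)\\t(.*)$");

    // Assumed column names, one per capture group, in order.
    private static final String[] COLUMNS = {"time", "level", "message"};

    private EventBodyTokenizer() {
    }

    // Returns column name to token; the map is empty when the body does not match.
    static Map<String, String> tokenize(final String body) {
        final Map<String, String> row = new LinkedHashMap<>();
        final Matcher matcher = BODY.matcher(body);
        if (matcher.matches()) {
            for (int i = 0; i < COLUMNS.length; i++) {
                row.put(COLUMNS[i], matcher.group(i + 1));
            }
        }
        return row;
    }
}
```

Each entry of the returned map corresponds to one cell in the 'info' column family of a row.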
Like usual, all the source code is available here.

Friday, September 6, 2013

BigData GeoEnrichment

What is GeoEnrichment? An example would best describe it. Given a big set of customer location records, I would like each location to be GeoEnriched with the average income of the zip code that the location falls into and with the number of people between the ages of 25 and 30 who live in that zip code.

Before GeoEnrichment:
CustId,Lat,Lon

After GeoEnrichment:
CustId,Lat,Lon,AverageIncome,Age25To30

Of course the key to this whole thing is the spatial reference data :-) and there are a lot of search options, such as Point-In-Polygon, Nearest Neighbor and enrichment based on a Drive Time Polygon from each location.

I've implemented two search methods:
  • Point-In-Polygon method
  • Nearest Neighbor Weighted method
The Point-In-Polygon (PiP) method is fairly simple. Given a point, find the polygon it falls into and pull from the polygon feature the selected attributes and add them to the original point.
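
As a rough illustration of the "find the polygon it falls into" step, here is a hand-rolled even-odd ray casting test in Java. This is a stand-in technique for the sake of the example, not the Geometry API for Java call used in the project. The `PointInPolygon` class is hypothetical and it handles a single ring only (no holes, no multipart polygons).

```java
// Hypothetical sketch: even-odd ray casting on one polygon ring.
final class PointInPolygon {
    private PointInPolygon() {
    }

    // xs/ys hold the ring vertices in order; the ring is implicitly closed.
    static boolean contains(final double[] xs, final double[] ys,
                            final double px, final double py) {
        boolean inside = false;
        for (int i = 0, j = xs.length - 1; i < xs.length; j = i++) {
            // Does the edge j -> i straddle the horizontal line through the point?
            final boolean straddles = (ys[i] > py) != (ys[j] > py);
            if (straddles) {
                // X coordinate where the edge crosses that horizontal line.
                final double xAtPy = xs[j] + (py - ys[j]) * (xs[i] - xs[j]) / (ys[i] - ys[j]);
                if (px < xAtPy) {
                    inside = !inside; // one more crossing to the right of the point
                }
            }
        }
        return inside;
    }
}
```

Once the containing polygon is found, its selected attributes (average income, for instance) are appended to the point record.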

The Nearest Neighbor Weighted (NNW) method finds all the reference points within a specified distance and weights each point based on its distance. The GeoEnrichment value is the sum of the weighted attribute values.

You can find more details about this here, where I've used HBase and the Geometry API for Java to perform the GeoEnrichment.
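
A minimal Java sketch of the weighted sum follows. The weighting function is not spelled out above, so the linear falloff used here (weight 1 at distance 0, weight 0 at the search radius) is purely an assumption, as are the planar distance and the hypothetical `NearestNeighborWeighted` class.

```java
// Hypothetical sketch of a nearest-neighbor weighted sum.
final class NearestNeighborWeighted {
    private NearestNeighborWeighted() {
    }

    // Each reference row is {x, y, attributeValue}; radius must be positive.
    static double enrich(final double px, final double py,
                         final double[][] refs, final double radius) {
        double sum = 0.0;
        for (final double[] ref : refs) {
            final double distance = Math.hypot(ref[0] - px, ref[1] - py);
            if (distance <= radius) {
                // Assumed linear falloff; other kernels would work the same way.
                final double weight = 1.0 - distance / radius;
                sum += weight * ref[2];
            }
        }
        return sum;
    }
}
```

With a radius of 10, a reference point at distance 0 contributes its full value, one at distance 5 contributes half, and one at distance 20 is ignored.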

Tuesday, August 27, 2013

BigData Spatial Indexes and HBase Toolbox

This is a set of experiments with HBase. From within ArcMap, you can create an HTable and export the content of a feature class into that table. The HTable is mapped into a Hive table to perform SQL-like queries on it, and of course the Spatial Framework UDFs can be applied for spatial operations. If Hive is not fast enough for you, try Impala. Finally, by taking advantage of the RowKey design, a spatial index is derived using geohash, to be used later in a big spatial join MapReduce job.
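
For readers unfamiliar with geohash, here is a small Java sketch of the standard encoding: longitude and latitude bits are interleaved and every 5 bits become one base-32 character. It shows the general technique only. The `GeoHash` class is hypothetical, and how the hash is actually combined into the RowKey in the toolbox is not shown here.

```java
// Hypothetical sketch of standard geohash encoding.
final class GeoHash {
    private static final String BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz";

    private GeoHash() {
    }

    static String encode(final double lat, final double lon, final int length) {
        double latMin = -90.0, latMax = 90.0;
        double lonMin = -180.0, lonMax = 180.0;
        final StringBuilder hash = new StringBuilder(length);
        boolean lonBit = true; // bits alternate, starting with longitude
        int bits = 0;
        int value = 0;
        while (hash.length() < length) {
            if (lonBit) {
                final double mid = (lonMin + lonMax) / 2.0;
                if (lon >= mid) {
                    value = (value << 1) | 1;
                    lonMin = mid;
                } else {
                    value <<= 1;
                    lonMax = mid;
                }
            } else {
                final double mid = (latMin + latMax) / 2.0;
                if (lat >= mid) {
                    value = (value << 1) | 1;
                    latMin = mid;
                } else {
                    value <<= 1;
                    latMax = mid;
                }
            }
            lonBit = !lonBit;
            if (++bits == 5) { // 5 bits per base-32 character
                hash.append(BASE32.charAt(value));
                bits = 0;
                value = 0;
            }
        }
        return hash.toString();
    }
}
```

Because nearby locations tend to share a hash prefix, and HBase keeps rows sorted by key, a geohash prefix in the key lets a range scan pull spatially close rows together.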

Like usual all the source code can be found here.

Wednesday, August 14, 2013

BigData: Experiments with Apache Avro and Parquet

 

In the GIS Tools for Hadoop, we store and retrieve feature classes in Esri JSON or GeoJSON formats to and from HDFS. This post is about a set of experiments with different storage and serialization techniques for feature classes. The two environments I evaluated are Apache Avro and Parquet. All the source code can be found here.

Experiment I - Simple Feature Class Schema

Although Avro is capable of dynamic typing, I am old-fashioned and wanted to generate a set of POJOs for a simple feature class schema. In my mind, a feature has a geometry and a set of attributes. A geometry has a spatial reference and can be either a point, a line or a polygon. A point is made up of one coordinate. A coordinate is simply made up of an x and a y value. Lines are made up of paths, where a path is a set of coordinates. Polygons are made up of rings, where a ring is a set of coordinates. Based on the Avro specification, I wrote two schemas: one for a generic feature and one for a point feature. The feature attributes are a set of key-value pairs, where the key is in a text format and the values can be either numeric or textual.

Using Maven, a set of concrete classes can be generated to be used in static programming:

$ mvn avro:schema

Experiment II - Generate Data

Once the classes are generated, the project is compiled and a set of features with random geographic coordinates is generated and placed into HDFS:

$ mvn exec:java -q -Dexec.mainClass=com.esri.RandomPoints -Dexec.args="cloudera hdfs://localhost.localdomain:8020/user/cloudera/points/points.avro"

Experiment II - View Avro Data

Since the Avro data is in a binary format, a "cat" of the data will render gibberish.  So I borrowed an idea from the Avro tool project to dump the content to the console in JSON text format:

$ mvn exec:java -q -Dexec.mainClass=com.esri.AvroToJson -Dexec.args="hdfs://localhost.localdomain:8020/user/cloudera/points/points.avro 10"

Experiment III - Run MapReduce Job on Avro Data

The MapReduce job in mind is a simple bin clustering, where the output is the number of points per bin - think of a bin as a cell in a grid.  Here are the mapper and the reducer.

// AvroMapper, AvroReducer, AvroCollector and Pair come from org.apache.avro.mapred;
// Reporter comes from org.apache.hadoop.mapred. XMIN and YMIN are defined in the enclosing class.
private static final class FeatureMapper extends AvroMapper<
        AvroPointFeature, Pair<Long, Integer>>
{
    @Override
    public void map(
            final AvroPointFeature feature,
            final AvroCollector<Pair<Long, Integer>> collector,
            final Reporter reporter)
            throws IOException
    {
        final AvroPoint avroPoint = feature.getGeometry();
        final AvroCoord coord = avroPoint.getCoord();
        // Column and row of the one-unit cell that the point falls into.
        final long x = (long) Math.floor(coord.getX() - XMIN);
        final long y = (long) Math.floor(coord.getY() - YMIN);
        // Pack row and column into one key: row in the upper 32 bits, column in the lower.
        final long g = (y << 32) | x;
        collector.collect(new Pair<Long, Integer>(g, 1));
    }
}

private static final class FeatureReducer extends AvroReducer<Long, Integer,
        Pair<Long, Integer>>
{
    @Override
    public void reduce(
            final Long key,
            final Iterable<Integer> values,
            final AvroCollector<Pair<Long, Integer>> collector,
            final Reporter reporter)
            throws IOException
    {
        // Sum the per-point counts emitted by the mapper for this cell.
        int sum = 0;
        for (final Integer value : values)
        {
            sum += value;
        }
        collector.collect(new Pair<Long, Integer>(key, sum));
    }
}

Of course the Avro output can be viewed using the Avro to JSON tool from the second experiment :-)
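
To turn a bin key from the job output back into its cell row and column, the packing done in the mapper has to be reversed. Here is a small sketch of that round trip. The `BinKey` class is hypothetical and not part of the project, and the mask applied to x is my addition to guard against negative values (the mapper above does not mask). It assumes both cell indexes are in the range 0 to 2^32 - 1.

```java
// Hypothetical helper for packing and unpacking a grid cell key.
final class BinKey {
    private static final long LOW_32_BITS = 0xFFFFFFFFL;

    private BinKey() {
    }

    // Row (y) goes into the upper 32 bits, column (x) into the lower 32 bits.
    static long pack(final long x, final long y) {
        return (y << 32) | (x & LOW_32_BITS);
    }

    // Column index of the cell.
    static long x(final long key) {
        return key & LOW_32_BITS;
    }

    // Row index of the cell; the unsigned shift avoids sign extension.
    static long y(final long key) {
        return key >>> 32;
    }
}
```

Adding XMIN and YMIN back to the unpacked column and row gives the lower-left corner of the cell in map coordinates.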

Experiment IV - GeoProcessing Tool To Generate Schema.

I wanted to generate an Avro schema that closely resembles the metadata of a specific simple feature class. So, I wrote the SchemaTool as an ArcMap extension that writes to HDFS the specific Avro schema of a given feature class.

Experiment V - Export Feature Class Based on Specific Schema

Now that we have a specific schema, I wrote yet another extension (ExportToGenericAvro) to export the content of a feature class into an Avro formatted HDFS file. Again, you can validate the content using the Avro to JSON tool :-)

Experiment VI - Use Hive To Query Avro Data

Hive takes a SQL-like statement and converts it into a MapReduce job that operates on a table mapped to an HDFS path. Hive maps binary content as a table using a concept called SerDe. In my experiments, I am using the Cloudera VM as my sandbox, and it comes preinstalled with Hive and all kinds of SerDes. An external Hive table can be defined as follows:

CREATE EXTERNAL TABLE points
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/cloudera/points/'
TBLPROPERTIES ('avro.schema.url'='hdfs://localhost.localdomain/user/cloudera/points.avsc');

Once the table is registered, you can start querying it.  Here is a sample query:

select geometry.coord,attributes['id'] from points limit 10;

Experiment VII - Using Spatial UDFs

The GIS Tools for Hadoop come with a set of Spatial Hive User Defined Functions. These enable you to "augment" HQL with spatial operations. Make sure to first add the geometry and spatial framework jars to Hive and register the spatial UDFs. In my case, I cloned the projects, compiled them and copied the resulting jars to my local home folder, from where I launched Hive.

hive> add jar /home/cloudera/esri-geometry-api-1.2-SNAPSHOT.jar;
hive> add jar /home/cloudera/spatial-sdk-hive-1.0-SNAPSHOT.jar;
hive> source function-ddl.sql;

Now, I can perform spatial queries on my points table. Here is a sample query that counts the number of points that are within a certain distance from a specified point:

hive> select count(1) from points
where ST_DISTANCE(ST_POINT(geometry.coord.x,geometry.coord.y),ST_POINT(75,24)) < 5;

Experiment VIII - Parquet Format.

So, all the above experiments place the data in a row format in HDFS. Parquet is a columnar storage format for Hadoop. Typically the column schema is defined in Apache Thrift. However, there is a sub project that enables you to define your column schema in Avro - cool, no? So I wrote yet another tool (ExportToAvroParquetTool) to export the content of a feature class into a Parquet-based format, based on a specific schema defined in Avro.

Final Thoughts…

It is time to evaluate performance in capacity and speed, which will make for an interesting post. Per my readings, Twitter is so impressed with Parquet that they are already moving it into production. I like Avro, as it enables future-proof schema changes. After all, you cannot "readjust" your data to fit a new schema when it is in the petabytes :-)

I do all my development on a Mac and have a Windows VM running ArcMap and the Cloudera VM running Hadoop, all on the same machine (yes, it is a beefy machine :-). I had to use PuTTY to SSH tunnel all the ArcMap communication with Hadoop - check out this post for some hints.

Rather than using Hive, I desperately wanted to use Impala. However, as of this writing, Impala does not support structs, maps or UDFs. But when it does... it will be AMAZING. In the past, when it was still in beta, it saved my "derrière": I could query 4.5 billion records in under 20 seconds, live on a stage in front of thousands of people!

Again, like usual, all the source code can be downloaded from here.

Monday, July 29, 2013

Minecraft: The Gamification of GIS BigData

So in this short post I am integrating 4 things that I love the most: GIS, Hadoop, BigMemory and Minecraft. The idea is the following: I would like to visualize in Minecraft the result of a Kernel Density calculation performed by Hadoop. The job output is placed into distributed memory rather than back into HDFS, so that a Minecraft server can read that memory and populate a world that I have teleported into.

Here is a short video that showcases the last steps after I started the BigMemory server and ran the Hadoop Job.
Like usual all the source code is available here.

Monday, July 15, 2013

Minecraft: The Gamification of GIS

At this year's User Conference, I presented "The Gamification of GIS" in the Esri Labs booth, where I showcased how a Minecraft player can teleport into and interact with GIS worlds. It was a simple proof of concept, but I believe the ramifications can be significant, especially after listening to Amber Case talking about STEM in the plenary.
My boys and their friends can play Minecraft for hours. They collaboratively build amazing online worlds and pixel art. And despite the "blockiness" of the worlds and the art, the game is pretty immersive.
One day my son tells me "Hey dad, I think I can build one of your worlds in Minecraft". What he meant by my world is a GIS world that I render using ArcMap. So I started thinking about this and wondered if I too could do this, but programmatically. After a bit of googling, I found out that I can program the Minecraft server using Java. My boys play Minecraft on their Xbox and have a desktop version too. The latter has additional features, such as connecting to your own server that is extensible with plugins.
So, I set up my own server and started programming my own plugin to build my GIS-based Minecraft worlds.
Since Minecraft is based on blocks, I needed to convert the world vector features into "blocks". The raster format fits that requirement. Using ArcMap's built-in conversion tools, I can convert features to raster, where the raster values are based on the features' attribute values. The idea behind using the attribute values is to create worlds with heights that are proportional to those values. To enable the plugin to read these raster values, I converted the raster to float values.
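To give an idea of what reading such a raster involves, here is a minimal Java sketch. It assumes the float file is a row-major array of 32-bit floats, with the row count, column count and byte order supplied by the caller (in practice taken from the .hdr file). The `FloatRaster` class and the proportional height mapping are my own illustration, not the code of the RasterPlugin.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: decode float raster cells and scale them to block heights.
final class FloatRaster {
    private FloatRaster() {
    }

    // Decodes nrows * ncols 32-bit floats stored row by row.
    static float[][] decode(final byte[] bytes, final int nrows, final int ncols,
                            final ByteOrder order) {
        final ByteBuffer buffer = ByteBuffer.wrap(bytes).order(order);
        final float[][] cells = new float[nrows][ncols];
        for (int r = 0; r < nrows; r++) {
            for (int c = 0; c < ncols; c++) {
                cells[r][c] = buffer.getFloat();
            }
        }
        return cells;
    }

    // Height proportional to the cell value; non-positive values (assumed to
    // include any negative no-data marker) map to height 0.
    static int blockHeight(final float value, final float maxValue, final int maxHeight) {
        if (maxValue <= 0 || value <= 0) {
            return 0;
        }
        return Math.round(maxHeight * Math.min(value, maxValue) / maxValue);
    }
}
```

A world generator can then stack `blockHeight(...)` blocks at each cell position, which gives the "taller means bigger attribute value" effect.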
To know where I am in these generated worlds, I need sign posts. In GIS parlance, a sign post is a symbol used to render point features. So I used the built-in Feature To Point data management tool, followed by a Python-based GeoProcessing task (thanks @pheede), to convert the point feature class to a CSV file that the plugin reads to create sign post blocks. The text on each sign post is based on a feature attribute value.
My UC presentation was based on navigating the countries of the world, where the height of each country is proportional to its population and the sign posts in each country are annotated with the country name.

Here is the sequence of steps that I followed to enable that navigation:
After installing and setting up a Bukkit server (as of this writing, I am using version 1.6.2 Build #2815 to work with the latest 1.6.2 Desktop version), a plugins folder will be created.
Stop the server and download into the plugins folder the Multiverse-Core-2.5.jar and the RasterPlugin-1.0-SNAPSHOT.jar.
Download and unzip the content of the minecraft-cntry.zip file into your home folder. You should see 3 files: cntry.flt, cntry.hdr and cntry.csv.
Start the server and issue the following commands at the server command line prompt:
mv create cntry normal -g RasterPlugin:/path/to/your/home/folder/cntry.flt -t FLAT
This command uses the Multiverse plugin to create a normal flat world named 'cntry' using the RasterPlugin generator.
To place the sign posts in the generated world, type the following command:
label cntry /path/to/your/home/folder/cntry.csv
Now start the Minecraft desktop application. Click the 'Multiplayer' button. Click the 'Direct Connect' button. Enter localhost in the Server Address text input, then click the 'Join Server' button.
Make sure the keyboard focus is in the Minecraft application by clicking on it. To teleport to the newly generated 'cntry' world, type the following command:
/mv tp cntry
Place yourself in 'creative' mode:
/gamemode 1
To teleport yourself to the highest location in that world, issue the following command:
/pos max
Start walking or flying around and explore.
Hope you find this fun and educational - like usual all the source code can be downloaded from here.
This post is dedicated to my son Coln who is my inspiration. He and his Minecraft friends raised some serious cash at this year's St Baldrick's Foundation event.