Saturday, September 21, 2013

BigData: Apache Flume, HDFS and HBase

In this post, I will show how to log very large numbers of web requests to a BigData store for traffic analysis. The source code for the project is on GitHub. We will rely on the log4j logging library and the associated Flume NG appender implementation. For storage, we will place the log information into a set of HDFS files or into an HBase table. The HDFS files will be mapped into a Hive table as partitions for query and aggregation.
In this demonstration, all web requests are handled by a very simple web application servlet that is mapped to an access URL:
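A minimal sketch of such a servlet (the class name and the /access/* mapping are placeholders) could look like this:

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

// Hypothetical servlet bound to /access/* - it only acknowledges the request;
// the logging itself is delegated to the filter described next.
@WebServlet("/access/*")
public class AccessServlet extends HttpServlet {
    @Override
    protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
            throws ServletException, IOException {
        response.setContentType("text/plain");
        response.getWriter().println("OK");
    }
}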
In practice, all the application logic is handled by the servlet, while the logging of the request information is handled by a servlet filter that uses the log4j logging API.
In this demonstration, the path info defines the log level and the query string defines the log message content.
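A minimal sketch of such a filter, assuming the first path segment names the level and the query string is logged verbatim (the class name is a placeholder):

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Hypothetical filter: the path info selects the log level, the query string is the message.
public class AccessLogFilter implements Filter {
    private static final Logger LOG = Logger.getLogger(AccessLogFilter.class);

    @Override
    public void init(final FilterConfig filterConfig) {
    }

    @Override
    public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain)
            throws IOException, ServletException {
        final HttpServletRequest httpRequest = (HttpServletRequest) request;
        final String pathInfo = httpRequest.getPathInfo();   // e.g. /warn
        final String message = httpRequest.getQueryString(); // the log message content
        final Level level = Level.toLevel(pathInfo == null ? "INFO" : pathInfo.substring(1), Level.INFO);
        LOG.log(level, message);
        chain.doFilter(request, response);
    }

    @Override
    public void destroy() {
    }
}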
Log4j is configured using the resource log4j.properties to use a Flume NG appender:
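A sketch of what that resource could contain, assuming a Flume version whose Log4jAppender honors a log4j layout (the host name, port and pattern are placeholders to adapt to your setup):

log4j.rootLogger=INFO,flume

log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=flume-host.example.com
log4j.appender.flume.Port=51515

log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss}\t%p\t%c\t%m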
The appender sends all log events to a Flume agent running on the host defined by the log4j.appender.flume.Hostname property.
The log event body content is defined by the ConversionPattern property, where the string tokens are separated by a tab character. This is how the content will be stored in an HDFS file and later parsed back into tokens to be stored as columns in an HBase table.
Before starting the web application, a flume agent has to be instantiated with the predefined configuration:
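A sketch of such an agent configuration, consistent with the description that follows (the component names and the HDFS path are placeholders):

agent.sources = avroSrc
agent.channels = memChannel
agent.sinks = hdfsSink

# Avro source receiving the events sent by the log4j appender
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 51515
# Make sure every event carries a timestamp header for the %Y/%m/%d/%H path escapes
agent.sources.avroSrc.interceptors = ts
agent.sources.avroSrc.interceptors.ts.type = timestamp
agent.sources.avroSrc.channels = memChannel

agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 10000

# HDFS sink writing plain text files partitioned by year/month/day/hour
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memChannel
agent.sinks.hdfsSink.hdfs.path = /user/flume/log4j/%Y/%m/%d/%H
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormat = Text
# Roll files roughly hourly rather than by size or event count
agent.sinks.hdfsSink.hdfs.rollInterval = 3600
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 0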
This agent has an Avro source that accepts incoming events on port 51515. The source has an interceptor to ensure that all incoming events have a timestamp. The events are forwarded to an HDFS sink through a memory channel. The HDFS sink writes the events to a file whose path is derived from the current year, month, day and hour. Each file is closed on the hour, and subsequent events are "rolled" into a newly created file. The content of the file is plain text in this demonstration; for efficiency, I could have used a compressed binary format.
You can start an agent from the command line as follows:
$ flume-ng agent -n agent -c conf -f conf/agent.properties
Or you can use a GUI such as Cloudera Manager.

The HDFS files are mapped onto a partitioned Hive table for querying as follows:
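A sketch of such a table definition, assuming the four tab-separated tokens produced by the ConversionPattern above (the table name, column names and location are placeholders):

CREATE EXTERNAL TABLE access_log (
  ts       STRING,
  loglevel STRING,
  category STRING,
  message  STRING
)
PARTITIONED BY (year INT, month INT, day INT, hour INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/flume/log4j';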
And as file partitions are added to the file system, the table can be altered to account for each new partition.  Here is an example:
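For example, to register the hour directory for 2013-09-21 10:00 (the values and the path are illustrative, matching the table sketched above):

ALTER TABLE access_log ADD PARTITION (year=2013, month=9, day=21, hour=10)
LOCATION '/user/flume/log4j/2013/09/21/10';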
The following is an agent configuration that will place all log events into an HBase table where each of the log event elements populates a column:
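A sketch of such a configuration, assuming the RegexHbaseEventSerializer that ships with Flume (the agent and component names are placeholders):

hbase-agent.sources = avroSrc
hbase-agent.channels = memChannel
hbase-agent.sinks = hbaseSink

hbase-agent.sources.avroSrc.type = avro
hbase-agent.sources.avroSrc.bind = 0.0.0.0
hbase-agent.sources.avroSrc.port = 61616
hbase-agent.sources.avroSrc.channels = memChannel

hbase-agent.channels.memChannel.type = memory
hbase-agent.channels.memChannel.capacity = 10000

# HBase sink that splits the tab-separated body into columns of the 'info' family
hbase-agent.sinks.hbaseSink.type = org.apache.flume.sink.hbase.HBaseSink
hbase-agent.sinks.hbaseSink.channel = memChannel
hbase-agent.sinks.hbaseSink.table = log4j
hbase-agent.sinks.hbaseSink.columnFamily = info
hbase-agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
hbase-agent.sinks.hbaseSink.serializer.regex = ([^\\t]+)\\t([^\\t]+)\\t([^\\t]+)\\t(.*)
hbase-agent.sinks.hbaseSink.serializer.colNames = ts,loglevel,category,message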
Note: Make sure to first create the HBase table using the shell:
create 'log4j',{NAME=>'info',VERSIONS=>1}
This agent has an Avro source that listens for incoming events on port 61616 and forwards these events to an HBase sink through a memory channel.  The HBase sink uses a regular expression parser to tokenize the event body content, and each token is placed into its own column in the table.
As usual, all the source code is available here.

Friday, September 6, 2013

BigData GeoEnrichment

What is GeoEnrichment? An example would best describe it. Given a big set of customer location records, I would like each location to be GeoEnriched with the average income of the zip code that the location falls into, and with the number of people between the ages of 25 and 30 who live in that zip code.

Before GeoEnrichment:
CustId,Lat,Lon

After GeoEnrichment:
CustId,Lat,Lon,AverageIncome,Age25To30

Of course, the key to this whole thing is the spatial reference data :-) and there are a lot of search options, such as Point-In-Polygon, Nearest Neighbor, and enrichment based on a drive-time polygon around each location.

I've implemented two search methods:
  • Point-In-Polygon method
  • Nearest Neighbor Weighted method
The Point-In-Polygon (PiP) method is fairly simple. Given a point, find the polygon it falls into, pull the selected attributes from that polygon feature, and add them to the original point.
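As a rough illustration using the Geometry API for Java mentioned below (the feature class, its attributes, and the loading of the reference polygons are hypothetical):

import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;
import com.esri.core.geometry.Polygon;
import com.esri.core.geometry.SpatialReference;

// Hypothetical enrichment step: find the zip code polygon containing the point,
// so its attributes can be copied onto the customer record.
public class PointInPolygonEnricher {
    private static final SpatialReference WGS84 = SpatialReference.create(4326);

    // Placeholder reference feature: a zip code polygon and its enrichment attributes.
    public static class ZipCodeArea {
        Polygon polygon;
        double averageIncome;
        long age25To30;
    }

    public static ZipCodeArea findContainingArea(final Point location, final Iterable<ZipCodeArea> areas) {
        for (final ZipCodeArea area : areas) {
            if (GeometryEngine.contains(area.polygon, location, WGS84)) {
                return area;
            }
        }
        return null; // no zip code polygon contains this location
    }
}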

The Nearest Neighbor Weighted (NNW) method finds all the reference points within a specified distance and weights each point based on its distance. The GeoEnrichment value is the sum of the weighted attribute values.
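A rough sketch of the idea, assuming a simple inverse-distance weight purely as an illustration since the exact weighting function is an implementation detail (the class and attribute names are hypothetical):

import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;

// Hypothetical NNW enrichment: sum the attribute values of nearby reference points,
// each scaled by an inverse-distance weight.
public class NearestNeighborEnricher {

    // Placeholder reference feature: a location and the attribute value to aggregate.
    public static class ReferencePoint {
        Point point;
        double attributeValue; // e.g. average income at that reference location
    }

    public static double weightedValue(final Point location, final Iterable<ReferencePoint> references,
                                       final double maxDistanceMeters) {
        double sum = 0.0;
        for (final ReferencePoint ref : references) {
            final double d = GeometryEngine.geodesicDistanceOnWGS84(location, ref.point);
            if (d <= maxDistanceMeters) {
                final double weight = 1.0 / Math.max(d, 1.0); // nearer points weigh more
                sum += weight * ref.attributeValue;
            }
        }
        return sum;
    }
}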

You can find more details about this here, where I've used HBase and the Geometry API for Java to perform the GeoEnrichment.