Wednesday, August 14, 2013

BigData: Experiments with Apache Avro and Parquet

 

In the GIS Tools for Hadoop, we store and retrieve feature classes in Esri JSON or GeoJSON formats to and from HDFS. This post is about a set of experiments with different storage and serialization techniques for feature classes. The two technologies I evaluated are Apache Avro and Parquet. All the source code can be found here.

Experiment I - Simple Feature Class Schema

Although Avro supports dynamic typing, I am old-fashioned and wanted to generate a set of POJOs for a simple feature class schema. In my mind, a feature has a geometry and a set of attributes. A geometry has a spatial reference and can be a point, a line or a polygon. A point is made up of one coordinate. A coordinate is simply made up of an x and a y value. Lines are made up of paths, where a path is a set of coordinates. Polygons are made up of rings, where a ring is a set of coordinates. Based on the Avro specification, I wrote two schemas: one for a generic feature and one for a point feature. The feature attributes are a set of key-value pairs, where the key is text and the value can be either numeric or textual.
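To make the shape of a point feature concrete, here is a minimal hand-written sketch of the model described above. It is not the code generated from the Avro schemas: the class names (SketchCoord, SketchPoint, SketchPointFeature) and the wkid field used for the spatial reference are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical classes mirroring the point feature described in the post.
// The classes generated by the Avro Maven plugin will differ in detail.
final class SketchCoord {
    final double x;
    final double y;

    SketchCoord(double x, double y) {
        this.x = x;
        this.y = y;
    }
}

final class SketchPoint {
    final int wkid;          // assumed representation of the spatial reference
    final SketchCoord coord; // a point is made up of one coordinate

    SketchPoint(int wkid, SketchCoord coord) {
        this.wkid = wkid;
        this.coord = coord;
    }
}

final class SketchPointFeature {
    final SketchPoint geometry;
    final Map<String, Object> attributes = new LinkedHashMap<>();

    SketchPointFeature(SketchPoint geometry) {
        this.geometry = geometry;
    }

    // Attribute keys are text; values are restricted to numbers or text.
    SketchPointFeature put(String key, Object value) {
        if (!(value instanceof Number) && !(value instanceof CharSequence)) {
            throw new IllegalArgumentException("attribute values must be numeric or textual");
        }
        attributes.put(key, value);
        return this;
    }
}
```

A feature would then be built with something like new SketchPointFeature(new SketchPoint(4326, new SketchCoord(75.0, 24.0))).put("id", 1).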

Using Maven, a set of concrete classes can be generated for use in statically typed code:

$ mvn avro:schema

Experiment II - Generate Data

Once the classes are generated, the project is compiled, and a set of features with random geographic coordinates is generated and placed into HDFS:

$ mvn exec:java -q -Dexec.mainClass=com.esri.RandomPoints -Dexec.args="cloudera hdfs://localhost.localdomain:8020/user/cloudera/points/points.avro"
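The coordinate generation step can be sketched as follows. This is not the RandomPoints class from the project: the class name RandomCoords, the world extent in decimal degrees and the fixed seed are all assumptions, and writing the result to an Avro file in HDFS is left out.

```java
import java.util.Random;

// Illustrative only; the actual RandomPoints class lives in the linked source.
final class RandomCoords {
    // Assumed extent: the whole world in decimal degrees.
    static final double XMIN = -180.0;
    static final double XMAX = 180.0;
    static final double YMIN = -90.0;
    static final double YMAX = 90.0;

    // Returns `count` pairs of {x, y} drawn uniformly from the extent.
    // A fixed seed makes the output repeatable.
    static double[][] generate(int count, long seed) {
        final Random random = new Random(seed);
        final double[][] coords = new double[count][2];
        for (int i = 0; i < count; i++) {
            coords[i][0] = XMIN + random.nextDouble() * (XMAX - XMIN);
            coords[i][1] = YMIN + random.nextDouble() * (YMAX - YMIN);
        }
        return coords;
    }

    public static void main(String[] args) {
        for (double[] c : generate(5, 42L)) {
            System.out.printf("%.4f %.4f%n", c[0], c[1]);
        }
    }
}
```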

Experiment II - View Avro Data

Since the Avro data is in a binary format, a "cat" of the data will render gibberish.  So I borrowed an idea from the Avro tool project to dump the content to the console in JSON text format:

$ mvn exec:java -q -Dexec.mainClass=com.esri.AvroToJson -Dexec.args="hdfs://localhost.localdomain:8020/user/cloudera/points/points.avro 10"

Experiment III - Run MapReduce Job on Avro Data

The MapReduce job in mind is a simple bin clustering, where the output is the number of points per bin - think of a bin as a cell in a grid.  Here are the mapper and the reducer.

private static final class FeatureMapper extends AvroMapper<
        AvroPointFeature, Pair<Long, Integer>>
{
    @Override
    public void map(
            final AvroPointFeature feature,
            final AvroCollector<Pair<Long, Integer>> collector,
            final Reporter reporter)
            throws IOException
    {
        final AvroPoint avroPoint = feature.getGeometry();
        final AvroCoord coord = avroPoint.getCoord();
        // Column and row of the cell that contains the point.
        final long x = (long) Math.floor(coord.getX() - XMIN);
        final long y = (long) Math.floor(coord.getY() - YMIN);
        // Pack the row into the high 32 bits and the column into the low 32 bits.
        final long g = (y << 32) | x;
        // Emit a count of one for this bin.
        collector.collect(new Pair<Long, Integer>(g, 1));
    }
}

private static final class FeatureReducer extends AvroReducer<Long, Integer,
        Pair<Long, Integer>>
{
    @Override
    public void reduce(
            final Long key,
            final Iterable<Integer> values,
            final AvroCollector<Pair<Long, Integer>> collector,
            final Reporter reporter)
            throws IOException
    {
        // Sum the counts emitted by the mappers for this bin.
        int sum = 0;
        for (final Integer value : values)
        {
            sum += value;
        }
        collector.collect(new Pair<Long, Integer>(key, sum));
    }
}
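
The bin key built in the mapper packs the row and the column into a single long, so it can be decoded again when reading the job output. Here is a small standalone sketch of that packing, its inverse, and a local stand-in for the reducer's sum. The XMIN and YMIN values are assumptions, since the constants used by the job are not shown in this post, and the class name BinKeySketch is made up.

```java
import java.util.Map;
import java.util.TreeMap;

final class BinKeySketch {
    // Assumed lower-left corner of the grid; the job's own constants are not shown.
    static final double XMIN = -180.0;
    static final double YMIN = -90.0;

    // Same packing as the mapper: row in the high 32 bits, column in the low 32 bits.
    // Only valid while both indices are non-negative and fit in 32 bits.
    static long pack(double x, double y) {
        final long col = (long) Math.floor(x - XMIN);
        final long row = (long) Math.floor(y - YMIN);
        return (row << 32) | col;
    }

    // Inverse of pack: recover the column and row from a bin key.
    static long col(long key) {
        return key & 0xFFFFFFFFL;
    }

    static long row(long key) {
        return key >>> 32;
    }

    // Local stand-in for the reducer: adds a count of one per point to its bin.
    static Map<Long, Integer> count(double[][] coords) {
        final Map<Long, Integer> bins = new TreeMap<>();
        for (double[] c : coords) {
            bins.merge(pack(c[0], c[1]), 1, Integer::sum);
        }
        return bins;
    }

    public static void main(String[] args) {
        final double[][] coords = {{75.2, 24.7}, {75.9, 24.1}, {-10.5, 3.3}};
        count(coords).forEach((key, n) ->
                System.out.println(col(key) + "," + row(key) + " -> " + n));
    }
}
```

With one-unit cells, the first two sample coordinates fall into the same bin and the third into a different one.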

Of course, the Avro output can be viewed using the Avro to JSON tool from the View Avro Data experiment :-)

Experiment IV - GeoProcessing Tool To Generate Schema

I wanted to generate an Avro schema that closely resembles the metadata of a specific simple feature class. So, I wrote the SchemaTool as an ArcMap extension that writes the Avro schema of a given feature class to HDFS.

Experiment V - Export Feature Class Based on Specific Schema

Now that we have a specific schema, I wrote yet another extension (ExportToGenericAvro) to export the content of a feature class into an Avro-formatted file in HDFS. Again, you can validate the content using the Avro to JSON tool :-)

Experiment VI - Use Hive To Query Avro Data

Hive takes a SQL-like statement and converts it into a MapReduce job that operates on a table mapped to an HDFS path. Hive maps binary content to a table using a concept called SerDe (serializer/deserializer). In my experiments, I am using the Cloudera VM as my sandbox, and it comes preinstalled with Hive and all kinds of SerDes. An external Hive table can be defined as follows:

CREATE EXTERNAL TABLE points
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/cloudera/points/'
TBLPROPERTIES ('avro.schema.url'='hdfs://localhost.localdomain/user/cloudera/points.avsc');

Once the table is registered, you can start querying it.  Here is a sample query:

select geometry.coord,attributes['id'] from points limit 10;

Experiment VII - Using Spatial UDFs

The GIS Tools for Hadoop come with a set of spatial Hive User Defined Functions (UDFs). These enable you to "augment" HQL with spatial operations. Make sure to first add the geometry and spatial framework jars to Hive and register the spatial UDFs. In my case, I cloned the projects, compiled them and copied the resulting jars to my local home folder, from where I launched Hive.

hive> add jar /home/cloudera/esri-geometry-api-1.2-SNAPSHOT.jar;
hive> add jar /home/cloudera/spatial-sdk-hive-1.0-SNAPSHOT.jar;
hive> source function-ddl.sql;

Now, I can perform spatial queries on my points table. Here is a sample query that counts the number of points that are within a certain distance from a specified point:

hive> select count(1) from points
where ST_DISTANCE(ST_POINT(geometry.coord.x,geometry.coord.y),ST_POINT(75,24)) < 5;
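
For comparison, the filter in that query can be sketched in plain Java. This assumes that ST_DISTANCE between two points returns the planar (Euclidean) distance in the units of the coordinates, which is my understanding and is not confirmed here. The class and method names are made up for illustration.

```java
final class DistanceFilterSketch {
    // Planar (Euclidean) distance in the units of the coordinates.
    // Assumption: this matches what ST_DISTANCE computes for two points.
    static double distance(double x1, double y1, double x2, double y2) {
        return Math.hypot(x2 - x1, y2 - y1);
    }

    // Counts the {x, y} pairs that are strictly closer than maxDistance to (cx, cy),
    // like "select count(1) ... where ST_DISTANCE(...) < maxDistance".
    static long countWithin(double[][] coords, double cx, double cy, double maxDistance) {
        long n = 0;
        for (double[] c : coords) {
            if (distance(c[0], c[1], cx, cy) < maxDistance) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        final double[][] coords = {{75.0, 24.0}, {78.0, 28.0}, {79.9, 24.0}};
        System.out.println(countWithin(coords, 75.0, 24.0, 5.0));
    }
}
```

Note that the comparison is strict, so a point exactly 5 units away is not counted.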

Experiment VIII - Parquet Format

So, all the above experiments place the data in a row format in HDFS. Parquet is a columnar storage format for Hadoop. Typically, the column schema is defined in Apache Thrift. However, there is a subproject that enables you to define your column schema in Avro. Cool, no? So I wrote yet another tool (ExportToAvroParquetTool) to export the content of a feature class into a Parquet-based format, using a specific schema defined in Avro.
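
To illustrate the difference between a row layout and a columnar one, here is a toy in-memory sketch. It only shows the idea of keeping each field in its own array; it says nothing about Parquet's actual file layout or encodings, and all names in it are made up.

```java
final class ColumnarSketch {
    // Row-oriented record: the fields of one feature are stored together.
    static final class Row {
        final long id;
        final double x;
        final double y;

        Row(long id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // Column-oriented layout: one array per field, in the same record order.
    static final class Columns {
        final long[] id;
        final double[] x;
        final double[] y;

        Columns(Row[] rows) {
            id = new long[rows.length];
            x = new double[rows.length];
            y = new double[rows.length];
            for (int i = 0; i < rows.length; i++) {
                id[i] = rows[i].id;
                x[i] = rows[i].x;
                y[i] = rows[i].y;
            }
        }
    }

    // A query over a single field only has to scan that field's array.
    static double maxX(Columns columns) {
        double max = Double.NEGATIVE_INFINITY;
        for (double value : columns.x) {
            max = Math.max(max, value);
        }
        return max;
    }
}
```

The point of the columnar layout is that a query touching only x never has to read id or y.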

Final Thoughts…

It is time to evaluate performance in terms of capacity and speed; that will be an interesting post. Per my readings, Twitter is so impressed with Parquet that they are already moving it into production. I like Avro, as it enables future-proof schema changes. After all, you cannot "readjust" your data to fit a new schema when it is in the petabytes :-)

I do all my development on a Mac and have a Windows VM running ArcMap and the Cloudera VM running Hadoop, all on the same machine (yes, it is a beefy machine :-). I had to use PuTTY to SSH tunnel all the ArcMap communication with Hadoop; check out this post for some hints.

Rather than using Hive, I desperately wanted to use Impala. However, as of this writing, Impala does not support structs, maps or UDFs. But when it does, it will be AMAZING. In the past, when it was still in beta, it saved my "derrière": I could query 4.5 billion records in under 20 seconds, live on a stage in front of thousands of people!

Again, as usual, all the source code can be downloaded from here.

2 comments:

Ravi Chetan said...

Very informative post, thank you. Also, can you please let me know if there is a way to come up with a generic Avro schema file for an exposed Esri REST service? Or should it be built manually?

Mansour Raad said...

I am not aware of an automated way to do it, though there could be one. Today, my implementation is simple: I hand-code it.