Saturday, January 30, 2016

DBSCAN on Spark

The applications of DBSCAN clustering straddle various domains, including machine learning, anomaly detection and feature learning. But my favorite part about it is that you do not have to specify the number of clusters a priori. You specify a neighborhood distance and the minimum number of points required to form a cluster, and it returns a set of clusters, each with the points that satisfy those input parameters.
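
To make those two inputs concrete, here is a minimal, self-contained Scala sketch; the names and sample points are made up for illustration, and this is not the API of any particular implementation:

// Hypothetical sketch of what eps (neighborhood distance) and minPoints mean.
// Plain Scala on a local collection, not the distributed implementation.
object DBSCANParams extends App {
  case class Point(x: Double, y: Double)

  val eps = 1.5       // neighborhood distance
  val minPoints = 3   // minimum number of points (including itself) to form a core point

  val points = Seq(Point(0, 0), Point(1, 0), Point(0, 1), Point(10, 10))

  def distance(a: Point, b: Point): Double = math.hypot(a.x - b.x, a.y - b.y)

  // A core point has at least minPoints neighbors within eps of it.
  val corePoints = points.filter(p => points.count(q => distance(p, q) <= eps) >= minPoints)

  println(s"Core points: $corePoints")
}
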
However, DBSCAN can consume a lot of memory when the input is very large, and since I do BigData, my data inputs will overwhelm my MacBook Pro very quickly. Since I know Hadoop MapReduce fairly well, and MR has been around for quite some time, I decided to see how other folks implemented such a solution in a distributed, shared-nothing environment. I came across this paper, which was very inspiring, and found out that IrvingC used it too as a reference implementation. So I decided to implement my own DBSCAN on Spark as a way to further my education in Scala. And boy, did I learn a lot about immutable data structures, type aliasing and collection folding (see the sketch below). BTW, I highly recommend the Twitter Scala School.
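
For readers new to those idioms, here is a tiny, hypothetical Scala snippet (not code from the project) showing a type alias and an immutable foldLeft over a collection:

// Hypothetical illustration of type aliasing and collection folding in Scala.
object ScalaIdioms extends App {
  // Type aliases give domain names to structural types.
  type ClusterID = Int
  type Clusters  = Map[ClusterID, Seq[(Double, Double)]]

  // Folding builds a new immutable Map without any mutation.
  val assignments = Seq((1, (0.0, 0.0)), (1, (1.0, 1.0)), (2, (9.0, 9.0)))
  val clusters: Clusters = assignments.foldLeft(Map.empty: Clusters) {
    case (acc, (id, pt)) => acc.updated(id, acc.getOrElse(id, Seq.empty) :+ pt)
  }

  println(clusters) // Map(1 -> List((0.0,0.0), (1.0,1.0)), 2 -> List((9.0,9.0)))
}
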
As usual, all the source code can be found here, and make sure to check out the “How It Works?” section.

[Update] After posting, I saw this post. Very nice video too!

Monday, January 4, 2016

Spark Library To Read File Geodatabase

Happy 2016, all. Yes, it has been a while, and thanks for your patience. As usual, at the beginning of every year, there are the promises to eat less, exercise more, climb Ventoux and blog more. I was listening to Freakonomics (“When Willpower Isn’t Enough”), and this initial post of the year is my way to harness the power of a fresh start.

Esri has been advocating for a while the use of the FileGeodatabase, and actually released a C++ based API to perform read-only operations on it. However, the read has to be performed off a local file system and is single threaded (you could write an abstraction layer on top of the API to perform a parallel, partitioned read if you have the time).
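
For illustration only, here is a rough Scala sketch of what such an abstraction layer could look like; FeatureReader and readRange are hypothetical stand-ins, not part of the Esri API:

// Hypothetical sketch of a parallel, partitioned read layered over a
// single-threaded reader. FeatureReader and its methods are made-up names,
// not the Esri File Geodatabase API.
trait FeatureReader {
  def rowCount(table: String): Long
  def readRange(table: String, startRow: Long, numRows: Long): Iterator[Map[String, Any]]
}

class PartitionedReader(reader: FeatureReader, numPartitions: Int) {
  // Split the table's row space into contiguous (start, count) ranges,
  // one per partition, so each range can be read independently.
  def partitionRanges(table: String): Seq[(Long, Long)] = {
    val total = reader.rowCount(table)
    val step = math.max(1L, (total + numPartitions - 1) / numPartitions)
    (0L until total by step).map(start => (start, math.min(step, total - start)))
  }
}
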

In my BigData use cases, I need to place the GDB files in HDFS so I can perform Spark based GeoAnalytics. Well, that made the usage of the C++ API difficult (as it does not use the Hadoop File System API): I would have to map the Spark API to a native API, publish the DLL, and…(well, you can imagine the pain). I attempted this in my Ibn Battuta Project, where I relied on the GeoTools implementation of the FileGeodatabase, but was not too happy with it.

I asked the core team if they would have a pure Java implementation of the API, but they told me it was low on their list of priorities. Googling around, I found somebody who published a reverse-engineered specification. My co-worker Danny H. took an initial stab at the implementation, and over the holidays I took over, targeting the Spark API and the DataFrames API. The implementation enables me to do something like:

sc.gdbFile("hdfs:///data/Test.gdb", "Points", numPartitions = 2).
  map(row => row.getAs[Geometry](row.fieldIndex("Shape")).buffer(1)).
  foreach(println)

and in SQL:

val df = sqlContext.read.
  format("com.esri.gdb").
  option("path", "hdfs:///data/Test.gdb").
  option("name", "Lines").
  option("numPartitions", "2").
  load()
df.registerTempTable("lines")
sqlContext.sql("select * from lines").show()

Pretty cool, no? As usual, all the source code can be found here.