Monday, February 8, 2016

(Web)Mapping Elephants with Sparks

CSV files, though neither the most efficient nor the most expressive format (due to meager header metadata), are one of the most ubiquitous formats for placing data in BigData stores like HDFS. In addition, geospatial information such as latitude and longitude is now the norm as fields in those CSV files, originating from, say, a moving GPS-based device.
A request that I receive all the time is “How do I visualize all these data points on the web?” There is a legitimate concern in this question, which is “How do I visualize millions and millions of points on the web?” Well, the short answer is “You don’t!” (actually, you can…but that is a blog post for another day). Though you can download a couple of million points to a web client, after a while the transfer time becomes prohibitive. However, if you process the data on the server and send down the aggregated information to be symbolized on the client, then things become more interesting.
A common aggregation is binning: imagine you have a virtual fishnet and you cast it on your point space. All the points that fall in the same fishnet cell are collapsed together and represented by that cell. What you return now are the cells and their associated aggregates.
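To make the fishnet idea concrete, here is a minimal Python sketch. It assumes square cells and a plain count as the aggregate, and the function name bin_points is made up for illustration; it is not taken from the project code.

```python
from collections import Counter
from math import floor


def bin_points(points, cell_size):
    """Count points per square fishnet cell.

    points: iterable of (x, y) pairs.
    cell_size: width and height of a cell, in the same units as x and y.
    Returns a Counter keyed by the (col, row) index of each non-empty cell.
    """
    counts = Counter()
    for x, y in points:
        # floor() keeps negative coordinates in the correct cell.
        counts[(floor(x / cell_size), floor(y / cell_size))] += 1
    return counts


points = [(0.2, 0.3), (0.8, 0.1), (1.5, 0.4), (-0.1, 2.2)]
cells = bin_points(points, cell_size=1.0)
for (col, row), count in sorted(cells.items()):
    print(col, row, count)
```

Only the non-empty cells and their counts travel to the client, so the payload size depends on the number of cells, not on the number of points.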

This project is a collection of Python tools using the ArcGIS System that retrieve CSV data with geospatial fields from HDFS and display the aggregation in the form of hexagonal areas using ArcGIS Online web maps and web apps. The processing is done in Python using Apache Spark.

The ArcGIS System is a sequential composition of:

  • Desktop with Python based GeoProcessing extensions for authoring.
  • Server with GeoProcessing endpoints for publishing.
  • Online with WebMaps and WebApps built using AppBuilder for presenting.

Like usual, all the source code is here.

Saturday, January 30, 2016

DBSCAN on Spark

The applications of DBSCAN clustering straddle various domains including machine learning, anomaly detection and feature learning. But my favorite part about it is that you do not have to specify a priori the number of clusters to classify the input data. You specify a neighborhood distance and the minimum number of points to form a cluster, and it returns a set of clusters with the associated points in each cluster that meet the input parameters.
However, DBSCAN can consume a lot of memory when the input is very large. And since I do BigData, my data inputs will overwhelm my MacBook Pro very quickly. Since I know Hadoop MapReduce fairly well, and MR has been around for quite some time, I decided to see how other folks implemented such a solution in a distributed shared-nothing environment. I came across this paper, which was very inspiring, and found out that IrvingC used it too as a reference implementation. So I decided to implement my own DBSCAN on Spark as a way to further my education in Scala. And boy, did I learn a lot when it comes to immutable data structures, type aliasing and collection folding. BTW, I highly recommend the Twitter Scala School.
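To show what those two parameters do, here is a minimal single-machine sketch of DBSCAN in plain Python. It is a brute-force version for illustration only, assuming 2D points and Euclidean distance; the names dbscan, eps and min_points are mine, and this is not the distributed Spark implementation.

```python
from math import hypot

NOISE = -1


def dbscan(points, eps, min_points):
    """Return one label per point: a cluster id (0, 1, ...) or NOISE."""

    def neighbors(i):
        # Brute-force neighborhood search; includes the point itself.
        xi, yi = points[i]
        return [j for j, (xj, yj) in enumerate(points)
                if hypot(xi - xj, yi - yj) <= eps]

    labels = [None] * len(points)
    cluster = -1
    for i in range(len(points)):
        if labels[i] is not None:
            continue
        seeds = neighbors(i)
        if len(seeds) < min_points:
            labels[i] = NOISE  # may later be claimed as a border point
            continue
        cluster += 1
        labels[i] = cluster
        queue = [j for j in seeds if j != i]
        while queue:
            j = queue.pop()
            if labels[j] == NOISE:
                labels[j] = cluster  # border point: joins, does not expand
            if labels[j] is not None:
                continue
            labels[j] = cluster
            more = neighbors(j)
            if len(more) >= min_points:
                queue.extend(more)  # core point: keep growing the cluster
    return labels


points = [(0, 0), (0, 1), (1, 0), (10, 10), (10, 11), (11, 10), (50, 50)]
labels = dbscan(points, eps=1.5, min_points=3)
print(labels)
```

Note that the number of clusters never appears as an input; it falls out of the neighborhood distance and the minimum point count.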
Like usual, all the source code can be found here, and make sure to check out the “How It Works?” section.

[Update] After posting - I saw this post - very nice video too!

Monday, January 4, 2016

Spark Library To Read File Geodatabase

Happy 2016 all. Yes, it has been a while, and thanks for your patience. Like usual, at the beginning of every year, there are the promises to eat less, exercise more, climb Ventoux and blog more. I was listening to Freakonomics (When Willpower Isn’t Enough), and this initial post of the year is my attempt to harness the power of a fresh start.

Esri has been advocating the use of the FileGeodatabase for a while, and actually released a C++ based API to perform read-only operations on it. However, the read has to be performed off a local file system and the read is single threaded (you could write an abstraction layer on top of the API to perform a parallel partitioned read if you have the time).

In my BigData use cases, I need to place the GDB files in HDFS so I can perform Spark based GeoAnalytics. Well, that makes the usage of the C++ API difficult: it does not use the Hadoop File System API, I would have to map the Spark API to a native API, and I would have to publish the DLL and…(well, you can imagine the pain). I attempted this in my Ibn Battuta Project, where I relied on the GeoTools implementation of the FileGeodatabase, but was not too happy with it.

I asked the core team if they will have a pure Java implementation of the API, but they told me it was low on their list of priorities. Googling around, I found somebody who had published a reverse-engineered specification. My co-worker Danny H. took an initial stab at the implementation, and over the holidays I took over, targeting the Spark API and the DataFrames API. The implementation will enable me to do something like:

sc.gdbFile("hdfs:///data/Test.gdb", "Points", numPartitions = 2).map(row => {
  row.getAs[Geometry](row.fieldIndex("Shape")).buffer(1)
}).foreach(println)

and in SQL:

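// The data source name "com.esri.gdb" is an assumption; check the project README.
val df = sqlContext.read.
  format("com.esri.gdb").
  option("path", "hdfs:///data/Test.gdb").
  option("name", "Lines").
  option("numPartitions", "2").
  load()
// Register the DataFrame so it can be referenced by name in SQL.
df.registerTempTable("lines")
sqlContext.sql("select * from lines").show()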

Pretty cool, no ? Like usual all the source code can be found here.

Friday, August 21, 2015

A Whale and a Python GeoSearching on a Photon Wave

In the last post, we walked through how to set up Elasticsearch in a Docker container and how to bulk load the content of an ArcGIS feature class into ES, such that it can be spatially searched from an ArcPy based tool.

There was something nagging me about my mac development environment, as I was using docker in VirtualBox and ArcGIS Desktop on Windows in VMware Fusion. I wished I had one unified virtualized environment.

Well, while at MesosCon in Seattle, I stopped by the VMware booth and the folks there told me about a new project named Photon™. It is “a minimal Linux container host. It is designed to have a small footprint and boot extremely quickly on VMware platforms. Photon™ is intended to invite collaboration around running containerized applications in a virtualized environment.” That was exactly what I needed, and docker is built into it!

See, what also got me excited was the fact that in a couple of weeks, I will be visiting a very forward-thinking client who is willing to bootstrap a cluster with Linux on an on-premise VMware-based cloud for a BigData project. See, his IT department is a Windows shop, and I was going to ask him to install CentOS and yum install docker and all that jazz. As you can imagine, that was going to raise some eyebrows. However, since Photon™ is made by VMware, it will be trusted by the customer (I hope), and we can move forward focusing on the BigData aspect of the project and not be dragged down by Linux installation issues.

The following is a retrofit of the walk through, but using Photon™. And the best part is…there are no changes, due to docker’s universality.

I’m using VMware Fusion on my mac, so I followed these instructions. However, I set up Photon™ with 4 CPUs and 4 GB of RAM.

Once the system was up, I logged in as root, and got the IP address that is bound to eth0 using the ifconfig command.

I created a folder named config, and populated it with the following Elasticsearch configuration files:
$ mkdir config

$ cat << EOF > config/elasticsearch.yml
cluster.name: elasticsearch
index.number_of_shards: 1
index.number_of_replicas: 0
network.bind_host: dev
network.publish_host: dev
cluster.routing.allocation.disk.threshold_enabled: false
action.disable_delete_all_indices: true
EOF

$ cat << 'EOF' > config/logging.yml
es.logger.level: INFO
rootLogger: ${es.logger.level}, console
logger:
  action: DEBUG
  com.amazonaws: WARN
appender:
  console:
    type: console
    layout:
      type: consolePattern
      conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"
EOF

Next, I started Elasticsearch in docker:

docker run -d -p 9200:9200 -p 9300:9300 -h dev -v /root/config:/usr/share/elasticsearch/config elasticsearch

I then validated that ES was up and running by opening a browser on my mac and navigating to IP_ADDRESS:9200, which returned:

{
 status: 200,
 cluster_name: "elasticsearch",
 version: {
  number: "1.7.1",
  build_hash: "b88f43fc40b0bcd7f173a1f9ee2e97816de80b19",
  build_timestamp: "2015-07-29T09:54:16Z",
  build_snapshot: false,
  lucene_version: "4.10.4"
 },
 tagline: "You Know, for Search"
}

Excellent! From then on, the walk through is as previously described, but now I have one unified environment, and it will be the same one when I am on-site in two weeks.

Final note: I set the value of PermitRootLogin to yes in the /etc/ssh/sshd_config file to enable remote login as root into the VM from iTerm on my mac. I recommend that you check out the FAQs.

Resources: Update to Docker 1.6

Bulk Load Features from ArcGIS Into Elasticsearch

I really like Elasticsearch because it natively supports geospatial types and queries. I just added to GitHub an ArcPy based toolbox to bulk load the content of a feature class into an ES index/type.

The toolbox contains yet another tool as a proof-of-concept to spatially query the loaded documents.

Thursday, August 13, 2015

BigData Point-In-Polygon GeoEnrichment

I’m always handed a huge set of CSV records with lat/lon coordinates, and the task at hand is to spatially join these records with a huge set of feature polygons, where the output is a geoenhancement of the original points with the intersecting polygon’s attributes. An example is a set of a retailer's customer locations that need to be spatially intersected with demographic polygons for targeted advertisement (Sorry to send you all more junk mail :-).

This is a reference implementation, where both the points data and the polygon data are stored in raw text TSV format and the polygon geometries are in WKT format. Not the most efficient format, but at least the input is splittable for massive parallelization.
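As a rough illustration of the join itself, here is a small Python sketch, not the Spark job. It assumes a made-up column layout (id, lon, lat for points; name, WKT for polygons), handles only the outer ring of simple POLYGON geometries, and checks every point against every polygon with no spatial index. The function names are mine.

```python
def parse_wkt_polygon(wkt):
    """Parse 'POLYGON ((x y, x y, ...))' into its outer ring of (x, y) tuples.

    Holes and multi-part geometries are ignored in this sketch.
    """
    body = wkt.strip()[len("POLYGON"):].strip()
    outer = body.lstrip("(").split(")")[0]
    return [tuple(float(v) for v in pair.split()) for pair in outer.split(",")]


def contains(ring, x, y):
    """Ray-casting test: True when (x, y) falls inside the closed ring."""
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        # Toggle on every edge that a horizontal ray from (x, y) crosses.
        if (y1 > y) != (y2 > y) and x < (x2 - x1) * (y - y1) / (y2 - y1) + x1:
            inside = not inside
    return inside


def enrich(point_lines, polygon_lines):
    """Yield (id, lon, lat, polygon_name) for each point/polygon intersection."""
    polygons = []
    for line in polygon_lines:
        name, wkt = line.rstrip("\n").split("\t")
        polygons.append((name, parse_wkt_polygon(wkt)))
    for line in point_lines:
        pid, lon, lat = line.rstrip("\n").split("\t")
        for name, ring in polygons:
            if contains(ring, float(lon), float(lat)):
                yield (pid, lon, lat, name)


polygon_lines = [
    "A\tPOLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))",
    "B\tPOLYGON ((20 20, 30 20, 30 30, 20 30, 20 20))",
]
point_lines = ["p1\t5\t5", "p2\t25\t22", "p3\t15\t15"]
result = list(enrich(point_lines, polygon_lines))
for record in result:
    print("\t".join(record))
```

Because every input line is self-contained, the point file can be split across workers, which is the property that makes the text format attractive despite its inefficiency.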

The feature class polygons can be converted to WKT using this ArcPy tool.

This Spark based job can be executed in local mode, or better in this docker container.

One of these days, I will have to re-implement the reading of the polygons from a binary source such as shapefiles or file geodatabases. Until then, you can download all the source code from here.

Wednesday, July 29, 2015

BigData Goes to Die on USB Drives

With the User Conference plenary behind me, I can catch up on my blog posts. I want to share with you all a BigData request pattern that I have been encountering a lot lately, and how I've been responding to it.
See, people are accumulating lots of data, and their traditional means to store and process (forget visualize) this data have been overwhelmed. So, they offload their "old" data onto USB drives to make room for the new data. And IMHO BigData goes to die on USB drives, and that is a shame. And a lot of this offloading happens as CSV file exports. I wish they did the exports in something like Avro, where data and schema are stored together.
Nevertheless, they now want to process all this data, and I'm handed these drives with the statement “What can we do with these?”
A lot of this data (and I'm talking millions and millions of records) has a spatial and temporal component, and the question to me (as I do geo) is more like “How can I filter, dissect and view this data on a map?”
Typically, these folks have virtualized environments on or off premise and can spin up a couple of Linux or Windows machines very quickly. Well...more than a couple, I usually request 4 to start. Once I have the machines' IPs, I install Elasticsearch and Hadoop, using either Hortonworks or Cloudera depending on what the client has adopted.
From the Hadoop stack, I only install Zookeeper, YARN, HDFS and Spark. Some folks might wonder, why not use Solr since it is part of the distribution. I find ES easier to use and Hadoop is "temporary" as eventually, I just need Spark and Elasticsearch.
Then, I start transferring all the data off the USB drives to HDFS. See, HDFS gives me the parallel read that I need to massively bulk load the data using Spark into ES, where all the data is indexed in a spatial, temporal and attribute manner.
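For a feel of what a bulk load looks like on the wire, here is a small Python sketch that turns CSV text into the newline-delimited body expected by the Elasticsearch bulk endpoint. The column names (id, when, lat, lon), the field name loc, and the function name bulk_payload are assumptions for illustration, and loc would need to be mapped as a geo_point in the index for spatial queries to work. This is not the Spark loader.

```python
import csv
import io
import json


def bulk_payload(csv_text, index, doc_type):
    """Build an Elasticsearch bulk request body from CSV text.

    Each record becomes two lines: an action line and the document itself.
    """
    lines = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        doc = {
            "id": row["id"],
            "when": row["when"],
            # Object form of a point: usable as a geo_point when mapped as one.
            "loc": {"lat": float(row["lat"]), "lon": float(row["lon"])},
        }
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type}}))
        lines.append(json.dumps(doc))
    # The bulk body must end with a newline.
    return "\n".join(lines) + "\n"


csv_text = "id,when,lat,lon\n1,2015-07-01T10:00:00Z,38.9,-77.0\n2,2015-07-01T10:05:00Z,40.7,-74.0\n"
payload = bulk_payload(csv_text, index="tracks", doc_type="point")
print(payload)
```

Each HDFS split can produce its own payload independently, which is what lets the load run in parallel.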
Once the data is in Elasticsearch, I can query it using ArcPy from ArcGIS Desktop or Server as a GeoProcessing extension or service.
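As a hedged sketch of such a query, the following Python function builds a request body that filters by a bounding box and a time range. It uses the filtered query form from the Elasticsearch 1.x line; the field names loc and when and the function name bbox_time_query are hypothetical, and the body would still need to be posted to the index's _search endpoint by whatever HTTP client the tool uses.

```python
import json


def bbox_time_query(top, left, bottom, right, start, end):
    """Build an Elasticsearch 1.x search body: bounding box AND time range."""
    return {
        "query": {
            "filtered": {
                "query": {"match_all": {}},
                "filter": {
                    "bool": {
                        "must": [
                            # Spatial part: 'loc' is assumed to be a geo_point field.
                            {"geo_bounding_box": {"loc": {
                                "top_left": {"lat": top, "lon": left},
                                "bottom_right": {"lat": bottom, "lon": right},
                            }}},
                            # Temporal part: 'when' is assumed to be a date field.
                            {"range": {"when": {"gte": start, "lte": end}}},
                        ]
                    }
                },
            }
        }
    }


body = bbox_time_query(41.0, -78.0, 38.0, -73.0,
                       "2015-07-01T00:00:00Z", "2015-07-02T00:00:00Z")
print(json.dumps(body, indent=2))
```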
If the resulting data is "small", then the GP returns a FeatureSet instance. However, that is not what is interesting in the data. What these folks want to see is how the data is clustered and what temporal and spatial patterns are being formed. What are the “life” patterns? What they are interested in seeing is the density, the hotspots and, more importantly, the emerging hotspots. That is where the Esri platform excels!
Like usual, all the source code to do this is here. It is a bit crude, but it works.