Friday, August 21, 2015

A Whale and a Python GeoSearching on a Photon Wave

In the last post, we walked through how to set up Elasticsearch in a Docker container and how to bulk load the content of an ArcGIS feature class into ES so that it can be spatially searched from an ArcPy-based tool.

There was something nagging me about my Mac development environment: I was running Docker in VirtualBox and ArcGIS Desktop on Windows in VMware Fusion. I wished I had one unified virtualized environment.

Well, while at MesosCon in Seattle, I stopped by the VMware booth and the folks there told me about a new project named Photon™. It is "a minimal Linux container host. It is designed to have a small footprint and boot extremely quickly on VMware platforms. Photon™ is intended to invite collaboration around running containerized applications in a virtualized environment." That was exactly what I needed, and Docker is built into it!

What also got me excited is that in a couple of weeks I will be visiting a very forward-thinking client that is willing to bootstrap a cluster on an on-premise VMware-based cloud with Linux for a BigData project. See, their IT department is a Windows shop, and I was going to ask them to install CentOS, yum install docker, and all that jazz. As you can imagine, that was going to raise some eyebrows. However, now that Photon™ is made by VMware, it will (I hope) be trusted by the customer, so we can move forward focusing on the BigData aspect of the project and not be dragged down by Linux installation issues.

The following is a retrofit of that walkthrough using Photon™. And the best part is that, thanks to Docker's portability, nothing else changes.

I’m using VMware Fusion on my Mac, so I followed these instructions, except that I set up Photon™ with 4 CPUs and 4 GB of RAM.

Once the system was up, I logged in as root and got the IP address bound to eth0 using the ifconfig command.

I created a folder named config, and populated it with the following Elasticsearch configuration files:
$ mkdir config

$ cat << EOF > config/elasticsearch.yml
cluster.name: elasticsearch
index.number_of_shards: 1
index.number_of_replicas: 0
network.bind_host: dev
network.publish_host: dev
cluster.routing.allocation.disk.threshold_enabled: false
action.disable_delete_all_indices: true
EOF

$ cat << EOF > config/logging.yml
es.logger.level: INFO
rootLogger: ${es.logger.level}, console
logger:
  action: DEBUG
  com.amazonaws: WARN
appender:
  console:
    type: console
    layout:
      type: consolePattern
      conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"
EOF

Next, I started Elasticsearch in Docker:

docker run -d -p 9200:9200 -p 9300:9300 -h dev -v /root/config:/usr/share/elasticsearch/config elasticsearch

And I validated that ES was up and running by opening a browser on my Mac and navigating to IP_ADDRESS:9200, which returned:

{
  "status" : 200,
  "name" : "Longshot",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "1.7.1",
    "build_hash" : "b88f43fc40b0bcd7f173a1f9ee2e97816de80b19",
    "build_timestamp" : "2015-07-29T09:54:16Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.4"
  },
  "tagline" : "You Know, for Search"
}

Excellent! From here on, the walkthrough is exactly as previously described, but now I have one unified environment, and it will be the same environment when I am on-site in two weeks.

Final note: I set the value of PermitRootLogin to yes in the /etc/ssh/sshd_config file to enable remote login as root into the VM from iTerm on my Mac. I recommend that you check out the FAQs.

Resources: Update to Docker 1.6

Bulk Load Features from ArcGIS Into Elasticsearch

I really like Elasticsearch because it natively supports geospatial types and queries. I just added to GitHub an ArcPy-based toolbox to bulk load the content of a feature class into an ES index/type.

The toolbox contains yet another tool as a proof of concept to spatially query the loaded documents.
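
For flavor, here is a minimal Python sketch of both steps using the elasticsearch-py client. The index/type names (features/points), the NAME field and the bounding box values are placeholders I made up; the toolbox on GitHub is the authoritative implementation.

import arcpy
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["IP_ADDRESS:9200"])

# Bulk load a point feature class into a 'features/points' index/type,
# mapping the location as a geo_point so it is spatially searchable.
def load_features(feature_class):
    es.indices.create(index="features", ignore=400, body={
        "mappings": {"points": {"properties": {"location": {"type": "geo_point"}}}}
    })
    def docs():
        with arcpy.da.SearchCursor(feature_class, ["SHAPE@XY", "NAME"]) as cursor:
            for (x, y), name in cursor:
                yield {
                    "_index": "features",
                    "_type": "points",
                    "_source": {"name": name, "location": {"lat": y, "lon": x}}
                }
    helpers.bulk(es, docs())

# Proof-of-concept spatial query: all documents inside a bounding box.
def search_extent(xmin, ymin, xmax, ymax):
    body = {"query": {"filtered": {"filter": {"geo_bounding_box": {"location": {
        "top_left": {"lat": ymax, "lon": xmin},
        "bottom_right": {"lat": ymin, "lon": xmax}
    }}}}}}
    return es.search(index="features", doc_type="points", body=body)["hits"]["hits"]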

Thursday, August 13, 2015

BigData Point-In-Polygon GeoEnrichment

I’m always handed a huge set of CSV records with lat/lon coordinates, and the task at hand is to spatially join these records with a huge set of polygon features, where the output is a geoenrichment of the original points with the intersecting polygons' attributes. An example is a set of retailer customer locations that need to be spatially intersected with demographic polygons for targeted advertisement (sorry to send you all more junk mail :-).

This is a reference implementation where both the point data and the polygon data are stored as raw text in TSV format, with the polygon geometries in WKT. Not the most efficient format, but at least the input is splittable for massive parallelization.
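
The heart of the job looks something like the following PySpark sketch. The repository has the real implementation; the paths, column layout and the use of Shapely with a broadcast polygon list are simplifying assumptions here (a real run would use a spatial index rather than a linear scan):

from pyspark import SparkContext
from shapely import wkt
from shapely.geometry import Point

sc = SparkContext(appName="PointInPolygonGeoEnrichment")

# Polygons: TSV lines of <attribute> TAB <WKT geometry>, assumed small enough to broadcast.
def parse_polygon(line):
    parts = line.split("\t")
    return parts[0], wkt.loads(parts[-1])

polygons = sc.textFile("hdfs:///data/polygons.tsv").map(parse_polygon).collect()
bc_polygons = sc.broadcast(polygons)

# Points: TSV lines of <id> TAB <lon> TAB <lat>; emit each point enriched with the
# attribute of the first polygon that contains it.
def enrich(line):
    pid, lon, lat = line.split("\t")[:3]
    point = Point(float(lon), float(lat))
    for attribute, polygon in bc_polygons.value:
        if polygon.contains(point):
            return ["\t".join((pid, lon, lat, attribute))]
    return []

sc.textFile("hdfs:///data/points.tsv").flatMap(enrich).saveAsTextFile("hdfs:///data/enriched")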

The feature class polygons can be converted to WKT using this ArcPy tool.
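
In essence, that conversion hinges on the SHAPE@WKT cursor token; here is a minimal sketch (the attribute field name and output layout are placeholders, the linked tool is the real thing):

import arcpy

def export_polygons_to_tsv(feature_class, output_path, attribute_field="NAME"):
    # Write one polygon per line: <attribute> TAB <WKT geometry>
    with open(output_path, "w") as tsv:
        with arcpy.da.SearchCursor(feature_class, [attribute_field, "SHAPE@WKT"]) as cursor:
            for attribute, geometry_wkt in cursor:
                tsv.write("{}\t{}\n".format(attribute, geometry_wkt))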

This Spark-based job can be executed in local mode or, better yet, in this Docker container.

One of these days I will have to re-implement the reading of the polygons from a binary source such as shapefiles or file geodatabases. Until then, you can download all the source code from here.

Wednesday, July 29, 2015

BigData Goes to Die on USB Drives

With the User Conference plenary behind me, I can catch up on my blog posts. I want to share with you all a BigData request pattern that I have been encountering a lot lately, and how I've been responding to it.
See, people are accumulating lots of data, and their traditional means to store and process (forget visualize) this data have been overwhelmed. So, they offload their "old" data onto USB drives to make room for the new data. And IMHO, BigData goes to die on USB drives, and that is a shame. A lot of this offloading happens as CSV file exports; I wish they did the exports in something like Avro, where data and schema are stored together.
Nevertheless, they now want to process all this data, and I'm handed these drives with the statement "What can we do with these?"
A lot of this data (and I'm talking millions and millions of records) has a spatial and temporal component, and the question to me (as I do geo) is more like "How can I filter, dissect and view this data on a map?"
Typically, these folks have virtualized environments on or off premise and can spin up a couple of Linux or Windows machines very quickly. Well, more than a couple; I usually request 4 to start. Once I have the machines' IPs, I install Hadoop (using either Hortonworks or Cloudera, depending on what the client has adopted) and Elasticsearch.
From the Hadoop stack, I only install Zookeeper, YARN, HDFS and Spark. Some folks might wonder why not use Solr, since it is part of the distribution. I find ES easier to use, and Hadoop is "temporary"; eventually, I just need Spark and Elasticsearch.
Then, I start transferring all the data off the USB drives to HDFS. See, HDFS gives me the parallel read that I need to massively bulk load the data using Spark into ES, where all the data is indexed in a spatial, temporal and attribute manner.
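
That bulk load step boils down to something like the following PySpark sketch, assuming the offloaded records are CSVs with id, timestamp, lon, lat columns (the host, index and column layout here are made up; the real jobs vary per client):

from pyspark import SparkContext

sc = SparkContext(appName="BulkLoadToES")

def index_partition(lines):
    # Create the client inside the partition so it is not shipped from the driver.
    from elasticsearch import Elasticsearch, helpers
    es = Elasticsearch(["ES_HOST:9200"])
    def docs():
        for line in lines:
            rec_id, timestamp, lon, lat = line.split(",")[:4]
            yield {
                "_index": "records",
                "_type": "record",
                "_id": rec_id,
                "_source": {
                    "timestamp": timestamp,
                    "location": {"lat": float(lat), "lon": float(lon)}
                }
            }
    helpers.bulk(es, docs())

sc.textFile("hdfs:///offloaded/records").foreachPartition(index_partition)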
Once the data is in Elasticsearch, I can query it using ArcPy from ArcGIS Desktop or Server as a GeoProcessing extension or service.
If the resulting data is "small", then the GP returns a FeatureSet instance. However, that is not what is interesting in the data. What these folks want to see is how the data is clustered and what temporal and spatial patterns are being formed. What are the "life" patterns? What they are interested in is seeing the density, the hotspots and, more importantly, the emerging hotspots. That is where the Esri platform excels!
Like usual, all the source code to do this is here. It is a bit crude, but it works.

Thursday, April 9, 2015

Calculating weighted variance using Spark

Spark provides a stats() function on a DoubleRDD to calculate, in a robust way, the count, mean and standard deviation of the double values. The inputs are unweighted (or all have a weight of 1), and I ran into a situation where I needed to perform the statistics on weighted values. The use case is to find the density area of locations with an associated number of incidents. Here is the result on a map:
So, after re-reading the "Weighted Incremental Algorithm" section on Wikipedia, I ripped the original StatCounter code and implemented a WeightedStatCounter class that calculates the statistics for an RDD of WeightedValues. The above use case is based on the Standard Distance GeoProcessing function and generates a WKT polygon that is converted to a Feature through an ArcPy Toolbox.
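
The WeightedStatCounter in the repository is Scala, but the algorithm itself fits in a few lines; here is a Python sketch of the same idea applied with RDD.aggregate (the names and the sample values are mine):

from pyspark import SparkContext

# State tuple: (weight_sum, mean, S), where S is the weighted sum of squared deviations.
ZERO = (0.0, 0.0, 0.0)

def merge_value(state, weighted_value):
    # Fold one (value, weight) pair into the running statistics
    # (the "Weighted Incremental Algorithm" from Wikipedia).
    w_sum, mean, s = state
    value, weight = weighted_value
    w_sum += weight
    delta = value - mean
    mean += (weight / w_sum) * delta
    s += weight * delta * (value - mean)
    return w_sum, mean, s

def merge_states(a, b):
    # Combine the running statistics of two partitions.
    w1, mean1, s1 = a
    w2, mean2, s2 = b
    if w1 == 0.0:
        return b
    if w2 == 0.0:
        return a
    w_sum = w1 + w2
    delta = mean2 - mean1
    mean = mean1 + delta * w2 / w_sum
    s = s1 + s2 + delta * delta * w1 * w2 / w_sum
    return w_sum, mean, s

if __name__ == "__main__":
    sc = SparkContext(appName="WeightedStats")
    # (value, weight) pairs, e.g. a location measure weighted by its number of incidents.
    weighted_values = sc.parallelize([(10.0, 3.0), (12.5, 1.0), (9.0, 5.0)])
    w_sum, mean, s = weighted_values.aggregate(ZERO, merge_value, merge_states)
    print(mean, s / w_sum)  # weighted mean and weighted variance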

Works like a charm, and like usual, all the source code can be found here.

Monday, March 16, 2015

BigData, MemSQL and ArcGIS Interceptors

Last week, at the Developer Summit, we unveiled Server Object Interceptors. They have the same API as Server Object Extensions, and are intended to extend an ArcGIS Server with custom capabilities. An SOI intercepts REST and/or SOAP calls on a MapServer before and/or after it executes the operation on an SOE or SO. Think servlet filters.

A use case of an SOI associated with a published MXD is to intercept an export image operation on its MapService and digitally watermark the original resulting image. Another use case of an interceptor is to use the associated user credentials in the single-sign-on request to restrict the visibility of layers or data fields.

This is pretty neat, and being the BigData Advocate, I started thinking about how to use this interceptor in a BigData context. The stars could not have been more aligned than when I heard that the MemSQL folks had announced geospatial capabilities in their in-memory database. See, I knew for a while that they were spitballing native geospatial types, but the fact that they showcased them at Strata + Hadoop World made me reach back to them to see how we could collaborate.
The idea is that since ArcGIS Server does not natively support MemSQL, and since MemSQL natively supports the MySQL wire protocol, I can use the MySQL JDBC driver to query MemSQL from an SOI and display the result on a map.
The good folks at MemSQL bootstrapped a set of AWS instances with their "new" engine and loaded the now-very-famous New York City taxi trips data. This (very, very small) set consists of about 170 million records with geospatial and temporal information such as pickup and drop-off locations and times. Each trip has additional attributes such as travel times, distances and number of passengers. It was now up to me to dynamically query and display this information in a standard WebMap on every map pan and zoom. What I mean by "standard" here is that an out-of-the-box WebMap should be able to interact with this MemSQL database without being augmented with a new layer type or any other functionality. Thus the use of an SOI: it intercepts the call to an export image operation, with a map extent as an argument, on a "stand-in" MapService, and executes a spatial MemSQL query on the AWS instances. The result set is drawn on an off-screen PNG image that is sent back to the requesting WebMap for display as a layer on the map.
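
The SOI itself is Java and goes through the MySQL JDBC driver, but the spatial query it issues can be illustrated from Python over the same MySQL wire protocol. In the sketch below, the table and column names (trips, pickup, passenger_count), the host and the GEOGRAPHY_INTERSECTS predicate are assumptions on my part, not the exact schema on the MemSQL instances:

import pymysql

def query_trips_in_extent(xmin, ymin, xmax, ymax):
    # Build a WKT polygon from the map extent passed to the export image operation.
    extent = "POLYGON(({xmin} {ymin},{xmax} {ymin},{xmax} {ymax},{xmin} {ymax},{xmin} {ymin}))".format(
        xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax)
    connection = pymysql.connect(host="MEMSQL_HOST", user="root", password="", db="taxi")
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT pickup, passenger_count FROM trips "
                "WHERE GEOGRAPHY_INTERSECTS(pickup, %s) LIMIT 10000", (extent,))
            return cursor.fetchall()
    finally:
        connection.close()

rows = query_trips_in_extent(-74.05, 40.68, -73.90, 40.82)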

Like usual, all the source code can be found here.

Tuesday, February 17, 2015

A Whale, an Elephant, a Dolphin and a Python walked into a bar....

This project started simply as an experiment in trying to execute a Spark job that writes to specific path locations based on partitioned key/value tuples. Once I figured out the usage of rdd.saveAsHadoopFile with a customized MultipleOutputFormat implementation and a customized RecordWriter, I was partitioning and shuffling data in all the right places.
Though I could read the content of a file in a path, I could not query the content selectively. So to query the data, I need to map the content to SQL. Enter Hive. It enables me to define a table that is externally mapped, by partition, to path locations. What makes Hive so neat is that the schema is applied on read rather than on write, which is very unlike traditional RDBMS systems. Now, to execute HQL statements, I need a fast engine. Enter SparkSQL. It is such an active project, and with all the optimizations that can be applied to the engine, I think it will rival Impala and Hive on Tez!
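
Put together, the Hive side looks roughly like the following PySpark sketch with a HiveContext; the table name, columns and paths are placeholders for whatever the Spark job wrote out:

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="PartitionedHiveQuery")
sql_context = HiveContext(sc)

# External table whose partitions map to the path locations written by the Spark job.
sql_context.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS trips (
  lon DOUBLE,
  lat DOUBLE,
  passengers INT
)
PARTITIONED BY (year INT, month INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
LOCATION 'hdfs:///data/trips'
""")

# Register a partition that the job wrote to a specific path.
sql_context.sql(
    "ALTER TABLE trips ADD IF NOT EXISTS PARTITION (year=2015, month=1) "
    "LOCATION 'hdfs:///data/trips/year=2015/month=1'")

# Schema on read: only now is the TSV content interpreted as columns.
rows = sql_context.sql(
    "SELECT month, COUNT(1) AS trip_count FROM trips WHERE year = 2015 GROUP BY month").collect()
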
So I came to a point where I can query the data using SQL. But what if the data becomes too big? Enter HDFS. So now, I need to run HDFS on my Mac. I could download a bloated Hadoop distribution VM like the Cloudera QuickStart or the Hortonworks Sandbox, but I just need HDFS (and maybe YARN :-). Enter Docker. I found the perfect Hadoop image from SequenceIQ that runs just HDFS and YARN on a single node. So now, with the small addition of a config file to my classpath, I can write the data into HDFS. And since I have Docker, I can also move the Hive Metastore from the embedded Derby to an external RDBMS. I found a post that describes how, and bootstrapped yet another container with a MySQL instance to house the Hive Metastore.
Seeing data streaming on the screen like in the Matrix is no fun for me, but placing that data on a map, now that is expressive and can tell a story. Enter ArcMap (on the TODO list is to use Pro). Using a Python Toolbox extension, I can include a library that lets me communicate with SparkSQL to query the data and turn it into a set of features on the map.
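
One way to do that wiring, assuming the Spark SQL Thrift server (HiveServer2 protocol) is running, is a client such as pyhive; the post does not name the library the toolbox actually uses, and the host, port, table and fields below are made up:

import arcpy
from pyhive import hive

def sparksql_to_features(host, query):
    # Run the query against the Spark SQL Thrift server.
    cursor = hive.connect(host=host, port=10000).cursor()
    cursor.execute(query)

    # Turn the rows into an in-memory point feature class for display in ArcMap.
    fc = arcpy.management.CreateFeatureclass(
        "in_memory", "trips", "POINT",
        spatial_reference=arcpy.SpatialReference(4326))[0]
    with arcpy.da.InsertCursor(fc, ["SHAPE@XY"]) as insert:
        for lon, lat in cursor.fetchall():
            insert.insertRow([(lon, lat)])
    return fc

fc = sparksql_to_features("dev", "SELECT lon, lat FROM trips WHERE year = 2015")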

Wow...Here is what the "Zoo" looks like:


And like usual, all the source code and how to do this yourself is available here.