There has been a lot of research on performing spatial joins using Hadoop MapReduce on BigData, specially when both sets are very big. A notable one is the Spatial Hadoop project at the University of Minnesota. This post is derived from that body of work and uses the Esri Geometry API in the GIS Tools for Hadoop project to perform the spatial operations in the map and reduce phases.
Lexicographical join of native types (numerical,textural) in BigData can be performed in the map phase or in the reduce phase depending on the size of the input data and how much memory the mapper or reducer has access to. Actually, Hadoop provides a join framework package as this is a common pattern. The same can be applied for spatially joining two big data sets. It can be performed in the mapper phase or the reducer phase depending on the size of the data and how much memory each phase has access to.
Let's start with a map phase spatial join. Let's say you have a billion point records and you have the polygon areas of the US zip codes, and the task at hand is to find the count of points per zip code. Since the US zip code feature class is a 'small' set (~41,700), it can be fully loaded into each mapper memory space at start up. In addition, it can be in-memory spatially indexed using the API QuadTree for quick look up based on the envelope of each feature. The GeometryEngine comes with a handy 'contains' method that can further refine the spatial constraint. The source of the zip code feature class can be from the DistributedCache or from HDFS. In addition, it can be in the Esri JSON format or GeoJSON or in the OGC format . The GeometryEngine API can parse all these formats into Geometry POJOs. As the mapper is reading each point feature, it locates the zip code polygon where the point falls into and emits the zip code to the reducer. The reducer task is to sum the emitted values per zip code.
Now, let's say that you have a billion polygons that you want to spatially join with yet another billion polygons and return the intersection set. How do you proceed with doing that in MapReduce when clearly a billion features cannot all fit in the mapper space (at least not in a typical node that I have access to :-). This is where we take advantage of the shuffle and sort phase of Hadoop's MapReduce implementation to partition and group all the "similar" elements, in such that the "reduced" set can be held in memory for processing. In our case that similarity is based on spatial grouping. So, in the map phase, the extent that the two polygon sets occupy is projected onto a grid with predefined cell sizes. The minimum bounding rectangle (MBR) of the input polygon from each set is overlaid onto the grid. The overlapping cells are emitted as keys whose value is a tuple consisting of an input source reference and the polygon shape. So now, all the polygons that overlap a specific cell are sent to the same reducer. In the reduce phase, we know the cell that we are operating over as this is derived from the key, we divide the list of values based on their source into separate lists where we can now perform a spatial cartesian product from one list to the other. Actually, one of the lists can be spatially indexed for fast lookup. Since an input polygon MBR can straddle multiple cells, there is a trick to not dispatch multiple times the spatial join by different reducers. It is best explained using the below picture.
Given two MBRs of polygons A and B, and given a grid partitioning of depth 1 with 4 quads, we can see that A and B overlap the top two quads (1,0) (1,1). So the mapper will emit values (A,B) for Quad (1,0) and (1,1) as keys. In the reducer for key (1,0) we have a reference to A and B and we calculate the lower left corner of the intersection of A and B MBRs (red dot). Because the intersection point falls into the Quad (1,0), we can then proceed with the spatial join and emit the result. Now, in the reducer for key (1,1), we can see that the intersection point does _not_ fall into quad (1,1) indicating that the spatial join was performed or will be performed by a reducer for another quad, thus any further processing is stopped and nothing is emitted.
Neat trick. However, the question at hand is "What is the 'correct' grid size ?" This takes me back to my days when I was an SDE specialist and had to tweak the spatial grid sizes and levels of a layer for best performance based on the layer content and usage. This will be a nice follow on post on writing an MR preprocessing task to scan through the static data and 'recommend' a value. In addition, the current implementation is assuming that you will want to spatially join _all_ the polygons in the full extent. What is missing is a spatial index to enable an initial cookie cutting of a region to operate over. This is what the above project does. However, it assumes the data is static and can be migrated into a new spatially optimized layout. Yet another post on how to do that on dynamic existing data.
Like usual all the source code is available here.
Lexicographical join of native types (numerical,textural) in BigData can be performed in the map phase or in the reduce phase depending on the size of the input data and how much memory the mapper or reducer has access to. Actually, Hadoop provides a join framework package as this is a common pattern. The same can be applied for spatially joining two big data sets. It can be performed in the mapper phase or the reducer phase depending on the size of the data and how much memory each phase has access to.
Let's start with a map phase spatial join. Let's say you have a billion point records and you have the polygon areas of the US zip codes, and the task at hand is to find the count of points per zip code. Since the US zip code feature class is a 'small' set (~41,700), it can be fully loaded into each mapper memory space at start up. In addition, it can be in-memory spatially indexed using the API QuadTree for quick look up based on the envelope of each feature. The GeometryEngine comes with a handy 'contains' method that can further refine the spatial constraint. The source of the zip code feature class can be from the DistributedCache or from HDFS. In addition, it can be in the Esri JSON format or GeoJSON or in the OGC format . The GeometryEngine API can parse all these formats into Geometry POJOs. As the mapper is reading each point feature, it locates the zip code polygon where the point falls into and emits the zip code to the reducer. The reducer task is to sum the emitted values per zip code.
Now, let's say that you have a billion polygons that you want to spatially join with yet another billion polygons and return the intersection set. How do you proceed with doing that in MapReduce when clearly a billion features cannot all fit in the mapper space (at least not in a typical node that I have access to :-). This is where we take advantage of the shuffle and sort phase of Hadoop's MapReduce implementation to partition and group all the "similar" elements, in such that the "reduced" set can be held in memory for processing. In our case that similarity is based on spatial grouping. So, in the map phase, the extent that the two polygon sets occupy is projected onto a grid with predefined cell sizes. The minimum bounding rectangle (MBR) of the input polygon from each set is overlaid onto the grid. The overlapping cells are emitted as keys whose value is a tuple consisting of an input source reference and the polygon shape. So now, all the polygons that overlap a specific cell are sent to the same reducer. In the reduce phase, we know the cell that we are operating over as this is derived from the key, we divide the list of values based on their source into separate lists where we can now perform a spatial cartesian product from one list to the other. Actually, one of the lists can be spatially indexed for fast lookup. Since an input polygon MBR can straddle multiple cells, there is a trick to not dispatch multiple times the spatial join by different reducers. It is best explained using the below picture.
Given two MBRs of polygons A and B, and given a grid partitioning of depth 1 with 4 quads, we can see that A and B overlap the top two quads (1,0) (1,1). So the mapper will emit values (A,B) for Quad (1,0) and (1,1) as keys. In the reducer for key (1,0) we have a reference to A and B and we calculate the lower left corner of the intersection of A and B MBRs (red dot). Because the intersection point falls into the Quad (1,0), we can then proceed with the spatial join and emit the result. Now, in the reducer for key (1,1), we can see that the intersection point does _not_ fall into quad (1,1) indicating that the spatial join was performed or will be performed by a reducer for another quad, thus any further processing is stopped and nothing is emitted.
Neat trick. However, the question at hand is "What is the 'correct' grid size ?" This takes me back to my days when I was an SDE specialist and had to tweak the spatial grid sizes and levels of a layer for best performance based on the layer content and usage. This will be a nice follow on post on writing an MR preprocessing task to scan through the static data and 'recommend' a value. In addition, the current implementation is assuming that you will want to spatially join _all_ the polygons in the full extent. What is missing is a spatial index to enable an initial cookie cutting of a region to operate over. This is what the above project does. However, it assumes the data is static and can be migrated into a new spatially optimized layout. Yet another post on how to do that on dynamic existing data.
Like usual all the source code is available here.
No comments:
Post a Comment