Snowflake and Geospatial

I was working on a proof of concept to migrate from an RDBMS data warehouse to a Snowflake data warehouse, and one of the requirements we had was to support geospatial processing during ETL.

Snowflake doesn't support geospatial data types or functions, and our requirements were:

  • find the distance between 2 points
  • find country/state info for a given point
  • find the time zone for a given point

No lookup for street addresses was required.

The number of records to be processed daily is in the millions.

Input: records in a stage table with the structure below (a rough DDL sketch follows the list).

  • Id
  • EventDatetime
  • Lat
  • Lon
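
A minimal DDL sketch of such a stage table; the column types here are assumptions, not taken from the actual system:

create table eventStage (
    id            number,
    EventDatetime timestamp_ntz,   -- event timestamp used for ordering
    lat           float,           -- latitude  (0 marks an invalid value)
    lon           float            -- longitude (0 marks an invalid value)
);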

Below is the design and approach I used to meet these requirements.

Requirement: calculate the distance traveled and find the US state for each record, based on its lat/lon.

All records that need to be processed are loaded into a stage table called eventStage. For POC purposes, the table has 53 million records.

Calculate distance

To calculate the distance, I used the Snowflake function HAVERSINE, which returns the distance in kilometers between 2 points. Below is the code.

drop table if exists eventDistanceTmp;

create temp table eventDistanceTmp as
select id, EventDatetime, haversine(toLat, toLon, fromLat, fromLon) as distance_traveled
from (
    select id, EventDatetime, e.lat as toLat, e.lon as toLon,
           lag(lon, 1) over (partition by er_id order by EventDatetime) as fromLon,
           lag(lat, 1) over (partition by er_id order by EventDatetime) as fromLat
    from eventStage e
    where lon <> 0
);

I'm filtering out records that don't have a valid lat/lon; in my table an invalid lat/lon is identified by 0.

Now we have the distance.
On a single-node X-Small warehouse, the above operation took less than a minute. You can then use an UPDATE statement to update the eventStage table from eventDistanceTmp.
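
A minimal sketch of that update, assuming eventStage has (or has been altered to add) a distance_traveled column and that id uniquely identifies a record:

-- copy the computed distance back onto the stage table
update eventStage
set distance_traveled = t.distance_traveled
from eventDistanceTmp t
where eventStage.id = t.id;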

Find US state:
The next step is to find the US state. Since Snowflake doesn't support geospatial functions, we have to perform this operation outside of Snowflake.

There are multiple ways to find the US state for each lat/lon:

  • PostGIS database: install Postgres with the PostGIS extension locally and run PostGIS queries against it.
  • Make API calls to Google/Bing/TIGER/etc. to get the geolocation. There is a price associated with this, and you have to make a call for each lat/lon, which in this case means 53 million calls.
  • Python.
  • Other services.

I chose Python with GeoPandas, as there are many geospatial libraries available.

Approach
Python is installed on a local machine, which we will call the ETL machine. An EC2 instance can also be used.
From the ETL machine, we pull records from the Snowflake table in batches using the Snowflake Python connector, process them, and write the results back to a file. We then use the COPY command to push the data back to a stage table, and from there a MERGE statement updates the target table.

Why batches?
If we do a geo lookup for all records in one go, there will be performance challenges; a geospatial lookup is an expensive operation.

Since we are pulling records in batches, we need to assign a row number to each record, which we do with the analytic function ROW_NUMBER().

We create a table holding the records whose location needs to be determined:

create table eventLatLon as
select id, lat, lon, row_number() over (order by EventDatetime) as rnumber
from eventStage
where lon <> 0;

I already had Python installed on the ETL machine.

I created a virtual environment and installed geopandas using conda on the ETL machine.

I used the link below to install geopandas:

https://anaconda.org/conda-forge/geopandas

I downloaded the US state boundary shapefiles from:

https://www.census.gov/geo/maps-data/data/cbf/cbf_state.html


I downloaded the Snowflake Python connector from the Snowflake website.


Here is the Python code.

import snowflake.connector
from shapely.geometry import Point
import geopandas as gpd
import pandas
from datetime import datetime

BATCH_SIZE = 1000000

# connect to Snowflake (credentials are placeholders)
ctx = snowflake.connector.connect(user='u123', password='abc123', account='acc123')
cs = ctx.cursor()
cs.execute("use role accountadmin")
cs.execute("use database test")
cs.execute("use schema test")

# Load the US state boundaries once, keep only the state name and geometry,
# and reproject to WGS84 (epsg:4326) so they match the lat/lon points.
fp = r"C:\Suraj\Python\geospatial\data\Data\Data\cb_2016_us_state_500k.shp"
states = gpd.read_file(fp)[['NAME', 'geometry']].to_crs(epsg=4326)

try:
    cnt = cs.execute("select count(1) from eventLatLon").fetchone()
    print(datetime.today())
    i = 0
    while i < int(cnt[0]):
        j = i + 1
        k = i + BATCH_SIZE
        print("processing rows %s to %s" % (j, k))

        # Pull one batch of records from Snowflake using the row number range.
        results = cs.execute(
            "SELECT id, lat, lon FROM eventLatLon WHERE rnumber BETWEEN "
            + str(j) + " AND " + str(k)).fetchall()
        print(datetime.today())

        # Build a GeoDataFrame of points (note: Point takes lon, lat order).
        points = pandas.DataFrame(results, columns=["id", "latitude", "longitude"])
        points["geometry"] = points.apply(
            lambda row: Point(row["longitude"], row["latitude"]), axis=1)
        points = points.drop(columns=["latitude", "longitude"])
        points = gpd.GeoDataFrame(points, geometry="geometry", crs=states.crs)

        # Spatial join: each point picks up the NAME of the state polygon it falls in.
        # (Newer GeoPandas versions use predicate= instead of op=.)
        result = gpd.sjoin(states, points, how="inner", op="intersects")
        result = result[['NAME', 'id']]
        print(datetime.today())

        # Write this batch to a file that will later be loaded back into Snowflake.
        result.to_csv(r'c:\Suraj\Python\pandas' + str(i) + '.txt',
                      header=False, index=False, sep=' ')
        print(datetime.today())
        i = i + BATCH_SIZE
finally:
    cs.close()


It took about an hour to process all 53 million records. There are multiple ways to speed up the process, such as using a better-configured ETL machine, distributing the work, or using a different technology such as Spark.

Once the files are created, it is standard Snowflake functionality to copy the files from the ETL machine to a stage, load them into a stage table, and then use a MERGE statement to copy the required columns to the target table.
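
A minimal sketch of that load, using assumed names (an internal stage @event_stage, a stage table eventState with columns state_name and id in the same order as the file, and a target table eventTarget):

-- upload the batch files from the ETL machine (PUT is run from a client such as SnowSQL)
put file://c:\Suraj\Python\pandas*.txt @event_stage;

-- load the files into the stage table
copy into eventState
from @event_stage
file_format = (type = csv field_delimiter = ' ');

-- merge the state name into the target table
merge into eventTarget t
using eventState s
on t.id = s.id
when matched then update set t.state_name = s.state_name;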


This approach for finding the US state can also be used to find the time zone for a point; shapefiles are available for world time zones as well.

Hope you find this article helpful.