ETL
ETL overview
ETL (an acronym for "Extract-Transform-Load") describes a data process that obtains data from some source location, transforms it, and delivers it to some output destination.
Most WPRDC ETL processes are written in rocket-etl, an ETL framework customized for use with a) CKAN and b) the specific needs and uses of the Western Pennsylvania Regional Data Center open-data portal. It has been extended to allow the use of command-line parameters to (for instance) override the source and destination locations (pulling data instead from a local file or outputting data to a file, for the convenience of testing pipelines). It can pull data from web sites, FTP servers, GIS servers that use the data.json standard, and Google Cloud storage, and can deliver data either to the CKAN datastore or the CKAN filestore. It supports CKAN's Express Loader feature to allow faster loading of large data tables.
Some WPRDC ETL processes are still in an older framework; once they're all migrated over, it will be possible to extract a catalog of all ETL processes by parsing the job parameters in the files that represent the ETL jobs.
Getting data
Some of the sources we get data from:
- FTP servers
  - Here's the API documentation for the MOVEit FTP server
- APIs
  - Google Cloud infrastructure could count as an API
  - Some custom-built APIs by individual vendors
- GIS servers
  - Historically this was done through CKAN's "Harvester" program.
  - Now we are switching to writing ETL code to analyze the data.json file and pull the desired files over HTTP.
- Plain old web sites
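As a rough illustration of the data.json approach mentioned for GIS servers, here is a minimal sketch of picking download URLs out of a catalog. It assumes the catalog has a top-level "dataset" list whose entries carry a "distribution" list with "format" and "downloadURL" keys; the sample catalog, the example.org URLs, and the find_download_urls function are all hypothetical and are not part of rocket-etl.

```python
import json

# Hypothetical, trimmed-down data.json catalog; real catalogs carry many more fields.
SAMPLE_CATALOG = json.loads("""
{
  "dataset": [
    {
      "title": "Example Parks",
      "distribution": [
        {"format": "CSV", "downloadURL": "https://gis.example.org/parks.csv"},
        {"format": "GeoJSON", "downloadURL": "https://gis.example.org/parks.geojson"}
      ]
    },
    {
      "title": "Example Trails",
      "distribution": [
        {"format": "CSV", "downloadURL": "https://gis.example.org/trails.csv"}
      ]
    }
  ]
}
""")


def find_download_urls(catalog, title, wanted_format="CSV"):
    """Return the download URLs of one dataset's distributions in the wanted format."""
    urls = []
    for dataset in catalog.get("dataset", []):
        if dataset.get("title") != title:
            continue
        for distribution in dataset.get("distribution", []):
            same_format = distribution.get("format", "").upper() == wanted_format.upper()
            if same_format and "downloadURL" in distribution:
                urls.append(distribution["downloadURL"])
    return urls


urls = find_download_urls(SAMPLE_CATALOG, "Example Parks")
print(urls)
```

In a real job the catalog would be fetched over HTTP first, and each returned URL would then be downloaded as the source file for the pipeline.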
Writing ETL jobs
A useful tool for writing ETL jobs is Little Lexicographer. While initially designed to just facilitate the writing of data dictionaries (by scanning each column and trying to determine the best type for it, then dumping field names and types into a data dictionary template), Little Lexicographer now also has the ability to output a proposed Marshmallow schema for a CSV file. Its type detection is not perfect, so manual review of the assigned types is necessary. Also, Little Lexicographer is often fooled by seemingly numeric values like ZIP codes; if a value is a code (like a ZIP code or a US Census tract), we treat it as a string. This is especially important in the case of codes that may have leading zeros that would be lost if the value were cast to an integer.
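To make the point about codes concrete, here is a small sketch of a type guesser that refuses to treat code-like columns as integers. This is not Little Lexicographer's actual logic; the sample data, the CODE_COLUMNS set, and the guess_type function are hypothetical and only illustrate the rule of thumb.

```python
import csv
import io

# Hypothetical sample; any column listed in CODE_COLUMNS is kept as text.
SAMPLE_CSV = "zip_code,census_tract,population\n02134,010300,1200\n15213,040500,980\n"
CODE_COLUMNS = {"zip_code", "census_tract"}


def guess_type(column_name, values):
    """Guess 'string' or 'integer' for a column, never casting known code columns."""
    if column_name in CODE_COLUMNS:
        return "string"
    # A value with a leading zero (other than "0" itself) is probably a code too.
    if any(len(v) > 1 and v.startswith("0") for v in values):
        return "string"
    if values and all(v.lstrip("-").isdigit() for v in values):
        return "integer"
    return "string"


rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
proposed = {
    name: guess_type(name, [row[name] for row in rows])
    for name in rows[0]
}
print(proposed)
print(int("02134"))  # casting to an integer drops the leading zero
```

The last line shows the failure mode: once the ZIP code has been cast to an integer, the leading zero cannot be recovered from the value alone.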
Schema design
After running Little Lexicographer on the source file you want to write an ETL job for and reviewing the proposed schema types for correctness, review the column names.
- Make column names clear. If you don't understand the meaning of the column from reading the column name and looking at sample values, find out what the column means (by reading the data dictionary and documentation or asking someone closer to the source of the data) and then give it a meaningful name.
- Use snake case. Whenever possible, format column names in snake case. This means you should convert everything to lower case and change all spaces and punctuation to underscores (so FIELD NAME becomes field_name and # of pirates should be changed to number_of_pirates). Reasons we prefer snake case: a) Marshmallow already converts field names to snake case to some extent automatically. b) Snake case field names do not need to be quoted or escaped in PostgreSQL queries (making queries of the CKAN datastore easier).
- Standardize column names. Choose names that are already in use in other data tables published by the same publisher. For instance, if the source data calls the geocoordinates y and x (or lat and long) but latitude and longitude are already being used by other data tables, switch to latitude and longitude.
- Standardize column values. Where possible, transform columns to standardize their values. The first step is to look at the histogram of every column (Shift+F in VisiData!) and see if anything is irregular. For instance, if the municipality column has 1038 records with municipality == "Pittsburgh" and two with municipality == "PGH", add to the schema a @pre_dump decorator function to change all instances of "PGH" to "Pittsburgh". In some cases, just converting an address field to upper case will go a long way toward standardizing it. You can think of this step as pre-cleaning the data. The Holy Grail of column standardization would be using the same values in every identically named column across the entire data portal. Maybe someday!
- Organize the column names. Often the source file comes with some record IDs on the left, followed by some highly relevant fields (e.g., names of things), but then the rest of the columns may be semirandomly ordered. Principles of column organization:
  a) The "input" should be on the left and the "output" should be on the right. Which fields is the user likeliest to use to look up a record (like you would look up a word in a dictionary)? Put those furthest to the left (or, at the top of the schema). Primary keys and unique identifiers should go on the far left. Things like the results of inspections are closer to outputs, and should be moved to the right.
  b) Prioritize important stuff. If there are fields you think are likely to be of most interest to the user, shift them as far left as you can (subject to other constraints). The further left the field is, the better chance the user will be able to see it in the Data Tables view (or their tabular data explorer of choice).
  c) Group similar fields together. Obviously street address, city, state, and ZIP code should be grouped together and presented in the canonical order. This principle also applies to lists of geographic regions and other features.
  d) Maximize readability. Think like a user. How can you order the columns so that the sequence is logical?
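The snake-case and value-standardization rules can be sketched in a few lines of plain Python. The names below (SYMBOL_WORDS, MUNICIPALITY_FIXES, to_snake_case, standardize_municipality) are hypothetical; in an actual schema the value fix would live in the decorated schema function rather than in a standalone function, which is used here only to keep the sketch self-contained.

```python
import re

# Hypothetical replacements for symbols that carry meaning in a column name.
SYMBOL_WORDS = {"#": "number", "%": "percent"}
# Hypothetical map of irregular values to the standard spelling.
MUNICIPALITY_FIXES = {"PGH": "Pittsburgh"}


def to_snake_case(name):
    """Lower-case a column name and turn spaces and punctuation into underscores."""
    for symbol, word in SYMBOL_WORDS.items():
        name = name.replace(symbol, f" {word} ")
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip().lower())
    return name.strip("_")


def standardize_municipality(record):
    """Return a copy of the record with known municipality variants normalized."""
    fixed = dict(record)
    if fixed.get("municipality") in MUNICIPALITY_FIXES:
        fixed["municipality"] = MUNICIPALITY_FIXES[fixed["municipality"]]
    return fixed


print(to_snake_case("FIELD NAME"))
print(to_snake_case("# of pirates"))
print(standardize_municipality({"id": 7, "municipality": "PGH"}))
```

Keeping the irregular-to-standard mapping in one dictionary makes it easy to extend when the next histogram review turns up another variant.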
Pitfalls
- The byte-order mark showing up at the beginning of the first field name in your file. Excel seems to add this character by default (unless the user tells it not to). As usual, the moral of the story is "Never use Excel".
- Using a local timestamp instead of a UTC timestamp as a primary key often leads to problems. Because of Daylight Saving Time, in a series of hourly local timestamps one day each year skips an hour and another day has the same local timestamp twice. The general advice is to store (and publish) both the UTC timestamp and the local timestamp. We use the UTC timestamp for primary keys and other data operations, but also publish the local timestamp to make it easier for the user to understand the data.
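Both pitfalls are easy to reproduce. The sketch below uses made-up data: the first half shows the byte-order mark sticking to the first field name unless the file is decoded as "utf-8-sig", and the second half shows hourly UTC timestamps collapsing onto a repeated local label when clocks fall back. The Eastern-time offsets and the to_eastern helper are hard-coded assumptions to keep the example self-contained; real code would get the offsets from a time-zone database such as the standard-library zoneinfo module.

```python
import csv
import io
from datetime import datetime, timedelta, timezone

# A CSV export that begins with a UTF-8 byte-order mark (hypothetical data).
raw_bytes = b"\xef\xbb\xbfid,name\n1,Frick Park\n"

# Decoding with "utf-8" keeps the BOM glued to the first field name...
naive_fields = next(csv.reader(io.StringIO(raw_bytes.decode("utf-8"))))
# ...while "utf-8-sig" strips it.
clean_fields = next(csv.reader(io.StringIO(raw_bytes.decode("utf-8-sig"))))
print(naive_fields[0] == "id", clean_fields[0] == "id")

# Fixed offsets for US Eastern time, hard-coded to keep the sketch self-contained.
EDT = timezone(timedelta(hours=-4), "EDT")
EST = timezone(timedelta(hours=-5), "EST")
# Daylight Saving Time ended in the US at 06:00 UTC on 2021-11-07.
FALL_BACK_UTC = datetime(2021, 11, 7, 6, 0, tzinfo=timezone.utc)


def to_eastern(utc_time):
    """Convert a UTC timestamp near the 2021 fall transition to Eastern local time."""
    return utc_time.astimezone(EDT if utc_time < FALL_BACK_UTC else EST)


start = datetime(2021, 11, 7, 4, 0, tzinfo=timezone.utc)
utc_hours = [start + timedelta(hours=i) for i in range(4)]
local_labels = [to_eastern(t).strftime("%H:%M") for t in utc_hours]
print(local_labels)  # the 01:00 label appears twice
```

The four UTC timestamps are all distinct, so they work as primary keys; the local labels are not, which is why the local timestamp is published only as a convenience column.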
Testing ETL jobs
Typical initial tests of a rocket-etl job can be invoked like this:
> python launchpad.py engine/payload/<name for publisher/project>/<script name>.py mute to_file
where the mute parameter prevents errors from being sent to the "etl-hell" Slack channel and the to_file parameter writes the output to the default location for the job in question. For instance, the job
> python launchpad.py engine/payload/robopgh/census.py mute to_file
would write its output to a file in the directory <PATH TO rocket-etl>/output_files/robopgh/. Note that the namespacing convention routes the output of robopgh jobs to a different directory than that of wormpgh jobs, but if there were two jobs in the robopgh payload folder that write to population.csv, each job would overwrite the output of the other. As this namespacing is for the convenience of testing and development, this level of collision avoidance seems sufficient for now. It's always possible to alter the default output file name by specifying the 'destination_file' parameter in the dict of parameters that define the job (found in, for instance, the robopgh/census.py file).
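The collision described above can be checked mechanically. The following is only a sketch: the job dicts are simplified stand-ins (the 'payload' and 'script' keys are hypothetical, and only 'destination_file' comes from the text), and default_output_path is an assumed reading of the namespacing rule rather than rocket-etl's own code.

```python
from pathlib import PurePosixPath

# Hypothetical job definitions; only 'destination_file' is named in the text above.
jobs = [
    {"payload": "robopgh", "script": "census.py", "destination_file": "population.csv"},
    {"payload": "robopgh", "script": "housing.py", "destination_file": "population.csv"},
    {"payload": "wormpgh", "script": "census.py", "destination_file": "population.csv"},
]


def default_output_path(job, output_root="output_files"):
    """Build the assumed default path: <root>/<payload folder>/<destination file>."""
    return PurePosixPath(output_root) / job["payload"] / job["destination_file"]


def find_collisions(job_list):
    """Map each output path written by more than one job to the scripts involved."""
    seen = {}
    for job in job_list:
        seen.setdefault(default_output_path(job), []).append(job["script"])
    return {str(path): scripts for path, scripts in seen.items() if len(scripts) > 1}


print(find_collisions(jobs))
```

Giving one of the two robopgh jobs a different 'destination_file' value makes the reported collision disappear.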
After running the job, examine the output. VisiData is an excellent tool for rapidly examining and navigating CSV files. As a first step, it's a good idea to go through each column in the output and make sure that the results make sense. Often this can be done by opening the file in VisiData (> vd output_files/robopgh/population.csv) and invoking Shift+F on each column to calculate the histogram of its values. This is a quick way to catch empty columns. An empty column is a sign either that the source file has only null values in that column or that there's an error in your ETL code, often a typo in the name of the field you're trying to load from. (How Marshmallow transforms the field names can often be non-intuitive.)
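When a file has too many columns to check by hand, the same histogram check can be scripted. This is a minimal sketch using only the standard library; the sample output and the function names are hypothetical, and it is meant as a complement to VisiData, not a replacement.

```python
import csv
import io
from collections import Counter

# Hypothetical job output in which one column never got populated.
SAMPLE_OUTPUT = "id,municipality,inspection_result\n1,Pittsburgh,\n2,PGH,\n3,Pittsburgh,\n"


def column_histograms(csv_text):
    """Count how often each value occurs in each column of a CSV document."""
    reader = csv.DictReader(io.StringIO(csv_text))
    histograms = {name: Counter() for name in reader.fieldnames}
    for row in reader:
        for name in reader.fieldnames:
            histograms[name][row[name]] += 1
    return histograms


def empty_columns(histograms):
    """List the columns whose only values are empty strings or missing cells."""
    return [name for name, counts in histograms.items() if set(counts) <= {"", None}]


histograms = column_histograms(SAMPLE_OUTPUT)
print(histograms["municipality"])
print(empty_columns(histograms))
```

The municipality histogram also surfaces the stray "PGH" value, which is the kind of irregularity worth standardizing in the schema.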
Try to understand what the records in the data represent. Are there any transformations that could be made to help the user understand the data?
Does the set of data as a whole make sense? For instance, look at counts over time (either by grouping records by year+month and aggregating to counts that you can visually scan or by plotting record counts by date or timestamp).
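Grouping by year and month takes only a few lines. In this sketch the records, the "created" field name, and the counts_by_month function are hypothetical, and the timestamps are assumed to be ISO 8601 strings.

```python
from collections import Counter
from datetime import datetime

# Hypothetical records with ISO 8601 timestamps.
records = [
    {"id": 1, "created": "2021-01-15T09:30:00"},
    {"id": 2, "created": "2021-01-20T14:00:00"},
    {"id": 3, "created": "2021-03-02T08:15:00"},
]


def counts_by_month(rows, field="created"):
    """Count records per year-month, returned in chronological order."""
    counts = Counter()
    for row in rows:
        counts[datetime.fromisoformat(row[field]).strftime("%Y-%m")] += 1
    return dict(sorted(counts.items()))


monthly = counts_by_month(records)
print(monthly)
```

Months with no records are simply absent from the result (2021-02 here), so gaps in the sequence are worth a second look when scanning the counts.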
Are the field names clear? If not, change them to something clearer. Are they unreasonably long when a shorter name would do? Shorten them to something that is still clear.
If you can't figure out something about the data, ask someone else and/or the publisher.
Once you're satisfied with the output data you're getting, you can rerun the job with the test parameter to push the resulting output to the default testbed dataset (a private dataset used for testing ETL jobs):
> python launchpad.py engine/payload/robopgh/census.py mute test
Development instances of rocket-etl should be configured to load to this testbed dataset by default (that is, even if the test parameter is not specified) as a safety feature. The parameter that controls this setting is PRODUCTION, which can be found in the engine/parameters/local_parameters.py file and which should be defined like this:
PRODUCTION = False
Only in production environments should PRODUCTION be set to True.
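The safety default can be pictured as a small decision function. This is a guess at the behavior described above, not rocket-etl's actual code: choose_destination is a hypothetical name, and the precedence of to_file over test is an assumption.

```python
def choose_destination(production_setting, args):
    """Pick where output goes from the PRODUCTION setting and command-line words."""
    if "to_file" in args:
        # Assumed: writing to a local file takes precedence over any dataset.
        return "file"
    if "test" in args:
        return "testbed"
    # With no explicit parameter, only a production environment loads to production.
    return "production" if production_setting else "testbed"


print(choose_destination(False, ["mute"]))
print(choose_destination(True, ["mute"]))
print(choose_destination(True, ["mute", "test"]))
```

The point of the design is that a development machine with PRODUCTION = False cannot overwrite a published dataset just because someone forgot to type test.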
In a development environment, to run an ETL job and push the results to the production version of the dataset, do this:
> python launchpad.py engine/payload/robopgh/census.py mute production
Deploying ETL jobs
Once tested, an ETL job can be deployed by 1) moving the source code for the ETL job to a production server and 2) scheduling the job to run automatically.
Assuming that you are developing the ETL job on a separate computer and in a dev branch of rocket-etl, this is a typical deployment workflow:
- Use > git add -p to construct atomic commits (each of which should thematically cluster changes) and > git commit -m "<Meaningful commit description>" to commit them. Repeat until all the code that needs to be deployed has been committed. If you need to add a new file (like "sky_maintenance.py"), try > git add sky_maintenance.py and > git commit -m "Add ETL job for sky-maintenance data".
- If you have any other changes to your dev branch that aren't ready for deployment, type > git stash save to temporarily stash those changes (so you can switch to the master branch).
- > git checkout master lets you switch to the master branch.
- > git merge dev merges the changes committed to the dev branch into the master branch.
- Push the changes to GitHub: > git push
- Switch back to the dev branch: > git checkout dev
- Restore the stashed code: > git stash pop
- Shell into the production server with ssh.
- Navigate to wherever the rocket-etl directory is.
- Pull the changes from GitHub: > git pull
- At this point, it's usually best to test the ETL job to make sure it will work in the production environment. Either the test or to_file command-line parameters can be used if you're not ready to publish data to the production dataset. Failure at this stage usually means that some code or parameter that was supposed to be committed to the git repository didn't get committed or is not defined on the production server.
- Schedule the job by writing a cron job: run > crontab -e, duplicate a launchpad line that's already in the crontab file, edit it to run the new ETL job, and edit the schedule to match the desired ETL schedule.