ETL

From WPRDC Wiki

ETL overview

ETL (an acronym for "Extract-Transform-Load") describes a data process that obtains data from some source location, transforms it, and delivers it to some output destination.

Most WPRDC ETL processes are written in rocket-etl, an ETL framework customized for use with a) CKAN and b) the specific needs and uses of the Western Pennsylvania Regional Data Center open-data portal. It has been extended to allow the use of command-line parameters to (for instance) override the source and destination locations (pulling data instead from a local file or outputting data to a file, for the convenience of testing pipelines). It can pull data from web sites, FTP servers, GIS servers that use the data.json standard, and Google Cloud storage, and can deliver data either to the CKAN datastore or the CKAN filestore. It supports CKAN's Express Loader feature to allow faster loading of large data tables.

Some WPRDC ETL processes are still in an older framework; once they're all migrated over, it will be possible to extract a catalog of all ETL processes by parsing the job parameters in the files that represent the ETL jobs.

Getting data

Some of the sources we get data from:

  • FTP servers
  • APIs
    • Google Cloud infrastructure could count as an API
    • Some custom-built APIs by individual vendors
  • GIS servers
    • Historically this was done through CKAN's "Harvester" program.
    • Now we are switching to writing ETL code to analyze the data.json file and pull the desired files over HTTP.
  • Plain old web sites
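As a rough illustration of the data.json approach described above, the sketch below walks a parsed catalog and collects the download URLs for one dataset. It assumes the catalog follows the common data.json layout (a "dataset" list whose entries carry a "distribution" list with "format" and "downloadURL" keys); the function name, the example catalog, and the example.org URLs are hypothetical and not taken from rocket-etl.

```python
def find_download_urls(catalog, title, wanted_format="CSV"):
    """Return the download URLs of one dataset in a parsed data.json catalog."""
    urls = []
    for dataset in catalog.get("dataset", []):
        if dataset.get("title") != title:
            continue
        for distribution in dataset.get("distribution", []):
            url = distribution.get("downloadURL")
            fmt = (distribution.get("format") or "").upper()
            # Keep only distributions in the requested format that have a URL.
            if url and fmt == wanted_format.upper():
                urls.append(url)
    return urls


# Hypothetical catalog, shaped like the result of json.load() on a data.json file.
example_catalog = {
    "dataset": [
        {
            "title": "Parks",
            "distribution": [
                {"format": "CSV", "downloadURL": "https://gis.example.org/parks.csv"},
                {"format": "GeoJSON", "downloadURL": "https://gis.example.org/parks.geojson"},
            ],
        },
        {"title": "Trails", "distribution": []},
    ]
}

print(find_download_urls(example_catalog, "Parks"))
```

The URLs returned this way could then be fetched over HTTP like any other web source.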

Writing ETL jobs

A useful tool for writing ETL jobs is Little Lexicographer. While initially designed to just facilitate the writing of data dictionaries (by scanning each column and trying to determine the best type for it, then dumping field names and types into a data dictionary template), Little Lexicographer now also has the ability to output a proposed Marshmallow schema for a CSV file. Its type detection is not perfect, so manual review of the assigned types is necessary. Also, Little Lexicographer is often fooled by seemingly numeric values like ZIP codes; if a value is a code (like a ZIP code or a US Census tract), we treat it as a string. This is especially important in the case of codes that may have leading zeros that would be lost if the value were cast to an integer.
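To make the point about codes concrete, here is a minimal type-guessing sketch. It is not Little Lexicographer's actual algorithm; the function name and the list of name hints are assumptions chosen for illustration. The idea is simply that a column is kept as a string if its name suggests a code or if any value has a leading zero.

```python
def guess_type(column_name, values, code_hints=("zip", "tract", "fips", "geoid")):
    """Guess 'integer', 'float', or 'string' for a column of raw CSV strings."""
    non_empty = [v.strip() for v in values if v and v.strip()]
    # Hypothetical rule: names that look like codes are never treated as numbers.
    looks_like_code = any(hint in column_name.lower() for hint in code_hints)
    # A leading zero would be lost by an integer cast, so treat it as a code too.
    has_leading_zero = any(len(v) > 1 and v[0] == "0" and v.isdigit() for v in non_empty)
    if not non_empty or looks_like_code or has_leading_zero:
        return "string"
    for type_name, cast in (("integer", int), ("float", float)):
        try:
            for v in non_empty:
                cast(v)
            return type_name
        except ValueError:
            continue
    return "string"


print(int("08901"))                                  # the leading zero is gone
print(guess_type("zip_code", ["15213", "08901"]))
print(guess_type("parcel_count", ["12", "7"]))
```

Even with rules like these, the proposed types still need the manual review described above.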

Schema design

After running Little Lexicographer on the source file you want to write an ETL job for and reviewing the proposed schema types for correctness, review the column names.

  1. Make column names clear. If you don't understand the meaning of the column from reading the column name and looking at sample values, figure out what the column means (by reading the data dictionary and documentation or asking someone closer to the source of the data) and then give it a meaningful name.
  2. Use snake case. Whenever possible, format column names in snake case. This means you should convert everything to lower case and change all spaces and punctuation to underscores (so FIELD NAME becomes field_name and # of pirates should be changed to number_of_pirates). Reasons we prefer snake case: a) Marshmallow already converts field names to snake case to some extent automatically. b) Snake case field names do not need to be quoted or escaped in PostgreSQL queries (making queries of the CKAN datastore easier).
  3. Standardize column names. Choose names that are already in use in other data tables published by the same publisher. For instance, if the source data calls the geocoordinates y and x (or lat and long) but latitude and longitude are already being used by other data tables, switch to latitude and longitude.
  4. Standardize column values. Where possible transform columns to standardize their values. The first step is to look at the histogram of every column (Shift+F in VisiData!) and see if anything is irregular. For instance, if the municipality column has 1038 records with municipality == "Pittsburgh" and two with municipality == "PGH", add to the schema a @pre_dump decorator function to change all instances of "PGH" to "Pittsburgh". In some cases, just converting an address field to upper case will go a long way toward standardizing it. You can think of this step as pre-cleaning the data. The Holy Grail of column standardization would be using the same values in every identically named column across the entire data portal. Maybe someday!
  5. Organize the column names. Often the source file comes with some record IDs on the left, followed by some highly relevant fields (e.g., names of things), but then the rest of the columns may be semirandomly ordered. Principles of column organization:
     a) The "input" should be on the left and the "output" should be on the right. Which fields is the user likeliest to use to look up a record (like you would look up a word in a dictionary)? Put those furthest to the left (or, at the top of the schema). Primary keys and unique identifiers should go on the far left. Things like the results of inspections are closer to outputs, and should be moved to the right.
     b) Group similar fields together. Obviously street address, city, state, and ZIP code should be grouped together and presented in the canonical order. This principle also applies to lists of geographic regions and other features.
     c) Prioritize important stuff. If there are fields you think are likely to be of most interest to the user, shift them as far left as you can (subject to other constraints). The further left the field is, the better chance the user will be able to see it in the Data Tables view (or their tabular data explorer of choice).
     d) Maximize readability. Think like a user. How can you order the columns so that the sequence is logical?
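The snake-case and value-standardization steps in the list above can be sketched in plain Python. The function names and the lookup table are hypothetical; in a real job the value fix would live in a schema hook such as the @pre_dump function mentioned in step 4, which is not shown here.

```python
import re


def to_snake_case(name):
    """Convert a source column name to snake case."""
    name = name.replace("#", " number ")        # spell out '#' before punctuation is dropped
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name)  # spaces and punctuation become underscores
    return name.strip("_").lower()


# Hypothetical table of known variants and the value they should become.
MUNICIPALITY_FIXES = {"PGH": "Pittsburgh"}


def standardize_municipality(record):
    """Return a copy of the record with known municipality variants replaced."""
    fixed = dict(record)
    if fixed.get("municipality") in MUNICIPALITY_FIXES:
        fixed["municipality"] = MUNICIPALITY_FIXES[fixed["municipality"]]
    return fixed


print(to_snake_case("FIELD NAME"))     # field_name
print(to_snake_case("# of pirates"))   # number_of_pirates
print(standardize_municipality({"municipality": "PGH"}))
```

Automatic conversion is only a starting point: a name like lat_long still needs a human to decide what it should be called.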

After writing this section, I discovered that some of the ideas above also appear in the Urban Institute's Do No Harm Guide: Applying Equity Awareness in Data Visualization. While it is focussed on visualizations, some of its suggestions can be applied to designing data schemas (and especially if your data table includes representations of race/ethnicity). The General Recommendations on page 41 provide a good overview.

Pitfalls

  • The byte-order mark showing up at the beginning of the first field name in your file. Excel seems to add this character by default (unless the user tells it not to). As usual, the moral of the story is "Never use Excel".
  • Using a local timestamp instead of a UTC timestamp as a primary key often leads to problems. Because of Daylight Saving Time, a series of hourly local timestamps skips an hour on one day each year and contains the same local timestamp twice on another day. The general advice is to store (and publish) both the UTC timestamp and the local timestamp. We use the UTC timestamp for primary keys and other data operations, but also publish the local timestamp to make it easier for the user to understand the data.
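Both pitfalls above can be demonstrated with the standard library. This is a sketch under stated assumptions: the sample bytes are made up, and fixed UTC offsets (UTC-4 for EDT, UTC-5 for EST) stand in for a real time-zone database so that the example stays self-contained.

```python
import csv
import io
from datetime import datetime, timedelta, timezone


def read_header(raw_bytes):
    """Return the CSV header row, decoding with utf-8-sig to drop any byte-order mark."""
    text = raw_bytes.decode("utf-8-sig")
    return next(csv.reader(io.StringIO(text)))


with_bom = b"\xef\xbb\xbfid,name\n1,Schenley\n"
naive_first_field = with_bom.decode("utf-8").splitlines()[0].split(",")[0]
print(naive_first_field == "id")   # False: the BOM is glued to the field name
print(read_header(with_bom))       # ['id', 'name']

# Assumed fixed offsets for Pittsburgh around the fall time change of 2021-11-07.
EDT = timezone(timedelta(hours=-4), "EDT")
EST = timezone(timedelta(hours=-5), "EST")

utc_before = datetime(2021, 11, 7, 5, 30, tzinfo=timezone.utc)
utc_after = datetime(2021, 11, 7, 6, 30, tzinfo=timezone.utc)

# Strip the offset to mimic a published "local timestamp" column.
local_before = utc_before.astimezone(EDT).replace(tzinfo=None)
local_after = utc_after.astimezone(EST).replace(tzinfo=None)

print(local_before == local_after)  # True: two records share 01:30 local time
print(utc_before == utc_after)      # False: the UTC timestamps stay unique
```

The two records collide on the local timestamp but not on the UTC one, which is why the UTC value is the safer primary key.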

Testing ETL jobs

Typical initial tests of a rocket-etl job can be invoked like this:

> python launchpad.py engine/payload/<name for publisher/project>/<script name>.py mute to_file

where the mute parameter prevents errors from being sent to the "etl-hell" Slack channel and the to_file parameter writes the output to the default location for the job in question. For instance, the job

> python launchpad.py engine/payload/robopgh/census.py mute to_file

would write its output to a file in the directory <PATH TO rocket-etl>/output_files/robopgh/. Note that the namespacing convention routes the output of robopgh jobs to a different directory than that of wormpgh jobs, but if there were two jobs in the robopgh payload folder that write to population.csv, each job would overwrite the output of the other. As this namespacing is for the convenience of testing and development, this level of collision avoidance seems sufficient for now. It's always possible to alter the default output file name by specifying the 'destination_file' parameter in the dict of parameters that define the job (found in, for instance, the robopgh/census.py file).
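The namespacing convention can be pictured with a small sketch. This is not rocket-etl's code: the function name and signature are hypothetical, and the only details taken from the description above are the output_files directory, the payload folder name, and the 'destination_file' parameter.

```python
from pathlib import PurePosixPath


def default_output_path(payload_script, destination_file, base="output_files"):
    """Build an output path namespaced by the payload folder of the job script."""
    namespace = PurePosixPath(payload_script).parent.name
    return str(PurePosixPath(base) / namespace / destination_file)


census = default_output_path("engine/payload/robopgh/census.py", "population.csv")
other = default_output_path("engine/payload/robopgh/other_job.py", "population.csv")
worm = default_output_path("engine/payload/wormpgh/census.py", "population.csv")

print(census)           # output_files/robopgh/population.csv
print(census == other)  # True: two robopgh jobs would overwrite each other
print(census == worm)   # False: wormpgh jobs get their own directory
```

Giving one of the colliding jobs a different 'destination_file' value is enough to separate their outputs.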

After running the job, examine the output. VisiData is an excellent tool for rapidly examining and navigating CSV files. As a first step, it's a good idea to go through each column in the output and make sure that the results make sense. Often this can be done by opening the file in VisiData (> vd output_files/robopgh/population.csv) and invoking Shift+F on each column to calculate the histogram of its values. This is a quick way to catch empty columns, which are a sign either that the source file has only null values in that column or that there's an error in your ETL code. Often the error is a typo in the name of the field you're trying to load from; how Marshmallow transforms field names can be non-intuitive.
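When the same check needs to be scripted rather than done interactively, something like the following sketch computes a histogram for every column and lists the columns that contain no values. The function names and the sample data are made up for illustration.

```python
import csv
import io
from collections import Counter


def column_histograms(csv_text):
    """Count how often each value occurs in each column of a CSV document."""
    reader = csv.DictReader(io.StringIO(csv_text))
    fieldnames = reader.fieldnames or []
    histograms = {name: Counter() for name in fieldnames}
    for row in reader:
        for name in fieldnames:
            histograms[name][row[name]] += 1
    return histograms


def empty_columns(histograms):
    """Return the columns whose only values are empty strings or missing cells."""
    return [name for name, counts in histograms.items() if set(counts) <= {"", None}]


sample = "municipality,inspector\nPittsburgh,\nPittsburgh,\nPGH,\n"
histograms = column_histograms(sample)
print(histograms["municipality"].most_common())
print(empty_columns(histograms))
```

Here the histogram exposes both the stray "PGH" value and the entirely empty inspector column.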

Try to understand what the records in the data represent. Are there any transformations that could be made to help the user understand the data?

Does the set of data as a whole make sense? For instance, look at counts over time (either by grouping records by year+month and aggregating to counts that you can visually scan or by plotting record counts by date or timestamp).
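A minimal sketch of the year+month grouping, assuming the records carry ISO-formatted dates or timestamps (the function names and sample values are hypothetical):

```python
from collections import Counter
from datetime import datetime


def counts_by_month(timestamps):
    """Aggregate ISO-formatted dates or timestamps into record counts per YYYY-MM."""
    counts = Counter(datetime.fromisoformat(ts).strftime("%Y-%m") for ts in timestamps)
    return dict(sorted(counts.items()))


def missing_months(counts):
    """List months between the first and last observed month that have no records."""
    months = sorted(counts)
    if not months:
        return []
    year, month = (int(part) for part in months[0].split("-"))
    missing = []
    while f"{year:04d}-{month:02d}" < months[-1]:
        key = f"{year:04d}-{month:02d}"
        if key not in counts:
            missing.append(key)
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return missing


sample = ["2021-01-05", "2021-01-19", "2021-03-02T08:15:00"]
monthly = counts_by_month(sample)
print(monthly)
print(missing_months(monthly))
```

A month with no records, or with far more records than its neighbors, is worth a question to the publisher.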

Are the field names clear? If not, change them to something clearer. Are they unreasonably long when a shorter name would do? Shorten them to something that is still clear.

If you can't figure out something about the data, ask someone else and/or the publisher.

Once you're satisfied with the output data you're getting, you can rerun the job with the test parameter to push the resulting output to the default testbed dataset (a private dataset used for testing ETL jobs):

> python launchpad.py engine/payload/robopgh/census.py mute test

Development instances of rocket-etl should be configured to load to this testbed dataset by default (that is, even if the test parameter is not specified) as a safety feature. The parameter that controls this setting is PRODUCTION, which can be found in the engine/parameters/local_parameters.py file and which should be defined like this: PRODUCTION = False

Only in production environments should PRODUCTION be set to True.

In a development environment, to run an ETL job and push the results to the production version of the dataset, do this:

> python launchpad.py engine/payload/robopgh/census.py mute production
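The way the PRODUCTION setting and the command-line parameters interact can be summarized in a small decision function. This is a hypothetical sketch of the behavior described above, not the logic in launchpad.py; in particular, the order of precedence when several parameters are given together is an assumption.

```python
def choose_target(production_setting, cli_args):
    """Pick where a job's output goes: 'file', 'testbed', or 'production'."""
    if "to_file" in cli_args:
        return "file"        # write locally, never touch CKAN
    if "test" in cli_args:
        return "testbed"     # explicit request for the private testbed dataset
    if "production" in cli_args or production_setting:
        return "production"  # explicit override, or a production server
    return "testbed"         # safe default for development instances


PRODUCTION = False  # as it should be set on a development machine

print(choose_target(PRODUCTION, ["mute"]))                # testbed
print(choose_target(PRODUCTION, ["mute", "production"]))  # production
print(choose_target(True, ["mute"]))                      # production
```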

Schema-source comparison

When running ETL jobs, you will sometimes see console output indicating that 1) fields in the source file are not being used in the schema or 2) fields in the schema cannot be found in the source file. These are checks to ensure that the schema matches and accounts for all the fields in the source file. (While Marshmallow does have some built-in support for checks of this kind, we've written our own code for handling these comparisons.)

If there's a field in the source file that you don't want to publish (and you don't want to keep getting console output about it), you can either list it in the exclude option in the schema's Meta class or add the field to the schema but set it to load_only=True.

If there's a field in the schema that is supposed to be published ("dumped" in Marshmallow jargon) and is supposed to be loaded from the source file, but it can't be found in the source file, an error message will be printed to the console. If additionally the job is trying to push data to the CKAN datastore, an exception will be raised.

These checks are really helpful when writing/testing/modifying an ETL job, as they make it easy to find typos in field names or other errors that are preventing source data from getting to the output correctly.
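The comparison itself boils down to two set differences. The sketch below is an illustration of the idea only, not the comparison code used in rocket-etl; the function name, argument names, and field lists are hypothetical.

```python
def compare_schema_to_source(source_fields, dumped_fields, load_only=(), excluded=()):
    """Return (source fields the schema ignores, schema fields missing from the source)."""
    source = set(source_fields)
    # Fields the schema knows about, whether or not they end up being published.
    known = set(dumped_fields) | set(load_only) | set(excluded)
    unused_source_fields = sorted(source - known)
    missing_from_source = sorted(set(dumped_fields) - source)
    return unused_source_fields, missing_from_source


unused, missing = compare_schema_to_source(
    source_fields=["id", "name", "internal_notes", "lattitude"],
    dumped_fields=["id", "name", "latitude"],
    load_only=["internal_notes"],
)
print(unused)   # ['lattitude']
print(missing)  # ['latitude']
```

In this made-up case a misspelled source column shows up in both lists at once, which is the usual signature of a field-name typo. A fuller version would also need to skip schema fields that are computed rather than loaded from the source.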

Deploying ETL jobs

Once tested, an ETL job can be deployed by 1) moving the source code for the ETL job to a production server and 2) scheduling the job to run automatically.

Assuming that you are developing the ETL job on a separate computer and in a dev branch of rocket-etl, this is a typical deployment workflow:

  1. Use > git add -p to construct atomic commits (each of which should thematically cluster changes) and > git commit -m "<Meaningful commit description>" to commit them. Repeat until all the code that needs to be deployed has been committed. If you need to add a new file (like "sky_maintenance.py"), try > git add sky_maintenance.py and > git commit -m "Add ETL job for sky-maintenance data".
  2. If you have any other changes to your dev branch that aren't ready for deployment, type > git stash save to temporarily stash those changes (so you can switch to the master branch).
  3. > git checkout master lets you switch to the master branch.
  4. > git merge dev merges the changes committed to the dev branch into the master branch.
  5. Push the changes to GitHub: > git push
  6. Switch back to the dev branch: > git checkout dev
  7. Restore the stashed code: > git stash pop
  8. Shell into the production server with ssh.
  9. Navigate to wherever the rocket-etl directory is.
  10. Pull the changes from GitHub: > git pull
  11. At this point, it's usually best to test the ETL job to make sure it will work in the production environment. Either the test or to_file command-line parameters can be used if you're not ready to publish data to the production dataset. Failure at this stage usually means that some code or parameter that was supposed to be committed to the git repository didn't get committed or is not defined on the production server.
  12. Schedule the job by writing a cron job: run > crontab -e, duplicate a launchpad line that's already in the crontab file, and edit the copy so that it runs the new ETL job on the desired schedule.

Retiring ETL jobs

Every ETL job has a life cycle. When an ETL job comes to the end of its existence because the data source has disappeared, we designate this dataset "orphaned". Metadata values for the update frequency fields should be changed to the value that has "Historical" in its description. The dataset's "_etl" tag should be removed and replaced with the "_orphaned_etl" tag. If there is still a manual inventory of ETL jobs, that inventory should be updated.
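The tag swap can be expressed as a small function on a package's metadata. This sketch assumes the metadata is a CKAN-style dict whose "tags" entry is a list of {"name": ...} dicts; the function name and sample package are hypothetical, the update frequency fields are left out, and pushing the result back to CKAN is not shown.

```python
def mark_as_orphaned(package):
    """Return a copy of the package metadata with '_etl' swapped for '_orphaned_etl'."""
    tag_names = [tag["name"] for tag in package.get("tags", [])]
    new_names = [name for name in tag_names if name != "_etl"]
    if "_orphaned_etl" not in new_names:
        new_names.append("_orphaned_etl")
    updated = dict(package)
    updated["tags"] = [{"name": name} for name in new_names]
    return updated


# Hypothetical package metadata for a dataset whose source has disappeared.
package = {"name": "sky-maintenance", "tags": [{"name": "_etl"}, {"name": "sky"}]}
print(mark_as_orphaned(package)["tags"])
```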

Inventorying ETL jobs

When you add, delete, or modify ETL jobs, update this Google Sheet. (Eventually, we'll make ETL jobs self-tracking somehow, probably by adding the location of the script (and any other useful metadata) to the package extras metadata field.)

Currently, ETL jobs report metadata such as the last_etl_update date and the source file hash. The last source file name is also being used in the new Data Rivers/Google Cloud Platform to check whether a given file is newer or older than the last one used.
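A source file hash makes it cheap to tell whether a source file has changed since the last run. The sketch below assumes SHA-256 and made-up function names; the hash algorithm actually used by the ETL jobs is not stated here.

```python
import hashlib


def file_hash(data):
    """Return the SHA-256 hex digest of a source file's bytes."""
    return hashlib.sha256(data).hexdigest()


def source_has_changed(data, last_hash):
    """Compare the current source file against the hash stored in the metadata."""
    return file_hash(data) != last_hash


previous = file_hash(b"id,name\n1,Schenley\n")
print(source_has_changed(b"id,name\n1,Schenley\n", previous))             # False
print(source_has_changed(b"id,name\n1,Schenley\n2,Frick\n", previous))    # True
```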