Snapshots

What snapshotting is, and how to use it

What is snapshotting?

hotglue's transformation layer allows you to save and persist data across sync jobs for every tenant. This feature is called snapshotting. Some use cases include:

  • storing some metadata about the tenant necessary for the transformation script to run (some sort of mapping, an API key/identifier, etc.)
  • persisting a state of the data that this tenant has already synced. For example, you could keep a full copy of all the data the tenant has synced to detect which records are new vs updated.
  • allowing different integrations to access information about the tenant. For example, you could correlate data across Salesforce and Quickbooks to generate some unified output.

How to use snapshots?

Transformation Script

Note: this guide assumes you've read the writing a basic script of the docs.

As described in the writing a basic script section, one of the standard directories for hotglue transformation scripts is the snapshots directory. Any data saved to that directory will be persisted across job runs.

Let's walk through a quick sample using pandas:

import os
import pandas as pd

def get_snapshot(snapshot_dir, stream):
    snap_path = f"{snapshot_dir}/{stream}.snapshot.csv"

    # Ensure file exists
    if os.path.isfile(snap_path) is False:
        return None

    # Read the old snapshot, if present
    snap_df = pd.read_csv(snap_path)

    return snap_df


def update_snapshot(snapshot_dir, stream, key, data_df, persist=True, override=False,
                    drop_column=None, sort_columns=None, drop_duplicates=True):
    snap_df = data_df

    if drop_duplicates:
        snap_df = snap_df.drop_duplicates(key, keep="last")

    if not override:
        # Get the old snapshot dataframe
        psnap_df = get_snapshot(snapshot_dir, stream)

        if psnap_df is not None:
            # Combine with prior snapshot
            snap_df = snap_df.set_index(
                key).combine_first(psnap_df.set_index(key))
            snap_df = snap_df.reset_index()
    if drop_column is not None:
        snap_df = snap_df.drop(columns=drop_column, errors='ignore')
    if persist:
        # Save this snapshot in correct spot
        snap_path = f"{snapshot_dir}/{stream}.snapshot.csv"
        snap_df.to_csv(snap_path, index=False)

    if sort_columns is not None:
        snap_df.sort_values(by=sort_columns)

    return snap_df

The file above defines two functions:

  • update_snapshot is designed to take data new data from a job sync, and append it to any existing snapshot. This will ensure every job run has access to the full history of data that has been synced by this tenant
  • get_snapshot reads the snapshot directory to get the snapshot for the stream id passed (you can think of a stream as the name of a table)

Using the above, we could do the following in our etl.ipynb to generate a snapshot of all the Account data from an integration (like Quickbooks):

import gluestick as gs
import pandas as pd
import os

from lib import util

# standard directory for hotglue
ROOT_DIR = os.environ.get("ROOT_DIR", ".")
INPUT_DIR = f"{ROOT_DIR}/sync-output"
OUTPUT_DIR = f"{ROOT_DIR}/etl-output"
SNAPSHOT_DIR = f"{ROOT_DIR}/snapshots"

# Read input data
input_data = gs.read_csv_folder(INPUT_DIR)

# Create a snapshot of the Account stream if it is available
if input_data.get("Account") is not None:
    util.update_snapshot(
        SNAPSHOT_DIR, 
        "Account",
        ['Id'], 
        input_data["Account"]
     )

# Get Account snapshot data
accounts = util.get_snapshot(SNAPSHOT_DIR, "Account")

That's all there is to it!

API

Additionally, you can modify the snapshots on a tenant level via the API. For example, you could save a tenant config.json in the snapshot to store some metadata such as an API key programmatically.


Did this page help you?