How to Build a Fully Automated Data Drift Detection Pipeline

An Automated Guide to Detecting and Handling Data Drift

Khuyen Tran
Towards Data Science



Motivation

Data drift occurs when the distribution of input features in the production environment differs from the distribution in the training data, which can silently degrade model performance.
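
To make the idea concrete, here is a minimal sketch (not part of the pipeline built in this article) that checks a single feature for drift with a two-sample Kolmogorov–Smirnov test; the file names and the “temperature” column are hypothetical.

import pandas as pd
from scipy.stats import ks_2samp

# Hypothetical files: training data vs. recent production data
train = pd.read_csv("train.csv")
production = pd.read_csv("production.csv")

# Compare the distribution of one numeric feature across the two datasets
statistic, p_value = ks_2samp(train["temperature"], production["temperature"])

# A small p-value suggests the two samples come from different distributions,
# i.e. the feature has likely drifted
if p_value < 0.05:
    print(f"Possible drift in 'temperature' (p={p_value:.4f})")
else:
    print(f"No significant drift detected (p={p_value:.4f})")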


To mitigate the impact of data drift on model performance, we can design a workflow that detects drift, notifies the data team, and triggers model retraining.


Workflows

The workflow comprises the following tasks:

  1. Fetch reference data from the Postgres database.
  2. Get the current production data from the web.
  3. Detect data drift by comparing the reference and current data.
  4. Append the current data to the existing Postgres database.
  5. When there is data drift, the following actions are taken:
  • Send a Slack message to alert the data team.
  • Retrain the model to update its performance.
  • Push the updated model to S3 for storage.

This workflow is scheduled to run at specific times, such as 11:00 AM every Monday.


Overall, the workflow includes two types of tasks: data science and data engineering tasks.

Data science tasks, represented by pink boxes, are performed by data scientists and involve data drift detection, data processing, and model training.

Data engineering tasks, represented by blue and purple boxes, are performed by data engineers and involve data movement and sending notifications.

Data Science Tasks

Detect Data Drift

To detect data drift, we will create a Python script that takes two CSV files as input: “data/reference.csv” (reference data) and “data/current.csv” (current data).


We will use Evidently, an open-source ML observability platform, to compare the reference data, serving as a baseline, with the current production data.

If dataset drift is detected, the “drift_detected” output will be True; otherwise, it will be False.

import pandas as pd
from evidently.metric_preset import DataDriftPreset
from evidently.report import Report
from kestra import Kestra

# Load the baseline data and the current production data
reference = pd.read_csv("data/reference.csv")
current = pd.read_csv("data/current.csv")

# Compare the two datasets and extract the dataset-level drift flag
data_drift_report = Report(metrics=[DataDriftPreset()])
data_drift_report.run(reference_data=reference, current_data=current)
report = data_drift_report.as_dict()
drift_detected = report["metrics"][0]["result"]["dataset_drift"]

if drift_detected:
    print("Detect dataset drift")
else:
    print("Detect no dataset drift")

# Expose the result as a Kestra output so downstream flows can react to it
Kestra.outputs({"drift_detected": drift_detected})

Full code.
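
For a closer look at which features drifted, the same Report object can also be saved as an interactive HTML file by appending a line like the following to the script above (the file name is just an example):

# Save the drift report as an interactive HTML file for manual review
data_drift_report.save_html("data_drift_report.html")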

Retrain the Model

Next, we will create a Python script responsible for model training. This script takes the combined past and current data as input and saves the trained model as a “model.pkl” file.

import joblib
import numpy as np
import pandas as pd
from omegaconf import DictConfig
from sklearn import metrics
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV


def train_model(X_train: pd.DataFrame, y_train: pd.Series, model_params: DictConfig):
    y_train_log = np.log1p(y_train)  # log-transform the target before fitting
    model = Ridge()
    # rmsle is a custom metric defined elsewhere in the full training script
    scorer = metrics.make_scorer(rmsle, greater_is_better=True)
    params = dict(model_params)

    # Search the hyperparameter grid with 3-fold cross-validation
    grid = GridSearchCV(model, params, scoring=scorer, cv=3, verbose=3)
    grid.fit(X_train, y_train_log)
    return grid


# X_train, y_train, and config are produced by earlier steps of the full script
model = train_model(X_train, y_train, config.model.params)
joblib.dump(model, "model.pkl")

Full code.
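
Because the target is transformed with np.log1p during training, predictions from the saved model should be mapped back with np.expm1. Below is a minimal inference sketch; “data/features.csv” is a hypothetical file of features preprocessed exactly as in training.

import joblib
import numpy as np
import pandas as pd

# Load the fitted GridSearchCV object produced by the training script
model = joblib.load("model.pkl")

# Hypothetical input: features prepared the same way as during training
X_new = pd.read_csv("data/features.csv")

# Invert the log1p transform that was applied to the target during training
y_pred = np.expm1(model.predict(X_new))
print(y_pred[:5])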

Push to GitHub

After finishing developing these two scripts, data scientists can push them to GitHub, allowing data engineers to use them in creating workflows.

View the GitHub repository for these files here:

Data Engineering Tasks

Popular orchestration libraries such as Airflow, Prefect, and Dagster require you to modify your Python code to use their functionality.

When Python scripts are tightly coupled to the data workflows, the overall codebase becomes more complex and harder to maintain. If the scripts cannot be developed independently, data engineers may have to modify the data science code just to add orchestration logic.


On the other hand, Kestra, an open-source orchestrator, allows you to develop your Python scripts independently and then seamlessly incorporate them into data workflows using YAML files.

This way, data scientists can focus on data processing and model training, while data engineers can focus on handling orchestration.

Thus, we will use Kestra to design a more modular and efficient workflow.


Clone the detect-data-drift-pipeline repository to get the docker-compose file for Kestra, then run:

docker compose up -d

Navigate to localhost:8080 to access and explore the Kestra UI.


Follow these instructions to configure the required environment for this tutorial.

Before developing the target flows, let’s get familiar with Kestra by creating some simple flows.

Access Postgres Tables From a Python Script

We will create a flow that includes the following tasks:

  • getReferenceTable: Exports a CSV file from a Postgres table.
  • saveReferenceToCSV: Creates a local CSV file that can be accessed by the Python task.
  • runPythonScript: Reads the local CSV file with Python.

To enable data passing between the saveReferenceToCSV and runPythonScript tasks, we will place these two tasks in the same working directory by enclosing them inside the wdir task.

id: get-reference-table
namespace: dev
tasks:
  - id: getReferenceTable
    type: io.kestra.plugin.jdbc.postgresql.CopyOut
    url: jdbc:postgresql://host.docker.internal:5432/
    username: "{{secret('POSTGRES_USERNAME')}}"
    password: "{{secret('POSTGRES_PASSWORD')}}"
    format: CSV
    sql: SELECT * FROM reference
    header: true
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: saveReferenceToCSV
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          data/reference.csv: "{{outputs.getReferenceTable.uri}}"
      - id: runPythonScript
        type: io.kestra.plugin.scripts.python.Script
        beforeCommands:
          - pip install pandas
        script: |
          import pandas as pd
          df = pd.read_csv("data/reference.csv")
          print(df.head(10))

Executing the flow will print the first ten rows of the reference table in the logs.


Parameterize Flow with Inputs

Let’s create another flow that can be parameterized with inputs. This flow will have the following inputs: startDate, endDate, and dataURL.

The getCurrentCSV task can access these inputs using the {{inputs.name}} notation.

id: get-current-table
namespace: dev
inputs:
  - name: startDate
    type: STRING
    defaults: "2011-03-01"
  - name: endDate
    type: STRING
    defaults: "2011-03-31"
  - name: dataURL
    type: STRING
    defaults: "https://raw.githubusercontent.com/khuyentran1401/detect-data-drift-pipeline/main/data/bikeride.csv"
tasks:
  - id: getCurrentCSV
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install pandas
    script: |
      import pandas as pd
      df = pd.read_csv("{{inputs.dataURL}}", parse_dates=["dteday"])
      print(f"Getting data from {{inputs.startDate}} to {{inputs.endDate}}")
      df = df.loc[df.dteday.between("{{inputs.startDate}}", "{{inputs.endDate}}")]
      df.to_csv("current.csv", index=False)

The values of these inputs can be specified in each flow execution.
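
Inputs can also be supplied programmatically through Kestra’s API. The sketch below is an assumption about a default local setup; the endpoint path can differ between Kestra versions, so check the API documentation for your installation.

import requests

# Assumed endpoint for triggering an execution with inputs on a local Kestra
# instance; verify the exact path for your Kestra version
url = "http://localhost:8080/api/v1/executions/trigger/dev/get-current-table"

inputs = {
    "startDate": "2011-04-01",
    "endDate": "2011-04-30",
}

# Flow inputs are sent as multipart form data
response = requests.post(url, files={k: (None, v) for k, v in inputs.items()})
print(response.status_code, response.text)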


Load a CSV File into a Postgres Table

The following flow performs these tasks:

  • getCurrentCSV: Runs a Python script to create a CSV file in the working directory.
  • saveFiles: Sends the CSV file from the working directory to Kestra's internal storage.
  • saveToCurrentTable: Loads the CSV file into a Postgres table.

id: save-current-table
namespace: dev
tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: getCurrentCSV
        type: io.kestra.plugin.scripts.python.Script
        beforeCommands:
          - pip install pandas
        script: |
          import pandas as pd
          data_url = "https://raw.githubusercontent.com/khuyentran1401/detect-data-drift-pipeline/main/data/bikeride.csv"
          df = pd.read_csv(data_url, parse_dates=["dteday"])
          df.to_csv("current.csv", index=False)
      - id: saveFiles
        type: io.kestra.core.tasks.storages.LocalFiles
        outputs:
          - current.csv
  - id: saveToCurrentTable
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    url: jdbc:postgresql://host.docker.internal:5432/
    username: "{{secret('POSTGRES_USERNAME')}}"
    password: "{{secret('POSTGRES_PASSWORD')}}"
    from: "{{outputs.saveFiles.uris['current.csv']}}"
    table: current
    format: CSV
    header: true
    delimiter: ","

After running this flow, you will see the resulting data in the “current” table within your Postgres database.
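
To double-check the load outside the Kestra UI, you can query the table directly. A small sketch using pandas and SQLAlchemy, assuming a local Postgres instance and placeholder credentials:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string; substitute your own user, password, and database
engine = create_engine("postgresql://username:password@localhost:5432/postgres")

# Read a few rows from the table populated by the saveToCurrentTable task
df = pd.read_sql("SELECT * FROM current LIMIT 5", engine)
print(df)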


Run a File From a GitHub Repository

This flow includes the following tasks:

  • cloneRepository: Clones a public GitHub repository
  • runPythonCommand: Executes a Python script from a CLI

Both of these tasks will operate within the same working directory.

id: clone-repository
namespace: dev
tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: cloneRepository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/khuyentran1401/detect-data-drift-pipeline
        branch: main
      - id: runPythonCommand
        type: io.kestra.plugin.scripts.python.Commands
        commands:
          - python src/example.py

After running the flow, the logs will show the output of the src/example.py script.


Run a Flow on Schedule

We will create another flow that runs on a specific schedule. The following flow runs at 11:00 AM every Monday.

id: triggered-flow
namespace: dev
tasks:
  - id: hello
    type: io.kestra.core.tasks.log.Log
    message: Hello world
triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 11 * * MON"

Upload to S3

This flow includes the following tasks:

  • createPickle: Generates a pickle file in Python
  • saveToPickle: Transfers the pickle file to Kestra's internal storage
  • upload: Uploads the pickle file to an S3 bucket

id: upload-to-S3
namespace: dev
tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: createPickle
        type: io.kestra.plugin.scripts.python.Script
        script: |
          import pickle
          data = [1, 2, 3]
          with open('data.pkl', 'wb') as f:
              pickle.dump(data, f)
      - id: saveToPickle
        type: io.kestra.core.tasks.storages.LocalFiles
        outputs:
          - data.pkl
  - id: upload
    type: io.kestra.plugin.aws.s3.Upload
    accessKeyId: "{{secret('AWS_ACCESS_KEY_ID')}}"
    secretKeyId: "{{secret('AWS_SECRET_ACCESS_KEY_ID')}}"
    region: us-east-2
    from: '{{outputs.saveToPickle.uris["data.pkl"]}}'
    bucket: bike-sharing
    key: data.pkl

After running this flow, the data.pkl file will be uploaded to the "bike-sharing" bucket.
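
You can also confirm the upload programmatically with boto3, assuming your AWS credentials are configured locally and the bucket name matches the flow:

import boto3

# Assumes AWS credentials are available via environment variables or ~/.aws
s3 = boto3.client("s3", region_name="us-east-2")

# Check that the object written by the upload task exists in the bucket
response = s3.head_object(Bucket="bike-sharing", Key="data.pkl")
print("Object size in bytes:", response["ContentLength"])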


Put Everything Together

Build a Flow to Detect Data Drift

Now, let’s combine what we have learned to create a flow that detects data drift. At 11:00 AM every Monday, this flow executes the following tasks:

  • Fetches reference data from the Postgres database.
  • Runs a Python script to get the current production data from the web.
  • Clones the GitHub repository containing the drift detection code.
  • Runs a Python script to detect data drift by comparing the reference and current data.
  • Appends the current data to the existing Postgres database.

id: detect-data-drift
namespace: dev
inputs:
  - name: startDate
    type: STRING
    defaults: "2011-03-01"
  - name: endDate
    type: STRING
    defaults: "2011-03-31"
  - name: data_url
    type: STRING
    defaults: "https://raw.githubusercontent.com/khuyentran1401/detect-data-drift-pipeline/main/data/bikeride.csv"
tasks:
  - id: getReferenceTable
    type: io.kestra.plugin.jdbc.postgresql.CopyOut
    url: jdbc:postgresql://host.docker.internal:5432/
    username: "{{secret('POSTGRES_USERNAME')}}"
    password: "{{secret('POSTGRES_PASSWORD')}}"
    format: CSV
    sql: SELECT * FROM reference
    header: true
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: cloneRepository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/khuyentran1401/detect-data-drift-pipeline
        branch: main
      - id: saveReferenceToCSV
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          data/reference.csv: "{{outputs.getReferenceTable.uri}}"
      - id: getCurrentCSV
        type: io.kestra.plugin.scripts.python.Script
        beforeCommands:
          - pip install pandas
        script: |
          import pandas as pd
          df = pd.read_csv("{{inputs.data_url}}", parse_dates=["dteday"])
          print(f"Getting data from {{inputs.startDate}} to {{inputs.endDate}}")
          df = df.loc[df.dteday.between("{{inputs.startDate}}", "{{inputs.endDate}}")]
          df.to_csv("data/current.csv", index=False)
      - id: detectDataDrift
        type: io.kestra.plugin.scripts.python.Commands
        beforeCommands:
          - pip install -r src/detect/requirements.txt
        commands:
          - python src/detect/detect_data_drift.py
      - id: saveFileInStorage
        type: io.kestra.core.tasks.storages.LocalFiles
        outputs:
          - data/current.csv
  - id: saveToCurrentTable
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    url: jdbc:postgresql://host.docker.internal:5432/
    username: "{{secret('POSTGRES_USERNAME')}}"
    password: "{{secret('POSTGRES_PASSWORD')}}"
    from: "{{outputs.saveFileInStorage.uris['data/current.csv']}}"
    table: current
    format: CSV
    header: true
    delimiter: ","
triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 11 * * MON"

Build a Flow to Send Slack Messages

Next, we will create a flow to send Slack messages via a Slack Webhook URL when the detectDataDrift task inside the detect-data-drift flow returns drift_detected=true.

id: send-slack-message
namespace: dev
tasks:
  - id: send
    type: io.kestra.plugin.notifications.slack.SlackExecution
    url: "{{secret('SLACK_WEBHOOK')}}"
    customMessage: Detect data drift
triggers:
  - id: listen
    type: io.kestra.core.models.triggers.types.Flow
    conditions:
      - type: io.kestra.core.models.conditions.types.ExecutionFlowCondition
        namespace: dev
        flowId: detect-data-drift
      - type: io.kestra.core.models.conditions.types.VariableCondition
        expression: "{{outputs.detectDataDrift.vars.drift_detected}} == true"

After the detect-data-drift flow runs and reports drift, the send-slack-message flow will send a message on Slack.
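
Under the hood, a Slack incoming webhook simply accepts a JSON payload, so the same notification could be sent directly from Python; the webhook URL below is a placeholder for your own secret.

import requests

# Placeholder: use your own Slack incoming webhook URL (stored as a Kestra secret)
webhook_url = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

# Slack incoming webhooks accept a simple JSON payload with a "text" field
response = requests.post(webhook_url, json={"text": "Detect data drift"})
print(response.status_code)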


Build a Flow to Retrain the Model

Lastly, we will create a flow to retrain the model. This flow executes the following tasks:

  • Exports a CSV file from the current table in the Postgres database
  • Clones the GitHub repository containing the model training code
  • Runs a Python script to train the model and generates a pickle file
  • Uploads the pickle file to S3

id: train-model
namespace: dev
tasks:
  - id: getCurrentTable
    type: io.kestra.plugin.jdbc.postgresql.CopyOut
    url: jdbc:postgresql://host.docker.internal:5432/
    username: "{{secret('POSTGRES_USERNAME')}}"
    password: "{{secret('POSTGRES_PASSWORD')}}"
    format: CSV
    sql: SELECT * FROM current
    header: true
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: cloneRepository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/khuyentran1401/detect-data-drift-pipeline
        branch: main
      - id: saveCurrentToCSV
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          data/current.csv: "{{outputs.getCurrentTable.uri}}"
      - id: trainModel
        type: io.kestra.plugin.scripts.python.Commands
        beforeCommands:
          - pip install -r src/train/requirements.txt
        commands:
          - python src/train/train_model.py
      - id: saveToPickle
        type: io.kestra.core.tasks.storages.LocalFiles
        outputs:
          - model.pkl
  - id: upload
    type: io.kestra.plugin.aws.s3.Upload
    accessKeyId: "{{secret('AWS_ACCESS_KEY_ID')}}"
    secretKeyId: "{{secret('AWS_SECRET_ACCESS_KEY_ID')}}"
    region: us-east-2
    from: '{{outputs.saveToPickle.uris["model.pkl"]}}'
    bucket: bike-sharing
    key: model.pkl
triggers:
  - id: listenFlow
    type: io.kestra.core.models.triggers.types.Flow
    conditions:
      - type: io.kestra.core.models.conditions.types.ExecutionFlowCondition
        namespace: dev
        flowId: detect-data-drift
      - type: io.kestra.core.models.conditions.types.VariableCondition
        expression: "{{outputs.detectDataDrift.vars.drift_detected}} == true"

After running this flow, the model.pkl file will be uploaded to the “bike-sharing” bucket.


Ideas to Extend This Workflow

Rather than relying on scheduled data pulls to identify data drift, we can leverage Grafana’s outgoing webhook and Kestra’s inbound webhook to establish real-time data monitoring and trigger a flow instantly when data drift occurs. This approach enables the detection of data drift as soon as it happens, eliminating the need to wait for a scheduled script to run.
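
As a rough sketch of that real-time variant, the monitoring tool’s alert could call a Kestra webhook trigger. The URL pattern below is an assumption based on Kestra’s webhook trigger endpoint and may differ across versions; the webhook key is a placeholder, and the target flow would need a corresponding Webhook trigger defined.

import requests

# Assumed pattern: /api/v1/executions/webhook/{namespace}/{flowId}/{key}
# Verify the exact URL and webhook key for your Kestra version
url = "http://localhost:8080/api/v1/executions/webhook/dev/detect-data-drift/my-webhook-key"

# Forward the alert payload (e.g. from Grafana's outgoing webhook) to Kestra
alert = {"source": "grafana", "alert": "data drift detected"}
response = requests.post(url, json=alert)
print(response.status_code)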


Let me know in the comments how you think this workflow could be extended and what other use cases you would like to see in future content.

I love writing about data science concepts and playing with different data science tools. You can stay up-to-date with my latest posts by:
