Day 41 of 50 Days of Python: Workflow Orchestration with Airflow or Luigi
Part of Week 6: Advanced Topics
Welcome back to the series! On Day 40 we exposed our model via FastAPI. Today we will cover how to automate model retraining, data ingestion and batch inference using two popular Python workflow engines:
Apache Airflow
Luigi (built at Spotify)
What We Will Cover
→ The concepts of tasks, dependencies, DAGs, and schedulers.
→ How Airflow's DAG/@task (TaskFlow) API & Luigi's Task classes map to the same ideas.
→ Configuring cron-style schedules, retries, and alerts.
→ Monitoring runs via the Airflow UI & Luigi central scheduler.
→ Strategies to keep your workflows idempotent and reproducible.
Key Concepts
Schedule - When the DAG runs.
Retries - How many times to re-run a flaky task before marking it failed.
Retry Delay - How long to wait before retrying, useful when an API limits back-to-back calls.
Concurrency - How many tasks can run in parallel.
Task Timeout - Kills tasks that hang, so they stop burning compute.
Alerting - Notifications on failure/success.
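To make these knobs concrete, here is a minimal sketch of how each one maps onto Airflow's DAG and task arguments. The DAG id, email address and specific values are illustrative assumptions, not part of today's pipeline:

from datetime import timedelta
import pendulum
from airflow import DAG

with DAG(
    dag_id="concept_demo",                                 # hypothetical example DAG
    start_date=pendulum.datetime(2025, 6, 30, tz="UTC"),
    schedule="0 2 * * *",                                  # Schedule: nightly at 02:00 UTC
    catchup=False,
    max_active_tasks=4,                                    # Concurrency: parallel tasks within the DAG
    default_args={
        "retries": 2,                                      # Retries
        "retry_delay": timedelta(minutes=5),               # Retry Delay
        "execution_timeout": timedelta(hours=1),           # Task Timeout
        "email_on_failure": True,                          # Alerting (requires SMTP to be configured)
        "email": ["data-team@example.com"],                # assumed address
    },
) as dag:
    ...  # tasks go here, as in the hands-on section below

Luigi has no built-in scheduler clock, so with Luigi the schedule itself usually lives in cron or your CI system, and retries are configured on the central scheduler rather than inline in the task graph.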
Hands On: Simple ETL → Train → Publish DAG
Option A: Apache Airflow
from datetime import timedelta

from airflow import DAG
from airflow.decorators import task
from pendulum import datetime

with DAG(
    dag_id="calihouse_pipeline",
    start_date=datetime(2025, 6, 30, tz="UTC"),
    schedule="0 2 * * *",  # run nightly at 02:00 UTC
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:

    @task
    def extract():
        # imagine pulling CSV from S3
        import pandas as pd, boto3
        # ... download & save to /tmp/housing.csv
        return "/tmp/housing.csv"

    @task
    def transform(path):
        import pandas as pd
        df = pd.read_csv(path)
        # feature engineering ...
        out = "/tmp/housing_clean.csv"
        df.to_csv(out, index=False)
        return out

    @task
    def train(clean_path):
        import joblib
        import pandas as pd, tensorflow as tf
        from sklearn.model_selection import train_test_split
        from sklearn.preprocessing import StandardScaler

        df = pd.read_csv(clean_path)
        X, y = df.iloc[:, :-1], df["median_house_value"]
        X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=42)

        # fit the scaler and a minimal placeholder model (swap in your real architecture)
        scaler_path = "/tmp/scaler.joblib"
        scaler = StandardScaler().fit(X_train)
        joblib.dump(scaler, scaler_path)

        model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(X_train.shape[1],))])
        model.compile(optimizer="adam", loss="mse")
        model.fit(scaler.transform(X_train), y_train, epochs=10, verbose=0)

        model_path = "/tmp/calihouse.h5"
        model.save(model_path)
        return model_path

    @task
    def publish(model_path):
        import boto3, pathlib
        s3 = boto3.client("s3")
        s3.upload_file(model_path, "ml-models", f"calihouse/{pathlib.Path(model_path).name}")

    publish(train(transform(extract())))
Launch Airflow via Docker Compose (official quick-start bundle) and watch the graph in the UI.
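Alerting can be wired in at the same point. Below is a hedged sketch of a failure callback that posts to a Slack incoming webhook with plain requests; the webhook URL is a placeholder, and the official Slack provider package offers hooks for this as well:

import requests

def notify_slack(context):
    # Airflow passes the run context to failure callbacks; grab a few useful fields.
    ti = context["task_instance"]
    requests.post(
        "https://hooks.slack.com/services/XXX/YYY/ZZZ",  # placeholder webhook URL
        json={"text": f"Task {ti.task_id} in DAG {ti.dag_id} failed on {context['ds']}"},
        timeout=10,
    )

Attach it DAG-wide by adding "on_failure_callback": notify_slack to the default_args above.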
Option B: Luigi
import datetime

import luigi, pandas as pd, tensorflow as tf


class Extract(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f"data/raw_{self.date}.csv")

    def run(self):
        # download logic ...
        with self.output().open("w") as f:
            f.write("dummy csv")


class Transform(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return Extract(self.date)

    def output(self):
        return luigi.LocalTarget(f"data/clean_{self.date}.csv")

    def run(self):
        df = pd.read_csv(self.input().path)
        # cleaning ...
        df.to_csv(self.output().path, index=False)


class Train(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return Transform(self.date)

    def output(self):
        return luigi.LocalTarget(f"models/model_{self.date}.h5")

    def run(self):
        df = pd.read_csv(self.input().path)
        # training ... (a single-layer placeholder model keeps the example runnable)
        model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(df.shape[1],))])
        self.output().makedirs()  # ensure the models/ folder exists
        model.save(self.output().path)


if __name__ == "__main__":
    luigi.build([Train(date=datetime.date(2025, 6, 30))], workers=4, local_scheduler=True)
Run it with python pipeline.py to use the local scheduler, or start luigid and drop local_scheduler=True to track runs in the central scheduler's web UI.
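Luigi covers alerting with its event system rather than per-task callbacks. A minimal sketch, reusing the placeholder webhook from the Airflow example:

import luigi
import requests

@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_task_failure(task, exception):
    # Fires whenever any task's run() raises; post a short note to a placeholder webhook.
    requests.post(
        "https://hooks.slack.com/services/XXX/YYY/ZZZ",
        json={"text": f"Luigi task {task} failed: {exception!r}"},
        timeout=10,
    )

Luigi can also send error emails out of the box once the email section of luigi.cfg is filled in.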
Comparing Airflow and Luigi
Airflow ships a scheduler, a full web UI and a huge provider/plugin ecosystem, but it is heavier to run; Luigi is a single library with target-based dependencies that you can drive from a plain cron job. Rule of thumb: pick Airflow for complex, multi-team data platforms; choose Luigi for lightweight pipelines and research projects.
TL;DR
Define DAG = Extract → Transform → Train → Publish using Airflow (@task) or Luigi (Task subclasses).
Schedule nightly via cron (0 2 * * *) with 2 retries + 5 min delay.
Monitor via Airflow UI or Luigi central scheduler; add email/Slack alerts.
Choose Airflow for scale & plugins; Luigi for minimal setup.
Next Up: Day 42 - Setting Up CI/CD for Python Data Projects.
Day 42 is the final day of the advanced topics week, which is absolutely crazy as it technically means we are heading into the final week before the series comes to a close. Really hope everyone has enjoyed the content so far!
Day 42, as the title says, will cover setting up CI/CD for Python data projects specifically. The focus will be on GitHub Actions, GitHub Container Registry and AWS ECR, as well as the key concepts of the CI/CD lifecycle.
So keep reading, keep learning and as always… Happy coding!