Skip to main content

Pipeline Tracking Best Practices

Overview

This guide explains how to track which records have been processed by Dagster pipelines and maintain proper logging between Django and Dagster.

Model Field Recommendations

Current Fields in HotspotsPlace

The model already has some pipeline tracking fields:

# Existing in api/models/places.py
last_pipeline = models.CharField(max_length=50, null=True, blank=True,
help_text="The last completed data pipeline class")
last_sync = models.DateTimeField(null=True, blank=True)

Add these fields to api/models/places.py for comprehensive tracking:

from django.contrib.postgres.fields import ArrayField
from django.core.serializers.json import DjangoJSONEncoder

class HotspotsPlace(Thing):
# ... existing fields ...

# Enhanced pipeline tracking
pipeline_status = models.CharField(
max_length=50,
null=True,
blank=True,
choices=[
('pending', 'Pending'),
('processing', 'Processing'),
('completed', 'Completed'),
('failed', 'Failed'),
('skipped', 'Skipped'),
],
help_text="Current status in the pipeline"
)

pipeline_stage = models.CharField(
max_length=100,
null=True,
blank=True,
help_text="Current pipeline stage (e.g., 'import', 'enrich', 'scrape', 'nlp', 'images')"
)

pipeline_run_id = models.CharField(
max_length=255,
null=True,
blank=True,
db_index=True,
help_text="Dagster run ID for tracking which batch this was processed in"
)

pipeline_metadata = models.JSONField(
encoder=DjangoJSONEncoder,
default=dict,
blank=True,
help_text="Metadata about pipeline processing (errors, metrics, timestamps)"
)

# More granular tracking
enrichment_completed_at = models.DateTimeField(
null=True,
blank=True,
help_text="When the enrichment pipeline completed for this place"
)

enrichment_steps_completed = ArrayField(
models.CharField(max_length=50),
default=list,
blank=True,
help_text="List of completed enrichment steps"
)

Migration Script

Create a migration:

python manage.py makemigrations api --name add_pipeline_tracking_fields
python manage.py migrate

Usage Patterns

Pattern 1: Track Individual Assets

Update each asset to mark progress:

# In dagster_vibemap/assets/place_enrichment.py

@asset(group_name="place_enrichment")
def import_overture_places_enrichment(context, config):
"""Import from Overture with tracking."""

# Get Dagster run ID
run_id = context.run_id

# Build command with tracking
command = [
"import_overture_places",
"--city", config.city,
"--dagster-run-id", run_id, # Pass to Django command
"--pipeline-stage", "import",
]

metadata = run_django_command(command, context)
return Output(value=metadata, metadata=metadata)

Pattern 2: Update Django Management Commands

Modify each command to accept tracking parameters:

# In api/management/commands/import_overture_places.py

class Command(BaseCommand):
def add_arguments(self, parser):
# ... existing arguments ...

# Pipeline tracking arguments
parser.add_argument(
'--dagster-run-id',
type=str,
help='Dagster run ID for tracking'
)
parser.add_argument(
'--pipeline-stage',
type=str,
default='import',
help='Pipeline stage name'
)

def handle(self, *args, **options):
run_id = options.get('dagster_run_id')
stage = options.get('pipeline_stage', 'import')

# ... existing logic ...

for idx, row in gdf.iterrows():
# ... create/update place ...

# Track pipeline progress
place.pipeline_status = 'processing'
place.pipeline_stage = stage
place.pipeline_run_id = run_id
place.last_pipeline = f'dagster_{stage}'
place.last_sync = timezone.now()

# Store metadata
if not place.pipeline_metadata:
place.pipeline_metadata = {}

place.pipeline_metadata[stage] = {
'completed_at': timezone.now().isoformat(),
'dagster_run_id': run_id,
'source': 'overture_maps',
'confidence': float(confidence) if confidence else None,
}

# Track completed steps
if stage not in place.enrichment_steps_completed:
place.enrichment_steps_completed.append(stage)

place.save()

# Mark overall completion
logger.info(f"Completed {stage} for {created_count} places in run {run_id}")

Pattern 3: Batch Status Updates

For performance, update status in batches:

# In api/management/commands/update_data_sources.py

from django.db import transaction
from django.utils import timezone

def handle(self, *args, **options):
run_id = options.get('dagster_run_id')
stage = 'enrich'

places = queryset[:num_rows]

# Process places...

# Batch update at end
with transaction.atomic():
HotspotsPlace.objects.filter(
id__in=[p.id for p in successfully_processed]
).update(
pipeline_status='completed',
pipeline_stage=stage,
pipeline_run_id=run_id,
last_sync=timezone.now()
)

# Update metadata individually (JSONField can't bulk update easily)
for place in successfully_processed:
place.pipeline_metadata[stage] = {
'completed_at': timezone.now().isoformat(),
'dagster_run_id': run_id,
'urls_found': place.data_sources_count,
}
place.enrichment_steps_completed.append(stage)
place.save(update_fields=['pipeline_metadata', 'enrichment_steps_completed'])

Querying Pipeline Status

Find Places at Specific Pipeline Stages

# Places that completed import but not enrichment
places_needing_enrichment = HotspotsPlace.objects.filter(
enrichment_steps_completed__contains=['import']
).exclude(
enrichment_steps_completed__contains=['enrich']
)

# Places that failed
failed_places = HotspotsPlace.objects.filter(
pipeline_status='failed'
)

# Places from a specific Dagster run
run_places = HotspotsPlace.objects.filter(
pipeline_run_id='12345-abcde-67890'
)

Pipeline Progress Dashboard Query

-- Pipeline stage summary
SELECT
pipeline_stage,
pipeline_status,
COUNT(*) as count,
MAX(last_sync) as last_processed
FROM api_hotspotsplace
WHERE pipeline_run_id IS NOT NULL
GROUP BY pipeline_stage, pipeline_status
ORDER BY pipeline_stage, pipeline_status;

-- Recent pipeline runs
SELECT
pipeline_run_id,
pipeline_stage,
COUNT(*) as places_processed,
COUNT(*) FILTER (WHERE pipeline_status = 'completed') as completed,
COUNT(*) FILTER (WHERE pipeline_status = 'failed') as failed
FROM api_hotspotsplace
WHERE last_sync >= NOW() - INTERVAL '7 days'
GROUP BY pipeline_run_id, pipeline_stage;

Logging Best Practices

1. Use Django's Logger with JSON Output

Configure Django logging to output JSON that Dagster can parse:

# In hotspots/settings.py or local_settings.py

import logging

LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'json': {
'()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
'format': '%(asctime)s %(name)s %(levelname)s %(message)s %(pathname)s %(lineno)d'
},
'simple': {
'format': '%(levelname)s %(message)s'
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple', # Use 'json' for production
'stream': 'ext://sys.stdout',
},
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/tmp/vibemap_logs/django_pipeline.log',
'maxBytes': 10485760, # 10MB
'backupCount': 5,
'formatter': 'json',
},
},
'loggers': {
'django': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
'api': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
'etl': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
},
}

2. Structured Logging in Management Commands

Use structured logging with extra context:

# In any management command

import logging
import traceback

from django.utils import timezone

logger = logging.getLogger(__name__)

class Command(BaseCommand):
def handle(self, *args, **options):
run_id = options.get('dagster_run_id')
city = options.get('city')

# Structured log entry
logger.info(
"Starting Overture import",
extra={
'dagster_run_id': run_id,
'city': city,
'stage': 'import',
'timestamp': timezone.now().isoformat(),
}
)

try:
# ... process places ...

logger.info(
"Import completed successfully",
extra={
'dagster_run_id': run_id,
'places_created': created_count,
'places_updated': updated_count,
'places_skipped': skipped_count,
'duration_seconds': duration,
}
)
except Exception as e:
logger.error(
"Import failed",
extra={
'dagster_run_id': run_id,
'error': str(e),
'traceback': traceback.format_exc(),
},
exc_info=True
)
raise

3. Progress Logging for Long Operations

Use tqdm with logging for visibility:

from tqdm import tqdm
import logging

logger = logging.getLogger(__name__)

for idx, place in enumerate(tqdm(places, desc="Processing places")):
# Log every N items
if idx % 50 == 0:
logger.info(
f"Progress: {idx}/{len(places)}",
extra={
'progress_percent': (idx / len(places)) * 100,
'items_processed': idx,
'items_total': len(places),
}
)

4. Capture Logs in Dagster Asset

The run_django_command helper already captures stdout/stderr:

# In dagster_vibemap/assets/data_import.py

def run_django_command(command: list[str], context: AssetExecutionContext) -> dict:
"""Execute Django command and capture logs."""

result = subprocess.run(
full_command,
capture_output=True,
text=True,
timeout=3600,
)

# Dagster automatically captures this
if result.stdout:
context.log.info(f"Command output:\n{result.stdout}")

if result.stderr:
context.log.warning(f"Command stderr:\n{result.stderr}")

# Parse structured logs from output
metadata = {
"command": " ".join(command),
"exit_code": result.returncode,
"success": result.returncode == 0,
"stdout_preview": MetadataValue.text(result.stdout[:1000]),
}

# Extract metrics from logs (if using JSON logging)
try:
import json
for line in result.stdout.split('\n'):
if line.strip().startswith('{'):
log_entry = json.loads(line)
if 'places_created' in log_entry:
metadata['places_created'] = log_entry['places_created']
if 'places_updated' in log_entry:
metadata['places_updated'] = log_entry['places_updated']
except (json.JSONDecodeError, KeyError):
pass  # Best-effort metric extraction; skip malformed or non-JSON log lines

return metadata

Advanced: Dagster Sensors for Failed Places

Create a sensor to detect and retry failed places:

# In dagster_vibemap/sensors.py

from datetime import timedelta

from dagster import sensor, RunRequest, SkipReason
from django.utils import timezone

from api.models import HotspotsPlace

@sensor(
job=place_enrichment_pipeline_job,
minimum_interval_seconds=3600, # Check hourly
)
def failed_places_retry_sensor(context):
"""Retry places that failed in previous runs."""

# Query for recently failed places
failed_places = HotspotsPlace.objects.filter(
pipeline_status='failed',
last_sync__gte=timezone.now() - timedelta(hours=24)
).values_list('pipeline_run_id', flat=True).distinct()

if not failed_places:
return SkipReason("No failed places found")

# Create run requests for each failed batch
for run_id in failed_places[:5]: # Limit to 5 retries at once
yield RunRequest(
run_config={
"ops": {
"import_overture_places_enrichment": {
"config": {
"retry_failed_run_id": run_id
}
}
}
},
tags={"retry_for_run": run_id}
)

Example: Complete Tracking Flow

Here's how a place flows through the pipeline with full tracking:

# Step 1: Import (import_overture_places.py)
place = HotspotsPlace(
name="Cool Cafe",
address="123 Main St, Oakland, CA",
pipeline_status='processing',
pipeline_stage='import',
pipeline_run_id='abc-123',
pipeline_metadata={
'import': {
'completed_at': '2025-12-07T10:00:00Z',
'source': 'overture_maps',
'confidence': 0.95
}
},
enrichment_steps_completed=['import']
)
place.save()

# Step 2: Enrich (update_data_sources.py)
place.pipeline_stage = 'enrich'
place.pipeline_metadata['enrich'] = {
'completed_at': '2025-12-07T10:05:00Z',
'urls_found': 5,
'website_confidence': 85.5
}
place.enrichment_steps_completed.append('enrich')
place.save()

# Step 3: Scrape (scrape_places_for_completeness.py)
place.pipeline_stage = 'scrape'
place.pipeline_metadata['scrape'] = {
'completed_at': '2025-12-07T10:10:00Z',
'fields_updated': ['description', 'phone', 'hours'],
'completeness_before': 0.4,
'completeness_after': 0.75
}
place.enrichment_steps_completed.append('scrape')
place.save()

# Step 4: NLP (nlp_categories.py)
place.pipeline_stage = 'nlp'
place.pipeline_metadata['nlp'] = {
'completed_at': '2025-12-07T10:15:00Z',
'categories_assigned': ['food', 'coffee'],
'vibes_assigned': ['chill', 'cozy']
}
place.enrichment_steps_completed.append('nlp')
place.save()

# Step 5: Images (get_business_images.py)
place.pipeline_stage = 'images'
place.pipeline_status = 'completed' # Final step!
place.enrichment_completed_at = timezone.now()
place.pipeline_metadata['images'] = {
'completed_at': '2025-12-07T10:20:00Z',
'images_found': 8,
'best_image_score': 92.3
}
place.enrichment_steps_completed.append('images')
place.save()

Monitoring and Alerts

Create Admin View for Pipeline Status

# In api/admin.py

@admin.register(HotspotsPlace)
class HotspotsPlaceAdmin(admin.ModelAdmin):
list_display = ['name', 'pipeline_status', 'pipeline_stage', 'last_sync']
list_filter = ['pipeline_status', 'pipeline_stage', 'last_sync']
search_fields = ['pipeline_run_id', 'name']

fieldsets = [
# ... existing fieldsets ...
('Pipeline Tracking', {
'fields': [
'pipeline_status',
'pipeline_stage',
'pipeline_run_id',
'enrichment_steps_completed',
'enrichment_completed_at',
'pipeline_metadata',
'last_pipeline',
'last_sync',
],
'classes': ['collapse'],
}),
]

Dagster Asset Checks

Add asset checks to validate data quality:

# In dagster_vibemap/assets/place_enrichment.py

from dagster import AssetCheckResult, asset_check

@asset_check(asset=place_enrichment_summary)
def validate_enrichment_completion(context):
"""Check that all places completed all steps."""

from api.models import HotspotsPlace

incomplete = HotspotsPlace.objects.filter(
pipeline_run_id=context.run_id,
pipeline_status='processing'
).count()

passed = incomplete == 0

return AssetCheckResult(
passed=passed,
metadata={
"incomplete_places": incomplete,
"message": f"Found {incomplete} incomplete places" if not passed else "All places completed"
}
)

Summary

Key Principles:

  1. ✅ Pass --dagster-run-id to all Django commands
  2. ✅ Update pipeline_status and pipeline_stage at each step
  3. ✅ Store rich metadata in pipeline_metadata JSONField
  4. ✅ Use structured logging (JSON) for Dagster to parse
  5. ✅ Log to stdout (captured by Dagster automatically)
  6. ✅ Track completed steps in array field for queries
  7. ✅ Use transactions for batch status updates
  8. ✅ Add indexes on tracking fields for performance

This approach gives you:

  • Full visibility in Dagster UI
  • Ability to resume from failures
  • Rich debugging information
  • Performance metrics per stage
  • Easy querying of pipeline state