Place Enrichment Pipeline Guide

Overview

The Place Enrichment Pipeline is a comprehensive DAG (Directed Acyclic Graph) in Dagster that orchestrates the complete workflow for importing, enriching, and analyzing place data. This pipeline transforms raw location data from Overture Maps into fully enriched business listings with categories, images, and detailed metadata.

Pipeline Architecture

┌─────────────────────────────────────────────────────────────────┐
│ Place Enrichment Pipeline │
└─────────────────────────────────────────────────────────────────┘

Step 1: Import Overture Places
  │ Output: Raw places (name, address, location, basic metadata)
  ▼
Step 2: Update Data Sources
  │ Output: Places enriched with websites, ratings, reviews, social links
  ▼
Step 3: Scrape for Completeness
  │ Output: Places with complete data (phone, email, hours, description)
  ▼
Step 4: NLP Categorization
  │ Output: Places categorized with vibes (chill, upscale, etc.)
  ▼
Step 5: Extract Business Images
  │ Output: Places with scored, high-quality images
  ▼
Summary Report

Pipeline Steps

Step 1: Import Overture Places

Asset: import_overture_places_enrichment
Command: python manage.py import_overture_places --city <city>

Imports raw place data from Overture Maps:

  • Place names
  • Geographic coordinates
  • Addresses
  • Basic business categories
  • Confidence scores

Outputs:

  • New HotspotsPlace records created
  • Initial data in data_sources JSONField
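City-scoped imports rely on per-city bounding boxes (see --list-cities below). A minimal sketch of the kind of containment check involved — the coordinates here are illustrative placeholders, not the real bounding box:

```python
# Sketch of a bounding-box containment check like the one a city-scoped
# import relies on. Coordinates below are hypothetical, not the real values.
CITY_BBOXES = {
    # city: (min_lon, min_lat, max_lon, max_lat)
    "oakland": (-122.36, 37.70, -122.11, 37.89),
}

def in_city_bbox(city: str, lon: float, lat: float) -> bool:
    """Return True if (lon, lat) falls inside the city's bounding box."""
    min_lon, min_lat, max_lon, max_lat = CITY_BBOXES[city]
    return min_lon <= lon <= max_lon and min_lat <= lat <= max_lat

print(in_city_bbox("oakland", -122.27, 37.80))  # downtown Oakland -> True
```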

Step 2: Update Data Sources

Asset: update_data_sources_enrichment
Command: python manage.py update_data_sources --parallel

Enriches places with external data from DataForSEO:

  • Website URLs (with confidence scoring)
  • Google ratings and review counts
  • Social media profiles (Instagram, Facebook, Yelp)
  • Business descriptions
  • Additional metadata

Key Features:

  • Parallel processing with rate limiting
  • Confidence-based URL selection
  • Domain validation
  • Duplicate detection

Outputs:

  • url, instagram, facebook, yelp fields populated
  • data_sources array expanded with search results
  • aggregate_rating and aggregate_rating_count updated

Step 3: Scrape Places for Completeness

Asset: scrape_places_completeness_enrichment
Command: python manage.py scrape_places_for_completeness --use-gemini --use-structured

Uses Crawl4AI and Gemini AI to extract structured business data:

  • Complete business descriptions
  • Contact information (phone, email)
  • Operating hours
  • Social media links (Twitter, Instagram, Facebook)
  • High-quality images

Only processes places with:

  • Completeness score < 0.6 (configurable)
  • Valid website URL (from Step 2)
  • Minimum data sources (default: 1)

Outputs:

  • description, telephone, email, hours fields populated
  • Social media fields enhanced
  • featured_image set if available
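As a rough illustration of the gating logic above, here is a hypothetical completeness score (an unweighted fraction of populated key fields — the real scoring may weight fields differently) and the corresponding eligibility check:

```python
# Hypothetical completeness score: fraction of key fields populated.
KEY_FIELDS = ["description", "telephone", "email", "hours", "url", "featured_image"]

def completeness_score(place: dict) -> float:
    filled = sum(1 for f in KEY_FIELDS if place.get(f))
    return filled / len(KEY_FIELDS)

def needs_scraping(place: dict, max_completeness: float = 0.6, min_sources: int = 1) -> bool:
    """Mirror the step's gating: low completeness, a valid URL, enough data sources."""
    return (
        completeness_score(place) < max_completeness
        and bool(place.get("url"))
        and len(place.get("data_sources", [])) >= min_sources
    )

place = {"url": "https://cafe-x.com", "telephone": "510-555-0100", "data_sources": [{}]}
print(round(completeness_score(place), 2))  # 2 of 6 fields -> 0.33
print(needs_scraping(place))                # True
```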

Step 4: NLP Categorization

Asset: nlp_categories_enrichment
Command: python manage.py nlp_categories --type places

Applies ML-based categorization using Nyckel models:

  • Categories: food, drink, entertainment, health, etc.
  • Vibes: chill, upscale, family-friendly, romantic, etc.

Uses enhanced descriptions from Step 3 for better accuracy.

Outputs:

  • categories_level1 populated with primary category
  • vibe_list populated with relevant vibes
  • vibe_count incremented

Step 5: Extract Business Images

Asset: get_business_images_enrichment
Command: python manage.py get_business_images --parallel

Scrapes and scores business images:

  • Extracts images from website HTML
  • Checks schema.org structured data
  • Scores images by quality, size, and relevance
  • Falls back to Google Images if needed
  • Uses OCR to filter out low-quality images

Outputs:

  • HotspotsImage records created and linked to places
  • Images scored and ranked
  • Best image set as featured
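Image ranking can be sketched as a weighted score over size, quality, and relevance; the weights, field names, and minimum width below are illustrative, not the pipeline's actual values:

```python
# Hypothetical weighted image score combining size, quality, and relevance.
def score_image(img: dict, min_width: int = 400) -> float:
    if img.get("width", 0) < min_width:
        return 0.0  # too small to be usable as a featured image
    size_score = min(img["width"] / 1600, 1.0)
    quality = img.get("quality", 0.5)      # e.g. from blur/OCR checks
    relevance = img.get("relevance", 0.5)  # e.g. schema.org vs. generic HTML
    return 0.3 * size_score + 0.3 * quality + 0.4 * relevance

images = [
    {"url": "a.jpg", "width": 200, "quality": 0.9, "relevance": 0.9},   # too small
    {"url": "b.jpg", "width": 1600, "quality": 0.8, "relevance": 0.9},
]
best = max(images, key=score_image)
print(best["url"])  # b.jpg
```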

Step 6: Pipeline Summary

Asset: place_enrichment_summary

Generates a comprehensive report:

  • Success/failure status of each step
  • Metadata from all pipeline stages
  • Overall pipeline health
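One way to sketch the aggregation, assuming each step reports a dict with a success flag plus optional metadata (the actual report format may differ):

```python
# Aggregate per-step results into an overall pipeline health summary.
def summarize(steps: dict[str, dict]) -> dict:
    failed = [name for name, r in steps.items() if not r.get("success")]
    return {
        "steps": len(steps),
        "failed": failed,
        "healthy": not failed,
    }

steps = {
    "1_import": {"success": True, "imported": 1234},
    "2_data_sources": {"success": True},
    "3_scrape": {"success": False, "error": "timeout"},
}
print(summarize(steps))  # {'steps': 3, 'failed': ['3_scrape'], 'healthy': False}
```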

Usage

Running the Full Pipeline

  1. Open Dagster UI: https://data.vibemap.com/dagster
  2. Navigate to Jobs → place_enrichment_pipeline
  3. Click Launchpad
  4. Configure the run:

     ops:
       import_overture_places_enrichment:
         config:
           city: "oakland"
           max_completeness: 0.6
           limit: 100

       # Config is shared across all assets via PlaceEnrichmentConfig
       update_data_sources_enrichment:
         config:
           city: "oakland"
           address: "Oakland, CA"
           max_completeness: 0.6
           limit: 100

       scrape_places_completeness_enrichment:
         config:
           city: "oakland"
           max_completeness: 0.6
           limit: 100

       nlp_categories_enrichment:
         config:
           city: "oakland"
           boundary: null
           ed_cat: null

       get_business_images_enrichment:
         config:
           city: "oakland"
           limit: 100

  5. Click Launch Run

Via CLI

# Launch the job with default config (Oakland)
dagster job execute -m dagster_vibemap -j place_enrichment_pipeline

# Launch with custom config
dagster job execute -m dagster_vibemap -j place_enrichment_pipeline \
  --config-json '{
    "ops": {
      "import_overture_places_enrichment": {
        "config": {
          "city": "boston",
          "max_completeness": 0.7,
          "limit": 200
        }
      }
    }
  }'

Running Individual Steps

You can also run individual assets:

# Just import Overture data
dagster asset materialize -m dagster_vibemap -a import_overture_places_enrichment

# Just run NLP categorization
dagster asset materialize -m dagster_vibemap -a nlp_categories_enrichment

# Run Steps 1-3 only
dagster asset materialize -m dagster_vibemap \
  -a import_overture_places_enrichment \
  -a update_data_sources_enrichment \
  -a scrape_places_completeness_enrichment

Configuration Options

PlaceEnrichmentConfig

All assets in the pipeline accept this configuration schema:

Parameter         Type   Default    Description
city              str    "oakland"  City name for Overture import (see available cities)
address           str    None       Filter by address substring (e.g., "Oakland, CA")
boundary          str    None       Filter by boundary name
ed_cat            str    None       Editorial category filter
max_completeness  float  0.6        Max completeness score for scraping (0.0-1.0)
limit             int    100        Max places to process per step
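For reference, the schema above maps onto a plain dataclass like this (a sketch for documentation purposes — the pipeline itself defines it as a Dagster config class, so this is not the real definition):

```python
# Illustrative dataclass mirroring the PlaceEnrichmentConfig schema table.
from dataclasses import dataclass
from typing import Optional

@dataclass
class PlaceEnrichmentConfig:
    city: str = "oakland"
    address: Optional[str] = None
    boundary: Optional[str] = None
    ed_cat: Optional[str] = None
    max_completeness: float = 0.6
    limit: int = 100

cfg = PlaceEnrichmentConfig(city="boston", limit=200)
print(cfg.max_completeness)  # 0.6 (default preserved)
```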

Available Cities for Overture Import

From import_overture_places.py:

  • oakland, boston, dallas, san_francisco, new_york
  • los_angeles, chicago, seattle, austin, denver
  • miami, portland, houston, east_aldine, la_jolla
  • lynchburg, lynnwood, stevenson, toledo, utica, whittier

Use --list-cities to see the full list with bounding boxes.

Scheduling

Manual Scheduling

Add to dagster_vibemap/schedules.py:

from dagster import ScheduleDefinition

# Weekly place enrichment (every Monday at 2am)
place_enrichment_schedule = ScheduleDefinition(
    name="weekly_place_enrichment",
    job=place_enrichment_pipeline_job,
    cron_schedule="0 2 * * 1",  # Monday 2am
    execution_timezone="America/Los_Angeles",
)

On-Demand Triggers

Use Dagster sensors to trigger based on events:

from dagster import sensor, RunRequest

@sensor(job=place_enrichment_pipeline_job)
def new_customer_onboarding_sensor(context):
    """Trigger enrichment when a new customer is added."""
    # Your logic to detect new customers
    new_customers = get_new_customers()

    for customer in new_customers:
        yield RunRequest(
            run_config={
                "ops": {
                    "import_overture_places_enrichment": {
                        "config": {
                            "city": customer.city,
                            "limit": 500,
                        }
                    }
                }
            },
            tags={"customer_id": str(customer.id)},
        )

Monitoring and Debugging

View Run Status in Dagster UI

  1. Navigate to Runs
  2. Click on a specific run to see:
    • Step-by-step execution timeline
    • Logs from each asset
    • Metadata (success counts, errors, etc.)
    • Asset lineage graph

Check Asset Metadata

Each asset returns rich metadata:

{
    "command": "import_overture_places --city oakland",
    "exit_code": 0,
    "success": True,
    "stdout_preview": "Imported 1,234 places...",
    "step": "1_import",
    "city": "oakland"
}

Common Issues

Import fails with "City not found"

  • Check city name against available cities
  • Use underscores: east_aldine not east aldine

Data sources step times out

  • Reduce limit in config
  • Check DataForSEO API quota
  • Review --max_workers and --rate_limit settings

Scraping fails for many places

  • Check Crawl4AI/Gemini API credentials
  • Verify places have valid website URLs
  • Review max_completeness threshold

NLP categorization produces poor results

  • Ensure Step 3 ran successfully (enriched descriptions)
  • Check Nyckel API credentials
  • Review training data quality

Performance Optimization

Parallel Processing

The pipeline uses parallel processing where possible:

  • Step 2 (Data Sources): 6 workers, 2 concurrent API calls
  • Step 5 (Images): 8 workers

Adjust in asset configuration:

command.extend([
    "--parallel",
    "--max_workers", "10",  # Increase workers
    "--rate_limit", "3",    # More concurrent API calls
])

Batch Size

Control how many places are processed:

config:
  limit: 50  # Process 50 places per run

For full city imports, set limit: 10000 or omit the parameter entirely.
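Cost-controlled batching amounts to slicing the place set into fixed-size chunks and running each one separately, e.g.:

```python
# Split a large place set into fixed-size batches so each run's API
# spend is bounded and results can be checked between batches.
def batched(items, size):
    for i in range(0, len(items), size):
        yield items[i:i + size]

place_ids = list(range(230))
batch_sizes = [len(b) for b in batched(place_ids, 100)]
print(batch_sizes)  # [100, 100, 30]
```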

Resource Usage

Memory: Each step loads places into memory. For large cities:

  • Run in stages (batch by boundary or editorial category)
  • Use address or boundary filters

API Costs:

  • Step 2: DataForSEO API calls (expensive!)
  • Step 3: Gemini API calls (moderate cost)
  • Step 5: Google Images fallback (free but rate-limited)

Monitor costs via Dagster metadata.

Best Practices

New Customer Onboarding

  1. Import raw data

    city=boston
    dagster asset materialize -m dagster_vibemap -a import_overture_places_enrichment \
    --config-json '{"ops": {"import_overture_places_enrichment": {"config": {"city": "'$city'", "limit": 10000}}}}'
  2. Enrich in batches (to control costs)

    # Batch 1: First 100 places
    dagster asset materialize -m dagster_vibemap \
    -a update_data_sources_enrichment \
    --config-json '{"ops": {"update_data_sources_enrichment": {"config": {"address": "Boston, MA", "limit": 100}}}}'

    # Monitor results, then continue with more batches
  3. Run full pipeline once satisfied

    dagster job execute -m dagster_vibemap -j place_enrichment_pipeline

Incremental Updates

For existing customers, run individual steps:

# Just scrape new descriptions
dagster asset materialize -m dagster_vibemap -a scrape_places_completeness_enrichment

# Re-categorize all places (e.g., after model update)
dagster asset materialize -m dagster_vibemap -a nlp_categories_enrichment

Quality Checks

After each run, check:

  1. Completeness scores improved

    SELECT AVG(completeness_score)
    FROM api_hotspotsplace
    WHERE address ILIKE '%Oakland%';
  2. Data source coverage

    SELECT
    COUNT(*) FILTER (WHERE url IS NOT NULL) as has_url,
    COUNT(*) FILTER (WHERE description IS NOT NULL) as has_desc,
    COUNT(*) FILTER (WHERE telephone IS NOT NULL) as has_phone
    FROM api_hotspotsplace;
  3. Image coverage

    SELECT COUNT(DISTINCT hotspots_place_id)
    FROM api_hotspotsimage;

Troubleshooting

Enable Debug Logging

In Dagster UI, view logs for each asset. For more verbose output:

# In place_enrichment.py, modify run_django_command calls:
command.extend(["--verbosity", "2"]) # Django verbosity

Test Individual Commands

Before running in Dagster, test commands manually:

# Test import
python manage.py import_overture_places --city oakland --limit 10 --dry-run

# Test data sources
python manage.py update_data_sources --address "Oakland, CA" --num_rows 5

# Test scraping
python manage.py scrape_places_for_completeness --limit 5 --dry-run

Revert Failed Runs

If a run partially completes, you can:

  1. Re-run the entire pipeline (operations are idempotent)
  2. Run remaining steps using asset selection
  3. Manually fix data and continue

Asset Lineage

View the dependency graph in Dagster UI:

import_overture_places_enrichment
        ↓
update_data_sources_enrichment
        ↓
scrape_places_completeness_enrichment
        ↓
nlp_categories_enrichment
        ↓
get_business_images_enrichment
        ↓
place_enrichment_summary

Each asset is only executed after its dependencies succeed.
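Since the lineage is a straight chain, a run order is valid exactly when every asset appears after all of its dependencies. A small sketch of that check, using the asset names from this guide:

```python
# Dependency map for the pipeline's asset chain (names from this guide).
DEPS = {
    "import_overture_places_enrichment": [],
    "update_data_sources_enrichment": ["import_overture_places_enrichment"],
    "scrape_places_completeness_enrichment": ["update_data_sources_enrichment"],
    "nlp_categories_enrichment": ["scrape_places_completeness_enrichment"],
    "get_business_images_enrichment": ["nlp_categories_enrichment"],
    "place_enrichment_summary": ["get_business_images_enrichment"],
}

def valid_order(order: list[str]) -> bool:
    """Every asset must appear after all of its dependencies."""
    pos = {name: i for i, name in enumerate(order)}
    return all(pos[dep] < pos[a] for a, deps in DEPS.items() for dep in deps)

print(valid_order(list(DEPS)))  # True (dict order here is the chain order)
```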

Future Enhancements

Potential additions to the pipeline:

  • Duplicate detection and merging
  • Event venue prediction
  • Review sentiment analysis
  • Business hours validation
  • Phone number verification
  • Automated quality scoring
  • Email validation
  • Social media engagement metrics

Support

For issues or questions:

  1. Check Dagster UI logs
  2. Review the output of the underlying Django management command
  3. Consult CLAUDE.md for project context
  4. File issues in GitHub repository