Place Enrichment Pipeline Guide
Overview
The Place Enrichment Pipeline is a Dagster job structured as a DAG (directed acyclic graph) of assets. It orchestrates the complete workflow for importing, enriching, and analyzing place data, transforming raw location data from Overture Maps into fully enriched business listings with categories, images, and detailed metadata.
Pipeline Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Place Enrichment Pipeline │
└─────────────────────────────────────────────────────────────────┘
Step 1: Import Overture Places
↓
│ Output: Raw places (name, address, location, basic metadata)
↓
Step 2: Update Data Sources
↓
│ Output: Places enriched with websites, ratings, reviews, social links
↓
Step 3: Scrape for Completeness
↓
│ Output: Places with complete data (phone, email, hours, description)
↓
Step 4: NLP Categorization
↓
│ Output: Places categorized with vibes (chill, upscale, etc.)
↓
Step 5: Extract Business Images
↓
│ Output: Places with scored, high-quality images
↓
Summary Report
Pipeline Steps
Step 1: Import Overture Places
Asset: import_overture_places_enrichment
Command: python manage.py import_overture_places --city <city>
Imports raw place data from Overture Maps:
- Place names
- Geographic coordinates
- Addresses
- Basic business categories
- Confidence scores
Outputs:
- New HotspotsPlace records created
- Initial data in data_sources JSONField
Step 2: Update Data Sources
Asset: update_data_sources_enrichment
Command: python manage.py update_data_sources --parallel
Enriches places with external data from DataForSEO:
- Website URLs (with confidence scoring)
- Google ratings and review counts
- Social media profiles (Instagram, Facebook, Yelp)
- Business descriptions
- Additional metadata
Key Features:
- Parallel processing with rate limiting
- Confidence-based URL selection
- Domain validation
- Duplicate detection
Outputs:
- url, instagram, facebook, yelp fields populated
- data_sources array expanded with search results
- aggregate_rating and aggregate_rating_count updated
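The confidence-based URL selection described above can be sketched as follows. This is a minimal illustration only: the candidate format, the aggregator blocklist, and the tie-breaking rule are assumptions, not the pipeline's actual implementation.

```python
from urllib.parse import urlparse

# Domains that are aggregators or social profiles rather than the business's
# own website (an assumed, illustrative list).
AGGREGATOR_DOMAINS = {"yelp.com", "facebook.com", "instagram.com", "tripadvisor.com"}

def pick_best_url(candidates):
    """Pick the most plausible official website from search results.

    Each candidate is a dict like {"url": ..., "confidence": 0.0-1.0}.
    Aggregator domains are skipped so social links don't win over the
    real website; remaining candidates compete on confidence score.
    """
    best = None
    for cand in candidates:
        domain = urlparse(cand["url"]).netloc.lower().removeprefix("www.")
        if domain in AGGREGATOR_DOMAINS:
            continue  # keep these for the social fields, not the url field
        if best is None or cand["confidence"] > best["confidence"]:
            best = cand
    return best["url"] if best else None
```

In this sketch an aggregator link loses even when it has the highest confidence, which matches the intent of populating url separately from the social fields.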
Step 3: Scrape Places for Completeness
Asset: scrape_places_completeness_enrichment
Command: python manage.py scrape_places_for_completeness --use-gemini --use-structured
Uses Crawl4AI and Gemini AI to extract structured business data:
- Complete business descriptions
- Contact information (phone, email)
- Operating hours
- Social media links (Twitter, Instagram, Facebook)
- High-quality images
Only processes places with:
- Completeness score < 0.6 (configurable)
- Valid website URL (from Step 2)
- Minimum data sources (default: 1)
Outputs:
- description, telephone, email, hours fields populated
- Social media fields enhanced
- featured_image set if available
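The eligibility rules above can be expressed as a simple predicate. This is a sketch: the field names mirror the ones used in this guide but are assumptions about the underlying model.

```python
def needs_scraping(place, max_completeness=0.6, min_data_sources=1):
    """Return True if a place qualifies for the completeness scraper.

    Mirrors the three criteria above: completeness score below the
    threshold, a valid website URL from Step 2, and a minimum number
    of data sources.
    """
    score = place.get("completeness_score", 0.0)
    url = place.get("url")
    sources = place.get("data_sources") or []
    return (
        score < max_completeness
        and bool(url)
        and len(sources) >= min_data_sources
    )
```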
Step 4: NLP Categorization
Asset: nlp_categories_enrichment
Command: python manage.py nlp_categories --type places
Applies ML-based categorization using Nyckel models:
- Categories: food, drink, entertainment, health, etc.
- Vibes: chill, upscale, family-friendly, romantic, etc.
Uses enhanced descriptions from Step 3 for better accuracy.
Outputs:
- categories_level1 populated with primary category
- vibe_list populated with relevant vibes
- vibe_count incremented
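To illustrate the shape of this step's output (not the Nyckel models themselves), a keyword-based stand-in might look like this. The keyword mapping is entirely hypothetical; the real pipeline uses trained classifiers.

```python
# Hypothetical keyword → vibe mapping; the actual step calls Nyckel models.
VIBE_KEYWORDS = {
    "chill": ["cozy", "relaxed", "laid-back"],
    "upscale": ["elegant", "fine dining", "luxury"],
    "family-friendly": ["kids", "family", "playground"],
}

def tag_vibes(description):
    """Return vibes whose keywords appear in the description (case-insensitive)."""
    text = description.lower()
    return [
        vibe
        for vibe, keywords in VIBE_KEYWORDS.items()
        if any(kw in text for kw in keywords)
    ]
```

This also shows why Step 3 matters: a richer description gives any classifier, keyword-based or ML, more signal to work with.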
Step 5: Extract Business Images
Asset: get_business_images_enrichment
Command: python manage.py get_business_images --parallel
Scrapes and scores business images:
- Extracts images from website HTML
- Checks schema.org structured data
- Scores images by quality, size, and relevance
- Falls back to Google Images if needed
- Uses OCR to filter out low-quality images
Outputs:
- HotspotsImage records created and linked to places
- Images scored and ranked
- Best image set as featured
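The scoring logic can be sketched as a weighted heuristic. The weights and thresholds below are illustrative assumptions, not the production values.

```python
def score_image(width, height, from_schema_org=False, alt_text=""):
    """Score a candidate image by size, provenance, and relevance.

    Larger images score higher (capped at one megapixel), schema.org-declared
    images get a bonus, and tiny images (icons, tracking pixels) are
    rejected outright.
    """
    if width < 200 or height < 200:
        return 0.0  # too small to be a useful featured image
    size_score = min(width * height / 1_000_000, 1.0)
    schema_bonus = 0.3 if from_schema_org else 0.0
    relevance_bonus = 0.2 if alt_text else 0.0
    return size_score + schema_bonus + relevance_bonus

def pick_featured(images):
    """Return the highest-scoring image dict, or None if all were rejected."""
    scored = [(score_image(**img), img) for img in images]
    scored = [(s, img) for s, img in scored if s > 0]
    return max(scored, key=lambda pair: pair[0])[1] if scored else None
```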
Step 6: Pipeline Summary
Asset: place_enrichment_summary
Generates a comprehensive report:
- Success/failure status of each step
- Metadata from all pipeline stages
- Overall pipeline health
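The summary asset essentially folds the per-step results into one report. A minimal sketch, assuming each step emits metadata with a "success" flag (the aggregation fields here are illustrative):

```python
def summarize(step_results):
    """Aggregate per-step results into an overall pipeline report.

    step_results maps step name → metadata dict containing at least
    a "success" boolean, as emitted by each asset.
    """
    failed = [name for name, meta in step_results.items() if not meta.get("success")]
    return {
        "steps_run": len(step_results),
        "steps_failed": failed,
        "healthy": not failed,
    }
```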
Usage
Running the Full Pipeline
Via Dagster UI (Recommended)
- Open the Dagster UI: https://data.vibemap.com/dagster
- Navigate to Jobs → place_enrichment_pipeline
- Click Launchpad
- Configure the run:
ops:
import_overture_places_enrichment:
config:
city: "oakland"
max_completeness: 0.6
limit: 100
# Config is shared across all assets via PlaceEnrichmentConfig
update_data_sources_enrichment:
config:
city: "oakland"
address: "Oakland, CA"
max_completeness: 0.6
limit: 100
scrape_places_completeness_enrichment:
config:
city: "oakland"
max_completeness: 0.6
limit: 100
nlp_categories_enrichment:
config:
city: "oakland"
boundary: null
ed_cat: null
get_business_images_enrichment:
config:
city: "oakland"
limit: 100
- Click Launch Run
Via CLI
# Launch the job with default config (Oakland)
dagster job execute -m dagster_vibemap -j place_enrichment_pipeline
# Launch with custom config
dagster job execute -m dagster_vibemap -j place_enrichment_pipeline \
--config-json '{
"ops": {
"import_overture_places_enrichment": {
"config": {
"city": "boston",
"max_completeness": 0.7,
"limit": 200
}
}
}
}'
Running Individual Steps
You can also run individual assets:
# Just import Overture data
dagster asset materialize -m dagster_vibemap -a import_overture_places_enrichment
# Just run NLP categorization
dagster asset materialize -m dagster_vibemap -a nlp_categories_enrichment
# Run Steps 1-3 only
dagster asset materialize -m dagster_vibemap \
-a import_overture_places_enrichment \
-a update_data_sources_enrichment \
-a scrape_places_completeness_enrichment
Configuration Options
PlaceEnrichmentConfig
All assets in the pipeline accept this configuration schema:
| Parameter | Type | Default | Description |
|---|---|---|---|
| city | str | "oakland" | City name for Overture import (see available cities) |
| address | str | None | Filter by address substring (e.g., "Oakland, CA") |
| boundary | str | None | Filter by boundary name |
| ed_cat | str | None | Editorial category filter |
| max_completeness | float | 0.6 | Max completeness score for scraping (0.0-1.0) |
| limit | int | 100 | Max places to process per step |
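As a rough sketch of the schema above: the actual class is a Dagster config object, but a stdlib dataclass mirroring the fields, defaults, and one plausible validation rule looks like this.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class PlaceEnrichmentConfig:
    """Shared configuration for every asset in the pipeline (sketch only)."""
    city: str = "oakland"
    address: Optional[str] = None
    boundary: Optional[str] = None
    ed_cat: Optional[str] = None
    max_completeness: float = 0.6
    limit: int = 100

    def __post_init__(self):
        # Assumed validation: the table documents a 0.0-1.0 range.
        if not 0.0 <= self.max_completeness <= 1.0:
            raise ValueError("max_completeness must be between 0.0 and 1.0")
```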
Available Cities for Overture Import
From import_overture_places.py:
oakland, boston, dallas, san_francisco, new_york, los_angeles, chicago, seattle, austin, denver, miami, portland, houston, east_aldine, la_jolla, lynchburg, lynnwood, stevenson, toledo, utica, whittier
Use --list-cities to see the full list with bounding boxes.
Scheduling
Manual Scheduling
Add to dagster_vibemap/schedules.py:
from dagster import ScheduleDefinition
# Weekly place enrichment (every Monday at 2am)
place_enrichment_schedule = ScheduleDefinition(
name="weekly_place_enrichment",
job=place_enrichment_pipeline_job,
cron_schedule="0 2 * * 1", # Monday 2am
execution_timezone="America/Los_Angeles",
)
On-Demand Triggers
Use Dagster sensors to trigger based on events:
from dagster import sensor, RunRequest
@sensor(job=place_enrichment_pipeline_job)
def new_customer_onboarding_sensor(context):
"""Trigger enrichment when new customer is added."""
# Your logic to detect new customers
new_customers = get_new_customers()
for customer in new_customers:
yield RunRequest(
run_config={
"ops": {
"import_overture_places_enrichment": {
"config": {
"city": customer.city,
"limit": 500,
}
}
}
},
tags={"customer_id": str(customer.id)}
)
Monitoring and Debugging
View Run Status in Dagster UI
- Navigate to Runs
- Click on a specific run to see:
- Step-by-step execution timeline
- Logs from each asset
- Metadata (success counts, errors, etc.)
- Asset lineage graph
Check Asset Metadata
Each asset returns rich metadata:
{
"command": "import_overture_places --city oakland",
"exit_code": 0,
"success": True,
"stdout_preview": "Imported 1,234 places...",
"step": "1_import",
"city": "oakland"
}
Common Issues
Import fails with "City not found"
- Check city name against available cities
- Use underscores: east_aldine, not east aldine
Data sources step times out
- Reduce limit in config
- Check DataForSEO API quota
- Review --max_workers and --rate_limit settings
Scraping fails for many places
- Check Crawl4AI/Gemini API credentials
- Verify places have valid website URLs
- Review max_completeness threshold
NLP categorization produces poor results
- Ensure Step 3 ran successfully (enriched descriptions)
- Check Nyckel API credentials
- Review training data quality
Performance Optimization
Parallel Processing
The pipeline uses parallel processing where possible:
- Step 2 (Data Sources): 6 workers, 2 concurrent API calls
- Step 5 (Images): 8 workers
Adjust in asset configuration:
command.extend([
"--parallel",
"--max_workers", "10", # Increase workers
"--rate_limit", "3", # More concurrent API calls
])
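The worker/rate-limit split above (many workers, but fewer concurrent API calls) can be sketched with a thread pool plus a semaphore. This is a generic pattern under stated assumptions, not the pipeline's actual code.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Semaphore

def enrich_all(places, enrich_one, max_workers=6, rate_limit=2):
    """Process places with max_workers threads, while allowing only
    rate_limit of them to hit the external API at the same time."""
    api_slots = Semaphore(rate_limit)

    def worker(place):
        with api_slots:  # blocks until an API slot is free
            return enrich_one(place)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(worker, places))
```

Raising --max_workers speeds up local work (parsing, scoring), while --rate_limit caps pressure on the paid API, which is why the two are tuned separately.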
Batch Size
Control how many places are processed:
config:
limit: 50 # Process 50 places per run
For full city imports, set limit: 10000 or omit.
Resource Usage
Memory: Each step loads places into memory. For large cities:
- Run in stages (batch by boundary or editorial category)
- Use address or boundary filters
API Costs:
- Step 2: DataForSEO API calls (expensive!)
- Step 3: Gemini API calls (moderate cost)
- Step 5: Google Images fallback (free but rate-limited)
Monitor costs via Dagster metadata.
Best Practices
New Customer Onboarding
- Import raw data:
  city=boston
  dagster asset materialize -m dagster_vibemap -a import_overture_places_enrichment \
    --config-json '{"ops": {"import_overture_places_enrichment": {"config": {"city": "'$city'", "limit": 10000}}}}'
- Enrich in batches (to control costs):
  # Batch 1: First 100 places
  dagster asset materialize -m dagster_vibemap \
    -a update_data_sources_enrichment \
    --config-json '{"ops": {"update_data_sources_enrichment": {"config": {"address": "Boston, MA", "limit": 100}}}}'
  # Monitor results, then continue with more batches
- Run full pipeline once satisfied:
  dagster job execute -m dagster_vibemap -j place_enrichment_pipeline
Incremental Updates
For existing customers, run individual steps:
# Just scrape new descriptions
dagster asset materialize -m dagster_vibemap -a scrape_places_completeness_enrichment
# Re-categorize all places (e.g., after model update)
dagster asset materialize -m dagster_vibemap -a nlp_categories_enrichment
Quality Checks
After each run, check:
- Completeness scores improved:
  SELECT AVG(completeness_score)
  FROM api_hotspotsplace
  WHERE address ILIKE '%Oakland%';
- Data source coverage:
  SELECT
    COUNT(*) FILTER (WHERE url IS NOT NULL) as has_url,
    COUNT(*) FILTER (WHERE description IS NOT NULL) as has_desc,
    COUNT(*) FILTER (WHERE telephone IS NOT NULL) as has_phone
  FROM api_hotspotsplace;
- Image coverage:
  SELECT COUNT(DISTINCT hotspots_place_id)
  FROM api_hotspotsimage;
Troubleshooting
Enable Debug Logging
In Dagster UI, view logs for each asset. For more verbose output:
# In place_enrichment.py, modify run_django_command calls:
command.extend(["--verbosity", "2"]) # Django verbosity
Test Individual Commands
Before running in Dagster, test commands manually:
# Test import
python manage.py import_overture_places --city oakland --limit 10 --dry-run
# Test data sources
python manage.py update_data_sources --address "Oakland, CA" --num_rows 5
# Test scraping
python manage.py scrape_places_for_completeness --limit 5 --dry-run
Revert Failed Runs
If a run partially completes, you can:
- Re-run entire pipeline (idempotent operations)
- Run remaining steps using asset selection
- Manually fix data and continue
Asset Lineage
View the dependency graph in Dagster UI:
import_overture_places_enrichment
↓
update_data_sources_enrichment
↓
scrape_places_completeness_enrichment
↓
nlp_categories_enrichment
↓
get_business_images_enrichment
↓
place_enrichment_summary
Each asset is only executed after its dependencies succeed.
Future Enhancements
Potential additions to the pipeline:
- Duplicate detection and merging
- Event venue prediction
- Review sentiment analysis
- Business hours validation
- Phone number verification
- Automated quality scoring
- Email validation
- Social media engagement metrics
Support
For issues or questions:
- Check Dagster UI logs
- Review the output of the underlying Django management commands
- Consult CLAUDE.md for project context
- File issues in GitHub repository