# Event Collection with Dagster

This guide covers using Dagster to manage event collection jobs.
## Quick Start

```bash
cd /home/vibemap/Vibemap-Analysis-dev
dagster dev -m dagster_vibemap
```

The Dagster UI runs at http://localhost:3000 (mapped to https://data.vibemap.com/dagster).
## Jobs

### Event Collection

`event_collection_pipeline` — Primary event collection job: a four-step pipeline with CSV exports at each step.
| Step | Asset | Description | EventLink Status |
|---|---|---|---|
| 1. Discover | event_pipeline_discover | Find event URLs, save to EventLink table | pending |
| 2. Scrape | event_pipeline_scrape | AI-powered data extraction | scraped |
| 3. Validate | event_pipeline_validate | Data quality checks | validated |
| 4. Store | event_pipeline_store | Create HotspotsEvent records | stored |
Each step also produces a CSV export (`export_discovered_links`, `export_scraped_links`, `export_validated_links`, `export_stored_events`).

Each step can also be run individually via a standalone job: `event_discovery`, `event_scrape_links`, `event_validate_links`, `event_store_links`.
`event_collection` — Combined discovery + scraping in one step via `event_pipeline_run`. Used by the DataForSeo and SF Live schedules.
### Event Processing

`event_processing` — Validate listings (`check_listings_events`) and apply NLP categorization (`nlp_categories_events`).

`event_quality_check` — Validate recent event data (`check_recent_events` + `check_listings_events`).

`full_event_pipeline` — Runs everything: scrape, categorize, validate. Manual runs only.
### Simpleview Sync

`simpleview_events_sync` — Sync events from the Simpleview Peoria API.

`simpleview_places_sync` — Sync places/listings from the Simpleview Peoria API.
## Schedules

Enable schedules in the UI under Overview > Schedules.

### Event Collection
| Schedule | Job | Frequency | Config |
|---|---|---|---|
| dataforseo_events_schedule | event_collection | Every 8 hrs | Excludes sf-live |
| sf_live_events_schedule | event_collection | Thu 2 AM | sf-live, both strategies |
| king_of_prussia_events_schedule | event_collection_pipeline | Thu 2 AM | King of Prussia, PA |
| fayetteville_events_schedule | event_collection_pipeline | Wed 2 AM | Fayetteville, AR |
| deep_ellum_events_schedule | event_collection_pipeline | Fri 2 AM | Deep Ellum, TX |
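The human-readable frequencies above correspond to cron expressions along these lines. The cron strings here are my assumed equivalents, not values copied from the actual schedule definitions:

```python
# Assumed cron equivalents of the frequencies above (fields: min hour dom mon dow).
# Illustrative guesses, not the actual ScheduleDefinition values.
ASSUMED_CRON = {
    "dataforseo_events_schedule": "0 */8 * * *",   # every 8 hours
    "sf_live_events_schedule": "0 2 * * 4",        # Thursday 2 AM
    "king_of_prussia_events_schedule": "0 2 * * 4",
    "fayetteville_events_schedule": "0 2 * * 3",   # Wednesday 2 AM
    "deep_ellum_events_schedule": "0 2 * * 5",     # Friday 2 AM
}

def has_cron_shape(expr: str) -> bool:
    """Cheap sanity check: a standard cron expression has exactly five fields."""
    return len(expr.split()) == 5
```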
### Event Processing & Quality
| Schedule | Job | Frequency |
|---|---|---|
| check_events_schedule | event_processing | Every 4 hrs |
| nlp_events_schedule | event_processing | Daily 2 AM |
| event_quality_schedule | event_quality_check | Daily 3 AM |
### Simpleview
| Schedule | Job | Frequency |
|---|---|---|
| simpleview_events_schedule | simpleview_events_sync | Every 4 hrs |
| simpleview_places_schedule | simpleview_places_sync | Daily 1 AM |
## Run Config for Pipeline Jobs

When launching `event_collection_pipeline` manually, use the correct op names:
```yaml
ops:
  event_pipeline_discover:
    config:
      editorial_category: "events_feed"
      num_per_batch: 10
      boundary: "King of Prussia, PA"
      theme: "king-of-prussia"
  event_pipeline_scrape:
    config:
      limit: 100
  event_pipeline_validate:
    config:
      limit: 100
  event_pipeline_store:
    config:
      theme: "king-of-prussia"
      send_email: true
```
Valid op names: `event_pipeline_discover`, `event_pipeline_scrape`, `event_pipeline_validate`, `event_pipeline_store`.
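The same launch can be done programmatically against the running webserver via Dagster's GraphQL client. A minimal sketch, assuming the `dagster-graphql` package is installed and the UI from the Quick Start is reachable on localhost:3000; the run config mirrors the YAML above:

```python
# Run config as a plain dict, mirroring the YAML above.
RUN_CONFIG = {
    "ops": {
        "event_pipeline_discover": {
            "config": {
                "editorial_category": "events_feed",
                "num_per_batch": 10,
                "boundary": "King of Prussia, PA",
                "theme": "king-of-prussia",
            }
        },
        "event_pipeline_scrape": {"config": {"limit": 100}},
        "event_pipeline_validate": {"config": {"limit": 100}},
        "event_pipeline_store": {
            "config": {"theme": "king-of-prussia", "send_email": True}
        },
    }
}

def launch_pipeline(host: str = "localhost", port: int = 3000) -> str:
    """Submit the job via the Dagster GraphQL API; returns the run id."""
    from dagster_graphql import DagsterGraphQLClient  # deferred: optional dependency

    client = DagsterGraphQLClient(host, port_number=port)
    return client.submit_job_execution("event_collection_pipeline", run_config=RUN_CONFIG)
```

Calling `launch_pipeline()` requires the webserver to be up; otherwise the client raises a connection error.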
## CLI Usage

```bash
# Run the full pipeline
dagster job launch -m dagster_vibemap -j event_collection_pipeline

# Run a single step
dagster asset materialize -m dagster_vibemap --select event_pipeline_discover

# Run event processing
dagster job launch -m dagster_vibemap -j event_processing
```
## CSV Exports

The pipeline automatically exports a CSV at each step to `exports/<date>/<run_id>/`:

- `1_discover_progress.csv`
- `2_scrape_progress.csv`
- `3_validate_progress.csv`
- `4_store_progress.csv`
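To inspect the most recent run's exports, a stdlib sketch like the following works. The directory layout follows `exports/<date>/<run_id>/` as above, but the CSV columns in the demo data are fabricated for illustration:

```python
import csv
import tempfile
from pathlib import Path

def latest_export_dir(exports_root: Path) -> Path:
    """Pick the newest exports/<date>/<run_id>/ directory by modification time."""
    run_dirs = [run for date_dir in exports_root.iterdir() if date_dir.is_dir()
                for run in date_dir.iterdir() if run.is_dir()]
    return max(run_dirs, key=lambda d: d.stat().st_mtime)

def read_progress(run_dir: Path, name: str = "1_discover_progress.csv") -> list[dict]:
    """Read one of the per-step progress CSVs into a list of row dicts."""
    with open(run_dir / name, newline="") as f:
        return list(csv.DictReader(f))

# Demo against a fabricated exports tree (the CSV columns here are invented).
root = Path(tempfile.mkdtemp()) / "exports"
demo_run = root / "2024-01-01" / "run123"
demo_run.mkdir(parents=True)
(demo_run / "1_discover_progress.csv").write_text("url,status\nhttps://example.com/e,pending\n")
rows = read_progress(latest_export_dir(root))
```

On the server, point `exports_root` at the real `exports/` directory instead of the temp tree.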
## Troubleshooting

- Config error "unexpected config entries": use the correct op names listed above, not asset names.
- Schedule not running: check that it's enabled in the UI, and ensure `dagster-daemon` is running in production.
- Asset fails: check the run logs in the UI, and verify the underlying Django command works manually.
- Scrapy timeout: the default is 2 hours; adjust in `assets/data_import.py` if needed.