Event Collection with Dagster

This guide covers using Dagster to manage event collection jobs.

Quick Start

cd /home/vibemap/Vibemap-Analysis-dev
dagster dev -m dagster_vibemap

Dagster UI runs at http://localhost:3000 (mapped to https://data.vibemap.com/dagster).

Jobs

Event Collection

event_collection_pipeline — Primary event collection job. Four-step pipeline with CSV exports.

| Step | Asset | Description | EventLink Status |
|------|-------|-------------|------------------|
| 1. Discover | event_pipeline_discover | Find event URLs, save to EventLink table | pending |
| 2. Scrape | event_pipeline_scrape | AI-powered data extraction | scraped |
| 3. Validate | event_pipeline_validate | Data quality checks | validated |
| 4. Store | event_pipeline_store | Create HotspotsEvent records | stored |

Each step also produces a CSV export (export_discovered_links, export_scraped_links, export_validated_links, export_stored_events).

Each step can be run individually via standalone jobs: event_discovery, event_scrape_links, event_validate_links, event_store_links.
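The EventLink status column in the table above describes a linear progression through the four steps. A minimal sketch of that progression (the step and status names come from the table; the helper function itself is illustrative, not part of the codebase):

```python
from typing import Optional

# Linear EventLink status progression through the four pipeline steps.
PIPELINE_STEPS = [
    ("event_pipeline_discover", "pending"),
    ("event_pipeline_scrape", "scraped"),
    ("event_pipeline_validate", "validated"),
    ("event_pipeline_store", "stored"),
]

def next_status(current: str) -> Optional[str]:
    """Return the status a link moves to after the next step, or None when stored."""
    statuses = [status for _, status in PIPELINE_STEPS]
    idx = statuses.index(current)
    return statuses[idx + 1] if idx + 1 < len(statuses) else None
```

Running a standalone job (e.g. event_scrape_links) advances links from one status to the next without re-running earlier steps.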

event_collection — Combined discovery + scraping in one step via event_pipeline_run. Used by DataForSeo and SF Live schedules.

Event Processing

event_processing — Validate listings (check_listings_events) and apply NLP categorization (nlp_categories_events).

event_quality_check — Validate recent event data (check_recent_events + check_listings_events).

full_event_pipeline — Runs everything: scrape, categorize, validate. Manual runs only.

Simpleview Sync

simpleview_events_sync — Sync events from Simpleview Peoria API.

simpleview_places_sync — Sync places/listings from Simpleview Peoria API.
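Both sync jobs follow the same upsert pattern: fetch records from the API, match them against existing rows on an external id, then create or update. A minimal sketch of that merge step — the record shape and the `external_id` field are assumptions for illustration, not the actual Simpleview schema:

```python
from typing import Dict, List

def upsert_records(store: Dict[str, dict], fetched: List[dict]) -> Dict[str, int]:
    """Merge fetched API records into a local store keyed by external id.

    Returns counts of created vs. updated records.
    """
    counts = {"created": 0, "updated": 0}
    for record in fetched:
        key = record["external_id"]  # hypothetical field name
        if key in store:
            store[key].update(record)  # update existing record in place
            counts["updated"] += 1
        else:
            store[key] = dict(record)  # create a new record
            counts["created"] += 1
    return counts
```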

Schedules

Enable schedules in the UI under Overview > Schedules.

Event Collection

| Schedule | Job | Frequency | Config |
|----------|-----|-----------|--------|
| dataforseo_events_schedule | event_collection | Every 8 hrs | Excludes sf-live |
| sf_live_events_schedule | event_collection | Thu 2 AM | sf-live, both strategies |
| king_of_prussia_events_schedule | event_collection_pipeline | Thu 2 AM | King of Prussia, PA |
| fayetteville_events_schedule | event_collection_pipeline | Wed 2 AM | Fayetteville, AR |
| deep_ellum_events_schedule | event_collection_pipeline | Fri 2 AM | Deep Ellum, TX |
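Dagster schedules are defined with cron strings. The frequencies in the table map roughly to the expressions below — a sketch assuming the deployment's default timezone; the actual schedule definitions in the repo may differ:

```python
# Hypothetical cron expressions matching the schedule table
# (fields: minute hour day-of-month month day-of-week; Sunday = 0).
SCHEDULE_CRONS = {
    "dataforseo_events_schedule": "0 */8 * * *",    # every 8 hours
    "sf_live_events_schedule": "0 2 * * 4",         # Thursday 2 AM
    "king_of_prussia_events_schedule": "0 2 * * 4", # Thursday 2 AM
    "fayetteville_events_schedule": "0 2 * * 3",    # Wednesday 2 AM
    "deep_ellum_events_schedule": "0 2 * * 5",      # Friday 2 AM
}
```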

Event Processing & Quality

| Schedule | Job | Frequency |
|----------|-----|-----------|
| check_events_schedule | event_processing | Every 4 hrs |
| nlp_events_schedule | event_processing | Daily 2 AM |
| event_quality_schedule | event_quality_check | Daily 3 AM |

Simpleview

| Schedule | Job | Frequency |
|----------|-----|-----------|
| simpleview_events_schedule | simpleview_events_sync | Every 4 hrs |
| simpleview_places_schedule | simpleview_places_sync | Daily 1 AM |

Run Config for Pipeline Jobs

When launching event_collection_pipeline manually, use the correct op names:

ops:
  event_pipeline_discover:
    config:
      editorial_category: "events_feed"
      num_per_batch: 10
      boundary: "King of Prussia, PA"
      theme: "king-of-prussia"
  event_pipeline_scrape:
    config:
      limit: 100
  event_pipeline_validate:
    config:
      limit: 100
  event_pipeline_store:
    config:
      theme: "king-of-prussia"
      send_email: true

Valid op names: event_pipeline_discover, event_pipeline_scrape, event_pipeline_validate, event_pipeline_store.
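The same run config can also be expressed as a Python dict, e.g. when launching runs through Dagster's Python or GraphQL APIs. A sketch mirroring the YAML above:

```python
# Run config for event_collection_pipeline as a Python dict,
# equivalent to the YAML shown above.
run_config = {
    "ops": {
        "event_pipeline_discover": {
            "config": {
                "editorial_category": "events_feed",
                "num_per_batch": 10,
                "boundary": "King of Prussia, PA",
                "theme": "king-of-prussia",
            }
        },
        "event_pipeline_scrape": {"config": {"limit": 100}},
        "event_pipeline_validate": {"config": {"limit": 100}},
        "event_pipeline_store": {
            "config": {"theme": "king-of-prussia", "send_email": True}
        },
    }
}

# The top-level keys under "ops" must match the valid op names exactly.
assert set(run_config["ops"]) == {
    "event_pipeline_discover",
    "event_pipeline_scrape",
    "event_pipeline_validate",
    "event_pipeline_store",
}
```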

CLI Usage

# Run full pipeline
dagster job launch -m dagster_vibemap -j event_collection_pipeline

# Run a single step
dagster asset materialize -m dagster_vibemap --select event_pipeline_discover

# Run event processing
dagster job launch -m dagster_vibemap -j event_processing

CSV Exports

The pipeline automatically exports CSVs at each step to exports/<date>/<run_id>/:

  • 1_discover_progress.csv
  • 2_scrape_progress.csv
  • 3_validate_progress.csv
  • 4_store_progress.csv
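A minimal sketch of how the exports/&lt;date&gt;/&lt;run_id&gt;/ layout can be built with the standard library — the helper name is illustrative; the real logic lives in the pipeline assets:

```python
from datetime import date
from pathlib import Path

def export_dir(base: Path, run_id: str, today: date) -> Path:
    """Build exports/<date>/<run_id>/ under base and create it if missing."""
    path = base / "exports" / today.isoformat() / run_id
    path.mkdir(parents=True, exist_ok=True)
    return path
```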

Troubleshooting

  • Config error "unexpected config entries": Use the correct op names listed above, not asset names.
  • Schedule not running: Check it's enabled in the UI. Ensure dagster-daemon is running in production.
  • Asset fails: Check run logs in the UI. Verify the underlying Django command works manually.
  • Scrapy timeout: Default is 2 hours; adjust in assets/data_import.py if needed.