
Sense Worker — Components (L3)

Inside the Sense Worker container.

Sense Worker Components

The Worker has two execution surfaces: Kafka consumers (reactive) and Hangfire jobs (scheduled). Both share the same DI container, the same WorkerDbContext, and the same observability pipeline.

Kafka consumers (KafkaFlow batch middlewares)

Each consumer is registered as a singleton batch middleware that creates an async DI scope per Kafka batch. The scope is necessary because the EF Core DbContext has a scoped lifetime and cannot be resolved directly from a singleton.

| Middleware | Topic | Output |
|---|---|---|
| TrackMetadataBatchMiddleware | axion.sense.track.metadata | ClickHouse tracks + frames; produces FrameRecognitionRequest to recognition_requests (Vision Worker intake) and TrackMatchingRequestEvent back to axion.sense.track.metadata (Vision Matching intake). Also consumes TrackMatchingResultEvent from the same topic (header-filtered) to apply matched routes and set is_map_matched=true. |
| AuditEventBatchMiddleware | axion.sense.audit.events | ClickHouse audit_log (via ClickHouseAuditBatcher) |
| CitylensTrackLifecycleBatchMiddleware | axion.sense.citylens.track.lifecycle | Postgres CitylensTrackMapping + ClickHouse tracks |
| CitylensFrameLifecycleBatchMiddleware | axion.sense.citylens.frame.lifecycle | ClickHouse frames |
| CitylensDetectionBatchMiddleware | axion.sense.citylens.detections | ClickHouse detections |

Common pipeline middlewares

Wrapped around every consumer:

  • FilterByEnv — drops messages with environment suffixes that don't match (lets dev/staging share a Kafka cluster without crosstalk).
  • MessageTypeShortNameResolver — typed deserialization via message-type header. Unknown short names deserialize to null and are silently acked instead of crashing the consumer.
  • RequestIdProducerMiddleware — propagates the inbound request-id header into outbound messages and OpenTelemetry spans for distributed tracing.
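
The filtering behaviour of the first two middlewares can be sketched as a small chain. This is a language-neutral illustration in Python, not the Worker's KafkaFlow code (which is .NET). The `Message` type, the `environment` header name, and the `KNOWN_TYPES` registry are assumptions made for the example; only the message-type header comes from the description above.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Message:
    headers: dict[str, str]
    payload: bytes


# Hypothetical registry: short type name -> deserializer.
KNOWN_TYPES: dict[str, Callable[[bytes], object]] = {
    "AuditEvent": lambda raw: {"type": "AuditEvent", "body": raw.decode()},
}


def filter_by_env(msg: Message, worker_env: str) -> bool:
    """Return True when the message belongs to this worker's environment."""
    # Assumption: the environment travels in a header named "environment".
    return msg.headers.get("environment") == worker_env


def resolve_type(msg: Message) -> Optional[object]:
    """Deserialize by the message-type header; unknown names yield None."""
    decoder = KNOWN_TYPES.get(msg.headers.get("message-type", ""))
    return decoder(msg.payload) if decoder else None


def run_pipeline(batch: list[Message], worker_env: str):
    """Split a batch into deserialized values and messages that are skipped."""
    handled, skipped = [], []
    for msg in batch:
        if not filter_by_env(msg, worker_env):
            skipped.append(msg)  # another environment's traffic
            continue
        value = resolve_type(msg)
        if value is None:
            skipped.append(msg)  # unknown type: ack and move on, do not crash
            continue
        handled.append(value)
    return handled, skipped
```

In this sketch a skipped message is simply set aside; in the Worker the equivalent step is acknowledging the offset so the consumer keeps moving.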

Hangfire jobs

Scheduler runs in-process. All recurring jobs are registered in Worker/Program.cs:

| Job | Cron | What it does |
|---|---|---|
| PartmanMaintenanceJob | Daily | CALL run_maintenance() on pg_partman; creates upcoming partitions, drops aged-out ones. |
| AccessExpirationJob | Every 12h | Sweeps expired org memberships and revokes their OpenFGA tuples. |
| CustomLayersSyncJob | Configurable (e.g. daily) | External RDBMS → tippecanoe → PMTiles → S3. Gated by CustomLayerSyncOptions.Enabled. |
| CoverageSyncJob | Configurable | ClickHouse frames → H3 cells → FlatGeobuf → tippecanoe → PMTiles → S3. Gated by CoverageSyncOptions.Enabled. |
| CitylensSyncJob | 0 0 31 2 * (never) | One-time bulk import of legacy Citylens data. Manual trigger only. Gated by CitylensSyncOptions.Enabled. See Citylens initial migration flow. |
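
The `0 0 31 2 *` expression asks for midnight on 31 February. That date never exists, so the scheduler never fires the job on its own and it only runs when triggered manually. A quick standard-library check of that reasoning in Python (the helper name is illustrative):

```python
import calendar


def february_has_day(day: int, years: range) -> bool:
    """Return True if any February in the given years contains `day`."""
    return any(calendar.monthrange(year, 2)[1] >= day for year in years)


# Day-of-month 31 never occurs in month 2, so the expression matches no date.
never_fires = not february_has_day(31, range(1970, 2200))
```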

HangfireTelemetryFilter subscribes to job state changes and emits OpenTelemetry spans/metrics.

Special: CitylensKafkaStartupService

An IHostedService that gates the three Citylens Kafka consumers. It reads a flag from Postgres (citylens_initial_migration_completed_at) and:

  • Pauses the consumers while the bulk migration is running, so realtime events don't race against the backfill.
  • Resumes them once the flag is set.

This prevents a class of consistency bug where a realtime event for track X arrives before the bulk migration has inserted track X's mapping row.
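
The gating logic can be sketched as follows. This is a simplified Python illustration, not the hosted service itself: `ConsumerGate`, `read_flag`, and the consumer names are hypothetical, and the sketch assumes that an unset flag always means the migration has not completed.

```python
from datetime import datetime, timezone
from typing import Callable, List, Optional


class ConsumerGate:
    """Pauses or resumes a group of consumers based on a completion flag."""

    def __init__(self, read_flag: Callable[[], Optional[datetime]],
                 consumers: List[str]):
        self._read_flag = read_flag      # e.g. a query against Postgres
        self._consumers = consumers
        self.paused = set()

    def sync(self) -> None:
        """Re-read the flag and pause or resume every gated consumer."""
        if self._read_flag() is None:
            # Flag unset: hold realtime events back so they cannot race
            # against the backfill.
            self.paused.update(self._consumers)
        else:
            # Flag set: the mapping rows exist, so realtime events are safe.
            self.paused.difference_update(self._consumers)
```

Calling `sync()` at startup and again after the migration finishes is enough to move the consumers from paused to running in this model.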

ClickHouse audit batcher

ClickHouseAuditBatcher buffers audit events from AuditEventBatchMiddleware:

  • Flushes when buffer size hits Kafka:Audit:BatchSize (default 1000) or every Kafka:Audit:FlushInterval (default 5s), whichever comes first.
  • Uses ClickHouse's bulk-insert protocol (INSERT INTO audit_log VALUES (...) (...) with prepared columns).
  • Writes are at-least-once; the audit pipeline keeps duplicates out at the producer (Kafka idempotent producer + at-most-once acceptance per request_id), so the underlying plain ReplicatedMergeTree table stays append-only.
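
The "size or interval, whichever comes first" rule is a common batching pattern. A minimal Python sketch of it follows. It is not the ClickHouseAuditBatcher implementation: the class name, the `tick` method, and the injectable `clock` are assumptions made so the example is self-contained, while the defaults mirror the documented 1000 events and 5 seconds.

```python
import time


class SizeOrIntervalBatcher:
    """Buffers events and flushes on size or elapsed time, whichever is first."""

    def __init__(self, flush, batch_size=1000, flush_interval=5.0,
                 clock=time.monotonic):
        self._flush = flush                  # callable receiving a list of events
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._buffer = []
        self._last_flush = clock()

    def add(self, event):
        """Buffer one event; flush immediately once the size threshold is hit."""
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def tick(self):
        """Call periodically; flushes pending events once the interval elapsed."""
        if self._buffer and self._clock() - self._last_flush >= self._flush_interval:
            self.flush()

    def flush(self):
        """Hand the buffered events to the sink and reset the timer."""
        batch, self._buffer = self._buffer, []
        self._last_flush = self._clock()
        if batch:
            self._flush(batch)
```

Passing the clock in keeps the timing logic testable without sleeping; a real batcher would drive `tick` from a timer or background task.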

External integrations

ValhallaClient

HTTP client for the in-cluster Valhalla service. RoadDataServiceImpl.ConfirmTrack triggers it indirectly (via Kafka → Worker) to map-match the raw GPS track to OSM road segments. The matched track then replaces the raw one in ClickHouse.

FrameAttributeApiClient

Optional HTTP client. When FrameAttributeApi:Url is set, the Worker enriches each frame with quality attributes (blur score, exposure, detector signals) before persisting it. When unset, frames go through unannotated.
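
The optional behaviour amounts to choosing between an enriching step and a pass-through when the Worker is configured. A short Python sketch of that choice, where `make_frame_processor`, `fetch_attributes`, and the `attributes` key are hypothetical names used only for illustration:

```python
from typing import Callable, Optional


def make_frame_processor(url: Optional[str],
                         fetch_attributes: Callable[[str, dict], dict]):
    """Return a function that enriches frames only when a URL is configured."""
    if not url:
        # Unset: frames pass through unannotated.
        return lambda frame: frame

    def enrich(frame: dict) -> dict:
        # Set: attach the quality attributes before the frame is persisted.
        return {**frame, "attributes": fetch_attributes(url, frame)}

    return enrich
```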

WorkerDbContext

EF Core 9 + Npgsql. Targets axion_sense_tasks. Holds:

  • Hangfire's standard tables (HangfireJob, HangfireState, etc.) via Hangfire.PostgreSql 1.21.
  • Worker-only domain tables (e.g. CitylensTrackMapping, the Citylens migration completion flag).

Observability

Every Kafka consume span and every Hangfire job span carries:

  • org.id, track.id, request.id attributes when present.
  • Topic / job name as the span name.
  • Message offset and partition for Kafka.
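
The "when present" rule means absent values are omitted rather than recorded as empty. A small Python sketch of building such an attribute set; the function name and the `kafka.partition` / `kafka.offset` keys are assumptions, since the exact attribute names for partition and offset are not stated here:

```python
def span_attributes(org_id=None, track_id=None, request_id=None,
                    partition=None, offset=None):
    """Build span attributes, keeping only the values that are present."""
    candidates = {
        "org.id": org_id,
        "track.id": track_id,
        "request.id": request_id,
        "kafka.partition": partition,  # hypothetical key name
        "kafka.offset": offset,        # hypothetical key name
    }
    # Compare against None so that partition 0 and offset 0 are kept.
    return {key: value for key, value in candidates.items() if value is not None}
```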

Errors increment a worker.errors_total counter (Prometheus-exported via SigNoz's OTel collector).

Configuration

Worker-specific sections, all bound via IOptions<T>:

  • Kafka:Audit:{BatchSize, FlushInterval}
  • Kafka:Citylens:{Enabled, BatchSize, MaxConcurrency}
  • CoverageSyncOptions:{Enabled, Cron, OutputBucket}
  • CustomLayerSyncOptions:{Enabled, Cron, ExternalDb}
  • CitylensSyncOptions:{Enabled, BatchSize, CitylensConnectionString}
  • FrameAttributeApi:{Url, ApiKey, TimeoutSeconds}
  • Hangfire:{ConnectionString, DashboardEnabled}