Sense Worker: Components (L3)
Inside the Sense Worker container.
The Worker has two execution surfaces: Kafka consumers (reactive) and Hangfire jobs (scheduled). Both share the same DI container, the same WorkerDbContext, and the same observability pipeline.
Kafka consumers (KafkaFlow batch middlewares)
Each consumer is registered as a singleton batch middleware that creates an async DI scope per Kafka batch. The scope is necessary because the EF Core DbContext is registered with a scoped lifetime, so a singleton cannot hold one directly.
| Middleware | Topic | Output |
|---|---|---|
| TrackMetadataBatchMiddleware | axion.sense.track.metadata | ClickHouse tracks + frames; produces FrameRecognitionRequest to recognition_requests (Vision Worker intake) and TrackMatchingRequestEvent back to axion.sense.track.metadata (Vision Matching intake). Also consumes TrackMatchingResultEvent from the same topic (header-filtered) to apply matched routes and set is_map_matched=true. |
| AuditEventBatchMiddleware | axion.sense.audit.events | ClickHouse audit_log (via ClickHouseAuditBatcher) |
| CitylensTrackLifecycleBatchMiddleware | axion.sense.citylens.track.lifecycle | Postgres CitylensTrackMapping + ClickHouse tracks |
| CitylensFrameLifecycleBatchMiddleware | axion.sense.citylens.frame.lifecycle | ClickHouse frames |
| CitylensDetectionBatchMiddleware | axion.sense.citylens.detections | ClickHouse detections |
Common pipeline middlewares
Wrapped around every consumer:
- FilterByEnv: drops messages with environment suffixes that don't match (lets dev/staging share a Kafka cluster without crosstalk).
- MessageTypeShortNameResolver: typed deserialization via the message-type header. Unknown short names deserialize to null and are silently acked instead of crashing the consumer.
- RequestIdProducerMiddleware: propagates the inbound request-id header into outbound messages and OpenTelemetry spans for distributed tracing.
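
The three wrappers above can be read as one small decision chain. The following is a minimal, language-neutral sketch in Python (the Worker itself is .NET); every name in it (`Message`, `DESERIALIZERS`, `handle`, the `downstream.` topic) is hypothetical, and it assumes the environment suffix sits on the topic name, which this page does not state.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Message:
    """Hypothetical message shape; the real Worker uses KafkaFlow's context."""
    topic: str
    headers: dict
    payload: bytes


# Hypothetical registry: short type name -> deserializer.
DESERIALIZERS: dict[str, Callable[[bytes], object]] = {
    "TrackMetadata": lambda raw: {"kind": "TrackMetadata", "body": raw.decode()},
}


def filter_by_env(msg: Message, env: str) -> bool:
    # Assumption: the environment suffix is the last dot-separated topic part.
    return msg.topic.endswith("." + env)


def resolve(msg: Message) -> Optional[object]:
    # Unknown or missing short names resolve to None instead of raising.
    decoder = DESERIALIZERS.get(msg.headers.get("message-type"))
    return decoder(msg.payload) if decoder else None


def handle(msg: Message, env: str, outbox: list) -> str:
    """Return 'dropped', 'acked-unknown' or 'processed' for one message."""
    if not filter_by_env(msg, env):
        return "dropped"
    if resolve(msg) is None:
        return "acked-unknown"  # acknowledged without processing
    # Carry the inbound request-id over to whatever is produced downstream.
    out_headers = {}
    if "request-id" in msg.headers:
        out_headers["request-id"] = msg.headers["request-id"]
    outbox.append(Message("downstream." + env, out_headers, msg.payload))
    return "processed"
```

The point of the sketch is the ordering: environment filtering happens before deserialization, and an unresolvable type ends in an acknowledgement rather than an exception.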
Hangfire jobs
The scheduler runs in-process. All recurring jobs are registered in Worker/Program.cs:
| Job | Cron | What it does |
|---|---|---|
| PartmanMaintenanceJob | Daily | CALL run_maintenance() on pg_partman: creates upcoming partitions, drops aged-out ones. |
| AccessExpirationJob | Every 12h | Sweeps expired org memberships and revokes their OpenFGA tuples. |
| CustomLayersSyncJob | Configurable (e.g. daily) | External RDBMS → tippecanoe → PMTiles → S3. Gated by CustomLayerSyncOptions.Enabled. |
| CoverageSyncJob | Configurable | ClickHouse frames → H3 cells → FlatGeobuf → tippecanoe → PMTiles → S3. Gated by CoverageSyncOptions.Enabled. |
| CitylensSyncJob | 0 0 31 2 * (never) | One-time bulk import of legacy Citylens data. Manual trigger only. Gated by CitylensSyncOptions.Enabled. See Citylens initial migration flow. |
HangfireTelemetryFilter subscribes to job state changes and emits OpenTelemetry spans/metrics.
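
The CitylensSyncJob schedule, 0 0 31 2 *, reads as "midnight on 31 February". Since February never has a 31st day, no calendar date satisfies the expression, which is why the job only runs on manual trigger. A small Python check of that calendar fact (the helper name is hypothetical, and this says nothing about how Hangfire parses cron internally):

```python
import calendar


def day_exists_in_month(day: int, month: int, years=range(2000, 2401)) -> bool:
    """True if `day` is a valid day of `month` in at least one of `years`.

    The default range spans a full 400-year Gregorian cycle, so it covers
    every leap-year pattern.
    """
    # monthrange returns (weekday of the 1st, number of days in the month).
    return any(day <= calendar.monthrange(year, month)[1] for year in years)


if __name__ == "__main__":
    print(day_exists_in_month(31, 2))  # 31 February
    print(day_exists_in_month(29, 2))  # 29 February (leap years)
```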
Special: CitylensKafkaStartupService
An IHostedService that gates the three Citylens Kafka consumers. It reads a flag from Postgres (citylens_initial_migration_completed_at) and:
- Pauses the consumers while the bulk migration is running, so realtime events don't race against the backfill.
- Resumes them once the flag is set.
This prevents a class of consistency bug where a realtime event for track X arrives before the bulk migration has inserted track X's mapping row.
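
The gating logic can be sketched as follows, in Python rather than the Worker's .NET. `FakeConsumer`, `CitylensGate`, and `tick` are hypothetical names, and the sketch assumes an unset flag always means "migration not finished"; how the real service distinguishes a running migration from one that never started is not described here.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, List, Optional


@dataclass
class FakeConsumer:
    """Stand-in for a pausable Kafka consumer (hypothetical)."""
    name: str
    running: bool = True

    def pause(self) -> None:
        self.running = False

    def resume(self) -> None:
        self.running = True


@dataclass
class CitylensGate:
    # Callable returning the completion timestamp, or None while unset.
    read_completed_at: Callable[[], Optional[datetime]]
    consumers: List[FakeConsumer]

    def tick(self) -> bool:
        """Apply the gate once; return True when consumers are running."""
        completed = self.read_completed_at() is not None
        for consumer in self.consumers:
            if completed:
                consumer.resume()
            else:
                consumer.pause()
        return completed


if __name__ == "__main__":
    flag = {"at": None}
    gate = CitylensGate(lambda: flag["at"], [FakeConsumer("track-lifecycle")])
    print(gate.tick())  # flag unset: consumers paused
    flag["at"] = datetime.now(timezone.utc)
    print(gate.tick())  # flag set: consumers resumed
```

Keeping the flag read behind a callable keeps the gate independent of the database, so it can be exercised without Postgres.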
ClickHouse audit batcher
ClickHouseAuditBatcher buffers audit events from AuditEventBatchMiddleware:
- Flushes when the buffer size hits Kafka:Audit:BatchSize (default 1000) or every Kafka:Audit:FlushInterval (default 5s), whichever comes first.
- Uses ClickHouse's bulk-insert protocol (INSERT INTO audit_log VALUES (...) (...) with prepared columns).
- Writes are at-least-once; the audit pipeline keeps duplicates out at the producer (Kafka idempotent producer + at-most-once acceptance per request_id), so the underlying plain ReplicatedMergeTree table stays append-only.
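
The "size or interval, whichever comes first" rule is easy to get subtly wrong, so here is a minimal Python sketch of it. It is an illustration, not the ClickHouseAuditBatcher implementation: `AuditBatcher`, `sink`, and `poll` are hypothetical names, the defaults mirror the values quoted above, and the clock is injectable so the behavior can be checked without sleeping.

```python
import time
from typing import Callable, List


class AuditBatcher:
    """Buffers events and flushes on size or elapsed time (sketch)."""

    def __init__(self, sink: Callable[[List[object]], None],
                 batch_size: int = 1000, flush_interval: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._sink = sink                  # receives one list per flush
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._buffer: List[object] = []
        self._last_flush = clock()

    def add(self, event: object) -> None:
        self._buffer.append(event)
        self._maybe_flush()

    def poll(self) -> None:
        """Call periodically so a quiet stream still flushes on the interval."""
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        now = self._clock()
        full = len(self._buffer) >= self._batch_size
        stale = now - self._last_flush >= self._flush_interval
        if not (full or stale):
            return
        self._last_flush = now             # restart the interval either way
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._sink(batch)              # one bulk insert per batch
```

A periodic `poll()` is what makes the interval bound hold when traffic stops; without it, the last partial batch would sit in memory until the next event arrived.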
External integrations
ValhallaClient
HTTP client to the in-cluster Valhalla service. Called by RoadDataServiceImpl.ConfirmTrack indirectly (via Kafka → Worker) to map-match the raw GPS track to OSM road segments. The matched track replaces the raw one in ClickHouse.
FrameAttributeApiClient
Optional HTTP client. When FrameAttributeApi:Url is set, the Worker enriches each frame with quality attributes (blur score, exposure, detector signals) before persisting it. When unset, frames go through unannotated.
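
The optional enrichment step amounts to a configuration-gated wrapper around persistence. A minimal Python sketch, assuming hypothetical names throughout (`build_enricher`, `persist`, the `quality` field, the injected `fetch` callable standing in for the HTTP call):

```python
from typing import Callable, Optional


def build_enricher(url: Optional[str],
                   fetch: Callable[[str, dict], dict]
                   ) -> Optional[Callable[[dict], dict]]:
    """Return an enrichment function, or None when no URL is configured."""
    if not url:
        return None
    # Hypothetical: attach the API's response under a 'quality' key.
    return lambda frame: {**frame, "quality": fetch(url, frame)}


def persist(frame: dict, store: list,
            enricher: Optional[Callable[[dict], dict]] = None) -> None:
    """Persist the frame, enriched first only if an enricher exists."""
    store.append(enricher(frame) if enricher else frame)
```

How the real client behaves on timeouts or API errors is not covered by this page, so the sketch leaves error handling out.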
WorkerDbContext
EF Core 9 + Npgsql. Targets axion_sense_tasks. Holds:
- Hangfire's standard tables (HangfireJob, HangfireState, etc.) via Hangfire.PostgreSql 1.21.
- Worker-only domain tables (e.g. CitylensTrackMapping, the Citylens migration completion flag).
Observability
Every Kafka consume span and every Hangfire job span carries:
- org.id, track.id, request.id attributes when present.
- Topic / job name as the span name.
- Message offset and partition for Kafka.
Errors increment a worker.errors_total counter (Prometheus-exported via SigNoz's OTel collector).
Configuration
Worker-specific sections, all bound via IOptions:
- Kafka:Audit:{BatchSize, FlushInterval}
- Kafka:Citylens:{Enabled, BatchSize, MaxConcurrency}
- CoverageSyncOptions:{Enabled, Cron, OutputBucket}
- CustomLayerSyncOptions:{Enabled, Cron, ExternalDb}
- CitylensSyncOptions:{Enabled, BatchSize, CitylensConnectionString}
- FrameAttributeApi:{Url, ApiKey, TimeoutSeconds}
- Hangfire:{ConnectionString, DashboardEnabled}