Flow — Citylens Realtime Sync¶
Steady-state ingestion: Citylens publishes lifecycle events; Sense Worker keeps both stores eventually consistent.
This is the post-migration regime. The initial migration brought the data over; realtime sync keeps it current.
Topics consumed¶
| Topic | Middleware | Effect |
|---|---|---|
| `axion.sense.citylens.track.lifecycle` | `CitylensTrackLifecycleBatchMiddleware` | Upsert ClickHouse tracks; create `CitylensTrackMapping` row if missing |
| `axion.sense.citylens.frame.lifecycle` | `CitylensFrameLifecycleBatchMiddleware` | Upsert ClickHouse frames |
| `axion.sense.citylens.detections` | `CitylensDetectionBatchMiddleware` | Upsert ClickHouse detections |
All three use Kafka key = `track_id` for per-track ordering.
Pipeline overview¶
Three Synchronizer middlewares share a single Postgres mapping table to translate Citylens IDs into Axion track IDs. Track events are the source of truth — they create the mapping. Frame and detection events look it up and produce remapped events into the Axion Sense topics (and S3 for detailed GPS).
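A minimal sketch of the frame-side remapping, assuming KafkaFlow's batch-consume middleware API; every type, the repository, and the producer below are illustrative stand-ins for what the real `CitylensFrameLifecycleBatchMiddleware` does:

```csharp
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Producers;

// All Citylens/Axion types below are illustrative stand-ins, not the real schema.
public sealed record CitylensFrameEvent(string CitylensTrackId, DateTime CapturedAt);
public sealed record AxionFrameEvent(Guid TrackId, DateTime CapturedAt);
public sealed record TrackMapping(string CitylensTrackId, Guid AxionTrackId);

public interface ICitylensTrackMappingRepository
{
    Task<TrackMapping?> FindAsync(string citylensTrackId);
}

public class FrameSynchronizerSketch : IMessageMiddleware
{
    private readonly ICitylensTrackMappingRepository _mappings; // shared Postgres table
    private readonly IMessageProducer _axionProducer;           // Axion Sense frame topic

    public FrameSynchronizerSketch(
        ICitylensTrackMappingRepository mappings, IMessageProducer axionProducer)
    {
        _mappings = mappings;
        _axionProducer = axionProducer;
    }

    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
    {
        // The batching middleware upstream delivers the whole batch through one context.
        foreach (var message in context.GetMessagesBatch())
        {
            var frame = (CitylensFrameEvent)message.Message.Value;

            // Track events create the mapping; frame events only read it.
            // A miss fails the whole batch on purpose (see "fail loudly" below).
            var mapping = await _mappings.FindAsync(frame.CitylensTrackId)
                ?? throw new InvalidOperationException(
                       $"No mapping for Citylens track {frame.CitylensTrackId}");

            // Re-key on the Axion track ID so per-track ordering survives remapping.
            await _axionProducer.ProduceAsync(
                mapping.AxionTrackId.ToString(),
                new AxionFrameEvent(mapping.AxionTrackId, frame.CapturedAt));
        }

        await next(context);
    }
}
```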
Sequence¶

1. Citylens publishes a lifecycle event on one of the three topics above, keyed by `track_id`.
2. The matching Synchronizer middleware consumes a batch and resolves the `CitylensTrackMapping` row: track events create it, frame and detection events require it.
3. The middleware upserts ClickHouse (DELETE + INSERT), writes an audit event with `source="citylens"`, and produces the remapped event to the Axion Sense topics (detailed GPS to S3).
4. Detections are additionally republished to the clusterization topic (see below).
Why this shape¶
Why ClickHouse upserts via writer-side DELETE + INSERT¶
We want at-least-once semantics from Kafka without leaving duplicates in the read path. Each Synchronizer issues a lightweight DELETE by primary key (with `mutations_sync = 1`, `allow_nondeterministic_mutations = 1`) before its INSERT, so re-delivery after a consumer restart leaves a single row.
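A sketch of that upsert, assuming the ClickHouse.Client ADO.NET driver; the table and column names are illustrative, while the two settings are the ones named above:

```csharp
using System;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Utility;

public static class ClickHouseUpsertSketch
{
    // Illustrative table/columns; the real Synchronizers write tracks, frames,
    // and detections the same way.
    public static async Task UpsertTrackAsync(
        ClickHouseConnection conn, Guid trackId, string payloadJson)
    {
        // 1. Lightweight DELETE by primary key. mutations_sync = 1 waits for the
        //    delete to apply so the INSERT below can't race it;
        //    allow_nondeterministic_mutations lets the mutation run even when
        //    ClickHouse can't prove it deterministic.
        using var delete = conn.CreateCommand();
        delete.CommandText = @"
            DELETE FROM sense.tracks
            WHERE track_id = {trackId:UUID}
            SETTINGS mutations_sync = 1, allow_nondeterministic_mutations = 1";
        delete.AddParameter("trackId", trackId);
        await delete.ExecuteNonQueryAsync();

        // 2. Plain INSERT. On Kafka re-delivery the DELETE removes the earlier
        //    copy first, so plain SELECTs stay correct with no FINAL needed.
        using var insert = conn.CreateCommand();
        insert.CommandText = @"
            INSERT INTO sense.tracks (track_id, payload)
            VALUES ({trackId:UUID}, {payload:String})";
        insert.AddParameter("trackId", trackId);
        insert.AddParameter("payload", payloadJson);
        await insert.ExecuteNonQueryAsync();
    }
}
```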
We tried `ReplacingMergeTree(updated_at)` first — it collapses duplicates during background merges, which sounds ideal — but querying it correctly required `SELECT … FINAL`, and the FINAL cost killed read latency under load. Plain `ReplicatedMergeTree` plus client-side dedup keeps reads fast.
Why the mapping table is shared with the initial migration¶
The same `CitylensTrackMapping` rows that the initial migration created are the lookup table for realtime events. After migration completes:
- An event for an already-imported track → mapping found, upsert.
- A truly new track born in Citylens after migration → no mapping yet, create one.
This means realtime sync can keep working past the migration without distinguishing between "new" and "imported" tracks.
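A sketch of that find-or-create step, reusing the illustrative repository and `TrackMapping` record from the first sketch, plus a hypothetical `InsertAsync`:

```csharp
// Sketch: resolve a Citylens track ID against the shared CitylensTrackMapping table.
public async Task<TrackMapping> ResolveMappingAsync(string citylensTrackId)
{
    // Already-imported (initial migration) or previously-seen track: the existing
    // row wins, so realtime events keep pointing at the same Axion track.
    var existing = await _mappings.FindAsync(citylensTrackId);
    if (existing is not null)
        return existing;

    // Truly new track born in Citylens after migration: mint an Axion track ID and
    // persist the mapping so later frame/detection events can resolve it. Per-track
    // Kafka keying means one consumer owns a given track, so the insert can't race.
    var created = new TrackMapping(citylensTrackId, Guid.NewGuid());
    await _mappings.InsertAsync(created);
    return created;
}
```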
Why fail loudly on missing mapping for frames/detections¶
Track events should arrive before frame and detection events for the same track (Citylens emits them in order on the same Kafka key). If they don't (Citylens-side bug, out-of-order replay), letting the batch fail and block the partition is safer than silently creating a synthetic mapping — we want a human to look at it.
Why audit events for every Citylens-sourced row¶
Provenance tracking. Operators want to know which detections came from Citylens vs. from the live ML pipeline. The audit row carries `source="citylens"` so the audit table can be filtered by data origin.
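A plausible shape for that audit row; the real schema lives in `AuditEvent.cs` (see code references below), so every field here other than `Source` is a guess:

```csharp
// Illustrative only -- the actual schema is Axion.Sense.Common/Models/AuditEvent.cs.
public sealed record AuditEvent(
    Guid EntityId,           // e.g. the detection's Axion-side ID
    string EntityType,       // "track" | "frame" | "detection"
    string Source,           // "citylens" here; the live ML pipeline writes its own value
    DateTime OccurredAtUtc); // when the row was written
```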
Why republish to clusterization¶
Once detections land, the recognition pipeline's downstream stage (clusterization — grouping nearby detections into "objects") should run for them too. Rather than build a parallel clusterization path for Citylens, we re-emit on the existing topic and reuse the same pipeline. One way in.
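What the re-emit could look like with a KafkaFlow producer; the producer name and detection type are illustrative, and only the "same topic, same pipeline" idea comes from above:

```csharp
using System;
using System.Threading.Tasks;
using KafkaFlow.Producers;

public sealed record AxionDetection(Guid TrackId, double Lat, double Lon);

public static class ClusterizationRepublisherSketch
{
    // Sketch: after the ClickHouse upsert succeeds, re-emit the remapped detection
    // on the topic the live ML pipeline already feeds, so the existing
    // clusterization stage groups it like any other detection ("one way in").
    public static Task RepublishAsync(IProducerAccessor producers, AxionDetection detection)
    {
        var producer = producers.GetProducer("clusterization-input"); // hypothetical name

        // Key by Axion track ID to keep per-track ordering downstream.
        return producer.ProduceAsync(detection.TrackId.ToString(), detection);
    }
}
```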
Ordering and parallelism¶
- Per-track ordering is preserved (Kafka key = `track_id`).
- Cross-track parallelism is governed by the consumer's `MaxConcurrency` setting (`Kafka:Citylens:MaxConcurrency`).
- A track's events can interleave with frames and detections for other tracks, but never with its own.
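A sketch of how those knobs might be wired, assuming KafkaFlow's fluent configuration builder; the broker address, group ID, and helper method are illustrative, and the defaults match the Backpressure section below:

```csharp
using System;
using KafkaFlow;
using KafkaFlow.Serializer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class CitylensKafkaSetupSketch
{
    public static IServiceCollection AddCitylensConsumers(
        this IServiceCollection services, IConfiguration config)
    {
        var batchSize = config.GetValue("Kafka:Citylens:BatchSize", 500);
        var maxConcurrency = config.GetValue("Kafka:Citylens:MaxConcurrency", 4);
        var lingerMs = config.GetValue("Kafka:Citylens:LingerMs", 100);

        return services.AddKafka(kafka => kafka
            .AddCluster(cluster => cluster
                .WithBrokers(new[] { "kafka:9092" }) // illustrative
                .AddConsumer(consumer => consumer
                    .Topic("axion.sense.citylens.frame.lifecycle")
                    .WithGroupId("sense-worker-citylens") // illustrative
                    // One worker per partition, up to MaxConcurrency; keying by
                    // track_id keeps a single track's events on a single worker.
                    .WithWorkersCount(maxConcurrency)
                    .WithBufferSize(batchSize)
                    .AddMiddlewares(m => m
                        .AddDeserializer<JsonCoreDeserializer>()
                        // Flush at BatchSize messages or after LingerMs, whichever first.
                        .AddBatching(batchSize, TimeSpan.FromMilliseconds(lingerMs))
                        .Add<CitylensFrameLifecycleBatchMiddleware>()))));
    }
}
```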
Failure modes¶
| Failure | What happens | Recovery |
|---|---|---|
| Citylens publishes a malformed event (schema drift) | KafkaFlow's typed deserializer fails; offset not committed, and the batch keeps replaying until it succeeds or the offset is skipped manually. | Operator inspects via Kafbat UI, then decides whether to fix Citylens, update the deserializer, or skip the offending offset. |
| ClickHouse temporarily unavailable | Insert fails; consumer offset not committed; on next poll, the batch is replayed. | At-least-once redelivery + writer-side DELETE+INSERT dedup = correct. |
| Mapping lookup fails because Citylens emits a brand-new track ID we haven't seen | Track-lifecycle middleware creates the mapping. Frame/detection middlewares treat it as missing and fail the batch — partition stalls until the mapping appears. | Operator re-publishes the missing track event; the stalled batch then succeeds on replay. |
| Worker pod crashes mid-batch | Offset not committed; replay handles the rest. | No action needed. |
| Kafka topic deletion / rebalance | Consumer rebalances; partitions reassigned. | Built-in to KafkaFlow. |
| Initial migration flag is unset (somehow) | `CitylensKafkaStartupService` keeps consumers paused. | Set the flag manually after verifying migration completeness. |
Backpressure¶
Tunable via config (appsettings.json or env):
- `Kafka:Citylens:BatchSize` — how many messages a middleware processes per scope. Default 500.
- `Kafka:Citylens:MaxConcurrency` — how many partitions a single Worker pod consumes in parallel. Default 4.
- `Kafka:Citylens:LingerMs` — wait time to fill a batch before flushing. Default 100 ms.
Higher batch size → fewer ClickHouse round trips, but bigger replay if the pod crashes. Lower batch size → smoother memory profile.
Code references¶
- `CitylensTrackLifecycleBatchMiddleware`, `CitylensFrameLifecycleBatchMiddleware`, `CitylensDetectionBatchMiddleware`: `axion.sense.backend/src/Axion.Sense.Worker/KafkaConsumers/Citylens/`
- Audit event schema: `axion.sense.backend/src/Axion.Sense.Common/Models/AuditEvent.cs`