Kafka

The async backbone of the platform. Decouples API from Worker, Worker from external ML, and brings Citylens data in as a stream.

At a glance

| Property | Value |
| --- | --- |
| Distribution | Apache Kafka (KRaft mode; no ZooKeeper) |
| Deployed | Externally managed (cloud-managed Kafka or customer infra) |
| Ports | 9092 (in-cluster), 9094 (external listener) |
| Client lib | KafkaFlow 3.1 (.NET) |
| UI | Kafbat Kafka UI (https://axion-kafka-ui-staging.dev.axionx.ai) |
| Format | JSON (with KafkaFlow short-name typed deserialization) |
| Default replication | 3 (where supported) |

Topic catalog

Sense ↔ Vision (frame recognition + map matching)

| Topic | Producer | Consumer | Key | Notes |
| --- | --- | --- | --- | --- |
| axion.sense.track.metadata | Sense API + Sense Worker; Vision Matching | Sense Worker (TrackMetadataBatchMiddleware); Vision Matching (group axion-sense-vision-map-matching) | track_id | Multi-purpose: frame, track, track-status-update for the Mobile→Backend flow; TrackMatchingRequestEvent (Sense Worker → Vision Matching) and TrackMatchingResultEvent (Vision Matching → Sense Worker) for map matching, distinguished by the message-type header. Both ends sharing one topic with track_id keying guarantees per-track ordering. |
| recognition_requests (+ recognition_requests.high) | Sense Worker | Vision Worker (process_recognition_request, group axion-sense-vision-incoming) | track_id | One FrameRecognitionRequest per frame. The .high priority tier is drained before the base topic by Vision's priority_polling_loop. |
| vision_frames_lifecycle (+ vision_frames_lifecycle.high) | Vision Worker, Vision Quality, Vision Detections API | Vision Worker results handler (axion-sense-vision-results); Vision Quality (axion-sense-vision-quality); per-detector dispatch loops (axion-sense-vision-recognition-{name}); Vision Clusterization location estimation (axion-sense-vision-loc-estimation) | frame_id | Discriminated union of PredictionRequiredEvent, PredictionDoneEvent (incl. QualityPredictionDoneEvent), LocationEstimationRequiredEvent. Each consumer filters by type / detector_name; non-matches are silently acked. |
| clusterization_requests | Vision Clusterization (estimate_location loop, after enriching per-bbox lat/lon) | Vision Clusterization (clusterize_detections subscriber, group axion-sense-vision-clusterization) | H3 res-2 cell | Spatially close frames land on the same partition for cluster locality. Carries the enriched ClusterizationRequestEvent. |

Vision results never travel back to Sense via Kafka — Vision writes detection rows directly into the shared axion_sense.detections table, and Sense reads from it. The only Vision → Sense Kafka path is TrackMatchingResultEvent on axion.sense.track.metadata.

Sense internal

| Topic | Producer | Consumer | Key | Notes |
| --- | --- | --- | --- | --- |
| axion.sense.audit.events | Sense API | Sense Worker (AuditEventBatchMiddleware) | request_id | Batched into ClickHouse audit_log |

Citylens ingestion

| Topic | Producer | Consumer | Key | Notes |
| --- | --- | --- | --- | --- |
| axion.sense.citylens.track.lifecycle | Citylens | Sense Worker (CitylensTrackLifecycleBatchMiddleware) | track_id | Realtime track upserts from Citylens |
| axion.sense.citylens.frame.lifecycle | Citylens | Sense Worker (CitylensFrameLifecycleBatchMiddleware) | track_id | Realtime frame upserts |
| axion.sense.citylens.detections | Citylens | Sense Worker (CitylensDetectionBatchMiddleware) | track_id | Realtime detection upserts |

Vision priority tiers

recognition_requests and vision_frames_lifecycle each ship with a .high sibling. Vision's priority_polling_loop (in vision/common/priority_polling.py) accepts (consumer, handler, priority_label) tuples ordered highest-to-lowest, drains the first consumer entirely before advancing — guaranteeing high-priority frames are processed ahead of normal traffic. Tier suffixes are configurable via kafka.topics.recognitionRequests.priorities.
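The drain-highest-first behavior can be sketched as a small in-memory loop. This is a simplified illustration, not the real vision/common/priority_polling.py: StubConsumer stands in for a Kafka consumer, and the loop restarts from the top tier after every message, so lower tiers only progress while all higher tiers are empty.

```python
from collections import deque

class StubConsumer:
    """In-memory stand-in for a Kafka consumer (illustrative only)."""
    def __init__(self, msgs):
        self.q = deque(msgs)

    def poll(self):
        return self.q.popleft() if self.q else None

def priority_polling_loop(tiers):
    """Drain consumers strictly in priority order.

    `tiers` is a list of (consumer, handler, priority_label) tuples ordered
    highest-to-lowest, mirroring the signature described above. After each
    message we restart from the top tier, so high-priority traffic is always
    processed ahead of normal traffic.
    """
    processed = []
    while True:
        for consumer, handler, label in tiers:
            msg = consumer.poll()
            if msg is not None:
                handler(msg)
                processed.append((label, msg))
                break  # restart from the highest tier
        else:
            # Every tier empty; the real loop would sleep and poll again.
            return processed

high = StubConsumer(["h1", "h2"])
base = StubConsumer(["b1"])
out = priority_polling_loop([
    (high, lambda m: None, "high"),
    (base, lambda m: None, "base"),
])
# high tier is fully drained before the base tier is touched
```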

Message format conventions

Headers (always present)

| Header | Purpose |
| --- | --- |
| message-type | KafkaFlow short name → CLR type (e.g. frame, track, recognition-request). Drives typed deserialization. |
| request-id | End-to-end correlation ID. Propagated across producer/consumer hops by RequestIdProducerMiddleware. |
| org-id | Tenant scope, when applicable. |
| env | Environment marker (prod, staging, dev). FilterByEnv middleware ignores cross-env messages. |
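A minimal sketch of these conventions, assuming Kafka's wire format of headers as (key, bytes) pairs. The helper names (`build_headers`, `passes_env_filter`) are illustrative, not the real KafkaFlow middleware API; `passes_env_filter` just models the FilterByEnv rule of dropping cross-environment messages.

```python
def build_headers(message_type, request_id, env, org_id=None):
    """Assemble the standard header set as Kafka (key, bytes) pairs."""
    headers = [
        ("message-type", message_type.encode()),
        ("request-id", request_id.encode()),
        ("env", env.encode()),
    ]
    if org_id is not None:  # org-id only when tenant scope applies
        headers.append(("org-id", org_id.encode()))
    return headers

def passes_env_filter(headers, current_env):
    """Sketch of FilterByEnv: ignore messages from other environments."""
    env = dict(headers).get("env", b"").decode()
    return env == current_env

h = build_headers("frame", "req-1", "staging", org_id="org-9")
```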

Body

JSON via System.Text.Json with camelCase naming, using the shared default option set defined in Axion.Sense.Common. We don't currently use Avro/Protobuf for Kafka payloads: JSON is simpler for the operations team to inspect in the Kafka UI, and the CPU cost is negligible at our throughput.

Why per-track keying

Sense's track, frame, and Citylens topics all use track_id as the key. This guarantees:

  • A single track's events land in one partition → consumed in order.
  • Different tracks are processed in parallel (limited by partition count).
  • Re-balances don't reorder events within a track.

Audit uses request_id as the key — different invariant (we want to keep the audit pieces of a single request together but don't care about cross-request order).
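The partition invariants above can be illustrated with a toy partitioner. Kafka's default partitioner hashes the key with murmur2; crc32 is used here purely as a deterministic stand-in, and the partition counts and key values are illustrative.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Toy key→partition mapping (crc32 standing in for Kafka's murmur2).

    Same key → same partition, always: this is what makes per-track
    ordering hold, and what puts spatially close frames (sharing an
    H3 res-2 cell key) on the same partition for clusterization.
    """
    return zlib.crc32(key.encode()) % num_partitions

# A single track's events always map to one partition...
p1 = partition_for("track-42", 12)
p2 = partition_for("track-42", 12)
# ...while other tracks hash independently and can run in parallel.
p_other = partition_for("track-99", 12)
```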

Why JSON, not Avro/Protobuf

We considered Avro and Protobuf (the contracts repo already uses Protobuf). JSON won for Kafka payloads because:

  • The operations team already uses Kafka UI to spot-check messages — JSON renders without a schema registry.
  • Most messages are small; the size overhead is a few percent.
  • KafkaFlow's MessageTypeShortNameResolver gives us typed deserialization without a registry, which is the part Avro is good at.

We accept that this means less schema enforcement at the broker level. Producers and consumers compile against the same DTO classes in Axion.Sense.Common, so divergence is caught at build time.
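The short-name-to-type dispatch can be sketched in a few lines. This is an analogy to KafkaFlow's MessageTypeShortNameResolver, not its actual implementation; the dataclasses and registry below are illustrative, not the real DTOs from Axion.Sense.Common.

```python
import json
from dataclasses import dataclass

@dataclass
class FrameEvent:       # illustrative DTO, camelCase fields per the convention
    frameId: str
    trackId: str

@dataclass
class TrackEvent:       # illustrative DTO
    trackId: str
    status: str

# The message-type header's short name selects the target type.
TYPE_REGISTRY = {"frame": FrameEvent, "track": TrackEvent}

def deserialize(headers: dict, body: bytes):
    cls = TYPE_REGISTRY[headers["message-type"]]  # unknown type → KeyError
    return cls(**json.loads(body))

evt = deserialize({"message-type": "frame"},
                  b'{"frameId": "f1", "trackId": "t1"}')
```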

Consumer pattern

KafkaFlow batch middleware:

```csharp
// Singleton middleware: one instance serves every batch on the consumer.
public class TrackMetadataBatchMiddleware(
    IServiceScopeFactory scopes
) : IMessageMiddleware
{
    public async Task Invoke(IMessageContext ctx, MiddlewareDelegate next)
    {
        // EF Core DbContext is scoped, so open a fresh DI scope per batch.
        await using var scope = scopes.CreateAsyncScope();
        var processor = scope.ServiceProvider
            .GetRequiredService<TrackMetadataBatchProcessor>();
        await processor.HandleBatch((Batch)ctx.Message.Value);
        await next(ctx); // only reached on success; a throw skips the commit
    }
}
```

The middleware itself is a singleton (no allocation per message). It opens a DI scope per batch because EF Core DbContext is scoped, not singleton. The scope is disposed when the batch finishes; on exception the offset isn't committed and the batch is replayed.

Topic management

Topics are created idempotently by Axion.Sense.Worker.dll --migrate. Configuration:

  • Partitions: tuned per topic (default 12 for Sense topics, 6 for audit, 24 for high-volume Citylens topics).
  • Retention: 7 days for transactional topics (recognition_requests, axion.sense.track.metadata); 30 days for audit (so ClickHouse can replay if needed).
  • Compaction: not used — we want full history within the retention window.
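The defaults above can be written out as a per-topic config map, roughly what the --migrate step would feed to the admin client. Topic selection and the Citylens retention are illustrative assumptions; only the partition defaults and the 7-day/30-day retentions come from the text.

```python
DAY_MS = 24 * 60 * 60 * 1000  # Kafka retention.ms is in milliseconds

# Sketch only: real topic creation happens in Axion.Sense.Worker.dll --migrate.
TOPIC_CONFIGS = {
    # Sense topics: default 12 partitions, 7-day retention
    "axion.sense.track.metadata": {"partitions": 12, "retention.ms": 7 * DAY_MS},
    "recognition_requests":       {"partitions": 12, "retention.ms": 7 * DAY_MS},
    # Audit: 6 partitions, 30 days so ClickHouse can replay
    "axion.sense.audit.events":   {"partitions": 6,  "retention.ms": 30 * DAY_MS},
    # High-volume Citylens topics: 24 partitions (retention assumed 7 days)
    "axion.sense.citylens.detections": {"partitions": 24, "retention.ms": 7 * DAY_MS},
}
```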

Failure handling

  • Batch middlewares don't catch exceptions — a failure bubbles up, the offset isn't committed, and KafkaFlow replays the batch on the next poll.
  • A poison message will block the partition until it succeeds or is manually skipped (offset reset via Kafka admin tool).

Observability

  • KafkaFlow's OpenTelemetry middleware emits spans per message.
  • Consumer lag exported to SigNoz (Kafka exporter); alert when lag > 5 minutes for any topic.
  • Producer ack rate, retry count, error count exported.

Operations

  • Topic resize / re-partition: avoid on a live topic. Adding partitions remaps keys across partitions and breaks per-key ordering, and Kafka cannot shrink a partition count. Plan partition counts at creation.
  • Consumer rebalance is normal during rolling Worker deploys; KafkaFlow handles it gracefully.
  • Replay a topic from a specific offset: stop the consumer, reset the offset (via Kafka admin tool), restart.
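With the stock Kafka tooling, the stop/reset/restart sequence looks roughly like this. Group, topic, and offset values are illustrative; run with --dry-run first to preview the change before --execute.

```shell
# Preview the reset for one consumer group on one topic (no changes made)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group axion-sense-vision-incoming \
  --topic recognition_requests \
  --reset-offsets --to-offset 120000 --dry-run

# Apply it (the consumer group must be stopped first)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group axion-sense-vision-incoming \
  --topic recognition_requests \
  --reset-offsets --to-offset 120000 --execute
```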