Event Streams
Standard subscriptions deliver coalesced state notifications. Multiple writes to the same entity within a coalescing window are merged into a single notification containing the final converged state. This is the right default for most consumers, but some use cases need to observe every individual operation.
Event streams deliver a notification for every ASSERT, PATCH, and RETRACT operation, in order, without coalescing. Each event identifies the source that produced it, carries a monotonic sequence number, and includes the raw assert payload (not the merged state).
When to use event streams
Section titled “When to use event streams”Use event streams when you need:
- Audit logs. Record every write operation with its source identity.
- Replay and projection. Rebuild derived state by replaying the full sequence of operations.
- Per-source tracking. React to individual source contributions rather than the merged result.
- External event buses. Forward individual operations to Kafka, NATS, or similar systems.
For rendering UI, syncing mirror databases, or building local caches, standard subscriptions are a better fit. They handle coalescing and convergence for you.
Opting in
Section titled “Opting in”Event streams are opt-in per entity kind. Set EventStream = true on the attribute:
[ConvergenceEntity("PlayerAction", EventStream = true)]public partial struct PlayerAction{ [EntityId] public ReadOnlyMemory<byte> EntityId { get; set; } public int ActionType { get; set; } public float TargetX { get; set; } public float TargetY { get; set; }}// Coming soonThis flag is included in the schema registration. The server allocates the per-assert event ring buffer only for kinds that opt in. Kinds without EventStream = true pay zero cost.
Subscribing to events
Section titled “Subscribing to events”Use SubscribeEventsAsync on the kind handle:
await foreach (var evt in actions.SubscribeEventsAsync(ct: ct)){ Console.WriteLine($"[{evt.Sequence}] source={evt.SourceId} type={evt.Type}");
if (evt.Entity is { } entity) { Console.WriteLine($" ActionType={entity.ActionType} Target=({entity.TargetX}, {entity.TargetY})"); }}// Coming soonEvent subscriptions are live-only. There is no bootstrap mode because events are ephemeral and not retained after delivery.
The EntityEvent type
Section titled “The EntityEvent type”Each event in the stream is an EntityEvent<T> with these fields:
| Property | Description |
|---|---|
Type | Created, Updated, or Retracted. |
Version | The committed entity version from the flush cycle. Multiple events in the same coalescing window share this version. |
PrevVersion | The committed entity version before this flush cycle. 0 for Created events (entity did not exist). Useful for detecting gaps in event processing. |
SourceId | Which source (0-63) produced this operation. |
Sequence | Partition-global monotonic sequence number. Provides a total ordering across all events. |
Entity | The raw assert payload deserialized as T. This is the individual assert data, not the converged state. Null for Retracted events. |
PreviousEntity | For Updated: the entity state that ChangedFields was computed against (committed state for the first assert in a coalescing window, accumulated state for subsequent asserts). For Retracted: the entity state immediately before the retraction. Null for Created events. |
ChangedFields | Bitmask of which fields this assert changed relative to the previous state. All bits set for Created, zero for Retracted. |
PresenceMask | For PATCH operations: which fields were present in the partial update. |
Metadata | Opaque metadata from the assert or retract. See Metadata Passthrough. |
Self-contained diffs
Section titled “Self-contained diffs”Each update event carries both Entity (the new values) and PreviousEntity (the state before this assert). Combined with ChangedFields, these three properties form a coherent diff triple. You can see exactly what each field was before and after this specific assert:
if (evt.Type == EventType.Updated && evt.PreviousEntity is { } prev){ if (evt.HasChanged(PlayerAction.Fields.TargetX)) { Console.WriteLine($"TargetX: {prev.TargetX} -> {evt.Entity!.Value.TargetX}"); }}// Coming soonField-level change detection
Section titled “Field-level change detection”EntityEvent<T> has the same HasChanged method as regular notifications:
if (evt.HasChanged(PlayerAction.Fields.TargetX)){ Console.WriteLine("Target position changed");}// Coming soonEvent types
Section titled “Event types”| Type | Meaning |
|---|---|
Created | First assert for a previously non-existent entity. |
Updated | An assert or patch that modifies an existing entity. |
Retracted | A source retracted its assertion. PreviousEntity contains the entity state before retraction; Entity is null. |
Typical consumer pattern
Section titled “Typical consumer pattern”A common pattern is to process events in a background loop and forward them to an external system. Wrap each event in a try-catch so that a single malformed entity or transient error does not kill the subscription:
var actions = await client.RegisterKindAsync<PlayerAction>();
await foreach (var evt in actions.SubscribeEventsAsync(ct: stoppingToken)){ try { switch (evt.Type) { case EventType.Created: case EventType.Updated: await externalBus.PublishAsync(new ActionEvent { Sequence = evt.Sequence, SourceId = evt.SourceId, ActionType = evt.Entity!.Value.ActionType, TargetX = evt.Entity.Value.TargetX, TargetY = evt.Entity.Value.TargetY, }, stoppingToken); break;
case EventType.Retracted: await externalBus.PublishAsync(new ActionRetracted { Sequence = evt.Sequence, SourceId = evt.SourceId, // PreviousEntity contains the entity state before retraction. LastActionType = evt.PreviousEntity?.ActionType, }, stoppingToken); break; } } catch (Exception ex) { logger.LogError(ex, "Failed to process event {Sequence}", evt.Sequence); }}// Coming soonAccessing entity data on retraction
Section titled “Accessing entity data on retraction”For Retracted events, Entity is null but PreviousEntity contains the entity state before retraction. A clean way to handle both Created/Updated and Retracted events uniformly is to use null-coalescing:
await foreach (var evt in listings.SubscribeEventsAsync(ct: stoppingToken)){ // Entity is set for Created/Updated, PreviousEntity for Retracted. var entity = evt.Entity ?? evt.PreviousEntity; if (entity is not { } e) continue;
// Use 'e' uniformly regardless of event type. await ProcessAsync(e, evt.Type, evt.Metadata);}// Coming soonDelta detection with metadata
Section titled “Delta detection with metadata”When the writer attaches metadata to operations, event consumers can use it for routing and filtering. Combined with PreviousEntity, this enables patterns like trade detection (detecting when a quantity field decreased):
await foreach (var evt in listings.SubscribeEventsAsync(ct: stoppingToken)){ if (evt.Type == EventType.Updated && evt.Entity is { } current && evt.PreviousEntity is { } prev && current.Quantity < prev.Quantity) { var reducer = evt.Metadata?.GetString("rd"); var tradedQty = prev.Quantity - current.Quantity; await RecordTradeAsync(tradedQty, reducer); }}// Coming soonCombining event streams with state subscriptions
Section titled “Combining event streams with state subscriptions”A single kind can have both a state subscription (for bootstrap and coalesced updates) and an event subscription (for per-operation processing) active simultaneously. This is useful when you need a local mirror of current state and also need to react to individual operations:
var listings = await client.ResolveKindAsync<AuctionListing>();
// State subscription: bootstrap + live coalesced updates for local mirror.var stateTask = Task.Run(async () =>{ await foreach (var change in listings.SubscribeAsync(bootstrap: true, ct: ct)) { SyncToDatabase(change); }});
// Event subscription: per-operation trade detection (no bootstrap, events are ephemeral).var eventTask = Task.Run(async () =>{ await foreach (var evt in listings.SubscribeEventsAsync(ct: ct)) { DetectTrade(evt); }});
await Task.WhenAll(stateTask, eventTask);// Coming soonThe two subscriptions are independent. The state subscription delivers coalesced snapshots and is suitable for maintaining a database mirror. The event subscription delivers every individual operation and is suitable for auditing, trade detection, or forwarding to an external event bus.
Event streams vs. subscriptions
Section titled “Event streams vs. subscriptions”| Subscriptions | Event streams | |
|---|---|---|
| Granularity | One notification per entity per flush cycle (coalesced). | One event per individual ASSERT/PATCH/RETRACT. |
| Payload | Converged/merged entity state. | Raw assert payload from a single source. |
| Source identity | SourceSet bitmask (which sources assert the entity). | SourceId (the specific source that produced this event). |
| Ordering | Per-entity version ordering. | Partition-global monotonic Sequence. |
| Bootstrap | Supported. Delivers a snapshot of all current entities. | Not supported. Events are ephemeral. |
| Opt-in | Always available. | Requires EventStream = true on the entity kind. |
| Epochs | Notifications delivered as normal during epochs. | Events suppressed during epochs (and for all server-synthesised operations). |
| Cost | Default. No extra server-side buffering. | Server allocates a per-assert event ring buffer for opted-in kinds. |
Epochs and server-synthesised operations
Section titled “Epochs and server-synthesised operations”Event streams only record source-originated operations. Server-synthesised operations are excluded:
- Source epochs: Assertions, patches, and retractions made during an active epoch do not produce events. Epochs seed “current” state; they do not represent “what happened”.
- Epoch stale retractions: When
EpochEndAsync()synthesises retractions for entities not re-asserted, these do not produce events. - Liveness deadline retractions: When a source’s liveness timer fires and the server retracts all of its entities, these do not produce events.
In all three cases, standard subscriptions still produce Created, Updated, and Deleted notifications as normal. Only event stream recording is suppressed.
After an epoch completes, subsequent assertions from the source produce events normally. Sequence numbers remain contiguous: suppressed operations are never assigned a sequence, so event consumers see no gaps.
Performance characteristics
Section titled “Performance characteristics”Event streaming is zero-cost for kinds that do not opt in. The server only allocates and maintains the event ring buffer for kinds with EventStream = true in their schema.
For opted-in kinds, the server records each individual operation into the ring buffer during the accumulation phase. This adds a small amount of memory and CPU overhead proportional to the write rate for that kind. The ring buffer is bounded, so slow event subscribers are disconnected the same way slow state subscribers are (see Backpressure).