hashicorp/consul
Streaming and materialized views
Most Consul reads used to rely on polling: the agent issued a blocking RPC, waited until the data changed, then reissued it. With many agents and many subscribers, this puts substantial load on the leader. The streaming subsystem replaces polling with a one-way push channel of typed events, and the agent rebuilds local materialized views from those events.
Purpose
- Server-side: publish events on every state-store mutation that downstream consumers might care about.
- Wire: transport events over a long-lived gRPC stream with per-topic backpressure.
- Agent-side: consume events and maintain a materialized view of the same data; serve cached reads from that view.
The result: regardless of how many local clients query the catalog, the agent maintains a single subscription per topic, and the server fans out only the deltas.
Directory layout
agent/consul/
├── stream/ # The event bus and per-topic publishers
│ ├── event_publisher.go
│ ├── subscription.go
│ ├── snapshot.go
│ ├── ...
├── subscribe_backend.go # gRPC handler for the streaming subscribe service
├── catalog_events.go # Catalog event encoders + topic registration
├── config_entry_events.go # Config-entry event encoders
├── acl_events.go # ACL event encoders
├── connect_ca_events.go # Connect CA event encoders
agent/submatview/ # Agent-side materialized view machinery
proto/private/pbsubscribe/ # The wire format
Key abstractions
| Type | File | Purpose |
|---|---|---|
| EventPublisher | agent/consul/stream/event_publisher.go | Owns topic registries; publishes events from FSM apply hooks |
| Topic | agent/consul/stream/topic.go | Identifies a stream of events (e.g., catalog.services, connect.roots) |
| Snapshot / SnapshotFunc | agent/consul/stream/snapshot.go | Initial state burst when a subscriber attaches |
| Subscription | agent/consul/stream/subscription.go | Per-subscriber cursor with backpressure |
| submatview.Materializer | agent/submatview/materializer.go | Agent-side: applies events to a typed in-memory view |
| submatview.Store | agent/submatview/store.go | Pool of materializers keyed by request |
| pbsubscribe.SubscribeRequest | proto/private/pbsubscribe/subscribe.proto | The gRPC request: topic, partition/namespace, key |
Event flow
sequenceDiagram
participant FSM as FSM (Apply)
participant Pub as EventPublisher
participant Topic as catalog.services topic
participant Server as subscribe_backend.go (gRPC)
participant Agent as submatview.Materializer
participant Reader as Local reader (cache, DNS, etc.)
FSM->>FSM: state.WriteTxn(svc updates)
FSM->>Pub: Publish(events)
Pub->>Topic: append events (per partition/key)
Note over Agent,Server: agent had earlier subscribed
Server->>Topic: read since last cursor
Topic-->>Server: events
Server-->>Agent: pbsubscribe.Event*
Agent->>Agent: apply events to typed view
Reader->>Agent: query view
Agent-->>Reader: cached snapshot
When an agent first subscribes to a topic, the publisher emits a snapshot followed by ongoing events. The materializer collapses the snapshot into an in-memory representation (e.g., a map[ServiceName]ServiceList) and then applies each subsequent event.
If the stream is reset (for example, after a network blip), the agent resubscribes from scratch: the snapshot replays in full, so the consumer never sees a partial view.
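The snapshot-then-deltas behavior described above can be sketched roughly as follows. This is a minimal illustration, not Consul's code: the `Event` and `View` types, the `EndOfSnapshot` marker field, and the `Reset` method are hypothetical stand-ins for what `pbsubscribe` and `submatview` provide. The sketch assumes each event carries the full instance list for one service.

```go
package main

import "fmt"

// Event is a hypothetical, simplified stand-in for a streamed event.
// It is not the real pbsubscribe.Event type.
type Event struct {
	Index         uint64   // index of the change that produced the event
	Service       string   // service the event refers to
	Instances     []string // full instance list for Service after the change
	Deleted       bool     // true when the service was deregistered
	EndOfSnapshot bool     // marks the end of the initial snapshot burst
}

// View is a hypothetical in-memory materialized view keyed by service name.
type View struct {
	services map[string][]string
	index    uint64
	ready    bool // false until a complete snapshot has been applied
}

func NewView() *View { return &View{services: map[string][]string{}} }

// Reset discards all state, as an agent would before resubscribing.
func (v *View) Reset() {
	v.services = map[string][]string{}
	v.index = 0
	v.ready = false
}

// Apply folds one event into the view.
func (v *View) Apply(e Event) {
	if e.EndOfSnapshot {
		v.ready = true
		return
	}
	if e.Deleted {
		delete(v.services, e.Service)
	} else {
		v.services[e.Service] = e.Instances
	}
	if e.Index > v.index {
		v.index = e.Index
	}
}

// Get returns the instances for a service. ok is false while the snapshot is
// still loading, so readers never observe a partial view.
func (v *View) Get(name string) (instances []string, ok bool) {
	if !v.ready {
		return nil, false
	}
	return v.services[name], true
}

func main() {
	v := NewView()
	stream := []Event{
		{Index: 10, Service: "web", Instances: []string{"web-1"}},
		{Index: 10, Service: "db", Instances: []string{"db-1"}},
		{EndOfSnapshot: true},
		{Index: 11, Service: "web", Instances: []string{"web-1", "web-2"}},
		{Index: 12, Service: "db", Deleted: true},
	}
	for _, e := range stream {
		v.Apply(e)
	}
	web, _ := v.Get("web")
	fmt.Println(web, v.index) // [web-1 web-2] 12
}
```

On a stream reset, a consumer in this sketch would call `Reset` and feed the replayed snapshot through `Apply` again. Reads report `ok == false` until the end-of-snapshot marker arrives.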
Topics in use
| Topic | What it carries | Producer |
|---|---|---|
| Catalog services | Service registrations and health changes | agent/consul/catalog_events.go |
| Catalog mesh-gateway routing | Mesh-gateway addresses for each DC/peer | agent/consul/catalog_events.go |
| Config entries | Per-kind config entry mutations | agent/consul/config_entry_events.go |
| Connect CA roots | Root rotation events | agent/consul/connect_ca_events.go |
| ACL changes | Token/policy/role updates | agent/consul/acl_events.go |
| Peering events | Peering state and trust-bundle updates | agent/consul/state/peering.go |
Backpressure
Each subscription has a bounded buffer. If a consumer falls behind, the publisher closes the subscription and the agent must resubscribe (which incurs a snapshot replay). This trades steady-state efficiency for safety under overload.
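One way to picture this policy is a publisher that never blocks on a slow consumer. The sketch below is an assumption-laden illustration rather than the `stream` package's implementation: it models the bounded buffer as a buffered Go channel, the `publisher` and `subscriber` types are hypothetical, and it is not safe for concurrent use.

```go
package main

import "fmt"

// subscriber is a hypothetical consumer with a bounded event buffer.
type subscriber struct {
	events chan string
	closed bool
}

// publisher is a hypothetical fan-out that never waits on a slow consumer.
type publisher struct {
	subs []*subscriber
}

// Subscribe registers a consumer whose buffer holds at most size events.
func (p *publisher) Subscribe(size int) *subscriber {
	s := &subscriber{events: make(chan string, size)}
	p.subs = append(p.subs, s)
	return s
}

// Publish delivers ev to every live subscriber. A subscriber whose buffer is
// full is closed and dropped instead of being waited on; it has to
// resubscribe and replay a snapshot to catch up.
func (p *publisher) Publish(ev string) {
	live := p.subs[:0] // filter in place, reusing the backing array
	for _, s := range p.subs {
		select {
		case s.events <- ev:
			live = append(live, s)
		default:
			close(s.events)
			s.closed = true
		}
	}
	p.subs = live
}

func main() {
	p := &publisher{}
	fast := p.Subscribe(8)
	slow := p.Subscribe(2)
	for i := 1; i <= 3; i++ {
		p.Publish(fmt.Sprintf("event-%d", i))
	}
	fmt.Println(len(fast.events), fast.closed) // 3 false
	fmt.Println(len(slow.events), slow.closed) // 2 true
}
```

The non-blocking `select` is the key design choice: publishing costs the same regardless of how far behind any one consumer is, and the cost of catching up falls on that consumer alone.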
Agent-side caches
agent/cache-types/ includes streaming-aware variants for the most active topics. The proxy config manager (agent/proxycfg/) goes through these so a hundred Connect proxies on a node share a single subscription.
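The sharing described here amounts to a pool keyed by request, which can be sketched as below. Everything in this block is hypothetical (the `request`, `sharedView`, and `store` types and the reference-counting eviction); `submatview.Store` may manage entry lifetimes differently, so treat this only as an illustration of one subscription serving many readers.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical subscription key: what is being watched.
type request struct {
	Topic string
	Key   string
}

// sharedView is a hypothetical placeholder for a materialized view.
type sharedView struct{ req request }

// entry pairs a shared view with the number of readers using it.
type entry struct {
	view *sharedView
	refs int
}

// store is a hypothetical pool that hands out one view per distinct request.
type store struct {
	mu      sync.Mutex
	entries map[request]*entry
	opened  int // how many upstream subscriptions were started
}

func newStore() *store { return &store{entries: map[request]*entry{}} }

// Acquire returns the shared view for req. Only the first reader for a given
// request causes a new upstream subscription to be counted.
func (s *store) Acquire(req request) *sharedView {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[req]
	if !ok {
		e = &entry{view: &sharedView{req: req}}
		s.entries[req] = e
		s.opened++
	}
	e.refs++
	return e.view
}

// Release drops one reader; when the last reader leaves, the entry is removed.
func (s *store) Release(req request) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[req]
	if !ok {
		return
	}
	e.refs--
	if e.refs <= 0 {
		delete(s.entries, req)
	}
}

func main() {
	s := newStore()
	req := request{Topic: "catalog.services", Key: "web"}
	for i := 0; i < 100; i++ { // a hundred local readers ask for the same data
		s.Acquire(req)
	}
	fmt.Println(s.opened, len(s.entries)) // 1 1
}
```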
Integration points
- V2 resource framework: the v2 storage adapter has its own WatchList mechanism for resource-typed events. The two systems coexist; they don't replicate the same data.
- Peering replication: the peer-stream service (agent/grpc-external/services/peerstream/) builds on the same publisher infrastructure.
- DNS and HTTP: indirectly benefit because the cache types they use are fed by streaming.
Entry points for modification
- Add a new topic: register it in the relevant <thing>_events.go, define an event proto, and update the EventPublisher setup.
- Add a new consumer: create a submatview.Materializer with a typed view, register a cache type that constructs it.
- Tune backpressure: see agent/consul/stream/event_publisher.go's bufferSize/subscription defaults.
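To give a feel for what registering a topic involves, here is a small sketch of a topic registry that maps each topic to a snapshot handler. The `registry` type, its `Register` and `Snapshot` methods, and the `example.widgets` topic are all hypothetical and simplified to strings. The real `EventPublisher` setup has its own types and signatures, which should be read from the source before making changes.

```go
package main

import (
	"errors"
	"fmt"
)

// Topic is a hypothetical string identifier for a stream of events.
type Topic string

// SnapshotFunc is a hypothetical handler that returns the current state for
// a key as a list of events (plain strings here, for brevity).
type SnapshotFunc func(key string) ([]string, error)

var (
	errDuplicateTopic = errors.New("topic already registered")
	errUnknownTopic   = errors.New("unknown topic")
)

// registry is a hypothetical topic table owned by a publisher.
type registry struct {
	handlers map[Topic]SnapshotFunc
}

func newRegistry() *registry {
	return &registry{handlers: map[Topic]SnapshotFunc{}}
}

// Register associates a snapshot handler with a topic, rejecting duplicates.
func (r *registry) Register(t Topic, fn SnapshotFunc) error {
	if _, ok := r.handlers[t]; ok {
		return fmt.Errorf("%w: %s", errDuplicateTopic, t)
	}
	r.handlers[t] = fn
	return nil
}

// Snapshot produces the initial burst a new subscriber would receive.
func (r *registry) Snapshot(t Topic, key string) ([]string, error) {
	fn, ok := r.handlers[t]
	if !ok {
		return nil, fmt.Errorf("%w: %s", errUnknownTopic, t)
	}
	return fn(key)
}

func main() {
	r := newRegistry()
	// "example.widgets" is an invented topic name used only for illustration.
	_ = r.Register("example.widgets", func(key string) ([]string, error) {
		return []string{"widget:" + key}, nil
	})
	events, err := r.Snapshot("example.widgets", "a")
	fmt.Println(events, err) // [widget:a] <nil>
}
```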
Built by Factory AutoWiki from public repository content. It is a generated preview for codebase exploration, not source-maintained documentation.