Open-Source Wikis

/

Consul

/

Systems

/

Streaming and materialized views

hashicorp/consul

Streaming and materialized views

Most Consul reads used to be polling: the agent issued a blocking RPC, waited until the data changed, then re-issued. With many agents and lots of subscribers, this generates a lot of leader load. The streaming subsystem replaces polling with a one-way push channel of typed events, and the agent rebuilds local materialized views from those events.

Purpose

  • Server-side: publish events on every state-store mutation that downstream consumers might care about.
  • Wire: transport events over a long-lived gRPC stream with per-topic backpressure.
  • Agent-side: consume events and maintain a materialized view of the same data; serve cached reads from that view.

The result: regardless of how many local clients query the catalog, the agent maintains a single subscription per topic, and the server fans out only the deltas.

Directory layout

agent/consul/
├── stream/                   # The event bus and per-topic publishers
│   ├── event_publisher.go
│   ├── subscription.go
│   ├── snapshot.go
│   ├── ...
├── subscribe_backend.go      # gRPC handler for the streaming subscribe service
├── catalog_events.go         # Catalog event encoders + topic registration
├── config_entry_events.go    # Config-entry event encoders
├── acl_events.go             # ACL event encoders
├── connect_ca_events.go      # Connect CA event encoders
agent/submatview/            # Agent-side materialized view machinery
proto/private/pbsubscribe/   # The wire format

Key abstractions

Type File Purpose
EventPublisher agent/consul/stream/event_publisher.go Owns topic registries; publishes events from FSM apply hooks
Topic agent/consul/stream/topic.go Identifies a stream of events (e.g., catalog.services, connect.roots)
Snapshot / SnapshotFunc agent/consul/stream/snapshot.go Initial state burst when a subscriber attaches
Subscription agent/consul/stream/subscription.go Per-subscriber cursor with backpressure
submatview.Materializer agent/submatview/materializer.go Agent-side: applies events to a typed in-memory view
submatview.Store agent/submatview/store.go Pool of materializers keyed by request
pbsubscribe.SubscribeRequest proto/private/pbsubscribe/subscribe.proto The gRPC request: topic, partition/namespace, key

Event flow

sequenceDiagram
    participant FSM as FSM (Apply)
    participant Pub as EventPublisher
    participant Topic as catalog.services topic
    participant Server as subscribe_backend.go (gRPC)
    participant Agent as submatview.Materializer
    participant Reader as Local reader (cache, DNS, etc.)

    FSM->>FSM: state.WriteTxn(svc updates)
    FSM->>Pub: Publish(events)
    Pub->>Topic: append events (per partition/key)
    Note over Agent,Server: agent had earlier subscribed
    Server->>Topic: read since last cursor
    Topic-->>Server: events
    Server-->>Agent: pbsubscribe.Event*
    Agent->>Agent: apply events to typed view
    Reader->>Agent: query view
    Agent-->>Reader: cached snapshot

When an agent first subscribes to a topic, the publisher emits a snapshot followed by ongoing events. The materializer collapses the snapshot into an in-memory representation (e.g., a map[ServiceName]ServiceList) and then applies each subsequent event.

If the stream is reset (network blip), the agent re-subscribes from scratch — the snapshot replays, and the consumer never sees a partial view.

Topics in use

Topic What it carries Producer
Catalog services Service registrations and health changes agent/consul/catalog_events.go
Catalog mesh-gateway routing Mesh-gateway addresses for each DC/peer agent/consul/catalog_events.go
Config entries Per-kind config entry mutations agent/consul/config_entry_events.go
Connect CA roots Root rotation events agent/consul/connect_ca_events.go
ACL changes Token/policy/role updates agent/consul/acl_events.go
Peering events Peering state and trust-bundle updates agent/consul/state/peering.go

Backpressure

Each subscription has a bounded buffer. If a consumer falls behind, the publisher closes the subscription and the agent must resubscribe (which incurs a snapshot replay). This trades steady-state efficiency for safety under overload.

Agent-side caches

agent/cache-types/ includes streaming-aware variants for the most active topics. The proxy config manager (agent/proxycfg/) goes through these so a hundred Connect proxies on a node share a single subscription.

Integration points

  • V2 resource framework: the v2 storage adapter has its own WatchList mechanism for resource-typed events. The two systems coexist; they don't replicate the same data.
  • Peering replication: the peer-stream service (agent/grpc-external/services/peerstream/) builds on the same publisher infrastructure.
  • DNS and HTTP: indirectly benefit because the cache types they use are fed by streaming.

Entry points for modification

  • Add a new topic: register it in the relevant <thing>_events.go, define an event proto, and update the EventPublisher setup.
  • Add a new consumer: create a submatview.Materializer with a typed view, register a cache type that constructs it.
  • Tune backpressure: see agent/consul/stream/event_publisher.go's bufferSize/subscription defaults.

Built by Factory AutoWiki from public repository content. It is a generated preview for codebase exploration, not source-maintained documentation.

Streaming and materialized views – Consul wiki | Factory