hashicorp/consul
Server and Raft
A Consul server is an agent with extra responsibilities: it participates in a Raft consensus group, hosts the authoritative state store, services RPC requests, and runs leader-only background loops. All of this lives under agent/consul/.
Purpose
agent/consul/server.go (68 KB) defines the Server struct. A Server satisfies the same agent-facing delegate interface as the client-mode Client, but adds:
- A *raft.Raft instance backed by raft-boltdb or raft-wal.
- The MemDB-backed state store (agent/consul/state/).
- The Raft FSM (agent/consul/fsm/) that applies replicated log entries to MemDB.
- Leader-only goroutines (CA rotation, federation state AE, intention migration, peering replication, ACL replication, ...) in agent/consul/leader*.go.
- The internal gRPC server (agent/grpc-internal/) and the v2 resource service.
- The Serf cluster handlers for both LAN (per-DC) and WAN (federation) pools.
Directory layout (server-only highlights)
agent/consul/
├── server.go # The Server struct (68 KB)
├── client.go # Client-mode counterpart that forwards to servers
├── server_serf.go # LAN/WAN Serf membership and event handling
├── client_serf.go # Same for client agents
├── leader.go # Top-level leader run loop (40 KB)
├── leader_*.go # Per-feature leader loops (CA, federation, intentions, peering, ACL)
├── raft_handle.go # Lifetime + apply helpers for the *raft.Raft instance
├── raft_rpc.go # Wires Raft transport over the existing TCP RPC port
├── *_endpoint.go # net/rpc endpoints (catalog, health, kvs, acl, intention, ca, ...)
├── server_grpc.go # Registers internal + external gRPC services
├── fsm/ # Raft FSM (Apply, Snapshot, Restore)
├── state/ # MemDB schema + per-table accessors (~50 files)
├── stream/ # Streaming subscribe event bus
├── usagemetrics/ # License/usage telemetry
├── rate/, multilimiter/ # RPC rate limiting + token-bucket primitives
├── reporting/ # Census reporting for license metering
├── autopilot.go # Autopilot integration
├── autopilotevents/ # Notification stream of autopilot health events
├── gateways/ # Gateway controller + state replication
├── controller/ # Older controller framework (pre-internal/controller)
├── discoverychain/ # Discovery chain compiler
├── peering_backend.go # Server-side cluster peering RPCs
├── prepared_query/ # Prepared query helpers
├── servercert/ # Certificate distribution to servers
├── wanfed/ # WAN federation (cross-DC routing)
├── xdscapacity/ # xDS capacity tracker for proxy load balancing across servers
└── ...
Key abstractions
| Type | File | Purpose |
|---|---|---|
| Server | agent/consul/server.go | The server delegate: holds Raft, FSM, state store, all endpoints |
| Client | agent/consul/client.go | The client delegate: forwards RPCs to a server via the connection pool |
| FSM | agent/consul/fsm/fsm.go | Applies Raft log entries; restores from snapshot |
| state.Store | agent/consul/state/state_store.go | MemDB wrapper; per-table accessor methods are on this type |
| state.Restore | agent/consul/state/state_store.go | Bulk loader used by snapshot restore |
| consul.RPCQueryFn | agent/consul/rpc.go | Generic blocking-query helper used by every read endpoint |
| <X>Endpoint structs | agent/consul/<x>_endpoint.go | Registered via rpcServer.RegisterName; method names form the RPC verb |
| leader.Loop | agent/consul/leader.go | Top-level supervisor for leader-only goroutines |
| Server.handleConsulConn | agent/consul/rpc.go | Multiplexes incoming TCP connections into RPC, raft, gRPC, or TLS-wrapped variants |
| Server.handleAcceptedConn | agent/consul/rpc.go | Per-connection codec dispatch |
| *memdb.MemDB schema | agent/consul/state/schema.go | Defines every table, index, and tombstone behavior |
Read and write paths
sequenceDiagram
participant Caller as Client agent
participant Follower as Server (follower)
participant Leader as Server (leader)
participant FSM as FSM/Apply
participant State as MemDB
Caller->>Follower: net/rpc Foo.Apply (write)
Follower->>Leader: forward via pool/RPC
Leader->>Leader: encode raft log entry
Leader->>FSM: raft.Apply()
FSM->>State: WriteTxn().Insert/Update/Delete
State-->>FSM: index +1
FSM-->>Leader: result
Leader-->>Follower: reply
Follower-->>Caller: ok
Caller->>Follower: net/rpc Foo.Get (blocking)
Follower->>State: blockingQuery(state, key, index)
State-->>Follower: latest index, value
Follower-->>Caller: payload + X-Consul-Index
For high-fanout reads, the streaming subscribe path (agent/consul/stream/ + agent/consul/subscribe_backend.go) replaces the polling loop on the agent side; see Streaming and materialized views.
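The write half of the diagram comes down to one routing decision: a follower never applies a write itself; it forwards the request to the leader, which commits it and returns the new index. The sketch below models only that decision. toyServer, Apply, and errNoLeader are hypothetical names, and the in-process call stands in for forwarding over the connection pool. In the real server the entry is replicated through Raft and applied by the FSM on every server, not mutated directly on the leader.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoLeader is returned when a follower does not know who the leader is.
var errNoLeader = errors.New("no cluster leader")

// toyServer is a hypothetical stand-in that models only the write routing.
type toyServer struct {
	isLeader bool
	leader   *toyServer // known leader; nil if none is known
	index    uint64     // last committed index (tracked on the leader here)
	kv       map[string]string
}

// Apply forwards from a follower to the leader; the leader "commits".
func (s *toyServer) Apply(key, value string) (uint64, error) {
	if !s.isLeader {
		if s.leader == nil {
			return 0, errNoLeader
		}
		// Stand-in for forwarding the RPC over the connection pool.
		return s.leader.Apply(key, value)
	}
	// Stand-in for raft.Apply: in Consul the entry is replicated to a quorum
	// and applied by the FSM on every server, not mutated here directly.
	s.index++
	s.kv[key] = value
	return s.index, nil
}

func main() {
	leader := &toyServer{isLeader: true, kv: map[string]string{}}
	follower := &toyServer{leader: leader}
	idx, err := follower.Apply("web/port", "8080")
	fmt.Println(idx, err, leader.kv["web/port"]) // 1 <nil> 8080
}
```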
State store
agent/consul/state/ builds a *memdb.MemDB whose schema is registered table-by-table:
- catalog.go: nodes, services, service_kinds, health_checks, gateway_services, ...
- kvs.go, kvs_ce.go: kvs, tombstones, sessions
- acl.go, acl_schema.go: acl-tokens, acl-policies, acl-roles, acl-auth-methods, acl-binding-rules
- config_entry.go: config-entries (a single table shared by every config-entry kind)
- connect_ca.go: connect-ca-config, connect-ca-roots, connect-ca-leaf, connect-ca-builtin
- peering.go: peering, peering-trust-bundles
- intention.go: legacy intention table (now mostly served from service-intentions config entries)
- prepared_query.go, coordinate.go, federation_state.go, system_metadata.go, ...
Each table has a <Thing>Entry, indexes for primary key + UUIDs + secondary lookups, and a tombstone strategy. Most read methods take a memdb.WatchSet so blocking queries can register interest and re-run when the table mutates.
graveyard.go collects tombstones for KV and prepared-query deletions; the GC runs from tombstone_gc.go.
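Tombstones exist so that deleting a key does not make a prefix's index go backwards: a watcher on web/ must still see the index of the delete. A minimal in-memory sketch of that idea follows. graveyard, Insert, MaxIndex, and Reap are hypothetical names, and the real graveyard.go keeps tombstones in a MemDB table rather than a slice. Reap drops everything at or below an index, which is the shape of a GC pass; deciding when a reap is safe is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// tombstone records that Key was deleted at a given Raft index.
type tombstone struct {
	Key   string
	Index uint64
}

// graveyard is a hypothetical in-memory simplification of the real table.
type graveyard struct {
	stones []tombstone
}

// Insert records a deletion at idx.
func (g *graveyard) Insert(key string, idx uint64) {
	g.stones = append(g.stones, tombstone{Key: key, Index: idx})
}

// MaxIndex returns the highest tombstone index under prefix (0 if none), so a
// prefix query can report the larger of live-entry and tombstone indexes.
func (g *graveyard) MaxIndex(prefix string) uint64 {
	var highest uint64
	for _, t := range g.stones {
		if strings.HasPrefix(t.Key, prefix) && t.Index > highest {
			highest = t.Index
		}
	}
	return highest
}

// Reap removes every tombstone at or below idx and reports how many it dropped.
func (g *graveyard) Reap(idx uint64) int {
	kept := g.stones[:0]
	reaped := 0
	for _, t := range g.stones {
		if t.Index <= idx {
			reaped++
			continue
		}
		kept = append(kept, t)
	}
	g.stones = kept
	return reaped
}

func main() {
	g := &graveyard{}
	g.Insert("web/a", 10)
	g.Insert("web/b", 12)
	g.Insert("db/x", 15)
	fmt.Println(g.MaxIndex("web/")) // 12
	fmt.Println(g.Reap(12))         // 2
	fmt.Println(g.MaxIndex("web/")) // 0
}
```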
FSM and Raft
The FSM (agent/consul/fsm/fsm.go) implements raft.FSM:
- Apply(*raft.Log): decodes the msgpack log entry, dispatches by MessageType (e.g., RegisterRequestType, KVSRequestType, ACLTokenSetRequestType), and calls the matching commands_ce.go handler that mutates the state store.
- Snapshot(): returns a raft.FSMSnapshot that streams the entire state store to a raft.SnapshotSink.
- Restore(io.ReadCloser): re-creates the state store from a snapshot stream via state.Restore.
commands_ce.go is the dispatch table mapping every replicated message type to a state-store mutation. decode_downgrade.go handles cross-version decode where field renames or removals occurred.
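The dispatch described above can be sketched as a map from a one-byte type tag to a handler. This is a hypothetical model, not Consul's code: it assumes the tag is the first byte of the log entry, uses JSON instead of msgpack to stay within the standard library, and borrows the idea of a high-bit "ignore unknown type" flag that lets a server skip entries it has no handler for. The numeric values are illustrative. For an unknown type without the flag, the sketch returns an error.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// messageType is a hypothetical one-byte log-entry type tag.
type messageType uint8

const (
	kvsType messageType = 1 // illustrative value, not Consul's constant
	// ignoreUnknownFlag is the high bit: entries carrying it may be skipped
	// by a server that has no handler for the type.
	ignoreUnknownFlag messageType = 128
)

// kvsRequest is a hypothetical replicated request body.
type kvsRequest struct {
	Key   string
	Value string
}

// fsm holds toy state plus the dispatch table from type tag to handler.
type fsm struct {
	kv       map[string]string
	handlers map[messageType]func(body []byte, index uint64) interface{}
}

func newFSM() *fsm {
	f := &fsm{kv: map[string]string{}}
	f.handlers = map[messageType]func([]byte, uint64) interface{}{
		kvsType: f.applyKVS,
	}
	return f
}

// applyKVS decodes the body and mutates state; its return value is what the
// writer would read from the Raft apply future's Response().
func (f *fsm) applyKVS(body []byte, index uint64) interface{} {
	var req kvsRequest
	if err := json.Unmarshal(body, &req); err != nil {
		return err
	}
	f.kv[req.Key] = req.Value
	return index
}

// Apply strips the type byte, dispatches, and handles unknown types.
func (f *fsm) Apply(data []byte, index uint64) interface{} {
	if len(data) == 0 {
		return errors.New("empty log entry")
	}
	msgType := messageType(data[0])
	ignoreUnknown := msgType&ignoreUnknownFlag != 0
	msgType &^= ignoreUnknownFlag
	if h, ok := f.handlers[msgType]; ok {
		return h(data[1:], index)
	}
	if ignoreUnknown {
		return nil // safe to skip
	}
	return fmt.Errorf("unknown message type %d", msgType)
}

// encode frames a request as <type byte><body>, as a writer would before applying it.
func encode(t messageType, v interface{}) ([]byte, error) {
	body, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return append([]byte{byte(t)}, body...), nil
}

func main() {
	f := newFSM()
	entry, _ := encode(kvsType, kvsRequest{Key: "k", Value: "v"})
	fmt.Println(f.Apply(entry, 7), f.kv["k"]) // 7 v
}
```

Adding a new replicated operation in this model means adding a constant and one map entry, which is the same shape as registering a handler in commands_ce.go.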
snapshot_ce.go defines the persisted format. The same format is used by consul snapshot save and consul snapshot restore (see snapshot/archive.go).
RPC endpoints
Each *_endpoint.go file in agent/consul/ defines a struct (e.g., KVS, Catalog, Health, Intention, ACL, ConnectCA, PreparedQuery, ConfigEntry, Peering, Snapshot, Operator, Txn) registered with the underlying net/rpc server. Each method has the standard net/rpc shape:
func (s *Foo) Get(args *FooGetArgs, reply *FooGetReply) error
Read endpoints typically delegate to s.srv.blockingQuery(...). Write endpoints validate the request, check ACLs, then call s.srv.raftApply(MessageType, request), which encodes and applies the log entry.
There are 30+ endpoint structs; the main ones are:
| Endpoint | File |
|---|---|
| Catalog | catalog_endpoint.go |
| Health | health_endpoint.go |
| KVS | kvs_endpoint.go |
| Session | session_endpoint.go |
| ACL | acl_endpoint.go (65 KB) |
| ConnectCA | connect_ca_endpoint.go |
| Intention | intention_endpoint.go |
| Internal | internal_endpoint.go |
| ConfigEntry | config_endpoint.go |
| Coordinate | coordinate_endpoint.go |
| DiscoveryChain | discovery_chain_endpoint.go |
| FederationState | federation_state_endpoint.go |
| Operator | operator_endpoint.go + operator_*_endpoint.go |
| OperatorRaft | operator_raft_endpoint.go |
| OperatorAutopilot | operator_autopilot_endpoint.go |
| OperatorUsage | operator_usage_endpoint.go |
| Peering | peering_backend.go |
| PreparedQuery | prepared_query_endpoint.go |
| Snapshot | snapshot_endpoint.go |
| Status | status_endpoint.go |
| Txn | txn_endpoint.go |
| AutoConfig | auto_config_endpoint.go |
| AutoEncrypt | auto_encrypt_endpoint.go |
Most of these have CE / Enterprise variants with _ce.go files providing the open-source path.
Leader loops
When a server wins a Raft election, leader.go starts the goroutines that should only run on a leader:
| File | What it does |
|---|---|
| leader_connect.go, leader_connect_ca.go | Initialize Connect CA, rotate CA certs, cross-sign provider transitions |
| leader_intentions.go | Migrate legacy intentions into service-intentions config entries |
| leader_peering.go | Run peering replication, secret rotation, exported services |
| leader_federation_state_ae.go | Periodically push federation state across WAN |
| leader_metrics.go | Record cluster-wide gauges (config-entry count, intention count, ...) |
| leader_log_verification.go | Optional Raft log verification |
| leader_registrator_v1.go | V1 catalog ↔ V2 resource bridging |
Every loop watches s.shutdownCh and the leadership status, and exits cleanly when the server steps down.
Internal gRPC
server_grpc.go registers the gRPC services, which share the TCP port used for RPC; agent/consul/rpc.go routes each incoming connection to gRPC by sniffing its prefix. Services include:
- The streaming subscribe backend (agent/consul/subscribe_backend.go + agent/consul/stream/).
- The peering control plane.
- The v2 resource service (defined in proto-public/pbresource/).
- Server discovery and dataplane services.
Snapshots
agent/consul/fsm/snapshot.go and snapshot/archive.go together produce a tar-style archive containing:
- The Raft snapshot (FSM state).
- Metadata (Raft configuration, peer set).
- A SHA256 manifest.
snapshot/ is consumed by consul snapshot save (which calls the Snapshot.Save RPC) and consul snapshot restore (which calls Snapshot.Restore).
Integration points
- Connection pool: agent/pool/ keeps long-lived TLS connections to peers; clients use it to reach servers, and servers use it to reach each other across DCs.
- Router: agent/router/ selects which server (and which DC area) to send a given RPC to. It learns about live servers from Serf events.
- Metrics: RPC endpoints emit consul.rpc.query.* and the FSM emits consul.fsm.* histograms, which ops dashboards are built on.
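The router's job can be modelled as a per-datacenter server list kept current by membership events. In this sketch memberEvent, router, Handle, and Pick are hypothetical, and round-robin selection is a simplification; agent/router/ has its own selection and rebalancing logic, which is not reproduced here.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// memberEvent is a hypothetical, flattened membership event for one server.
type memberEvent struct {
	Name  string
	DC    string
	Alive bool
}

var errNoServers = errors.New("no known servers in datacenter")

// router tracks live servers per datacenter and rotates through them.
type router struct {
	mu      sync.Mutex
	servers map[string][]string // datacenter -> sorted server names
	next    map[string]int      // datacenter -> round-robin cursor
}

func newRouter() *router {
	return &router{servers: map[string][]string{}, next: map[string]int{}}
}

// Handle applies a join (Alive) or a leave/failure (!Alive) event.
func (r *router) Handle(ev memberEvent) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var kept []string
	for _, s := range r.servers[ev.DC] {
		if s != ev.Name {
			kept = append(kept, s)
		}
	}
	if ev.Alive {
		kept = append(kept, ev.Name)
	}
	sort.Strings(kept)
	r.servers[ev.DC] = kept
}

// Pick returns the next server to send an RPC to in dc.
func (r *router) Pick(dc string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	list := r.servers[dc]
	if len(list) == 0 {
		return "", errNoServers
	}
	i := r.next[dc] % len(list)
	r.next[dc] = i + 1
	return list[i], nil
}

func main() {
	r := newRouter()
	r.Handle(memberEvent{Name: "s1", DC: "dc1", Alive: true})
	r.Handle(memberEvent{Name: "s2", DC: "dc1", Alive: true})
	a, _ := r.Pick("dc1")
	b, _ := r.Pick("dc1")
	fmt.Println(a, b) // s1 s2
	r.Handle(memberEvent{Name: "s1", DC: "dc1", Alive: false})
	c, _ := r.Pick("dc1")
	fmt.Println(c) // s2
}
```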
Entry points for modification
- To add a Raft message type: define the request struct in agent/structs/, add a new MessageType constant, register the apply handler in agent/consul/fsm/commands_ce.go, and emit it from a new endpoint method via s.raftApply.
- To add a state-store table: define schemas in agent/consul/state/schema*.go, add accessors in a new <thing>.go, and update state_store.go if the table needs special boot behavior.
- To add a leader loop: create agent/consul/leader_<feature>.go with a single func (s *Server) <feature>Loop(ctx context.Context) and start it in leader.go.
- To wire a new gRPC service: create the handler under agent/grpc-internal/services/<x> (or agent/grpc-external/services/<x> for public services) and register it in server_grpc.go.
Built by Factory AutoWiki from public repository content. It is a generated preview for codebase exploration, not source-maintained documentation.