distribute/transport/internal/circuit_breaker
Circuit breaker implementation for per-node failure handling.
This module provides a type-safe circuit breaker pattern following Gleam/OTP best practices. It tracks per-node failures and automatically stops sending to consistently failing nodes.
Design
- Pure functions for state transitions
- Exhaustive pattern matching
- Type-safe Result handling
- No side effects; caller decides when to persist state
Circuit States
- Closed: Normal operation, all requests flow through
- Open: Too many failures, requests are rejected immediately
- HalfOpen: Testing recovery, limited requests allowed
Integration
Circuit breakers integrate with transport retry logic:
- Before send: Check if circuit allows the request
- After send: Record outcome to update circuit state
- Periodic health checks can reset circuits
Example
let policy = default_policy()
let breaker = new_breaker()

// Check before sending
case should_allow_request(breaker, policy) {
  True ->
    case send_to_node(node, payload) {
      // Record the outcome and hand the updated breaker back to the caller
      Ok(_) -> Ok(record_success(breaker, policy))
      Error(_) -> Ok(record_failure(breaker, policy))
    }
  False -> Error(CircuitOpen)
}
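The example above works on a single breaker. A minimal sketch of the same flow through a per-node registry might look like the following. It relies only on the signatures documented on this page; `GuardedError`, `guarded_send`, `example`, and the `send` callback are hypothetical names introduced for illustration and are not part of the module.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitBreakerPolicy, type CircuitBreakerRegistry,
}
import gleam/dict

/// Hypothetical error type for this sketch; not part of the module.
pub type GuardedError(e) {
  CircuitOpen
  SendFailed(e)
}

/// Runs `send` for `node` only if its breaker allows the request, then
/// stores the updated breaker back in the registry. The new registry is
/// returned with the outcome so the caller decides where to persist it.
pub fn guarded_send(
  registry: CircuitBreakerRegistry,
  policy: CircuitBreakerPolicy,
  node: String,
  send: fn() -> Result(a, e),
) -> #(CircuitBreakerRegistry, Result(a, GuardedError(e))) {
  let #(breaker, registry) = circuit_breaker.get_or_create(registry, node)
  case circuit_breaker.should_allow_request(breaker, policy) {
    False -> #(registry, Error(CircuitOpen))
    True ->
      case send() {
        Ok(value) -> {
          let breaker = circuit_breaker.record_success(breaker, policy)
          #(circuit_breaker.update(registry, node, breaker), Ok(value))
        }
        Error(reason) -> {
          let breaker = circuit_breaker.record_failure(breaker, policy)
          #(
            circuit_breaker.update(registry, node, breaker),
            Error(SendFailed(reason)),
          )
        }
      }
  }
}

/// Usage: a fresh registry and a send callback that always succeeds.
pub fn example() -> Result(String, GuardedError(Nil)) {
  let policy = circuit_breaker.default_policy()
  let #(_registry, outcome) =
    guarded_send(dict.new(), policy, "node-a", fn() { Ok("sent") })
  outcome
}
```

Returning the registry next to the result keeps the helper free of side effects, in line with the design notes above.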
Types
Circuit breaker policy configuration.
pub type CircuitBreakerPolicy {
  CircuitBreakerPolicy(
    failure_threshold: Int,
    success_threshold: Int,
    timeout_ms: Int,
    half_open_max_calls: Int,
  )
}
Constructors
- CircuitBreakerPolicy(failure_threshold: Int, success_threshold: Int, timeout_ms: Int, half_open_max_calls: Int)
Per-node circuit breaker registry.
Maps node IDs to their circuit breaker state.
pub type CircuitBreakerRegistry =
  dict.Dict(String, NodeCircuitBreaker)
Circuit breaker state.
pub type CircuitState {
  Closed
  Open(opened_at_ms: Int)
  HalfOpen
}
Constructors
- Closed: Normal operation; all requests allowed
- Open(opened_at_ms: Int): Too many failures; requests rejected
- HalfOpen: Testing recovery; limited requests allowed
Per-node circuit breaker state.
pub type NodeCircuitBreaker {
  NodeCircuitBreaker(
    state: CircuitState,
    consecutive_failures: Int,
    consecutive_successes: Int,
    total_failures: Int,
    total_successes: Int,
  )
}
Constructors
- NodeCircuitBreaker(state: CircuitState, consecutive_failures: Int, consecutive_successes: Int, total_failures: Int, total_successes: Int)
Values
pub fn default_policy() -> CircuitBreakerPolicy
Default circuit breaker policy.
Conservative settings:
- Opens after 5 consecutive failures
- Closes after 2 consecutive successes in HalfOpen
- Waits 30 seconds before trying HalfOpen
- Allows 1 test request in HalfOpen
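As a sketch of how a caller might tune these settings, the block below writes the documented defaults out by hand and derives a stricter variant with a record update. The field values in `documented_defaults` are inferred from the description above rather than copied from the implementation, and both function names are hypothetical.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitBreakerPolicy, CircuitBreakerPolicy,
}

/// The defaults as described in the documentation (assumed values):
/// 5 failures to open, 2 successes to close, 30 s timeout, 1 test call.
pub fn documented_defaults() -> CircuitBreakerPolicy {
  CircuitBreakerPolicy(
    failure_threshold: 5,
    success_threshold: 2,
    timeout_ms: 30_000,
    half_open_max_calls: 1,
  )
}

/// A hypothetical stricter policy: open sooner and retry sooner.
/// Only the two named fields change; the rest come from default_policy().
pub fn aggressive_policy() -> CircuitBreakerPolicy {
  let base = circuit_breaker.default_policy()
  CircuitBreakerPolicy(..base, failure_threshold: 3, timeout_ms: 5000)
}
```

Starting from `default_policy()` and overriding individual fields means a custom policy keeps following the defaults for the fields it does not override.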
pub fn get_metrics(
  breaker: NodeCircuitBreaker,
) -> dict.Dict(String, Int)
Get circuit breaker metrics for monitoring.
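The metric names are not listed on this page, so the sketch below avoids assuming any particular key and simply renders whatever the dictionary contains. `format_metrics` is a hypothetical helper, not part of the module.

```gleam
import gleam/dict
import gleam/int
import gleam/list
import gleam/string

/// Renders a metrics dictionary as "key=value" pairs, sorted by key so the
/// output is stable regardless of dictionary ordering.
pub fn format_metrics(metrics: dict.Dict(String, Int)) -> String {
  metrics
  |> dict.to_list
  |> list.sort(fn(a, b) { string.compare(a.0, b.0) })
  |> list.map(fn(pair) { pair.0 <> "=" <> int.to_string(pair.1) })
  |> string.join(", ")
}
```

A caller would pass the result of `get_metrics(breaker)` straight into `format_metrics` before logging it.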
pub fn get_or_create(
  registry: dict.Dict(String, NodeCircuitBreaker),
  node: String,
) -> #(NodeCircuitBreaker, dict.Dict(String, NodeCircuitBreaker))
Get or create a circuit breaker for a node.
pub fn new_breaker() -> NodeCircuitBreaker
Create a new circuit breaker for a node.
Starts in Closed state with zero failures.
pub fn record_failure(
  breaker: NodeCircuitBreaker,
  policy: CircuitBreakerPolicy,
) -> NodeCircuitBreaker
Record a failed request outcome.
Updates circuit state based on policy:
- Closed: Increment failure counter; open if threshold reached
- HalfOpen: Increment failure counter; reopen circuit
- Open: Increment failure counter (already open)
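To observe these transitions, one option is to fold a run of failures over a fresh breaker and inspect the resulting state. This is a sketch under the documented behaviour only; `after_failures` and `describe` are hypothetical helpers. With the default policy described on this page, five consecutive failures should leave the breaker in `Open`, while fewer should leave it `Closed`.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type NodeCircuitBreaker, Closed, HalfOpen, Open,
}
import gleam/list

/// Applies `count` consecutive failures to a new breaker under the
/// default policy and returns the resulting breaker.
pub fn after_failures(count: Int) -> NodeCircuitBreaker {
  let policy = circuit_breaker.default_policy()
  list.repeat(Nil, count)
  |> list.fold(circuit_breaker.new_breaker(), fn(breaker, _) {
    circuit_breaker.record_failure(breaker, policy)
  })
}

/// Maps the circuit state to a short label; exhaustive over CircuitState.
pub fn describe(breaker: NodeCircuitBreaker) -> String {
  case breaker.state {
    Closed -> "closed"
    Open(_) -> "open"
    HalfOpen -> "half-open"
  }
}
```

Because `record_failure` returns a new breaker instead of mutating one, the fold threads the state through each step explicitly.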
pub fn record_success(
  breaker: NodeCircuitBreaker,
  policy: CircuitBreakerPolicy,
) -> NodeCircuitBreaker
Record a successful request outcome.
Updates circuit state based on policy:
- Closed: Increment success counter, reset failure counter
- HalfOpen: Increment success counter; close if threshold reached
- Open: Should not happen (requests blocked)
pub fn reset(
  registry: dict.Dict(String, NodeCircuitBreaker),
  node: String,
) -> dict.Dict(String, NodeCircuitBreaker)
Reset a circuit breaker to initial state.
Useful for manual recovery or health check-triggered resets.
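A health check-triggered reset could be wired up roughly as follows. The `healthy` flag stands in for whatever probe the caller runs; `apply_health_check` and `example` are hypothetical names used only in this sketch.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitBreakerRegistry,
}
import gleam/dict

/// Resets the node's breaker when the health check passed; otherwise
/// returns the registry unchanged.
pub fn apply_health_check(
  registry: CircuitBreakerRegistry,
  node: String,
  healthy: Bool,
) -> CircuitBreakerRegistry {
  case healthy {
    True -> circuit_breaker.reset(registry, node)
    False -> registry
  }
}

/// Usage: register a node, then apply a passing health check.
pub fn example() -> CircuitBreakerRegistry {
  let #(_breaker, registry) =
    circuit_breaker.get_or_create(dict.new(), "node-a")
  apply_health_check(registry, "node-a", True)
}
```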
pub fn should_allow_request(
  breaker: NodeCircuitBreaker,
  policy: CircuitBreakerPolicy,
) -> Bool
Check if a request should be allowed through the circuit breaker.
Returns True if the request can proceed, False if the circuit is open.
Logic
- Closed: Always allow
- Open: Check if timeout expired; if so, transition to HalfOpen
- HalfOpen: Allow limited requests (controlled by policy)
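When fanning a message out to several nodes, the check can be applied as a filter before any send is attempted. The sketch below treats nodes with no registry entry as allowed, which is an assumption made for illustration, not documented behaviour. `sendable_nodes` is a hypothetical helper.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitBreakerPolicy, type CircuitBreakerRegistry,
}
import gleam/dict
import gleam/list

/// Keeps only the nodes whose breaker currently allows a request.
pub fn sendable_nodes(
  registry: CircuitBreakerRegistry,
  policy: CircuitBreakerPolicy,
  nodes: List(String),
) -> List(String) {
  list.filter(nodes, fn(node) {
    case dict.get(registry, node) {
      Ok(breaker) -> circuit_breaker.should_allow_request(breaker, policy)
      // Assumption: a node without an entry has no recorded failures yet.
      Error(Nil) -> True
    }
  })
}
```

Since `should_allow_request` returns only a `Bool`, this filter does not change any breaker state in the registry.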
pub fn update(
  registry: dict.Dict(String, NodeCircuitBreaker),
  node: String,
  breaker: NodeCircuitBreaker,
) -> dict.Dict(String, NodeCircuitBreaker)
Update a circuit breaker in the registry.