distribute/transport/internal/circuit_breaker

Circuit breaker implementation for per-node failure handling.

This module provides a type-safe circuit breaker. It tracks failures per node and automatically stops sending to nodes that fail consistently.

Design

Circuit States

  1. Closed: Normal operation, all requests flow through
  2. Open: Too many failures, requests are rejected immediately
  3. HalfOpen: Testing recovery, limited requests allowed
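The three states form a small state machine. The following is a minimal, self-contained sketch of how the transitions might look, not the module's implementation: the `State` and `Breaker` types, the `on_failure`, `on_success`, and `allow` functions, and the explicit `now_ms` argument are all stand-ins invented for illustration. The constants mirror the documented default policy, and the sketch ignores any limit on HalfOpen test requests.

```gleam
// Local stand-in types (assumptions, not the module's own types).
pub type State {
  Closed
  Open(opened_at_ms: Int)
  HalfOpen
}

pub type Breaker {
  Breaker(state: State, failures: Int, successes: Int)
}

// Assumed thresholds, mirroring the documented default policy.
const failure_threshold = 5

const success_threshold = 2

const timeout_ms = 30_000

// A failure opens the circuit from Closed once the threshold is reached,
// and reopens it immediately from HalfOpen.
pub fn on_failure(b: Breaker, now_ms: Int) -> Breaker {
  let failures = b.failures + 1
  case b.state, failures >= failure_threshold {
    Closed, True -> Breaker(Open(now_ms), failures, 0)
    Closed, False -> Breaker(Closed, failures, 0)
    HalfOpen, _ -> Breaker(Open(now_ms), failures, 0)
    Open(_), _ -> Breaker(..b, failures: failures)
  }
}

// A success resets the failure counter; enough successes in HalfOpen
// close the circuit again.
pub fn on_success(b: Breaker) -> Breaker {
  let successes = b.successes + 1
  case b.state, successes >= success_threshold {
    HalfOpen, True -> Breaker(Closed, 0, 0)
    _, _ -> Breaker(b.state, 0, successes)
  }
}

// Returns whether a request may proceed, plus the possibly updated breaker.
// An Open circuit moves to HalfOpen once the timeout has elapsed.
pub fn allow(b: Breaker, now_ms: Int) -> #(Bool, Breaker) {
  case b.state {
    Closed | HalfOpen -> #(True, b)
    Open(opened_at_ms) ->
      case now_ms - opened_at_ms >= timeout_ms {
        True -> #(True, Breaker(..b, state: HalfOpen, successes: 0))
        False -> #(False, b)
      }
  }
}
```

In this sketch, five calls to `on_failure` on a fresh `Breaker(Closed, 0, 0)` open the circuit, `allow` starts returning `True` again 30 seconds later, and two calls to `on_success` in HalfOpen close it.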

Integration

Circuit breakers integrate with transport retry logic: check the breaker before each send, then record the outcome.

Example

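let policy = default_policy()
let breaker = new_breaker()

// Check before sending. send_to_node, node, payload, and CircuitOpen are
// supplied by the caller; they are not defined in this module.
case should_allow_request(breaker, policy) {
  True -> {
    // Keep the updated breaker that record_* returns.
    case send_to_node(node, payload) {
      Ok(_) -> Ok(record_success(breaker, policy))
      Error(_) -> Ok(record_failure(breaker, policy))
    }
  }
  False -> Error(CircuitOpen)
}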

Types

Circuit breaker policy configuration.

pub type CircuitBreakerPolicy {
  CircuitBreakerPolicy(
    failure_threshold: Int,
    success_threshold: Int,
    timeout_ms: Int,
    half_open_max_calls: Int,
  )
}
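As an illustration of building a policy, the sketch below constructs one from scratch and derives another from `default_policy()` with a record update. It assumes the module can be imported under the path shown in the page heading; the function names and the numeric values are hypothetical and are not recommendations from the library.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitBreakerPolicy, CircuitBreakerPolicy,
}

// Hypothetical policy that trips quickly and probes again after 5 seconds.
pub fn aggressive_policy() -> CircuitBreakerPolicy {
  CircuitBreakerPolicy(
    failure_threshold: 3,
    success_threshold: 1,
    timeout_ms: 5000,
    half_open_max_calls: 1,
  )
}

// Start from the defaults and override only the Open timeout.
pub fn patient_policy() -> CircuitBreakerPolicy {
  let base = circuit_breaker.default_policy()
  CircuitBreakerPolicy(..base, timeout_ms: 60_000)
}
```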

Per-node circuit breaker registry.

Maps node IDs to their circuit breaker state.

pub type CircuitBreakerRegistry =
  dict.Dict(String, NodeCircuitBreaker)

Circuit breaker state.

pub type CircuitState {
  Closed
  Open(opened_at_ms: Int)
  HalfOpen
}

Constructors

  • Closed

    Normal operation - all requests allowed

  • Open(opened_at_ms: Int)

    Too many failures - requests rejected

  • HalfOpen

    Testing recovery - limited requests allowed
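Because `CircuitState` is a custom type, callers can pattern match on it exhaustively. The helper below is a hypothetical example (`describe` is not part of this module) that renders a state for logging; it assumes the import path from the page heading and makes no claim about which clock `opened_at_ms` is measured against.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitState, Closed, HalfOpen, Open,
}
import gleam/int

// Render a circuit state as a short log-friendly string.
pub fn describe(state: CircuitState) -> String {
  case state {
    Closed -> "closed"
    Open(opened_at_ms) -> "open since " <> int.to_string(opened_at_ms) <> " ms"
    HalfOpen -> "half-open"
  }
}
```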

Per-node circuit breaker state.

pub type NodeCircuitBreaker {
  NodeCircuitBreaker(
    state: CircuitState,
    consecutive_failures: Int,
    consecutive_successes: Int,
    total_failures: Int,
    total_successes: Int,
  )
}

Values

pub fn default_policy() -> CircuitBreakerPolicy

Default circuit breaker policy.

Conservative settings:

  • Opens after 5 consecutive failures
  • Closes after 2 consecutive successes in HalfOpen
  • Waits 30 seconds before trying HalfOpen
  • Allows 1 test request in HalfOpen

pub fn get_metrics(
  breaker: NodeCircuitBreaker,
) -> dict.Dict(String, Int)

Get circuit breaker metrics for monitoring.
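The metric keys are not listed here, so a consumer can treat the result as an opaque dictionary. The sketch below formats whatever entries `get_metrics` returns as a single `key=value` line; `metrics_line` is a hypothetical helper, and the import path is assumed from the page heading. Entries are sorted because `dict.to_list` does not guarantee an order.

```gleam
import distribute/transport/internal/circuit_breaker.{type NodeCircuitBreaker}
import gleam/dict
import gleam/int
import gleam/list
import gleam/string

// Build a line such as "<node> <key>=<value> ..." for a log or metrics sink.
pub fn metrics_line(node: String, breaker: NodeCircuitBreaker) -> String {
  let fields =
    circuit_breaker.get_metrics(breaker)
    |> dict.to_list
    |> list.map(fn(pair) { pair.0 <> "=" <> int.to_string(pair.1) })
    |> list.sort(string.compare)
    |> string.join(" ")
  node <> " " <> fields
}
```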

pub fn get_or_create(
  registry: dict.Dict(String, NodeCircuitBreaker),
  node: String,
) -> #(NodeCircuitBreaker, dict.Dict(String, NodeCircuitBreaker))

Get or create a circuit breaker for a node.

pub fn new_breaker() -> NodeCircuitBreaker

Create a new circuit breaker for a node.

Starts in Closed state with zero failures.

pub fn record_failure(
  breaker: NodeCircuitBreaker,
  policy: CircuitBreakerPolicy,
) -> NodeCircuitBreaker

Record a failed request outcome.

Updates circuit state based on policy:

  • Closed: Increment failure counter; open if threshold reached
  • HalfOpen: Increment failure counter; reopen circuit
  • Open: Increment failure counter (already open)

pub fn record_success(
  breaker: NodeCircuitBreaker,
  policy: CircuitBreakerPolicy,
) -> NodeCircuitBreaker

Record a successful request outcome.

Updates circuit state based on policy:

  • Closed: Increment success counter, reset failure counter
  • HalfOpen: Increment success counter; close if threshold reached
  • Open: Should not happen (requests blocked)

pub fn reset(
  registry: dict.Dict(String, NodeCircuitBreaker),
  node: String,
) -> dict.Dict(String, NodeCircuitBreaker)

Reset a circuit breaker to initial state.

Useful for manual recovery or health check-triggered resets.
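A health check that reports several recovered nodes at once could fold `reset` over them. This is a sketch under the assumption that the module is importable at the path in the page heading; `reset_healthy` and `healthy_nodes` are hypothetical names.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitBreakerRegistry,
}
import gleam/list

// Reset the breaker of every node that a health check reported as healthy.
pub fn reset_healthy(
  registry: CircuitBreakerRegistry,
  healthy_nodes: List(String),
) -> CircuitBreakerRegistry {
  list.fold(healthy_nodes, registry, circuit_breaker.reset)
}
```

`reset` already has the `fn(accumulator, item)` shape that `list.fold` expects, so it can be passed directly.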

pub fn should_allow_request(
  breaker: NodeCircuitBreaker,
  policy: CircuitBreakerPolicy,
) -> Bool

Check if a request should be allowed through the circuit breaker.

Returns True if the request can proceed, False if the circuit is open.

Logic

  • Closed: Always allow
  • Open: Check if timeout expired; if so, transition to HalfOpen
  • HalfOpen: Allow limited requests (controlled by policy)

pub fn update(
  registry: dict.Dict(String, NodeCircuitBreaker),
  node: String,
  breaker: NodeCircuitBreaker,
) -> dict.Dict(String, NodeCircuitBreaker)

Update a circuit breaker in the registry.
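Putting the registry functions together, one guarded send might look like the sketch below. It relies only on the signatures documented on this page and assumes the import path from the page heading; `guarded_send`, the `send` callback, and the error message are hypothetical and supplied by the caller.

```gleam
import distribute/transport/internal/circuit_breaker.{
  type CircuitBreakerPolicy, type CircuitBreakerRegistry,
}

// Look up the node's breaker, send if allowed, record the outcome, and
// return the result together with the updated registry.
pub fn guarded_send(
  registry: CircuitBreakerRegistry,
  policy: CircuitBreakerPolicy,
  node: String,
  send: fn(String) -> Result(Nil, String),
) -> #(Result(Nil, String), CircuitBreakerRegistry) {
  let #(breaker, registry) = circuit_breaker.get_or_create(registry, node)
  case circuit_breaker.should_allow_request(breaker, policy) {
    // Rejected without calling `send`; the registry is returned unchanged.
    False -> #(Error("circuit open for " <> node), registry)
    True -> {
      let result = send(node)
      let breaker = case result {
        Ok(_) -> circuit_breaker.record_success(breaker, policy)
        Error(_) -> circuit_breaker.record_failure(breaker, policy)
      }
      #(result, circuit_breaker.update(registry, node, breaker))
    }
  }
}
```

Since every function returns a new value rather than mutating state, the caller has to thread the returned registry into the next call.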
