distribute/transport/behaviour
Transport behaviour contract for distributed messaging.
This module defines the contract that transport adapters must implement to provide network communication in the distribute library. Transport adapters handle sending and receiving binary payloads between nodes in the cluster.
Design Philosophy
This behaviour follows distribute’s patterns:
- Pure function signatures (like crypto/provider)
- Actor-based implementations (like registry/actor)
- Integration with process.Subject(BitArray) (like global, messaging)
- Capability negotiation support (like handshake)
Implementation Approaches
Adapters can be implemented as:
- Actor-based: Use gleam/otp/actor (recommended, see registry/actor)
- Port/NIF-based: Wrap external transports
- Pure Erlang: Use Erlang distribution protocol
Integration with distribute
Transport adapters integrate with:
- handshake module for capability negotiation
- codec module for message encoding/decoding
- global for GlobalSubject message delivery
- messaging for typed cross-node communication
Example Implementation
See examples/adapters/transport_tcp for a complete implementation.
Types
Acknowledgment message from receiver.
Sent by the receiver to confirm delivery and processing.
Variants
- Ack: Message successfully received and processed
- Nack: Message received but processing failed (with reason)
pub type AckMessage {
  Ack(message_id: String)
  Nack(message_id: String, reason: String)
}
Constructors
- Ack(message_id: String)
- Nack(message_id: String, reason: String)
Result of waiting for acknowledgment.
pub type AckResult {
  Acknowledged
  NotAcknowledged(reason: String)
  AckTimedOut
}
Constructors
- Acknowledged
- NotAcknowledged(reason: String)
- AckTimedOut
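As a minimal sketch of how the two acknowledgment types might relate, the helper below maps a receiver reply (or its absence) to an AckResult. The function name to_ack_result and the use of Option to model "no reply before the timeout" are assumptions for illustration, not part of the library; the types are local copies so the block compiles on its own.

```gleam
import gleam/option.{type Option, None, Some}

// Local copies of the documented types so the sketch is self-contained.
pub type AckMessage {
  Ack(message_id: String)
  Nack(message_id: String, reason: String)
}

pub type AckResult {
  Acknowledged
  NotAcknowledged(reason: String)
  AckTimedOut
}

/// Hypothetical helper: None stands for "no reply arrived in time".
pub fn to_ack_result(reply: Option(AckMessage)) -> AckResult {
  case reply {
    Some(Ack(_)) -> Acknowledged
    Some(Nack(_, reason)) -> NotAcknowledged(reason)
    None -> AckTimedOut
  }
}
```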
Result of a broadcast operation with per-node outcomes.
Maps each node to its delivery result, allowing the caller to handle partial failures and retry individual nodes if needed.
pub type BroadcastResult =
  dict.Dict(String, Result(Nil, TransportError))
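To illustrate retrying individual nodes after a partial failure, the sketch below collects the nodes whose entry is an Error. The helper name failed_nodes is hypothetical, and the TransportError type is a trimmed local stand-in for the library type.

```gleam
import gleam/dict
import gleam/list

// Trimmed local stand-in for the library's TransportError.
pub type TransportError {
  ConnectionClosed
  Timeout
}

pub type BroadcastResult =
  dict.Dict(String, Result(Nil, TransportError))

/// Hypothetical helper: nodes whose delivery failed, in unspecified order.
pub fn failed_nodes(outcomes: BroadcastResult) -> List(String) {
  outcomes
  |> dict.to_list
  |> list.filter_map(fn(entry) {
    case entry {
      #(node, Error(_)) -> Ok(node)
      #(_, Ok(_)) -> Error(Nil)
    }
  })
}
```

The returned list could then be passed to a second broadcast call to retry only those nodes.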
Circuit breaker configuration.
Controls when a circuit opens/closes based on failure patterns.
Fields
- failure_threshold: Number of consecutive failures before opening circuit
- success_threshold: Number of consecutive successes in HalfOpen to close circuit
- timeout_ms: How long to wait in Open state before trying HalfOpen
- half_open_max_calls: Maximum concurrent calls allowed in HalfOpen state
pub type CircuitBreakerPolicy {
  CircuitBreakerPolicy(
    failure_threshold: Int,
    success_threshold: Int,
    timeout_ms: Int,
    half_open_max_calls: Int,
  )
}
Constructors
- CircuitBreakerPolicy(failure_threshold: Int, success_threshold: Int, timeout_ms: Int, half_open_max_calls: Int)
Circuit breaker state for a node.
Tracks failure rate and decides when to stop trying a failing node. Based on the circuit breaker pattern from resilience engineering.
States
- Closed: Normal operation, requests flow through
- Open: Too many failures, requests are blocked
- HalfOpen: Testing if node recovered, limited requests allowed
pub type CircuitState {
  Closed
  Open(opened_at_ms: Int)
  HalfOpen(test_started_at_ms: Int)
}
Constructors
- Closed
- Open(opened_at_ms: Int)
- HalfOpen(test_started_at_ms: Int)
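The state transitions described above can be sketched as three small functions. This is an illustration under stated assumptions, not the library's implementation: the function names (tick, after_failure, after_success) are hypothetical, the counters are passed in by the caller, a failure during HalfOpen is assumed to reopen the circuit immediately, and half_open_max_calls is not enforced here.

```gleam
// Local copies of the documented types so the sketch is self-contained.
pub type CircuitBreakerPolicy {
  CircuitBreakerPolicy(
    failure_threshold: Int,
    success_threshold: Int,
    timeout_ms: Int,
    half_open_max_calls: Int,
  )
}

pub type CircuitState {
  Closed
  Open(opened_at_ms: Int)
  HalfOpen(test_started_at_ms: Int)
}

/// Open -> HalfOpen once timeout_ms has elapsed; other states are unchanged.
pub fn tick(
  state: CircuitState,
  policy: CircuitBreakerPolicy,
  now_ms: Int,
) -> CircuitState {
  case state {
    Open(opened_at_ms) ->
      case now_ms - opened_at_ms >= policy.timeout_ms {
        True -> HalfOpen(test_started_at_ms: now_ms)
        False -> state
      }
    Closed | HalfOpen(_) -> state
  }
}

/// Closed -> Open at failure_threshold; a failed probe reopens (assumption).
pub fn after_failure(
  state: CircuitState,
  consecutive_failures: Int,
  policy: CircuitBreakerPolicy,
  now_ms: Int,
) -> CircuitState {
  case state {
    HalfOpen(_) -> Open(opened_at_ms: now_ms)
    Closed ->
      case consecutive_failures >= policy.failure_threshold {
        True -> Open(opened_at_ms: now_ms)
        False -> Closed
      }
    Open(_) -> state
  }
}

/// HalfOpen -> Closed once success_threshold consecutive successes are seen.
pub fn after_success(
  state: CircuitState,
  consecutive_successes: Int,
  policy: CircuitBreakerPolicy,
) -> CircuitState {
  case state {
    HalfOpen(_) ->
      case consecutive_successes >= policy.success_threshold {
        True -> Closed
        False -> state
      }
    Closed | Open(_) -> state
  }
}
```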
Delivery guarantee level for messages.
Controls the reliability semantics of message delivery.
Variants
- AtMostOnce: Fire and forget, no ack required (fastest, may lose messages)
- AtLeastOnce: Wait for ack, retry on failure (may duplicate)
- ExactlyOnce: Idempotent delivery with deduplication (slowest, no duplicates)
pub type DeliveryGuarantee {
  AtMostOnce
  AtLeastOnce(ack_timeout_ms: Int, max_retries: Int)
  ExactlyOnce(
    ack_timeout_ms: Int,
    max_retries: Int,
    dedup_window_ms: Int,
  )
}
Constructors
- AtMostOnce
- AtLeastOnce(ack_timeout_ms: Int, max_retries: Int)
- ExactlyOnce(ack_timeout_ms: Int, max_retries: Int, dedup_window_ms: Int)
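One way an adapter might read these variants is sketched below. The helper names ack_timeout and max_attempts are hypothetical, and the sketch assumes that max_retries counts retries after the first attempt, which the documentation does not state explicitly.

```gleam
import gleam/option.{type Option, None, Some}

// Local copy of the documented type so the sketch is self-contained.
pub type DeliveryGuarantee {
  AtMostOnce
  AtLeastOnce(ack_timeout_ms: Int, max_retries: Int)
  ExactlyOnce(ack_timeout_ms: Int, max_retries: Int, dedup_window_ms: Int)
}

/// Ack timeout to wait for, or None when no ack is required.
pub fn ack_timeout(guarantee: DeliveryGuarantee) -> Option(Int) {
  case guarantee {
    AtMostOnce -> None
    AtLeastOnce(timeout_ms, _) -> Some(timeout_ms)
    ExactlyOnce(timeout_ms, _, _) -> Some(timeout_ms)
  }
}

/// Total send attempts, assuming max_retries excludes the first attempt.
pub fn max_attempts(guarantee: DeliveryGuarantee) -> Int {
  case guarantee {
    AtMostOnce -> 1
    AtLeastOnce(_, max_retries) -> 1 + max_retries
    ExactlyOnce(_, max_retries, _) -> 1 + max_retries
  }
}
```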
Metadata associated with a delivered message.
Transport adapters may include additional information such as:
- Timestamps (“received_at”, “sent_at”)
- Routing information (“via_node”, “hops”)
- Transport-specific details (“protocol_version”, “compression”)
Keys and values are strings for simplicity and portability across adapters.
pub type DeliveryMetadata =
  dict.Dict(String, String)
Fallback strategy when primary transport fails.
Allows graceful degradation by trying alternative transports.
Variants
- NoFallback: Fail immediately if primary fails
- RetryPrimary: Only retry on the same transport (uses RetryPolicy)
- FailoverList: Try transports in order until one succeeds
pub type FallbackStrategy {
  NoFallback
  RetryPrimary(RetryPolicy)
  FailoverList(fallback_transports: List(String))
}
Constructors
- NoFallback
- RetryPrimary(RetryPolicy)
- FailoverList(fallback_transports: List(String))
Health status of a transport adapter.
Variants
- Up: The transport is fully operational
- Degraded: The transport is operational but experiencing issues (with description)
- Down: The transport is not operational (with reason)
Health checks should be lightweight and fast, suitable for frequent polling by monitoring systems.
pub type HealthStatus {
  Up
  Degraded(String)
  Down(String)
}
Constructors
- Up
- Degraded(String)
- Down(String)
Message delivered to the receiver subject.
Contains:
- Source node identifier
- Binary payload (to be decoded by the receiver)
- Delivery metadata
pub type IncomingMessage =
  #(String, BitArray, dict.Dict(String, String))
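A receiver can destructure the tuple directly. The sketch below builds a one-line log summary; the helper name summarize is hypothetical, and the "received_at" key is taken from the metadata examples in this module, which adapters are not required to set.

```gleam
import gleam/bit_array
import gleam/dict
import gleam/int
import gleam/result

pub type IncomingMessage =
  #(String, BitArray, dict.Dict(String, String))

/// Hypothetical helper: short description of an incoming message for logs.
pub fn summarize(message: IncomingMessage) -> String {
  let #(source, payload, metadata) = message
  // Metadata keys are optional, so fall back to a placeholder.
  let received_at =
    dict.get(metadata, "received_at") |> result.unwrap("unknown")
  source
  <> " sent "
  <> int.to_string(bit_array.byte_size(payload))
  <> " bytes (received_at: "
  <> received_at
  <> ")"
}
```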
Message identifier for acknowledgment tracking.
Unique identifier for each message sent, used to correlate acks. Format: “{node_id}{timestamp}{sequence}”
pub type MessageId =
  String
Per-node circuit breaker state.
Tracks circuit state and recent outcomes for each node.
pub type NodeCircuitBreaker {
  NodeCircuitBreaker(
    state: CircuitState,
    consecutive_failures: Int,
    consecutive_successes: Int,
    total_failures: Int,
    total_successes: Int,
  )
}
Constructors
- NodeCircuitBreaker(state: CircuitState, consecutive_failures: Int, consecutive_successes: Int, total_failures: Int, total_successes: Int)
Node identifier used across the distribute library.
Matches the NodeId from registry/behaviour and handshake modules.
Typically formatted as “node@host” (Erlang node naming convention).
This type alias ensures consistency across all distribute modules.
pub type NodeId =
  String
State of a pending message awaiting acknowledgment.
pub type PendingMessage {
  PendingMessage(
    message_id: String,
    node: String,
    payload: BitArray,
    sent_at_ms: Int,
    retry_count: Int,
    ack_timeout_ms: Int,
  )
}
Constructors
- PendingMessage(message_id: String, node: String, payload: BitArray, sent_at_ms: Int, retry_count: Int, ack_timeout_ms: Int)
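A sketch of how an adapter might use these fields to decide when to resend: a message is overdue once ack_timeout_ms has elapsed since sent_at_ms, and a resend bumps retry_count. The helper names is_overdue and mark_resent are hypothetical, and the current time is passed in by the caller rather than read from a clock.

```gleam
// Local copy of the documented type so the sketch is self-contained.
pub type PendingMessage {
  PendingMessage(
    message_id: String,
    node: String,
    payload: BitArray,
    sent_at_ms: Int,
    retry_count: Int,
    ack_timeout_ms: Int,
  )
}

/// True once the ack timeout has elapsed since the last send.
pub fn is_overdue(pending: PendingMessage, now_ms: Int) -> Bool {
  now_ms - pending.sent_at_ms >= pending.ack_timeout_ms
}

/// Record a resend: reset the send time and count the retry.
pub fn mark_resent(pending: PendingMessage, now_ms: Int) -> PendingMessage {
  PendingMessage(
    ..pending,
    sent_at_ms: now_ms,
    retry_count: pending.retry_count + 1,
  )
}
```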
Retry policy for transport operations.
Controls how failed sends/broadcasts are retried with exponential backoff.
Fields
- max_attempts: Maximum number of send attempts (1 = no retry)
- initial_backoff_ms: Initial backoff duration in milliseconds
- max_backoff_ms: Maximum backoff duration (prevents runaway waits)
pub type RetryPolicy {
  RetryPolicy(
    max_attempts: Int,
    initial_backoff_ms: Int,
    max_backoff_ms: Int,
  )
}
Constructors
- RetryPolicy(max_attempts: Int, initial_backoff_ms: Int, max_backoff_ms: Int)
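The exponential backoff these fields describe could be computed as in the sketch below. The doubling factor and the helper name backoff_ms are assumptions; the documentation only states that backoff is exponential and capped at max_backoff_ms.

```gleam
import gleam/int

// Local copy of the documented type so the sketch is self-contained.
pub type RetryPolicy {
  RetryPolicy(max_attempts: Int, initial_backoff_ms: Int, max_backoff_ms: Int)
}

/// Wait before retry number `retry` (1 = first retry), doubling each time
/// and never exceeding max_backoff_ms.
pub fn backoff_ms(policy: RetryPolicy, retry: Int) -> Int {
  do_backoff(policy.initial_backoff_ms, policy.max_backoff_ms, retry)
}

fn do_backoff(current: Int, cap: Int, retry: Int) -> Int {
  case retry <= 1 || current >= cap {
    True -> int.min(current, cap)
    False -> do_backoff(current * 2, cap, retry - 1)
  }
}
```

With initial_backoff_ms: 100 and max_backoff_ms: 1000, successive retries wait 100, 200, 400, 800 and then 1000 ms.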
Transport capabilities for negotiation during handshake.
Capabilities declare what features the transport supports, allowing nodes to negotiate compatible communication modes during connection setup.
Examples:
- Protocol versions (“tcp”, min: 1, max: 2)
- Compression support (“compression”, min: 1, max: 1)
- Encryption requirements (“encryption”, min: 1, max: 1)
pub type TransportCapability =
  capability.Capability
Errors that can occur during transport operations.
Variants
- AdapterFailure: Generic adapter failure with error description
- InvalidNode: The specified node identifier is invalid or unknown
- ConnectionClosed: The connection to the node was closed
- Backpressure: The transport is under load and cannot accept more messages
- PayloadTooLarge: The message payload exceeds size limits
- Timeout: The operation did not complete within the allowed time
- ShutdownTimeout: Graceful shutdown did not complete within the timeout
- EncodeError: Failed to encode the message (wraps codec.EncodeError)
pub type TransportError {
  AdapterFailure(String)
  InvalidNode
  ConnectionClosed
  Backpressure
  PayloadTooLarge
  Timeout
  ShutdownTimeout
  EncodeError(codec.EncodeError)
}
Constructors
- AdapterFailure(String)
- InvalidNode
- ConnectionClosed
- Backpressure
- PayloadTooLarge
- Timeout
- ShutdownTimeout
- EncodeError(codec.EncodeError)
Configuration options for transport initialization.
All fields are optional to allow flexibility in adapter implementations.
Fields
- name: Human-readable identifier for this transport instance
- bind_address: Network address to bind to (e.g., “0.0.0.0”, “127.0.0.1”)
- port: Port number to listen on
- max_payload_bytes: Maximum message payload size in bytes
- connect_timeout_ms: Timeout for connection attempts
- heartbeat_interval_ms: Interval for heartbeat messages (keeps connections alive)
- retry_policy: Retry policy for send/broadcast failures
- circuit_breaker_policy: Circuit breaker configuration for per-node failure handling
- fallback_strategy: Strategy for handling transport-level failures
- delivery_guarantee: Delivery semantics (at-most-once, at-least-once, exactly-once)
- capabilities: Transport capabilities for handshake negotiation
pub type TransportOpts {
  TransportOpts(
    name: option.Option(String),
    bind_address: option.Option(String),
    port: option.Option(Int),
    max_payload_bytes: option.Option(Int),
    connect_timeout_ms: option.Option(Int),
    heartbeat_interval_ms: option.Option(Int),
    retry_policy: option.Option(RetryPolicy),
    circuit_breaker_policy: option.Option(CircuitBreakerPolicy),
    fallback_strategy: option.Option(FallbackStrategy),
    delivery_guarantee: option.Option(DeliveryGuarantee),
    capabilities: List(capability.Capability),
  )
}
Constructors
- TransportOpts(name: option.Option(String), bind_address: option.Option(String), port: option.Option(Int), max_payload_bytes: option.Option(Int), connect_timeout_ms: option.Option(Int), heartbeat_interval_ms: option.Option(Int), retry_policy: option.Option(RetryPolicy), circuit_breaker_policy: option.Option(CircuitBreakerPolicy), fallback_strategy: option.Option(FallbackStrategy), delivery_guarantee: option.Option(DeliveryGuarantee), capabilities: List(capability.Capability))
Opaque handle representing the internal state of a transport adapter.
Unlike crypto/provider which uses simple opaque types, transport adapters
typically need more complex state management. Following the registry/actor
pattern, implementations will usually wrap this in an actor with a command
message type.
Example (actor-based implementation):
pub type TransportCommand {
  Send(NodeId, BitArray, Subject(Result(Nil, TransportError)))
  Broadcast(List(NodeId), BitArray, Subject(Result(Nil, TransportError)))
  GetHealth(Subject(HealthStatus))
}
pub type TransportState {
  TransportState
}
Constructors
- TransportState
Values
pub fn broadcast(
  nodes: List(String),
  payload: BitArray,
) -> Result(
  dict.Dict(String, Result(Nil, TransportError)),
  TransportError,
)
Broadcast a binary payload to multiple nodes.
Returns a BroadcastResult containing per-node delivery outcomes.
This allows the caller to detect and handle partial failures gracefully.
This is a pure function signature. Implementations will vary based on the transport adapter architecture (actor-based, synchronous, async).
The adapter may optimize this (e.g., multicast) but should ensure best-effort delivery to all reachable nodes.
Parameters
- nodes: List of destination node identifiers
- payload: The message payload as a binary array
Returns
- Ok(BroadcastResult): At least one node succeeded; individual nodes may still have failed
- Error(TransportError): All nodes failed or the transport is completely unavailable
The BroadcastResult is a dict.Dict(NodeId, Result(Nil, TransportError)) where:
- Ok(Nil) = message delivered to that node
- Error(err) = delivery to that node failed with reason
Behavior
- If all nodes fail: Return Error(TransportError) (catastrophic failure)
- If some nodes fail: Return Ok(BroadcastResult) with mixed outcomes
- If all nodes succeed: Return Ok(BroadcastResult) with all Ok(Nil) entries
Implementations should apply retry policies to each node independently, allowing fast nodes to complete before slow/failing nodes exhaust retries.
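The return rules above can be sketched as a small function that turns per-node outcomes into the final result. The helper name finalize, the choice of AdapterFailure("all nodes failed") as the overall error, and returning Ok for an empty node list are all assumptions; TransportError here is a trimmed local stand-in.

```gleam
import gleam/dict
import gleam/list
import gleam/result

// Trimmed local stand-in for the library's TransportError.
pub type TransportError {
  AdapterFailure(String)
  ConnectionClosed
  Timeout
}

pub type BroadcastResult =
  dict.Dict(String, Result(Nil, TransportError))

/// Hypothetical helper applying the all-failed / partial / all-ok rules.
pub fn finalize(
  outcomes: BroadcastResult,
) -> Result(BroadcastResult, TransportError) {
  // An empty outcome set is treated as success (assumption).
  let all_failed =
    dict.size(outcomes) > 0
    && list.all(dict.values(outcomes), result.is_error)
  case all_failed {
    True -> Error(AdapterFailure("all nodes failed"))
    False -> Ok(outcomes)
  }
}
```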
Example
let result = broadcast(["node2@host", "node3@host"], payload)
case result {
  Ok(outcomes) ->
    dict.each(outcomes, fn(node, res) {
      case res {
        Ok(Nil) -> io.println("Delivered to " <> node)
        Error(_) -> io.println("Failed to " <> node)
      }
    })
  Error(_) ->
    // All nodes failed or transport is unavailable
    io.println("Broadcast failed completely")
}
pub fn health() -> HealthStatus
Check the health status of the transport adapter.
This is a pure function signature for health checks. Should be lightweight and suitable for frequent polling by monitoring systems.
Returns
HealthStatus indicating the current health
pub fn init() -> TransportState
Initialize a new transport adapter state.
This is a stub - actual implementations will have their own initialization.
Following the crypto/provider pattern, this provides a minimal contract.
Actor-based adapters (recommended) should use start() or start_link()
instead, returning Result(Subject(Command), actor.StartError).
Returns
A placeholder TransportState. Real adapters ignore this.
Example (Actor Pattern - Recommended)
// In your adapter module:
pub fn start_link(
  opts: behaviour.TransportOpts,
) -> Result(process.Subject(TransportCommand), actor.StartError) {
  actor.new(initial_state(opts))
  |> actor.on_message(handle_message)
  |> actor.start()
  |> result.map(fn(s) { s.data })
}
pub fn metrics() -> dict.Dict(String, Int)
Retrieve adapter-specific operational metrics.
This is a pure function signature for metrics retrieval.
Returns
Dictionary of metric names to values
Common Metrics
Adapters typically include:
- messages_sent: Total number of messages sent
- messages_received: Total number of messages received
- bytes_sent: Total bytes transmitted
- bytes_received: Total bytes received
- connections_active: Current number of active connections
- send_errors: Number of send failures
- recv_errors: Number of receive failures
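An adapter that keeps its counters in a dict.Dict(String, Int) might update them as in this sketch. The helper name increment is hypothetical; a missing key is treated as zero.

```gleam
import gleam/dict.{type Dict}
import gleam/result

/// Hypothetical helper: add `by` to a named counter, starting from 0.
pub fn increment(
  metrics: Dict(String, Int),
  name: String,
  by: Int,
) -> Dict(String, Int) {
  let current = dict.get(metrics, name) |> result.unwrap(0)
  dict.insert(metrics, name, current + by)
}
```

For example, after a successful send of 512 bytes an adapter could call increment(metrics, "messages_sent", 1) followed by increment(_, "bytes_sent", 512).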
pub fn send(
  node: String,
  payload: BitArray,
) -> Result(Nil, TransportError)
Send a binary payload to a specific node.
This is a pure function signature following the crypto/provider pattern.
Actual implementations will vary:
- Actor-based (recommended): Send command to transport actor via Subject
- Synchronous: Call Erlang/NIF function directly
- Async: Queue message and return immediately
The delivery semantics (at-most-once, at-least-once, exactly-once) are adapter-specific and should be documented by each implementation.
Parameters
- node: Identifier of the destination node (e.g., “node@host”)
- payload: The message payload as a binary array
Returns
Result(Nil, TransportError) indicating success or failure
Errors
- InvalidNode: The node identifier is invalid or unknown
- ConnectionClosed: The connection to the node is closed
- PayloadTooLarge: The payload exceeds size limits
- Backpressure: The transport is under load
- Timeout: The send operation timed out
Example (Actor-based implementation)
pub fn send(
  transport: Subject(TransportCommand),
  node: NodeId,
  payload: BitArray,
) -> Result(Nil, TransportError) {
  let reply_subject = process.new_subject()
  process.send(transport, Send(node, payload, reply_subject))
  case process.receive(reply_subject, 5000) {
    Ok(result) -> result
    Error(_) -> Error(Timeout)
  }
}