distribute/transport/adapter
Transport Adapter contract and utilities.
This module defines the behaviour contract that all transport adapters must implement, along with helper functions for creating default options.
Adapter Contract
A transport adapter handles low-level sending and receiving of binary payloads between peers or groups. It provides:
- Lifecycle management (start/stop)
- Unicast and broadcast messaging
- Subscription-based message reception
- Health monitoring and metrics
Implementing an Adapter
To implement a custom adapter, create a module that provides a function
returning a TransportAdapter record with all required functions:
pub fn new() -> TransportAdapter {
TransportAdapter(
start: my_start,
stop: my_stop,
send: my_send,
broadcast: my_broadcast,
subscribe: my_subscribe,
unsubscribe: my_unsubscribe,
health: my_health,
metrics: my_metrics,
)
}
See distribute/transport/beam_adapter for a reference implementation.
Types
Transport adapter behaviour contract.
All transport implementations must provide these 8 core functions.
- start - Initialize and start the adapter
- stop - Graceful shutdown with timeout
- send - Unicast to a single peer
- broadcast - Multicast to a group
- subscribe - Register callback for incoming messages
- unsubscribe - Remove a subscription
- health - Get current adapter health status
- metrics - Get adapter-specific metrics
pub type TransportAdapter {
TransportAdapter(
start: fn(types.AdapterOptions) -> Result(
types.AdapterHandle,
types.AdapterError,
),
stop: fn(types.AdapterHandle, Int) -> Result(
Nil,
types.AdapterError,
),
send: fn(
types.AdapterHandle,
String,
BitArray,
types.SendOptions,
) -> Result(Nil, types.SendError),
broadcast: fn(
types.AdapterHandle,
String,
BitArray,
types.SendOptions,
) -> Result(Nil, types.SendError),
subscribe: fn(
types.AdapterHandle,
fn(String, BitArray, types.MessageMetadata) -> Nil,
) -> Result(types.SubscriptionId, types.AdapterError),
unsubscribe: fn(types.AdapterHandle, types.SubscriptionId) -> Result(
Nil,
types.AdapterError,
),
health: fn(types.AdapterHandle) -> types.HealthStatus,
metrics: fn(types.AdapterHandle) -> dict.Dict(
String,
dynamic.Dynamic,
),
)
}
Constructors
-
TransportAdapter( start: fn(types.AdapterOptions) -> Result( types.AdapterHandle, types.AdapterError, ), stop: fn(types.AdapterHandle, Int) -> Result( Nil, types.AdapterError, ), send: fn( types.AdapterHandle, String, BitArray, types.SendOptions, ) -> Result(Nil, types.SendError), broadcast: fn( types.AdapterHandle, String, BitArray, types.SendOptions, ) -> Result(Nil, types.SendError), subscribe: fn( types.AdapterHandle, fn(String, BitArray, types.MessageMetadata) -> Nil, ) -> Result(types.SubscriptionId, types.AdapterError), unsubscribe: fn(types.AdapterHandle, types.SubscriptionId) -> Result( Nil, types.AdapterError, ), health: fn(types.AdapterHandle) -> types.HealthStatus, metrics: fn(types.AdapterHandle) -> dict.Dict( String, dynamic.Dynamic, ), )Arguments
- start
-
Start the adapter with provided options. Returns an opaque handle for subsequent operations.
Errors are fatal start errors that prevent adapter initialization.
- stop
-
Stop the adapter gracefully within the timeout period (milliseconds).
The adapter should attempt to flush in-flight messages and close connections cleanly. If graceful shutdown cannot complete within the timeout, the adapter should abort and return ShutdownTimeout error.
- send
-
Send a binary payload to a single peer (unicast).
Peer is an opaque identifier from the registry/discovery subsystem. This function should return quickly; implementations may queue sends asynchronously. Ok(Nil) indicates queuing success (not delivery).
SendError classifications:
- Transient (retryable): ConnectionClosed, Timeout, Backpressure
- Permanent (do not retry): InvalidPeer, PayloadTooLarge, SerializationError
- AdapterFailure: Internal error, escalate to registry
- broadcast
-
Broadcast a binary payload to a logical group (multicast).
Group is defined by the groups subsystem. Semantics are best-effort unless the adapter explicitly supports acknowledged broadcasts.
Implementations should document delivery guarantees.
- subscribe
-
Subscribe to incoming messages.
Register a callback to receive (peer_id, payload, metadata) tuples. Returns a subscription ID for later unsubscription. Adapters can support multiple subscribers.
- unsubscribe
-
Unsubscribe from incoming messages.
Remove a previously registered subscription by ID.
- health
-
Get current health status.
Returns lightweight snapshot: Up, Degraded(reason), or Down(reason). Should include basic diagnostics without heavy computation.
- metrics
-
Get adapter-specific metrics.
Returns a map of metrics for observability integration. Common metrics: message_sent_count, message_received_count, error_count, last_error_timestamp_ms, active_connections.
Values
pub fn default_options(name: String) -> types.AdapterOptions
Create default adapter options.
Provides sensible defaults:
- max_payload_bytes: 1MB
- connect_timeout_ms: 5000
- telemetry_prefix: “transport”
pub fn default_send_options() -> types.SendOptions
Create default send options.
Returns options suitable for most use cases:
- No explicit timeout (adapter default applies)
- Normal priority
- Best-effort delivery
- No correlation ID