distribute/transport/adapter

Transport Adapter contract and utilities.

This module defines the behaviour contract that all transport adapters must implement, along with helper functions for creating default options.

Adapter Contract

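A transport adapter handles low-level sending and receiving of binary payloads between peers or groups. It provides lifecycle management (start, stop), unicast and multicast delivery (send, broadcast), subscriptions for incoming messages (subscribe, unsubscribe), and observability (health, metrics).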

Implementing an Adapter

To implement a custom adapter, create a module that provides a function returning a TransportAdapter record with all required functions:

pub fn new() -> TransportAdapter {
  TransportAdapter(
    start: my_start,
    stop: my_stop,
    send: my_send,
    broadcast: my_broadcast,
    subscribe: my_subscribe,
    unsubscribe: my_unsubscribe,
    health: my_health,
    metrics: my_metrics,
  )
}

See distribute/transport/beam_adapter for a reference implementation.

Types

Transport adapter behaviour contract.

All transport implementations must provide these 8 core functions.

  1. start - Initialize and start the adapter
  2. stop - Graceful shutdown with timeout
  3. send - Unicast to a single peer
  4. broadcast - Multicast to a group
  5. subscribe - Register callback for incoming messages
  6. unsubscribe - Remove a subscription
  7. health - Get current adapter health status
  8. metrics - Get adapter-specific metrics

pub type TransportAdapter {
  TransportAdapter(
    start: fn(types.AdapterOptions) -> Result(
      types.AdapterHandle,
      types.AdapterError,
    ),
    stop: fn(types.AdapterHandle, Int) -> Result(
      Nil,
      types.AdapterError,
    ),
    send: fn(
      types.AdapterHandle,
      String,
      BitArray,
      types.SendOptions,
    ) -> Result(Nil, types.SendError),
    broadcast: fn(
      types.AdapterHandle,
      String,
      BitArray,
      types.SendOptions,
    ) -> Result(Nil, types.SendError),
    subscribe: fn(
      types.AdapterHandle,
      fn(String, BitArray, types.MessageMetadata) -> Nil,
    ) -> Result(types.SubscriptionId, types.AdapterError),
    unsubscribe: fn(types.AdapterHandle, types.SubscriptionId) -> Result(
      Nil,
      types.AdapterError,
    ),
    health: fn(types.AdapterHandle) -> types.HealthStatus,
    metrics: fn(types.AdapterHandle) -> dict.Dict(
      String,
      dynamic.Dynamic,
    ),
  )
}

Constructors

  • TransportAdapter(start, stop, send, broadcast, subscribe, unsubscribe, health, metrics), with the field types given in the type definition above.

    Arguments

    start

    Start the adapter with provided options. Returns an opaque handle for subsequent operations.

    An Error result is fatal: the adapter failed to initialize and no handle is returned.

    stop

    Stop the adapter gracefully within the timeout period (milliseconds).

    The adapter should attempt to flush in-flight messages and close connections cleanly. If graceful shutdown cannot complete within the timeout, the adapter should abort and return a ShutdownTimeout error.

    send

    Send a binary payload to a single peer (unicast).

    The peer argument is an opaque identifier from the registry/discovery subsystem. This function should return quickly; implementations may queue sends asynchronously. Ok(Nil) means the payload was accepted for sending, not that it was delivered.

    SendError classifications:

    • Transient (retryable): ConnectionClosed, Timeout, Backpressure
    • Permanent (do not retry): InvalidPeer, PayloadTooLarge, SerializationError
    • AdapterFailure: Internal error, escalate to registry
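
The classification above can be turned into a small retry policy. The sketch below is illustrative only: SendError is redeclared locally as a stand-in for types.SendError (the real constructors may carry payloads that are not shown here), and Action, classify, and send_with_retry are hypothetical helpers, not part of this module.

```gleam
// Stand-in for types.SendError. Constructor names come from the
// classification list; any payloads the real constructors carry are omitted.
pub type SendError {
  ConnectionClosed
  Timeout
  Backpressure
  InvalidPeer
  PayloadTooLarge
  SerializationError
  AdapterFailure
}

/// What a caller might do after a failed send (hypothetical helper type).
pub type Action {
  Retry
  GiveUp
  Escalate
}

/// Map each error to the action suggested by the classification.
pub fn classify(error: SendError) -> Action {
  case error {
    ConnectionClosed | Timeout | Backpressure -> Retry
    InvalidPeer | PayloadTooLarge | SerializationError -> GiveUp
    AdapterFailure -> Escalate
  }
}

/// Call `attempt` up to `attempts_left` times, retrying only transient errors.
/// Permanent errors and adapter failures are returned immediately.
pub fn send_with_retry(
  attempt: fn() -> Result(Nil, SendError),
  attempts_left: Int,
) -> Result(Nil, SendError) {
  case attempt() {
    Ok(Nil) -> Ok(Nil)
    Error(error) ->
      case classify(error), attempts_left > 1 {
        Retry, True -> send_with_retry(attempt, attempts_left - 1)
        _, _ -> Error(error)
      }
  }
}
```

A real policy would probably add a delay between attempts; that is left out to keep the sketch short.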

    broadcast

    Broadcast a binary payload to a logical group (multicast).

    Group is defined by the groups subsystem. Semantics are best-effort unless the adapter explicitly supports acknowledged broadcasts.

    Implementations should document delivery guarantees.

    subscribe

    Subscribe to incoming messages.

    Register a callback that is invoked with three arguments for each incoming message: peer_id, payload, and metadata. Returns a subscription ID for later unsubscription. Adapters can support multiple subscribers.
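
As a rough illustration of the callback shape, the sketch below defines a handler that could be passed to subscribe. MessageMetadata is redeclared locally as an empty stand-in for types.MessageMetadata (its real fields are not shown in this document), and summarise and on_message are hypothetical names.

```gleam
import gleam/bit_array
import gleam/int
import gleam/io

// Stand-in for types.MessageMetadata; its real fields are not documented
// here, so it is left empty.
pub type MessageMetadata {
  MessageMetadata
}

/// Summarise an incoming message: show UTF-8 payloads as text and anything
/// else as a byte count.
pub fn summarise(peer_id: String, payload: BitArray) -> String {
  case bit_array.to_string(payload) {
    Ok(text) -> peer_id <> ": " <> text
    Error(Nil) -> {
      let size = int.to_string(bit_array.byte_size(payload))
      peer_id <> ": " <> size <> " bytes (binary)"
    }
  }
}

/// Has the shape subscribe expects:
/// fn(String, BitArray, MessageMetadata) -> Nil
pub fn on_message(
  peer_id: String,
  payload: BitArray,
  _metadata: MessageMetadata,
) -> Nil {
  io.println(summarise(peer_id, payload))
}
```

Keeping the formatting in a pure function and the side effect in the callback makes the handler easy to test without a running adapter.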

    unsubscribe

    Unsubscribe from incoming messages.

    Remove a previously registered subscription by ID.

    health

    Get current health status.

    Returns a lightweight snapshot: Up, Degraded(reason), or Down(reason). It should include basic diagnostics without heavy computation.
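
A caller might branch on the snapshot as sketched below. HealthStatus is redeclared locally as a stand-in for types.HealthStatus, on the assumption that the reason is a String; accepts_traffic and describe are hypothetical helpers.

```gleam
// Stand-in for types.HealthStatus, using the three variants named above.
// Assumption: the reason carried by Degraded and Down is a String.
pub type HealthStatus {
  Up
  Degraded(reason: String)
  Down(reason: String)
}

/// Whether it is still reasonable to hand traffic to the adapter.
/// Treating Degraded as usable is a policy choice, not a rule of the contract.
pub fn accepts_traffic(status: HealthStatus) -> Bool {
  case status {
    Up | Degraded(_) -> True
    Down(_) -> False
  }
}

/// Render a status as a single log-friendly line.
pub fn describe(status: HealthStatus) -> String {
  case status {
    Up -> "up"
    Degraded(reason) -> "degraded: " <> reason
    Down(reason) -> "down: " <> reason
  }
}
```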

    metrics

    Get adapter-specific metrics.

    Returns a map of metrics for observability integration. Common metrics: message_sent_count, message_received_count, error_count, last_error_timestamp_ms, active_connections.
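
Because metric values are Dynamic, readers have to decode them before use. The sketch below shows one way to read a counter, assuming a recent gleam_stdlib that ships gleam/dynamic/decode and assuming counters are stored as integers; counter is a hypothetical helper.

```gleam
import gleam/dict.{type Dict}
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode

/// Read an integer metric, falling back to 0 when the key is missing or the
/// value is not an integer.
pub fn counter(metrics: Dict(String, Dynamic), name: String) -> Int {
  case dict.get(metrics, name) {
    Ok(value) ->
      case decode.run(value, decode.int) {
        Ok(count) -> count
        Error(_) -> 0
      }
    Error(Nil) -> 0
  }
}
```

For example, counter(adapter_metrics, "error_count") would yield the error count when the adapter reports one and 0 otherwise.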

Values

pub fn default_options(name: String) -> types.AdapterOptions

Create default adapter options.

Provides sensible defaults:

  • max_payload_bytes: 1MB
  • connect_timeout_ms: 5000
  • telemetry_prefix: "transport"

pub fn default_send_options() -> types.SendOptions

Create default send options.

Returns options suitable for most use cases:

  • No explicit timeout (adapter default applies)
  • Normal priority
  • Best-effort delivery
  • No correlation ID
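
The two helpers can be combined with an adapter record as in the sketch below. It relies only on names from this page (the module path, default_options, default_send_options, and the TransportAdapter fields); send_once, the "example" name, and the 5000 ms stop timeout are illustrative choices, and error details are discarded because the variants of types.AdapterError are not shown here.

```gleam
import distribute/transport/adapter.{type TransportAdapter}

/// Start the adapter, send one payload with default options, then stop.
pub fn send_once(
  transport: TransportAdapter,
  peer: String,
  payload: BitArray,
) -> Result(Nil, String) {
  let options = adapter.default_options("example")
  case transport.start(options) {
    Error(_) -> Error("start failed")
    Ok(handle) -> {
      let sent =
        transport.send(handle, peer, payload, adapter.default_send_options())
      // Allow 5000 ms for a graceful shutdown, whatever the send result was.
      let stopped = transport.stop(handle, 5000)
      case sent, stopped {
        Ok(Nil), Ok(Nil) -> Ok(Nil)
        Error(_), _ -> Error("send failed")
        _, Error(_) -> Error("stop failed")
      }
    }
  }
}
```

Since Ok(Nil) from send only signals that the payload was queued, a real caller would likely keep the adapter running rather than stop it immediately.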