distribute/transport

High-level transport API for the distribute library.

This module provides a unified facade for the transport layer, managing the default BEAM distribution adapter as a singleton. Other modules (messaging, handshake) can use these functions without explicitly passing adapter handles.

Architecture

The transport layer is designed as a pluggable system:

Usage

Start the transport as part of your supervision tree:

import distribute/transport
import gleam/otp/static_supervisor as supervisor

pub fn start_app() {
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(transport.child_spec())
  |> supervisor.start()
}

Then use the transport functions:

// Send a message
transport.send("my_process", <<1, 2, 3>>, adapter.default_send_options())

// Check health
case transport.health() {
  types.Up -> io.println("Transport is healthy")
  types.Degraded(reason) -> io.println("Degraded: " <> reason)
  types.Down(reason) -> io.println("Down: " <> reason)
}

Values

pub fn broadcast(
  group: String,
  payload: BitArray,
  opts: types.SendOptions,
) -> Result(Nil, types.SendError)

Broadcast a message to all members of a group.

Group membership is managed by the adapter. The special group "all" broadcasts to all known peers.

Example

transport.broadcast("workers", payload, opts)
pub fn child_spec() -> supervision.ChildSpecification(
  types.AdapterHandle,
)

Create a child specification for starting the transport adapter under a supervisor.

This is the recommended way to start the transport layer. The adapter will be automatically restarted if it crashes.

Example

import distribute/transport
import gleam/otp/static_supervisor as supervisor

supervisor.new(supervisor.OneForOne)
|> supervisor.add(transport.child_spec())
|> supervisor.start()
pub const default_adapter_name: String

The registered name for the default transport adapter process.

This name is used to look up the singleton adapter instance. You typically don’t need to use this directly - the functions in this module handle the lookup automatically.

pub fn health() -> types.HealthStatus

Get the health status of the default transport.

Returns:

  • Up - Transport is fully operational
  • Degraded(reason) - Transport is working but with issues
  • Down(reason) - Transport is not operational

Example

case transport.health() {
  types.Up -> io.println("All good!")
  types.Degraded(r) -> io.println("Warning: " <> r)
  types.Down(r) -> io.println("Error: " <> r)
}
pub fn send(
  peer: String,
  payload: BitArray,
  opts: types.SendOptions,
) -> Result(Nil, types.SendError)

Send a message to a peer using the default transport.

The peer can be a local registered process name (e.g., "my_process") or a remote process ("my_process@node").

Example

let payload = <<"hello">>
let opts = adapter.default_send_options()
transport.send("calculator", payload, opts)
pub fn start_link() -> Result(
  types.AdapterHandle,
  types.AdapterError,
)

Start the transport adapter directly without supervision.

Note: For production use, prefer child_spec() with a supervisor. This function is primarily useful for testing or simple scripts.

Returns the adapter handle on success, or an error if startup fails. Uses a default retry policy with 3 attempts and exponential backoff.

pub fn start_link_with_retry(
  policy: retry.RetryPolicy,
) -> Result(types.AdapterHandle, types.AdapterError)

Start the transport adapter with a custom retry policy.

This function will retry the startup operation according to the provided policy, with exponential backoff and optional jitter between attempts.

Example

import distribute/transport
import distribute/retry

// Aggressive retry for critical systems
let policy = retry.aggressive()
let assert Ok(handle) = transport.start_link_with_retry(policy)

// Or with custom settings
let custom = retry.default_with_jitter()
  |> retry.with_max_attempts(5)
  |> retry.with_base_delay_ms(200)
let assert Ok(handle) = transport.start_link_with_retry(custom)
pub fn subscribe(
  callback: fn(String, BitArray, types.MessageMetadata) -> Nil,
) -> Result(types.SubscriptionId, types.AdapterError)

Subscribe to incoming messages on the default transport.

The callback will be invoked for each message received by the adapter. Returns a subscription ID that can be used to unsubscribe later.

Example

let callback = fn(from, payload) {
  io.println("Received from: " <> from)
}
let assert Ok(sub_id) = transport.subscribe(callback)
pub fn unsubscribe(
  id: types.SubscriptionId,
) -> Result(Nil, types.AdapterError)

Unsubscribe from the default transport.

After calling this function, the callback associated with the given subscription ID will no longer receive messages.

Search Document