distribute/transport
High-level transport API for the distribute library.
This module provides a unified facade for the transport layer, managing the default BEAM distribution adapter as a singleton. Other modules (messaging, handshake) can use these functions without explicitly passing adapter handles.
Architecture
The transport layer is designed as a pluggable system:
transport/types - Shared type definitions
transport/behaviour - Adapter contract (behaviour)
transport/adapter - Adapter utilities and defaults
transport/beam_adapter - BEAM distribution implementation
transport (this module) - High-level singleton facade
Usage
Start the transport as part of your supervision tree:
import distribute/transport
import gleam/otp/static_supervisor as supervisor
pub fn start_app() {
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(transport.child_spec())
  |> supervisor.start()
}
Then use the transport functions:
// Send a message
transport.send("my_process", <<1, 2, 3>>, adapter.default_send_options())
// Check health
case transport.health() {
  types.Up -> io.println("Transport is healthy")
  types.Degraded(reason) -> io.println("Degraded: " <> reason)
  types.Down(reason) -> io.println("Down: " <> reason)
}
Values
pub fn broadcast(
group: String,
payload: BitArray,
opts: types.SendOptions,
) -> Result(Nil, types.SendError)
Broadcast a message to all members of a group.
Group membership is managed by the adapter. The special group "all"
broadcasts to all known peers.
Example
transport.broadcast("workers", payload, opts)
pub fn child_spec() -> supervision.ChildSpecification(
types.AdapterHandle,
)
Create a child specification for starting the transport adapter under a supervisor.
This is the recommended way to start the transport layer. The adapter will be automatically restarted if it crashes.
Example
import distribute/transport
import gleam/otp/static_supervisor as supervisor
supervisor.new(supervisor.OneForOne)
|> supervisor.add(transport.child_spec())
|> supervisor.start()
pub const default_adapter_name: String
The registered name for the default transport adapter process.
This name is used to look up the singleton adapter instance. You typically don’t need to use it directly; the functions in this module handle the lookup automatically.
pub fn health() -> types.HealthStatus
Get the health status of the default transport.
Returns:
Up - Transport is fully operational
Degraded(reason) - Transport is working but with issues
Down(reason) - Transport is not operational
Example
case transport.health() {
  types.Up -> io.println("All good!")
  types.Degraded(r) -> io.println("Warning: " <> r)
  types.Down(r) -> io.println("Error: " <> r)
}
pub fn send(
peer: String,
payload: BitArray,
opts: types.SendOptions,
) -> Result(Nil, types.SendError)
Send a message to a peer using the default transport.
The peer can be a local registered process name (e.g., "my_process")
or a remote process in name@node form (e.g., "my_process@node").
Example
let payload = <<"hello">>
let opts = adapter.default_send_options()
transport.send("calculator", payload, opts)
pub fn start_link() -> Result(
types.AdapterHandle,
types.AdapterError,
)
Start the transport adapter directly without supervision.
Note: For production use, prefer child_spec() with a supervisor.
This function is primarily useful for testing or simple scripts.
Returns the adapter handle on success, or an error if startup fails. Uses a default retry policy with 3 attempts and exponential backoff.
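A minimal sketch of calling start_link() from a simple script, handling both outcomes instead of asserting. It relies only on the signature shown above; the printed messages are illustrative, and the sketch assumes the distribute package is available as a dependency.

```gleam
import distribute/transport
import gleam/io

pub fn main() -> Nil {
  // start_link() returns Result(types.AdapterHandle, types.AdapterError).
  // The handle is ignored here because the other functions in this module
  // look up the default adapter themselves.
  case transport.start_link() {
    Ok(_handle) -> io.println("Transport started")
    Error(_error) -> io.println("Transport failed to start")
  }
}
```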
pub fn start_link_with_retry(
policy: retry.RetryPolicy,
) -> Result(types.AdapterHandle, types.AdapterError)
Start the transport adapter with a custom retry policy.
This function will retry the startup operation according to the provided policy, with exponential backoff and optional jitter between attempts.
Example
import distribute/transport
import distribute/retry
// Aggressive retry for critical systems
let policy = retry.aggressive()
let assert Ok(handle) = transport.start_link_with_retry(policy)
// Or with custom settings
let custom =
  retry.default_with_jitter()
  |> retry.with_max_attempts(5)
  |> retry.with_base_delay_ms(200)
let assert Ok(handle) = transport.start_link_with_retry(custom)
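To make "exponential backoff" concrete, the sketch below computes the waits a policy with a 200 ms base delay and 5 attempts might produce, assuming the delay doubles after each failed attempt. The helpers backoff_delay_ms and backoff_schedule are hypothetical and not part of distribute/retry; the library's actual multiplier, delay cap, and jitter behaviour may differ, and jitter is left out so the output stays deterministic.

```gleam
import gleam/int

/// Hypothetical helper (not part of distribute/retry): the delay in
/// milliseconds before retry number `attempt` (0-based), assuming the
/// base delay doubles after every failed attempt.
pub fn backoff_delay_ms(base_delay_ms: Int, attempt: Int) -> Int {
  base_delay_ms * int.bitwise_shift_left(1, attempt)
}

/// Hypothetical helper: the waits between `max_attempts` attempts.
/// There is one wait fewer than there are attempts, because nothing
/// is waited for after the final attempt.
pub fn backoff_schedule(base_delay_ms: Int, max_attempts: Int) -> List(Int) {
  build_schedule(base_delay_ms, max_attempts - 2, [])
}

// Builds the schedule from the last wait back to the first, so the
// accumulator ends up in ascending order without a reverse.
fn build_schedule(base_delay_ms: Int, attempt: Int, acc: List(Int)) -> List(Int) {
  case attempt < 0 {
    True -> acc
    False ->
      build_schedule(base_delay_ms, attempt - 1, [
        backoff_delay_ms(base_delay_ms, attempt),
        ..acc
      ])
  }
}
```

Under these assumptions, backoff_schedule(200, 5) evaluates to [200, 400, 800, 1600]. A jittered policy would typically add a small random offset to each of these waits.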
pub fn subscribe(
callback: fn(String, BitArray, types.MessageMetadata) -> Nil,
) -> Result(types.SubscriptionId, types.AdapterError)
Subscribe to incoming messages on the default transport.
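The callback is invoked for each message received by the adapter. It receives three arguments: a String identifying the sender, the payload as a BitArray, and the message's types.MessageMetadata. Returns a subscription ID that can be used to unsubscribe later.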
Example
// The callback takes three arguments: sender, payload, and metadata
let callback = fn(from, _payload, _metadata) {
  io.println("Received from: " <> from)
}
let assert Ok(sub_id) = transport.subscribe(callback)
pub fn unsubscribe(
id: types.SubscriptionId,
) -> Result(Nil, types.AdapterError)
Unsubscribe from the default transport.
After calling this function, the callback associated with the given subscription ID will no longer receive messages.
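Example

A minimal sketch pairing subscribe with unsubscribe, using only the signatures documented in this module. The function name subscribe_then_unsubscribe and the printed messages are illustrative, and the sketch assumes the transport has already been started.

```gleam
import distribute/transport
import gleam/io

pub fn subscribe_then_unsubscribe() -> Nil {
  // The callback receives the sender, the payload, and the message metadata.
  let callback = fn(from, _payload, _metadata) {
    io.println("Received from: " <> from)
  }

  case transport.subscribe(callback) {
    Ok(sub_id) ->
      // Keep the subscription ID: it is the only way to unsubscribe later.
      case transport.unsubscribe(sub_id) {
        Ok(Nil) -> io.println("Unsubscribed")
        Error(_error) -> io.println("Unsubscribe failed")
      }
    Error(_error) -> io.println("Subscribe failed")
  }
}
```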