Task-oriented guides for using the Katzenpost thin client API

Thin Client How-to Guide

This guide shows how to accomplish specific tasks with the Katzenpost thin client. Each section is self-contained: find the task you need and follow the steps.

If you are new to Pigeonhole, read Understanding Pigeonhole first for the concepts, then return here for the recipes, and consult the Thin Client API Reference for the precise signatures.

Throughout this guide and the API the words channel and stream are used interchangeably: they denote one and the same thing.

Authoritative working examples

A word of caution before you proceed. The code fragments in this guide are illustrative: they are written to teach one task at a time, and to keep the reader’s eye on the matter at hand they omit imports, error handling, and surrounding context. They are not compiled or run by our continuous integration, and so, as the API evolves, an individual snippet may fall out of step with it.

The integration tests below carry no such caveat. They are exercised on every change by CI, so they are guaranteed to compile and to pass against the code they accompany. When a fragment in this guide and a test disagree, the test is correct. Treat these files as the canonical, runnable companion to the prose:

Language  Test file                                                    Repository
Go        client/pigeonhole_docker_test.go                             katzenpost
Python    tests/test_new_pigeonhole_api.py, tests/test_new_methods.py  thin_client
Rust      tests/channel_api_test.rs                                    thin_client

These links track the main branch of each repository; should you be working against a pinned release, consult the corresponding files at that tag instead.

Table of Contents

Section                                  Description
Connect to the daemon and handle events  Establish a connection and process events
Discover network services                Find services in the PKI document
Verify the PKI document yourself         Check directory authority signatures against your own trust store
Send a message to a mixnet service       Echo ping and other non-Pigeonhole services
Create a Pigeonhole channel              Generate a stream with write/read capabilities
Write a message                          Encrypt and send a write to a stream
Read a message                           Retrieve and decrypt a message from a stream
Wait for a message not yet written       Poll with bounded retry around BoxIDNotFound
Persist and restore channel state        Survive a process restart without losing your place
Hold a two-way conversation              Wire two streams into a bidirectional channel
Prepare operations offline               Do the local crypto now, transmit when connected
A complete end-to-end example            One runnable Alice-writes, Bob-reads program
Delete messages with tombstones          Tombstone one or more boxes
Send to one channel atomically           Single-destination copy command
Send to multiple channels atomically     Multi-destination copy command
Multi-call buffer passing                Incremental copy streams with crash recovery
Tombstone a range via copy stream        Atomic tombstoning through a courier
Cancel in-flight operations              Cancel individual operations or stop all at once
Handle daemon disconnects                Automatic reconnection and request replay

How to connect to the daemon and handle events

Connect to the kpclientd daemon and set up event handling:

Go:

cfg, err := thin.LoadFile("thinclient.toml")
if err != nil {
    log.Fatal(err)
}

logging := &config.Logging{Level: "INFO"}
client := thin.NewThinClient(cfg, logging)

err = client.Dial()
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Listen for events
eventCh := client.EventSink()
defer client.StopEventSink(eventCh)

for ev := range eventCh {
    switch v := ev.(type) {
    case *thin.ConnectionStatusEvent:
        fmt.Printf("Connected: %v\n", v.IsConnected)
    case *thin.NewDocumentEvent:
        fmt.Println("New PKI document received")
    case *thin.MessageReplyEvent:
        fmt.Printf("Reply received for SURB %x\n", v.SURBID)
    }
}

Rust:

let config = Config::new("thinclient.toml")?;
let client = ThinClient::new(config).await?;

// Listen for events
let mut event_rx = client.event_sink();
tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        // Process events
        println!("Event: {:?}", event);
    }
});

// ... do work ...

client.stop().await;

Python:

async def on_connection_status(event):
    print(f"Connected: {event.get('is_connected')}")

async def on_new_document(event):
    print("New PKI document received")

async def on_message_reply(event):
    print("Reply received")

config = Config("thinclient.toml")
config.on_connection_status = on_connection_status
config.on_new_pki_document = on_new_document
config.on_message_reply = on_message_reply

client = ThinClient(config)
loop = asyncio.get_running_loop()
await client.start(loop)

# ... do work ...

client.stop()

How to discover network services via the PKI document

NOTE that this isn’t necessary for using the Pigeonhole protocol because kpclientd does courier service discovery automatically.

The PKI document lists all available mixnet services. Use GetService to get a random instance of a named service:

Go:

doc := client.PKIDocument()
if doc == nil {
    log.Fatal("No PKI document available")
}

// Get a random echo service
desc, err := client.GetService("echo")
if err != nil {
    log.Fatal(err)
}

// Use desc.MixDescriptor.IdentityKey and desc.RecipientQueueID
// as the destination for SendMessage
destNode, destQueue := desc.ToDestination()

Rust:

let doc = client.pki_document().await?;

// Get a random echo service
let desc = client.get_service("echo").await?;

// Use the destination for send_message
let (dest_node, dest_queue) = desc.to_destination();

Python:

doc = client.pki_document()
if doc is None:
    raise Exception("No PKI document available")

# Get a random echo service
desc = client.get_service("echo")

# Use the destination for send_message
dest_node, dest_queue = desc.to_destination()

How to verify the PKI document yourself

In ordinary use you do not need this section. kpclientd already verifies every PKI document against the directory authorities listed in client.toml, and only after a sufficient threshold of authority signatures has passed does it push the document on to the thin client. The pki_document() method described above hands you the post-verification document, and you inherit the daemon’s guarantee without further work. The signature map is stripped before that handoff precisely because the verification has already happened; carrying the signatures through would only invite confusion about whose trust root is in force.

get_pki_document_raw is the escape hatch for special applications and integrations that need the signed document itself. The cases that come up in practice include:

  • An application that wishes to anchor its own root of trust, independently of kpclientd’s configuration, for instance when shipping a hardened build with the authority keys compiled in.
  • A relay that forwards the signed document to a separate consumer (for archival, audit, or out-of-band verification) which does not itself speak the thin-client protocol.
  • A diagnostic or monitoring tool that wishes to display which authorities signed which consensus, across time.

The method returns the cert.Certificate-wrapped signed document together with the epoch the daemon resolved to. Pass 0 for the requested epoch to mean “whatever the daemon currently believes is the latest”.

The examples below verify the document against the post-quantum hybrid signature scheme Falcon-padded-512-Ed25519, the recommended production scheme published by hpqc in both its Python and Go forms. The authority public keys must come from the application’s own trust store, never from the daemon: if the daemon supplied them, the verification would establish only that the daemon was internally consistent, not that the document was signed by the real authorities.

Python:

import struct
from hashlib import blake2b

import cbor2
from hpqc.sign.hybrid import FalconPadded512Ed25519


# The directory authority public keys in the wire format expected by
# hpqc's hybrid scheme: the byte concatenation
# ``falcon_padded_512_pub || ed25519_pub`` (929 bytes per authority).
# These must be obtained out of band, typically baked into the
# application or carried in a separately signed bundle. The hex
# strings below are placeholders.
AUTHORITY_PUBLIC_KEYS = [
    bytes.fromhex("ab" * 929),  # auth1
    bytes.fromhex("cd" * 929),  # auth2
    bytes.fromhex("ef" * 929),  # auth3
]
THRESHOLD = len(AUTHORITY_PUBLIC_KEYS) // 2 + 1
SCHEME_NAME = "Falcon-padded-512-Ed25519"


def _signed_message(cert: dict) -> bytes:
    """Reconstruct the byte string the authorities signed.

    A deterministic little-endian concatenation of the Certificate
    fields preceding Signatures; see katzenpost/core/cert/cert.go for
    the canonical encoding.
    """
    return b"".join([
        struct.pack("<I", cert["Version"]),
        struct.pack("<Q", cert["Expiration"]),
        cert["KeyType"].encode("utf-8"),
        cert["Certified"],
    ])


async def fetch_and_verify_pki(client, epoch: int = 0) -> bytes:
    """Fetch the signed PKI document and verify it against the trust root.

    Returns the inner Certified payload (the CBOR-encoded Document)
    once a sufficient threshold of authority signatures has verified;
    raises ValueError otherwise.
    """
    payload, returned_epoch = await client.get_pki_document_raw(epoch)
    cert = cbor2.loads(payload)

    if cert["Version"] != 0:
        raise ValueError(f"unknown certificate version: {cert['Version']}")
    if cert["KeyType"] != SCHEME_NAME:
        raise ValueError(
            f"unexpected key type {cert['KeyType']!r}, "
            f"expected {SCHEME_NAME!r}"
        )

    msg = _signed_message(cert)
    signatures = cert.get("Signatures") or {}

    verified = 0
    for pubkey in AUTHORITY_PUBLIC_KEYS:
        key_hash = blake2b(pubkey, digest_size=32).digest()
        sig = signatures.get(key_hash)
        if sig is None:
            continue
        if FalconPadded512Ed25519.verify(pubkey, msg, sig["Payload"]):
            verified += 1

    if verified < THRESHOLD:
        raise ValueError(
            f"only {verified} of {len(AUTHORITY_PUBLIC_KEYS)} authority "
            f"signatures verified for epoch {returned_epoch}; threshold "
            f"is {THRESHOLD}"
        )

    # cert["Certified"] is the CBOR-encoded Document. Decode it with
    # cbor2.loads(cert["Certified"]) if the application needs the
    # contents themselves.
    return cert["Certified"]

Go:

package main

import (
    "encoding/hex"
    "fmt"
    "log"

    "github.com/katzenpost/hpqc/sign"
    "github.com/katzenpost/hpqc/sign/hybrid"

    "github.com/katzenpost/katzenpost/client/thin"
    "github.com/katzenpost/katzenpost/core/cert"
)

// AuthorityPublicKeysHex is the application's root of trust for the
// network's directory: the wire-format hybrid public keys of each
// authority, hex-encoded. They must be obtained out of band and
// never from the daemon. Replace these placeholders with your own.
var AuthorityPublicKeysHex = []string{
    "abab...", // auth1
    "cdcd...", // auth2
    "efef...", // auth3
}

// FetchAndVerifyPKI fetches the signed PKI document for the given
// epoch (pass 0 for "current") and verifies it against the
// authority public keys above using core/cert.VerifyThreshold.
func FetchAndVerifyPKI(client *thin.ThinClient, epoch uint64) ([]byte, error) {
    scheme := hybrid.FalconPadded512Ed25519
    verifiers := make([]sign.PublicKey, 0, len(AuthorityPublicKeysHex))
    for _, hexKey := range AuthorityPublicKeysHex {
        raw, err := hex.DecodeString(hexKey)
        if err != nil {
            return nil, fmt.Errorf("decoding authority key: %w", err)
        }
        pub, err := scheme.UnmarshalBinaryPublicKey(raw)
        if err != nil {
            return nil, fmt.Errorf("parsing authority key: %w", err)
        }
        verifiers = append(verifiers, pub)
    }

    payload, returnedEpoch, err := client.GetPKIDocumentRaw(epoch)
    if err != nil {
        return nil, fmt.Errorf("fetching signed PKI doc: %w", err)
    }

    threshold := len(verifiers)/2 + 1
    certified, good, _, err := cert.VerifyThreshold(verifiers, threshold, payload)
    if err != nil {
        return nil, fmt.Errorf(
            "threshold verification failed for epoch %d: %w",
            returnedEpoch, err,
        )
    }
    log.Printf("verified %d of %d authority signatures for epoch %d",
        len(good), len(verifiers), returnedEpoch)
    return certified, nil
}

Considerations:

  • The authority public keys must come from a trust root external to the daemon. If the daemon supplied them, verification would prove only that the daemon was internally consistent.
  • The threshold above (a simple majority) matches the policy that the authorities themselves enforce when they admit a consensus. An application may apply a stricter policy, but should not relax it.
  • Should the network ever be reconfigured to use a different signature scheme, swap the hybrid for the corresponding hpqc verifier and adjust the expected KeyType (Python) or the hybrid.* selector (Go) accordingly. The KeyType field of the certificate is what the authorities signed under, and is the authoritative indicator of the scheme in force.
  • A Rust binding is not shown because hpqc does not yet publish a Rust port; a Rust application can compose the verification with the ed25519-dalek and falcon crates by the same wire layout (the public key and signature are simple concatenations of the two component halves).
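
The wire layout mentioned in the last consideration can be sketched in a few lines. The sketch assumes the layout given in the comment of the Python example above, falcon_padded_512_pub || ed25519_pub for 929 bytes in total, and takes the component sizes (897 bytes for a Falcon-512 public key, 32 bytes for an Ed25519 public key) from the respective schemes. The helper name split_hybrid_public_key is hypothetical and is not part of hpqc; confirm the layout against hpqc before relying on it.

```python
from typing import Tuple

# Component sizes in bytes. The ordering (Falcon first, Ed25519 second)
# is an assumption taken from the comment in the guide's Python example.
FALCON_512_PUBLIC_KEY_SIZE = 897
ED25519_PUBLIC_KEY_SIZE = 32
HYBRID_PUBLIC_KEY_SIZE = FALCON_512_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE


def split_hybrid_public_key(hybrid_pub: bytes) -> Tuple[bytes, bytes]:
    """Split a hybrid public key into (falcon_pub, ed25519_pub).

    Hypothetical helper for illustration; not an hpqc function.
    """
    if len(hybrid_pub) != HYBRID_PUBLIC_KEY_SIZE:
        raise ValueError(
            f"expected {HYBRID_PUBLIC_KEY_SIZE} bytes, got {len(hybrid_pub)}"
        )
    falcon_pub = hybrid_pub[:FALCON_512_PUBLIC_KEY_SIZE]
    ed25519_pub = hybrid_pub[FALCON_512_PUBLIC_KEY_SIZE:]
    return falcon_pub, ed25519_pub
```

A Rust application would perform the same split before handing each half to the verifier for that component scheme.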

How to send a message to a mixnet service

NOTE that this API call is NOT used with the Pigeonhole protocol. However, it is still useful for writing other protocols and for probing the echo service.

Use BlockingSendMessage for simple request-response interactions with non-Pigeonhole services (like the echo service):

Go:

desc, err := client.GetService("echo")
if err != nil {
    log.Fatal(err)
}

destNode, destQueue := desc.ToDestination()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

reply, err := client.BlockingSendMessage(ctx, []byte("hello mixnet"), destNode, destQueue)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Reply: %s\n", reply)

Rust:

let desc = client.get_service("echo").await?;
let (dest_node, dest_queue) = desc.to_destination();

let reply = client.blocking_send_message(
    b"hello mixnet",
    dest_node,
    dest_queue,
    std::time::Duration::from_secs(30),
).await?;
println!("Reply: {:?}", reply);

Python:

desc = client.get_service("echo")
dest_node, dest_queue = desc.to_destination()

reply = await client.blocking_send_message(
    b"hello mixnet", dest_node, dest_queue, timeout_seconds=30.0
)
print(f"Reply: {reply}")

How to create a Pigeonhole channel

NOTE that this does NOT produce any network traffic. It’s a local cryptographic operation only.

A Pigeonhole channel (stream) is created from a 32-byte random seed. The writer keeps the write cap; the reader receives the read cap and first index out-of-band.

Go:

seed := make([]byte, 32)
_, err := rand.Reader.Read(seed)
if err != nil {
    log.Fatal(err)
}

writeCap, readCap, firstIndex, err := client.NewKeypair(seed)
if err != nil {
    log.Fatal(err)
}

// Writer keeps: writeCap, firstIndex
// Share with reader out-of-band: readCap, firstIndex

Rust:

let seed: [u8; 32] = rand::random();
let result = client.new_keypair(&seed).await?;

// Writer keeps: result.write_cap, result.first_index
// Share with reader out-of-band: result.read_cap, result.first_index

Python:

import os
seed = os.urandom(32)
result = await client.new_keypair(seed)

# Writer keeps: result.write_cap, result.first_index
# Share with reader out-of-band: result.read_cap, result.first_index

How to write a message to a Pigeonhole channel

Writing is a two-step process: encrypt the message, then send it via ARQ.

Go:

// Encrypt the message
ciphertext, envDesc, envHash, nextIndex, err := client.EncryptWrite(
    []byte("hello"), writeCap, currentIndex,
)
if err != nil {
    log.Fatal(err)
}

// Send via ARQ (blocks until acknowledged)
_, err = client.StartResendingEncryptedMessage(
    nil,       // readCap (nil for writes)
    writeCap,
    nil,       // messageBoxIndex (nil for writes)
    nil,       // replyIndex
    envDesc,
    ciphertext,
    envHash,
)
if err != nil {
    log.Fatal(err)
}

// Advance the index for the next write
currentIndex = nextIndex

Rust:

// Encrypt the message
let result = client.encrypt_write(
    b"hello", &write_cap, &current_index,
).await?;

// Send via ARQ (blocks until acknowledged)
client.start_resending_encrypted_message(
    None,                           // read_cap (None for writes)
    Some(&write_cap),
    None,                           // message_box_index
    None,                           // reply_index
    &result.envelope_descriptor,
    &result.message_ciphertext,
    &result.envelope_hash,
).await?;

// Advance the index for the next write
current_index = result.next_message_box_index;

Python:

# Encrypt the message
result = await client.encrypt_write(b"hello", write_cap, current_index)

# Send via ARQ (blocks until acknowledged)
await client.start_resending_encrypted_message(
    read_cap=None,
    write_cap=write_cap,
    next_message_index=None,
    reply_index=None,
    envelope_descriptor=result.envelope_descriptor,
    message_ciphertext=result.message_ciphertext,
    envelope_hash=result.envelope_hash,
)

# Advance the index for the next write
current_index = result.next_message_box_index

How to read a message from a Pigeonhole channel

Reading is also two steps: encrypt a read request, then send it via ARQ. The reply contains the plaintext.

Go:

// Encrypt a read request
ciphertext, envDesc, envHash, nextIndex, err := client.EncryptRead(
    readCap, currentIndex,
)
if err != nil {
    log.Fatal(err)
}

// Send via ARQ (blocks until the message is retrieved)
result, err := client.StartResendingEncryptedMessage(
    readCap,
    nil,       // writeCap (nil for reads)
    nil,       // messageBoxIndex
    nil,       // replyIndex
    envDesc,
    ciphertext,
    envHash,
)
if err != nil {
    log.Fatal(err)
}

plaintext := result.Plaintext
// Advance the index for the next read
currentIndex = nextIndex

Rust:

// Encrypt a read request
let read_result = client.encrypt_read(&read_cap, &current_index).await?;

// Send via ARQ (blocks until the message is retrieved)
let result = client.start_resending_encrypted_message(
    Some(&read_cap),
    None,                                    // write_cap (None for reads)
    None,                                    // message_box_index
    None,                                    // reply_index
    &read_result.envelope_descriptor,
    &read_result.message_ciphertext,
    &read_result.envelope_hash,
).await?;

let plaintext = result.plaintext;
// Advance the index for the next read
current_index = read_result.next_message_box_index;

Python:

# Encrypt a read request
read_result = await client.encrypt_read(read_cap, current_index)

# Send via ARQ (blocks until the message is retrieved)
plaintext = await client.start_resending_encrypted_message(
    read_cap=read_cap,
    write_cap=None,
    next_message_index=None,
    reply_index=None,
    envelope_descriptor=read_result.envelope_descriptor,
    message_ciphertext=read_result.message_ciphertext,
    envelope_hash=read_result.envelope_hash,
)

# Advance the index for the next read
current_index = read_result.next_message_box_index

How to wait for a message that has not been written yet

Reads and writes are not coordinated: a reader routinely asks for an index before the writer has filled it, and replication lag can briefly hide a box that was in fact written. In both cases the daemon reports BoxIDNotFound. This is the expected answer to “anything here yet?”, not a failure. The correct pattern is a bounded poll: retry on the expected outcome, with a short delay between attempts, until the data appears or an application deadline elapses. Use IsExpectedOutcome to tell a benign “not yet” apart from a real error, so that genuine failures are not silently retried forever.

Go:

deadline := time.Now().Add(2 * time.Minute)
var plaintext []byte
for {
    ciphertext, envDesc, envHash, nextIndex, err := client.EncryptRead(
        readCap, currentIndex,
    )
    if err != nil {
        log.Fatal(err)
    }

    result, err := client.StartResendingEncryptedMessage(
        readCap, nil, nil, nil, envDesc, ciphertext, envHash,
    )
    if err == nil {
        plaintext = result.Plaintext
        currentIndex = nextIndex
        break
    }

    // BoxIDNotFound here just means "not written yet". Anything that
    // is not an expected outcome is a real failure worth surfacing.
    if !thin.IsExpectedOutcome(err) {
        log.Fatal(err)
    }
    if time.Now().After(deadline) {
        log.Fatal("gave up waiting for the message")
    }
    time.Sleep(3 * time.Second)
}

Rust:

let deadline = std::time::Instant::now()
    + std::time::Duration::from_secs(120);
let plaintext = loop {
    let read = client.encrypt_read(&read_cap, &current_index).await?;
    match client.start_resending_encrypted_message(
        Some(&read_cap), None, None, None,
        &read.envelope_descriptor,
        &read.message_ciphertext,
        &read.envelope_hash,
    ).await {
        Ok(result) => {
            current_index = read.next_message_box_index;
            break result.plaintext;
        }
        Err(e) if e.is_expected_outcome() => {
            if std::time::Instant::now() > deadline {
                return Err(e);
            }
            tokio::time::sleep(
                std::time::Duration::from_secs(3)).await;
        }
        Err(e) => return Err(e),
    }
};

Python:

import asyncio, time
from katzenpost_thinclient import is_expected_outcome

deadline = time.monotonic() + 120
while True:
    read = await client.encrypt_read(read_cap, current_index)
    try:
        plaintext = await client.start_resending_encrypted_message(
            read_cap=read_cap, write_cap=None,
            next_message_index=None, reply_index=None,
            envelope_descriptor=read.envelope_descriptor,
            message_ciphertext=read.message_ciphertext,
            envelope_hash=read.envelope_hash,
        )
        current_index = read.next_message_box_index
        break
    except Exception as exc:
        # "not written yet" is expected; anything else is a real error.
        if not is_expected_outcome(exc):
            raise
        if time.monotonic() > deadline:
            raise
        await asyncio.sleep(3)
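
The bounded poll above can also be factored into a reusable helper, so that every read site shares one deadline-and-delay policy. The following is a minimal sketch, not part of the thin client: poll_until_available and its parameters are hypothetical names, and the caller supplies both the operation to attempt and the predicate that recognises an expected outcome.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def poll_until_available(
    attempt: Callable[[], Awaitable[T]],
    is_expected: Callable[[BaseException], bool],
    timeout_seconds: float = 120.0,
    delay_seconds: float = 3.0,
) -> T:
    """Retry `attempt` while it fails with an expected outcome.

    Any exception that `is_expected` rejects is re-raised at once.
    Once the deadline has passed, the most recent expected-outcome
    exception is re-raised instead of being retried.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        try:
            return await attempt()
        except Exception as exc:
            if not is_expected(exc) or time.monotonic() > deadline:
                raise
            await asyncio.sleep(delay_seconds)
```

With the thin client, attempt would presumably wrap the encrypt_read and start_resending_encrypted_message pair from the example above, and is_expected would be the is_expected_outcome function imported there.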

How to persist and restore channel state

The daemon keeps no per-application channel state. The write cap, the read cap, and above all the current index belong to your application, and if you lose the index across a restart you no longer know where to append next (re-using a filled index earns BoxAlreadyExists). Persist the index every time you advance it, durably, before you treat the write as done.

In Go the capabilities and the index are typed; serialise them with MarshalBinary and restore them with the bacap constructors. In Rust and Python new_keypair already hands you the caps and index as byte strings, so persistence is simply storing and reloading those bytes.

Go:

import "github.com/katzenpost/hpqc/bacap"

// Save: marshal each artefact to bytes and write atomically to disk.
wcBytes, _ := writeCap.MarshalBinary()
rcBytes, _ := readCap.MarshalBinary()
idxBytes, _ := currentIndex.MarshalBinary()
saveState(wcBytes, rcBytes, idxBytes) // your durable, atomic write

// Restore after a restart:
writeCap, err := bacap.NewWriteCapFromBytes(wcBytes)
if err != nil {
    log.Fatal(err)
}
readCap, err := bacap.ReadCapFromBytes(rcBytes)
if err != nil {
    log.Fatal(err)
}
currentIndex, err := bacap.NewEmptyMessageBoxIndexFromBytes(idxBytes)
if err != nil {
    log.Fatal(err)
}

Rust:

// new_keypair already returns Vec<u8> for each artefact.
let kp = client.new_keypair(&seed).await?;
save_state(&kp.write_cap, &kp.read_cap, &kp.first_index);
let mut current_index = kp.first_index.clone();

// ... each time you advance, persist the new index bytes ...
save_index(&current_index);

// After a restart, the stored bytes are passed straight back into
// the API; no deserialisation step is required.
let (write_cap, read_cap, current_index) = load_state();

Python:

# new_keypair already returns bytes for each artefact.
kp = await client.new_keypair(seed)
save_state(kp.write_cap, kp.read_cap, kp.first_index)
current_index = kp.first_index

# ... each time you advance, persist the new index bytes ...
save_index(current_index)

# After a restart, the stored bytes are passed straight back into
# the API; no deserialisation step is required.
write_cap, read_cap, current_index = load_state()

The writer must persist currentIndex after every successful write, the reader after every successful read. Persist the index before acknowledging the message to the rest of your application, so that a crash cannot leave you having processed a message whose index you never recorded.
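
The examples above leave the durable, atomic write to the application. One common way to obtain it, sketched here under assumptions, is to write the new state to a temporary file in the same directory, flush it to disk, and then rename it over the old file. The function names save_state_file and load_state_file and the JSON-with-hex file layout are hypothetical choices for illustration, not anything the thin client prescribes.

```python
import json
import os
import tempfile
from typing import Tuple


def save_state_file(path: str, write_cap: bytes, read_cap: bytes,
                    index: bytes) -> None:
    """Replace the state file at `path` without exposing a partial write."""
    state = {
        "write_cap": write_cap.hex(),
        "read_cap": read_cap.hex(),
        "index": index.hex(),
    }
    directory = os.path.dirname(os.path.abspath(path))
    # The temporary file lives in the target directory so that the final
    # rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".state-")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(state, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the data to disk before renaming
        os.replace(tmp_path, path)  # atomic rename on POSIX filesystems
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def load_state_file(path: str) -> Tuple[bytes, bytes, bytes]:
    """Return (write_cap, read_cap, index) as stored by save_state_file."""
    with open(path) as f:
        state = json.load(f)
    return (
        bytes.fromhex(state["write_cap"]),
        bytes.fromhex(state["read_cap"]),
        bytes.fromhex(state["index"]),
    )
```

A reader that crashes mid-save then sees either the old state or the new one, never a truncated file. A stricter variant would also fsync the containing directory after the rename.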


How to hold a two-way conversation

A stream has exactly one writer, so a conversation between two parties is two streams: each party writes to its own and reads from the other’s. The setup is symmetric: each creates a stream and shares its read cap (and first index) with the other out-of-band. Thereafter each party writes with its own write cap and polls the peer’s stream with the peer’s read cap, advancing two independent indices.

Go:

// Alice's side. (Bob's is the mirror image.)
aliceWrite, aliceRead, aliceIdx, err := client.NewKeypair(aliceSeed)
if err != nil {
    log.Fatal(err)
}

// Exchange read caps out-of-band: Alice sends aliceRead+aliceIdx to
// Bob and receives bobRead+bobIdx from Bob.
sendOutOfBand(aliceRead, aliceIdx)
bobRead, bobIdx := receiveOutOfBand()

// Send on Alice's own stream.
ct, ed, eh, nextOut, err := client.EncryptWrite(
    []byte("hello Bob"), aliceWrite, aliceIdx)
if err != nil {
    log.Fatal(err)
}
_, err = client.StartResendingEncryptedMessage(
    nil, aliceWrite, nil, nil, ed, ct, eh)
if err != nil {
    log.Fatal(err)
}
aliceIdx = nextOut // persist this

// Receive on Bob's stream, using the polling pattern shown above,
// reading with bobRead and advancing bobIdx.

Rust:

// Alice's side. (Bob's is the mirror image.)
let alice = client.new_keypair(&alice_seed).await?;

// Exchange read caps out-of-band.
send_out_of_band(&alice.read_cap, &alice.first_index);
let (bob_read, mut bob_idx) = receive_out_of_band();
let mut alice_idx = alice.first_index.clone();

// Send on Alice's own stream.
let w = client.encrypt_write(b"hello Bob",
    &alice.write_cap, &alice_idx).await?;
client.start_resending_encrypted_message(
    None, Some(&alice.write_cap), None, None,
    &w.envelope_descriptor, &w.message_ciphertext,
    &w.envelope_hash).await?;
alice_idx = w.next_message_box_index; // persist this

// Receive on Bob's stream with the polling pattern, reading with
// bob_read and advancing bob_idx.

Python:

# Alice's side. (Bob's is the mirror image.)
alice = await client.new_keypair(alice_seed)

# Exchange read caps out-of-band.
send_out_of_band(alice.read_cap, alice.first_index)
bob_read, bob_idx = receive_out_of_band()
alice_idx = alice.first_index

# Send on Alice's own stream.
w = await client.encrypt_write(b"hello Bob",
    alice.write_cap, alice_idx)
await client.start_resending_encrypted_message(
    read_cap=None, write_cap=alice.write_cap,
    next_message_index=None, reply_index=None,
    envelope_descriptor=w.envelope_descriptor,
    message_ciphertext=w.message_ciphertext,
    envelope_hash=w.envelope_hash)
alice_idx = w.next_message_box_index  # persist this

# Receive on Bob's stream with the polling pattern, reading with
# bob_read and advancing bob_idx.

How to prepare operations offline

The daemon distinguishes two kinds of work. Key generation and envelope encryption (NewKeypair, EncryptWrite, EncryptRead, TombstoneRange, and the copy-stream constructors) are local cryptography and succeed even when the daemon is not connected to the mixnet. Only StartResendingEncryptedMessage and StartResendingCopyCommand require connectivity; called offline they fail rather than block.

You can therefore prepare envelopes while offline, persist them, and transmit once connectivity returns. Test IsConnected before transmitting, or watch the connection event and flush a queue when it turns true.

Go:

// Offline: this is pure local crypto and works regardless.
ciphertext, envDesc, envHash, nextIndex, err := client.EncryptWrite(
    []byte("written while offline"), writeCap, currentIndex,
)
if err != nil {
    log.Fatal(err)
}
enqueue(envDesc, ciphertext, envHash) // persist for later
currentIndex = nextIndex              // advance so the next prepared write targets a fresh box

// Later, only transmit once the daemon is connected.
if client.IsConnected() {
    for _, e := range drainQueue() {
        _, err = client.StartResendingEncryptedMessage(
            nil, writeCap, nil, nil, e.desc, e.ct, e.hash)
        if err != nil {
            log.Fatal(err)
        }
    }
}

Rust:

// Offline: pure local crypto, works regardless.
let w = client.encrypt_write(
    b"written while offline", &write_cap, &current_index).await?;
enqueue(&w); // persist for later

// Later, only transmit once connected.
if client.is_connected() {
    for e in drain_queue() {
        client.start_resending_encrypted_message(
            None, Some(&write_cap), None, None,
            &e.envelope_descriptor, &e.message_ciphertext,
            &e.envelope_hash).await?;
    }
}

Python:

# Offline: pure local crypto, works regardless.
w = await client.encrypt_write(
    b"written while offline", write_cap, current_index)
enqueue(w)  # persist for later

# Later, only transmit once connected.
if client.is_connected():
    for e in drain_queue():
        await client.start_resending_encrypted_message(
            read_cap=None, write_cap=write_cap,
            next_message_index=None, reply_index=None,
            envelope_descriptor=e.envelope_descriptor,
            message_ciphertext=e.message_ciphertext,
            envelope_hash=e.envelope_hash)
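
The alternative mentioned above, flushing a queue when the connection event turns true, might look like the following sketch. Outbox and PreparedWrite are hypothetical names, the queue is held in memory only for brevity, and the send callable is supplied by the application. The event shape (a mapping with an is_connected key) follows the connection-status callback shown in the first section of this guide.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Awaitable, Callable, Deque


@dataclass
class PreparedWrite:
    """An envelope produced offline, waiting to be transmitted."""
    envelope_descriptor: bytes
    message_ciphertext: bytes
    envelope_hash: bytes


class Outbox:
    """FIFO queue of prepared writes, flushed when the daemon connects."""

    def __init__(self, send: Callable[[PreparedWrite], Awaitable[None]]):
        self._send = send
        self._pending: Deque[PreparedWrite] = deque()
        self._flushing = False

    def enqueue(self, item: PreparedWrite) -> None:
        self._pending.append(item)

    async def flush(self) -> int:
        """Send queued writes in order and return how many were sent.

        An item leaves the queue only after its send succeeded, so a
        failure keeps it (and everything behind it) for the next flush.
        """
        if self._flushing:  # a flush is already in progress
            return 0
        self._flushing = True
        sent = 0
        try:
            while self._pending:
                await self._send(self._pending[0])
                self._pending.popleft()
                sent += 1
        finally:
            self._flushing = False
        return sent

    async def on_connection_status(self, event: dict) -> None:
        # Flush only when the daemon reports that it is connected.
        if event.get("is_connected"):
            await self.flush()
```

In an application, send would presumably wrap start_resending_encrypted_message with the write cap, and on_connection_status would be assigned to config.on_connection_status. A production version would also persist the queue, as the text above advises.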

A complete end-to-end example

The fragments above each show one task. Here they are assembled into a single runnable program: Alice creates a stream, writes one message, and Bob reads it back. This is the smallest complete program that exercises the Pigeonhole path. As with every example in this guide, it omits production concerns (durable persistence, structured logging), but it has the shape of a real application; the CI-verified tests are the authority on exact, current usage.

Go:

package main

import (
    "log"

    "github.com/katzenpost/hpqc/rand"

    "github.com/katzenpost/katzenpost/client/thin"
    "github.com/katzenpost/katzenpost/core/config"
)

func main() {
    cfg, err := thin.LoadFile("thinclient.toml")
    if err != nil {
        log.Fatal(err)
    }
    client := thin.NewThinClient(cfg, &config.Logging{Level: "INFO"})
    if err := client.Dial(); err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Alice creates a stream.
    seed := make([]byte, 32)
    if _, err := rand.Reader.Read(seed); err != nil {
        log.Fatal(err)
    }
    writeCap, readCap, idx, err := client.NewKeypair(seed)
    if err != nil {
        log.Fatal(err)
    }

    // Alice writes one message.
    ct, ed, eh, _, err := client.EncryptWrite(
        []byte("hello from Alice"), writeCap, idx)
    if err != nil {
        log.Fatal(err)
    }
    if _, err := client.StartResendingEncryptedMessage(
        nil, writeCap, nil, nil, ed, ct, eh); err != nil {
        log.Fatal(err)
    }

    // Bob reads it back (readCap would normally be shared out-of-band).
    rct, red, reh, _, err := client.EncryptRead(readCap, idx)
    if err != nil {
        log.Fatal(err)
    }
    result, err := client.StartResendingEncryptedMessage(
        readCap, nil, nil, nil, red, rct, reh)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Bob read: %s", result.Plaintext)
}

Rust:

use katzenpost_thin_client::{Config, ThinClient};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::new("thinclient.toml")?;
    let client = ThinClient::new(config).await?;

    // Alice creates a stream.
    let seed: [u8; 32] = rand::random();
    let kp = client.new_keypair(&seed).await?;

    // Alice writes one message.
    let w = client.encrypt_write(
        b"hello from Alice", &kp.write_cap, &kp.first_index).await?;
    client.start_resending_encrypted_message(
        None, Some(&kp.write_cap), None, None,
        &w.envelope_descriptor, &w.message_ciphertext,
        &w.envelope_hash).await?;

    // Bob reads it back (read_cap would normally be shared out-of-band).
    let r = client.encrypt_read(&kp.read_cap, &kp.first_index).await?;
    let result = client.start_resending_encrypted_message(
        Some(&kp.read_cap), None, None, None,
        &r.envelope_descriptor, &r.message_ciphertext,
        &r.envelope_hash).await?;
    println!("Bob read: {:?}", result.plaintext);

    client.stop().await;
    Ok(())
}
import asyncio, os
from katzenpost_thinclient import ThinClient, Config

async def main():
    config = Config("thinclient.toml")
    client = ThinClient(config)
    await client.start(asyncio.get_running_loop())

    # Alice creates a stream.
    seed = os.urandom(32)
    kp = await client.new_keypair(seed)

    # Alice writes one message.
    w = await client.encrypt_write(
        b"hello from Alice", kp.write_cap, kp.first_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=kp.write_cap,
        next_message_index=None, reply_index=None,
        envelope_descriptor=w.envelope_descriptor,
        message_ciphertext=w.message_ciphertext,
        envelope_hash=w.envelope_hash)

    # Bob reads it back (read_cap would normally be shared out-of-band).
    r = await client.encrypt_read(kp.read_cap, kp.first_index)
    plaintext = await client.start_resending_encrypted_message(
        read_cap=kp.read_cap, write_cap=None,
        next_message_index=None, reply_index=None,
        envelope_descriptor=r.envelope_descriptor,
        message_ciphertext=r.message_ciphertext,
        envelope_hash=r.envelope_hash)
    print("Bob read:", plaintext)

    client.stop()

asyncio.run(main())

How to delete messages with tombstones

Use TombstoneRange to create tombstone envelopes, then send each one via StartResendingEncryptedMessage. To tombstone a single box, use maxCount=1.

// Create tombstone envelopes for 5 boxes
result, err := client.TombstoneRange(writeCap, startIndex, 5)
if err != nil {
    log.Fatal(err)
}

// Send each tombstone
for _, envelope := range result.Envelopes {
    _, err = client.StartResendingEncryptedMessage(
        nil, writeCap, nil, nil,
        envelope.EnvelopeDescriptor,
        envelope.MessageCiphertext,
        envelope.EnvelopeHash,
    )
    if err != nil {
        log.Fatal(err)
    }
}
// result.Next is the index after the last tombstoned box
// Create tombstone envelopes for 5 boxes
let result = client.tombstone_range(&write_cap, &start_index, 5).await;

// Send each tombstone
for envelope in &result.envelopes {
    client.start_resending_encrypted_message(
        None,
        Some(&write_cap),
        None,
        None,
        &envelope.envelope_descriptor,
        &envelope.message_ciphertext,
        &envelope.envelope_hash,
    ).await?;
}
// result.next is the index after the last tombstoned box
# Create tombstone envelopes for 5 boxes
result = await client.tombstone_range(write_cap, start_index, 5)

# Send each tombstone
for envelope in result.envelopes:
    await client.start_resending_encrypted_message(
        read_cap=None,
        write_cap=write_cap,
        next_message_index=None,
        reply_index=None,
        envelope_descriptor=envelope.envelope_descriptor,
        message_ciphertext=envelope.message_ciphertext,
        envelope_hash=envelope.envelope_hash,
    )
# result.next is the index after the last tombstoned box
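
Because result.next is the index after the last tombstoned box, a long range can be cleared in batches by feeding it back in as the next start index. The following is a minimal sketch under that assumption. The names tombstone_in_batches and batch_size are hypothetical and not part of the thin client API; the sketch uses only the two calls shown in the fragment above and assumes a batch may return fewer envelopes than requested.

```python
async def tombstone_in_batches(client, write_cap, start_index, total, batch_size=5):
    """Tombstone `total` boxes, at most `batch_size` per call (hypothetical helper).

    Returns the index after the last tombstoned box.
    """
    index = start_index
    remaining = total
    while remaining > 0:
        count = min(batch_size, remaining)
        result = await client.tombstone_range(write_cap, index, count)
        if not result.envelopes:
            break  # nothing was produced; stop rather than loop forever
        for envelope in result.envelopes:
            await client.start_resending_encrypted_message(
                read_cap=None,
                write_cap=write_cap,
                next_message_index=None,
                reply_index=None,
                envelope_descriptor=envelope.envelope_descriptor,
                message_ciphertext=envelope.message_ciphertext,
                envelope_hash=envelope.envelope_hash,
            )
        # Continue from the index after the last tombstoned box.
        index = result.next
        remaining -= len(result.envelopes)
    return index
```

Persisting the returned index between batches would let an interrupted run resume where it stopped.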

How to send to one channel atomically via copy command

A copy command writes data to a destination channel atomically via a courier. The steps are:

  1. Create a temporary channel and a destination channel.
  2. Pack the payload into copy stream elements.
  3. Write each element to the temporary channel.
  4. Send a copy command referencing the temporary channel.
// Create destination channel
destSeed := make([]byte, 32)
if _, err := rand.Reader.Read(destSeed); err != nil {
    log.Fatal(err)
}
destWriteCap, destReadCap, destFirstIndex, err := client.NewKeypair(destSeed)
if err != nil {
    log.Fatal(err)
}

// Create temporary channel
tempSeed := make([]byte, 32)
if _, err := rand.Reader.Read(tempSeed); err != nil {
    log.Fatal(err)
}
tempWriteCap, _, tempFirstIndex, err := client.NewKeypair(tempSeed)
if err != nil {
    log.Fatal(err)
}

// Pack payload into copy stream elements
envelopes, _, err := client.CreateCourierEnvelopesFromPayload(
    payload, destWriteCap, destFirstIndex,
    true,  // isStart
    true,  // isLast
)
if err != nil {
    log.Fatal(err)
}

// Write each element to the temporary channel
tempIndex := tempFirstIndex
for _, chunk := range envelopes {
    ciphertext, envDesc, envHash, nextIdx, err := client.EncryptWrite(
        chunk, tempWriteCap, tempIndex,
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.StartResendingEncryptedMessage(
        nil, tempWriteCap, nil, nil, envDesc, ciphertext, envHash,
    )
    if err != nil {
        log.Fatal(err)
    }
    tempIndex = nextIdx
}

// Send the copy command (blocks until courier acknowledges)
err = client.StartResendingCopyCommand(tempWriteCap)
if err != nil {
    log.Fatal(err)
}

// Share destReadCap and destFirstIndex with the reader
// Create destination channel
let dest_seed: [u8; 32] = rand::random();
let dest = client.new_keypair(&dest_seed).await?;

// Create temporary channel
let temp_seed: [u8; 32] = rand::random();
let temp = client.new_keypair(&temp_seed).await?;

// Pack payload into copy stream elements
let envelopes_result = client.create_courier_envelopes_from_payload(
    &payload, &dest.write_cap, &dest.first_index,
    true,  // is_start
    true,  // is_last
).await?;

// Write each element to the temporary channel
let mut temp_index = temp.first_index.clone();
for chunk in &envelopes_result.envelopes {
    let write_result = client.encrypt_write(
        chunk, &temp.write_cap, &temp_index,
    ).await?;
    client.start_resending_encrypted_message(
        None, Some(&temp.write_cap), None, None,
        &write_result.envelope_descriptor,
        &write_result.message_ciphertext,
        &write_result.envelope_hash,
    ).await?;
    temp_index = write_result.next_message_box_index;
}

// Send the copy command (blocks until courier acknowledges)
client.start_resending_copy_command(&temp.write_cap, None, None).await?;

// Share dest.read_cap and dest.first_index with the reader
import os

# Create destination channel
dest_seed = os.urandom(32)
dest = await client.new_keypair(dest_seed)

# Create temporary channel
temp_seed = os.urandom(32)
temp = await client.new_keypair(temp_seed)

# Pack payload into copy stream elements
envelopes_result = await client.create_courier_envelopes_from_payload(
    payload, dest.write_cap, dest.first_index,
    is_start=True,
    is_last=True,
)

# Write each element to the temporary channel
temp_index = temp.first_index
for chunk in envelopes_result.envelopes:
    write_result = await client.encrypt_write(chunk, temp.write_cap, temp_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=temp.write_cap,
        next_message_index=None, reply_index=None,
        envelope_descriptor=write_result.envelope_descriptor,
        message_ciphertext=write_result.message_ciphertext,
        envelope_hash=write_result.envelope_hash,
    )
    temp_index = write_result.next_message_box_index

# Send the copy command (blocks until courier acknowledges)
await client.start_resending_copy_command(temp.write_cap)

# Share dest.read_cap and dest.first_index with the reader
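
The loop that writes each element to the temporary channel (step 3) recurs in every copy-stream recipe in this guide, so it may be worth factoring out. The sketch below does so using only the calls shown in the fragment above; write_chunks is a hypothetical helper name, not a thin client method.

```python
async def write_chunks(client, write_cap, first_index, chunks):
    """Write each chunk to a channel in order (hypothetical helper).

    Returns the next free box index, so a later call can continue
    where this one stopped.
    """
    index = first_index
    for chunk in chunks:
        w = await client.encrypt_write(chunk, write_cap, index)
        await client.start_resending_encrypted_message(
            read_cap=None, write_cap=write_cap,
            next_message_index=None, reply_index=None,
            envelope_descriptor=w.envelope_descriptor,
            message_ciphertext=w.message_ciphertext,
            envelope_hash=w.envelope_hash,
        )
        # Each write reports the index to use for the following box.
        index = w.next_message_box_index
    return index
```

With such a helper, step 3 reduces to one line: `await write_chunks(client, temp.write_cap, temp.first_index, envelopes_result.envelopes)`.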

How to send to multiple channels atomically

Use CreateCourierEnvelopesFromMultiPayload to pack payloads for different destinations into a single copy stream efficiently:

// Create destination channels
dest1WriteCap, dest1ReadCap, dest1Index, _ := client.NewKeypair(seed1)
dest2WriteCap, dest2ReadCap, dest2Index, _ := client.NewKeypair(seed2)

// Create temporary channel
tempWriteCap, _, tempFirstIndex, _ := client.NewKeypair(tempSeed)

// Pack multiple payloads
destinations := []thin.DestinationPayload{
    {Payload: payload1, WriteCap: dest1WriteCap, StartIndex: dest1Index},
    {Payload: payload2, WriteCap: dest2WriteCap, StartIndex: dest2Index},
}

result, err := client.CreateCourierEnvelopesFromMultiPayload(
    destinations,
    true,  // isStart
    true,  // isLast
    nil,   // buffer (nil for first call)
)
if err != nil {
    log.Fatal(err)
}

// Write envelopes to temporary channel
tempIndex := tempFirstIndex
for _, chunk := range result.Envelopes {
    ciphertext, envDesc, envHash, nextIdx, err := client.EncryptWrite(
        chunk, tempWriteCap, tempIndex,
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.StartResendingEncryptedMessage(
        nil, tempWriteCap, nil, nil, envDesc, ciphertext, envHash,
    )
    if err != nil {
        log.Fatal(err)
    }
    tempIndex = nextIdx
}

// Send copy command
err = client.StartResendingCopyCommand(tempWriteCap)
if err != nil {
    log.Fatal(err)
}
// Create destination channels
let dest1 = client.new_keypair(&seed1).await?;
let dest2 = client.new_keypair(&seed2).await?;

// Create temporary channel
let temp = client.new_keypair(&temp_seed).await?;

// Pack multiple payloads
let destinations = vec![
    (&payload1[..], &dest1.write_cap[..], &dest1.first_index[..]),
    (&payload2[..], &dest2.write_cap[..], &dest2.first_index[..]),
];

let result = client.create_courier_envelopes_from_multi_payload(
    destinations,
    true,  // is_start
    true,  // is_last
    None,  // buffer (None for first call)
).await?;

// Write envelopes to temporary channel
let mut temp_index = temp.first_index.clone();
for chunk in &result.envelopes {
    let write_result = client.encrypt_write(
        chunk, &temp.write_cap, &temp_index,
    ).await?;
    client.start_resending_encrypted_message(
        None, Some(&temp.write_cap), None, None,
        &write_result.envelope_descriptor,
        &write_result.message_ciphertext,
        &write_result.envelope_hash,
    ).await?;
    temp_index = write_result.next_message_box_index;
}

// Send copy command
client.start_resending_copy_command(&temp.write_cap, None, None).await?;
# Create destination channels
dest1 = await client.new_keypair(seed1)
dest2 = await client.new_keypair(seed2)

# Create temporary channel
temp = await client.new_keypair(temp_seed)

# Pack multiple payloads
destinations = [
    {"payload": payload1, "write_cap": dest1.write_cap, "start_index": dest1.first_index},
    {"payload": payload2, "write_cap": dest2.write_cap, "start_index": dest2.first_index},
]

result = await client.create_courier_envelopes_from_multi_payload(
    destinations,
    is_start=True,
    is_last=True,
    buffer=None,
)

# Write envelopes to temporary channel
temp_index = temp.first_index
for chunk in result.envelopes:
    write_result = await client.encrypt_write(chunk, temp.write_cap, temp_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=temp.write_cap,
        next_message_index=None, reply_index=None,
        envelope_descriptor=write_result.envelope_descriptor,
        message_ciphertext=write_result.message_ciphertext,
        envelope_hash=write_result.envelope_hash,
    )
    temp_index = write_result.next_message_box_index

# Send copy command
await client.start_resending_copy_command(temp.write_cap)

How to handle multi-call buffer passing for large copy streams

When building a copy stream across multiple calls (because you have more data than fits in a single call, or data arrives incrementally), pass the buffer from each result to the next call:

var buffer []byte // nil on first call

// First batch of destinations
result1, err := client.CreateCourierEnvelopesFromMultiPayload(
    batch1Destinations,
    true,   // isStart (first call)
    false,  // isLast (more calls coming)
    buffer,
)
if err != nil {
    log.Fatal(err)
}
// Write result1.Envelopes to temp channel...
buffer = result1.Buffer // save for next call

// Persist buffer to disk for crash recovery
saveState(buffer)

// Second batch (final)
result2, err := client.CreateCourierEnvelopesFromMultiPayload(
    batch2Destinations,
    false,  // isStart (not the first call)
    true,   // isLast (final call)
    buffer,
)
if err != nil {
    log.Fatal(err)
}
// Write result2.Envelopes to temp channel...

// On crash recovery, reload buffer from disk and continue
// with isStart=false
let mut buffer: Option<Vec<u8>> = None; // None on first call

// First batch
let result1 = client.create_courier_envelopes_from_multi_payload(
    batch1_destinations,
    true,   // is_start
    false,  // is_last
    buffer,
).await?;
// Write result1.envelopes to temp channel...
buffer = result1.buffer; // save for next call

// Persist buffer to disk for crash recovery
save_state(&buffer);

// Second batch (final)
let result2 = client.create_courier_envelopes_from_multi_payload(
    batch2_destinations,
    false,  // is_start
    true,   // is_last
    buffer,
).await?;
// Write result2.envelopes to temp channel...
buffer = None  # None on first call

# First batch
result1 = await client.create_courier_envelopes_from_multi_payload(
    batch1_destinations,
    is_start=True,   # first call
    is_last=False,   # more calls coming
    buffer=buffer,
)
# Write result1.envelopes to temp channel...
buffer = result1.buffer  # save for next call

# Persist buffer to disk for crash recovery
save_state(buffer)

# Second batch (final)
result2 = await client.create_courier_envelopes_from_multi_payload(
    batch2_destinations,
    is_start=False,  # not the first call
    is_last=True,    # final call
    buffer=buffer,
)
# Write result2.envelopes to temp channel...
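
The fragments above leave the persistence helper undefined. One possible shape for it is sketched below, assuming the buffer is an opaque byte string and that an empty buffer can be treated the same as None. The file name, the load_state counterpart, and the write-then-rename strategy are illustrative choices, not something the thin client prescribes.

```python
import os
import tempfile
from typing import Optional


def save_state(buffer: Optional[bytes], path: str = "copy_stream.buffer") -> None:
    """Atomically persist the copy-stream buffer (hypothetical helper)."""
    data = buffer or b""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file in the same directory, then rename it over
    # the target, so a crash never leaves a half-written buffer behind.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise


def load_state(path: str = "copy_stream.buffer") -> Optional[bytes]:
    """Return the saved buffer, or None if nothing usable was saved."""
    try:
        with open(path, "rb") as f:
            data = f.read()
    except FileNotFoundError:
        return None
    return data or None
```

On crash recovery, the value returned by load_state would be passed as the buffer of the next call, with is_start=False.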

How to tombstone a range via copy stream

Use CreateCourierEnvelopesFromTombstoneRange to atomically tombstone boxes as part of a copy command. The courier performs the tombstoning, so it either all succeeds or none of it is visible.

tempWriteCap, _, tempFirstIndex, _ := client.NewKeypair(tempSeed)

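// Create tombstone copy stream elements. The second and third return
// values (nextBuffer, nextDestIndex) are discarded in this single-call
// example, since Go rejects unused variables.
envelopes, _, _, err := client.CreateCourierEnvelopesFromTombstoneRange(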
    destWriteCap,
    destStartIndex,
    10,     // tombstone 10 boxes
    true,   // isStart
    true,   // isLast
    nil,    // buffer
)
if err != nil {
    log.Fatal(err)
}

// Write to temporary channel
tempIndex := tempFirstIndex
for _, chunk := range envelopes {
    ciphertext, envDesc, envHash, nextIdx, err := client.EncryptWrite(
        chunk, tempWriteCap, tempIndex,
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.StartResendingEncryptedMessage(
        nil, tempWriteCap, nil, nil, envDesc, ciphertext, envHash,
    )
    if err != nil {
        log.Fatal(err)
    }
    tempIndex = nextIdx
}

// Send copy command
err = client.StartResendingCopyCommand(tempWriteCap)
if err != nil {
    log.Fatal(err)
}
let temp = client.new_keypair(&temp_seed).await?;

// Create tombstone copy stream elements
let result = client.create_courier_envelopes_from_tombstone_range(
    &dest_write_cap,
    &dest_start_index,
    10,     // tombstone 10 boxes
    true,   // is_start
    true,   // is_last
    None,   // buffer
).await?;

// Write to temporary channel
let mut temp_index = temp.first_index.clone();
for chunk in &result.envelopes {
    let write_result = client.encrypt_write(
        chunk, &temp.write_cap, &temp_index,
    ).await?;
    client.start_resending_encrypted_message(
        None, Some(&temp.write_cap), None, None,
        &write_result.envelope_descriptor,
        &write_result.message_ciphertext,
        &write_result.envelope_hash,
    ).await?;
    temp_index = write_result.next_message_box_index;
}

// Send copy command
client.start_resending_copy_command(&temp.write_cap, None, None).await?;
temp = await client.new_keypair(temp_seed)

# Create tombstone copy stream elements
result = await client.create_courier_envelopes_from_tombstone_range(
    dest_write_cap,
    dest_start_index,
    10,              # tombstone 10 boxes
    is_start=True,
    is_last=True,
    buffer=None,
)

# Write to temporary channel
temp_index = temp.first_index
for chunk in result.envelopes:
    write_result = await client.encrypt_write(chunk, temp.write_cap, temp_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=temp.write_cap,
        next_message_index=None, reply_index=None,
        envelope_descriptor=write_result.envelope_descriptor,
        message_ciphertext=write_result.message_ciphertext,
        envelope_hash=write_result.envelope_hash,
    )
    temp_index = write_result.next_message_box_index

# Send copy command
await client.start_resending_copy_command(temp.write_cap)

How to cancel in-flight operations

Both StartResendingEncryptedMessage and StartResendingCopyCommand block until completion. You can cancel them individually, or stop everything at once by closing the thin client.

To cancel a specific operation, call the corresponding cancel method from another thread or task:

// Cancel an encrypted message operation
err := client.CancelResendingEncryptedMessage(envelopeHash)

// Cancel a copy command (needs blake2b-256 hash of the write cap)
writeCapBytes, _ := tempWriteCap.MarshalBinary()
writeCapHash := blake2b.Sum256(writeCapBytes)
err = client.CancelResendingCopyCommand(&writeCapHash)
// Cancel an encrypted message operation
client.cancel_resending_encrypted_message(&envelope_hash).await?;

// Cancel a copy command
use blake2::{Blake2b, Digest};
use digest::consts::U32;
let write_cap_hash: [u8; 32] = Blake2b::<U32>::digest(&temp_write_cap).into();
client.cancel_resending_copy_command(&write_cap_hash).await?;
# Cancel an encrypted message operation
await client.cancel_resending_encrypted_message(envelope_hash)

# Cancel a copy command
from hashlib import blake2b
write_cap_hash = blake2b(temp_write_cap, digest_size=32).digest()
await client.cancel_resending_copy_command(write_cap_hash)
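
Since the send call blocks, the cancel has to come from a second task. One way to arrange that in asyncio is sketched below: the send runs in its own task, and the calling task cancels it if it has not finished within a deadline. The helper name send_or_cancel and the timeout policy are hypothetical, and the sketch assumes that abandoning the local awaiting task after the cancel is harmless.

```python
import asyncio


async def send_or_cancel(client, timeout, *, envelope_hash, **send_args):
    """Run a blocking send; cancel it after `timeout` seconds (hypothetical helper).

    Returns the send result, or None if the send was cancelled.
    """
    send = asyncio.create_task(
        client.start_resending_encrypted_message(
            envelope_hash=envelope_hash, **send_args
        )
    )
    done, _pending = await asyncio.wait({send}, timeout=timeout)
    if send in done:
        return send.result()

    # Still blocked: cancel the operation from this second task.
    await client.cancel_resending_encrypted_message(envelope_hash)
    # Assumption: also dropping the local awaiting task is safe.
    send.cancel()
    await asyncio.gather(send, return_exceptions=True)
    return None
```

The remaining keyword arguments (read_cap, write_cap, and so on) are passed through unchanged to the send call.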

To stop all in-flight operations at once, call Close() (Go) or stop() (Rust and Python). This shuts down the thin client entirely: all blocked callers receive an error, and the daemon stops all ARQ retransmission loops for this client. This is useful when your application is shutting down or when you want to abandon all pending work without cancelling each operation individually.


How to handle daemon disconnects and restarts

The thin client automatically reconnects when the daemon connection is lost. It uses an instance token to detect whether it reconnected to the same daemon or a new one:

  • Same instance token: The daemon still has its state. No action needed.

  • Different instance token: The daemon is a new process. The thin client automatically replays all in-flight StartResendingEncryptedMessage and StartResendingCopyCommand operations. Callers blocked on these methods are unaware of the disconnect.

Applications do not need to manage reconnection or replay. You can observe disconnect events to log or update UI state:

eventCh := client.EventSink()
defer client.StopEventSink(eventCh)

for ev := range eventCh {
    switch v := ev.(type) {
    case *thin.DaemonDisconnectedEvent:
        if v.IsGraceful {
            fmt.Println("Daemon shut down gracefully")
        } else {
            fmt.Printf("Daemon connection lost: %v\n", v.Err)
        }
        // No action needed -- thin client reconnects automatically
        // and replays in-flight requests if the daemon instance changed.
    case *thin.ConnectionStatusEvent:
        fmt.Printf("Connected: %v\n", v.IsConnected)
        // v.InstanceToken identifies the daemon process.
        // The thin client compares this internally on reconnect.
    }
}
let mut config = Config::new("thinclient.toml")?;
// Set the disconnect callback before constructing the client
config.on_daemon_disconnected = Some(Box::new(|graceful, err_msg| {
    if graceful {
        println!("Daemon shut down gracefully");
    } else {
        println!("Daemon connection lost: {:?}", err_msg);
    }
    // No action needed -- thin client reconnects automatically
    // and replays in-flight requests if the daemon instance changed.
}));
let client = ThinClient::new(config).await?;
async def on_daemon_disconnected(event):
    if event.get("is_graceful"):
        print("Daemon shut down gracefully")
    else:
        print(f"Daemon connection lost: {event.get('error')}")
    # No action needed -- thin client reconnects automatically
    # and replays in-flight requests if the daemon instance changed.

async def on_connection_status(event):
    print(f"Connected: {event['is_connected']}")
    # event contains 'instance_token' identifying the daemon process.
    # The thin client compares this internally on reconnect.

config = Config(
    "thinclient.toml",
    on_daemon_disconnected=on_daemon_disconnected,
    on_connection_status=on_connection_status,
)
client = ThinClient(config)

If you cancel an operation while the thin client is disconnected, the cancel only removes the operation from in-flight tracking, so it will not be replayed on reconnect:

// Safe to call while disconnected -- removes from tracking,
// no message sent to daemon since there is no connection.
err := client.CancelResendingEncryptedMessage(envelopeHash)
err = client.CancelResendingCopyCommand(&writeCapHash)
// Safe to call while disconnected -- removes from tracking,
// no message sent to daemon since there is no connection.
client.cancel_resending_encrypted_message(&envelope_hash).await?;
client.cancel_resending_copy_command(&write_cap_hash).await?;
# Safe to call while disconnected -- removes from tracking,
# no message sent to daemon since there is no connection.
await client.cancel_resending_encrypted_message(envelope_hash)
await client.cancel_resending_copy_command(write_cap_hash)
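
The interplay between instance tokens, replay, and cancellation can be pictured with a toy model. The class below is purely illustrative and is not the thin client's implementation; InFlightTracker and its methods are hypothetical names chosen to mirror the behaviour described in this section.

```python
from typing import Dict, List, Optional


class InFlightTracker:
    """Toy model of replay-on-reconnect (not the thin client's real code)."""

    def __init__(self) -> None:
        self.instance_token: Optional[bytes] = None
        self.in_flight: Dict[bytes, str] = {}  # operation key -> description

    def start(self, key: bytes, description: str) -> None:
        self.in_flight[key] = description

    def cancel(self, key: bytes) -> None:
        # Works while disconnected: the operation is simply forgotten.
        self.in_flight.pop(key, None)

    def on_connected(self, token: bytes) -> List[str]:
        """Return the operations to replay after a (re)connect."""
        previous, self.instance_token = self.instance_token, token
        if previous is None or previous == token:
            return []  # first connect, or same daemon: its state is intact
        return list(self.in_flight.values())  # new daemon: replay everything


tracker = InFlightTracker()
tracker.on_connected(b"token-A")
tracker.start(b"hash-1", "encrypted message")
tracker.start(b"hash-2", "copy command")
tracker.cancel(b"hash-1")                  # cancelled while disconnected
replayed = tracker.on_connected(b"token-B")  # daemon restarted
```

In this model only the copy command is replayed, because the cancelled message was already removed from tracking.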

To terminate the thin client entirely, close it explicitly. All blocked callers then receive an error. A daemon disconnect by itself never terminates the thin client:

err := client.Close()
client.stop().await;
client.stop()