Generated API reference for the Katzenpost Python thin client (katzenpost_thinclient)

Python Thin Client API

This is the API reference for the katzenpost_thinclient Python package, the Python binding of the Katzenpost thin client. The thin client is an interface to the kpclientd daemon, which performs all cryptographic and network operations; the binding itself does no cryptography.

This page is generated by website/tools/python-api-gen/ from the docstrings of the pinned katzenpost_thinclient release, using the native Python documentation tool pydoc-markdown. Do not edit it directly: changes belong in the binding docstrings (in the thin_client repository) and will be overwritten by the next generation pass.

This page documents the 0.0.15 release of the Python binding (source, PyPI). Symbols are re-exported from katzenpost_thinclient, so application code may import them directly, for example from katzenpost_thinclient import ThinClient, Config.

For the curated cross-language reference covering the Go, Rust, and Python bindings side by side, see the Thin Client API Reference. For conceptual background see Understanding Pigeonhole, and for task-oriented guidance see the Thin Client How-to Guide.


katzenpost_thinclient.core

Katzenpost Python Thin Client - Core Module

This module provides the core functionality for the Katzenpost thin client, including the ThinClient class, configuration, and helper utilities.


thin_client_error_to_string

def thin_client_error_to_string(error_code: int) -> str

Convert a thin client error code to a human-readable string.


error_code_to_exception

def error_code_to_exception(error_code: int) -> Exception

Maps error codes to exception instances for StartResendingEncryptedMessage. This matches Go’s errorCodeToSentinel function in thin/pigeonhole.go.

The daemon passes through pigeonhole replica error codes (1-9) for replica-level errors. For other errors (thin client errors like decryption failures), specific exceptions are raised.


copy_reply_to_exception

def copy_reply_to_exception(reply: "Dict[str, Any]") -> "Exception | None"

Maps a StartResendingCopyCommandReply dict to an exception (or None on success).

Unlike error_code_to_exception(), this helper has access to the reply’s diagnostic fields (replica_error_code, failed_envelope_index), which it uses to construct a CopyCommandFailedError when the courier reports THIN_CLIENT_ERROR_COPY_COMMAND_FAILED.

Arguments:

  • reply - The decoded start_resending_copy_command_reply dict.

Returns:

None if error_code is 0 (success); otherwise an Exception instance.


is_expected_outcome

def is_expected_outcome(exc: Exception) -> bool

Returns True for exceptions that represent completed operations rather than failures. These errors should not trigger retries.
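As a rough illustration of how such a predicate might gate a retry loop, the sketch below retries transient failures but re-raises "expected outcome" exceptions at once. The names BoxAlreadyExists, is_expected, and call_with_retries are hypothetical stand-ins defined locally; they are not part of the binding, and the real classification is whatever is_expected_outcome implements.

```python
import asyncio


class BoxAlreadyExists(Exception):
    """Hypothetical stand-in for an exception that marks a completed operation."""


def is_expected(exc: Exception) -> bool:
    # Local stand-in for is_expected_outcome(); the real rules live in the binding.
    return isinstance(exc, BoxAlreadyExists)


async def call_with_retries(op, attempts: int = 3, delay: float = 0.0):
    """Await op() up to `attempts` times, never retrying expected outcomes."""
    last_exc = None
    for _ in range(attempts):
        try:
            return await op()
        except Exception as exc:
            if is_expected(exc):
                raise  # the operation completed; retrying would be pointless
            last_exc = exc
            await asyncio.sleep(delay)
    raise last_exc
```

In application code the local predicate would presumably be replaced by the library's is_expected_outcome.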


Geometry

class Geometry()

Geometry describes the geometry of a Sphinx packet.

NOTE: You must not try to compose a Sphinx Geometry yourself. It must be programmatically generated by the Katzenpost genconfig or gensphinx CLI utilities.

All of the Sphinx Geometry attributes are described below. The only one needed for thin client message bounds checking is UserForwardPayloadLength, which gives the maximum size of a message that can be sent to a mixnet service in a single packet.

Attributes:

  • PacketLength int - The total length of a Sphinx packet in bytes.
  • NrHops int - The number of hops; determines the header’s structure.
  • HeaderLength int - The total size of the Sphinx header in bytes.
  • RoutingInfoLength int - The length of the routing information portion of the header.
  • PerHopRoutingInfoLength int - The length of routing info for a single hop.
  • SURBLength int - The length of a Single-Use Reply Block (SURB).
  • SphinxPlaintextHeaderLength int - The length of the unencrypted plaintext header.
  • PayloadTagLength int - The length of the tag used to authenticate the payload.
  • ForwardPayloadLength int - The size of the full payload including padding and tag.
  • UserForwardPayloadLength int - The usable portion of the payload intended for the recipient.
  • NextNodeHopLength int - Derived from the expected maximum routing info block size.
  • SPRPKeyMaterialLength int - The length of the key used for SPRP (Sphinx packet payload encryption).
  • NIKEName str - Name of the NIKE scheme (if used). Mutually exclusive with KEMName.
  • KEMName str - Name of the KEM scheme (if used). Mutually exclusive with NIKEName.
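The bounds check mentioned above can be sketched as follows. This is a minimal illustration that takes the UserForwardPayloadLength value as a plain integer; the helper name require_fits is hypothetical and not part of the binding.

```python
def require_fits(payload: bytes, user_forward_payload_length: int) -> bytes:
    """Return payload unchanged, or raise ValueError if it cannot fit in one packet."""
    if len(payload) > user_forward_payload_length:
        raise ValueError(
            f"payload is {len(payload)} bytes; the single-packet limit is "
            f"{user_forward_payload_length} bytes"
        )
    return payload
```

A caller would pass the UserForwardPayloadLength attribute of the Geometry in use as the second argument.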

PigeonholeGeometry

class PigeonholeGeometry()

PigeonholeGeometry describes the geometry of a Pigeonhole envelope.

This provides mathematically precise geometry calculations for the Pigeonhole protocol using trunnel’s fixed binary format.

It supports 3 distinct use cases:

  1. Given MaxPlaintextPayloadLength → compute all envelope sizes
  2. Given precomputed Pigeonhole Geometry → derive accommodating Sphinx Geometry
  3. Given Sphinx Geometry constraint → derive optimal Pigeonhole Geometry

Attributes:

  • max_plaintext_payload_length int - The maximum usable plaintext payload size within a Box.
  • courier_query_read_length int - The size of a CourierQuery containing a ReplicaRead.
  • courier_query_write_length int - The size of a CourierQuery containing a ReplicaWrite.
  • courier_query_reply_read_length int - The size of a CourierQueryReply containing a ReplicaReadReply.
  • courier_query_reply_write_length int - The size of a CourierQueryReply containing a ReplicaWriteReply.
  • nike_name str - The NIKE scheme name used in MKEM for encrypting to multiple storage replicas.
  • signature_scheme_name str - The signature scheme used for BACAP (always “Ed25519”).

PigeonholeGeometry.validate

def validate() -> None

Validates that the geometry has valid parameters.

Raises:

  • ValueError - If the geometry is invalid.

PigeonholeGeometry.padded_payload_length

def padded_payload_length() -> int

Returns the payload size after adding length prefix.

Returns:

  • int - The padded payload length (max_plaintext_payload_length + 4).

ConfigFile

class ConfigFile()

ConfigFile represents everything loaded from the TOML file, which is only the subtable-discriminated Dial transport config. The geometries are supplied by the daemon during the handshake and are not configured here.

ConfigFile.load

@classmethod
def load(cls, toml_path: str) -> "ConfigFile"

Parse a kpclientd-style thin-client TOML config.

Raises ConfigError eagerly on any structural problem: unknown top-level sections (a leftover [SphinxGeometry] or [PigeonholeGeometry] is now rejected here), missing required sections, wrong types, or unknown / missing keys within the Dial subtable. The intent is that a stale or drifted config fails here at startup rather than producing a mysterious runtime failure later.
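The "fail eagerly at load time" behaviour can be pictured with the sketch below, which validates an already-parsed TOML dictionary. It is only an illustration: the transport subtable names (Unix, Tcp) and the Address key are hypothetical placeholders, the ConfigError class is a local stand-in, and the real schema is whatever ConfigFile.load enforces.

```python
class ConfigError(Exception):
    """Local stand-in for the binding's ConfigError."""


# Assumption: "Dial" is the only permitted top-level section, per the text above.
ALLOWED_TOP_LEVEL = {"Dial"}
# Hypothetical transport subtables and keys, for illustration only.
HYPOTHETICAL_TRANSPORTS = {"Unix": {"Address"}, "Tcp": {"Address"}}


def validate_parsed_config(parsed: dict) -> str:
    """Return the selected transport name, or raise ConfigError."""
    unknown = set(parsed) - ALLOWED_TOP_LEVEL
    if unknown:
        raise ConfigError(f"unknown top-level sections: {sorted(unknown)}")
    dial = parsed.get("Dial")
    if not isinstance(dial, dict):
        raise ConfigError("missing or malformed [Dial] section")
    if len(dial) != 1:
        raise ConfigError("[Dial] must contain exactly one transport subtable")
    transport, table = next(iter(dial.items()))
    expected = HYPOTHETICAL_TRANSPORTS.get(transport)
    if expected is None:
        raise ConfigError(f"unknown transport: {transport}")
    if not isinstance(table, dict) or set(table) != expected:
        raise ConfigError(f"[Dial.{transport}] needs exactly the keys {sorted(expected)}")
    for key, value in table.items():
        if not isinstance(value, str):
            raise ConfigError(f"[Dial.{transport}].{key} must be a string")
    return transport
```

Under these assumptions, a config that still carries a stale [SphinxGeometry] table is rejected before any connection is attempted.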


pretty_print_obj

def pretty_print_obj(obj: "Any") -> str

Pretty-print a Python object using indentation and return the formatted string.

This function uses pprintpp to format complex data structures (e.g., dictionaries, lists) in a readable, indented format.

Arguments:

  • obj Any - The object to pretty-print.

Returns:

  • str - The pretty-printed representation of the object.

ServiceDescriptor

class ServiceDescriptor()

Describes a mixnet service endpoint retrieved from the PKI document.

A ServiceDescriptor encapsulates the necessary information for communicating with a service on the mix network. The service node’s identity public key’s hash is used as the destination address along with the service’s queue ID.

Attributes:

  • recipient_queue_id bytes - The identifier of the recipient’s queue on the mixnet. (“Kaetzchen.endpoint” in the PKI)
  • mix_descriptor dict - A CBOR-decoded dictionary describing the mix node, typically includes the ‘IdentityKey’ and other metadata.

Methods:

  • to_destination() - Returns a tuple of (provider_id_hash, recipient_queue_id), where the provider ID is a 32-byte BLAKE2b hash of the IdentityKey.

ServiceDescriptor.to_destination

def to_destination() -> "Tuple[bytes,bytes]"

Returns the provider identity key hash and the recipient queue ID.
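For orientation, a 32-byte BLAKE2b digest can be computed with the standard library as shown below. This sketch assumes the hash is taken over the raw serialized IdentityKey bytes; the exact input encoding is decided by the binding, and the function names here are hypothetical.

```python
import hashlib


def identity_key_hash(identity_key: bytes) -> bytes:
    """Return a 32-byte BLAKE2b digest of the given key bytes."""
    return hashlib.blake2b(identity_key, digest_size=32).digest()


def destination(identity_key: bytes, recipient_queue_id: bytes) -> tuple:
    """Build a (provider_id_hash, recipient_queue_id) pair, mirroring to_destination()."""
    return identity_key_hash(identity_key), recipient_queue_id
```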


find_services

def find_services(capability: str,
                  doc: "Dict[str,Any]") -> "List[ServiceDescriptor]"

Search the PKI document for services supporting the specified capability.

This function iterates over all service nodes in the PKI document, deserializes each CBOR-encoded node, and looks for advertised capabilities. If a service provides the requested capability, it is returned as a ServiceDescriptor.

Arguments:

  • capability str - The name of the capability to search for (e.g., “echo”).
  • doc dict - The decoded PKI document as a Python dictionary, which must include a “ServiceNodes” key containing CBOR-encoded descriptors.

Returns:

  • List[ServiceDescriptor] - A list of matching service descriptors that advertise the capability.

Raises:

  • KeyError - If the ‘ServiceNodes’ field is missing from the PKI document.

Config

class Config()

Configuration object for the ThinClient containing connection details and event callbacks.

The Config class loads network configuration from a TOML file and provides optional callback functions that are invoked when specific events occur during client operation.

Attributes:

  • network str - Network type (‘tcp’, ‘unix’, etc.)
  • address str - Network address (host:port for TCP, path for Unix sockets)
  • geometry Geometry - Sphinx packet geometry parameters
  • on_connection_status callable - Callback for connection status changes
  • on_new_pki_document callable - Callback for new PKI documents
  • on_message_sent callable - Callback for message transmission confirmations
  • on_message_reply callable - Callback for received message replies

Example:

def handle_reply(event):
    # Process the received reply
    payload = event['payload']

config = Config("client.toml", on_message_reply=handle_reply)
client = ThinClient(config)

Config.__init__

def __init__(filepath: str,
             on_connection_status: "Callable|None" = None,
             on_new_pki_document: "Callable|None" = None,
             on_message_sent: "Callable|None" = None,
             on_message_reply: "Callable|None" = None,
             on_daemon_disconnected: "Callable|None" = None) -> None

Initialize the Config object.

Arguments:

  • filepath str - Path to the TOML config file containing network, address, and geometry.

  • on_connection_status callable, optional - Callback invoked when the daemon’s connection status to the mixnet changes. The callback receives a single argument:

    • event (dict): Connection status event with keys:
    • ‘is_connected’ (bool): True if daemon is connected to mixnet, False otherwise
    • ‘err’ (str, optional): Error message if connection failed, empty string if no error
  • Example - {'is_connected': True, 'err': ''}

  • on_new_pki_document callable, optional - Callback invoked when a new PKI document is received from the mixnet. The callback receives a single argument:

    • event (dict): PKI document event with keys:
    • ‘payload’ (bytes): CBOR-encoded PKI document data stripped of signatures
  • Example - {'payload': b'\xa5\x64Epoch\x00...'}

  • on_message_sent callable, optional - Callback invoked when a message has been successfully transmitted to the mixnet. The callback receives a single argument:

    • event (dict): Message sent event with keys:
    • ‘message_id’ (bytes): 16-byte unique identifier for the sent message
    • ‘surbid’ (bytes, optional): SURB ID if message was sent with SURB, None otherwise
    • ‘sent_at’ (str): ISO timestamp when message was sent
    • ‘reply_eta’ (float): Expected round-trip time in seconds for reply
    • ‘err’ (str, optional): Error message if sending failed, empty string if successful
  • Example - {'message_id': b'\x01\x02...', 'surbid': b'\xaa\xbb...', 'sent_at': '2024-01-01T12:00:00Z', 'reply_eta': 30.5, 'err': ''}

  • on_message_reply callable, optional - Callback invoked when a reply is received for a previously sent message. The callback receives a single argument:

    • event (dict): Message reply event with keys:
    • ‘message_id’ (bytes): 16-byte identifier matching the original message
    • ‘surbid’ (bytes, optional): SURB ID if reply used SURB, None otherwise
    • ‘payload’ (bytes): Reply payload data from the service
    • ‘reply_index’ (int, optional): Index of reply used (relevant for channel reads)
    • ‘error_code’ (int): Error code indicating success (0) or specific failure condition
  • Example - {'message_id': b'\x01\x02...', 'surbid': b'\xaa\xbb...', 'payload': b'echo response', 'reply_index': 0, 'error_code': 0}

Notes:

All callbacks are optional. If not provided, the corresponding events will be ignored. Callbacks should be lightweight and non-blocking as they are called from the client’s event processing loop.
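One way to keep a callback lightweight is to have it do nothing but enqueue the event and let a separate task do the real work. The sketch below assumes callbacks are plain synchronous callables, as in the example above; make_reply_callback and consume_one are hypothetical helper names, and the event keys are those documented for on_message_reply.

```python
import asyncio


def make_reply_callback(queue: asyncio.Queue):
    """Build an on_message_reply callback that only hands the event off."""
    def on_message_reply(event: dict) -> None:
        # No parsing, I/O, or waiting here: enqueue and return immediately.
        queue.put_nowait(event)
    return on_message_reply


async def consume_one(queue: asyncio.Queue) -> bytes:
    """Process a single reply event outside the client's event loop callback."""
    event = await queue.get()
    if event["error_code"] != 0:
        raise RuntimeError(f"reply failed with error code {event['error_code']}")
    return event["payload"]
```

The callback returned by make_reply_callback would be passed as on_message_reply when constructing Config.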


ThinClient

class ThinClient()

A minimal Katzenpost Python thin client for communicating with the local Katzenpost client daemon over a UNIX or TCP socket.

The thin client is responsible for:

  • Establishing a connection to the client daemon.
  • Receiving and parsing PKI documents.
  • Sending messages to mixnet services (with or without SURBs).
  • Handling replies and events via user-defined callbacks.

All cryptographic operations are handled by the daemon, not by this client.

ThinClient.__init__

def __init__(config: Config) -> None

Initialize the thin client with the given configuration.

Arguments:

  • config Config - The configuration object containing socket details and callbacks.

Raises:

  • RuntimeError - If the network type is not recognized or config is incomplete.

ThinClient.start

async def start(loop: asyncio.AbstractEventLoop) -> None

Start the thin client: establish connection to the daemon, read initial events, and begin the background event loop.

Arguments:

  • loop asyncio.AbstractEventLoop - The running asyncio event loop.

Raises:

  • BrokenPipeError - If the connection to the daemon fails.

ThinClient.get_config

def get_config() -> Config

Returns the current configuration object.

Returns:

  • Config - The client configuration in use.

ThinClient.is_connected

def is_connected() -> bool

Returns True if the daemon is currently connected to the mixnet.

Note the distinction: this reflects the daemon’s mixnet connectivity, not the local socket between this thin client and the daemon. The daemon may be reachable while the mixnet itself is unreachable; in that case the local socket is fine but this method returns False, and send_message / blocking_send_message will raise ThinClientOfflineError. The latest value is updated from ConnectionStatusEvents pushed by the daemon.

Returns:

  • bool - True if the daemon is connected to the mixnet, False otherwise (offline mode).

ThinClient.stop

def stop() -> None

Gracefully shut down the client and close its socket. Sends a thin_close message to the daemon so it can clean up ARQ state for this connection before disconnecting.

ThinClient.disconnect

def disconnect() -> None

Close the connection without sending thin_close. The daemon preserves all state for this client’s app ID, allowing the client to reconnect and resume with the same session token.

ThinClient.recv

async def recv(loop: asyncio.AbstractEventLoop) -> "Dict[Any,Any]"

Receive a CBOR-encoded message from the daemon.

Arguments:

  • loop asyncio.AbstractEventLoop - Event loop to use for socket reads.

Returns:

  • dict - Decoded CBOR response from the daemon.

Raises:

  • BrokenPipeError - If the connection fails.
  • ValueError - If message framing fails.

ThinClient.worker_loop

async def worker_loop(loop: asyncio.events.AbstractEventLoop) -> None

Background task that listens for events and dispatches them. Survives daemon disconnects by automatically reconnecting with exponential backoff. Only stopping (from stop()) causes this task to exit.
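Exponential backoff in general can be sketched as a capped delay sequence. The base, factor, and cap values below are assumptions chosen for illustration; the delays the binding actually uses are not stated here.

```python
import itertools


def backoff_delays(base: float = 0.5, factor: float = 2.0, cap: float = 30.0):
    """Yield reconnect delays that grow geometrically and never exceed `cap`."""
    delay = min(base, cap)
    while True:
        yield delay
        delay = min(delay * factor, cap)


# First six delays for a 1-second base, doubling, capped at 10 seconds.
first_six = list(itertools.islice(backoff_delays(1.0, 2.0, 10.0), 6))
```

A reconnect loop would sleep for each yielded delay between attempts and start a fresh generator after a successful connection.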

ThinClient.parse_status

def parse_status(event: "Dict[str,Any]") -> None

Parse a connection status event and update connection state.

ThinClient.pki_document

def pki_document() -> "Dict[str,Any] | None"

Return the most recent PKI consensus document the daemon has forwarded to this thin client.

The document is a CBOR map describing the current mixnet topology, the set of available services, and per-node public-key material. Useful inputs include the PKI epoch, the list of mix nodes, the list of service providers, and the replica descriptors consulted by Pigeonhole.

Returns:

Dict[str, Any] | None: The parsed CBOR PKI document, or None if the daemon has not yet forwarded one (most commonly on a freshly-connected client, before the first on_new_pki_document callback has fired).

ThinClient.pki_document_for_epoch

def pki_document_for_epoch(epoch: int) -> "Dict[str,Any]"

Return the cached PKI document for a specific epoch.

Falls back to the current document if the requested epoch is not cached. Raises if no document is available at all.

Arguments:

  • epoch int - The epoch number.

Returns:

  • dict - Parsed PKI document for the given epoch.

Raises:

  • Exception - If no PKI document is available.
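The fallback rule described above can be sketched with a plain dictionary cache. This is an illustration only: the cache layout and the function name are hypothetical, and LookupError stands in for whatever exception the binding raises.

```python
def document_for_epoch(cache: dict, current, epoch: int):
    """Return the cached document for `epoch`, else the current one, else raise."""
    doc = cache.get(epoch, current)
    if doc is None:
        raise LookupError("no PKI document is available")
    return doc
```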

ThinClient.get_pki_document_raw

async def get_pki_document_raw(epoch: int = 0) -> "Tuple[bytes,int]"

Return the cert.Certificate-wrapped signed PKI document for the requested epoch, with every directory authority signature intact.

The thin client receives the stripped PKI document by default (via the on_new_pki_document callback, also available through pki_document and pki_document_for_epoch); the daemon clears the signature map before forwarding it. Use this method when the caller wishes to verify the directory authority signatures itself: the returned payload may be deserialized and verified with the katzenpost core/pki.FromPayload routine against the authorities listed in client.toml.

Arguments:

  • epoch int - Epoch for which the signed PKI document should be returned. Pass 0 (the default) to request the document the daemon believes is current.

Returns:

Tuple[bytes, int]: (payload, epoch) where payload is the cert.Certificate-wrapped signed PKI document and epoch is the epoch of the returned document. When 0 was passed in, epoch echoes the epoch the daemon resolved to.

Raises:

  • Exception - If the daemon has no cached document for the requested epoch, or any other error code is returned.

ThinClient.parse_pki_doc

def parse_pki_doc(event: "Dict[str,Any]") -> None

Parse and store a new PKI document received from the daemon.

ThinClient.get_services

def get_services(capability: str) -> "List[ServiceDescriptor]"

Look up all services in the PKI that advertise a given capability.

Arguments:

  • capability str - Capability name (e.g., “echo”).

Returns:

  • list[ServiceDescriptor] - Matching services.

Raises:

  • Exception - If PKI is missing or no services match.

ThinClient.get_service

def get_service(service_name: str) -> ServiceDescriptor

Select one random service matching a capability from the current PKI document.

Multiple mix nodes may advertise the same capability; this method returns an arbitrary one. To see every advertised instance, use get_services.

Arguments:

  • service_name str - The capability name (e.g. "echo", "courier").

Returns:

  • ServiceDescriptor - One of the matching services.

Raises:

  • Exception - If the PKI document is missing, or no node in the current consensus advertises service_name.

ThinClient.get_all_couriers

def get_all_couriers() -> "List[Tuple[bytes, bytes]]"

Return every courier service advertised in the current PKI document, each described by an (identity_hash, queue_id) tuple. The list reflects only the couriers that the current consensus regards as serving.

The principal caller is the nested-copy-command machinery, which needs to choose particular couriers rather than accept the random draw made on the caller’s behalf by start_resending_copy_command; for simple cases where any courier will do, the default routing path is usually preferable.

Returns:

list[tuple[bytes, bytes]]: List of (identity_hash, queue_id) tuples.

Raises:

  • Exception - If no couriers are available.

ThinClient.get_distinct_couriers

def get_distinct_couriers(n: int) -> "List[Tuple[bytes, bytes]]"

Draw n couriers uniformly at random from the list returned by get_all_couriers, without replacement, so that no two entries in the returned list refer to the same courier. This is the usual building block for a nested copy command, every layer of which must be carried by a different courier.

Arguments:

  • n int - Number of distinct couriers to return.

Returns:

list[tuple[bytes, bytes]]: List of (identity_hash, queue_id) tuples.

Raises:

  • Exception - If the current PKI document advertises fewer than n couriers.
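Sampling without replacement can be sketched with the standard library as follows. The function name pick_distinct is hypothetical, ValueError stands in for the binding's exception, and random.sample is used purely for illustration; the binding may draw its randomness differently.

```python
import random


def pick_distinct(couriers: list, n: int) -> list:
    """Draw n distinct (identity_hash, queue_id) tuples from `couriers`."""
    if n > len(couriers):
        raise ValueError(f"need {n} couriers but only {len(couriers)} are advertised")
    # random.sample never returns the same list position twice.
    return random.sample(couriers, n)
```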

ThinClient.blocking_send_message

async def blocking_send_message(payload: bytes | str,
                                dest_node: bytes,
                                dest_queue: bytes,
                                timeout_seconds: float = 30.0) -> bytes

Send a message and block until a reply is received or timeout.

Arguments:

  • payload bytes or str - Message payload.
  • dest_node bytes - Destination node identity hash.
  • dest_queue bytes - Destination recipient queue ID.
  • timeout_seconds float - Timeout in seconds (default 30).

Returns:

  • bytes - Reply payload from the destination service.

Raises:

  • ThinClientOfflineError - If in offline mode.
  • asyncio.TimeoutError - If no reply within timeout.
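A common way to build "send, then block until the matching reply or a timeout" is to park a future under the message ID and resolve it when the reply event arrives. The sketch below shows that pattern with stand-in names (PendingReplies, send_and_wait, and the send coroutine are all hypothetical); it is not a description of how the binding implements blocking_send_message.

```python
import asyncio
import os


class PendingReplies:
    """Map message IDs to futures awaiting their reply payloads."""

    def __init__(self) -> None:
        self._waiters = {}

    def register(self, message_id: bytes) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        self._waiters[message_id] = fut
        return fut

    def resolve(self, event: dict) -> None:
        # Called from the reply path; unknown or already-finished IDs are ignored.
        fut = self._waiters.pop(event["message_id"], None)
        if fut is not None and not fut.done():
            fut.set_result(event["payload"])

    def discard(self, message_id: bytes) -> None:
        self._waiters.pop(message_id, None)


async def send_and_wait(pending, send, payload: bytes, timeout_seconds: float = 30.0) -> bytes:
    """Send payload via the `send` coroutine and wait for the correlated reply."""
    message_id = os.urandom(16)  # 16-byte random ID, as new_message_id describes
    fut = pending.register(message_id)
    try:
        await send(message_id, payload)
        return await asyncio.wait_for(fut, timeout_seconds)
    finally:
        pending.discard(message_id)
```

The finally clause removes the waiter on both success and timeout, so a late reply cannot resolve a stale future.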

ThinClient.new_message_id

@staticmethod
def new_message_id() -> bytes

Generate a new 16-byte random message ID.

Message IDs are used to correlate SendMessage requests with their corresponding MessageSentEvent and (if a SURB is present) MessageReplyEvent. Callers generally do not need to construct one by hand, since blocking_send_message does it internally, but this helper is exposed for callers composing requests manually. Randomness is drawn from os.urandom.

Returns:

  • bytes - Random 16-byte identifier.

ThinClient.new_surb_id

def new_surb_id() -> bytes

Generate a new random SURB ID.

SURB IDs identify which Single Use Reply Block a given on_message_reply event corresponds to. Pass the returned bytes as the surb_id argument to send_message, then watch the callback for a matching reply. Randomness is drawn from os.urandom.

Returns:

  • bytes - Random identifier of SURB_ID_SIZE bytes.

ThinClient.new_query_id

def new_query_id() -> bytes

Generate a new 16-byte random query ID.

Query IDs correlate requests and replies within the thin client ↔ daemon CBOR protocol (distinct from mix-network SURB IDs, which identify replies within the mixnet itself). Most callers never touch query IDs directly; they are used internally by the Pigeonhole API helpers. Randomness is drawn from os.urandom.

Returns:

  • bytes - Random 16-byte identifier.

ThinClient.handle_response

async def handle_response(response: "Dict[str,Any]") -> None

Dispatch a parsed CBOR response to the appropriate handler or callback.

ThinClient.send_message_without_reply

async def send_message_without_reply(payload: bytes | str, dest_node: bytes,
                                     dest_queue: bytes) -> None

Send a fire-and-forget message with no SURB or reply handling. This method requires mixnet connectivity.

Arguments:

  • payload bytes or str - Message payload.
  • dest_node bytes - Destination node identity hash.
  • dest_queue bytes - Destination recipient queue ID.

Raises:

  • ThinClientOfflineError - If in offline mode (daemon not connected to mixnet).

ThinClient.send_message

async def send_message(surb_id: bytes, payload: bytes | str, dest_node: bytes,
                       dest_queue: bytes) -> None

Send a message using a SURB to allow the recipient to send a reply. This method requires mixnet connectivity.

Arguments:

  • surb_id bytes - SURB identifier for reply correlation.
  • payload bytes or str - Message payload.
  • dest_node bytes - Destination node identity hash.
  • dest_queue bytes - Destination recipient queue ID.

Raises:

  • ThinClientOfflineError - If in offline mode (daemon not connected to mixnet).

ThinClient.pretty_print_pki_doc

def pretty_print_pki_doc(doc: "Dict[str,Any]") -> None

Pretty-print a parsed PKI document with fully decoded CBOR nodes.

Arguments:

  • doc dict - Raw PKI document from the daemon.

katzenpost_thinclient.pigeonhole

Katzenpost Python Thin Client - New Pigeonhole API

This module provides the new capability-based Pigeonhole API methods. These methods use WriteCap/ReadCap keypairs and provide direct control over the Pigeonhole protocol.


KeypairResult

@dataclass
class KeypairResult()

Result from new_keypair containing the generated capabilities.


EncryptReadResult

@dataclass
class EncryptReadResult()

Result from encrypt_read containing the encrypted read request.


EncryptWriteResult

@dataclass
class EncryptWriteResult()

Result from encrypt_write containing the encrypted write request.


StartResendingResult

@dataclass
class StartResendingResult()

Result from start_resending_encrypted_message and its variants.

StartResendingResult.plaintext

Decrypted message for read operations, or empty bytes for writes.

StartResendingResult.courier_identity_hash

32-byte hash of the identity key of the courier that handled this message. Callers can watch PKI document updates for this courier disappearing from consensus and cancel+re-encrypt if needed.

StartResendingResult.courier_queue_id

Queue ID of the courier that handled this message.
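Checking whether the courier that handled a message is still advertised could look like the sketch below. It assumes a list of (identity_hash, queue_id) tuples in the shape returned by get_all_couriers; the function name is hypothetical.

```python
def courier_still_listed(courier_identity_hash: bytes, couriers: list) -> bool:
    """Return True if any advertised courier has the given identity hash."""
    return any(
        identity_hash == courier_identity_hash
        for identity_hash, _queue_id in couriers
    )
```

A caller might run this check from an on_new_pki_document handler and cancel and re-encrypt when it returns False.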


new_keypair

async def new_keypair(self, seed: bytes) -> KeypairResult

Creates a new keypair for use with the Pigeonhole protocol.

This method generates a WriteCap and ReadCap from the provided seed using the BACAP (Blinding-and-Capability) protocol. The WriteCap should be stored securely for writing messages, while the ReadCap can be shared with others to allow them to read messages.

Arguments:

  • seed - 32-byte seed used to derive the keypair.

Returns:

  • KeypairResult - Contains write_cap, read_cap, and first_message_index.

Raises:

  • Exception - If the keypair creation fails.
  • ValueError - If seed is not exactly 32 bytes.

Example:

import os
seed = os.urandom(32)
result = await client.new_keypair(seed)
# Share result.read_cap with Bob so he can read messages
# Store result.write_cap for sending messages

encrypt_read

async def encrypt_read(self, read_cap: bytes,
                       message_box_index: bytes) -> EncryptReadResult

Encrypts a read operation for a given read capability.

This method prepares an encrypted read request that can be sent to the courier service to retrieve a message from a pigeonhole box. The returned ciphertext should be sent via start_resending_encrypted_message.

Arguments:

  • read_cap - Read capability that grants access to the channel.
  • message_box_index - Starting read position for the channel.

Returns:

  • EncryptReadResult - Contains message_ciphertext, envelope_descriptor, and envelope_hash.

Raises:

  • Exception - If the encryption fails.

Example:

result = await client.encrypt_read(read_cap, message_box_index)
# Send result.message_ciphertext via start_resending_encrypted_message

encrypt_write

async def encrypt_write(self, plaintext: bytes, write_cap: bytes,
                        message_box_index: bytes) -> EncryptWriteResult

Encrypts a write operation for a given write capability.

This method prepares an encrypted write request that can be sent to the courier service to store a message in a pigeonhole box. The returned ciphertext should be sent via start_resending_encrypted_message.

Plaintext Size Constraint: The plaintext must not exceed PigeonholeGeometry.max_plaintext_payload_length bytes. The daemon internally adds a 4-byte big-endian length prefix before padding and encryption, so the actual wire format is: [4-byte length][plaintext][zero padding].

If the plaintext exceeds the maximum size, the daemon will return ThinClientErrorInvalidRequest.
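The framing described above can be illustrated with the sketch below. The daemon performs this step itself, so application code never needs to; the function names are hypothetical, and padding out to max_plaintext_payload_length + 4 bytes is an assumption taken from the value documented for padded_payload_length.

```python
import struct

LENGTH_PREFIX_SIZE = 4  # 4-byte big-endian length prefix


def frame_plaintext(plaintext: bytes, max_plaintext_payload_length: int) -> bytes:
    """Build [4-byte length][plaintext][zero padding] of fixed total size."""
    if len(plaintext) > max_plaintext_payload_length:
        raise ValueError("plaintext exceeds max_plaintext_payload_length")
    prefix = struct.pack(">I", len(plaintext))
    padding = bytes(max_plaintext_payload_length - len(plaintext))
    return prefix + plaintext + padding


def unframe_plaintext(frame: bytes) -> bytes:
    """Recover the plaintext by reading the length prefix and dropping the padding."""
    (length,) = struct.unpack(">I", frame[:LENGTH_PREFIX_SIZE])
    return frame[LENGTH_PREFIX_SIZE:LENGTH_PREFIX_SIZE + length]
```

Because every frame has the same size, the length of the ciphertext reveals nothing about the length of the plaintext.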

Arguments:

  • plaintext - The plaintext message to encrypt. Must be at most PigeonholeGeometry.max_plaintext_payload_length bytes.
  • write_cap - Write capability that grants access to the channel.
  • message_box_index - The message box index for this write operation.

Returns:

  • EncryptWriteResult - Contains message_ciphertext, envelope_descriptor, and envelope_hash.

Raises:

  • Exception - If the encryption fails (including if plaintext is too large).

Example:

plaintext = b"Hello, Bob!"
result = await client.encrypt_write(plaintext, write_cap, message_box_index)
# Send result.message_ciphertext via start_resending_encrypted_message

start_resending_encrypted_message

async def start_resending_encrypted_message(
        self,
        read_cap: "bytes|None",
        write_cap: "bytes|None",
        message_box_index: "bytes|None",
        reply_index: "int|None",
        envelope_descriptor: bytes,
        message_ciphertext: bytes,
        envelope_hash: bytes,
        no_retry_on_box_id_not_found: bool = False,
        no_idempotent_box_already_exists: bool = False
) -> StartResendingResult

Starts resending an encrypted message via ARQ.

This method initiates automatic repeat request (ARQ) for an encrypted message, which will be resent periodically until either:

  • A reply is received from the courier
  • The message is cancelled via cancel_resending_encrypted_message
  • The client is shut down

This is used for both read and write operations in the new Pigeonhole API.

The daemon implements a finite state machine (FSM) for handling the stop-and-wait ARQ protocol:

  • For default write operations (write_cap != None, read_cap == None, no_idempotent_box_already_exists == False): The method waits for an ACK from the courier and returns immediately. The ACK confirms the courier received the envelope and will dispatch it to both shard replicas. This requires only a single round-trip through the mixnet.
  • For BoxAlreadyExists-aware writes (no_idempotent_box_already_exists == True): The method waits for an ACK, then sends a second SURB to retrieve the replica’s error code. This requires two round-trips through the mixnet.
  • For read operations (read_cap != None, write_cap == None): The method waits for an ACK from the courier, then the daemon automatically sends a new SURB to request the payload, and this method waits for the payload. The daemon performs all decryption (MKEM envelope + BACAP payload) and returns the fully decrypted plaintext.
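The three cases above can be summarized as the sequence of replies the call waits for. The sketch below is only a reading aid: the function name and the phase labels are hypothetical and do not correspond to identifiers in the daemon's state machine.

```python
def arq_phases(read_cap, write_cap, no_idempotent_box_already_exists: bool = False) -> list:
    """List the replies awaited for a given combination of arguments."""
    if read_cap is not None and write_cap is None:
        # Reads: courier ACK, then a second SURB fetches the payload.
        return ["courier_ack", "payload"]
    if write_cap is not None and read_cap is None:
        if no_idempotent_box_already_exists:
            # BoxAlreadyExists-aware writes: a second SURB fetches the replica's error code.
            return ["courier_ack", "replica_error_code"]
        # Default writes: a single round trip ending at the courier ACK.
        return ["courier_ack"]
    raise ValueError("exactly one of read_cap and write_cap must be set")
```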

Arguments:

  • read_cap - Read capability (can be None for write operations, required for reads).
  • write_cap - Write capability (can be None for read operations, required for writes).
  • message_box_index - Current message box index being operated on (required for reads).
  • reply_index - Index of the reply to use (typically 0 or 1).
  • envelope_descriptor - Serialized envelope descriptor for MKEM decryption.
  • message_ciphertext - MKEM-encrypted message to send (from encrypt_read or encrypt_write).
  • envelope_hash - Hash of the courier envelope.
  • no_retry_on_box_id_not_found - If True, BoxIDNotFound errors on reads trigger immediate error instead of automatic retries. By default (False), reads retry on BoxIDNotFound until the box is found or the operation is cancelled, riding out replication lag; the retries are not capped. Set to True to get an immediate BoxIDNotFound error without retries.
  • no_idempotent_box_already_exists - If True, BoxAlreadyExists errors on writes are returned as errors instead of being treated as idempotent success. By default (False), BoxAlreadyExists is treated as success (the write already happened). Set to True to detect whether a write was actually performed or if the box already existed.

Returns:

  • StartResendingResult - Contains plaintext (decrypted message for reads, empty for writes), courier_identity_hash, and courier_queue_id.

Raises:

  • BoxIDNotFoundError - If no_retry_on_box_id_not_found=True and the box does not exist.
  • BoxAlreadyExistsError - If no_idempotent_box_already_exists=True and the box already contains data.
  • Exception - If the operation fails. Check error_code for specific errors.

Example:

result = await client.start_resending_encrypted_message(
    read_cap, None, message_box_index, reply_idx, env_desc, ciphertext, env_hash)
print(f"Received: {result.plaintext}")

start_resending_encrypted_message_return_box_exists

async def start_resending_encrypted_message_return_box_exists(
        self, read_cap: "bytes|None", write_cap: "bytes|None",
        message_box_index: "bytes|None", reply_index: "int|None",
        envelope_descriptor: bytes, message_ciphertext: bytes,
        envelope_hash: bytes) -> StartResendingResult

Behaves exactly like start_resending_encrypted_message save that it raises BoxAlreadyExistsError when the replica reports the destination box has already been written, rather than swallowing the condition as idempotent success. Use this when one needs to distinguish a fresh write from a repeat: for instance, when implementing optimistic concurrency on top of the channel, or when establishing whether a particular call actually caused a state change at the replica.

Note that this variant costs an additional mixnet round trip: the BoxAlreadyExists code is carried by the replica’s reply rather than the courier’s ACK, so the daemon must dispatch a second SURB before it can return the answer.

As with start_resending_encrypted_message, an in-flight call can be cancelled from another task via cancel_resending_encrypted_message.

Arguments:

  • read_cap - Read capability (can be None for write operations, required for reads).
  • write_cap - Write capability (can be None for read operations, required for writes).
  • message_box_index - Current message box index being operated on (required for reads).
  • reply_index - Index of the reply to use (typically 0 or 1).
  • envelope_descriptor - Serialized envelope descriptor for MKEM decryption.
  • message_ciphertext - MKEM-encrypted message to send (from encrypt_read or encrypt_write).
  • envelope_hash - Hash of the courier envelope.

Returns:

  • StartResendingResult - Contains plaintext, courier_identity_hash, and courier_queue_id.

Raises:

  • BoxAlreadyExistsError - If the box already contains data.
  • Exception - If the operation fails.

Example:

try:
    await client.start_resending_encrypted_message_return_box_exists(
        None, write_cap, None, None, env_desc, ciphertext, env_hash)
except BoxAlreadyExistsError:
    print("Box already has data; write was idempotent")

start_resending_encrypted_message_no_retry

async def start_resending_encrypted_message_no_retry(
        self, read_cap: "bytes|None", write_cap: "bytes|None",
        message_box_index: "bytes|None", reply_index: "int|None",
        envelope_descriptor: bytes, message_ciphertext: bytes,
        envelope_hash: bytes) -> StartResendingResult

Behaves exactly like start_resending_encrypted_message save that it disables the daemon’s automatic retry of BoxIDNotFoundError. The caller learns at once that the box is absent rather than waiting for replication to settle.

Use this when polling a box that may not yet have been written: for instance, when a reader peeks ahead at a peer’s next message before that peer has produced it. The regular variant would block until the box appeared, which can be many round trips.

As with start_resending_encrypted_message, an in-flight call can be cancelled from another task via cancel_resending_encrypted_message.

Arguments:

  • read_cap - Read capability (can be None for write operations, required for reads).
  • write_cap - Write capability (can be None for read operations, required for writes).
  • message_box_index - Current message box index being operated on (required for reads).
  • reply_index - Index of the reply to use (typically 0 or 1).
  • envelope_descriptor - Serialized envelope descriptor for MKEM decryption.
  • message_ciphertext - MKEM-encrypted message to send (from encrypt_read or encrypt_write).
  • envelope_hash - Hash of the courier envelope.

Returns:

  • StartResendingResult - Contains plaintext, courier_identity_hash, and courier_queue_id.

Raises:

  • BoxIDNotFoundError - If the box does not exist (no automatic retries).
  • Exception - If the operation fails.

Example:

try:
    result = await client.start_resending_encrypted_message_no_retry(
        read_cap, None, message_box_index, reply_idx, env_desc, ciphertext, env_hash)
except BoxIDNotFoundError:
    print("Box not found; message not yet written")
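The polling use case described above can be bounded on the caller's side. What follows is a minimal sketch, not part of the binding: poll_until_written and attempt_read are hypothetical names, attempt_read stands for a zero-argument coroutine function that wraps the call to start_resending_encrypted_message_no_retry, and BoxIDNotFoundError is redefined locally only so the sketch runs without the package installed.

```python
import asyncio


class BoxIDNotFoundError(Exception):
    """Local stand-in; real code would use the package's exception instead."""


async def poll_until_written(attempt_read, max_attempts=5, delay=0.0):
    """Call attempt_read() until it succeeds or max_attempts is used up.

    attempt_read is a hypothetical wrapper around
    client.start_resending_encrypted_message_no_retry(...).
    Returns its result, or None if the box never appeared.
    """
    for attempt in range(max_attempts):
        try:
            return await attempt_read()
        except BoxIDNotFoundError:
            # Box is still absent: pause before the next attempt,
            # but do not sleep after the final one.
            if attempt + 1 < max_attempts:
                await asyncio.sleep(delay)
    return None
```

Whether the same envelope can be reused across attempts is not stated in this reference, so that decision is left inside the wrapper.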

cancel_resending_encrypted_message

async def cancel_resending_encrypted_message(self,
                                             envelope_hash: bytes) -> None

Cancels ARQ resending for an encrypted message.

This method stops the automatic repeat request (ARQ) for a previously started encrypted message transmission. This is useful when:

  • A reply has been received through another channel
  • The operation should be aborted
  • The message is no longer needed

Arguments:

  • envelope_hash - Hash of the courier envelope to cancel.

Raises:

  • Exception - If the cancellation fails.

Example:

await client.cancel_resending_encrypted_message(env_hash)
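One way to use cancellation is to put a deadline on an in-flight call. The sketch below is an illustration under assumptions rather than the binding's own behaviour: call_with_deadline is a hypothetical helper, and it assumes that once the daemon accepts the cancellation the pending start_resending_encrypted_message call finishes with an exception (the package defines StartResendingCancelledError, which suggests this, but the exact behaviour is not documented here).

```python
import asyncio


async def call_with_deadline(client, call_args, envelope_hash, timeout):
    """Run start_resending_encrypted_message, cancelling ARQ after `timeout` seconds.

    Returns the call's result, or None if the deadline passed and the
    cancelled call ended with an error.
    """
    task = asyncio.ensure_future(
        client.start_resending_encrypted_message(*call_args))
    done, _pending = await asyncio.wait({task}, timeout=timeout)
    if task in done:
        return task.result()  # re-raises if the call itself failed

    # Deadline passed: ask the daemon to stop resending this envelope.
    await client.cancel_resending_encrypted_message(envelope_hash)
    try:
        # Assumption: the in-flight call now completes, normally by raising.
        return await task
    except Exception:
        return None
```

If the pending call could stay blocked after cancellation, a second bounded wait would be needed; that case is not handled here.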

next_message_box_index

async def next_message_box_index(self, message_box_index: bytes) -> bytes

Increments a MessageBoxIndex using the BACAP NextIndex method.

This method is used when sending multiple messages to different mailboxes using the same WriteCap or ReadCap. It properly advances the cryptographic state by:

  • Incrementing the Idx64 counter
  • Deriving new encryption and blinding keys using HKDF
  • Updating the HKDF state for the next iteration

The daemon handles the cryptographic operations internally, ensuring correct BACAP protocol implementation.

Arguments:

  • message_box_index - Current message box index to increment (as bytes).

Returns:

  • bytes - The next message box index.

Raises:

  • Exception - If the increment operation fails.

Example:

current_index = first_message_index
next_index = await client.next_message_box_index(current_index)
# Use next_index for the next message
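When several consecutive boxes are needed at once, the same call can be repeated. The helper below is a small sketch with a hypothetical name (collect_indexes); it only relies on next_message_box_index as documented above and treats each index as an opaque bytes value.

```python
async def collect_indexes(client, first_index, count):
    """Return `count` consecutive MessageBoxIndex blobs, starting at first_index."""
    if count <= 0:
        return []
    indexes = [first_index]
    while len(indexes) < count:
        # Each index is derived by the daemon from the one before it.
        indexes.append(await client.next_message_box_index(indexes[-1]))
    return indexes
```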

get_message_box_index_counter

async def get_message_box_index_counter(self, message_box_index: bytes) -> int

Return the BACAP Idx64 counter embedded in a MessageBoxIndex.

Callers that persist MessageBoxIndex blobs across sessions can use this to order or compare two indexes — e.g. to detect a duplicate ACK that would otherwise regress a write-cap’s index — without having to peek at the binary layout themselves. The layout (first 8 bytes little-endian) is a BACAP implementation detail and must not be relied on outside the daemon.

Arguments:

  • message_box_index - MessageBoxIndex blob (as bytes) whose counter should be returned.

Returns:

  • int - The BACAP Idx64 value.

Raises:

  • Exception - If the daemon rejects the request.

Example:

current_idx = await client.get_message_box_index_counter(mbi_a)
next_idx = await client.get_message_box_index_counter(mbi_b)
if next_idx <= current_idx:
    print("skipping stale ACK")

start_resending_copy_command

async def start_resending_copy_command(
        self,
        write_cap: bytes,
        courier_identity_hash: "bytes|None" = None,
        courier_queue_id: "bytes|None" = None) -> None

Starts resending a copy command to a courier via ARQ.

This method instructs a courier to read data from a temporary channel (identified by the write_cap) and write it to the destination channel. The command is automatically retransmitted until acknowledged.

If courier_identity_hash and courier_queue_id are both provided, the copy command is sent to that specific courier. Otherwise, a random courier is selected.

Arguments:

  • write_cap - Write capability for the temporary channel containing the data.
  • courier_identity_hash - Optional identity hash of a specific courier to use.
  • courier_queue_id - Optional queue ID for the specified courier. Must be set if courier_identity_hash is set.

Raises:

  • Exception - If the operation fails.

Example:

# Send copy command to a random courier
await client.start_resending_copy_command(temp_write_cap)
# Send copy command to a specific courier
await client.start_resending_copy_command(
    temp_write_cap, courier_identity_hash, courier_queue_id)

cancel_resending_copy_command

async def cancel_resending_copy_command(self, write_cap_hash: bytes) -> None

Cancels ARQ resending for a copy command.

This method stops the automatic repeat request (ARQ) for a previously started copy command. Use this when:

  • The copy operation should be aborted
  • The operation is no longer needed
  • You want to clean up pending ARQ operations

Arguments:

  • write_cap_hash - Hash of the WriteCap used in start_resending_copy_command.

Raises:

  • Exception - If the cancellation fails.

Example:

await client.cancel_resending_copy_command(write_cap_hash)

create_courier_envelopes_from_payload

async def create_courier_envelopes_from_payload(
        self, payload: bytes, dest_write_cap: bytes, dest_start_index: bytes,
        is_start: bool, is_last: bool) -> "CreateEnvelopesResult"

Packs a payload of arbitrary size (up to 10 MB) into properly sized CopyStreamElement chunks for one destination channel. Each chunk is a serialised CopyStreamElement, ready to be written to a box via encrypt_write followed by start_resending_encrypted_message; the caller marks the boundaries of the stream with the is_start and is_last flags.

This method is stateless: no daemon state is kept between calls; each invocation runs a fresh encoder and flushes before returning. The 10 MB cap guards against accidental memory exhaustion.

Once the chunks have been written to a temporary copy stream, a copy command (start_resending_copy_command) is dispatched to a courier with the write capability for that temporary stream; the courier reads the chunks back and writes each envelope to its destination box.

Multiple calls can target the same destination stream by passing next_dest_index from the previous result as dest_start_index.

Arguments:

  • payload - The data to be encoded into courier envelopes (max 10 MB).
  • dest_write_cap - Write capability for the destination channel.
  • dest_start_index - Starting index in the destination channel.
  • is_start - Whether this is the first call (sets IsStart flag on first element).
  • is_last - Whether this is the last call (sets IsFinal flag on last element).

Returns:

  • CreateEnvelopesResult - Contains envelopes and next_dest_index.

Raises:

  • Exception - If the envelope creation fails.
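Example (illustrative sketch, not taken from the binding's docstrings): chaining several calls for one destination by feeding next_dest_index back in as dest_start_index. The helper name pack_pieces is hypothetical; the parameters and result fields are the ones documented above.

```python
async def pack_pieces(client, pieces, dest_write_cap, dest_start_index):
    """Pack several payload pieces for one destination channel.

    Returns (chunks, next_index): every CopyStreamElement produced, in
    order, and the destination index that follows the last box consumed.
    """
    chunks = []
    index = dest_start_index
    for i, piece in enumerate(pieces):
        result = await client.create_courier_envelopes_from_payload(
            piece, dest_write_cap, index,
            is_start=(i == 0),               # only the first call opens the stream
            is_last=(i == len(pieces) - 1))  # only the last call closes it
        chunks.extend(result.envelopes)
        index = result.next_dest_index
    return chunks, index
```

The returned chunks still have to be written to a temporary copy stream and handed to a courier with start_resending_copy_command, as described above.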

create_courier_envelopes_from_multi_payload

async def create_courier_envelopes_from_multi_payload(
        self,
        destinations: "List[Dict[str, Any]]",
        is_start: bool,
        is_last: bool,
        buffer: "bytes | None" = None) -> "CreateEnvelopesResult"

Packs payloads bound for several destination channels into a single stream of CopyStreamElement chunks. This is more space-efficient than calling create_courier_envelopes_from_payload once per destination, because the shared encoder runs all envelopes together rather than padding the final box of each destination independently.

This method is stateless: the buffer argument carries any residual encoder state across calls in place of daemon-side bookkeeping. Pass None for buffer on the first call and the buffer returned by the previous call thereafter; set is_last on the final call so the encoder flushes its tail.

Arguments:

  • destinations - List of destination payloads, each a dict with:
    • “payload”: bytes - The data to be written
    • “write_cap”: bytes - Write capability for destination
    • “start_index”: bytes - Starting index in destination
  • is_start - Whether this is the first call in the sequence. When True, the first CopyStreamElement will have IsStart=true.
  • is_last - Whether this is the last set of payloads in the sequence. When True, the final CopyStreamElement will have IsFinal=true.
  • buffer - Residual encoder buffer from a previous call, or None.

Returns:

  • CreateEnvelopesResult - Contains envelopes and buffer for next call.

Raises:

  • Exception - If the envelope creation fails.

Example:

destinations = [
    {"payload": data1, "write_cap": cap1, "start_index": idx1},
    {"payload": data2, "write_cap": cap2, "start_index": idx2},
]
result = await client.create_courier_envelopes_from_multi_payload(
    destinations, is_start=True, is_last=False)
# Pass buffer to next call
result2 = await client.create_courier_envelopes_from_multi_payload(
    more_destinations, is_start=False, is_last=True, buffer=result.buffer)

CreateEnvelopesResult

@dataclass
class CreateEnvelopesResult()

Result of creating courier envelopes.

CreateEnvelopesResult.envelopes

The serialized CopyStreamElements to send to the network.

CreateEnvelopesResult.buffer

The buffered data that hasn’t been output yet. Persist this for crash recovery. Only populated by create_courier_envelopes_from_multi_payload.

CreateEnvelopesResult.next_dest_index

The next destination message box index after all boxes consumed by this call. Only populated by create_courier_envelopes_from_payload.

CreateEnvelopesResult.next_dest_indices

The next destination indices for each destination, in request order. Only populated by create_courier_envelopes_from_multi_payload.


TombstoneEnvelope

@dataclass
class TombstoneEnvelope()

A single tombstone envelope ready to be sent.


TombstoneRangeResult

@dataclass
class TombstoneRangeResult()

Result of a tombstone_range operation.


tombstone_range

async def tombstone_range(self, write_cap: bytes, start: bytes,
                          max_count: int) -> TombstoneRangeResult

Prepares the encrypted envelopes needed to tombstone a consecutive range of pigeonhole boxes beginning at the supplied MessageBoxIndex. A tombstone is a signed empty payload that the replica recognises as a deletion marker; the daemon constructs one by signing rather than encrypting whenever encrypt_write is invoked with an empty plaintext.

This method does not itself touch the network: it returns the envelopes for the caller to dispatch one by one, typically via start_resending_encrypted_message. To tombstone a single box, pass max_count=1.

Arguments:

  • write_cap - Write capability for the boxes.
  • start - Starting MessageBoxIndex.
  • max_count - Maximum number of boxes to tombstone.

Returns:

  • TombstoneRangeResult - Contains envelopes (list of TombstoneEnvelope) and next (the next MessageBoxIndex after the last processed).

Raises:

  • ValueError - If write_cap or start is None.

Example:

result = await client.tombstone_range(write_cap, start_index, 10)
for envelope in result.envelopes:
    await client.start_resending_encrypted_message(
        None, write_cap, None, None,
        envelope.envelope_descriptor,
        envelope.message_ciphertext,
        envelope.envelope_hash)
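For a long range, the next field of the result can drive a batched loop. This is a sketch under assumptions: tombstone_in_batches is a hypothetical helper, and it assumes that tombstone_range may return fewer than max_count envelopes and that an empty list means there is nothing left to do.

```python
async def tombstone_in_batches(client, write_cap, start, total, batch_size):
    """Tombstone up to `total` boxes, preparing at most `batch_size` per call.

    Returns (sent, next_index): how many tombstones were dispatched and the
    MessageBoxIndex following the last processed box.
    """
    index = start
    sent = 0
    while sent < total:
        result = await client.tombstone_range(
            write_cap, index, min(batch_size, total - sent))
        if not result.envelopes:
            break  # assumed to mean the range is exhausted
        for envelope in result.envelopes:
            await client.start_resending_encrypted_message(
                None, write_cap, None, None,
                envelope.envelope_descriptor,
                envelope.message_ciphertext,
                envelope.envelope_hash)
        sent += len(result.envelopes)
        index = result.next  # resume where the previous batch stopped
    return sent, index
```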

create_courier_envelopes_from_tombstone_range

async def create_courier_envelopes_from_tombstone_range(
        self,
        dest_write_cap: bytes,
        dest_start_index: bytes,
        max_count: int,
        is_start: bool,
        is_last: bool,
        buffer: "bytes | None" = None) -> "CreateEnvelopesResult"

Packs tombstones for a consecutive range of destination boxes into CopyStreamElement chunks. The chunks are written to a temporary copy stream and then dispatched as a copy command; the courier applies all the tombstones atomically, which is the natural way to retire a range of boxes as part of the same copy transaction that writes their successors.

This method is stateless: the buffer argument carries any residual encoder state across calls in place of daemon-side bookkeeping. Pass None for buffer on the first call and the buffer returned by the previous call thereafter; set is_last on the final call so the encoder flushes its tail.

Arguments:

  • dest_write_cap - Write capability for the destination channel.
  • dest_start_index - Starting index in the destination channel.
  • max_count - Number of tombstones to create.
  • is_start - Whether this is the first call in the sequence.
  • is_last - Whether this is the last call in the sequence.
  • buffer - Residual encoder buffer from a previous call, or None.

Returns:

  • CreateEnvelopesResult - Contains envelopes, buffer, and next_dest_index.

Raises:

  • Exception - If the operation fails.

Example:

result = await client.create_courier_envelopes_from_tombstone_range(
    write_cap, start_index, 10, is_start=True, is_last=True)
for envelope in result.envelopes:
    # write envelope to temp copy stream channel
    pass

katzenpost_thinclient.transport.tcp

TCP transport for the thin-client.


TcpDialConfig

@dataclass
class TcpDialConfig()

Configures a TCP dialer.

address is in host:port form, e.g. “localhost:64331” or “[::1]:64331”. network is one of “tcp”, “tcp4”, “tcp6”; defaults to “tcp”.
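To illustrate the two address shapes, the sketch below splits a host:port string with the standard library. It is not how the binding parses the field (that is not documented here), and split_host_port is a hypothetical helper name.

```python
from urllib.parse import urlsplit


def split_host_port(address: str):
    """Split 'host:port' or '[ipv6]:port' into (host, port)."""
    # Prefixing '//' makes urlsplit treat the string as a network location,
    # which handles the bracketed IPv6 form.
    parts = urlsplit("//" + address)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"expected host:port, got {address!r}")
    return parts.hostname, parts.port
```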


katzenpost_thinclient.transport

Transport abstraction for the Python thin-client.

Each concrete transport (unix, tcp; in future ssh / pipe / pigeonhole) exposes a setup_socket() method that returns a ready-to-connect socket and the server address in the form expected by asyncio’s loop.sock_connect.

DialConfig is a discriminated-union container: exactly one of its inner variants must be populated. Zero or multiple populated variants is a configuration error.


DialConfig

@dataclass
class DialConfig()

Discriminated union of dial transports. Exactly one subtable must be populated.

DialConfig.resolve

def resolve() -> Any

Return the single populated transport variant.

DialConfig.from_toml_dict

@classmethod
def from_toml_dict(cls, data: dict) -> "DialConfig"

Parse a TOML [Dial] subtable (dict) into a DialConfig.

Rejects unknown subtables (typos, removed variants, future names) and unknown keys inside a recognised subtable. Exactly one of [Dial.Unix] / [Dial.Tcp] must be populated.
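The exactly-one rule can be pictured with a few lines of plain Python. This is a sketch of the rule only, not the binding's implementation: pick_dial_variant is a hypothetical name, it raises ValueError where the real parser presumably raises its own configuration error, and it assumes that an empty subtable counts as unpopulated.

```python
KNOWN_SUBTABLES = ("Tcp", "Unix")  # the two subtables named above


def pick_dial_variant(dial_table: dict):
    """Return (name, subtable) for the single populated [Dial.*] subtable."""
    unknown = sorted(set(dial_table) - set(KNOWN_SUBTABLES))
    if unknown:
        raise ValueError(f"unknown [Dial] subtables: {unknown}")
    # Assumption: an empty subtable is treated the same as a missing one.
    populated = [name for name in KNOWN_SUBTABLES if dial_table.get(name)]
    if len(populated) != 1:
        raise ValueError(
            f"exactly one [Dial] subtable must be populated, got {populated}")
    name = populated[0]
    return name, dial_table[name]
```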


katzenpost_thinclient.transport.unix

Unix-domain-socket transport for the thin-client.


UnixDialConfig

@dataclass
class UnixDialConfig()

Configures a unix-domain-socket dialer.


Exceptions

Error types raised by the thin client; each derives from the standard library Exception.

ConfigError

class ConfigError(Exception)

Raised when the thin-client TOML config is missing required sections, contains unknown keys, or otherwise fails structural validation.

Every caller of ConfigFile.load / Config(…) should expect this exception. It is raised eagerly at startup so that a stale or drifted config produces a loud, early failure instead of surfacing later as a mysterious runtime error during mixnet operations.

ReplicaError

class ReplicaError(Exception)

Base class for all replica errors.

BoxIDNotFoundError

class BoxIDNotFoundError(ReplicaError)

Box ID not found on the replica. Occurs when reading from a non-existent mailbox.

InvalidBoxIDError

class InvalidBoxIDError(ReplicaError)

Invalid box ID format.

InvalidSignatureError

class InvalidSignatureError(ReplicaError)

Signature verification failed.

DatabaseFailureError

class DatabaseFailureError(ReplicaError)

Replica encountered a database error.

InvalidPayloadError

class InvalidPayloadError(ReplicaError)

Payload data is invalid.

StorageFullError

class StorageFullError(ReplicaError)

Replica’s storage capacity has been exceeded.

ReplicaInternalError

class ReplicaInternalError(ReplicaError)

Internal error on the replica.

InvalidEpochError

class InvalidEpochError(ReplicaError)

Epoch is invalid or expired.

ReplicationFailedError

class ReplicationFailedError(ReplicaError)

Replication to other replicas failed.

BoxAlreadyExistsError

class BoxAlreadyExistsError(ReplicaError)

Box already contains data. Pigeonhole writes are immutable.

TombstoneError

class TombstoneError(ReplicaError)

Box contains a tombstone (intentional deletion). This is not a failure.
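Because the replica errors share a base class, the order of except clauses matters: specific subclasses first, ReplicaError last. The sketch below redefines three of the classes locally, with the same names and bases as documented here, so that it runs without the package; run_and_classify and its labels are hypothetical.

```python
class ReplicaError(Exception):
    """Local stand-in for the documented base class."""


class BoxIDNotFoundError(ReplicaError):
    """Local stand-in."""


class TombstoneError(ReplicaError):
    """Local stand-in."""


def run_and_classify(operation):
    """Call operation() and map replica errors to a coarse outcome label."""
    try:
        return ("ok", operation())
    except TombstoneError:
        return ("deleted", None)   # intentional deletion, not a failure
    except BoxIDNotFoundError:
        return ("absent", None)    # box has not been written
    except ReplicaError as err:
        return ("replica-error", type(err).__name__)
```

Exceptions that derive from Exception directly are not caught by the ReplicaError clause and propagate to the caller.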

InvalidTombstoneSignatureError

class InvalidTombstoneSignatureError(Exception)

Tombstone signature verification failed (forgery or corruption).

MKEMDecryptionFailedError

class MKEMDecryptionFailedError(Exception)

MKEM envelope decryption failed with all replica keys.

BACAPDecryptionFailedError

class BACAPDecryptionFailedError(Exception)

BACAP payload decryption or signature verification failed.

StartResendingCancelledError

class StartResendingCancelledError(Exception)

StartResendingEncryptedMessage operation was cancelled.

CopyCommandFailedError

class CopyCommandFailedError(Exception)

StartResendingCopyCommand operation failed on the courier.

The courier aborted the Copy command because a replica rejected one of the embedded writes. Inspect the diagnostic attributes to determine the cause:

Attributes:

  • replica_error_code int - The pigeonhole replica ErrorCode that triggered the abort (e.g. REPLICA_ERROR_BOX_ALREADY_EXISTS). 0 if not reported.
  • failed_envelope_index int - 1-based sequential position in the copy stream of the envelope whose write triggered the abort. 0 if not applicable. This is NOT a BACAP message index.
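A caller can turn the two diagnostic attributes into a log line. The sketch below uses a local stand-in class that carries only the documented attributes; its constructor signature is hypothetical, as is the helper name describe_copy_failure.

```python
class CopyCommandFailedError(Exception):
    """Local stand-in; the constructor signature is an assumption."""

    def __init__(self, message, replica_error_code=0, failed_envelope_index=0):
        super().__init__(message)
        self.replica_error_code = replica_error_code
        self.failed_envelope_index = failed_envelope_index


def describe_copy_failure(err):
    """Build a one-line description, skipping attributes left at 0."""
    parts = [str(err)]
    if err.replica_error_code:
        parts.append(f"replica error code {err.replica_error_code}")
    if err.failed_envelope_index:
        # 1-based position in the copy stream, not a BACAP message index.
        parts.append(f"envelope #{err.failed_envelope_index} in the copy stream")
    return "; ".join(parts)
```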