Python Thin Client API
This is the API reference for the katzenpost_thinclient Python
package, the Python binding of the Katzenpost thin client. The thin
client is an interface to the kpclientd daemon, which performs all
cryptographic and network operations; the binding itself does no
cryptography.
This page is generated by website/tools/python-api-gen/ from the
docstrings of the pinned katzenpost_thinclient release, using the
native Python documentation tool pydoc-markdown.
Do not edit it directly: changes belong in the binding docstrings (in
the thin_client repository) and will be overwritten by the next
generation pass.
This page documents the 0.0.15 release of the Python
binding (source,
PyPI). Symbols are
re-exported from katzenpost_thinclient, so application code may
import them directly, for example from katzenpost_thinclient import ThinClient, Config.
For the curated cross-language reference covering the Go, Rust, and Python bindings side by side, see the Thin Client API Reference. For conceptual background see Understanding Pigeonhole, and for task-oriented guidance see the Thin Client How-to Guide.
katzenpost_thinclient.core
Katzenpost Python Thin Client - Core Module
This module provides the core functionality for the Katzenpost thin client, including the ThinClient class, configuration, and helper utilities.
thin_client_error_to_string
def thin_client_error_to_string(error_code: int) -> str
Convert a thin client error code to a human-readable string.
error_code_to_exception
def error_code_to_exception(error_code: int) -> Exception
Maps error codes to exception instances for StartResendingEncryptedMessage. This matches Go’s errorCodeToSentinel function in thin/pigeonhole.go.
The daemon passes through pigeonhole replica error codes (1-9) for replica-level errors. For other errors (thin client errors like decryption failures), specific exceptions are raised.
copy_reply_to_exception
def copy_reply_to_exception(reply: "Dict[str, Any]") -> "Exception | None"
Maps a StartResendingCopyCommandReply dict to an exception (or None on success).
Unlike error_code_to_exception(), this helper has access to the reply’s diagnostic fields (replica_error_code, failed_envelope_index), which it uses to construct a CopyCommandFailedError when the courier reports THIN_CLIENT_ERROR_COPY_COMMAND_FAILED.
Arguments:
- reply: The decoded start_resending_copy_command_reply dict.
Returns:
None if error_code is 0 (success); otherwise an Exception instance.
is_expected_outcome
def is_expected_outcome(exc: Exception) -> bool
Returns True for exceptions that represent completed operations rather than failures. These errors should not trigger retries.
Geometry
class Geometry()
Geometry describes the geometry of a Sphinx packet.
NOTE: Do not compose a Sphinx Geometry by hand. It must be generated programmatically by the Katzenpost genconfig or gensphinx CLI utilities.
All Sphinx Geometry attributes are described below. The only one needed for thin client message bounds checking is UserForwardPayloadLength, the maximum size of a message that can be sent to a mixnet service in a single packet.
Attributes:
- PacketLength (int): The total length of a Sphinx packet in bytes.
- NrHops (int): The number of hops; determines the header’s structure.
- HeaderLength (int): The total size of the Sphinx header in bytes.
- RoutingInfoLength (int): The length of the routing information portion of the header.
- PerHopRoutingInfoLength (int): The length of routing info for a single hop.
- SURBLength (int): The length of a Single-Use Reply Block (SURB).
- SphinxPlaintextHeaderLength (int): The length of the unencrypted plaintext header.
- PayloadTagLength (int): The length of the tag used to authenticate the payload.
- ForwardPayloadLength (int): The size of the full payload including padding and tag.
- UserForwardPayloadLength (int): The usable portion of the payload intended for the recipient.
- NextNodeHopLength (int): Derived from the expected maximum routing info block size.
- SPRPKeyMaterialLength (int): The length of the key used for SPRP (Sphinx packet payload encryption).
- NIKEName (str): Name of the NIKE scheme (if used). Mutually exclusive with KEMName.
- KEMName (str): Name of the KEM scheme (if used). Mutually exclusive with NIKEName.
PigeonholeGeometry
class PigeonholeGeometry()
PigeonholeGeometry describes the geometry of a Pigeonhole envelope.
This provides mathematically precise geometry calculations for the Pigeonhole protocol using trunnel’s fixed binary format.
It supports 3 distinct use cases:
- Given MaxPlaintextPayloadLength → compute all envelope sizes
- Given precomputed Pigeonhole Geometry → derive accommodating Sphinx Geometry
- Given Sphinx Geometry constraint → derive optimal Pigeonhole Geometry
Attributes:
- max_plaintext_payload_length (int): The maximum usable plaintext payload size within a Box.
- courier_query_read_length (int): The size of a CourierQuery containing a ReplicaRead.
- courier_query_write_length (int): The size of a CourierQuery containing a ReplicaWrite.
- courier_query_reply_read_length (int): The size of a CourierQueryReply containing a ReplicaReadReply.
- courier_query_reply_write_length (int): The size of a CourierQueryReply containing a ReplicaWriteReply.
- nike_name (str): The NIKE scheme name used in MKEM for encrypting to multiple storage replicas.
- signature_scheme_name (str): The signature scheme used for BACAP (always “Ed25519”).
PigeonholeGeometry.validate
def validate() -> None
Validates that the geometry has valid parameters.
Raises:
- ValueError: If the geometry is invalid.
PigeonholeGeometry.padded_payload_length
def padded_payload_length() -> int
Returns the payload size after adding length prefix.
Returns:
- int: The padded payload length (max_plaintext_payload_length + 4).
ConfigFile
class ConfigFile()
ConfigFile represents everything loaded from a TOML file: only the subtable-discriminated Dial transport config. The geometries are supplied by the daemon over the handshake, not configured here.
ConfigFile.load
@classmethod
def load(cls, toml_path: str) -> "ConfigFile"
Parse a kpclientd-style thin-client TOML config.
Raises ConfigError eagerly on any structural problem: unknown top-level sections (a leftover [SphinxGeometry] or [PigeonholeGeometry] is now rejected here), missing required sections, wrong types, or unknown / missing keys within the Dial subtable. The intent is that a stale or drifted config fails here at startup rather than producing a mysterious runtime failure later.
pretty_print_obj
def pretty_print_obj(obj: "Any") -> str
Pretty-print a Python object using indentation and return the formatted string.
This function uses pprintpp to format complex data structures
(e.g., dictionaries, lists) in a readable, indented format.
Arguments:
- obj (Any): The object to pretty-print.
Returns:
- str: The pretty-printed representation of the object.
ServiceDescriptor
class ServiceDescriptor()
Describes a mixnet service endpoint retrieved from the PKI document.
A ServiceDescriptor encapsulates the information needed to communicate with a service on the mix network. The hash of the service node’s identity public key is used as the destination address, together with the service’s queue ID.
Attributes:
- recipient_queue_id (bytes): The identifier of the recipient’s queue on the mixnet (“Kaetzchen.endpoint” in the PKI).
- mix_descriptor (dict): A CBOR-decoded dictionary describing the mix node; it typically includes the ‘IdentityKey’ and other metadata.
Methods:
- to_destination(): Returns a tuple of (provider_id_hash, recipient_queue_id), where the provider ID is a 32-byte BLAKE2b hash of the IdentityKey.
ServiceDescriptor.to_destination
def to_destination() -> "Tuple[bytes,bytes]"
Returns the provider identity key hash and the queue ID.
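The destination pair described above can be sketched as follows. This is a minimal illustration, not the binding's implementation: it assumes that mix_descriptor['IdentityKey'] holds the raw identity public key bytes and that the hash is unkeyed BLAKE2b with a 32-byte digest. The helper name destination_from_descriptor and the sample values are hypothetical.

```python
import hashlib
from typing import Any, Dict, Tuple


def destination_from_descriptor(mix_descriptor: Dict[str, Any],
                                recipient_queue_id: bytes) -> Tuple[bytes, bytes]:
    """Hypothetical helper: derive (provider_id_hash, recipient_queue_id)."""
    # Assumption: 'IdentityKey' holds the raw identity public key bytes.
    identity_key = mix_descriptor["IdentityKey"]
    # Unkeyed BLAKE2b configured for a 32-byte digest.
    provider_id_hash = hashlib.blake2b(identity_key, digest_size=32).digest()
    return provider_id_hash, recipient_queue_id


# Illustrative values only; a real key comes from the PKI document.
dest_node, dest_queue = destination_from_descriptor(
    {"IdentityKey": b"\x01" * 32}, b"+echo")
print(len(dest_node), dest_queue)
```

The resulting pair has the same shape as the dest_node and dest_queue arguments taken by the send methods documented further down.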
find_services
def find_services(capability: str,
doc: "Dict[str,Any]") -> "List[ServiceDescriptor]"
Search the PKI document for services supporting the specified capability.
This function iterates over all service nodes in the PKI document,
deserializes each CBOR-encoded node, and looks for advertised capabilities.
If a service provides the requested capability, it is returned as a
ServiceDescriptor.
Arguments:
- capability (str): The name of the capability to search for (e.g., “echo”).
- doc (dict): The decoded PKI document as a Python dictionary, which must include a “ServiceNodes” key containing CBOR-encoded descriptors.
Returns:
- List[ServiceDescriptor]: A list of matching service descriptors that advertise the capability.
Raises:
- KeyError: If the ‘ServiceNodes’ field is missing from the PKI document.
Config
class Config()
Configuration object for the ThinClient containing connection details and event callbacks.
The Config class loads network configuration from a TOML file and provides optional callback functions that are invoked when specific events occur during client operation.
Attributes:
- network (str): Network type ('tcp', 'unix', etc.)
- address (str): Network address (host:port for TCP, path for Unix sockets)
- geometry (Geometry): Sphinx packet geometry parameters
- on_connection_status (callable): Callback for connection status changes
- on_new_pki_document (callable): Callback for new PKI documents
- on_message_sent (callable): Callback for message transmission confirmations
- on_message_reply (callable): Callback for received message replies
Example:
def handle_reply(event):
    # Process the received reply
    payload = event['payload']

config = Config("client.toml", on_message_reply=handle_reply)
client = ThinClient(config)
Config.__init__
def __init__(filepath: str,
on_connection_status: "Callable|None" = None,
on_new_pki_document: "Callable|None" = None,
on_message_sent: "Callable|None" = None,
on_message_reply: "Callable|None" = None,
on_daemon_disconnected: "Callable|None" = None) -> None
Initialize the Config object.
Arguments:
- filepath (str): Path to the TOML config file containing network, address, and geometry.
- on_connection_status (callable, optional): Callback invoked when the daemon’s connection status to the mixnet changes. The callback receives a single argument:
  - event (dict): Connection status event with keys:
    - 'is_connected' (bool): True if daemon is connected to mixnet, False otherwise
    - 'err' (str, optional): Error message if connection failed, empty string if no error
  - Example: {'is_connected': True, 'err': ''}
- on_new_pki_document (callable, optional): Callback invoked when a new PKI document is received from the mixnet. The callback receives a single argument:
  - event (dict): PKI document event with keys:
    - 'payload' (bytes): CBOR-encoded PKI document data stripped of signatures
  - Example: {'payload': b'\xa5\x64Epoch\x00...'}
- on_message_sent (callable, optional): Callback invoked when a message has been successfully transmitted to the mixnet. The callback receives a single argument:
  - event (dict): Message sent event with keys:
    - 'message_id' (bytes): 16-byte unique identifier for the sent message
    - 'surbid' (bytes, optional): SURB ID if message was sent with SURB, None otherwise
    - 'sent_at' (str): ISO timestamp when message was sent
    - 'reply_eta' (float): Expected round-trip time in seconds for reply
    - 'err' (str, optional): Error message if sending failed, empty string if successful
  - Example: {'message_id': b'\x01\x02...', 'surbid': b'\xaa\xbb...', 'sent_at': '2024-01-01T12:00:00Z', 'reply_eta': 30.5, 'err': ''}
- on_message_reply (callable, optional): Callback invoked when a reply is received for a previously sent message. The callback receives a single argument:
  - event (dict): Message reply event with keys:
    - 'message_id' (bytes): 16-byte identifier matching the original message
    - 'surbid' (bytes, optional): SURB ID if reply used SURB, None otherwise
    - 'payload' (bytes): Reply payload data from the service
    - 'reply_index' (int, optional): Index of reply used (relevant for channel reads)
    - 'error_code' (int): Error code indicating success (0) or specific failure condition
  - Example: {'message_id': b'\x01\x02...', 'surbid': b'\xaa\xbb...', 'payload': b'echo response', 'reply_index': 0, 'error_code': 0}
Notes:
All callbacks are optional. If not provided, the corresponding events will be ignored. Callbacks should be lightweight and non-blocking as they are called from the client’s event processing loop.
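One way to keep a callback lightweight is to have it do nothing except hand the event to a queue, leaving slow processing to a separate task. The sketch below assumes the callback is invoked as a plain synchronous callable on the asyncio event loop thread; the names make_reply_handler, consume, and demo are illustrative and not part of the binding.

```python
import asyncio
from typing import Any, Callable, Dict, List


def make_reply_handler(
        queue: "asyncio.Queue[Dict[str, Any]]") -> Callable[[Dict[str, Any]], None]:
    """Build a callback that only enqueues the event and returns at once."""
    def on_message_reply(event: Dict[str, Any]) -> None:
        # put_nowait does not block on an unbounded queue.
        queue.put_nowait(event)
    return on_message_reply


async def consume(queue: "asyncio.Queue[Dict[str, Any]]",
                  count: int) -> List[bytes]:
    """Drain `count` events; any slow processing belongs here."""
    payloads = []
    for _ in range(count):
        event = await queue.get()
        payloads.append(event["payload"])
    return payloads


async def demo() -> List[bytes]:
    queue: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue()
    handler = make_reply_handler(queue)
    # Stand-in for the client's event loop invoking the callback.
    handler({"payload": b"echo response", "error_code": 0})
    return await consume(queue, 1)


print(asyncio.run(demo()))
```

In an application, the handler returned by make_reply_handler would be passed as on_message_reply when constructing Config.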
ThinClient
class ThinClient()
A minimal Katzenpost Python thin client for communicating with the local Katzenpost client daemon over a UNIX or TCP socket.
The thin client is responsible for:
- Establishing a connection to the client daemon.
- Receiving and parsing PKI documents.
- Sending messages to mixnet services (with or without SURBs).
- Handling replies and events via user-defined callbacks.
All cryptographic operations are handled by the daemon, not by this client.
ThinClient.__init__
def __init__(config: Config) -> None
Initialize the thin client with the given configuration.
Arguments:
- config (Config): The configuration object containing socket details and callbacks.
Raises:
- RuntimeError: If the network type is not recognized or config is incomplete.
ThinClient.start
async def start(loop: asyncio.AbstractEventLoop) -> None
Start the thin client: establish connection to the daemon, read initial events, and begin the background event loop.
Arguments:
- loop (asyncio.AbstractEventLoop): The running asyncio event loop.
Exceptions: BrokenPipeError
ThinClient.get_config
def get_config() -> Config
Returns the current configuration object.
Returns:
- Config: The client configuration in use.
ThinClient.is_connected
def is_connected() -> bool
Returns True if the daemon is currently connected to the mixnet.
Note the distinction: this reflects the daemon’s mixnet
connectivity, not the local socket between this thin client and
the daemon. The daemon may be reachable while the mixnet itself
is unreachable; in that case the local socket is fine but this
method returns False, and send_message / blocking_send_message
will raise ThinClientOfflineError. The latest value is updated
from ConnectionStatusEvents pushed by the daemon.
Returns:
- bool: True if the daemon is connected to the mixnet, False otherwise (offline mode).
ThinClient.stop
def stop() -> None
Gracefully shut down the client and close its socket. Sends a thin_close message to the daemon so it can clean up ARQ state for this connection before disconnecting.
ThinClient.disconnect
def disconnect() -> None
Close the connection without sending thin_close. The daemon preserves all state for this client’s app ID, allowing the client to reconnect and resume with the same session token.
ThinClient.recv
async def recv(loop: asyncio.AbstractEventLoop) -> "Dict[Any,Any]"
Receive a CBOR-encoded message from the daemon.
Arguments:
- loop (asyncio.AbstractEventLoop): Event loop to use for socket reads.
Returns:
- dict: Decoded CBOR response from the daemon.
Raises:
- BrokenPipeError: If connection fails.
- ValueError: If message framing fails.
ThinClient.worker_loop
async def worker_loop(loop: asyncio.events.AbstractEventLoop) -> None
Background task that listens for events and dispatches them. Survives daemon disconnects by automatically reconnecting with exponential backoff. Only stopping (from stop()) causes this task to exit.
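Reconnecting with exponential backoff generally means doubling the wait after each failed attempt, up to some cap. The sketch below illustrates that general pattern only; it is not the worker_loop implementation. The function name, the delay values, and the max_attempts bound are assumptions chosen so the example terminates (the text above says the real task exits only on stop()).

```python
import asyncio
from typing import Awaitable, Callable, List


async def reconnect_with_backoff(connect: Callable[[], Awaitable[None]],
                                 base_delay: float = 0.01,
                                 max_delay: float = 0.08,
                                 max_attempts: int = 10) -> List[float]:
    """Retry `connect` until it succeeds; return the delays that were slept."""
    delays: List[float] = []
    delay = base_delay
    for _ in range(max_attempts):
        try:
            await connect()
            return delays
        except ConnectionError:  # BrokenPipeError is a subclass
            delays.append(delay)
            await asyncio.sleep(delay)
            # Double the wait after each failure, up to the cap.
            delay = min(delay * 2, max_delay)
    raise ConnectionError("gave up reconnecting")


async def demo() -> List[float]:
    state = {"failures_left": 3}

    async def flaky_connect() -> None:
        # Stand-in for dialing the daemon: fails three times, then succeeds.
        if state["failures_left"] > 0:
            state["failures_left"] -= 1
            raise BrokenPipeError("daemon not reachable")

    return await reconnect_with_backoff(flaky_connect)


print(asyncio.run(demo()))
```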
ThinClient.parse_status
def parse_status(event: "Dict[str,Any]") -> None
Parse a connection status event and update connection state.
ThinClient.pki_document
def pki_document() -> "Dict[str,Any] | None"
Return the most recent PKI consensus document the daemon has forwarded to this thin client.
The document is a CBOR map describing the current mixnet topology, the set of available services, and per-node public-key material. Useful inputs include the PKI epoch, the list of mix nodes, the list of service providers, and the replica descriptors consulted by Pigeonhole.
Returns:
Dict[str, Any] | None: The parsed CBOR PKI document, or
None if the daemon has not yet forwarded one (most
commonly on a freshly-connected client, before the first
on_new_pki_document callback has fired).
ThinClient.pki_document_for_epoch
def pki_document_for_epoch(epoch: int) -> "Dict[str,Any]"
Return the cached PKI document for a specific epoch.
Falls back to the current document if the requested epoch is not cached. Raises if no document is available at all.
Arguments:
- epoch (int): The epoch number.
Returns:
- dict: Parsed PKI document for the given epoch.
Raises:
- Exception: If no PKI document is available.
ThinClient.get_pki_document_raw
async def get_pki_document_raw(epoch: int = 0) -> "Tuple[bytes,int]"
Return the cert.Certificate-wrapped signed PKI document for the requested epoch, with every directory authority signature intact.
The thin client receives the stripped PKI document by default
(via the on_new_pki_document callback, also available through
pki_document and pki_document_for_epoch);
the daemon removes the signature map before forwarding it. Use this
method when the caller wishes to verify the directory authority
signatures itself: the returned payload may be deserialized and
verified with the katzenpost core/pki.FromPayload routine
against the authorities listed in client.toml.
Arguments:
- epoch (int): Epoch for which the signed PKI document should be returned. Pass 0 (the default) to request the document the daemon believes is current.
Returns:
Tuple[bytes, int]: (payload, epoch) where payload is
the cert.Certificate-wrapped signed PKI document and
epoch is the epoch of the returned document. When 0
was passed in, epoch echoes the epoch the daemon
resolved to.
Raises:
- Exception: If the daemon has no cached document for the requested epoch, or any other error code is returned.
ThinClient.parse_pki_doc
def parse_pki_doc(event: "Dict[str,Any]") -> None
Parse and store a new PKI document received from the daemon.
ThinClient.get_services
def get_services(capability: str) -> "List[ServiceDescriptor]"
Look up all services in the PKI that advertise a given capability.
Arguments:
- capability (str): Capability name (e.g., “echo”).
Returns:
- list[ServiceDescriptor]: Matching services.
Raises:
- Exception: If PKI is missing or no services match.
ThinClient.get_service
def get_service(service_name: str) -> ServiceDescriptor
Select one random service matching a capability from the current PKI document.
Multiple mix nodes may advertise the same capability; this method
returns an arbitrary one. To see every advertised instance, use
get_services.
Arguments:
- service_name (str): The capability name (e.g. "echo", "courier").
Returns:
- ServiceDescriptor: One of the matching services.
Raises:
- Exception: If the PKI document is missing, or no node in the current consensus advertises service_name.
ThinClient.get_all_couriers
def get_all_couriers() -> "List[Tuple[bytes, bytes]]"
Return every courier service advertised in the current PKI
document, each described by an (identity_hash, queue_id)
tuple. The list reflects only the couriers that the current
consensus regards as serving.
The principal caller is the nested-copy-command machinery, which
needs to choose particular couriers rather than accept the random
draw made on the caller’s behalf by
start_resending_copy_command; for simple cases where any
courier will do, the default routing path is usually preferable.
Returns:
list[tuple[bytes, bytes]]: List of (identity_hash, queue_id) tuples.
Raises:
- Exception: If no couriers are available.
ThinClient.get_distinct_couriers
def get_distinct_couriers(n: int) -> "List[Tuple[bytes, bytes]]"
Draw n couriers uniformly at random from the list returned by
get_all_couriers, without replacement, so that no two entries
in the returned list refer to the same courier. This is the usual
building block for a nested copy command, every layer of which
must be carried by a different courier.
Arguments:
- n (int): Number of distinct couriers to return.
Returns:
list[tuple[bytes, bytes]]: List of (identity_hash, queue_id) tuples.
Raises:
- Exception: If the current PKI document advertises fewer than n couriers.
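Drawing without replacement can be illustrated with the standard library. This is a sketch of the sampling idea only, under the assumption that the courier list has the (identity_hash, queue_id) shape returned by get_all_couriers; the helper name pick_distinct_couriers is hypothetical, and the binding may use a different random source.

```python
import random
from typing import List, Tuple

Courier = Tuple[bytes, bytes]  # (identity_hash, queue_id)


def pick_distinct_couriers(couriers: List[Courier], n: int) -> List[Courier]:
    """Draw n couriers uniformly at random without replacement."""
    if n > len(couriers):
        raise Exception(
            f"need {n} couriers, but only {len(couriers)} are advertised")
    # random.sample never picks the same list position twice, so the
    # result is duplicate-free as long as the input list is.
    return random.sample(couriers, n)


# Illustrative courier list; real entries come from the PKI document.
all_couriers = [(bytes([i]) * 32, b"courier") for i in range(4)]
chosen = pick_distinct_couriers(all_couriers, 3)
print(len(chosen), len(set(chosen)))
```

Where unpredictability matters, random.SystemRandom().sample offers the same interface backed by the operating system's random source.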
ThinClient.blocking_send_message
async def blocking_send_message(payload: bytes | str,
dest_node: bytes,
dest_queue: bytes,
timeout_seconds: float = 30.0) -> bytes
Send a message and block until a reply is received or timeout.
Arguments:
- payload (bytes or str): Message payload.
- dest_node (bytes): Destination node identity hash.
- dest_queue (bytes): Destination recipient queue ID.
- timeout_seconds (float): Timeout in seconds (default 30).
Returns:
- bytes: Reply payload from the destination service.
Raises:
- ThinClientOfflineError: If in offline mode.
- asyncio.TimeoutError: If no reply within timeout.
ThinClient.new_message_id
@staticmethod
def new_message_id() -> bytes
Generate a new 16-byte random message ID.
Message IDs are used to correlate SendMessage requests with their
corresponding MessageSentEvent and (if a SURB is present)
MessageReplyEvent. Callers generally do not need to construct
one by hand — blocking_send_message does it internally — but
this helper is exposed for callers composing requests manually.
Randomness is drawn from os.urandom.
Returns:
- bytes: Random 16-byte identifier.
ThinClient.new_surb_id
def new_surb_id() -> bytes
Generate a new random SURB ID.
SURB IDs identify which Single Use Reply Block a given
on_message_reply event corresponds to. Pass the returned bytes
as the surb_id argument to send_message, then watch the
callback for a matching reply. Randomness is drawn from
os.urandom.
Returns:
- bytes: Random identifier of SURB_ID_SIZE bytes.
ThinClient.new_query_id
def new_query_id() -> bytes
Generate a new 16-byte random query ID.
Query IDs correlate requests and replies within the thin client ↔
daemon CBOR protocol (distinct from mix-network SURB IDs, which
identify replies within the mixnet itself). Most callers never
touch query IDs directly; they are used internally by the
Pigeonhole API helpers. Randomness is drawn from os.urandom.
Returns:
- bytes: Random 16-byte identifier.
ThinClient.handle_response
async def handle_response(response: "Dict[str,Any]") -> None
Dispatch a parsed CBOR response to the appropriate handler or callback.
ThinClient.send_message_without_reply
async def send_message_without_reply(payload: bytes | str, dest_node: bytes,
dest_queue: bytes) -> None
Send a fire-and-forget message with no SURB or reply handling. This method requires mixnet connectivity.
Arguments:
- payload (bytes or str): Message payload.
- dest_node (bytes): Destination node identity hash.
- dest_queue (bytes): Destination recipient queue ID.
Raises:
- ThinClientOfflineError: If in offline mode (daemon not connected to mixnet).
ThinClient.send_message
async def send_message(surb_id: bytes, payload: bytes | str, dest_node: bytes,
dest_queue: bytes) -> None
Send a message using a SURB to allow the recipient to send a reply. This method requires mixnet connectivity.
Arguments:
- surb_id (bytes): SURB identifier for reply correlation.
- payload (bytes or str): Message payload.
- dest_node (bytes): Destination node identity hash.
- dest_queue (bytes): Destination recipient queue ID.
Raises:
- ThinClientOfflineError: If in offline mode (daemon not connected to mixnet).
ThinClient.pretty_print_pki_doc
def pretty_print_pki_doc(doc: "Dict[str,Any]") -> None
Pretty-print a parsed PKI document with fully decoded CBOR nodes.
Arguments:
- doc (dict): Raw PKI document from the daemon.
katzenpost_thinclient.pigeonhole
Katzenpost Python Thin Client - New Pigeonhole API
This module provides the new capability-based Pigeonhole API methods. These methods use WriteCap/ReadCap keypairs and provide direct control over the Pigeonhole protocol.
KeypairResult
@dataclass
class KeypairResult()
Result from new_keypair containing the generated capabilities.
EncryptReadResult
@dataclass
class EncryptReadResult()
Result from encrypt_read containing the encrypted read request.
EncryptWriteResult
@dataclass
class EncryptWriteResult()
Result from encrypt_write containing the encrypted write request.
StartResendingResult
@dataclass
class StartResendingResult()
Result from start_resending_encrypted_message and its variants.
StartResendingResult.plaintext
Decrypted message for read operations, or empty bytes for writes.
StartResendingResult.courier_identity_hash
32-byte hash of the identity key of the courier that handled this message. Callers can watch PKI document updates for this courier disappearing from consensus and cancel+re-encrypt if needed.
StartResendingResult.courier_queue_id
Queue ID of the courier that handled this message.
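The check suggested for courier_identity_hash can be sketched as a membership test against the current courier list. The sketch assumes the list has the (identity_hash, queue_id) shape that get_all_couriers is documented to return; the function name courier_still_serving and the sample values are hypothetical.

```python
from typing import Iterable, Tuple


def courier_still_serving(courier_identity_hash: bytes,
                          courier_queue_id: bytes,
                          current_couriers: Iterable[Tuple[bytes, bytes]]) -> bool:
    """True if the courier that carried a message is in the current list."""
    return (courier_identity_hash, courier_queue_id) in set(current_couriers)


# Illustrative values; real ones come from a StartResendingResult and the PKI.
serving = [(b"\xaa" * 32, b"courier"), (b"\xbb" * 32, b"courier")]
print(courier_still_serving(b"\xaa" * 32, b"courier", serving))
print(courier_still_serving(b"\xcc" * 32, b"courier", serving))
```

A caller could run this check whenever a new PKI document arrives and, on a False result, cancel the operation and re-encrypt as described above.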
new_keypair
async def new_keypair(self, seed: bytes) -> KeypairResult
Creates a new keypair for use with the Pigeonhole protocol.
This method generates a WriteCap and ReadCap from the provided seed using the BACAP (Blinding-and-Capability) protocol. The WriteCap should be stored securely for writing messages, while the ReadCap can be shared with others to allow them to read messages.
Arguments:
- seed: 32-byte seed used to derive the keypair.
Returns:
- KeypairResult: Contains write_cap, read_cap, and first_message_index.
Raises:
- Exception: If the keypair creation fails.
- ValueError: If seed is not exactly 32 bytes.
Example:
import os
seed = os.urandom(32)
result = await client.new_keypair(seed)
# Share result.read_cap with Bob so he can read messages
# Store result.write_cap for sending messages
encrypt_read
async def encrypt_read(self, read_cap: bytes,
message_box_index: bytes) -> EncryptReadResult
Encrypts a read operation for a given read capability.
This method prepares an encrypted read request that can be sent to the courier service to retrieve a message from a pigeonhole box. The returned ciphertext should be sent via start_resending_encrypted_message.
Arguments:
- read_cap: Read capability that grants access to the channel.
- message_box_index: Starting read position for the channel.
Returns:
- EncryptReadResult: Contains message_ciphertext, envelope_descriptor, and envelope_hash.
Raises:
- Exception: If the encryption fails.
Example:
result = await client.encrypt_read(read_cap, message_box_index)
# Send result.message_ciphertext via start_resending_encrypted_message
encrypt_write
async def encrypt_write(self, plaintext: bytes, write_cap: bytes,
message_box_index: bytes) -> EncryptWriteResult
Encrypts a write operation for a given write capability.
This method prepares an encrypted write request that can be sent to the courier service to store a message in a pigeonhole box. The returned ciphertext should be sent via start_resending_encrypted_message.
Plaintext Size Constraint: The plaintext must not exceed PigeonholeGeometry.max_plaintext_payload_length bytes. The daemon internally adds a 4-byte big-endian length prefix before padding and encryption, so the actual wire format is: [4-byte length][plaintext][zero padding].
If the plaintext exceeds the maximum size, the daemon will return ThinClientErrorInvalidRequest.
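The wire layout described above can be illustrated in a few lines. The daemon performs this framing itself, so callers never need to; the sketch only shows how the pieces add up. The function name frame_plaintext and the maximum of 32 bytes are assumptions chosen for illustration.

```python
import struct


def frame_plaintext(plaintext: bytes, max_plaintext_payload_length: int) -> bytes:
    """Illustrate the layout [4-byte length][plaintext][zero padding]."""
    if len(plaintext) > max_plaintext_payload_length:
        raise ValueError("plaintext exceeds max_plaintext_payload_length")
    prefix = struct.pack(">I", len(plaintext))  # 4-byte big-endian length
    padding = b"\x00" * (max_plaintext_payload_length - len(plaintext))
    return prefix + plaintext + padding


# Illustrative maximum; a real value comes from PigeonholeGeometry.
framed = frame_plaintext(b"Hello, Bob!", 32)
print(len(framed))
```

The framed length is always max_plaintext_payload_length + 4, which matches the value documented for PigeonholeGeometry.padded_payload_length.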
Arguments:
- plaintext: The plaintext message to encrypt. Must be at most PigeonholeGeometry.max_plaintext_payload_length bytes.
- write_cap: Write capability that grants access to the channel.
- message_box_index: The message box index for this write operation.
Returns:
- EncryptWriteResult: Contains message_ciphertext, envelope_descriptor, and envelope_hash.
Raises:
- Exception: If the encryption fails (including if plaintext is too large).
Example:
plaintext = b"Hello, Bob!"
result = await client.encrypt_write(plaintext, write_cap, message_box_index)
# Send result.message_ciphertext via start_resending_encrypted_message
start_resending_encrypted_message
async def start_resending_encrypted_message(
self,
read_cap: "bytes|None",
write_cap: "bytes|None",
message_box_index: "bytes|None",
reply_index: "int|None",
envelope_descriptor: bytes,
message_ciphertext: bytes,
envelope_hash: bytes,
no_retry_on_box_id_not_found: bool = False,
no_idempotent_box_already_exists: bool = False
) -> StartResendingResult
Starts resending an encrypted message via ARQ.
This method initiates automatic repeat request (ARQ) for an encrypted message, which will be resent periodically until either:
- A reply is received from the courier
- The message is cancelled via cancel_resending_encrypted_message
- The client is shut down
This is used for both read and write operations in the new Pigeonhole API.
The daemon implements a finite state machine (FSM) for handling the stop-and-wait ARQ protocol:
- For default write operations (write_cap != None, read_cap == None, no_idempotent_box_already_exists == False): The method waits for an ACK from the courier and returns immediately. The ACK confirms the courier received the envelope and will dispatch it to both shard replicas. This requires only a single round-trip through the mixnet.
- For BoxAlreadyExists-aware writes (no_idempotent_box_already_exists == True): The method waits for an ACK, then sends a second SURB to retrieve the replica’s error code. This requires two round-trips through the mixnet.
- For read operations (read_cap != None, write_cap == None): The method waits for an ACK from the courier, then the daemon automatically sends a new SURB to request the payload, and this method waits for the payload. The daemon performs all decryption (MKEM envelope + BACAP payload) and returns the fully decrypted plaintext.
Arguments:
- read_cap: Read capability (can be None for write operations, required for reads).
- write_cap: Write capability (can be None for read operations, required for writes).
- message_box_index: Current message box index being operated on (required for reads).
- reply_index: Index of the reply to use (typically 0 or 1).
- envelope_descriptor: Serialized envelope descriptor for MKEM decryption.
- message_ciphertext: MKEM-encrypted message to send (from encrypt_read or encrypt_write).
- envelope_hash: Hash of the courier envelope.
- no_retry_on_box_id_not_found: If True, BoxIDNotFound errors on reads trigger immediate error instead of automatic retries. By default (False), reads retry on BoxIDNotFound until the box is found or the operation is cancelled, riding out replication lag; the retries are not capped. Set to True to get an immediate BoxIDNotFound error without retries.
- no_idempotent_box_already_exists: If True, BoxAlreadyExists errors on writes are returned as errors instead of being treated as idempotent success. By default (False), BoxAlreadyExists is treated as success (the write already happened). Set to True to detect whether a write was actually performed or if the box already existed.
Returns:
- StartResendingResult: Contains plaintext (decrypted message for reads, empty for writes), courier_identity_hash, and courier_queue_id.
Raises:
- BoxIDNotFoundError: If no_retry_on_box_id_not_found=True and the box does not exist.
- BoxAlreadyExistsError: If no_idempotent_box_already_exists=True and the box already contains data.
- Exception: If the operation fails. Check error_code for specific errors.
Example:
result = await client.start_resending_encrypted_message(
    read_cap, None, message_box_index, reply_idx, env_desc, ciphertext, env_hash)
print(f"Received: {result.plaintext}")
start_resending_encrypted_message_return_box_exists
async def start_resending_encrypted_message_return_box_exists(
self, read_cap: "bytes|None", write_cap: "bytes|None",
message_box_index: "bytes|None", reply_index: "int|None",
envelope_descriptor: bytes, message_ciphertext: bytes,
envelope_hash: bytes) -> StartResendingResult
Behaves exactly like start_resending_encrypted_message save that
it raises BoxAlreadyExistsError when the replica reports the
destination box has already been written, rather than swallowing the
condition as idempotent success. Use this when one needs to
distinguish a fresh write from a repeat: for instance, when
implementing optimistic concurrency on top of the channel, or when
establishing whether a particular call actually caused a state
change at the replica.
Note that this variant costs an additional mixnet round trip: the BoxAlreadyExists code is carried by the replica’s reply rather than the courier’s ACK, so the daemon must dispatch a second SURB before it can return the answer.
As with start_resending_encrypted_message, an in-flight call
can be cancelled from another task via
cancel_resending_encrypted_message.
Arguments:
read_cap- Read capability (can be None for write operations, required for reads).write_cap- Write capability (can be None for read operations, required for writes).message_box_index- Current message box index being operated on (required for reads).reply_index- Index of the reply to use (typically 0 or 1).envelope_descriptor- Serialized envelope descriptor for MKEM decryption.message_ciphertext- MKEM-encrypted message to send (from encrypt_read or encrypt_write).envelope_hash- Hash of the courier envelope.
Returns:
StartResendingResult- Contains plaintext, courier_identity_hash, and courier_queue_id.
Raises:
BoxAlreadyExistsError- If the box already contains data.
Exception- If the operation fails.
Example:
try:
    await client.start_resending_encrypted_message_return_box_exists(
        None, write_cap, None, None, env_desc, ciphertext, env_hash)
except BoxAlreadyExistsError:
    print("Box already has data; write was idempotent")
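The fresh-write versus repeat distinction described above can be wrapped in a small helper. The following is a minimal sketch, not the binding's own code: FakeClient and the locally defined BoxAlreadyExistsError are stand-ins so that it runs without a daemon, and write_once is a hypothetical helper name.

```python
import asyncio


class BoxAlreadyExistsError(Exception):
    """Local stand-in for the binding's BoxAlreadyExistsError."""


class FakeClient:
    """Hypothetical in-memory stand-in for ThinClient (no daemon involved)."""

    def __init__(self):
        self.written = set()

    async def start_resending_encrypted_message_return_box_exists(
            self, read_cap, write_cap, message_box_index, reply_index,
            envelope_descriptor, message_ciphertext, envelope_hash):
        # Assumption for the sketch: the envelope hash identifies the box.
        if envelope_hash in self.written:
            raise BoxAlreadyExistsError()
        self.written.add(envelope_hash)


async def write_once(client, write_cap, env_desc, ciphertext, env_hash):
    """Return True if this call wrote the box, False if it was already written."""
    try:
        await client.start_resending_encrypted_message_return_box_exists(
            None, write_cap, None, None, env_desc, ciphertext, env_hash)
    except BoxAlreadyExistsError:
        return False
    return True


async def demo():
    client = FakeClient()
    first = await write_once(client, b"cap", b"desc", b"ct", b"hash-1")
    second = await write_once(client, b"cap", b"desc", b"ct", b"hash-1")
    return first, second


print(asyncio.run(demo()))  # (True, False)
```

With the real binding, the exception would be imported from katzenpost_thinclient instead of being defined locally.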
start_resending_encrypted_message_no_retry
async def start_resending_encrypted_message_no_retry(
self, read_cap: "bytes|None", write_cap: "bytes|None",
message_box_index: "bytes|None", reply_index: "int|None",
envelope_descriptor: bytes, message_ciphertext: bytes,
envelope_hash: bytes) -> StartResendingResult
Behaves exactly like start_resending_encrypted_message save that
it disables the daemon’s automatic retry of BoxIDNotFoundError.
The caller learns at once that the box is absent rather than waiting
for replication to settle.
Use this when polling a box that may not yet have been written: for instance, when a reader peeks ahead at a peer’s next message before that peer has produced it. The regular variant would block until the box appeared, which can be many round trips.
As with start_resending_encrypted_message, an in-flight call
can be cancelled from another task via
cancel_resending_encrypted_message.
Arguments:
read_cap- Read capability (can be None for write operations, required for reads).
write_cap- Write capability (can be None for read operations, required for writes).
message_box_index- Current message box index being operated on (required for reads).
reply_index- Index of the reply to use (typically 0 or 1).
envelope_descriptor- Serialized envelope descriptor for MKEM decryption.
message_ciphertext- MKEM-encrypted message to send (from encrypt_read or encrypt_write).
envelope_hash- Hash of the courier envelope.
Returns:
StartResendingResult- Contains plaintext, courier_identity_hash, and courier_queue_id.
Raises:
BoxIDNotFoundError- If the box does not exist (no automatic retries).
Exception- If the operation fails.
Example:
try:
    result = await client.start_resending_encrypted_message_no_retry(
        read_cap, None, message_box_index, reply_idx, env_desc, ciphertext, env_hash)
except BoxIDNotFoundError:
    print("Box not found; message not yet written")
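The polling use case described above might look like the following bounded loop with backoff. This is a sketch under stated assumptions: FakeClient, FakeResult and the local BoxIDNotFoundError are stand-ins so that it runs without a daemon, poll_box is a hypothetical helper, and the same envelope is reused for every attempt. If a fresh envelope from encrypt_read is needed per attempt, it would have to be built inside the loop.

```python
import asyncio
from dataclasses import dataclass


class BoxIDNotFoundError(Exception):
    """Local stand-in for the binding's BoxIDNotFoundError."""


@dataclass
class FakeResult:
    plaintext: bytes


class FakeClient:
    """Hypothetical stand-in: the box 'appears' after `misses` failed reads."""

    def __init__(self, misses):
        self.misses = misses
        self.calls = 0

    async def start_resending_encrypted_message_no_retry(
            self, read_cap, write_cap, message_box_index, reply_index,
            envelope_descriptor, message_ciphertext, envelope_hash):
        self.calls += 1
        if self.calls <= self.misses:
            raise BoxIDNotFoundError()
        return FakeResult(plaintext=b"hello")


async def poll_box(client, read_args, attempts=5, base_delay=0.01):
    """Return the plaintext, or None if the box is still absent after `attempts` tries."""
    for attempt in range(attempts):
        try:
            result = await client.start_resending_encrypted_message_no_retry(*read_args)
            return result.plaintext
        except BoxIDNotFoundError:
            if attempt + 1 < attempts:
                # Exponential backoff between polls.
                await asyncio.sleep(base_delay * 2 ** attempt)
    return None


read_args = (b"read-cap", None, b"index", 0, b"desc", b"ciphertext", b"hash")
print(asyncio.run(poll_box(FakeClient(misses=2), read_args)))  # b'hello'
```

Keeping the retry policy in the caller, rather than in the daemon, lets the application decide how long a missing box is worth waiting for.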
cancel_resending_encrypted_message
async def cancel_resending_encrypted_message(self,
envelope_hash: bytes) -> None
Cancels ARQ resending for an encrypted message.
This method stops the automatic repeat request (ARQ) for a previously started encrypted message transmission. This is useful when:
- A reply has been received through another channel
- The operation should be aborted
- The message is no longer needed
Arguments:
envelope_hash- Hash of the courier envelope to cancel.
Raises:
Exception- If the cancellation fails.
Example:
await client.cancel_resending_encrypted_message(env_hash)
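One way to abort an operation is to run the send as a task and cancel it by envelope hash once a deadline passes. The sketch below assumes that the in-flight call then raises StartResendingCancelledError; that behaviour, the FakeClient class and the send_with_deadline helper are illustrative assumptions, not confirmed details of the binding.

```python
import asyncio


class StartResendingCancelledError(Exception):
    """Local stand-in for the binding's StartResendingCancelledError."""


class FakeClient:
    """Hypothetical stand-in: replies at once if `reply` is set, else waits for a cancel."""

    def __init__(self, reply=None):
        self.reply = reply
        self._cancel_events = {}

    async def start_resending_encrypted_message(
            self, read_cap, write_cap, message_box_index, reply_index,
            envelope_descriptor, message_ciphertext, envelope_hash):
        if self.reply is not None:
            return self.reply
        event = self._cancel_events.setdefault(envelope_hash, asyncio.Event())
        await event.wait()  # no reply ever arrives; block until cancelled
        raise StartResendingCancelledError()

    async def cancel_resending_encrypted_message(self, envelope_hash):
        self._cancel_events.setdefault(envelope_hash, asyncio.Event()).set()


async def send_with_deadline(client, args, env_hash, deadline):
    """Return the result, or None if the call was cancelled after `deadline` seconds."""
    task = asyncio.create_task(client.start_resending_encrypted_message(*args))
    done, _pending = await asyncio.wait({task}, timeout=deadline)
    if not done:
        # Cancel from this task while the send is still in flight in the other.
        await client.cancel_resending_encrypted_message(env_hash)
    try:
        return await task
    except StartResendingCancelledError:
        return None


args = (None, b"write-cap", None, None, b"desc", b"ciphertext", b"hash")
print(asyncio.run(send_with_deadline(FakeClient(), args, b"hash", 0.05)))  # None
```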
next_message_box_index
async def next_message_box_index(self, message_box_index: bytes) -> bytes
Increments a MessageBoxIndex using the BACAP NextIndex method.
This method is used when sending multiple messages to different mailboxes using the same WriteCap or ReadCap. It properly advances the cryptographic state by:
- Incrementing the Idx64 counter
- Deriving new encryption and blinding keys using HKDF
- Updating the HKDF state for the next iteration
The daemon handles the cryptographic operations internally, ensuring correct BACAP protocol implementation.
Arguments:
message_box_index- Current message box index to increment (as bytes).
Returns:
bytes- The next message box index.
Raises:
Exception- If the increment operation fails.
Example:
current_index = first_message_index
next_index = await client.next_message_box_index(current_index)
# Use next_index for the next message
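When several consecutive boxes are needed up front, the call can be chained. The following is a minimal sketch: index_range is a hypothetical helper, and FakeClient merely bumps an 8-byte counter in place of the daemon's BACAP derivation so that the example runs on its own.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in: bumps a counter instead of running BACAP NextIndex."""

    async def next_message_box_index(self, message_box_index):
        counter = int.from_bytes(message_box_index[:8], "little") + 1
        return counter.to_bytes(8, "little") + message_box_index[8:]


async def index_range(client, start, count):
    """Return `count` consecutive indexes, beginning with `start` itself."""
    if count <= 0:
        return []
    indexes = [start]
    while len(indexes) < count:
        # Each index is derived from the previous one, never computed locally.
        indexes.append(await client.next_message_box_index(indexes[-1]))
    return indexes


start = (0).to_bytes(8, "little") + b"opaque-state"
indexes = asyncio.run(index_range(FakeClient(), start, 3))
print(len(indexes))  # 3
```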
get_message_box_index_counter
async def get_message_box_index_counter(self, message_box_index: bytes) -> int
Return the BACAP Idx64 counter embedded in a MessageBoxIndex.
Callers that persist MessageBoxIndex blobs across sessions can use this to order or compare two indexes — e.g. to detect a duplicate ACK that would otherwise regress a write-cap’s index — without having to peek at the binary layout themselves. The layout (first 8 bytes little-endian) is a BACAP implementation detail and must not be relied on outside the daemon.
Arguments:
message_box_index- MessageBoxIndex blob (as bytes) whose counter should be returned.
Returns:
int- The BACAP Idx64 value.
Raises:
Exception- If the daemon rejects the request.
Example:
current_idx = await client.get_message_box_index_counter(mbi_a)
next_idx = await client.get_message_box_index_counter(mbi_b)
if next_idx <= current_idx:
    print("skipping stale ACK")
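The duplicate-ACK check can be folded into a helper that only ever moves a persisted index forward. This is a sketch under assumptions: newest_index and make_index are hypothetical names, and FakeClient decodes the counter itself purely as a stand-in for the daemon. Application code should ask the daemon, as the text above says.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for the daemon's counter lookup."""

    async def get_message_box_index_counter(self, message_box_index):
        # Only the daemon should decode this layout; the fake plays its role.
        return int.from_bytes(message_box_index[:8], "little")


async def newest_index(client, stored_index, acked_index):
    """Return whichever index has the larger counter, preferring the stored one on ties."""
    stored = await client.get_message_box_index_counter(stored_index)
    acked = await client.get_message_box_index_counter(acked_index)
    return acked_index if acked > stored else stored_index


def make_index(counter):
    """Build a fake MessageBoxIndex blob for the demo."""
    return counter.to_bytes(8, "little") + b"opaque-state"


client = FakeClient()
kept = asyncio.run(newest_index(client, make_index(5), make_index(4)))
print(kept == make_index(5))  # True: the stale ACK did not regress the index
```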
start_resending_copy_command
async def start_resending_copy_command(
self,
write_cap: bytes,
courier_identity_hash: "bytes|None" = None,
courier_queue_id: "bytes|None" = None) -> None
Starts resending a copy command to a courier via ARQ.
This method instructs a courier to read data from a temporary channel (identified by the write_cap) and write it to the destination channel. The command is automatically retransmitted until acknowledged.
If courier_identity_hash and courier_queue_id are both provided, the copy command is sent to that specific courier. Otherwise, a random courier is selected.
Arguments:
write_cap- Write capability for the temporary channel containing the data.
courier_identity_hash- Optional identity hash of a specific courier to use.
courier_queue_id- Optional queue ID for the specified courier. Must be set if courier_identity_hash is set.
Raises:
Exception- If the operation fails.
Example:
# Send copy command to a random courier
await client.start_resending_copy_command(temp_write_cap)
# Send copy command to a specific courier
await client.start_resending_copy_command(
temp_write_cap, courier_identity_hash, courier_queue_id)
cancel_resending_copy_command
async def cancel_resending_copy_command(self, write_cap_hash: bytes) -> None
Cancels ARQ resending for a copy command.
This method stops the automatic repeat request (ARQ) for a previously started copy command. Use this when:
- The copy operation should be aborted
- The operation is no longer needed
- You want to clean up pending ARQ operations
Arguments:
write_cap_hash- Hash of the WriteCap used in start_resending_copy_command.
Raises:
Exception- If the cancellation fails.
Example:
await client.cancel_resending_copy_command(write_cap_hash)
create_courier_envelopes_from_payload
async def create_courier_envelopes_from_payload(
self, payload: bytes, dest_write_cap: bytes, dest_start_index: bytes,
is_start: bool, is_last: bool) -> "CreateEnvelopesResult"
Packs a payload of arbitrary size (up to 10 MB) into properly sized
CopyStreamElement chunks for one destination channel. Each chunk
is a serialised CopyStreamElement, ready to be written to a box
via encrypt_write followed by start_resending_encrypted_message;
the caller marks the boundaries of the stream with the is_start
and is_last flags.
This method is stateless: no daemon state is kept between calls; each invocation runs a fresh encoder and flushes before returning. The 10 MB cap guards against accidental memory exhaustion.
Once the chunks have been written to a temporary copy stream, a
copy command (start_resending_copy_command) is dispatched to a
courier with the write capability for that temporary stream; the
courier reads the chunks back and writes each envelope to its
destination box.
Multiple calls can target the same destination stream by passing
next_dest_index from the previous result as dest_start_index.
Arguments:
payload- The data to be encoded into courier envelopes (max 10MB).
dest_write_cap- Write capability for the destination channel.
dest_start_index- Starting index in the destination channel.
is_start- Whether this is the first call (sets IsStart flag on first element).
is_last- Whether this is the last call (sets IsFinal flag on last element).
Returns:
CreateEnvelopesResult- Contains envelopes and next_dest_index.
Raises:
Exception- If the envelope creation fails.
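Chaining several calls onto one destination stream, as described above, might be sketched as follows. The FakeClient, its 4-byte chunk size, the integer-encoded indexes and the encode_parts helper are all assumptions made so the example runs without a daemon; only the method name, its parameters and the envelopes and next_dest_index fields come from this reference.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeEnvelopesResult:
    envelopes: list
    next_dest_index: bytes


class FakeClient:
    """Hypothetical stand-in: slices the payload instead of building real chunks."""

    CHUNK = 4  # arbitrary size for the sketch

    def __init__(self):
        self.flags = []

    async def create_courier_envelopes_from_payload(
            self, payload, dest_write_cap, dest_start_index, is_start, is_last):
        self.flags.append((is_start, is_last))
        chunks = [payload[i:i + self.CHUNK]
                  for i in range(0, len(payload), self.CHUNK)]
        start = int.from_bytes(dest_start_index, "little")
        return FakeEnvelopesResult(
            chunks, (start + len(chunks)).to_bytes(8, "little"))


async def encode_parts(client, parts, dest_write_cap, dest_start_index):
    """Encode each part in turn, threading next_dest_index into the next call."""
    envelopes = []
    index = dest_start_index
    for i, part in enumerate(parts):
        result = await client.create_courier_envelopes_from_payload(
            part, dest_write_cap, index,
            is_start=(i == 0), is_last=(i == len(parts) - 1))
        envelopes.extend(result.envelopes)
        index = result.next_dest_index
    return envelopes, index


client = FakeClient()
envelopes, next_index = asyncio.run(
    encode_parts(client, [b"abcdefgh", b"ij"], b"dest-cap", (0).to_bytes(8, "little")))
print(len(envelopes))  # 3
```

Only the first call sets is_start and only the final call sets is_last, so the stream boundaries are marked exactly once.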
create_courier_envelopes_from_multi_payload
async def create_courier_envelopes_from_multi_payload(
self,
destinations: "List[Dict[str, Any]]",
is_start: bool,
is_last: bool,
buffer: "bytes | None" = None) -> "CreateEnvelopesResult"
Packs payloads bound for several destination channels into a single
stream of CopyStreamElement chunks. This is more space-efficient
than calling create_courier_envelopes_from_payload once per
destination, because the shared encoder runs all envelopes together
rather than padding the final box of each destination independently.
This method is stateless: the buffer argument carries any residual
encoder state across calls in place of daemon-side bookkeeping. Pass
None for buffer on the first call and the buffer returned
by the previous call thereafter; set is_last on the final call so
the encoder flushes its tail.
Arguments:
destinations- List of destination payloads, each a dict with:
- “payload”: bytes - The data to be written
- “write_cap”: bytes - Write capability for destination
- “start_index”: bytes - Starting index in destination
is_start- Whether this is the first call in the sequence. When True, the first CopyStreamElement will have IsStart=true.
is_last- Whether this is the last set of payloads in the sequence. When True, the final CopyStreamElement will have IsFinal=true.
buffer- Residual encoder buffer from a previous call, or None.
Returns:
CreateEnvelopesResult- Contains envelopes and buffer for next call.
Raises:
Exception- If the envelope creation fails.
Example:
destinations = [
{"payload": data1, "write_cap": cap1, "start_index": idx1},
{"payload": data2, "write_cap": cap2, "start_index": idx2},
]
result = await client.create_courier_envelopes_from_multi_payload(
destinations, is_start=True, is_last=False)
# Pass buffer to next call
result2 = await client.create_courier_envelopes_from_multi_payload(
more_destinations, is_start=False, is_last=True, buffer=result.buffer)
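For more than two batches, the buffer hand-off generalises to a loop. The sketch below is illustrative only: encode_batches is a hypothetical helper, and FakeClient imitates a shared encoder by emitting fixed 4-byte chunks and returning the remainder as the buffer, which is an assumption about behaviour rather than the daemon's actual encoding.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeEnvelopesResult:
    envelopes: list
    buffer: bytes


class FakeClient:
    """Hypothetical stand-in for the shared encoder."""

    CHUNK = 4  # arbitrary size for the sketch

    async def create_courier_envelopes_from_multi_payload(
            self, destinations, is_start, is_last, buffer=None):
        data = (buffer or b"") + b"".join(d["payload"] for d in destinations)
        full = len(data) - len(data) % self.CHUNK
        envelopes = [data[i:i + self.CHUNK] for i in range(0, full, self.CHUNK)]
        rest = data[full:]
        if is_last and rest:
            # The final call flushes the encoder's tail.
            envelopes.append(rest)
            rest = b""
        return FakeEnvelopesResult(envelopes, rest)


async def encode_batches(client, batches):
    """Run every batch through the encoder, carrying the buffer between calls."""
    envelopes, buffer = [], None
    for i, destinations in enumerate(batches):
        result = await client.create_courier_envelopes_from_multi_payload(
            destinations, is_start=(i == 0),
            is_last=(i == len(batches) - 1), buffer=buffer)
        envelopes.extend(result.envelopes)
        buffer = result.buffer
    return envelopes


batches = [
    [{"payload": b"abc", "write_cap": b"c1", "start_index": b"i1"},
     {"payload": b"de", "write_cap": b"c2", "start_index": b"i2"}],
    [{"payload": b"fg", "write_cap": b"c3", "start_index": b"i3"}],
]
print(asyncio.run(encode_batches(FakeClient(), batches)))  # [b'abcd', b'efg']
```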
CreateEnvelopesResult
@dataclass
class CreateEnvelopesResult()
Result of creating courier envelopes.
CreateEnvelopesResult.envelopes
The serialized CopyStreamElements to send to the network.
CreateEnvelopesResult.buffer
The buffered data that hasn’t been output yet. Persist this for crash recovery. Only populated by create_courier_envelopes_from_multi_payload.
CreateEnvelopesResult.next_dest_index
The next destination message box index after all boxes consumed by this call. Only populated by create_courier_envelopes_from_payload.
CreateEnvelopesResult.next_dest_indices
The next destination indices for each destination, in request order. Only populated by create_courier_envelopes_from_multi_payload.
TombstoneEnvelope
@dataclass
class TombstoneEnvelope()
A single tombstone envelope ready to be sent.
TombstoneRangeResult
@dataclass
class TombstoneRangeResult()
Result of a tombstone_range operation.
tombstone_range
async def tombstone_range(self, write_cap: bytes, start: bytes,
max_count: int) -> TombstoneRangeResult
Prepares the encrypted envelopes needed to tombstone a consecutive
range of pigeonhole boxes beginning at the supplied
MessageBoxIndex. A tombstone is a signed empty payload that the
replica recognises as a deletion marker; the daemon constructs one
by signing rather than encrypting whenever encrypt_write is
invoked with an empty plaintext.
This method does not itself touch the network: it returns the
envelopes for the caller to dispatch one by one, typically via
start_resending_encrypted_message. To tombstone a single box,
pass max_count=1.
Arguments:
write_cap- Write capability for the boxes.
start- Starting MessageBoxIndex.
max_count- Maximum number of boxes to tombstone.
Returns:
TombstoneRangeResult- Contains envelopes (list of TombstoneEnvelope) and next (the next MessageBoxIndex after the last processed).
Raises:
ValueError- If write_cap or start is None.
Example:
result = await client.tombstone_range(write_cap, start_index, 10)
for envelope in result.envelopes:
    await client.start_resending_encrypted_message(
        None, write_cap, None, None,
        envelope.envelope_descriptor,
        envelope.message_ciphertext,
        envelope.envelope_hash)
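A long range can be retired in smaller batches by resuming each tombstone_range call from the next index of the previous result. The following is a sketch under assumptions: tombstone_in_batches is a hypothetical helper, and the fake classes stand in for the daemon, using integer-encoded indexes so the example runs on its own.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeTombstone:
    envelope_descriptor: bytes
    message_ciphertext: bytes
    envelope_hash: bytes


@dataclass
class FakeRangeResult:
    envelopes: list
    next: bytes


class FakeClient:
    """Hypothetical stand-in that records what would be sent."""

    def __init__(self):
        self.range_calls = 0
        self.sent = []

    async def tombstone_range(self, write_cap, start, max_count):
        self.range_calls += 1
        first = int.from_bytes(start, "little")
        envelopes = [
            FakeTombstone(b"desc", b"", (first + i).to_bytes(8, "little"))
            for i in range(max_count)]
        return FakeRangeResult(
            envelopes, (first + max_count).to_bytes(8, "little"))

    async def start_resending_encrypted_message(
            self, read_cap, write_cap, message_box_index, reply_index,
            envelope_descriptor, message_ciphertext, envelope_hash):
        self.sent.append(envelope_hash)


async def tombstone_in_batches(client, write_cap, start, total, batch_size):
    """Tombstone `total` boxes, at most `batch_size` per tombstone_range call."""
    sent, index = 0, start
    while sent < total:
        result = await client.tombstone_range(
            write_cap, index, min(batch_size, total - sent))
        if not result.envelopes:
            break  # nothing was prepared; avoid looping forever
        for env in result.envelopes:
            await client.start_resending_encrypted_message(
                None, write_cap, None, None, env.envelope_descriptor,
                env.message_ciphertext, env.envelope_hash)
        sent += len(result.envelopes)
        index = result.next
    return sent, index


client = FakeClient()
sent, next_index = asyncio.run(
    tombstone_in_batches(client, b"cap", (0).to_bytes(8, "little"), 5, 2))
print(sent)  # 5
```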
create_courier_envelopes_from_tombstone_range
async def create_courier_envelopes_from_tombstone_range(
self,
dest_write_cap: bytes,
dest_start_index: bytes,
max_count: int,
is_start: bool,
is_last: bool,
buffer: "bytes | None" = None) -> "CreateEnvelopesResult"
Packs tombstones for a consecutive range of destination boxes into
CopyStreamElement chunks. The chunks are written to a temporary
copy stream and then dispatched as a copy command; the courier
applies all the tombstones atomically, which is the natural way to
retire a range of boxes as part of the same copy transaction that
writes their successors.
This method is stateless: the buffer argument carries any residual
encoder state across calls in place of daemon-side bookkeeping. Pass
None for buffer on the first call and the buffer returned
by the previous call thereafter; set is_last on the final call so
the encoder flushes its tail.
Arguments:
dest_write_cap- Write capability for the destination channel.
dest_start_index- Starting index in the destination channel.
max_count- Number of tombstones to create.
is_start- Whether this is the first call in the sequence.
is_last- Whether this is the last call in the sequence.
buffer- Residual encoder buffer from a previous call, or None.
Returns:
CreateEnvelopesResult- Contains envelopes, buffer, and next_dest_index.
Raises:
Exception- If the operation fails.
Example:
result = await client.create_courier_envelopes_from_tombstone_range(
write_cap, start_index, 10, is_start=True, is_last=True)
for envelope in result.envelopes:
    # write envelope to temp copy stream channel
    pass
katzenpost_thinclient.transport.tcp
TCP transport for the thin-client.
TcpDialConfig
@dataclass
class TcpDialConfig()
Configures a TCP dialer.
address is in host:port form, e.g. “localhost:64331” or “[::1]:64331”. network is one of “tcp”, “tcp4”, “tcp6”; defaults to “tcp”.
katzenpost_thinclient.transport
Transport abstraction for the Python thin-client.
Each concrete transport (unix, tcp; in future ssh / pipe / pigeonhole) exposes a setup_socket() method that returns a ready-to-connect socket and the server address in the form expected by asyncio’s loop.sock_connect.
DialConfig is a discriminated-union container: exactly one of its inner variants must be populated. Populating none, or more than one, is a configuration error.
DialConfig
@dataclass
class DialConfig()
Discriminated union of dial transports. Exactly one subtable must be populated.
DialConfig.resolve
def resolve() -> Any
Return the single populated transport variant.
DialConfig.from_toml_dict
@classmethod
def from_toml_dict(cls, data: dict) -> "DialConfig"
Parse a TOML [Dial] subtable (dict) into a DialConfig.
Rejects unknown subtables (typos, removed variants, future names) and unknown keys inside a recognised subtable. Exactly one of [Dial.Unix] / [Dial.Tcp] must be populated.
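The exactly-one-variant rule and the rejection of unknown names can be illustrated with a self-contained re-implementation. This is a sketch, not the binding's code: the class names Dial, UnixDial and TcpDial, the TOML key spellings Address and Network, and the choice of ConfigError as the raised type are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Optional


class ConfigError(Exception):
    """Local stand-in for the binding's ConfigError."""


@dataclass
class UnixDial:
    address: str


@dataclass
class TcpDial:
    address: str
    network: str = "tcp"


# Subtable name -> (variant class, Dial attribute, TOML key -> field name).
# The TOML key spellings are assumptions made for this sketch.
_VARIANTS = {
    "Unix": (UnixDial, "unix", {"Address": "address"}),
    "Tcp": (TcpDial, "tcp", {"Address": "address", "Network": "network"}),
}


@dataclass
class Dial:
    unix: Optional[UnixDial] = None
    tcp: Optional[TcpDial] = None

    def resolve(self):
        """Return the single populated variant, or raise ConfigError."""
        populated = [v for v in (self.unix, self.tcp) if v is not None]
        if len(populated) != 1:
            raise ConfigError(
                f"expected exactly one dial variant, found {len(populated)}")
        return populated[0]

    @classmethod
    def from_toml_dict(cls, data):
        dial = cls()
        for name, table in data.items():
            if name not in _VARIANTS:
                raise ConfigError(f"unknown [Dial] subtable: {name!r}")
            variant_cls, attr, keys = _VARIANTS[name]
            unknown = sorted(set(table) - set(keys))
            if unknown:
                raise ConfigError(f"unknown keys in [Dial.{name}]: {unknown}")
            try:
                variant = variant_cls(**{keys[k]: v for k, v in table.items()})
            except TypeError as exc:  # a required key is missing
                raise ConfigError(f"incomplete [Dial.{name}]: {exc}") from None
            setattr(dial, attr, variant)
        dial.resolve()  # enforce the exactly-one rule at parse time
        return dial


print(Dial.from_toml_dict({"Tcp": {"Address": "localhost:64331"}}).resolve())
```

Validating at parse time means a config with zero or two populated subtables fails before any socket is opened.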
katzenpost_thinclient.transport.unix
Unix-domain-socket transport for the thin-client.
UnixDialConfig
@dataclass
class UnixDialConfig()
Configures a unix-domain-socket dialer.
Exceptions
Error types raised by the thin client; each derives from the standard library Exception.
ConfigError
class ConfigError(Exception)
Raised when the thin-client TOML config is missing required sections, contains unknown keys, or otherwise fails structural validation.
Every caller of ConfigFile.load / Config(…) should expect this exception. It is raised eagerly at startup so that a stale or drifted config produces a loud, early failure instead of surfacing later as a mysterious runtime error during mixnet operations.
ReplicaError
class ReplicaError(Exception)
Base class for all replica errors.
BoxIDNotFoundError
class BoxIDNotFoundError(ReplicaError)
Box ID not found on the replica. Occurs when reading from a non-existent mailbox.
InvalidBoxIDError
class InvalidBoxIDError(ReplicaError)
Invalid box ID format.
InvalidSignatureError
class InvalidSignatureError(ReplicaError)
Signature verification failed.
DatabaseFailureError
class DatabaseFailureError(ReplicaError)
Replica encountered a database error.
InvalidPayloadError
class InvalidPayloadError(ReplicaError)
Payload data is invalid.
StorageFullError
class StorageFullError(ReplicaError)
Replica’s storage capacity has been exceeded.
ReplicaInternalError
class ReplicaInternalError(ReplicaError)
Internal error on the replica.
InvalidEpochError
class InvalidEpochError(ReplicaError)
Epoch is invalid or expired.
ReplicationFailedError
class ReplicationFailedError(ReplicaError)
Replication to other replicas failed.
BoxAlreadyExistsError
class BoxAlreadyExistsError(ReplicaError)
Box already contains data. Pigeonhole writes are immutable.
TombstoneError
class TombstoneError(ReplicaError)
Box contains a tombstone (intentional deletion). This is not a failure.
InvalidTombstoneSignatureError
class InvalidTombstoneSignatureError(Exception)
Tombstone signature verification failed (forgery or corruption).
MKEMDecryptionFailedError
class MKEMDecryptionFailedError(Exception)
MKEM envelope decryption failed with all replica keys.
BACAPDecryptionFailedError
class BACAPDecryptionFailedError(Exception)
BACAP payload decryption or signature verification failed.
StartResendingCancelledError
class StartResendingCancelledError(Exception)
StartResendingEncryptedMessage operation was cancelled.
CopyCommandFailedError
class CopyCommandFailedError(Exception)
StartResendingCopyCommand operation failed on the courier.
The courier aborted the Copy command because a replica rejected one of the embedded writes. Inspect the diagnostic attributes to determine the cause:
Attributes:
replica_error_code (int) - The pigeonhole replica ErrorCode that triggered the abort (e.g. REPLICA_ERROR_BOX_ALREADY_EXISTS). 0 if not reported.
failed_envelope_index (int) - 1-based sequential position in the copy stream of the envelope whose write triggered the abort. 0 if not applicable. This is NOT a BACAP message index.
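Reading the diagnostic attributes might look like the sketch below, which turns the 0 sentinels into None and converts the 1-based position to a 0-based one. The exception's constructor signature, the FakeClient, the helper names and the error code value 7 are all hypothetical; only the attribute names and their meanings come from the description above.

```python
import asyncio


class CopyCommandFailedError(Exception):
    """Local stand-in; the real constructor signature may differ."""

    def __init__(self, message, replica_error_code=0, failed_envelope_index=0):
        super().__init__(message)
        self.replica_error_code = replica_error_code
        self.failed_envelope_index = failed_envelope_index


class FakeClient:
    """Hypothetical stand-in whose copy command always aborts."""

    async def start_resending_copy_command(
            self, write_cap, courier_identity_hash=None, courier_queue_id=None):
        # 7 is an arbitrary code chosen for the demo, not a documented value.
        raise CopyCommandFailedError(
            "copy aborted", replica_error_code=7, failed_envelope_index=3)


def describe_copy_failure(err):
    """Map the 0 sentinels to None and the 1-based position to a 0-based one."""
    code = err.replica_error_code or None
    one_based = err.failed_envelope_index
    position = one_based - 1 if one_based > 0 else None
    return {"replica_error_code": code, "stream_position": position}


async def run_copy(client, temp_write_cap):
    """Return None on success, or a diagnostic dict if the courier aborted."""
    try:
        await client.start_resending_copy_command(temp_write_cap)
    except CopyCommandFailedError as err:
        return describe_copy_failure(err)
    return None


print(asyncio.run(run_copy(FakeClient(), b"temp-write-cap")))
```

The resulting position counts envelopes in the copy stream; as noted above, it is not a BACAP message index and should not be passed to index-taking methods.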