katzenpost_thinclient
Katzenpost Python Thin Client
This module provides a minimal async Python client for communicating with the Katzenpost client daemon over an abstract Unix domain socket. It allows applications to send and receive messages via the mix network by interacting with the daemon.
The thin client handles:
- Connecting to the local daemon
- Sending messages
- Receiving events and responses from the daemon
- Accessing the current PKI document and service descriptors
All cryptographic operations, including PQ Noise transport, Sphinx packet construction, and retransmission mechanisms, are handled by the client daemon, not by this thin client library.
For more information, see our client integration guide: https://katzenpost.network/docs/client_integration/
Usage Example
import asyncio
from katzenpost_thinclient import ThinClient, Config

def on_message_reply(event):
    # Called by the thin client when a reply event arrives from the daemon.
    print("Got reply:", event)

async def main():
    cfg = Config("./thinclient.toml", on_message_reply=on_message_reply)
    client = ThinClient(cfg)
    loop = asyncio.get_running_loop()
    await client.start(loop)

    # Pick a service that advertises the "echo" capability.
    service = client.get_service("echo")
    surb_id = client.new_surb_id()
    client.send_message(surb_id, "hello mixnet", *service.to_destination())

    # Block until the reply arrives, then shut down cleanly.
    await client.await_message_reply()
    client.stop()

asyncio.run(main())
Geometry Objects
class Geometry()
Geometry describes the geometry of a Sphinx packet.
NOTE: You must not try to compose a Sphinx Geometry yourself. It must be generated programmatically by the Katzenpost genconfig or gensphinx CLI utilities.
All of the Sphinx Geometry attributes are described below. However, the only one you need for bounds-checking thin client messages is UserForwardPayloadLength, which gives the maximum size of a message that can be sent to a mixnet service in a single packet.
Attributes:
- PacketLength (int): The total length of a Sphinx packet in bytes.
- NrHops (int): The number of hops; determines the header’s structure.
- HeaderLength (int): The total size of the Sphinx header in bytes.
- RoutingInfoLength (int): The length of the routing information portion of the header.
- PerHopRoutingInfoLength (int): The length of routing info for a single hop.
- SURBLength (int): The length of a Single-Use Reply Block (SURB).
- SphinxPlaintextHeaderLength (int): The length of the unencrypted plaintext header.
- PayloadTagLength (int): The length of the tag used to authenticate the payload.
- ForwardPayloadLength (int): The size of the full payload including padding and tag.
- UserForwardPayloadLength (int): The usable portion of the payload intended for the recipient.
- NextNodeHopLength (int): Derived from the expected maximum routing info block size.
- SPRPKeyMaterialLength (int): The length of the key used for SPRP (Sphinx packet payload encryption).
- NIKEName (str): Name of the NIKE scheme (if used). Mutually exclusive with KEMName.
- KEMName (str): Name of the KEM scheme (if used). Mutually exclusive with NIKEName.
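As a hedged illustration of the bounds check mentioned above, the sketch below compares a payload against a UserForwardPayloadLength value. The helper name fits_in_single_packet and the example limit of 2000 bytes are assumptions made for illustration; they are not part of the library, and in practice the limit comes from the generated geometry.

```python
def fits_in_single_packet(payload, user_forward_payload_length):
    """Return True if the payload fits in the usable part of one packet.

    Hypothetical helper: the limit is expected to be the geometry's
    UserForwardPayloadLength value.
    """
    if isinstance(payload, str):
        # Measure the encoded size in bytes, not the number of characters.
        payload = payload.encode("utf-8")
    return len(payload) <= user_forward_payload_length


EXAMPLE_LIMIT = 2000  # placeholder value, not a real geometry setting

print(fits_in_single_packet("hello mixnet", EXAMPLE_LIMIT))
print(fits_in_single_packet(b"x" * (EXAMPLE_LIMIT + 1), EXAMPLE_LIMIT))
```

Checking the encoded byte length matters for str payloads, since non-ASCII characters occupy more than one byte in UTF-8.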
ConfigFile Objects
class ConfigFile()
ConfigFile represents everything loaded from a TOML file: network, address, and geometry.
pretty_print_obj
def pretty_print_obj(obj)
Pretty-print a Python object using indentation.
This function uses pprintpp to print complex data structures
(e.g., dictionaries, lists) in a readable, indented format.
Arguments:
- obj (Any): The object to pretty-print.
ServiceDescriptor Objects
class ServiceDescriptor()
Describes a mixnet service endpoint retrieved from the PKI document.
A ServiceDescriptor encapsulates the information needed to communicate with a service on the mix network. The hash of the service node’s identity public key is used as the destination address, together with the service’s queue ID.
Attributes:
- recipient_queue_id (bytes): The identifier of the recipient’s queue on the mixnet.
- mix_descriptor (dict): A CBOR-decoded dictionary describing the mix node; it typically includes the ‘IdentityKey’ and other metadata.
Methods:
- to_destination(): Returns a tuple of (provider_id_hash, recipient_queue_id), where the provider ID is a 32-byte BLAKE2b hash of the IdentityKey.
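The destination tuple described above can be sketched with the standard library. This is an illustration under assumptions, not the library's implementation: the function names and the placeholder key and queue ID are hypothetical, and the only detail taken from this reference is that the provider ID is a 32-byte BLAKE2b hash of the identity key.

```python
import hashlib


def identity_key_hash(identity_key: bytes) -> bytes:
    """Hash an identity public key down to a 32-byte BLAKE2b digest."""
    return hashlib.blake2b(identity_key, digest_size=32).digest()


def destination_for(identity_key: bytes, recipient_queue_id: bytes):
    """Build a (provider_id_hash, recipient_queue_id) tuple (hypothetical helper)."""
    return identity_key_hash(identity_key), recipient_queue_id


# Placeholder inputs, not real key material or a real queue ID.
dest_node, dest_queue = destination_for(b"\x01" * 32, b"example-queue")
print(len(dest_node), dest_queue)
```

Returning a tuple is what allows the usage example to unpack the result directly into the send call with the * operator.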
find_services
def find_services(capability, doc)
Search the PKI document for services supporting the specified capability.
This function iterates over all service nodes in the PKI document,
deserializes each CBOR-encoded node, and looks for advertised capabilities.
If a service provides the requested capability, it is returned as a
ServiceDescriptor.
Arguments:
- capability (str): The name of the capability to search for (e.g., “echo”).
- doc (dict): The decoded PKI document as a Python dictionary, which must include a “ServiceNodes” key containing CBOR-encoded descriptors.
Returns:
- List[ServiceDescriptor]: A list of matching service descriptors that advertise the capability.
Raises:
- KeyError: If the ‘ServiceNodes’ field is missing from the PKI document.
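The search described above could be sketched roughly as follows. To stay within the standard library, JSON stands in for CBOR here, and the decode parameter marks where a real CBOR decoder would go. The descriptor layout (a "capabilities" mapping and a "name" field) and the function name are hypothetical stand-ins, not the actual PKI schema.

```python
import json


def find_capable_nodes(capability, doc, decode=json.loads):
    """Return decoded service nodes that advertise the given capability.

    Assumed layout: each decoded node has a "capabilities" mapping.
    """
    matches = []
    # Indexing raises KeyError when "ServiceNodes" is absent, mirroring
    # the documented failure mode.
    for raw_node in doc["ServiceNodes"]:
        node = decode(raw_node)
        if capability in node.get("capabilities", {}):
            matches.append(node)
    return matches


example_doc = {
    "ServiceNodes": [
        json.dumps({"name": "node-a", "capabilities": {"echo": {}}}),
        json.dumps({"name": "node-b", "capabilities": {}}),
    ]
}
print([node["name"] for node in find_capable_nodes("echo", example_doc)])
```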
Config Objects
class Config()
Config is the configuration object for the ThinClient.
__init__
def __init__(filepath,
on_connection_status=None,
on_new_pki_document=None,
on_message_sent=None,
on_message_reply=None)
Initialize the Config object.
Arguments:
- filepath (str): Path to the TOML config file.
- on_connection_status (callable): Callback for connection status events.
- on_new_pki_document (callable): Callback for new PKI document events.
- on_message_sent (callable): Callback for sent message events.
- on_message_reply (callable): Callback for message reply events.
ThinClient Objects
class ThinClient()
A minimal Katzenpost Python thin client for communicating with the local Katzenpost client daemon over a UNIX or TCP socket.
The thin client is responsible for:
- Establishing a connection to the client daemon.
- Receiving and parsing PKI documents.
- Sending messages to mixnet services (with or without SURBs).
- Handling replies and events via user-defined callbacks.
All cryptographic operations are handled by the daemon, not by this client.
__init__
def __init__(config)
Initialize the thin client with the given configuration.
Arguments:
- config (Config): The configuration object containing socket details and callbacks.
Raises:
- RuntimeError: If the network type is not recognized or the config is incomplete.
start
async def start(loop)
Start the thin client: establish connection to the daemon, read initial events, and begin the background event loop.
Arguments:
- loop (asyncio.AbstractEventLoop): The running asyncio event loop.
get_config
def get_config()
Returns the current configuration object.
Returns:
- Config: The client configuration in use.
stop
def stop()
Gracefully shut down the client and close its socket.
recv
async def recv(loop)
Receive a CBOR-encoded message from the daemon.
Arguments:
- loop (asyncio.AbstractEventLoop): Event loop to use for socket reads.
Returns:
- dict: Decoded CBOR response from the daemon.
Raises:
- ValueError: If message framing fails.
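To make the framing failure concrete, here is a minimal sketch of length-prefixed framing. It assumes a 4-byte big-endian length prefix ahead of each encoded message; that wire detail and the function names are assumptions for illustration and should be checked against the daemon's actual protocol.

```python
import struct

PREFIX_SIZE = 4  # assumed size of the length prefix, in bytes


def encode_frame(blob: bytes) -> bytes:
    """Prepend a big-endian unsigned 32-bit length to the message body."""
    return struct.pack(">I", len(blob)) + blob


def decode_frame(data: bytes) -> bytes:
    """Extract one message body, raising ValueError on incomplete input."""
    if len(data) < PREFIX_SIZE:
        raise ValueError("short read: missing length prefix")
    (length,) = struct.unpack(">I", data[:PREFIX_SIZE])
    body = data[PREFIX_SIZE:PREFIX_SIZE + length]
    if len(body) < length:
        raise ValueError("short read: truncated message body")
    return body


frame = encode_frame(b"example-blob")
print(decode_frame(frame))
```

In a real client the body would then be passed to a CBOR decoder; that step is omitted here to keep the sketch dependency-free.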
worker_loop
async def worker_loop(loop)
Background task that listens for events and dispatches them.
parse_status
def parse_status(event)
Parse a connection status event and assert daemon connectivity.
pki_document
def pki_document()
Retrieve the latest PKI document received.
Returns:
- dict: Parsed CBOR PKI document.
parse_pki_doc
def parse_pki_doc(event)
Parse and store a new PKI document received from the daemon.
get_services
def get_services(capability)
Look up all services in the PKI that advertise a given capability.
Arguments:
- capability (str): Capability name (e.g., “echo”).
Returns:
- list[ServiceDescriptor]: Matching services.
Raises:
- Exception: If the PKI document is missing or no services match.
get_service
def get_service(service_name)
Select a random service matching a capability.
Arguments:
- service_name (str): The capability name (e.g., “echo”).
Returns:
- ServiceDescriptor: One of the matching services.
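Random selection among matching services can be sketched as below. The helper name pick_service, the use of secrets.choice, and the LookupError on an empty list are choices made for this illustration; the library may select and fail differently.

```python
import secrets


def pick_service(services):
    """Pick one entry uniformly at random from a non-empty list."""
    if not services:
        raise LookupError("no services advertise the requested capability")
    return secrets.choice(services)


# Placeholder strings stand in for ServiceDescriptor objects.
candidates = ["echo-service-1", "echo-service-2", "echo-service-3"]
print(pick_service(candidates))
```

Choosing at random spreads requests across all nodes that offer the capability rather than always using the first match.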
new_message_id
def new_message_id()
Generate a new 16-byte message ID for use with ARQ sends.
Returns:
- bytes: Random 16-byte identifier.
new_surb_id
def new_surb_id()
Generate a new 16-byte SURB ID for reply-capable sends.
Returns:
- bytes: Random 16-byte identifier.
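Both message IDs and SURB IDs are random 16-byte values. A minimal sketch of generating one, assuming the operating system's random source is used (the function name is hypothetical):

```python
import os

ID_LENGTH = 16  # length stated in this reference for message and SURB IDs


def new_random_id() -> bytes:
    """Return a fresh random identifier of ID_LENGTH bytes."""
    return os.urandom(ID_LENGTH)


first, second = new_random_id(), new_random_id()
print(first.hex(), second.hex())
```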
handle_response
def handle_response(response)
Dispatch a parsed CBOR response to the appropriate handler or callback.
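One way to picture this dispatch is a table mapping event keys to callbacks. The sketch below is an assumption-laden illustration: the key names ("message_sent_event", "message_reply_event") and the helper make_dispatcher are hypothetical and may not match the daemon's actual response fields.

```python
def make_dispatcher(handlers):
    """Build a function that routes response fields to their callbacks.

    handlers maps an event key to a callable, or to None when the
    application did not register a callback for that event.
    """
    def dispatch(response):
        handled = []
        for key, handler in handlers.items():
            event = response.get(key)
            if event is not None and handler is not None:
                handler(event)
                handled.append(key)
        return handled

    return dispatch


received = []
dispatch = make_dispatcher({
    "message_reply_event": received.append,  # hypothetical key name
    "message_sent_event": None,              # no callback registered
})
print(dispatch({"message_reply_event": {"payload": b"hi"}}))
```

Skipping None handlers mirrors the Config signature, where every callback defaults to None.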
send_message_without_reply
def send_message_without_reply(payload, dest_node, dest_queue)
Send a fire-and-forget message with no SURB or reply handling.
Arguments:
- payload (bytes or str): Message payload.
- dest_node (bytes): Destination node identity hash.
- dest_queue (bytes): Destination recipient queue ID.
send_message
def send_message(surb_id, payload, dest_node, dest_queue)
Send a message using a SURB to allow the recipient to send a reply.
Arguments:
- surb_id (bytes): SURB identifier for reply correlation.
- payload (bytes or str): Message payload.
- dest_node (bytes): Destination node identity hash.
- dest_queue (bytes): Destination recipient queue ID.
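Because the SURB ID exists for reply correlation, an application that has several messages in flight may want to remember which ID belongs to which request. The ReplyTracker class below is a hypothetical application-side helper, not part of the thin client, and it generates its own IDs only so that the sketch runs standalone.

```python
import os


class ReplyTracker:
    """Remembers which SURB IDs are still awaiting a reply (illustrative)."""

    def __init__(self):
        self._pending = {}

    def register(self, context):
        """Create an ID for an outgoing message and store its context."""
        surb_id = os.urandom(16)  # same size as the IDs described here
        self._pending[surb_id] = context
        return surb_id

    def resolve(self, surb_id):
        """Return the stored context once; unknown IDs yield None."""
        return self._pending.pop(surb_id, None)


tracker = ReplyTracker()
surb_id = tracker.register("echo request #1")
print(tracker.resolve(surb_id))
```

With the real client, the ID from new_surb_id() would be stored instead, and resolve would be called from the reply callback.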
send_reliable_message
def send_reliable_message(message_id, payload, dest_node, dest_queue)
Send a reliable message using an ARQ mechanism and message ID.
Arguments:
- message_id (bytes): Message ID for reply correlation.
- payload (bytes or str): Message payload.
- dest_node (bytes): Destination node identity hash.
- dest_queue (bytes): Destination recipient queue ID.
pretty_print_pki_doc
def pretty_print_pki_doc(doc)
Pretty-print a parsed PKI document with fully decoded CBOR nodes.
Arguments:
- doc (dict): Raw PKI document from the daemon.
await_message_reply
async def await_message_reply()
Asynchronously block until a reply is received from the daemon.
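Since this call takes no timeout argument, a caller that does not want to wait indefinitely could wrap the wait in asyncio.wait_for. The sketch below models the reply signal with an asyncio.Event, which is an assumption about how such waiting might be implemented; wait_for_reply and demo are hypothetical names.

```python
import asyncio


async def wait_for_reply(reply_event: asyncio.Event, timeout: float) -> bool:
    """Wait until the event is set; return False if the timeout expires."""
    try:
        await asyncio.wait_for(reply_event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    # Simulate a reply arriving shortly after the wait begins.
    reply_event = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, reply_event.set)
    got_reply = await wait_for_reply(reply_event, timeout=1.0)

    # An event that is never set demonstrates the timeout path.
    timed_out = await wait_for_reply(asyncio.Event(), timeout=0.01)
    return got_reply, timed_out


print(asyncio.run(demo()))
```

The same pattern applies to the real coroutine: awaiting asyncio.wait_for(client.await_message_reply(), timeout) bounds how long the caller blocks.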