katzenpost_thinclient
Katzenpost Python Thin Client
This module provides a minimal async Python client for communicating with the Katzenpost client daemon over an abstract Unix domain socket. It allows applications to send and receive messages via the mix network by interacting with the daemon.
The thin client handles:
- Connecting to the local daemon
- Sending messages
- Receiving events and responses from the daemon
- Accessing the current PKI document and service descriptors
All cryptographic operations, including PQ Noise transport, Sphinx packet construction, and retransmission mechanisms are handled by the client daemon, and not this thin client library.
For more information, see our client integration guide: https://katzenpost.network/docs/client_integration/
Usage Example
import asyncio
from thinclient import ThinClient, Config
def on_message_reply(event):
print("Got reply:", event)
async def main():
cfg = Config("./thinclient.toml", on_message_reply=on_message_reply)
client = ThinClient(cfg)
loop = asyncio.get_running_loop()
await client.start(loop)
service = client.get_service("echo")
surb_id = client.new_surb_id()
client.send_message(surb_id, "hello mixnet", *service.to_destination())
await client.await_message_reply()
asyncio.run(main())
Geometry Objects
class Geometry()
Geometry describes the geometry of a Sphinx packet.
NOTE: You must not try to compose a Sphinx Geometry yourself. It must be programmatically generated by Katzenpost genconfig or gensphinx CLI utilities.
We describe all the Sphinx Geometry attributes below, however the only one you are interested in to faciliate your thin client message bounds checking is UserForwardPayloadLength, which indicates the maximum sized message that you can send to a mixnet service in a single packet.
Attributes:
PacketLength
int - The total length of a Sphinx packet in bytes.NrHops
int - The number of hops; determines the header’s structure.HeaderLength
int - The total size of the Sphinx header in bytes.RoutingInfoLength
int - The length of the routing information portion of the header.PerHopRoutingInfoLength
int - The length of routing info for a single hop.SURBLength
int - The length of a Single-Use Reply Block (SURB).SphinxPlaintextHeaderLength
int - The length of the unencrypted plaintext header.PayloadTagLength
int - The length of the tag used to authenticate the payload.ForwardPayloadLength
int - The size of the full payload including padding and tag.UserForwardPayloadLength
int - The usable portion of the payload intended for the recipient.NextNodeHopLength
int - Derived from the expected maximum routing info block size.SPRPKeyMaterialLength
int - The length of the key used for SPRP (Sphinx packet payload encryption).NIKEName
str - Name of the NIKE scheme (if used). Mutually exclusive with KEMName.KEMName
str - Name of the KEM scheme (if used). Mutually exclusive with NIKEName.
ConfigFile Objects
class ConfigFile()
ConfigFile represents everything loaded from a TOML file: network, address, and geometry.
pretty_print_obj
def pretty_print_obj(obj)
Pretty-print a Python object using indentation.
This function uses pprintpp
to print complex data structures
(e.g., dictionaries, lists) in a readable, indented format.
Arguments:
obj
Any - The object to pretty-print.
ServiceDescriptor Objects
class ServiceDescriptor()
Describes a mixnet service endpoint retrieved from the PKI document.
A ServiceDescriptor encapsulates the necessary information for communicating with a service on the mix network. The service node’s identity public key’s hash is used as the destination address along with the service’s queue ID.
Attributes:
recipient_queue_id
bytes - The identifier of the recipient’s queue on the mixnet.mix_descriptor
dict - A CBOR-decoded dictionary describing the mix node, typically includes the ‘IdentityKey’ and other metadata.
Methods:
to_destination()
- Returns a tuple of (provider_id_hash, recipient_queue_id), where the provider ID is a 32-byte BLAKE2b hash of the IdentityKey.
find_services
def find_services(capability, doc)
Search the PKI document for services supporting the specified capability.
This function iterates over all service nodes in the PKI document,
deserializes each CBOR-encoded node, and looks for advertised capabilities.
If a service provides the requested capability, it is returned as a
ServiceDescriptor
.
Arguments:
capability
str - The name of the capability to search for (e.g., “echo”).doc
dict - The decoded PKI document as a Python dictionary, which must include a “ServiceNodes” key containing CBOR-encoded descriptors.
Returns:
List[ServiceDescriptor]
- A list of matching service descriptors that advertise the capability.
Raises:
KeyError
- If the ‘ServiceNodes’ field is missing from the PKI document.
Config Objects
class Config()
Config is the configuration object for the ThinClient.
__init__
def __init__(filepath,
on_connection_status=None,
on_new_pki_document=None,
on_message_sent=None,
on_message_reply=None)
Initialize the Config object.
Arguments:
filepath
str - Path to the TOML config file.on_connection_status
callable - Callback for connection status events.on_new_pki_document
callable - Callback for new PKI document events.on_message_sent
callable - Callback for sent message events.on_message_reply
callable - Callback for message reply events.
ThinClient Objects
class ThinClient()
A minimal Katzenpost Python thin client for communicating with the local Katzenpost client daemon over a UNIX or TCP socket.
The thin client is responsible for:
- Establishing a connection to the client daemon.
- Receiving and parsing PKI documents.
- Sending messages to mixnet services (with or without SURBs).
- Handling replies and events via user-defined callbacks.
All cryptographic operations are handled by the daemon, not by this client.
__init__
def __init__(config)
Initialize the thin client with the given configuration.
Arguments:
config
Config - The configuration object containing socket details and callbacks.
Raises:
RuntimeError
- If the network type is not recognized or config is incomplete.
start
async def start(loop)
Start the thin client: establish connection to the daemon, read initial events, and begin the background event loop.
Arguments:
loop
asyncio.AbstractEventLoop - The running asyncio event loop.
get_config
def get_config()
Returns the current configuration object.
Returns:
Config
- The client configuration in use.
stop
def stop()
Gracefully shut down the client and close its socket.
recv
async def recv(loop)
Receive a CBOR-encoded message from the daemon.
Arguments:
loop
asyncio.AbstractEventLoop - Event loop to use for socket reads.
Returns:
dict
- Decoded CBOR response from the daemon.
Raises:
ValueError
- If message framing fails.
worker_loop
async def worker_loop(loop)
Background task that listens for events and dispatches them.
parse_status
def parse_status(event)
Parse a connection status event and assert daemon connectivity.
pki_document
def pki_document()
Retrieve the latest PKI document received.
Returns:
dict
- Parsed CBOR PKI document.
parse_pki_doc
def parse_pki_doc(event)
Parse and store a new PKI document received from the daemon.
get_services
def get_services(capability)
Look up all services in the PKI that advertise a given capability.
Arguments:
capability
str - Capability name (e.g., “echo”).
Returns:
list[ServiceDescriptor]
- Matching services.
Raises:
Exception
- If PKI is missing or no services match.
get_service
def get_service(service_name)
Select a random service matching a capability.
Arguments:
service_name
str - The capability name (e.g., “echo”).
Returns:
ServiceDescriptor
- One of the matching services.
new_message_id
def new_message_id()
Generate a new 16-byte message ID for use with ARQ sends.
Returns:
bytes
- Random 16-byte identifier.
new_surb_id
def new_surb_id()
Generate a new 16-byte SURB ID for reply-capable sends.
Returns:
bytes
- Random 16-byte identifier.
handle_response
def handle_response(response)
Dispatch a parsed CBOR response to the appropriate handler or callback.
send_message_without_reply
def send_message_without_reply(payload, dest_node, dest_queue)
Send a fire-and-forget message with no SURB or reply handling.
Arguments:
payload
bytes or str - Message payload.dest_node
bytes - Destination node identity hash.dest_queue
bytes - Destination recipient queue ID.
send_message
def send_message(surb_id, payload, dest_node, dest_queue)
Send a message using a SURB to allow the recipient to send a reply.
Arguments:
surb_id
bytes - SURB identifier for reply correlation.payload
bytes or str - Message payload.dest_node
bytes - Destination node identity hash.dest_queue
bytes - Destination recipient queue ID.
send_reliable_message
def send_reliable_message(message_id, payload, dest_node, dest_queue)
Send a reliable message using an ARQ mechanism and message ID.
Arguments:
message_id
bytes - Message ID for reply correlation.payload
bytes or str - Message payload.dest_node
bytes - Destination node identity hash.dest_queue
bytes - Destination recipient queue ID.
pretty_print_pki_doc
def pretty_print_pki_doc(doc)
Pretty-print a parsed PKI document with fully decoded CBOR nodes.
Arguments:
doc
dict - Raw PKI document from the daemon.
await_message_reply
async def await_message_reply()
Asynchronously block until a reply is received from the daemon.