Protocol Specification

This document fully specifies the datasole wire protocol. It is intended to be machine-readable by LLMs and sufficient to implement a complete, interoperable datasole client in any language (Rust, Go, C++, C, Python, R, Java, etc.).

Protocol version: 1

1. Transport Layer

1.1 WebSocket Connection

Property        Value
──────────────  ────────────────────────────────────────────────────────────────
Transport       WebSocket (RFC 6455)
Frame type      Binary (WebSocket opcode 0x02); all frames are binary, never text
Subprotocol     None (do not set Sec-WebSocket-Protocol)
Extensions      Do not use permessage-deflate; compression is handled at the application layer (see §4)
Max frame size  1,048,576 bytes (1 MiB)

1.2 Endpoint URL

The server listens for WebSocket upgrade requests at a configurable path. The default is:

ws[s]://<host>[:<port>]/__ds[?token=<auth_token>]

Component     Default          Notes
────────────  ───────────────  ─────────────────────────────
Scheme        ws:// or wss://  Use wss:// in production
Path          /__ds            Configurable by the server
Query: token  (optional)       Authentication token (see §2)

The client constructs the URL by:

  1. Taking the base URL (e.g. https://example.com)
  2. Replacing http with ws (i.e. https:// → wss://, http:// → ws://)
  3. Appending the path (/__ds)
  4. Appending ?token=<value> if an auth token is provided
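
The four steps above can be sketched as a small helper. The function name build_ws_url and its parameters are hypothetical, and percent-encoding the token is an assumption of this sketch (the specification does not say how tokens with reserved characters are handled).

```python
from typing import Optional
from urllib.parse import quote, urlsplit, urlunsplit


def build_ws_url(base_url: str, path: str = "/__ds", token: Optional[str] = None) -> str:
    """Turn an http(s) base URL into the WebSocket endpoint URL."""
    parts = urlsplit(base_url)
    # Step 2: http -> ws, https -> wss; ws/wss pass through unchanged.
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme, parts.scheme)
    # Step 3: append the endpoint path to any path already on the base URL.
    full_path = parts.path.rstrip("/") + path
    # Step 4: add the token, percent-encoded (assumption, see lead-in).
    query = f"token={quote(token, safe='')}" if token else ""
    return urlunsplit((scheme, parts.netloc, full_path, query, ""))


print(build_ws_url("https://example.com"))
# wss://example.com/__ds
print(build_ws_url("http://localhost:3000", token="my-token"))
# ws://localhost:3000/__ds?token=my-token
```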

1.3 HTTP Upgrade

The WebSocket handshake is a standard RFC 6455 upgrade. Key constraints:

  • The browser WebSocket API does not allow custom headers during the upgrade handshake
  • Authentication tokens are therefore sent as a query parameter (?token=…), not as an Authorization header
  • Non-browser clients (Go, Rust, Python, etc.) may additionally send an Authorization: Bearer <token> header during the upgrade, but the server still reads the token from the query parameter, so the query parameter must be present

Server-side upgrade behavior:

  • If the path does not match, the server destroys the socket (no response)
  • If authentication fails, the server responds HTTP/1.1 401 Unauthorized and destroys the socket
  • If an internal error occurs, the server responds HTTP/1.1 500 Internal Server Error

2. Authentication

2.1 Token Delivery

GET /__ds?token=eyJhbGciOiJIUzI1NiJ9... HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: ...

The server extracts the token from url.searchParams.get('token').

2.2 Auth Result

The server's auth handler returns a result object. On success, the upgrade completes and the connection proceeds. On failure, the server responds with HTTP 401 and destroys the socket (see §1.3).

json
{
  "authenticated": true,
  "userId": "user-42",
  "roles": ["admin", "user"],
  "metadata": { "displayName": "Alice" }
}

Auth context is available to all RPC handlers and events for the lifetime of the connection.

2.3 Anonymous Connections

If no auth handler is configured on the server, all connections are accepted with { "authenticated": true }.

3. Frame Format

Every message sent over the WebSocket is a binary frame with the following structure:

3.1 Frame Header (9 bytes, big-endian)

Offset  Size  Type     Field
──────  ────  ───────  ──────────────
0       1     uint8    opcode
1       4     uint32   correlationId
5       4     uint32   payloadLength
9       N     bytes    payload (JSON, UTF-8)

Field          Type                 Byte order                  Description
─────────────  ───────────────────  ──────────────────────────  ──────────────────────────────────
opcode         uint8                -                           Message type identifier (see §3.2)
correlationId  uint32               Big-endian (network order)  RPC correlation; 0 for non-RPC messages (PING/PONG may use any value, see §5.7)
payloadLength  uint32               Big-endian (network order)  Length of the payload in bytes
payload        byte[payloadLength]  -                           JSON-encoded UTF-8 bytes

Total frame size = 9 + payloadLength bytes.
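
As a worked example of the layout above, the following sketch packs and unpacks the header with Python's struct module and prints the bytes of a PING frame. The function names and the validation checks are illustrative choices, not part of the specification.

```python
import json
import struct

# ">BII" = big-endian: uint8 opcode, uint32 correlationId, uint32 payloadLength
HEADER = struct.Struct(">BII")


def encode_frame(opcode, correlation_id, payload_obj):
    payload = json.dumps(payload_obj).encode("utf-8")
    return HEADER.pack(opcode, correlation_id, len(payload)) + payload


def decode_frame(frame):
    if len(frame) < HEADER.size:
        raise ValueError("frame shorter than the 9-byte header")
    opcode, correlation_id, length = HEADER.unpack_from(frame, 0)
    body = frame[HEADER.size:HEADER.size + length]
    if len(body) != length:
        raise ValueError("payload is shorter than payloadLength")
    return opcode, correlation_id, json.loads(body.decode("utf-8"))


# PING (0x07) with correlationId 1 and a JSON null payload: 9 + 4 = 13 bytes.
ping = encode_frame(0x07, 1, None)
print(ping.hex(" "))
# 07 00 00 00 01 00 00 00 04 6e 75 6c 6c
```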

3.2 Opcodes

Value  Name            Direction        Description
─────  ──────────────  ───────────────  ─────────────────────────────────────────
0x01   RPC_REQ         Client → Server  RPC request
0x02   RPC_RES         Server → Client  RPC response
0x03   EVENT_C2S       Client → Server  Client-to-server event (fire-and-forget)
0x04   EVENT_S2C       Server → Client  Server-to-client event (broadcast)
0x05   STATE_PATCH     Server → Client  JSON Patch state update (RFC 6902)
0x06   STATE_SNAPSHOT  Server → Client  Full state snapshot
0x07   PING            Client → Server  Keepalive ping
0x08   PONG            Server → Client  Keepalive pong
0x09   ERROR           Server → Client  Error notification
0x0A   CRDT_OP         Client → Server  CRDT operation
0x0B   CRDT_STATE      Server → Client  CRDT state broadcast
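
A client might mirror this table as an enum. The sketch below is one way to do that in Python; the parse_opcode helper and its choice to reject opcodes arriving from the wrong direction are assumptions of this example, since the specification does not say how such frames should be treated.

```python
from enum import IntEnum


class Opcode(IntEnum):
    RPC_REQ = 0x01
    RPC_RES = 0x02
    EVENT_C2S = 0x03
    EVENT_S2C = 0x04
    STATE_PATCH = 0x05
    STATE_SNAPSHOT = 0x06
    PING = 0x07
    PONG = 0x08
    ERROR = 0x09
    CRDT_OP = 0x0A
    CRDT_STATE = 0x0B


# Opcodes the table lists as Client -> Server; all others are Server -> Client.
CLIENT_TO_SERVER = frozenset(
    {Opcode.RPC_REQ, Opcode.EVENT_C2S, Opcode.PING, Opcode.CRDT_OP}
)


def parse_opcode(byte, from_server=True):
    """Map a header byte to an Opcode, rejecting unknown or misdirected values."""
    try:
        opcode = Opcode(byte)
    except ValueError:
        raise ValueError(f"unknown opcode 0x{byte:02x}") from None
    if from_server == (opcode in CLIENT_TO_SERVER):
        raise ValueError(f"{opcode.name} is not valid in this direction")
    return opcode
```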

3.3 Payload Encoding

The payload field is always UTF-8 encoded JSON. In JavaScript, serialize with JSON.stringify() and encode with new TextEncoder().encode(). To decode: JSON.parse(new TextDecoder().decode(payload)).

Implementations in other languages: serialize to JSON, then encode the JSON string as UTF-8 bytes.

4. Compression

datasole uses user-space compression on the entire frame (header + payload), not WebSocket protocol extensions.

4.1 Algorithm

Property   Value
─────────  ──────────────────────────────────────────────────────────
Algorithm  DEFLATE (RFC 1951) via zlib/pako
Wrapper    zlib wrapper (RFC 1950): pako.deflate() / pako.inflate()
Threshold  256 bytes; frames ≤ 256 bytes are sent uncompressed

4.2 Compression Decision

Sender side (applies to both client and server):

encoded_frame = encode_frame(opcode, correlationId, payload)
if len(encoded_frame) > 256:
    wire_bytes = zlib_compress(encoded_frame)
else:
    wire_bytes = encoded_frame
websocket.send_binary(wire_bytes)

Receiver side:

wire_bytes = websocket.receive_binary()
if len(wire_bytes) > 256:
    frame_bytes = zlib_decompress(wire_bytes)
else:
    frame_bytes = wire_bytes
frame = decode_frame(frame_bytes)

4.3 Implementation Notes

  • The compression is applied to the entire frame (9-byte header + payload), not just the payload
  • The threshold check (> 256) is on the encoded frame size, not the payload size
  • Use standard zlib deflate/inflate (with zlib header, not raw deflate and not gzip)
  • In Python: zlib.compress() / zlib.decompress()
  • In Go: compress/flate with a zlib wrapper, or compress/zlib
  • In Rust: flate2 crate with ZlibEncoder / ZlibDecoder
  • In C/C++: zlib library, compress() / uncompress() or deflate() / inflate() with Z_DEFAULT_COMPRESSION
  • Do not negotiate permessage-deflate in the WebSocket handshake — this causes known memory leaks and data corruption in many implementations
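
To make the "zlib header, not raw deflate and not gzip" note concrete, this sketch compresses the same bytes three ways with Python's standard library and inspects the leading bytes. The frame variable is a made-up stand-in for an encoded frame, not a valid datasole message.

```python
import gzip
import zlib

# Stand-in for an encoded frame above the 256-byte threshold:
# 9 header-sized bytes followed by a repetitive payload (649 bytes total).
frame = b"\x04" + bytes(8) + b'{"event":"demo"}' * 40

zlib_bytes = zlib.compress(frame)          # zlib wrapper (RFC 1950): what §4 asks for
raw = zlib.compressobj(wbits=-15)          # raw DEFLATE (RFC 1951), no wrapper
raw_bytes = raw.compress(frame) + raw.flush()
gzip_bytes = gzip.compress(frame)          # gzip container (RFC 1952)

print(hex(zlib_bytes[0]))      # 0x78
print(gzip_bytes[:2].hex())    # 1f8b

# Only the zlib-wrapped form round-trips through plain zlib.decompress().
assert zlib.decompress(zlib_bytes) == frame
for other in (raw_bytes, gzip_bytes):
    try:
        zlib.decompress(other)
    except zlib.error:
        pass  # rejected: wrong or missing zlib header
    else:
        raise AssertionError("expected zlib.error")
```

In this example the 649-byte frame shrinks to fewer than 256 bytes on the wire, so a receiver that applies the length rule from §4.2 literally would not decompress it. A zlib stream written with the default window size starts with 0x78, while valid opcodes are 0x01 to 0x0B, so checking the first byte is one possible defensive test. That check is an assumption of this sketch and is not part of the specification.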

5. Message Payloads

Each opcode has a specific JSON payload schema.

5.1 RPC Request (0x01 — Client → Server)

json
{
  "method": "getUser",
  "params": { "userId": "123" },
  "correlationId": 1
}

Field          Type    Required  Description
─────────────  ──────  ────────  ───────────────────────────────────────────────
method         string  yes       RPC method name
params         any     yes       Method parameters (any JSON-serializable value)
correlationId  number  yes       Must match the frame header's correlationId

5.2 RPC Response (0x02 — Server → Client)

Success:

json
{
  "correlationId": 1,
  "result": { "name": "Alice", "email": "alice@example.com" }
}

Error:

json
{
  "correlationId": 1,
  "error": {
    "code": -1,
    "message": "User not found",
    "data": null
  }
}

Field          Type    Required    Description
─────────────  ──────  ──────────  ──────────────────────────────────────────────
correlationId  number  yes         Matches the request's correlationId
result         any     if success  Return value from the handler
error          object  if error    Error object with code, message, optional data

RPC is multiplexed: multiple requests can be in-flight simultaneously. The client matches responses to requests using correlationId. Clients should implement a timeout (default: 30 seconds).
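
One way to implement the multiplexing described above is a map of pending futures keyed by correlationId. The sketch below uses asyncio; the RpcClient class and the send_frame callable are hypothetical, and the transport itself is left out.

```python
import asyncio
import itertools


class RpcClient:
    def __init__(self, send_frame, timeout=30.0):
        self._send_frame = send_frame    # hypothetical callable(opcode, cid, payload)
        self._timeout = timeout          # per-call timeout in seconds
        self._ids = itertools.count(1)   # monotonically increasing correlationIds
        self._pending = {}               # correlationId -> asyncio.Future

    async def call(self, method, params):
        cid = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[cid] = future
        self._send_frame(
            0x01, cid, {"method": method, "params": params, "correlationId": cid}
        )
        try:
            # Raises asyncio.TimeoutError if no RPC_RES arrives in time.
            return await asyncio.wait_for(future, self._timeout)
        finally:
            self._pending.pop(cid, None)

    def on_rpc_response(self, payload):
        """Feed a decoded RPC_RES payload; resolves the matching call."""
        future = self._pending.get(payload["correlationId"])
        if future is None or future.done():
            return  # late or unknown response: ignore
        if "error" in payload:
            future.set_exception(RuntimeError(payload["error"]["message"]))
        else:
            future.set_result(payload.get("result"))

    def reject_all(self, reason="disconnected"):
        """Fail every in-flight call, e.g. when the connection drops."""
        for future in self._pending.values():
            if not future.done():
                future.set_exception(ConnectionError(reason))
        self._pending.clear()
```

Because each call waits on its own future, responses may arrive in any order without blocking other in-flight calls.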

5.3 Client Event (0x03 — Client → Server)

json
{
  "event": "analytics",
  "data": { "action": "click", "target": "buy-button" }
}

Field  Type    Required  Description
─────  ──────  ────────  ─────────────
event  string  yes       Event name
data   any     yes       Event payload

Fire-and-forget. No response from the server. The correlationId in the frame header should be 0.

5.4 Server Event (0x04 — Server → Client)

json
{
  "event": "notification",
  "data": { "title": "Server restarting" },
  "timestamp": 1711234567890
}

Field      Type    Required  Description
─────────  ──────  ────────  ──────────────────────────────
event      string  yes       Event name
data       any     yes       Event payload
timestamp  number  yes       Unix timestamp in milliseconds

The correlationId in the frame header is 0.
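
On the receiving side, a client typically fans each EVENT_S2C payload out to listeners registered for that event name. The following is a minimal sketch; the EventBus class and the listener signature (data, timestamp) are illustrative assumptions.

```python
from collections import defaultdict


class EventBus:
    def __init__(self):
        self._listeners = defaultdict(list)  # event name -> list of callables

    def on(self, event, listener):
        """Register a listener; returns a function that unregisters it."""
        self._listeners[event].append(listener)
        return lambda: self._listeners[event].remove(listener)

    def dispatch(self, payload):
        """Deliver a decoded EVENT_S2C payload to every matching listener."""
        # Iterate over a copy so listeners may unregister while being called.
        for listener in list(self._listeners.get(payload["event"], [])):
            listener(payload["data"], payload["timestamp"])


bus = EventBus()
seen = []
off = bus.on("notification", lambda data, ts: seen.append(data["title"]))
bus.dispatch({
    "event": "notification",
    "data": {"title": "Server restarting"},
    "timestamp": 1711234567890,
})
print(seen)
# ['Server restarting']
```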

5.5 State Patch (0x05 — Server → Client)

json
{
  "key": "dashboard",
  "patches": [
    { "op": "replace", "path": "/visitors", "value": 42 },
    { "op": "add", "path": "/active", "value": 7 }
  ]
}

Field    Type    Required  Description
───────  ──────  ────────  ─────────────────────────────────────────
key      string  yes       State key being updated
patches  array   yes       Array of JSON Patch operations (RFC 6902)

Each patch object:

Field  Type    Required              Description
─────  ──────  ────────────────────  ──────────────────────────────────────────────
op     string  yes                   One of: add, remove, replace, move, copy, test
path   string  yes                   JSON Pointer (RFC 6901) to the target location
value  any     for add/replace/test  The value to apply
from   string  for move/copy         Source JSON Pointer

Clients must maintain local state per key and apply patches in order using a conformant JSON Patch implementation. After applying all patches, the local state matches the server's state.
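
To illustrate how patches are applied in order, here is a deliberately small sketch that handles only add, replace, and remove, with no bounds checking on array indices. It is not a conformant RFC 6902 implementation; a real client should use an established JSON Patch library. The function names are hypothetical.

```python
import copy


def _tokens(pointer):
    """Split a JSON Pointer (RFC 6901) into unescaped reference tokens."""
    if pointer == "":
        return []
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    # "~1" decodes to "/" first, then "~0" decodes to "~".
    return [t.replace("~1", "/").replace("~0", "~") for t in pointer[1:].split("/")]


def apply_patches(state, patches):
    """Apply add/replace/remove operations in order and return the new state."""
    state = copy.deepcopy(state)  # leave the caller's object untouched
    for patch in patches:
        op, tokens = patch["op"], _tokens(patch["path"])
        if op not in ("add", "replace", "remove"):
            raise NotImplementedError(f"op {op!r} is outside this sketch")
        if not tokens:  # an empty pointer targets the whole document
            if op == "remove":
                raise ValueError("cannot remove the document root")
            state = copy.deepcopy(patch["value"])
            continue
        parent = state
        for token in tokens[:-1]:  # walk to the container holding the target
            parent = parent[int(token)] if isinstance(parent, list) else parent[token]
        last = tokens[-1]
        if isinstance(parent, list):
            # "-" means "append" and is only meaningful for add.
            index = len(parent) if (last == "-" and op == "add") else int(last)
            if op == "add":
                parent.insert(index, patch["value"])
            elif op == "replace":
                parent[index] = patch["value"]
            else:
                del parent[index]
        elif op == "add":
            parent[last] = patch["value"]
        elif last not in parent:
            raise KeyError(f"path not found: {patch['path']}")
        elif op == "replace":
            parent[last] = patch["value"]
        else:
            del parent[last]
    return state


dashboard = {"visitors": 0, "active": 0}
dashboard = apply_patches(dashboard, [
    {"op": "replace", "path": "/visitors", "value": 42},
    {"op": "add", "path": "/active", "value": 7},
])
print(dashboard)
# {'visitors': 42, 'active': 7}
```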

5.6 State Snapshot (0x06 — Server → Client)

json
{
  "key": "dashboard",
  "version": 1,
  "data": { "visitors": 0, "active": 0 }
}

Field    Type    Required  Description
───────  ──────  ────────  ────────────────────────
key      string  yes       State key
version  number  yes       Monotonic version number
data     any     yes       Complete state value

Sent when a client first subscribes to a state key, or when the server determines a full snapshot is more efficient than patches.

5.7 Ping (0x07 — Client → Server)

Payload: null (JSON).

The correlationId may be any value. The server responds with a PONG using the same correlationId.

5.8 Pong (0x08 — Server → Client)

Payload: null (JSON).

The correlationId matches the PING's correlationId.
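
The PING/PONG pairing can be used to detect dead connections. The sketch below tracks outstanding PINGs by correlationId; the Keepalive class, the send_frame callable, and the 10-second timeout are assumptions of this example, since the specification does not fix an interval or timeout.

```python
import itertools
import time


class Keepalive:
    """Tracks outstanding PINGs so a missing PONG can be detected."""

    def __init__(self, send_frame, pong_timeout=10.0, clock=time.monotonic):
        self._send_frame = send_frame      # hypothetical callable(opcode, cid, payload)
        self._pong_timeout = pong_timeout  # seconds; assumed value, not from the spec
        self._clock = clock
        self._ids = itertools.count(1)
        self._outstanding = {}             # correlationId -> time the PING was sent

    def send_ping(self):
        cid = next(self._ids)
        self._outstanding[cid] = self._clock()
        self._send_frame(0x07, cid, None)  # PING; payload is JSON null
        return cid

    def on_pong(self, correlation_id):
        """Return the round-trip time in seconds, or None for an unknown PONG."""
        sent_at = self._outstanding.pop(correlation_id, None)
        return None if sent_at is None else self._clock() - sent_at

    def is_dead(self):
        """True if any PING has gone unanswered for longer than the timeout."""
        now = self._clock()
        return any(now - t > self._pong_timeout for t in self._outstanding.values())
```

A client would call send_ping() on a timer, pass each PONG's correlationId to on_pong(), and close and reconnect when is_dead() returns True.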

5.9 Error (0x09 — Server → Client)

json
{
  "message": "Rate limit exceeded",
  "retryAfter": 5000
}

Field       Type    Required  Description
──────────  ──────  ────────  ───────────────────────────────────────────
message     string  yes       Human-readable error message
retryAfter  number  no        Milliseconds before the client should retry

5.10 CRDT Operation (0x0A — Client → Server)

json
{
  "key": "votes",
  "op": {
    "type": "pn-counter",
    "nodeId": "client-1",
    "timestamp": 1711234567890,
    "op": "increment",
    "value": 1
  }
}

Field         Type    Required  Description
────────────  ──────  ────────  ────────────────────────────────────────────────────
key           string  yes       CRDT key
op.type       string  yes       CRDT type: pn-counter, lww-register, lww-map
op.nodeId     string  yes       Unique node identifier for the client
op.timestamp  number  yes       Unix timestamp in milliseconds (hybrid logical clock)
op.op         string  yes       Operation: increment, decrement, set, add, remove
op.key        string  no        Sub-key for maps
op.value      any     no        Value for set/add operations
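
A small builder can keep CRDT_OP payloads consistent with the table above. The make_crdt_op helper below is hypothetical; it validates only the type and operation names listed in the table and uses the wall clock for the timestamp, which is a simplification of this sketch.

```python
import time

CRDT_TYPES = {"pn-counter", "lww-register", "lww-map"}
CRDT_OPS = {"increment", "decrement", "set", "add", "remove"}


def make_crdt_op(key, crdt_type, node_id, op, value=None, sub_key=None, timestamp=None):
    """Build the JSON payload for a CRDT_OP (0x0A) frame."""
    if crdt_type not in CRDT_TYPES:
        raise ValueError(f"unknown CRDT type: {crdt_type!r}")
    if op not in CRDT_OPS:
        raise ValueError(f"unknown CRDT operation: {op!r}")
    inner = {
        "type": crdt_type,
        "nodeId": node_id,
        # Milliseconds since the Unix epoch unless the caller supplies a value.
        "timestamp": int(time.time() * 1000) if timestamp is None else timestamp,
        "op": op,
    }
    # Optional fields are omitted when not given (so None cannot be sent as a value here).
    if sub_key is not None:
        inner["key"] = sub_key
    if value is not None:
        inner["value"] = value
    return {"key": key, "op": inner}


payload = make_crdt_op("votes", "pn-counter", "client-1", "increment",
                       value=1, timestamp=1711234567890)
```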

5.11 CRDT State (0x0B — Server → Client)

json
{
  "key": "votes",
  "state": {
    "type": "pn-counter",
    "value": 42,
    "metadata": {
      "type": "pn-counter",
      "nodeId": "server",
      "timestamp": 1711234567890,
      "version": 7
    }
  }
}

Field           Type    Required  Description
──────────────  ──────  ────────  ───────────────────────────────────────────────
key             string  yes       CRDT key
state.type      string  yes       CRDT type
state.value     any     yes       Current merged value
state.metadata  object  yes       CRDT metadata (type, nodeId, timestamp, version)

6. Connection Lifecycle

6.1 Connection Flow

Client                              Server
  |                                     |
  |--- HTTP Upgrade (GET /__ds?token=…) -->
  |                                     |-- Auth handler
  |<-- 101 Switching Protocols ---------|  (or 401 Unauthorized)
  |                                     |
  |===== Binary WebSocket Frames =======|
  |                                     |
  |--- RPC_REQ (correlationId=1) ------>|
  |<-- RPC_RES (correlationId=1) -------|
  |                                     |
  |--- EVENT_C2S ---------------------->|
  |<-- EVENT_S2C -----------------------|
  |<-- STATE_PATCH ---------------------|
  |<-- CRDT_STATE ----------------------|
  |                                     |
  |--- PING --------------------------->|
  |<-- PONG ----------------------------|
  |                                     |
  |--- Close (1000) ------------------->|
  |<-- Close (1000) --------------------|

6.2 Reconnection

Clients should implement automatic reconnection with linear backoff capped at 5x the base interval:

delay = base_interval * min(attempt_number, 5)

Parameter             Default  Description
────────────────────  ───────  ───────────────────────────────────────
reconnect             true     Enable automatic reconnection
reconnectInterval     1000 ms  Base delay between attempts
maxReconnectAttempts  10       Maximum number of reconnection attempts
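
As a worked example of the formula with the defaults above, this snippet lists the delay before each of the ten attempts and the total time spent waiting. The function name is illustrative.

```python
def reconnect_delay_ms(attempt, base_interval_ms=1000):
    """Linear backoff: grows with the attempt number, capped at 5x the base."""
    return base_interval_ms * min(attempt, 5)


schedule = [reconnect_delay_ms(n) for n in range(1, 11)]
print(schedule)
# [1000, 2000, 3000, 4000, 5000, 5000, 5000, 5000, 5000, 5000]
print(sum(schedule))
# 40000
```

With the default settings a client therefore waits at most 40 seconds in total before giving up, not counting the time spent on the connection attempts themselves.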

Reconnection algorithm:

python
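import time

def reconnect(connect, reconnect_interval_ms=1000, max_reconnect_attempts=10):
    """Retry connect() with linear backoff; return True once it succeeds."""
    attempt = 0
    while attempt < max_reconnect_attempts:
        attempt += 1
        delay_ms = reconnect_interval_ms * min(attempt, 5)
        time.sleep(delay_ms / 1000)  # time.sleep() takes seconds, not milliseconds
        try:
            connect()  # caller-supplied; expected to raise on failure
        except Exception:
            continue  # attempt failed: back off and try again
        return True  # success: the next reconnect() call starts again at attempt 0
    return False  # gave up after max_reconnect_attempts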

On successful reconnection, the attempt counter resets to 0. All pending RPC calls should be rejected (cleared) before reconnection.

6.3 Close Codes

Code  Meaning
────  ───────────────────────────────────
1000  Normal closure
1001  Server shutting down
1006  Abnormal closure (connection lost)

7. Client Implementation Checklist

A complete datasole client implementation must:

7.1 Transport

  • [ ] Connect via WebSocket (binary mode) to ws[s]://<host>/__ds?token=<token>
  • [ ] Not negotiate permessage-deflate
  • [ ] Handle binary frames only
  • [ ] Implement reconnection with linear backoff (capped at 5x base)

7.2 Framing

  • [ ] Encode outgoing frames: 9-byte header (opcode, correlationId, payloadLength) + JSON payload
  • [ ] Decode incoming frames: parse 9-byte header, extract payload
  • [ ] Use big-endian (network byte order) for correlationId and payloadLength

7.3 Compression

  • [ ] Before sending: if encoded frame > 256 bytes, zlib-compress the entire frame
  • [ ] After receiving: if wire data > 256 bytes, zlib-decompress before decoding
  • [ ] Use standard zlib (RFC 1950) compression, not raw deflate or gzip

7.4 RPC

  • [ ] Assign monotonically increasing correlationId to each RPC request
  • [ ] Maintain a pending-requests map keyed by correlationId
  • [ ] Match incoming RPC_RES to pending requests by correlationId
  • [ ] Implement per-call timeout (default: 30 seconds)
  • [ ] Reject all pending RPCs on disconnect

7.5 Events

  • [ ] Send client events with opcode 0x03, correlationId = 0
  • [ ] Receive server events with opcode 0x04
  • [ ] Support multiple event listeners per event name

7.6 State

  • [ ] Maintain local state store keyed by state key
  • [ ] Apply JSON Patch (RFC 6902) operations from STATE_PATCH messages
  • [ ] Replace entire state on STATE_SNAPSHOT messages
  • [ ] Notify subscribers after each state update

7.7 CRDTs (optional)

  • [ ] Send CRDT operations with opcode 0x0A
  • [ ] Receive CRDT state broadcasts with opcode 0x0B
  • [ ] Implement at least: pn-counter (increment/decrement), lww-register (set), lww-map (set/remove)
  • [ ] Use hybrid logical clock timestamps (millisecond-resolution Date.now())
  • [ ] Merge remote state using last-writer-wins semantics

7.8 Keepalive

  • [ ] Periodically send PING frames
  • [ ] Expect PONG responses
  • [ ] Detect dead connections via PONG timeout

8. Reference Implementation

The canonical TypeScript implementation is at github.com/mayanklahiri/datasole.

Key source files for implementors:

File                               What it defines
─────────────────────────────────  ──────────────────────────────────────────────────────────────
src/shared/protocol/opcodes.ts     Opcode enum
src/shared/protocol/frames.ts      Frame encode/decode (9-byte header)
src/shared/codec/compression.ts    zlib compress/decompress
src/shared/codec/serialization.ts  JSON serialize/deserialize via TextEncoder
src/shared/types/rpc.ts            RPC request/response/error types
src/shared/types/events.ts         Event payload types
src/shared/types/state.ts          State patch/snapshot types
src/shared/crdt/types.ts           CRDT operation/state types
src/shared/build-constants.ts      Protocol version, defaults, thresholds
src/client/client.ts               Reference client (URL construction, reconnection, dispatch)
src/server/server.ts               Reference server (frame handling, broadcast, state management)

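9. Example: Minimal Client in Python

python
# Requires the third-party websocket-client package (pip install websocket-client).
import json
import zlib

import websocket

COMPRESSION_THRESHOLD = 256
RPC_REQ, RPC_RES = 0x01, 0x02
correlation_counter = 0

def encode_frame(opcode, correlation_id, payload_obj):
    # 9-byte big-endian header followed by the UTF-8 JSON payload
    payload_bytes = json.dumps(payload_obj).encode('utf-8')
    header = bytes([opcode])
    header += correlation_id.to_bytes(4, 'big')
    header += len(payload_bytes).to_bytes(4, 'big')
    return header + payload_bytes

def decode_frame(data):
    opcode = data[0]
    correlation_id = int.from_bytes(data[1:5], 'big')
    payload_length = int.from_bytes(data[5:9], 'big')
    payload = json.loads(data[9:9+payload_length].decode('utf-8'))
    return opcode, correlation_id, payload

def send(ws, opcode, correlation_id, payload):
    frame = encode_frame(opcode, correlation_id, payload)
    if len(frame) > COMPRESSION_THRESHOLD:
        frame = zlib.compress(frame)  # whole frame, zlib wrapper (RFC 1950)
    ws.send_binary(frame)

def receive(ws):
    data = ws.recv()
    if isinstance(data, str):
        raise ValueError("unexpected text frame; datasole frames are binary")
    if len(data) > COMPRESSION_THRESHOLD:
        data = zlib.decompress(data)
    return decode_frame(data)

def rpc(ws, method, params):
    global correlation_counter
    correlation_counter += 1
    cid = correlation_counter
    send(ws, RPC_REQ, cid, {"method": method, "params": params, "correlationId": cid})
    # Block until the matching RPC_RES arrives. Other frames are skipped here
    # for brevity; a full client would dispatch them by opcode.
    while True:
        opcode, response_cid, payload = receive(ws)
        if opcode == RPC_RES and response_cid == cid:
            if "error" in payload:
                raise RuntimeError(payload["error"]["message"])
            return payload.get("result")

# Connect. timeout=30 makes recv() raise instead of blocking forever.
ws = websocket.create_connection("ws://localhost:3000/__ds?token=my-token",
                                 timeout=30, suppress_origin=True)
# If your WebSocket library supports permessage-deflate, disable it in its configuration.

# Send an RPC and print the result
print(rpc(ws, "getUser", {"id": "42"}))
ws.close()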