
Server API

New here? Start with the Developer Guide for integration-first setup, then use Tutorials for the full step-by-step build. For a condensed options table, see Configuration Reference.

Quick Start

typescript
import { createServer } from 'http';
import { DatasoleServer } from 'datasole/server';

const ds = new DatasoleServer<AppContract>();
const http = createServer();
await ds.init();
ds.transport.attach(http);
http.listen(3000);

(await ds.init() connects distributed backends when needed; it is a no-op for MemoryBackend.)

Configuration Reference

All options are passed to new DatasoleServer<T>(options). Every field is optional — sensible defaults are applied.

DatasoleServerOptions

typescript
interface DatasoleServerOptions {
  path?: string;
  authHandler?: AuthHandlerFn;
  stateBackend?: StateBackend;
  backendConfig?: BackendConfig;
  rateLimiter?: RateLimiter;
  executor?: Partial<ExecutorOptions>;
  rateLimit?: RateLimitConfig;
  session?: SessionOptions;
  maxConnections?: number;
  maxCrdtKeys?: number;
  maxEventNameLength?: number;
}

path

WebSocket endpoint path. Clients connect to ws://<host><path>.

Type: string
Default: '/__ds'
Example: '/ws', '/realtime'

The double-underscore prefix convention signals "framework internal" and avoids collision with common application routes like /api or /ws.


authHandler

Authenticate the HTTP upgrade request before establishing the WebSocket connection.

Type: (req: IncomingMessage) => Promise<AuthResult>
Default: Pass-through (all connections allowed, userId set to remote address)

Return { authenticated: true, userId, roles?, metadata? } to accept the connection, or { authenticated: false } to reject with HTTP 401.

AuthResult fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| authenticated | boolean | yes | Whether the connection is allowed |
| userId | string | no | Unique user identifier (populates ConnectionContext.userId) |
| roles | string[] | no | Authorization roles for permission checks in RPC handlers |
| metadata | Record<string, unknown> | no | Arbitrary metadata attached to the connection context |

typescript
const ds = new DatasoleServer<AppContract>({
  authHandler: async (req) => {
    const token = req.headers.authorization?.replace('Bearer ', '');
    if (!token) return { authenticated: false };

    const user = await verifyJwt(token);
    return {
      authenticated: true,
      userId: user.id,
      roles: user.roles,
      metadata: { displayName: user.name },
    };
  },
});
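
The handler above strips the prefix with String.replace, which leaves any other scheme (for example a Basic header) untouched and hands it to the verifier. A stricter parse might look like the sketch below. extractBearerToken is a hypothetical helper written for this example, not part of datasole.

```typescript
// Hypothetical helper (not part of datasole): returns the token only when the
// header uses the Bearer scheme, otherwise undefined.
function extractBearerToken(header: string | undefined): string | undefined {
  if (!header) return undefined;
  // Auth scheme names are case-insensitive; require at least one space
  // between the scheme and a non-empty token.
  const match = /^Bearer +(\S+)$/i.exec(header.trim());
  return match ? match[1] : undefined;
}
```

Inside authHandler this would replace the first line with const token = extractBearerToken(req.headers.authorization); the rest of the handler stays the same.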

The auth result is available in all RPC handlers via ctx.auth and ctx.connection:

typescript
ds.rpc.register('protectedMethod', async (params, ctx) => {
  if (!ctx.auth?.roles?.includes('admin')) {
    throw new Error('Forbidden');
  }
  // ctx.connection.userId, ctx.connection.metadata, etc.
});

stateBackend

Pluggable key-value + pub/sub backend instance. All primitives share this single backend, so swapping it makes the entire server distributed.

Type: StateBackend
Default: new MemoryBackend()

Built-in backends:

| Backend | Import | Use case |
| --- | --- | --- |
| MemoryBackend | datasole/server | Development, single-process |
| RedisBackend | datasole/server | Multi-process, production (optional peer dep) |
| PostgresBackend | datasole/server | Persistent state, audit trails (optional peer dep) |

typescript
import { RedisBackend } from 'datasole/server';

const ds = new DatasoleServer<AppContract>({
  stateBackend: new RedisBackend({ url: 'redis://localhost:6379' }),
});

StateBackend interface (for custom implementations):

typescript
interface StateBackend {
  get<T = unknown>(key: string): Promise<T | undefined>;
  set<T = unknown>(key: string, value: T): Promise<void>;
  delete(key: string): Promise<boolean>;
  subscribe(key: string, handler: (key: string, value: unknown) => void): () => void;
  publish(key: string, value: unknown): Promise<void>;
}
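
As an illustration of what a custom implementation involves, here is a minimal Map-backed sketch. The interface is copied from above so the block stands alone. MapBackend is a hypothetical name, and nothing here is meant to describe how the built-in MemoryBackend works. In particular, whether set() should also notify subscribers is not specified above, so this sketch keeps set() and publish() independent.

```typescript
// Interface copied from the documentation above so the sketch is self-contained.
interface StateBackend {
  get<T = unknown>(key: string): Promise<T | undefined>;
  set<T = unknown>(key: string, value: T): Promise<void>;
  delete(key: string): Promise<boolean>;
  subscribe(key: string, handler: (key: string, value: unknown) => void): () => void;
  publish(key: string, value: unknown): Promise<void>;
}

type Handler = (key: string, value: unknown) => void;

// Hypothetical single-process backend, for illustration only.
class MapBackend implements StateBackend {
  private readonly store = new Map<string, unknown>();
  private readonly handlers = new Map<string, Set<Handler>>();

  async get<T = unknown>(key: string): Promise<T | undefined> {
    return this.store.get(key) as T | undefined;
  }

  async set<T = unknown>(key: string, value: T): Promise<void> {
    this.store.set(key, value);
  }

  async delete(key: string): Promise<boolean> {
    // Map.delete reports whether the key existed.
    return this.store.delete(key);
  }

  subscribe(key: string, handler: Handler): () => void {
    const set = this.handlers.get(key) ?? new Set<Handler>();
    this.handlers.set(key, set);
    set.add(handler);
    // The returned function unsubscribes this handler.
    return () => {
      set.delete(handler);
    };
  }

  async publish(key: string, value: unknown): Promise<void> {
    // Copy first so a handler may unsubscribe while being notified.
    for (const handler of Array.from(this.handlers.get(key) ?? [])) {
      handler(key, value);
    }
  }
}
```

An instance could then be passed as stateBackend: new MapBackend(). A distributed implementation would replace the two maps with calls to an external store and its pub/sub channel.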

backendConfig

Declarative backend configuration — an alternative to stateBackend. Useful when the backend type is loaded from a config file or environment variables.

Type: BackendConfig
Default: undefined (uses stateBackend)

If both stateBackend and backendConfig are provided, construction throws — pass only one.

typescript
interface BackendConfig {
  type: 'memory' | 'redis' | 'postgres';
  redis?: { url?: string; keyPrefix?: string; prefix?: string };
  postgres?: { connectionString?: string; tableName?: string; prefix?: string };
}

typescript
const ds = new DatasoleServer<AppContract>({
  backendConfig: {
    type: 'redis',
    redis: { url: process.env.REDIS_URL, keyPrefix: 'myapp:' },
  },
});
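
Since backendConfig is aimed at configuration that comes from the environment, a small mapping function can keep that logic in one place. The sketch below is one way to do it. backendConfigFromEnv and the variable names DS_BACKEND, REDIS_URL and DATABASE_URL are assumptions made for this example, not names datasole reads on its own.

```typescript
// Interface copied from the documentation above so the sketch is self-contained.
interface BackendConfig {
  type: 'memory' | 'redis' | 'postgres';
  redis?: { url?: string; keyPrefix?: string; prefix?: string };
  postgres?: { connectionString?: string; tableName?: string; prefix?: string };
}

// Hypothetical helper: DS_BACKEND, REDIS_URL and DATABASE_URL are example
// variable names chosen for this sketch.
function backendConfigFromEnv(env: Record<string, string | undefined>): BackendConfig {
  const type = env.DS_BACKEND ?? 'memory';
  switch (type) {
    case 'memory':
      return { type: 'memory' };
    case 'redis':
      return { type: 'redis', redis: { url: env.REDIS_URL } };
    case 'postgres':
      return { type: 'postgres', postgres: { connectionString: env.DATABASE_URL } };
    default:
      // Fail fast on typos instead of silently falling back to memory.
      throw new Error(`Unsupported DS_BACKEND value: ${type}`);
  }
}
```

The result can be passed straight through, for example backendConfig: backendConfigFromEnv(process.env).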

rateLimiter

Pluggable frame rate limiter. If omitted, the server uses DefaultRateLimiter with the same StateBackend as the rest of the server.

Type: RateLimiter
Default: new DefaultRateLimiter(backend)

Implementations may expose optional connect() for async startup; it is invoked from await ds.init().


WebSocket compression note

Per-message deflate is not a DatasoleServer option: the transport always leaves it off. Outbound frames are compressed by datasole above the WebSocket layer when they exceed the internal threshold (see shared codec), which avoids the CPU and memory pitfalls of per-message-deflate at high connection counts.


executor

Connection executor configuration — controls how incoming WebSocket frames are dispatched and processed.

Type: Partial<ExecutorOptions>
Default: { model: 'async' }

See Executor Models below for detailed guidance.

typescript
interface ExecutorOptions {
  model: 'async' | 'thread' | 'thread-pool';
  poolSize?: number;
  maxThreads?: number;
  workerScript?: string;
  idleTimeout?: number;
}

| Field | Type | Default | Applies to | Description |
| --- | --- | --- | --- | --- |
| model | 'async' \| 'thread' \| 'thread-pool' | 'async' | all | Concurrency model |
| poolSize | number | os.availableParallelism() | thread-pool | Number of worker threads in the pool |
| maxThreads | number | 256 | thread | Upper bound on per-connection threads |
| workerScript | string | undefined | thread, thread-pool | Path to JS/TS module loaded inside each worker |
| idleTimeout | number | 30000 | thread, thread-pool | Milliseconds before an idle thread is recycled |

typescript
// Four alternative configurations; a real server would create only one.

// Minimal: use defaults (async executor)
const dsDefault = new DatasoleServer<AppContract>();

// I/O-bound workload: single event loop, lowest overhead
const dsAsync = new DatasoleServer<AppContract>({
  executor: { model: 'async' },
});

// CPU-bound per-connection work
const dsThread = new DatasoleServer<AppContract>({
  executor: { model: 'thread', maxThreads: 64 },
});

// Explicit pool sizing
const dsPool = new DatasoleServer<AppContract>({
  executor: { model: 'thread-pool', poolSize: 8 },
});

rateLimit

Frame-level rate limiting configuration. Rate limits are enforced per connection per sliding window. Uses the configured StateBackend, so limits are automatically distributed with Redis or Postgres.

Type: RateLimitConfig
Default: { defaultRule: { windowMs: 60_000, maxRequests: 100 } }

typescript
interface RateLimitConfig {
  defaultRule: RateLimitRule;
  rules?: Record<string, RateLimitRule>;
  keyExtractor?: (connectionId: string, method?: string) => string;
}

interface RateLimitRule {
  windowMs: number; // Sliding window duration in milliseconds
  maxRequests: number; // Maximum requests allowed per window
}

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| defaultRule | RateLimitRule | { windowMs: 60_000, maxRequests: 100 } | Applied when no per-method rule matches |
| rules | Record<string, RateLimitRule> | {} | Per-method overrides keyed by RPC method name or event name |
| keyExtractor | (connId, method?) => string | undefined | Custom key function for rate limit buckets (e.g., per-user instead of per-connection) |

typescript
const ds = new DatasoleServer<AppContract>({
  rateLimit: {
    defaultRule: { windowMs: 60_000, maxRequests: 200 },
    rules: {
      search: { windowMs: 60_000, maxRequests: 30 },
      upload: { windowMs: 60_000, maxRequests: 5 },
    },
    keyExtractor: (connId, method) => `${connId}:${method ?? 'default'}`,
  },
});
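
To make windowMs and maxRequests concrete, the following sketch implements a sliding-window log in memory: a request is allowed if fewer than maxRequests were recorded for the same key during the preceding windowMs. It is only an illustration of the rule semantics. SlidingWindowLog is a hypothetical class, and the algorithm used inside DefaultRateLimiter is not described here.

```typescript
interface RateLimitRule {
  windowMs: number;
  maxRequests: number;
}

// Illustrative in-memory limiter; not datasole's DefaultRateLimiter.
class SlidingWindowLog {
  private readonly hits = new Map<string, number[]>();
  private readonly rule: RateLimitRule;

  constructor(rule: RateLimitRule) {
    this.rule = rule;
  }

  // Returns true and records the request if it fits in the window.
  allow(key: string, now: number = Date.now()): boolean {
    const cutoff = now - this.rule.windowMs;
    // Keep only the timestamps that are still inside the window.
    const recent = (this.hits.get(key) ?? []).filter((t) => t > cutoff);
    const allowed = recent.length < this.rule.maxRequests;
    if (allowed) recent.push(now);
    this.hits.set(key, recent);
    return allowed;
  }
}
```

The key argument plays the role of the string returned by keyExtractor: every distinct key gets its own window.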

session

Session persistence tuning. Sessions auto-flush dirty writes to the state backend.

Type: SessionOptions
Default: {} (uses SessionManager internal defaults)

typescript
interface SessionOptions {
  flushThreshold?: number; // Persist after N mutations (default: 10)
  flushIntervalMs?: number; // Or every N ms (default: 5000)
  ttlMs?: number; // Session expiry TTL (default: no expiry)
  enableChangeStream?: boolean; // Emit change events (default: false)
}

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| flushThreshold | number | 10 | Number of mutations before auto-flush to backend |
| flushIntervalMs | number | 5000 | Timer-based flush interval (ms) |
| ttlMs | number | undefined | Session expiry (ms). No expiry if omitted |
| enableChangeStream | boolean | false | Emit change events via ds.primitives.sessions.onChange() |

typescript
const ds = new DatasoleServer<AppContract>({
  session: {
    flushThreshold: 5,
    flushIntervalMs: 2000,
    ttlMs: 3_600_000, // 1 hour
    enableChangeStream: true,
  },
});
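
One way to picture flushThreshold is as a write buffer that collects dirty keys and persists them once enough mutations have accumulated. The sketch below covers only that mutation-count trigger; a timer in the spirit of flushIntervalMs would simply call flush() on an interval. WriteBuffer is a hypothetical class for illustration and is not SessionManager's actual code.

```typescript
type PersistFn = (dirty: Map<string, unknown>) => void;

// Illustrative buffer: persists dirty keys after `flushThreshold` mutations.
class WriteBuffer {
  private readonly dirty = new Map<string, unknown>();
  private mutations = 0;
  private readonly flushThreshold: number;
  private readonly persist: PersistFn;

  constructor(flushThreshold: number, persist: PersistFn) {
    this.flushThreshold = flushThreshold;
    this.persist = persist;
  }

  set(key: string, value: unknown): void {
    // Later writes to the same key overwrite earlier ones before the flush.
    this.dirty.set(key, value);
    this.mutations += 1;
    if (this.mutations >= this.flushThreshold) this.flush();
  }

  flush(): void {
    if (this.dirty.size === 0) return;
    // Hand over a copy so the buffer can be cleared safely.
    this.persist(new Map(this.dirty));
    this.dirty.clear();
    this.mutations = 0;
  }
}
```

A lower threshold narrows the window in which a crash can lose writes, at the cost of more backend round trips.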

maxConnections

Maximum simultaneous WebSocket connections. Connections beyond this limit are rejected at the transport layer before auth.

Type: number
Default: 10_000

maxCrdtKeys

Maximum number of distinct CRDT keys the server will track. Prevents memory exhaustion from unbounded CRDT registration.

Type: number
Default: 1000

maxEventNameLength

Maximum allowed length (characters) for client-to-server event names. Events exceeding this limit are silently dropped.

Type: number
Default: 256

Complete Example

typescript
import { createServer } from 'http';
import { DatasoleServer, RedisBackend } from 'datasole/server';

const ds = new DatasoleServer<AppContract>({
  path: '/__ds',

  authHandler: async (req) => {
    const token = req.headers.authorization?.replace('Bearer ', '');
    if (!token) return { authenticated: false };
    const user = await verifyJwt(token);
    return { authenticated: true, userId: user.id, roles: user.roles };
  },

  stateBackend: new RedisBackend({ url: process.env.REDIS_URL }),

  executor: { model: 'thread-pool', poolSize: 8 },

  rateLimit: {
    defaultRule: { windowMs: 60_000, maxRequests: 200 },
    rules: { 'heavy-rpc': { windowMs: 60_000, maxRequests: 10 } },
  },

  session: {
    flushThreshold: 10,
    flushIntervalMs: 5000,
    ttlMs: 3_600_000,
  },

  maxConnections: 50_000,
  maxCrdtKeys: 500,
  maxEventNameLength: 128,
});

const http = createServer();
await ds.init();
ds.transport.attach(http);
http.listen(3000);

Executor Models

The executor determines how incoming WebSocket frames are dispatched and processed. All models are cluster-friendly — no shared mutable state in the main process.

| Model | Description | When to use | Overhead |
| --- | --- | --- | --- |
| async | Single event loop, all connections in-process (default) | I/O-bound: chat, notifications, dashboards | Lowest |
| thread | Dedicated worker_threads per connection | CPU-bound per-connection: game logic, computation | Medium |
| thread-pool | Fixed pool, least-connections assignment | Production workloads, general-purpose | Low–medium |

async — Single Event Loop (Default)

All frames are processed on the Node.js event loop with no thread isolation. This is the default model and the lightest option, ideal when your handlers are predominantly I/O-bound (database queries, external API calls, broadcasting). No serialization overhead.

typescript
const ds = new DatasoleServer<AppContract>({
  executor: { model: 'async' },
});

thread — Thread per Connection

Spawns a dedicated worker_threads thread for each connection. Best for CPU-intensive per-connection processing (game physics, real-time computation, audio/video processing). Each thread can initialize its own backend instance or share the parent's.

typescript
const ds = new DatasoleServer<AppContract>({
  executor: {
    model: 'thread',
    maxThreads: 64,
    idleTimeout: 60_000,
  },
});

Use maxThreads to cap thread count and prevent resource exhaustion during connection spikes. Threads are recycled after idleTimeout ms of inactivity.

thread-pool — Fixed Pool

Uses a fixed pool of worker_threads and assigns each new connection to the worker with the fewest active connections (least-connections). Recommended for production deployments: it balances thread isolation with resource efficiency, because a small number of threads handles many connections.

typescript
const ds = new DatasoleServer<AppContract>({
  executor: {
    model: 'thread-pool',
    poolSize: 8,
  },
});

poolSize defaults to os.availableParallelism() (typically the number of CPU cores). For I/O-heavy workloads with occasional CPU bursts, this is the best default.
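
Least-connections assignment can be sketched in a few lines: track how many connections each worker serves and give the next connection to the least loaded one. WorkerSlot, createPool and assign are hypothetical names for this illustration, and datasole's internal bookkeeping may differ. To mirror the default, a caller could pass os.availableParallelism() as poolSize.

```typescript
// Hypothetical bookkeeping record for one worker in the pool.
interface WorkerSlot {
  id: number;
  connections: number;
}

// Creates `poolSize` idle workers with ids 0..poolSize-1.
function createPool(poolSize: number): WorkerSlot[] {
  return Array.from({ length: poolSize }, (_, id) => ({ id, connections: 0 }));
}

// Least-connections: pick the worker currently serving the fewest
// connections; ties go to the lowest id.
function assign(pool: WorkerSlot[]): WorkerSlot {
  if (pool.length === 0) throw new Error('pool is empty');
  const target = pool.reduce((best, slot) =>
    slot.connections < best.connections ? slot : best,
  );
  target.connections += 1;
  return target;
}
```

When a connection closes, the matching slot's counter would be decremented so that the worker becomes eligible for new connections again.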

Worker Scripts

For thread and thread-pool models, you can specify a workerScript — a JS/TS module loaded inside each worker thread. This module can register RPC handlers and primitives that run in the thread context:

typescript
const ds = new DatasoleServer<AppContract>({
  executor: {
    model: 'thread-pool',
    poolSize: 4,
    workerScript: './src/worker-setup.js',
  },
});

pm2 Cluster Mode

Because the executor keeps no shared mutable state in the main process, and Redis/Postgres backends provide cross-process pub/sub, pm2 cluster mode works out of the box:

bash
pm2 start dist/server.js -i max

Attach to HTTP Server

typescript
import { createServer } from 'http';

// ds is the DatasoleServer from Quick Start; call await ds.init() before attaching.
const http = createServer();
ds.transport.attach(http);
http.listen(3000);

ds.transport.attach() hooks into the HTTP server's upgrade event to handle WebSocket connections. Works with any Node.js HTTP server — Express, Koa, Fastify, NestJS, or plain http.createServer().


RPC Handlers

Register typed request/response handlers. The client calls them with client.rpc(RpcMethod.Foo, params) — use RpcMethod string enums in shared/contract.ts, not raw string literals.

typescript
import { RpcMethod } from './shared/contract';

ds.rpc.register(RpcMethod.GetUser, async (params, ctx) => {
  // ctx.auth — the authenticated user's identity
  // ctx.connectionId — unique connection ID
  // ctx.connection — full ConnectionContext (metadata, tags, get/set)
  console.log(`User ${ctx.auth?.userId} is looking up ${params.userId}`);
  return { name: 'Alice', email: 'alice@example.com' };
});

Tutorial: RPC — Call the Server, Get a Response

Server → Client Live State

The most powerful pattern: mutate a data structure on the server, and every connected client sees a live mirror. Only the JSON Patch diff is sent.

typescript
import { StateKey } from './shared/contract';

await ds.primitives.live.setState(StateKey.Dashboard, { visitors: 0, active: 0 });

setInterval(async () => {
  await ds.primitives.live.setState(StateKey.Dashboard, {
    visitors: getVisitorCount(),
    active: getActiveCount(),
  });
}, 1000);

Clients subscribe with client.subscribeState(StateKey.Dashboard, handler) — no polling, no event mapping, no client-side state management.
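
To show what "only the JSON Patch diff is sent" amounts to, the sketch below computes RFC 6902 operations for a flat object by comparing top-level keys. It is a deliberately shallow illustration: shallowDiff is a hypothetical function, and the diffing datasole performs internally is not shown in this document.

```typescript
type PatchOp =
  | { op: 'add' | 'replace'; path: string; value: unknown }
  | { op: 'remove'; path: string };

// JSON Pointer escaping (RFC 6901): "~" becomes "~0", "/" becomes "~1".
const pointer = (key: string): string =>
  '/' + key.replace(/~/g, '~0').replace(/\//g, '~1');

const has = (obj: object, key: string): boolean =>
  Object.prototype.hasOwnProperty.call(obj, key);

// Shallow diff: compares top-level keys only, using strict equality.
function shallowDiff(
  prev: Record<string, unknown>,
  next: Record<string, unknown>,
): PatchOp[] {
  const ops: PatchOp[] = [];
  for (const key of Object.keys(prev)) {
    if (!has(next, key)) ops.push({ op: 'remove', path: pointer(key) });
  }
  for (const [key, value] of Object.entries(next)) {
    if (!has(prev, key)) ops.push({ op: 'add', path: pointer(key), value });
    else if (prev[key] !== value) ops.push({ op: 'replace', path: pointer(key), value });
  }
  return ops;
}
```

For the dashboard above, going from { visitors: 10, active: 3 } to { visitors: 11, active: 3 } yields a single replace operation on /visitors, so the unchanged active field never crosses the wire.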

Tutorial: Live State — A Server-Synced Dashboard

Events

typescript
import { Event } from './shared/contract';

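// Keep a reference to the handler so it can be unregistered later.
// The payload type is an assumption based on the field read below.
const onChatMessage = ({ data }: { data: { text: string } }) => {
  console.log('Received:', data.text);
};

ds.primitives.events.on(Event.ChatMessage, onChatMessage);

ds.primitives.fanout.broadcast(Event.Notification, { title: 'Server restarting in 5 minutes' });

ds.primitives.events.off(Event.ChatMessage, onChatMessage);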

Tutorial: Server Events — A Live Stock Ticker

Sync Channels

Fine-grained control over when state updates are flushed to clients.

typescript
const alerts = ds.primitives.live.createSyncChannel({
  key: 'alerts',
  direction: 'server-to-client',
  mode: 'json-patch',
  flush: { flushStrategy: 'immediate' },
});

const metrics = ds.primitives.live.createSyncChannel({
  key: 'metrics',
  direction: 'server-to-client',
  mode: 'json-patch',
  flush: { flushStrategy: 'batched', batchIntervalMs: 200, maxBatchSize: 50 },
});

const form = ds.primitives.live.createSyncChannel({
  key: 'form',
  direction: 'client-to-server',
  mode: 'json-patch',
  flush: { flushStrategy: 'debounced', debounceMs: 500 },
});

Tutorial: Sync Channels — Controlled Flush Granularity

Session Manager

Per-user state that survives disconnections. Auto-flushes to the state backend.

typescript
const state = await ds.primitives.sessions.restore(ctx.connection);

ds.primitives.sessions.set('user-123', 'lastPage', '/dashboard');
const page = ds.primitives.sessions.get<string>('user-123', 'lastPage');

ds.primitives.sessions.onChange((userId, key, value, version) => {
  console.log(`${userId} → ${key} = ${JSON.stringify(value)} (v${version})`);
});

Tutorial: Session Persistence — Surviving Reconnections

Rate Limiting

Frame-level rate limiting on persistent WebSocket connections. Rate limiting uses a DefaultRateLimiter backed by the configured StateBackend, so it is automatically distributed when using Redis or Postgres.

typescript
const ds = new DatasoleServer<AppContract>({
  rateLimit: {
    defaultRule: { windowMs: 60_000, maxRequests: 100 },
    rules: {
      search: { windowMs: 60_000, maxRequests: 30 },
      upload: { windowMs: 60_000, maxRequests: 5 },
    },
  },
});

Authentication

Hook into the HTTP upgrade request to authenticate connections. Supports any auth scheme (JWT, OAuth, SSO, API keys).

typescript
const ds = new DatasoleServer<AppContract>({
  authHandler: async (req) => {
    const token = req.headers.authorization?.replace('Bearer ', '');
    if (!token) return { authenticated: false };

    const user = await verifyJwt(token);
    return {
      authenticated: true,
      userId: user.id,
      roles: user.roles,
      metadata: { displayName: user.name },
    };
  },
});

The auth result is available everywhere via ConnectionContext:

typescript
import { RpcMethod } from './shared/contract';

ds.rpc.register(RpcMethod.ProtectedMethod, async (params, ctx) => {
  if (!ctx.auth?.roles?.includes('admin')) {
    throw new Error('Forbidden');
  }
});
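
When several handlers repeat the same role check, it can be factored into a small guard. The sketch below assumes only the fields used above (ctx.auth and its optional roles array). requireRole and ForbiddenError are hypothetical names, not datasole exports, and how a thrown error is reported to the client is outside what this sketch covers.

```typescript
// Minimal shape of the fields this helper reads; the full context type
// belongs to datasole.
interface AuthLike {
  roles?: string[];
}

class ForbiddenError extends Error {
  constructor(role: string) {
    super(`Forbidden: requires role "${role}"`);
    this.name = 'ForbiddenError';
  }
}

// Hypothetical guard: throws unless the auth result carries the given role.
function requireRole(ctx: { auth?: AuthLike | null }, role: string): void {
  if (!ctx.auth?.roles?.includes(role)) throw new ForbiddenError(role);
}
```

A handler would then start with requireRole(ctx, 'admin'); and continue with its normal logic.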

Tutorial: Client Events + Auth — A Chat Room

Framework Adapters

Express

typescript
import express from 'express';
import { createServer } from 'http';
import { DatasoleServer } from 'datasole/server';

const app = express();
const httpServer = createServer(app);
const ds = new DatasoleServer<AppContract>();
await ds.init();
ds.transport.attach(httpServer);
httpServer.listen(3000);

NestJS

typescript
import { DatasoleServer } from 'datasole/server';

// app is the Nest application instance, e.g. from NestFactory.create(AppModule).
const ds = new DatasoleServer<AppContract>();
await ds.init();
ds.transport.attach(app.getHttpServer());

Native HTTP

typescript
import { createServer } from 'http';
import { DatasoleServer } from 'datasole/server';

const server = createServer();
const ds = new DatasoleServer<AppContract>();
await ds.init();
ds.transport.attach(server);
server.listen(3000);

API surface

| Member | Role |
| --- | --- |
| await ds.init() | Connect StateBackend / optional RateLimiter before transport.attach |
| ds.transport.attach(httpServer, adapter?) | WebSocket upgrade + static client/worker assets |
| ds.transport.getConnectionCount() | Connected WebSocket clients |
| ds.primitives.live (setState, getState, sync/data channels) / ds.primitives.fanout.broadcast | Server→client orchestration |
| ds.rpc | Typed RPC registry |
| ds.metrics | In-process counters (snapshot(), etc.) |
| ds.primitives.state / events / crdt / sessions / rateLimiter | Direct primitive access |
| ds.close() | Graceful shutdown |

Facades expose readonly server: DatasoleServer<T> for sibling access from nested code.