Important: This documentation covers Yarn 1 (Classic).
For Yarn 2+ docs and migration guide, see yarnpkg.com.

Package detail

@cnstra/core

abaikov709MIT1.6.0TypeScript support: included

Neural network-inspired event flow system for building reactive applications with dependency-injected neurons and synapses

neural-network, event-flow, reactive, dependency-injection, neuron, synapse, axon, dendrite, typescript, node, browser

readme

@cnstra/core

Graph-routed, type-safe orchestration for reactive apps — no global event bus.

📚 Full Documentation | Quick Start | API Reference | Recipes

🧠 What is CNStra?

CNStra (Central Nervous System Orchestrator) models your app as a typed neuron graph.
You explicitly start a run with cns.stimulate(...); CNStra then performs a deterministic, hop-bounded traversal from collateral → dendrite → returned signal, step by step.

Zero dependencies: CNS has no third-party dependencies, making it suitable for any JavaScript/TypeScript environment - browsers, Node.js, serverless, edge functions, React Native, or embedded systems.

No pub/sub: there are no ambient listeners or global emit. Only the signal you return from a dendrite continues the traversal; returning null/undefined ends that branch. Hop limits guard against cycles.

We follow the IERG approach — Inverted Explicit Reactive Graph. You explicitly stimulate the graph; reactions are local and deterministic; no background listeners or buses exist.

💡 Why CNStra

IERG is our inverted control flow: you start the run, we walk the reaction graph explicitly. This is not Flux and not an event bus.

  • Deterministic routing: signals are delivered along an explicit neuron graph, not broadcast to whoever “happens to listen”.
  • Readable, reliable flows: each step is local and typed; branches are explicit, so debugging feels like reading a storyboard, not a log stream.
  • Backpressure & concurrency: built‑in per‑stimulation and per‑neuron concurrency limits keep workloads controlled without custom plumbing.
  • Saga‑grade orchestration: IERG already models long‑running, multi‑step reactions with retries/cancellation hooks (abort), so you rarely need to hand‑roll “sagas”.
  • Safer than ad‑hoc events: no hidden global listeners, no accidental fan‑out; every continuation must be returned explicitly.

🏗️ Core Model

Neurons

Units of logic with clear DI and sharp boundaries:

  • ID — unique name
  • Axon — the neuron's output channels (its collaterals)
  • Dendritesinput receptors (typed reactions bound to specific collaterals)

Collaterals

Typed output channels that mint signals:

  • ID — string identifier (e.g., 'user:created')
  • Payload — the shape carried by the signal
  • createSignal(payload){ collateral, payload }

Signals

The data structures that flow through the system:

  • collateral — reference to the collateral that created this signal
  • payload — the typed data being transmitted

🚀 Quick Start

npm install @cnstra/core
import { CNS, collateral, neuron } from '@cnstra/core';

// Define collaterals (communication channels)
const userCreated = collateral<{ id: string; name: string }>('user:created');
const userRegistered = collateral<{ userId: string; status: string }>('user:registered');

// Create a neuron
const userService = neuron('user-service', {
  userRegistered
})
.dendrite({
  collateral: userCreated,
  response: (payload, axon) => {
    const userData = payload;

    // Process the user creation
    console.log(`Processing user: ${userData.name}`);

    // Return the signal that will be processed by CNS
    return axon.userRegistered.createSignal({
      userId: userData.id,
      status: 'completed'
    });
  }
});

// Create the CNS system
const cns = new CNS([userService]);

// Stimulate the system
await cns.stimulate(userCreated.createSignal({
  id: '123',
  name: 'John Doe'
}));

📚 API Reference

collateral<T>(id: string)

Creates a new collateral (communication channel).

const userEvent = collateral<{ userId: string }>('user:event');
const simpleEvent = collateral('simple:event'); // No payload type

neuron(id: string, axon: Axon)

Creates a new neuron with the specified axon (output channels).

const myNeuron = neuron('my-neuron', {
  output: myCollateral
});

neuron.dendrite(dendrite: Dendrite)

Adds a dendrite (input receptor) to a neuron. Returns the neuron for chaining.

The response function can return:

  • A single signal: axon.output.createSignal(data)
  • An array of signals: [axon.output1.createSignal(data1), axon.output2.createSignal(data2)]
  • A Promise of either
  • undefined/void to end processing
myNeuron
  .dendrite({
    collateral: inputCollateral,
    response: async (payload, axon, ctx) => {
      // ctx: { get, set, abortSignal?, cns? }
      if (ctx.abortSignal?.aborted) return; // graceful cancel
      const prev = ctx.get();
      ctx.set({ ...prev, handled: true });

      // Return single signal or array of signals
      return axon.output.createSignal(result);
    }
  });

neuron.bind(axon, map) — exhaustive subscriptions (with shorthand)

Bind this neuron to every collateral of another neuron's axon in one place. The map must be exhaustive: you must provide a handler for each collateral key of axon. This gives you compile-time safety: if someone adds a new collateral later, TypeScript will immediately flag missing handlers.

  • You can pass either a full dendrite object per key or just a response function shorthand.
  • Payload types are inferred from the followed axon.

This pattern is especially useful for domain-oriented neurons that must react to every way a record can be created/changed. For example, an email-notifier neuron can safely ensure emails are sent for every creation path; if a new creation collateral is introduced, the build will fail until the notifier adds a corresponding handler.

import { withCtx, collateral } from '@cnstra/core';

// Order domain model (axon)
const order = {
  created: collateral<{ id: string; amount: number }>('order:created'),
  updated: collateral<{ id: string; changes: Record<string, unknown> }>('order:updated'),
  cancelled: collateral<{ id: string; reason?: string }>('order:cancelled'),
};

// Mailer neuron must react to ALL order events
withCtx()
  .neuron('order-mailer', { /* your axon if you emit follow-up signals */ })
  .bind(order, {
    created: (payload) => {
      sendEmail(`Order created #${payload.id} for $${payload.amount}`);
      return undefined;
    },
    updated: (payload) => {
      sendEmail(`Order updated #${payload.id} (changes: ${Object.keys(payload.changes).join(', ')})`);
      return undefined;
    },
    cancelled: (payload) => {
      sendEmail(`Order cancelled #${payload.id}${payload.reason ? `: ${payload.reason}` : ''}`);
      return undefined;
    },
  });

// If later someone adds a new event variant, e.g. refunds:
// const order = { ...order, refunded: collateral<{ id: string; amount: number }>('order:refunded') };
// TypeScript will now error until you also add a `refunded` handler in the .bind(...) map.

CNS Class

The main orchestrator that manages signal flow between neurons.

Constructor

new CNS(neurons, options?)

Parameters:

  • neurons: Array of neurons that process signals
  • options: Optional CNS configuration

Global listeners

const unsubscribe = cns.addResponseListener(r => {
  // fires for every stimulation (input + outputs)
});
// unsubscribe();

stimulate() Method

cns.stimulate(signal | signal[], options?)

Parameters:

  • signal: A signal or array of signals created by collateral.createSignal(payload)
  • options: Optional stimulation configuration

Returns: Promise<void> that resolves when stimulation completes

Example:

// Single signal
await cns.stimulate(
  userCreated.createSignal({ id: '123', name: 'John' })
);

// Multiple signals at once
await cns.stimulate([
  userCreated.createSignal({ id: '123', name: 'John' }),
  userCreated.createSignal({ id: '456', name: 'Jane' })
]);

⚙️ Stimulation Options

maxNeuronHops?: number (default: 1000)

Prevents infinite loops by limiting signal traversal depth.

await cns.stimulate(signal, {
  maxNeuronHops: 50 // Stop after 50 neuron hops
});

onResponse?: (response) => void

Real-time callback for monitoring signal flow and completion.

await cns.stimulate(signal, {
  onResponse: (response) => {
    console.log(`Signal: ${response.outputSignal?.collateral.id}`);
    console.log(`Hops: ${response.hops}`);

    if (response.error) {
      console.error('Processing failed:', response.error);
    }

    if (response.queueLength === 0) {
      console.log('Stimulation completed');
    }
  }
});

Response Object:

  • outputSignal — The signal being processed (if any)
  • hops — Number of neuron hops taken so far
  • queueLength — Remaining signals in processing queue (0 = complete)
  • error — Any error that occurred during processing
  • stimulationId — Unique identifier for this stimulation

abortSignal?: AbortSignal

Gracefully stop stimulation using AbortController.

const controller = new AbortController();

cns.stimulate(signal, {
  abortSignal: controller.signal
});

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

Inside dendrites, read it via ctx.abortSignal.

stimulationId?: string

Custom identifier for this stimulation cascade. Auto-generated if not provided.

await cns.stimulate(signal, {
  stimulationId: 'user-action-' + Date.now()
});

allowName?: (collateralName: string) => boolean

Filter which collateral names can be processed.

await cns.stimulate(signal, {
  allowName: (name) => name.startsWith('user:') // Only process user-related signals
});

concurrency?: number (default: unlimited)

Limit concurrent operations to prevent resource exhaustion.

await cns.stimulate(signal, {
  concurrency: 10 // Max 10 operations at once
});

Per‑neuron global concurrency

Set a limit per neuron; parallel stimulations share the same gate.

const worker = neuron('worker', { out })
  .setConcurrency(2)
  .dendrite({ /* ... */ });

ctx?: ICNSStimulationContextStore

Provide existing context store for recovery/retry scenarios.

await cns.stimulate(signal, {
  ctx: savedContextStore // Restore previous state
});

createContextStore?: () => ICNSStimulationContextStore

Factory for custom context store implementations.

await cns.stimulate(signal, {
  createContextStore: () => new CustomContextStore()
});

🔄 Signal Flow Patterns

Basic Chain Processing

const input = collateral<{ value: number }>('input');
const middle = collateral<{ doubled: number }>('middle');
const output = collateral<{ result: string }>('output');

const step1 = neuron('step1', { middle }).dendrite({
  collateral: input,
  response: (payload, axon) => {
    return axon.middle.createSignal({ doubled: payload.value * 2 });
  }
});

const step2 = neuron('step2', { output }).dendrite({
  collateral: middle,
  response: (payload, axon) => {
    return axon.output.createSignal({ result: `Final: ${payload.doubled}` });
  }
});

const cns = new CNS([step1, step2]);

await cns.stimulate(input.createSignal({ value: 5 }));
// Flows: input(5) → middle(10) → output("Final: 10")

Context-Aware Processing

import { withCtx } from '@cnstra/core';

const input = collateral<{ increment: number }>('input');
const output = collateral<{ count: number }>('output');

const counter = withCtx<{ total: number }>()
  .neuron('counter', { output })
  .dendrite({
    collateral: input,
    response: async (payload, axon, ctx) => {
      const current = ctx.get()?.total || 0;
      const newTotal = current + payload.increment;

      ctx.set({ total: newTotal });

      return axon.output.createSignal({ count: newTotal });
    }
  });

const cns = new CNS([counter]);

await cns.stimulate(input.createSignal({ increment: 5 })); // count: 5
await cns.stimulate(input.createSignal({ increment: 3 })); // count: 8 (separate context)

Multiple Signals (Fan-out Pattern)

Neurons can return arrays of signals for parallel processing:

const orderPlaced = collateral<{ orderId: string; items: string[] }>('order:placed');
const updateInventory = collateral<{ orderId: string; item: string }>('update:inventory');
const sendEmail = collateral<{ orderId: string }>('send:email');
const logAudit = collateral<{ orderId: string }>('log:audit');

const orderProcessor = neuron('order-processor', { 
  updateInventory, 
  sendEmail, 
  logAudit 
}).dendrite({
  collateral: orderPlaced,
  response: (payload, axon) => {
    // Return array of signals - each will be processed independently
    return [
      // Create signal for each item
      ...payload.items.map(item => 
        axon.updateInventory.createSignal({ orderId: payload.orderId, item })
      ),
      // Also send confirmation email
      axon.sendEmail.createSignal({ orderId: payload.orderId }),
      // And log the action
      axon.logAudit.createSignal({ orderId: payload.orderId })
    ];
  }
});

// Downstream neurons process each signal independently
const inventoryService = neuron('inventory-service', {}).dendrite({
  collateral: updateInventory,
  response: (payload) => {
    console.log(`Updating inventory for ${payload.item}`);
    // Each item gets its own execution
  }
});

const cns = new CNS([orderProcessor, inventoryService /* ... */]);

// Single order triggers multiple parallel operations
await cns.stimulate(
  orderPlaced.createSignal({ orderId: 'ORD-001', items: ['A', 'B', 'C'] })
);
// Flows: orderPlaced → [inventory(A), inventory(B), inventory(C), email, audit]

Conditional Arrays:

const validator = neuron('validator', { success, error, audit }).dendrite({
  collateral: input,
  response: (payload, axon) => {
    const signals = [];

    // Conditionally add signals
    if (isValid(payload.data)) {
      signals.push(axon.success.createSignal({ validated: payload.data }));
    } else {
      signals.push(axon.error.createSignal({ error: 'Invalid' }));
    }

    // Always audit
    signals.push(axon.audit.createSignal({ attempted: true }));

    return signals; // Return empty array to emit nothing
  }
});

Starting with Multiple Signals:

// Process multiple orders in parallel
await cns.stimulate([
  orderPlaced.createSignal({ orderId: 'ORD-001', items: ['X'] }),
  orderPlaced.createSignal({ orderId: 'ORD-002', items: ['Y', 'Z'] })
]);

See Multiple Signals Recipe for more patterns and best practices.

🧠 Memory & Performance

Memory-Efficient Design

  • Zero dependencies: No third-party packages
  • No error storage: Errors delivered via callbacks, not stored
  • Streaming responses: Signal traces delivered via callbacks
  • Context on-demand: Context stores created only when needed
  • No global state: Clean slate between stimulations

Performance Characteristics

  • Sync-first: Synchronous chains execute in single tick
  • Minimal async overhead: Promises created only when needed
  • Stack-safe: Handles deep chains without stack overflow
  • Bounded execution: maxNeuronHops prevents runaway processing

Best Practices

  • Keep context data minimal (IDs, counters, flags)
  • Use synchronous responses when possible
  • Set reasonable maxNeuronHops limits
  • Implement proper error handling in onResponse
  • Use array returns for genuine fan-out patterns, not just convenience
  • Consider concurrency limits when returning many signals

🎯 Common Use Cases

HTTP Request Processing

const httpRequest = collateral<{ method: string; url: string }>('http:request');
const requestValidated = collateral<{ method: string; url: string }>('request:validated');
const responseReady = collateral<{ status: number; body: any }>('response:ready');

const validator = neuron('validator', { requestValidated }).dendrite({
  collateral: httpRequest,
  response: (payload, axon) => {
    if (!payload.url.startsWith('https://')) {
      throw new Error('Only HTTPS URLs allowed');
    }
    return axon.requestValidated.createSignal(payload);
  }
});

const handler = neuron('handler', { responseReady }).dendrite({
  collateral: requestValidated,
  response: async (payload, axon) => {
    const response = await fetch(payload.url, { method: payload.method });
    const body = await response.json();
    return axon.responseReady.createSignal({ status: response.status, body });
  }
});

const cns = new CNS([validator, handler]);

Event Sourcing

const eventReceived = collateral<{ name: string; data: any }>('event:received');
const eventStored = collateral<{ eventId: string }>('event:stored');
const stateUpdated = collateral<{ aggregateId: string }>('state:updated');

const eventStore = neuron('event-store', { eventStored }).dendrite({
  collateral: eventReceived,
  response: async (payload, axon) => {
    const eventId = await saveEvent(payload);
    return axon.eventStored.createSignal({ eventId });
  }
});

const stateManager = neuron('state-manager', { stateUpdated }).dendrite({
  collateral: eventStored,
  response: async (payload, axon) => {
    const aggregateId = await updateState(payload.eventId);
    return axon.stateUpdated.createSignal({ aggregateId });
  }
});

const cns = new CNS([eventStore, stateManager]);

🚨 Error Handling

Errors are delivered immediately via onResponse callbacks:

await cns.stimulate(signal, {
  onResponse: (response) => {
    if (response.error) {
      console.error(`Error in neuron processing:`, response.error);

      // Log error details
      console.error(`Signal: ${response.outputSignal?.collateral.id}`);
      console.error(`Stimulation: ${response.stimulationId}`);

      // Handle specific error types
      if (response.error instanceof ValidationError) {
        handleValidationError(response.error);
      }
    }
  }
});

Error Recovery with Context:

let savedContext: ICNSStimulationContextStore | undefined;

await cns.stimulate(signal, {
  onResponse: (response) => {
    if (response.error) {
      // Save context for retry
      savedContext = response.contextStore;
    }
  }
});

// Retry with preserved context
if (savedContext) {
  await cns.stimulate(retrySignal, {
    ctx: savedContext
  });
}

🔧 Advanced Configuration

Custom Context Store

class RedisContextStore implements ICNSStimulationContextStore {
  constructor(private client: RedisClient, private sessionId: string) {}

  get<T>(): T | undefined {
    // Implement Redis-backed context retrieval
  }

  set<T>(value: T): void {
    // Implement Redis-backed context storage
  }
}

await cns.stimulate(signal, {
  createContextStore: () => new RedisContextStore(redisClient, 'session-123')
});

CNS Configuration

const cns = new CNS(neurons, {
  autoCleanupContexts: true, // Auto-cleanup unused contexts
  defaultConcurrency: 50     // Default concurrency limit
});

⚠️ Performance Warning: autoCleanupContexts adds computational overhead due to:

  • O(V²) initialization cost - building SCC (Strongly Connected Components) structures
  • O(1 + A) runtime cost per cleanup check (where A = number of SCC ancestors)
  • Memory overhead for storing SCC graphs and ancestor relationships

Use only when:

  • Memory leaks are a critical issue
  • You have a small to medium-sized neuron graph (< 1000 neurons)
  • Performance is less critical than memory management

For production systems, consider manual context cleanup or custom cleanup strategies instead.


CNStra provides deterministic, type-safe orchestration without the complexity of traditional event systems. Build reliable, maintainable reactive applications with clear data flow and predictable behavior.