Priostack · Engineering Blog · 10 March 2026 · 20 min read · Companion to Geometric Memory and Process Trajectories

EIP Pipelines and Geometric Memory:
Big-Data Corpus Building and Fast-Data Anomaly Routing

Deep dive · Integration Engineering

The previous article explained what geometric memory is and why Fréchet distance is the right metric for comparing BPMN process trajectories. It ended with a question that article deliberately deferred: how do you actually plumb geometric memory into a production system that processes thousands of messages per second?

This article answers that question using pkg/integration — Priostack's implementation of the classic Enterprise Integration Patterns (EIP) catalogue. Together, the two articles cover the complete stack: geometric memory provides the mathematical substrate; EIP pipelines provide the operational infrastructure that feeds it at scale and routes its outputs in real time.

We will walk through two distinct integration topologies — a big-data path for high-volume historical corpus building and a fast-data path for sub-millisecond real-time anomaly routing — and show how each is expressed using the nine EIP primitives in pkg/integration.

1. Quick primer on pkg/integration primitives

pkg/integration implements nine EIP building blocks. They are plain Go structs — no goroutines, no hidden state machines. The Registry is a thread-safe catalogue that stores them. Your application code wires them together; Priostack's engine evaluates them during message dispatch. Here is the complete vocabulary:

Type                 EIP pattern              One-line description
MessageChannel       Message Channel          Named conduit; point-to-point or pub-sub delivery
MessageEndpoint      Message Endpoint         Connects a service or process to a channel (inbound/outbound)
MessageRouter        Content-Based Router     Routes messages to channels based on FEEL predicates
MessageFilter        Message Filter           Drops messages that do not satisfy a FEEL predicate
MessageTranslator    Message Translator       Declares a FEEL-expression payload transformation
Aggregator           Aggregator               Collects correlated messages until a completion condition fires
Splitter             Splitter                 Fans out one message into many, one per list element
Pipeline             Pipes and Filters        Ordered chain: each step is a registered EIP component
CorrelationContext   Correlation Identifier   Matches an incoming response message to a waiting process instance

All of these operate on *items.Message — the shared message type from pkg/items. A Message carries a typed Payload with a Variables map, plus metadata (ID, correlation key, timestamp). Crucially, it also carries a GeomTrace — and that is where the integration between EIP pipelines and geometric memory begins.

2. Messages carry geometry: pkg/items.Message and GeomTrace

In most integration frameworks, a message is a bag of bytes. In Priostack, a message is a bag of bytes plus a geometric history. The GeomTrace embedded in every Message is the same cell-ID sequence described in the companion article: an ordered list of int32 values, one per execution step that produced or consumed this message.

// pkg/items — abbreviated for clarity
type Message struct {
    ID             string
    CorrelationKey string
    Payload        *DataItem
    SentAt         time.Time
    // GeomTrace accumulates cell IDs as the message passes through
    // EIP components and process steps.  Each component that handles
    // the message appends its current BPM execution cell ID.
    GeomTrace      []int32
}

// A message produced at the boundary of a completed BPMN task looks like:
msg := &items.Message{
    ID:             "msg-loan-7821",
    CorrelationKey: "APP-7821",
    GeomTrace:      []int32{2, 5, 8, 11, 14}, // same cell IDs as the process instance
    Payload:        loanDataItem,
}

This design means that by the time a message reaches the geometric memory system, it carries all the information needed to register a corpus entry or compute an anomaly score — no separate lookup required. The message itself is the carrier of its own trajectory.

Why attach geometry to the message, not to the instance? Process execution and message passing are decoupled in an EIP architecture. An event from a completed process instance may travel through several channels, routers, and translators before it reaches the corpus service. If the geometry were attached only to the process instance record, you would need a database lookup at every hop. By attaching it to the message, the geometry travels with the data automatically — even across service boundaries.
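As a concrete sketch of that hop-by-hop accumulation, the following self-contained program mirrors the abbreviated Message struct above. Note that appendCell is a hypothetical helper introduced here for illustration; it is not part of the pkg/items API.

```go
package main

import (
	"fmt"
	"time"
)

// Minimal stand-ins for the pkg/items types shown above.
type DataItem struct {
	Variables map[string]interface{}
}

type Message struct {
	ID             string
	CorrelationKey string
	Payload        *DataItem
	SentAt         time.Time
	GeomTrace      []int32
}

// appendCell is a hypothetical helper: each component that handles the
// message appends its current execution cell ID, so the trajectory
// travels with the data across every hop with no instance lookup.
func appendCell(m *Message, cellID int32) {
	m.GeomTrace = append(m.GeomTrace, cellID)
}

func main() {
	msg := &Message{
		ID:             "msg-loan-7821",
		CorrelationKey: "APP-7821",
		GeomTrace:      []int32{2, 5, 8},
	}
	// Two more hops (e.g. a translator, then an outbound endpoint):
	appendCell(msg, 11)
	appendCell(msg, 14)
	fmt.Println(msg.GeomTrace) // [2 5 8 11 14]
}
```

By the time this message crosses a service boundary, its full trajectory is already on board.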

3. The two topologies: big data vs fast data

Geometric memory requires two different integration topologies depending on where in the data lifecycle you are:

┌────────────────────────────────────────────────────────────────┐
│ BIG-DATA PATH (corpus building from historical records)        │
│                                                                │
│ Data warehouse / audit log                                     │
│        │                                                       │
│ [Inbound Endpoint]                                             │
│        │                                                       │
│ [Splitter]     fan out → one message per process instance      │
│        │                                                       │
│ [Filter ×N]    drop already-registered or gap-fill traces      │
│        │                                                       │
│ [Endpoint ×N]  ──► corpus service RegisterSequence()           │
│        │                                                       │
│ [Aggregator]   wait for all N registrations to complete        │
│        │                                                       │
│ [Outbound Endpoint] ──► "corpus ready" event                   │
└────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ FAST-DATA PATH (real-time anomaly detection per instance)      │
│                                                                │
│ Live BPMN engine (instance completed event)                    │
│        │                                                       │
│ [Inbound Endpoint]                                             │
│        │                                                       │
│ [Pipeline]                                                     │
│   step 1: [Filter]     pass only completed instances           │
│   step 2: [Translator] reshape BPMN event → score request      │
│   step 3: [Router]     route by score threshold                │
│        │                                                       │
│   score ≤ 0.5                    score > 0.5                   │
│ [ch-normal-instances]        [ch-anomalous-instances]          │
│        │                             │                         │
│  register corpus             open CMMN fraud case              │
│                                      │                         │
│                          [CorrelationContext]                  │
│                 reconnect async score → waiting case           │
└────────────────────────────────────────────────────────────────┘

Both topologies are expressed using the same nine primitives and the same Registry. The difference is not in the infrastructure but in how many components you chain and what FEEL conditions you write.

4. Big-data path: bulk corpus ingestion with Splitter and Aggregator

When you first deploy Priostack against an existing process, your geometric memory corpus is empty. You need to bootstrap it from historical records. A typical source is a process audit log or data warehouse export: a single bulk message containing an array of process instance records, each with its cell-ID trace.

Step 1: declare the channels

reg := integration.NewRegistry()

// Source: bulk import arrives here
reg.RegisterChannel(&integration.MessageChannel{
    ID:        "ch-bulk-import",
    Name:      "Bulk Corpus Import",
    Kind:      integration.ChannelPointToPoint,
    ItemDefID: "def-process-batch",
})

// Fan-out: one channel per individual trace record
reg.RegisterChannel(&integration.MessageChannel{
    ID:        "ch-trace-record",
    Name:      "Individual Trace Record",
    Kind:      integration.ChannelPointToPoint,
    ItemDefID: "def-process-instance-trace",
})

// Corpus service output
reg.RegisterChannel(&integration.MessageChannel{
    ID:        "ch-registration-ack",
    Name:      "Corpus Registration Ack",
    Kind:      integration.ChannelPointToPoint,
    ItemDefID: "def-registration-result",
})

// Final "corpus-ready" signal (pub-sub so multiple consumers can react)
reg.RegisterChannel(&integration.MessageChannel{
    ID:        "ch-corpus-ready",
    Name:      "Corpus Ready",
    Kind:      integration.ChannelPubSub,
    ItemDefID: "def-corpus-ready-event",
})

Step 2: declare the Splitter

The Splitter takes the bulk import message and produces one output message per element in Payload.Variables["instances"]. The FEEL expression "instances" names the list field to split on.

reg.RegisterSplitter(&integration.Splitter{
    ID:              "split-corpus-batch",
    Name:            "Split Corpus Batch",
    SplitExpr:       "instances",         // FEEL: name of the list in Payload.Variables
    OutputChannelID: "ch-trace-record",   // each element goes here as its own message
})

When the engine evaluates this Splitter, it iterates over msg.Payload.Variables["instances"], creates a new *items.Message per element, copies the parent message's CorrelationKey (so the downstream Aggregator can group them), and publishes each to ch-trace-record.
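That fan-out can be sketched as follows. This is a self-contained approximation, not the engine's actual dispatch code, and the Message shape is simplified to the fields the Splitter touches.

```go
package main

import "fmt"

// Minimal message shape for this sketch.
type Message struct {
	ID             string
	CorrelationKey string
	Variables      map[string]interface{}
}

// splitBatch sketches what the engine does for a Splitter with
// SplitExpr "instances": one child message per list element, with the
// parent's CorrelationKey copied so a downstream Aggregator can group
// the children back together. Names here are illustrative.
func splitBatch(parent *Message, listField string) []*Message {
	items, _ := parent.Variables[listField].([]map[string]interface{})
	out := make([]*Message, 0, len(items))
	for i, elem := range items {
		out = append(out, &Message{
			ID:             fmt.Sprintf("%s-%d", parent.ID, i),
			CorrelationKey: parent.CorrelationKey, // inherited for aggregation
			Variables:      elem,
		})
	}
	return out
}

func main() {
	parent := &Message{
		ID:             "batch-2026-03-10",
		CorrelationKey: "batch-2026-03-10",
		Variables: map[string]interface{}{
			"instances": []map[string]interface{}{
				{"traceID": "inst-001"},
				{"traceID": "inst-002"},
			},
		},
	}
	for _, child := range splitBatch(parent, "instances") {
		fmt.Println(child.ID, child.CorrelationKey, child.Variables["traceID"])
	}
}
```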

Step 3: declare the Filter (drop already-registered traces)

reg.RegisterFilter(&integration.MessageFilter{
    ID:        "filter-unregistered",
    Name:      "Pass Only Unregistered Traces",
    // FEEL: only pass messages where the trace has not yet been seen
    // The corpus service sets "already_registered" = true on known hashes
    Predicate: "already_registered != true",
    ChannelID: "ch-trace-record",
})

This is important for incremental re-runs: if you re-import a batch that partially overlaps with existing corpus content, the filter prevents duplicate registrations from polluting the centroid.
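One way such a dedup flag could be computed is by hashing each cell-ID sequence and keeping a set of seen hashes. The sketch below assumes an FNV-1a hash; the real corpus service's hashing scheme is not specified here.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// traceHash sketches how a corpus service might fingerprint a trace:
// feed the cell IDs, in order, into a 64-bit FNV-1a hash. The function
// name and hash choice are illustrative, not pkg/integration API.
func traceHash(cells []int32) uint64 {
	h := fnv.New64a()
	var buf [4]byte
	for _, c := range cells {
		binary.LittleEndian.PutUint32(buf[:], uint32(c))
		h.Write(buf[:])
	}
	return h.Sum64()
}

func main() {
	seen := map[uint64]bool{}
	traces := [][]int32{
		{2, 5, 8, 11, 14},
		{2, 5, 8, 9, 13, 14},
		{2, 5, 8, 11, 14}, // duplicate of the first
	}
	for _, t := range traces {
		k := traceHash(t)
		fmt.Println("already_registered:", seen[k])
		seen[k] = true
	}
}
```

A message whose hash is already in the set would get already_registered = true and be dropped by the filter.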

Step 4: declare the Aggregator

The Aggregator groups all acknowledgement messages from the corpus service by their CorrelationKey (which equals the batch ID set by the Splitter) and waits until all expected registrations have arrived before emitting a single "corpus ready" event.

reg.RegisterAggregator(&integration.Aggregator{
    ID:   "agg-corpus-build",
    Name: "Corpus Build Aggregator",

    // FEEL: which batch does this acknowledgement belong to?
    CorrelationExpr: "batchID",

    // FEEL: release when all expected registrations are in
    // (totalExpected is set on each ack by the corpus service)
    CompletionCond: "registeredCount == totalExpected",

    // Safety valve: release after 30 minutes even if not all arrived
    Timeout: 30 * time.Minute,

    OutputChannelID: "ch-corpus-ready",
})

Why not just wait synchronously? In production, even a "small" historical corpus might contain 50 000 records. Registering each one involves a cell-ID hash computation and a 64-D centroid update. At 10 000 registrations per second, a 50 000-record batch takes ~5 seconds. The Aggregator lets that work happen concurrently across multiple corpus service workers while your application remains responsive.
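The release logic behind the completion condition can be sketched like this. It is illustrative only: the real aggregation state lives inside the engine, and aggState is a made-up name.

```go
package main

import "fmt"

// aggState sketches the per-batch bookkeeping behind the Aggregator's
// completion condition "registeredCount == totalExpected".
type aggState struct {
	registered int
	expected   int
}

// collect records one acknowledgement and reports whether the batch is
// now complete and should be released downstream.
func (s *aggState) collect(totalExpected int) bool {
	s.expected = totalExpected
	s.registered++
	return s.registered == s.expected
}

func main() {
	batches := map[string]*aggState{} // keyed by the CorrelationExpr result (batchID)
	acks := []struct {
		batchID       string
		totalExpected int
	}{
		{"batch-2026-03-10", 3},
		{"batch-2026-03-10", 3},
		{"batch-2026-03-10", 3},
	}
	for _, a := range acks {
		st := batches[a.batchID]
		if st == nil {
			st = &aggState{}
			batches[a.batchID] = st
		}
		if st.collect(a.totalExpected) {
			// Only fires on the final ack of the batch.
			fmt.Println("corpus ready:", a.batchID)
		}
	}
}
```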

Step 5: wire it into a Pipeline

reg.RegisterPipeline(&integration.Pipeline{
    ID:   "pipe-corpus-ingestion",
    Name: "Corpus Ingestion Pipeline",
    Steps: []integration.PipelineStep{
        {ComponentID: "split-corpus-batch",  ComponentKind: integration.ComponentSplitter},
        {ComponentID: "filter-unregistered", ComponentKind: integration.ComponentFilter},
        // The aggregator waits for acks on ch-registration-ack; it is
        // registered separately and not part of this linear pipeline
    },
})

End-to-end data flow

// 1. Your batch job sends one message to ch-bulk-import:
batchMsg := &items.Message{
    ID:             "batch-2026-03-10",
    CorrelationKey: "batch-2026-03-10", // used by Aggregator
    Payload: &items.DataItem{
        Variables: map[string]interface{}{
            "instances": []map[string]interface{}{
                {"traceID": "inst-001", "cells": []int32{2, 5, 8, 11, 14}},
                {"traceID": "inst-002", "cells": []int32{2, 5, 8, 9, 13, 14}},
                // ... up to N instances
            },
        },
    },
}

// 2. The Splitter fans out → N individual messages on ch-trace-record
// 3. The Filter drops any already-registered ones
// 4. The corpus service registers each remaining trace → ack on ch-registration-ack
// 5. The Aggregator fires when registeredCount == totalExpected
// 6. "corpus ready" is published on ch-corpus-ready (pub-sub)
// 7. All subscribers (analytics dashboard, alerting, downstream pipelines) react

5. Fast-data path: real-time routing with ContentBasedRouter and CorrelationContext

The fast-data path is the operational core of Priostack's anomaly detection. When a BPMN process instance completes, the engine emits a completion event carrying the instance's cell-ID trace. That event must be:

  1. Scored against the geometric memory corpus
  2. Routed to the appropriate downstream channel based on the score
  3. If anomalous, correlated with a waiting CMMN fraud investigation case

All of this must happen in under 2 ms to avoid adding latency to the user-facing process flow.

The ContentBasedRouter with FEEL conditions

The MessageRouter evaluates its Routes in descending priority order. The first route whose FEEL condition evaluates true against the message's Payload.Variables wins.

reg.RegisterRouter(&integration.MessageRouter{
    ID:   "router-anomaly-score",
    Name: "Anomaly Score Router",
    Routes: []integration.Route{
        {
            // High-confidence anomaly: score above 0.8
            ChannelID: "ch-confirmed-anomaly",
            Condition: "anomalyScore > 0.8",
            Priority:  100,
        },
        {
            // Uncertain: score in 0.5-0.8 range → human review queue
            ChannelID: "ch-review-queue",
            Condition: "anomalyScore > 0.5",
            Priority:  50,
        },
        {
            // Default: score ≤ 0.5 → normal, register in corpus
            ChannelID: "ch-normal-instances",
            Condition: "", // empty Condition = catch-all
            Priority:  0,
        },
    },
})
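The first-match-wins rule can be sketched as follows. Go closures stand in for the FEEL conditions; this is an illustration of the selection rule, not the engine's router code.

```go
package main

import (
	"fmt"
	"sort"
)

type route struct {
	ChannelID string
	Priority  int
	// cond stands in for the FEEL predicate; nil models the empty
	// Condition, i.e. the catch-all route.
	cond func(vars map[string]interface{}) bool
}

// selectChannel sketches the router's rule: evaluate routes in
// descending priority order and return the first route whose
// condition holds against the message's Variables.
func selectChannel(routes []route, vars map[string]interface{}) string {
	sort.Slice(routes, func(i, j int) bool { return routes[i].Priority > routes[j].Priority })
	for _, r := range routes {
		if r.cond == nil || r.cond(vars) {
			return r.ChannelID
		}
	}
	return ""
}

func main() {
	routes := []route{
		{"ch-confirmed-anomaly", 100, func(v map[string]interface{}) bool { return v["anomalyScore"].(float64) > 0.8 }},
		{"ch-review-queue", 50, func(v map[string]interface{}) bool { return v["anomalyScore"].(float64) > 0.5 }},
		{"ch-normal-instances", 0, nil},
	}
	for _, score := range []float64{0.95, 0.6, 0.2} {
		fmt.Println(score, "→", selectChannel(routes, map[string]interface{}{"anomalyScore": score}))
	}
}
```

A score of 0.6 matches the priority-100 route's predicate check first, fails it, then matches the priority-50 route, so it lands on ch-review-queue.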

FEEL expressions have access to all Payload.Variables. The condition anomalyScore > 0.8 is evaluated against a map[string]interface{}; your corpus service populates anomalyScore, frechetDistance, and corpusSize before the router sees the message. You can write conditions as complex as anomalyScore > 0.5 and frechetDistance > 20 and corpusSize >= 10 to suppress false positives while the corpus is still small.

Routing based on corpus size (false-positive suppression)

With a small corpus the anomaly score is noisy, as described in the companion article. An important operational pattern is to suppress routing to the fraud queue until the corpus has reached a minimum size:

Routes: []integration.Route{
    {
        // Only route to fraud queue when corpus is mature AND score is high
        ChannelID: "ch-confirmed-anomaly",
        Condition: "anomalyScore > 0.8 and corpusSize >= 50",
        Priority:  100,
    },
    {
        // Corpus still warming up: threshold is tighter, route to review instead
        ChannelID: "ch-review-queue",
        Condition: "anomalyScore > 0.5 and corpusSize < 50",
        Priority:  90,
    },
    {
        // Normal: add to corpus unconditionally (builds it up)
        ChannelID: "ch-normal-instances",
        Condition: "",
        Priority:  0,
    },
},

This means the system operates safely from day one: even with only 3 seed corpus entries it will not incorrectly flag thousands of legitimate transactions as fraud. As the corpus grows past 50, the router switches to the higher-confidence path automatically.

6. Pipeline composition: chaining Filter → Router → Endpoint

Individual EIP components are powerful in isolation, but the Pipeline primitive (Pipes-and-Filters pattern) is where they become composable. A pipeline is an ordered sequence of component references; messages are passed through each step in list order.

// Declare the full fast-data pipeline
reg.RegisterPipeline(&integration.Pipeline{
    ID:   "pipe-anomaly-detection",
    Name: "Real-Time Anomaly Detection Pipeline",
    Steps: []integration.PipelineStep{
        // 1. Only pass messages from completed instances (not suspended/cancelled)
        {ComponentID: "filter-completed-only",   ComponentKind: integration.ComponentFilter},
        // 2. Translate BPMN completion event schema → anomaly score request schema
        {ComponentID: "tr-bpmn-to-score-req",    ComponentKind: integration.ComponentTranslator},
        // 3. Score against corpus (this step calls the corpus service inline)
        {ComponentID: "ep-corpus-scorer",         ComponentKind: integration.ComponentEndpoint},
        // 4. Route by score to fraud/review/normal channels
        {ComponentID: "router-anomaly-score",    ComponentKind: integration.ComponentRouter},
    },
})

The engine executes step 1 through step 4 for each incoming message. If the filter at step 1 rejects the message, execution stops there and nothing downstream sees it. If the translator at step 2 produces a message on a different channel (translated schema), the router at step 4 sees the translated version.
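A minimal sketch of that short-circuiting dispatch, using illustrative types rather than the engine's implementation:

```go
package main

import "fmt"

type message struct{ vars map[string]interface{} }

// step stands in for one registered EIP component. It returns the
// (possibly transformed) message, or nil to stop the pipeline, the
// way a Filter rejection halts execution so nothing downstream runs.
type step func(*message) *message

// runPipeline sketches Pipes-and-Filters dispatch: apply each step in
// list order, stopping as soon as a step drops the message.
func runPipeline(steps []step, m *message) *message {
	for _, s := range steps {
		if m = s(m); m == nil {
			return nil
		}
	}
	return m
}

func main() {
	steps := []step{
		func(m *message) *message { // filter: completed instances only
			if m.vars["instanceStatus"] != "completed" {
				return nil
			}
			return m
		},
		func(m *message) *message { // translator: reshape for the scorer
			m.vars["correlationKey"] = m.vars["instanceKey"]
			return m
		},
	}
	out := runPipeline(steps, &message{vars: map[string]interface{}{
		"instanceStatus": "completed", "instanceKey": "APP-7821"}})
	fmt.Println(out.vars["correlationKey"]) // APP-7821
	fmt.Println(runPipeline(steps, &message{vars: map[string]interface{}{
		"instanceStatus": "cancelled"}}) == nil) // true
}
```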

Component declarations to support the pipeline

// Step 1: filter — only completed instances
reg.RegisterFilter(&integration.MessageFilter{
    ID:        "filter-completed-only",
    Name:      "Pass Completed Instances Only",
    Predicate: "instanceStatus == \"completed\"",
    ChannelID: "ch-bpmn-completion-events",
})

// Step 2: translator (declared below in section 7)

// Step 3: outbound endpoint — calls corpus service
reg.RegisterEndpoint(&integration.MessageEndpoint{
    ID:         "ep-corpus-scorer",
    Name:       "Corpus Scorer",
    ChannelID:  "ch-score-requests",
    Direction:  integration.EndpointOutbound,
    ServiceRef: "svc-geometric-memory", // matches ea.BusinessService.ID
})

// Step 4: router (declared above)

7. MessageTranslator: bridging BPMN event schemas to anomaly score schemas

BPMN completion events have a rich schema: process definition ID, instance key, variable snapshot, timestamp, SLA deadline, compliance flags. The corpus scorer expects a far simpler schema: just the cell-ID trace and a correlation key. The MessageTranslator declares this mapping using a FEEL expression.

reg.RegisterTranslator(&integration.MessageTranslator{
    ID:            "tr-bpmn-to-score-req",
    Name:          "BPMN Event → Score Request",
    SourceItemDef: "def-bpmn-completion-event",   // rich BPMN schema
    TargetItemDef: "def-corpus-score-request",     // lean scorer schema
    // FEEL expression: project the fields we need
    MappingExpr: `{
        correlationKey: instanceKey,
        cellTrace:      geometricTrace,
        processDefID:   processDefinitionID,
        completedAt:    completionTimestamp
    }`,
})

The engine evaluates MappingExpr against the source message's Payload.Variables to produce the target Variables map. The translated message retains the parent's GeomTrace (the cell-ID sequence is copied verbatim), so the scorer can operate on msg.GeomTrace directly without parsing any FEEL.

Why declare mapping intent rather than implement it?

A MessageTranslator is a declaration of intent, not an implementation. The FEEL expression in MappingExpr is evaluated by the BPM engine, not by your application code. This means the mapping is data rather than compiled logic: it can be serialised and restored along with the rest of the Registry, inspected and audited without reading Go code, and changed without recompiling or redeploying the application.

8. Async corpus scoring and the CorrelationContext reconnect pattern

In the fast-data path, the corpus scorer is called inline and returns in under 1 ms for small corpora. But for large corpora — tens of thousands of registered trajectories — the Fréchet distance computation is O(mn) for each corpus entry, and the total scoring time may rise to 50–200 ms. That is too slow to hold open a synchronous connection.

The solution is to make scoring asynchronous: the pipeline submits a score request to a queue and the BPMN process suspends at a ReceiveTask. When the corpus service completes the scoring, it posts the result to a reply channel. The CorrelationContext matches the result back to the waiting process instance.

Synchronous path (corpus ≤ 1 000 entries, <1 ms):

  BPMN task ──► Endpoint ──► corpus service ──► score ──► Router ──► done

Asynchronous path (corpus > 1 000 entries, 50–200 ms):

  BPMN task                              ReceiveTask
      │                                      │  ← suspended, waiting for score
  [Outbound Endpoint]                        │
      │                                      ▼
  ch-score-requests                  [CorrelationContext]
      │                                      │
  corpus service (async worker pool)         │
      │                                      │
  [ch-score-results] ──────────────────────► │  Match(correlationKey)
                                             │
                         instance resumes with anomalyScore in Variables

Declaring the CorrelationContext

// The key expression extracts the correlationKey field from the
// scored result message's Payload.Variables.
scoringCtx := integration.NewCorrelationContext("correlationKey")

// When the process reaches the ReceiveTask:
instanceKey  := "inst-loan-7821"
resumePlace  := "after_score_received"   // BPMN place to resume from
correlKey    := "APP-7821"               // same key set on the score request

scoringCtx.Register(correlKey, instanceKey, resumePlace)

// ... (corpus service runs async) ...

// When the score result arrives on ch-score-results:
result := receiveFromChannel("ch-score-results")
correlKey = result.Payload.Variables["correlationKey"].(string)
entry := scoringCtx.Match(correlKey)
if entry != nil {
    // Inject the score into the waiting instance and resume
    engine.ResumeInstance(entry.InstanceKey, entry.ResumePlace, map[string]interface{}{
        "anomalyScore":    result.Payload.Variables["anomalyScore"],
        "frechetDistance": result.Payload.Variables["frechetDistance"],
    })
}

The CorrelationContext is a one-shot match: Match(key) removes the entry after the first match, guaranteeing that each score result resumes exactly one process instance, even under high concurrency. If you need fan-out (one score triggers multiple instances), register each instance separately under a unique key.

Timeout handling

The Aggregator has a built-in Timeout field. The CorrelationContext does not — expiry is your application's responsibility. A common pattern is to combine the CorrelationContext with a BPMN boundary timer event on the ReceiveTask: if no score arrives within 5 seconds, the timer fires and the instance takes the exception path (typically routed to manual review).

9. The Registry as the single source of truth

The Registry is a thread-safe, in-memory catalogue that stores every declared EIP component. It is not a runtime bus — it does not move messages itself. Think of it as a configuration store that the engine consults when dispatching messages.

reg := integration.NewRegistry()

// Register everything during startup (order does not matter)
reg.RegisterChannel(...)
reg.RegisterFilter(...)
reg.RegisterRouter(...)
reg.RegisterTranslator(...)
reg.RegisterAggregator(...)
reg.RegisterSplitter(...)
reg.RegisterEndpoint(...)
reg.RegisterPipeline(...)

// Lookups are O(1) map reads; thread-safe via sync.RWMutex
pipe := reg.LookupPipeline("pipe-anomaly-detection")
router := reg.LookupRouter("router-anomaly-score")

// All endpoints for a given service (useful for service mesh registration)
eps := reg.EndpointsForService("svc-geometric-memory")
// → returns the endpoints registered for that service (here, ep-corpus-scorer)

Duplicate detection

Every Register* method returns an error if the ID is already registered. This prevents silent overwrites during misconfigured restarts:

if err := reg.RegisterChannel(ch); err != nil {
    // "integration: channel "ch-bulk-import" already registered"
    log.Fatal(err)
}

One Registry per process, multiple Registries in tests

In production, create one Registry at application startup and share it as a dependency. In tests, create a fresh Registry per test to avoid state leakage between test cases. The zero-cost constructor (integration.NewRegistry()) makes this practical:

func TestMyPipeline(t *testing.T) {
    reg := integration.NewRegistry() // fresh per test
    // register only the components this test needs
    reg.RegisterFilter(&integration.MessageFilter{
        ID:        "f-test",
        Predicate: "amount > 10000",
        ChannelID: "ch-high-value",
    })
    // exercise the component in isolation
}

10. Operational characteristics and tuning guidance

Memory footprint

The Registry holds all components in maps. An average production setup has 10–30 channels, 5–15 endpoints, 3–10 routers, 2–5 aggregators, and 1–3 pipelines. At typical struct sizes this is well under 1 MB. The corpus itself (cell-ID sequences) grows linearly with registered traces but is managed by the geometric memory system, not the Registry.

FEEL expression performance

FEEL predicate evaluation (PassesFilter, route conditions, aggregator completion conditions) is implemented as a lightweight interpreter. Simple comparisons (score > 0.5) are O(1). Compound conditions (a and b and c) are O(n) where n is the number of conjuncts. Avoid FEEL expressions that iterate over arrays in hot paths — those are O(m × n) and should be pre-computed before the message enters the pipeline.

Splitter fan-out limits

There is no built-in fan-out limit on the Splitter. For batch sizes above 10 000 records, apply back-pressure at the source: split the batch into chunks before sending to ch-bulk-import, and let each chunk run through the pipeline independently. This keeps the Aggregator's in-memory correlation table bounded.
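Source-side chunking can be as simple as the helper below. chunkInstances is an illustrative name, not part of pkg/integration; each chunk would become its own message on ch-bulk-import with its own batch ID.

```go
package main

import "fmt"

// chunkInstances sketches source-side back-pressure: split a large
// batch into fixed-size chunks before sending to ch-bulk-import, so
// each chunk runs through the Splitter/Aggregator pipeline
// independently and the Aggregator's correlation table stays bounded.
func chunkInstances(instances []map[string]interface{}, size int) [][]map[string]interface{} {
	var chunks [][]map[string]interface{}
	for start := 0; start < len(instances); start += size {
		end := start + size
		if end > len(instances) {
			end = len(instances)
		}
		chunks = append(chunks, instances[start:end])
	}
	return chunks
}

func main() {
	// 25 000 records chunked at 10 000 → 3 chunk messages.
	instances := make([]map[string]interface{}, 25000)
	chunks := chunkInstances(instances, 10000)
	fmt.Println(len(chunks), len(chunks[0]), len(chunks[2])) // 3 10000 5000
}
```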

<1 ms       pipeline execution for small corpora (<1 000 entries)
50–200 ms   async scoring for large corpora (10 000+ entries)
<1 MB       Registry memory for a typical production setup
O(1)        all Registry lookup operations

Serialisation and process-definition alignment

Every EIP struct in pkg/integration has JSON tags. The full Registry can be serialised and restored at startup from a configuration file, a database, or a remote config service. The only binding between an EIP component and a BPMN process definition is the ServiceRef on MessageEndpoint (which must match an ea.BusinessService.ID) and channel IDs in the process definition's sourceRef / targetRef. Neither side validates the other at registration time — mismatches surface at runtime when message dispatch tries a lookup and gets nil.

When to use a Pipeline vs a direct channel

Use Scenario
Direct channel Simple point-to-point forwarding with no transformation or routing
Filter alone Noise reduction: one channel, one predicate, no further processing
Router alone Fan-out to N channels based on message content
Pipeline Any sequence of ≥ 2 EIP steps that must be executed in order
Aggregator + Pipeline Collecting results from parallel workers before proceeding

Conclusion

pkg/integration transforms geometric memory from a per-instance computation into a production-grade data infrastructure.

The two topologies described in this article cover the complete lifecycle: the big-data path bootstraps and incrementally rebuilds the corpus from historical records using Splitter, Filter, and Aggregator; the fast-data path scores every completed instance in real time and routes it by anomaly score using Pipeline, Router, and CorrelationContext.

Both topologies share the same Registry, the same *items.Message carrier type, and the same FEEL expression language. There is no separate integration server, no schema registry, and no deployment pipeline for configuration changes — the Registry is populated in Go code at startup, alongside your BPMN process deployment.

To see both topologies in action, start with the agentic credit tutorial, which runs the complete fast-data path. Then read the companion article for the mathematical details on cell IDs, shape vectors, and Fréchet distance that power the scoring step at the heart of both pipelines.

Further reading The canonical reference for all nine EIP patterns is Hohpe & Woolf's Enterprise Integration Patterns (Addison-Wesley, 2003). Each pattern in pkg/integration maps directly to a pattern in that book. The FEEL expression language used in predicates and mapping expressions is defined by the OMG DMN 1.3 specification.

Priostack Engineering

Technical deep-dives on process automation, workflow engines, and the systems behind Priostack.