EIP Pipelines and Geometric Memory:
Big-Data Corpus Building and Fast-Data Anomaly Routing
The previous article explained what geometric memory is and why Fréchet distance is the right metric for comparing BPMN process trajectories. It ended with a question it deliberately deferred: how do you actually plumb geometric memory into a production system that processes thousands of messages per second?
This article answers that question using pkg/integration —
Priostack's implementation of the classic
Enterprise Integration Patterns
(EIP) catalogue. Together, the two articles cover the complete stack:
geometric memory provides the mathematical substrate; EIP pipelines provide
the operational infrastructure that feeds it at scale and routes its
outputs in real time.
We will walk through two distinct integration topologies — a
big-data path for high-volume historical corpus building and
a fast-data path for sub-millisecond real-time anomaly
routing — and show how each is expressed using the nine EIP primitives in
pkg/integration.
Contents
- Quick primer on pkg/integration primitives
- Messages carry geometry: pkg/items.Message and GeomTrace
- The two topologies: big data vs fast data
- Big-data path: bulk corpus ingestion with Splitter and Aggregator
- Fast-data path: real-time routing with ContentBasedRouter and CorrelationContext
- Pipeline composition: chaining Filter → Router → Endpoint
- MessageTranslator: bridging BPMN event schemas to anomaly score schemas
- Async corpus scoring and the CorrelationContext reconnect pattern
- The Registry as the single source of truth
- Operational characteristics and tuning guidance
1. Quick primer on pkg/integration primitives
pkg/integration implements nine EIP building blocks. They are
plain Go structs — no goroutines, no hidden state machines. The
Registry is a thread-safe catalogue that stores them. Your
application code wires them together; Priostack's engine evaluates them
during message dispatch. Here is the complete vocabulary:
| Type | EIP pattern | One-line description |
|---|---|---|
| MessageChannel | Message Channel | Named conduit; point-to-point or pub-sub delivery |
| MessageEndpoint | Message Endpoint | Connects a service or process to a channel (inbound/outbound) |
| MessageRouter | Content-Based Router | Routes messages to channels based on FEEL predicates |
| MessageFilter | Message Filter | Drops messages that do not satisfy a FEEL predicate |
| MessageTranslator | Message Translator | Declares a FEEL-expression payload transformation |
| Aggregator | Aggregator | Collects correlated messages until a completion condition fires |
| Splitter | Splitter | Fans out one message into many, one per list element |
| Pipeline | Pipes and Filters | Ordered chain: each step is a registered EIP component |
| CorrelationContext | Correlation Identifier | Matches an incoming response message to a waiting process instance |
All of these operate on *items.Message — the shared message
type from pkg/items. A Message carries a typed
Payload with a Variables map, plus metadata
(ID, correlation key, timestamp). Crucially, it also carries a
GeomTrace — and that is where the integration between EIP
pipelines and geometric memory begins.
2. Messages carry geometry: pkg/items.Message and GeomTrace
In most integration frameworks, a message is a bag of bytes. In Priostack,
a message is a bag of bytes plus a geometric history. The
GeomTrace embedded in every Message is the same
cell-ID sequence described in the companion article: an ordered list of
int32 values, one per execution step that produced or consumed
this message.
// pkg/items — abbreviated for clarity
type Message struct {
    ID             string
    CorrelationKey string
    Payload        *DataItem
    SentAt         time.Time

    // GeomTrace accumulates cell IDs as the message passes through
    // EIP components and process steps. Each component that handles
    // the message appends its current BPM execution cell ID.
    GeomTrace []int32
}

// A message produced at the boundary of a completed BPMN task looks like:
msg := &items.Message{
    ID:             "msg-loan-7821",
    CorrelationKey: "APP-7821",
    GeomTrace:      []int32{2, 5, 8, 11, 14}, // same cell IDs as the process instance
    Payload:        loanDataItem,
}
This design means that by the time a message reaches the geometric memory system, it carries all the information needed to register a corpus entry or compute an anomaly score — no separate lookup required. The message itself is the carrier of its own trajectory.
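As a minimal sketch of that accumulation (a hypothetical helper, with the Message type reduced to just the fields used here), each component appends its own cell ID before forwarding the message:

```go
package main

// Message is trimmed to the fields relevant to trajectory accumulation;
// the real type lives in pkg/items and carries Payload, SentAt, etc.
type Message struct {
	ID             string
	CorrelationKey string
	GeomTrace      []int32
}

// appendCell records the handling component's current execution cell on the
// message's trajectory, mirroring what each EIP component and process step
// does as the message moves through the system.
func appendCell(m *Message, cellID int32) {
	m.GeomTrace = append(m.GeomTrace, cellID)
}
```

By the time the message reaches the scorer, `m.GeomTrace` is the full cell-ID trajectory, with no side lookup needed.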
3. The two topologies: big data vs fast data
Geometric memory requires two different integration topologies depending on where in the data lifecycle you are:
- The big-data path optimises for throughput: it bulk-loads high volumes of historical process traces into the corpus and tolerates batch-scale latency.
- The fast-data path optimises for latency: it scores each completing instance against the corpus and routes the result in real time, within the millisecond budget of the live process flow.
Both topologies are expressed using the same nine primitives and the same
Registry. The difference is not in the infrastructure but in
how many components you chain and what FEEL conditions you write.
4. Big-data path: bulk corpus ingestion with Splitter and Aggregator
When you first deploy Priostack against an existing process, your geometric memory corpus is empty. You need to bootstrap it from historical records. A typical source is a process audit log or data warehouse export: a single bulk message containing an array of process instance records, each with its cell-ID trace.
Step 1: declare the channels
reg := integration.NewRegistry()
// Source: bulk import arrives here
reg.RegisterChannel(&integration.MessageChannel{
ID: "ch-bulk-import",
Name: "Bulk Corpus Import",
Kind: integration.ChannelPointToPoint,
ItemDefID: "def-process-batch",
})
// Fan-out: one channel per individual trace record
reg.RegisterChannel(&integration.MessageChannel{
ID: "ch-trace-record",
Name: "Individual Trace Record",
Kind: integration.ChannelPointToPoint,
ItemDefID: "def-process-instance-trace",
})
// Corpus service output
reg.RegisterChannel(&integration.MessageChannel{
ID: "ch-registration-ack",
Name: "Corpus Registration Ack",
Kind: integration.ChannelPointToPoint,
ItemDefID: "def-registration-result",
})
// Final "corpus-ready" signal (pub-sub so multiple consumers can react)
reg.RegisterChannel(&integration.MessageChannel{
ID: "ch-corpus-ready",
Name: "Corpus Ready",
Kind: integration.ChannelPubSub,
ItemDefID: "def-corpus-ready-event",
})
Step 2: declare the Splitter
The Splitter takes the bulk import message and produces one
output message per element in Payload.Variables["instances"].
The FEEL expression "instances" names the list field to split on.
reg.RegisterSplitter(&integration.Splitter{
ID: "split-corpus-batch",
Name: "Split Corpus Batch",
SplitExpr: "instances", // FEEL: name of the list in Payload.Variables
OutputChannelID: "ch-trace-record", // each element goes here as its own message
})
When the engine evaluates this Splitter, it iterates over
msg.Payload.Variables["instances"], creates a new
*items.Message per element, copies the parent message's
CorrelationKey (so the downstream Aggregator can group them),
and publishes each to ch-trace-record.
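That fan-out behaviour can be sketched in a few lines. The Message shape below is trimmed to the fields involved, and the real engine also stamps fresh metadata on each child; this is an illustration of the semantics, not the engine code:

```go
package main

import "fmt"

// Message is a minimal stand-in for pkg/items.Message.
type Message struct {
	ID             string
	CorrelationKey string
	Variables      map[string]interface{}
}

// split fans one message out into one child per element of the named list,
// copying the parent's CorrelationKey so a downstream Aggregator can
// regroup the children into their original batch.
func split(parent *Message, listField string) []*Message {
	elems, ok := parent.Variables[listField].([]map[string]interface{})
	if !ok {
		return nil // nothing to split on
	}
	out := make([]*Message, 0, len(elems))
	for i, elem := range elems {
		out = append(out, &Message{
			ID:             fmt.Sprintf("%s-%d", parent.ID, i),
			CorrelationKey: parent.CorrelationKey, // preserved for aggregation
			Variables:      elem,
		})
	}
	return out
}
```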
Step 3: declare the Filter (drop already-registered traces)
reg.RegisterFilter(&integration.MessageFilter{
ID: "filter-unregistered",
Name: "Pass Only Unregistered Traces",
// FEEL: only pass messages where the trace has not yet been seen
// The corpus service sets "already_registered" = true on known hashes
Predicate: "already_registered != true",
ChannelID: "ch-trace-record",
})
This is important for incremental re-runs: if you re-import a batch that partially overlaps with existing corpus content, the filter prevents duplicate registrations from polluting the centroid.
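The predicate's semantics can be sketched in plain Go as a stand-in for the engine's FEEL evaluation. Note that `already_registered != true` also passes messages where the flag is absent, which is what lets brand-new traces through:

```go
package main

// passes mimics the FEEL predicate "already_registered != true": a message
// is forwarded unless the corpus service has flagged it as a known trace.
// A missing flag also passes. Illustrative only; the real engine
// interprets the FEEL text directly.
func passes(vars map[string]interface{}) bool {
	flagged, _ := vars["already_registered"].(bool)
	return !flagged
}
```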
Step 4: declare the Aggregator
The Aggregator groups all acknowledgement messages from the
corpus service by their CorrelationKey (which equals the batch
ID set by the Splitter) and waits until all expected registrations have
arrived before emitting a single "corpus ready" event.
reg.RegisterAggregator(&integration.Aggregator{
ID: "agg-corpus-build",
Name: "Corpus Build Aggregator",
// FEEL: which batch does this acknowledgement belong to?
CorrelationExpr: "batchID",
// FEEL: release when all expected registrations are in
// (totalExpected is set on each ack by the corpus service)
CompletionCond: "registeredCount == totalExpected",
// Safety valve: release after 30 minutes even if not all arrived
Timeout: 30 * time.Minute,
OutputChannelID: "ch-corpus-ready",
})
Step 5: wire it into a Pipeline
reg.RegisterPipeline(&integration.Pipeline{
ID: "pipe-corpus-ingestion",
Name: "Corpus Ingestion Pipeline",
Steps: []integration.PipelineStep{
{ComponentID: "split-corpus-batch", ComponentKind: integration.ComponentSplitter},
{ComponentID: "filter-unregistered", ComponentKind: integration.ComponentFilter},
// The aggregator waits for acks on ch-registration-ack; it is
// registered separately and not part of this linear pipeline
},
})
End-to-end data flow
// 1. Your batch job sends one message to ch-bulk-import:
batchMsg := &items.Message{
ID: "batch-2026-03-10",
CorrelationKey: "batch-2026-03-10", // used by Aggregator
Payload: &items.DataItem{
Variables: map[string]interface{}{
"instances": []map[string]interface{}{
{"traceID": "inst-001", "cells": []int32{2, 5, 8, 11, 14}},
{"traceID": "inst-002", "cells": []int32{2, 5, 8, 9, 13, 14}},
// ... up to N instances
},
},
},
}
// 2. The Splitter fans out → N individual messages on ch-trace-record
// 3. The Filter drops any already-registered ones
// 4. The corpus service registers each remaining trace → ack on ch-registration-ack
// 5. The Aggregator fires when registeredCount == totalExpected
// 6. "corpus ready" is published on ch-corpus-ready (pub-sub)
// 7. All subscribers (analytics dashboard, alerting, downstream pipelines) react
5. Fast-data path: real-time routing with ContentBasedRouter and CorrelationContext
The fast-data path is the operational core of Priostack's anomaly detection. When a BPMN process instance completes, the engine emits a completion event carrying the instance's cell-ID trace. That event must be:
- Scored against the geometric memory corpus
- Routed to the appropriate downstream channel based on the score
- If anomalous, correlated with a waiting CMMN fraud investigation case
All of this must happen in under 2 ms to avoid adding latency to the user-facing process flow.
The ContentBasedRouter with FEEL conditions
The MessageRouter evaluates its Routes in
descending priority order. The first route whose FEEL condition evaluates
to true against the message's Payload.Variables wins.
reg.RegisterRouter(&integration.MessageRouter{
ID: "router-anomaly-score",
Name: "Anomaly Score Router",
Routes: []integration.Route{
{
// High-confidence anomaly: score above 0.8
ChannelID: "ch-confirmed-anomaly",
Condition: "anomalyScore > 0.8",
Priority: 100,
},
{
// Uncertain: score in 0.5-0.8 range → human review queue
ChannelID: "ch-review-queue",
Condition: "anomalyScore > 0.5",
Priority: 50,
},
{
// Default: score ≤ 0.5 → normal, register in corpus
ChannelID: "ch-normal-instances",
Condition: "", // empty Condition = catch-all
Priority: 0,
},
},
})
anomalyScore > 0.8 is evaluated against a
map[string]interface{}. Your corpus service populates
anomalyScore, frechetDistance, and
corpusSize before the router sees the message. You can write
conditions as complex as
anomalyScore > 0.5 and frechetDistance > 20 and corpusSize >= 10
to suppress false positives when the corpus is still small.
Routing based on corpus size (false-positive suppression)
With a small corpus the anomaly score is noisy, as described in the companion article. An important operational pattern is to suppress routing to the fraud queue until the corpus has reached a minimum size:
Routes: []integration.Route{
{
// Only route to fraud queue when corpus is mature AND score is high
ChannelID: "ch-confirmed-anomaly",
Condition: "anomalyScore > 0.8 and corpusSize >= 50",
Priority: 100,
},
{
// Corpus still warming up: threshold is tighter, route to review instead
ChannelID: "ch-review-queue",
Condition: "anomalyScore > 0.5 and corpusSize < 50",
Priority: 90,
},
{
// Normal: add to corpus unconditionally (builds it up)
ChannelID: "ch-normal-instances",
Condition: "",
Priority: 0,
},
},
This means the system operates safely from day one: even with only 3 seed corpus entries it will not incorrectly flag thousands of legitimate transactions as fraud. As the corpus grows past 50, the router switches to the higher-confidence path automatically.
6. Pipeline composition: chaining Filter → Router → Endpoint
Individual EIP components are powerful in isolation, but the
Pipeline primitive (Pipes-and-Filters pattern) is where they
become composable. A pipeline is an ordered sequence of component
references; messages are passed through each step in list order.
// Declare the full fast-data pipeline
reg.RegisterPipeline(&integration.Pipeline{
ID: "pipe-anomaly-detection",
Name: "Real-Time Anomaly Detection Pipeline",
Steps: []integration.PipelineStep{
// 1. Only pass messages from completed instances (not suspended/cancelled)
{ComponentID: "filter-completed-only", ComponentKind: integration.ComponentFilter},
// 2. Translate BPMN completion event schema → anomaly score request schema
{ComponentID: "tr-bpmn-to-score-req", ComponentKind: integration.ComponentTranslator},
// 3. Score against corpus (this step calls the corpus service inline)
{ComponentID: "ep-corpus-scorer", ComponentKind: integration.ComponentEndpoint},
// 4. Route by score to fraud/review/normal channels
{ComponentID: "router-anomaly-score", ComponentKind: integration.ComponentRouter},
},
})
The engine executes step 1 through step 4 for each incoming message. If the filter at step 1 rejects the message, execution stops there and nothing downstream sees it. If the translator at step 2 produces a message on a different channel (translated schema), the router at step 4 sees the translated version.
Component declarations to support the pipeline
// Step 1: filter — only completed instances
reg.RegisterFilter(&integration.MessageFilter{
ID: "filter-completed-only",
Name: "Pass Completed Instances Only",
Predicate: "instanceStatus == \"completed\"",
ChannelID: "ch-bpmn-completion-events",
})
// Step 2: translator (declared below in section 7)
// Step 3: outbound endpoint — calls corpus service
reg.RegisterEndpoint(&integration.MessageEndpoint{
ID: "ep-corpus-scorer",
Name: "Corpus Scorer",
ChannelID: "ch-score-requests",
Direction: integration.EndpointOutbound,
ServiceRef: "svc-geometric-memory", // matches ea.BusinessService.ID
})
// Step 4: router (declared above)
7. MessageTranslator: bridging BPMN event schemas to anomaly score schemas
BPMN completion events have a rich schema: process definition ID, instance
key, variable snapshot, timestamp, SLA deadline, compliance flags.
The corpus scorer expects a far simpler schema: just the cell-ID trace and
a correlation key. The MessageTranslator declares this mapping
using a FEEL expression.
reg.RegisterTranslator(&integration.MessageTranslator{
ID: "tr-bpmn-to-score-req",
Name: "BPMN Event → Score Request",
SourceItemDef: "def-bpmn-completion-event", // rich BPMN schema
TargetItemDef: "def-corpus-score-request", // lean scorer schema
// FEEL expression: project the fields we need
MappingExpr: `{
correlationKey: instanceKey,
cellTrace: geometricTrace,
processDefID: processDefinitionID,
completedAt: completionTimestamp
}`,
})
The engine evaluates MappingExpr against the source message's
Payload.Variables to produce the target Variables
map. The translated message retains the parent's GeomTrace
(the cell-ID sequence is copied verbatim), so the scorer can operate on
msg.GeomTrace directly without parsing any FEEL.
Why declare mapping intent rather than implement it?
A MessageTranslator is a declaration of intent, not
an implementation. The FEEL expression in MappingExpr is
evaluated by the BPM engine, not by your application code. This means:
- Auditability: every schema transformation is recorded in the registry and visible to tooling. You can inspect all translators and trace how a field value moved from one schema to another.
- Version independence: when the BPMN completion event schema gains a new field, you update the MappingExpr in one place rather than touching every consumer.
- Testing: you can unit-test a MessageTranslator by evaluating its FEEL expression against a fixture message without running the full BPM engine.
8. Async corpus scoring and the CorrelationContext reconnect pattern
In the fast-data path, the corpus scorer is called inline and returns in under 1 ms for small corpora. But for large corpora — tens of thousands of registered trajectories — the Fréchet distance computation is O(mn) for each corpus entry, and the total scoring time may rise to 50–200 ms. That is too slow to hold open a synchronous connection.
The solution is to make scoring asynchronous: the pipeline submits a score
request to a queue and the BPMN process suspends at a ReceiveTask.
When the corpus service completes the scoring, it posts the result to a
reply channel. The CorrelationContext matches the result back
to the waiting process instance.
Declaring the CorrelationContext
// The key expression extracts the correlationKey field from the
// scored result message's Payload.Variables.
scoringCtx := integration.NewCorrelationContext("correlationKey")
// When the process reaches the ReceiveTask:
instanceKey := "inst-loan-7821"
resumePlace := "after_score_received" // BPMN place to resume from
correlKey := "APP-7821" // same key set on the score request
scoringCtx.Register(correlKey, instanceKey, resumePlace)
// ... (corpus service runs async) ...
// When the score result arrives on ch-score-results:
result := receiveFromChannel("ch-score-results")
correlKey = result.Payload.Variables["correlationKey"].(string)
entry := scoringCtx.Match(correlKey)
if entry != nil {
// Inject the score into the waiting instance and resume
engine.ResumeInstance(entry.InstanceKey, entry.ResumePlace, map[string]interface{}{
"anomalyScore": result.Payload.Variables["anomalyScore"],
"frechetDistance": result.Payload.Variables["frechetDistance"],
})
}
Match(key) removes the entry after the first match —
guaranteeing that each score result resumes exactly one process instance,
even under high concurrency. If you need fan-out (one score triggers
multiple instances), register each instance separately under a unique key.
Timeout handling
The Aggregator has a built-in Timeout field.
The CorrelationContext does not — expiry is your application's
responsibility. A common pattern is to combine the CorrelationContext
with a BPMN boundary timer event on the ReceiveTask: if no
score arrives within 5 seconds, the timer fires and the instance takes the
exception path (typically routed to manual review).
9. The Registry as the single source of truth
The Registry is a thread-safe, in-memory catalogue that stores
every declared EIP component. It is not a runtime bus — it does not move
messages itself. Think of it as a configuration store that the
engine consults when dispatching messages.
reg := integration.NewRegistry()
// Register everything during startup (order does not matter)
reg.RegisterChannel(...)
reg.RegisterFilter(...)
reg.RegisterRouter(...)
reg.RegisterTranslator(...)
reg.RegisterAggregator(...)
reg.RegisterSplitter(...)
reg.RegisterEndpoint(...)
reg.RegisterPipeline(...)
// Lookups are O(1) map reads; thread-safe via sync.RWMutex
pipe := reg.LookupPipeline("pipe-anomaly-detection")
router := reg.LookupRouter("router-anomaly-score")
// All endpoints registered for a given service (useful for service mesh registration)
eps := reg.EndpointsForService("svc-geometric-memory")
Duplicate detection
Every Register* method returns an error if the ID is already
registered. This prevents silent overwrites during misconfigured restarts:
if err := reg.RegisterChannel(ch); err != nil {
// "integration: channel "ch-bulk-import" already registered"
log.Fatal(err)
}
One Registry per process, multiple Registries in tests
In production, create one Registry at application startup and
share it as a dependency. In tests, create a fresh Registry per
test to avoid state leakage between test cases. The zero-cost constructor
(integration.NewRegistry()) makes this practical:
func TestMyPipeline(t *testing.T) {
reg := integration.NewRegistry() // fresh per test
// register only the components this test needs
reg.RegisterFilter(&integration.MessageFilter{
ID: "f-test",
Predicate: "amount > 10000",
ChannelID: "ch-high-value",
})
// exercise the component in isolation
}
10. Operational characteristics and tuning guidance
Memory footprint
The Registry holds all components in maps. An average
production setup has 10–30 channels, 5–15 endpoints, 3–10 routers, 2–5
aggregators, and 1–3 pipelines. At typical struct sizes this is well under
1 MB. The corpus itself (cell-ID sequences) grows linearly with registered
traces but is managed by the geometric memory system, not the Registry.
FEEL expression performance
FEEL predicate evaluation (PassesFilter, route conditions,
aggregator completion conditions) is implemented as a lightweight interpreter.
Simple comparisons (score > 0.5) are O(1). Compound conditions
(a and b and c) are O(n) where n is the number of conjuncts.
Avoid FEEL expressions that iterate over arrays in hot paths — those are
O(m × n) and should be pre-computed before the message enters the pipeline.
Splitter fan-out limits
There is no built-in fan-out limit on the Splitter. For
batch sizes above 10 000 records, apply back-pressure at the source:
split the batch into chunks before sending to ch-bulk-import,
and let each chunk run through the pipeline independently. This keeps the
Aggregator's in-memory correlation table bounded.
Serialisation and process-definition alignment
Every EIP struct in pkg/integration has JSON tags. The full
Registry can be serialised and restored at startup from a configuration file,
a database, or a remote config service. The only binding between an EIP
component and a BPMN process definition is the ServiceRef on
MessageEndpoint (which must match an ea.BusinessService.ID)
and channel IDs in the process definition's sourceRef / targetRef.
Neither side validates the other at registration time — mismatches surface
at runtime when message dispatch tries a lookup and gets nil.
When to use a Pipeline vs a direct channel
| Use | Scenario |
|---|---|
| Direct channel | Simple point-to-point forwarding with no transformation or routing |
| Filter alone | Noise reduction: one channel, one predicate, no further processing |
| Router alone | Fan-out to N channels based on message content |
| Pipeline | Any sequence of ≥ 2 EIP steps that must be executed in order |
| Aggregator + Pipeline | Collecting results from parallel workers before proceeding |
Conclusion
pkg/integration transforms geometric memory from a
per-instance computation into a production-grade data infrastructure.
The two topologies described in this article cover the complete lifecycle:
- Big-data corpus building: a Splitter fans out historical records; a MessageFilter deduplicates; an Aggregator gates on completion; a pub-sub MessageChannel broadcasts readiness to all consumers.
- Fast-data anomaly routing: a Pipeline chains Filter → Translator → Scorer → Router in a single declaration; a CorrelationContext reconnects async scores to suspended process instances; FEEL conditions suppress false positives during corpus warm-up.
Both topologies share the same Registry, the same
*items.Message carrier type, and the same FEEL expression
language. There is no separate integration server, no schema registry,
and no deployment pipeline for configuration changes — the Registry is
populated in Go code at startup, alongside your BPMN process deployment.
To see both topologies in action, start with the agentic credit tutorial, which runs the complete fast-data path. Then read the companion article for the mathematical details on cell IDs, shape vectors, and Fréchet distance that power the scoring step at the heart of both pipelines.
Every primitive in pkg/integration maps directly to a pattern in
Hohpe and Woolf's Enterprise Integration Patterns catalogue. The FEEL
expression language used in predicates and mapping expressions is defined
by the OMG DMN 1.3 specification.