Pipelines
Composable multi-step workflows with progress tracking and cancellation.
Pipelines let you compose multiple AI operations into a single, cancellable workflow with progress tracking. Chain embedding, search, classification, summarization, and generation steps together.
createPipeline()
Create a pipeline with the builder API:
import { createPipeline, createVectorDB, embed } from '@localmode/core';
import { transformers } from '@localmode/transformers';
const model = transformers.embedding('Xenova/bge-small-en-v1.5');
const db = await createVectorDB({ name: 'docs', dimensions: 384 });
const pipeline = createPipeline('embed-search')
.step('embed', async (text: string, signal) => {
return embed({ model, value: text, abortSignal: signal });
})
.step('search', async (embedResult, signal) => {
return db.search(embedResult.embedding, { k: 10 });
})
.build();
const { result, durationMs } = await pipeline.run('What is machine learning?');
console.log(`Found ${result.length} results in ${durationMs}ms`);

const { result } = await pipeline.run('query text', {
onProgress: (progress) => {
console.log(
`Step ${progress.completed + 1}/${progress.total}: ${progress.currentStep}`
);
},
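});

const controller = new AbortController();
// Start the run without awaiting it so that it can still be cancelled
const pending = pipeline.run('query text', {
  abortSignal: controller.signal,
});
// Cancel the pipeline at any point while it is running
controller.abort();
// `pending` now rejects with an AbortError, which the caller should catch
Builder API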
The builder returned by createPipeline() has two methods for adding steps:
.step(name, execute)
Add an inline step with a name and async function. Each step receives the output of the previous step as its input, plus an AbortSignal.
createPipeline('my-pipeline')
.step('first', async (input: string, signal) => {
// Returns output that becomes the next step's input
return { text: input.toUpperCase() };
})
.step('second', async (prev, signal) => {
return prev.text.length;
})
.build();
.addStep(pipelineStep)
Add a pre-built PipelineStep object. Useful with the step factories below.
import { createPipeline, pipelineEmbedStep, pipelineSearchStep } from '@localmode/core';
createPipeline('ingest')
.addStep(pipelineEmbedStep(model))
.addStep(pipelineSearchStep(db, { k: 5 }))
.build();
.build()
Finalize the pipeline and return a Pipeline object ready for execution.
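To illustrate the execution model described above (each step receives the previous step's output plus an AbortSignal), here is a minimal sketch of a sequential runner. It is not LocalMode's implementation: the `Step` and `Progress` types and the `runSteps` function are hypothetical names invented for this illustration, and the real library may differ in how it reports progress or checks for cancellation.

```typescript
// Hypothetical, simplified step shape; not LocalMode's actual types.
type Step = {
  name: string;
  execute: (input: unknown, signal: AbortSignal) => Promise<unknown>;
};

type Progress = { completed: number; total: number; currentStep: string };

async function runSteps(
  steps: Step[],
  input: unknown,
  options: { abortSignal?: AbortSignal; onProgress?: (p: Progress) => void } = {},
): Promise<{ result: unknown; durationMs: number }> {
  // Fall back to a signal that never aborts when none is supplied.
  const signal = options.abortSignal ?? new AbortController().signal;
  const start = Date.now();
  let value = input;
  let completed = 0;

  for (const step of steps) {
    // Check for cancellation before starting each step.
    if (signal.aborted) {
      const abortError = new Error('Pipeline aborted');
      abortError.name = 'AbortError';
      throw abortError;
    }
    if (options.onProgress) {
      options.onProgress({ completed, total: steps.length, currentStep: step.name });
    }
    // The output of one step becomes the input of the next.
    value = await step.execute(value, signal);
    completed += 1;
  }

  return { result: value, durationMs: Date.now() - start };
}

// Usage: two chained steps, string -> upper-cased string -> length.
runSteps(
  [
    { name: 'upper', execute: async (s) => String(s).toUpperCase() },
    { name: 'length', execute: async (s) => String(s).length },
  ],
  'abc',
  { onProgress: (p) => console.log(`Step ${p.completed + 1}/${p.total}: ${p.currentStep}`) },
).then(({ result }) => console.log(result)); // logs 3
```

The sketch only checks the signal between steps. Passing the signal into each step, as the builder API does, is what allows a long-running step to stop early as well.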
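Pipeline Interface
The object returned by .build(). The examples on this page call its run(input, options?) method, which resolves to a PipelineResult.
PipelineRunOptions
Options accepted by run(). The examples on this page use onProgress (a callback that receives a PipelineProgress) and abortSignal (an AbortSignal used to cancel the run).
PipelineProgress
The value passed to onProgress. The examples on this page read completed, total, and currentStep.
PipelineResult
The value that run() resolves to. The examples on this page read result (the output of the final step) and durationMs.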
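Taken together, the four types in this section could be sketched roughly as follows. These shapes are assumptions inferred from how the examples on this page use them; the actual definitions in @localmode/core may carry additional fields or different generics. The `formatProgress` helper is hypothetical and only shows one way to consume a progress update.

```typescript
// Assumed shapes, inferred from the examples on this page; the real
// definitions in @localmode/core may include more fields.
interface PipelineProgress {
  completed: number;   // steps finished so far
  total: number;       // total number of steps in the pipeline
  currentStep: string; // name of the step currently running
}

interface PipelineRunOptions {
  abortSignal?: AbortSignal;
  onProgress?: (progress: PipelineProgress) => void;
}

interface PipelineResult<TOutput> {
  result: TOutput;    // output of the final step
  durationMs: number; // wall-clock duration of the run
}

interface Pipeline<TInput, TOutput> {
  run(input: TInput, options?: PipelineRunOptions): Promise<PipelineResult<TOutput>>;
}

// Hypothetical helper: render a progress update as a percentage label.
function formatProgress(p: PipelineProgress): string {
  const percent = p.total === 0 ? 100 : Math.round((p.completed / p.total) * 100);
  return `${p.currentStep} (${percent}%)`;
}
```

A label like this can be passed straight from an onProgress callback to a status line or progress bar.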
Pre-Built Step Factories
LocalMode provides step factories for common AI operations. Each returns a PipelineStep that can be passed to .addStep().
| Factory | Input | Output | Description |
|---|---|---|---|
| pipelineEmbedStep(model) | string | EmbedResult | Embed a single text value |
| pipelineEmbedManyStep(model) | string[] | EmbedManyResult | Embed multiple values |
| pipelineChunkStep(options) | string | Chunk[] | Chunk text using a given strategy |
| pipelineSearchStep(db, options?) | Float32Array \| EmbedResult | SearchResult[] | Search a vector database |
| pipelineRerankStep(model, options?) | { query, documents } | RerankResult | Rerank search results |
| pipelineStoreStep(db) | Document[] | Document[] | Store documents in a vector database |
| pipelineClassifyStep(model) | string | ClassifyResult | Classify text |
| pipelineSummarizeStep(model, options?) | string | SummarizeResult | Summarize text |
| pipelineGenerateStep(model, options?) | string | GenerateTextResult | Generate text with an LLM |
| pipelineSemanticChunkStep(model, options?) | string | Chunk[] | Semantic (embedding-aware) chunking |
import {
createPipeline,
pipelineEmbedStep,
pipelineSearchStep,
} from '@localmode/core';
const pipeline = createPipeline('semantic-search')
.addStep(pipelineEmbedStep(embeddingModel))
.addStep(pipelineSearchStep(db, { k: 5 }))
.build();
Error Handling
When a step throws, the pipeline wraps the error in a PipelineError that includes step context:
import { PipelineError } from '@localmode/core';
try {
await pipeline.run('input');
} catch (error) {
if (error instanceof PipelineError) {
console.log(error.message);
// 'Pipeline "my-pipeline" failed at step "embed" (1/3)'
console.log(error.stepName); // 'embed'
console.log(error.stepIndex); // 0
console.log(error.cause); // The original error
}
}
AbortError exceptions are re-thrown as-is and are not wrapped in PipelineError, so you can catch cancellations with standard error.name === 'AbortError' checks.
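Because cancellations and step failures arrive through the same catch block, it can help to classify the caught value once and branch on the result. The sketch below is one possible way to do that. To stay self-contained it declares a local stand-in for PipelineError with only the stepName and stepIndex fields shown above; in an application you would import the real class from @localmode/core instead. The `RunFailure` type and `describeFailure` function are hypothetical names.

```typescript
// Local stand-in so the sketch is self-contained; in an application,
// import PipelineError from '@localmode/core' instead.
class PipelineError extends Error {
  stepName: string;
  stepIndex: number;

  constructor(message: string, stepName: string, stepIndex: number) {
    super(message);
    this.name = 'PipelineError';
    this.stepName = stepName;
    this.stepIndex = stepIndex;
    // Keeps instanceof working when compiled to older JavaScript targets.
    Object.setPrototypeOf(this, PipelineError.prototype);
  }
}

// Hypothetical result type for the three cases a caller usually cares about.
type RunFailure =
  | { kind: 'cancelled' }
  | { kind: 'step-failed'; stepName: string; stepIndex: number }
  | { kind: 'unknown'; error: unknown };

function describeFailure(error: unknown): RunFailure {
  // Cancellations are not wrapped, so check the error name directly.
  if (error instanceof Error && error.name === 'AbortError') {
    return { kind: 'cancelled' };
  }
  // Step failures carry the name and zero-based index of the failing step.
  if (error instanceof PipelineError) {
    return { kind: 'step-failed', stepName: error.stepName, stepIndex: error.stepIndex };
  }
  return { kind: 'unknown', error };
}
```

A caller would typically ignore the cancelled case, surface the step name for step-failed, and rethrow or log anything unknown.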
Examples
RAG Ingest Pipeline
import {
chunk,
createPipeline,
pipelineEmbedManyStep,
} from '@localmode/core';
const ingestPipeline = createPipeline('rag-ingest')
.step('chunk', async (text: string) => {
// Split the document into overlapping chunks and pass on only their text
const chunks = chunk(text, { strategy: 'recursive', size: 512, overlap: 50 });
return chunks.map((c) => c.text);
})
.addStep(pipelineEmbedManyStep(embeddingModel))
.step('store', async (embedResult) => {
const docs = embedResult.embeddings.map((vec, i) => ({
id: crypto.randomUUID(),
vector: vec,
metadata: { text: embedResult.values?.[i] ?? '' },
}));
await db.addMany(docs);
return docs.length;
})
.build();
const { result, durationMs } = await ingestPipeline.run(documentText, {
onProgress: (p) => console.log(`${p.currentStep}...`),
});
console.log(`Ingested ${result} chunks in ${durationMs}ms`);
Classify-Then-Summarize Pipeline
import { createPipeline, pipelineClassifyStep } from '@localmode/core';
const pipeline = createPipeline('classify-summarize')
.addStep(pipelineClassifyStep(classifierModel))
.step('summarize-if-relevant', async (classifyResult, signal) => {
const topLabel = classifyResult.labels[0];
if (topLabel.label === 'relevant' && topLabel.score > 0.8) {
const { summarize } = await import('@localmode/core');
return summarize({ model: summarizerModel, text: classifyResult.text, abortSignal: signal });
}
return { summary: 'Not relevant', usage: { tokens: 0 } };
})
.build();
React Integration
Use the usePipeline hook from @localmode/react for pipelines in React components:
import { createPipeline } from '@localmode/core';
import { usePipeline, embedStep, searchStep } from '@localmode/react';
function SearchComponent() {
const pipeline = createPipeline('search')
.addStep(embedStep(model))
.addStep(searchStep(db, { k: 5 }))
.build();
const { execute, data, isLoading, error, cancel } = usePipeline({ pipeline });
return (
<button onClick={() => execute('search query')} disabled={isLoading}>
{isLoading ? 'Searching...' : 'Search'}
</button>
);
}
See the React Hooks documentation for more details.