# Pipelines

Use multi-step pipelines with the `usePipeline` hook.

## Pipeline Hook

### usePipeline

Compose multi-step AI workflows with unified progress tracking and cancellation. Each step receives the output of the previous step, and the pipeline can be cancelled at any point between steps.
```tsx
import { usePipeline, chunkStep, embedManyStep, storeStep } from '@localmode/react';
import { transformers } from '@localmode/transformers';
import { createVectorDB } from '@localmode/core';

const model = transformers.embedding('Xenova/all-MiniLM-L6-v2');
const db = await createVectorDB({ name: 'docs', dimensions: 384 });

function IngestPipeline({ documentText }: { documentText: string }) {
  const { result, isRunning, progress, execute, cancel } = usePipeline([
    chunkStep({ size: 512, overlap: 50 }),
    embedManyStep(model),
    storeStep(db),
  ]);

  return (
    <div>
      <button onClick={() => execute(documentText)} disabled={isRunning}>
        {isRunning ? 'Processing...' : 'Ingest Document'}
      </button>
      {isRunning && (
        <p>
          Step {(progress?.completed ?? 0) + 1}/{progress?.total}: {progress?.currentStep}
        </p>
      )}
      {isRunning && <button onClick={cancel}>Cancel</button>}
      {result && <p>Done — document ingested.</p>}
    </div>
  );
}
```

## Return Value
| Property | Type | Description |
|---|---|---|
| `result` | `TResult \| null` | Final output from the last successful run |
| `isRunning` | `boolean` | Whether the pipeline is currently executing |
| `error` | `Error \| null` | Error from the last failed run |
| `currentStep` | `string \| null` | Name of the currently executing step |
| `progress` | `PipelineProgress \| null` | `{ completed, total, currentStep }` |
| `execute` | `(input) => Promise` | Start the pipeline with the initial input |
| `cancel` | `() => void` | Cancel the current run |
| `reset` | `() => void` | Clear all state |
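As a sketch of how `error` and `reset` combine, reusing the `model` and `db` from the example above (the component name and retry handling are illustrative, not from the library):

```tsx
import { usePipeline, chunkStep, embedManyStep, storeStep } from '@localmode/react';

// Illustrative only: surface the last error and clear state before retrying.
function IngestWithRetry({ text }: { text: string }) {
  const { error, isRunning, execute, reset } = usePipeline([
    chunkStep({ size: 512 }),
    embedManyStep(model),
    storeStep(db),
  ]);

  return (
    <div>
      <button onClick={() => execute(text)} disabled={isRunning}>Ingest</button>
      {error && (
        <>
          <p>Ingest failed: {error.message}</p>
          {/* reset() clears result, error, and progress so the next run starts clean */}
          <button onClick={() => reset()}>Clear</button>
        </>
      )}
    </div>
  );
}
```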
## Step Factories

Pre-built step factories cover the most common operations. Each factory returns a `PipelineStep` object with a `name` and an `execute` function.
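For orientation, the step contract can be sketched in plain TypeScript (an assumed shape inferred from the description above, not the library's published type):

```ts
// Assumed shape of a pipeline step, inferred from the prose above.
interface PipelineStep<TIn, TOut> {
  name: string;
  execute: (input: TIn, signal: AbortSignal) => Promise<TOut>;
}

// A tiny concrete step showing the contract in action.
const upperStep: PipelineStep<string, string> = {
  name: 'uppercase',
  execute: async (input, signal) => {
    signal.throwIfAborted();
    return input.toUpperCase();
  },
};

// Running a step directly, outside the hook, for illustration:
const out = await upperStep.execute('hello', new AbortController().signal);
console.log(out); // "HELLO"
```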
### embedStep

Embed a single text value using an embedding model.

```ts
import { embedStep } from '@localmode/react';

embedStep(model)
// Input: string
// Output: EmbedResult { embedding, usage, response }
```

### embedManyStep
Embed multiple text values in batch.

```ts
import { embedManyStep } from '@localmode/react';

embedManyStep(model)
// Input: string[]
// Output: EmbedManyResult { embeddings, usage, response }
```

### chunkStep
Split text into smaller pieces using a chunking strategy.

```ts
import { chunkStep } from '@localmode/react';

chunkStep({ size: 512, overlap: 50 })
// Input: string
// Output: string[]
```

### searchStep
Search a vector database using an embedding result.

```ts
import { searchStep } from '@localmode/react';

searchStep(db, 10)
// Input: EmbedResult or Float32Array
// Output: SearchResult[]
```

### rerankStep
Rerank search results using a reranker model.

```ts
import { rerankStep } from '@localmode/react';

rerankStep(rerankerModel, { topK: 5 })
// Input: { query: string, documents: string[] }
// Output: RerankResult
```

### storeStep
Store documents in a vector database.

```ts
import { storeStep } from '@localmode/react';

storeStep(db)
// Input: Document[]
// Output: Document[] (passthrough)
```

### classifyStep
Classify text using a classification model.

```ts
import { classifyStep } from '@localmode/react';

classifyStep(classifierModel)
// Input: string
// Output: ClassifyResult
```

### summarizeStep
Summarize text using a summarization model.

```ts
import { summarizeStep } from '@localmode/react';

summarizeStep(summaryModel, { maxLength: 150, minLength: 30 })
// Input: string
// Output: SummarizeResult
```

### generateStep
Generate text using a language model.

```ts
import { generateStep } from '@localmode/react';

generateStep(languageModel, { maxTokens: 500, temperature: 0.7, systemPrompt: 'You are helpful.' })
// Input: string (prompt)
// Output: GenerateTextResult
```

## Custom Steps
Create custom steps by providing a `name` and an `execute` function. The function receives the previous step's output and an `AbortSignal`.

```tsx
const filterStep = {
  name: 'filter-results',
  execute: async (results, signal) => {
    signal.throwIfAborted();
    return results.filter((r) => r.score > 0.8);
  },
};

const { execute } = usePipeline([
  embedStep(model),
  searchStep(db, 20),
  filterStep,
]);
```

## Progress Tracking
The `progress` object updates before each step executes, giving you real-time feedback for building progress UIs.

```tsx
function PipelineProgress({ progress, currentStep, isRunning }) {
  if (!isRunning || !progress) return null;

  const percent = Math.round((progress.completed / progress.total) * 100);

  return (
    <div>
      <progress className="progress progress-primary w-full" value={percent} max={100} />
      <p>
        Step {progress.completed + 1} of {progress.total}: {currentStep}
      </p>
    </div>
  );
}
```

## Cancellation
Calling `cancel()` aborts the pipeline between steps: by default the current step runs to completion and no further steps execute. Steps that check `signal.throwIfAborted()` can abort mid-execution.
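For a step that should respond to cancellation mid-work, poll the signal inside its loop. A sketch (the batching logic here is illustrative, not a library helper):

```ts
// Illustrative long-running step: checks the AbortSignal between batches so
// cancel() takes effect mid-step instead of only between steps.
const batchedTrimStep = {
  name: 'batched-trim',
  execute: async (items: string[], signal: AbortSignal) => {
    const results: string[] = [];
    const batchSize = 2;
    for (let i = 0; i < items.length; i += batchSize) {
      // Throws an AbortError if cancel() has been called.
      signal.throwIfAborted();
      const batch = items.slice(i, i + batchSize);
      results.push(...batch.map((s) => s.trim()));
    }
    return results;
  },
};
```

Used inside `usePipeline`, calling `cancel()` while this step loops aborts it at the next batch boundary.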
```tsx
function CancellableIngest({ text }: { text: string }) {
  const { isRunning, progress, execute, cancel } = usePipeline([
    chunkStep({ size: 512 }),
    embedManyStep(model),
    storeStep(db),
  ]);

  return (
    <div>
      <button onClick={() => execute(text)} disabled={isRunning}>Process</button>
      {isRunning && (
        <button onClick={cancel}>
          Cancel (at step {progress?.completed}/{progress?.total})
        </button>
      )}
    </div>
  );
}
```

## Example: RAG Pipeline with Progress UI
A complete retrieval-augmented generation pipeline that chunks a document, embeds and stores it, then searches and summarizes results.
```tsx
import { useState } from 'react';
import {
  usePipeline,
  chunkStep,
  embedManyStep,
  storeStep,
  embedStep,
  searchStep,
  summarizeStep,
} from '@localmode/react';
import { transformers } from '@localmode/transformers';
import { createVectorDB } from '@localmode/core';

const embeddingModel = transformers.embedding('Xenova/all-MiniLM-L6-v2');
const summaryModel = transformers.summarizer('Xenova/distilbart-cnn-6-6');
const db = await createVectorDB({ name: 'rag-docs', dimensions: 384 });

// Step 1: Ingest pipeline — chunk, embed, store
function IngestPanel() {
  const { isRunning, progress, error, execute, cancel } = usePipeline([
    chunkStep({ size: 512, overlap: 50 }),
    embedManyStep(embeddingModel),
    storeStep(db),
  ]);
  const [text, setText] = useState('');

  return (
    <div>
      <textarea value={text} onChange={(e) => setText(e.target.value)} />
      <button onClick={() => execute(text)} disabled={isRunning}>
        Ingest
      </button>
      {isRunning && (
        <div>
          <progress
            className="progress progress-primary w-full"
            value={progress?.completed ?? 0}
            max={progress?.total ?? 1}
          />
          <p>Running: {progress?.currentStep}</p>
          <button onClick={cancel}>Cancel</button>
        </div>
      )}
      {error && <p className="text-error">{error.message}</p>}
    </div>
  );
}

// Step 2: Query pipeline — embed query, search, summarize
function QueryPanel() {
  const { result, isRunning, progress, error, execute, cancel } = usePipeline<{
    summary: string;
  }>([
    embedStep(embeddingModel),
    searchStep(db, 5),
    summarizeStep(summaryModel, { maxLength: 200 }),
  ]);
  const [query, setQuery] = useState('');

  return (
    <div>
      <input value={query} onChange={(e) => setQuery(e.target.value)} />
      <button onClick={() => execute(query)} disabled={isRunning}>
        Ask
      </button>
      {isRunning && <p>Step: {progress?.currentStep}</p>}
      {isRunning && <button onClick={cancel}>Cancel</button>}
      {result && <p>{result.summary}</p>}
      {error && <p className="text-error">{error.message}</p>}
    </div>
  );
}
```

For the full API reference on the underlying core functions (`chunk()`, `embed()`, `semanticSearch()`), see the Core RAG guide.