Pipelines

Composable multi-step workflows with progress tracking and cancellation.

Pipelines chain embedding, search, classification, summarization, and generation steps into one workflow. Each step's output becomes the next step's input, and the whole run can report progress and be cancelled through an AbortSignal.

createPipeline()

Create a pipeline with the builder API:

import { createPipeline, createVectorDB, embed } from '@localmode/core';
import { transformers } from '@localmode/transformers';

const model = transformers.embedding('Xenova/bge-small-en-v1.5');
const db = await createVectorDB({ name: 'docs', dimensions: 384 });

const pipeline = createPipeline('embed-search')
  .step('embed', async (text: string, signal) => {
    return embed({ model, value: text, abortSignal: signal });
  })
  .step('search', async (embedResult, signal) => {
    return db.search(embedResult.embedding, { k: 10 });
  })
  .build();

const { result, durationMs } = await pipeline.run('What is machine learning?');
console.log(`Found ${result.length} results in ${durationMs}ms`);

Pass onProgress to be notified as each step runs:

const { result } = await pipeline.run('query text', {
  onProgress: (progress) => {
    console.log(
      `Step ${progress.completed + 1}/${progress.total}: ${progress.currentStep}`
    );
  },
});

Pass an abortSignal to cancel a run that is still in progress:

const controller = new AbortController();

// Start the run without awaiting it, so it can still be cancelled
const pending = pipeline.run('query text', {
  abortSignal: controller.signal,
});

// Cancel the pipeline at any point before it settles
controller.abort();

// A cancelled run rejects with an AbortError
try {
  await pending;
} catch (error) {
  if ((error as Error).name !== 'AbortError') throw error;
}
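
Cancellation only interrupts a step mid-flight if the step's own work observes the signal it receives. The sketch below shows one way a step body might do that, using only the standard AbortController and AbortSignal APIs. The names abortableDelay and slowUppercase are hypothetical and are not part of @localmode/core; the delay stands in for model inference.

```typescript
// Hypothetical helper: resolves after `ms`, or rejects early if `signal` aborts.
function abortableDelay(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    function makeAbortError(): Error {
      const err = new Error('The operation was aborted');
      err.name = 'AbortError';
      return err;
    }

    // Already cancelled before we started: fail fast.
    if (signal?.aborted) {
      reject(makeAbortError());
      return;
    }

    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, ms);

    function onAbort(): void {
      clearTimeout(timer);
      reject(makeAbortError());
    }

    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

// A step body shaped like the ones passed to .step(): (input, signal) => output.
async function slowUppercase(text: string, signal?: AbortSignal): Promise<string> {
  await abortableDelay(50, signal); // stands in for long-running work
  return text.toUpperCase();
}
```

A step written this way rejects with an error named AbortError as soon as controller.abort() is called, rather than running to completion first.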

Builder API

The builder returned by createPipeline() has two methods for adding steps, .step() and .addStep(), plus .build() to finalize the pipeline:

.step(name, execute)

Add an inline step with a name and async function. Each step receives the output of the previous step as its input, plus an AbortSignal.

createPipeline('my-pipeline')
  .step('first', async (input: string, signal) => {
    // Returns output that becomes the next step's input
    return { text: input.toUpperCase() };
  })
  .step('second', async (prev, signal) => {
    return prev.text.length;
  })
  .build();
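
To make the chaining rule concrete, here is a minimal self-contained model of how a runner could feed each step's output into the next. It is a sketch under stated assumptions, not LocalMode's implementation: the Step type and runSteps function are hypothetical, and the real builder also tracks types between steps, which this version skips by using unknown.

```typescript
// Assumed step shape for this sketch: a name plus an async function.
type Step = {
  name: string;
  execute: (input: unknown, signal?: AbortSignal) => Promise<unknown>;
};

// Hypothetical runner: passes each step's output to the next step and
// stops before starting a step if the signal has already been aborted.
async function runSteps(
  steps: Step[],
  input: unknown,
  signal?: AbortSignal
): Promise<unknown> {
  let value = input;
  for (const step of steps) {
    if (signal?.aborted) {
      const err = new Error(`Aborted before step "${step.name}"`);
      err.name = 'AbortError';
      throw err;
    }
    value = await step.execute(value, signal);
  }
  return value;
}

// The same two steps as the builder example above.
const steps: Step[] = [
  {
    name: 'first',
    execute: async (input) => ({ text: String(input).toUpperCase() }),
  },
  {
    name: 'second',
    execute: async (prev) => (prev as { text: string }).text.length,
  },
];
```

Calling runSteps(steps, 'hello') resolves to 5: the first step produces { text: 'HELLO' }, and the second returns the length of that text.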

.addStep(pipelineStep)

Add a pre-built PipelineStep object. Useful with the step factories below.

import { createPipeline, pipelineEmbedStep, pipelineSearchStep } from '@localmode/core';

createPipeline('ingest')
  .addStep(pipelineEmbedStep(model))
  .addStep(pipelineSearchStep(db, { k: 5 }))
  .build();
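
If you want reusable steps of your own, a factory function that returns a step object is one natural pattern. The sketch below assumes a step is an object with a name and an execute function, which is inferred from the .step(name, execute) signature and is not confirmed by this page. SketchStep and truncateStep are hypothetical names; check the actual PipelineStep type before passing such an object to .addStep().

```typescript
// Assumed shape, inferred from .step(name, execute). Not the library's type.
interface SketchStep<TIn, TOut> {
  name: string;
  execute: (input: TIn, signal: AbortSignal) => Promise<TOut>;
}

// Hypothetical factory: builds a step that truncates text to `maxChars`.
function truncateStep(maxChars: number): SketchStep<string, string> {
  return {
    name: `truncate-${maxChars}`,
    execute: async (input, signal) => {
      // Respect cancellation before doing any work.
      if (signal.aborted) {
        const err = new Error('Aborted');
        err.name = 'AbortError';
        throw err;
      }
      return input.slice(0, maxChars);
    },
  };
}
```

Configuration such as maxChars is captured once by the factory, so the resulting step only needs the previous step's output and the signal at run time.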

.build()

Finalize the pipeline and return a Pipeline object ready for execution.

Pipeline Interface

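Member used in the examples on this page:

run(input, options?): runs the steps in order and returns a promise that resolves to a PipelineResult. The optional second argument is a PipelineRunOptions object.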

PipelineRunOptions

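Fields used in the examples on this page:

abortSignal: an AbortSignal that cancels the run when aborted.
onProgress: a callback that receives a PipelineProgress object as steps run.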

PipelineProgress

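Fields used in the examples on this page:

completed: the number of steps completed so far.
total: the total number of steps in the pipeline.
currentStep: the name of the step being reported.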
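
A progress callback often just needs a one-line status string. The helper below is a small sketch of that. ProgressLike mirrors only the three fields that the onProgress example on this page reads (completed, total, currentStep), and formatProgress is a hypothetical name, not a library export.

```typescript
// Assumed shape: only the fields this page's onProgress example uses.
interface ProgressLike {
  completed: number; // steps finished so far
  total: number; // total steps in the pipeline
  currentStep: string; // name of the step being reported
}

// Hypothetical helper: turns a progress event into a one-line status string.
function formatProgress(p: ProgressLike): string {
  // Guard against division by zero for an empty pipeline.
  const percent = p.total === 0 ? 0 : Math.round((p.completed / p.total) * 100);
  return `[${percent}%] ${p.currentStep} (${p.completed}/${p.total})`;
}
```

It could be wired in as onProgress: (p) => console.log(formatProgress(p)), assuming the real progress object carries at least these fields.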

PipelineResult

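Fields used in the examples on this page:

result: the output of the final step.
durationMs: the total run time in milliseconds.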

Pre-Built Step Factories

LocalMode provides step factories for common AI operations. Each returns a PipelineStep that can be passed to .addStep().

| Factory | Input | Output | Description |
| --- | --- | --- | --- |
| pipelineEmbedStep(model) | string | EmbedResult | Embed a single text value |
| pipelineEmbedManyStep(model) | string[] | EmbedManyResult | Embed multiple values |
| pipelineChunkStep(options) | string | Chunk[] | Chunk text using a given strategy |
| pipelineSearchStep(db, options?) | Float32Array \| EmbedResult | SearchResult[] | Search a vector database |
| pipelineRerankStep(model, options?) | { query, documents } | RerankResult | Rerank search results |
| pipelineStoreStep(db) | Document[] | Document[] | Store documents in a vector database |
| pipelineClassifyStep(model) | string | ClassifyResult | Classify text |
| pipelineSummarizeStep(model, options?) | string | SummarizeResult | Summarize text |
| pipelineGenerateStep(model, options?) | string | GenerateTextResult | Generate text with an LLM |
| pipelineSemanticChunkStep(model, options?) | string | Chunk[] | Semantic (embedding-aware) chunking |

For example, a semantic search pipeline built entirely from factories:

import {
  createPipeline,
  pipelineEmbedStep,
  pipelineSearchStep,
} from '@localmode/core';

const pipeline = createPipeline('semantic-search')
  .addStep(pipelineEmbedStep(embeddingModel))
  .addStep(pipelineSearchStep(db, { k: 5 }))
  .build();

Error Handling

When a step throws, the pipeline wraps the error in a PipelineError that includes step context:

import { PipelineError } from '@localmode/core';

try {
  await pipeline.run('input');
} catch (error) {
  if (error instanceof PipelineError) {
    console.log(error.message);
    // 'Pipeline "my-pipeline" failed at step "embed" (1/3)'
    console.log(error.stepName); // 'embed'
    console.log(error.stepIndex); // 0
    console.log(error.cause); // The original error
  }
}

AbortError exceptions are re-thrown as-is and are not wrapped in PipelineError, so you can catch cancellations with standard error.name === 'AbortError' checks.
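
The wrap-or-rethrow behavior described above can be modeled in a few lines. This is a self-contained sketch, not LocalMode's source: StepError is a hypothetical stand-in for PipelineError, originalError plays the role of cause, and runWithContext is an illustrative runner.

```typescript
// Hypothetical stand-in for PipelineError with the step context fields.
class StepError extends Error {
  readonly stepName: string;
  readonly stepIndex: number;
  readonly originalError: unknown;

  constructor(
    pipelineName: string,
    stepName: string,
    stepIndex: number,
    totalSteps: number,
    originalError: unknown
  ) {
    // The message shows a 1-based position; stepIndex itself stays 0-based.
    super(
      `Pipeline "${pipelineName}" failed at step "${stepName}" (${stepIndex + 1}/${totalSteps})`
    );
    this.name = 'StepError';
    this.stepName = stepName;
    this.stepIndex = stepIndex;
    this.originalError = originalError;
  }
}

type StepFn = (input: unknown) => Promise<unknown>;

// Illustrative runner: wraps step failures, lets cancellations pass through.
async function runWithContext(
  pipelineName: string,
  steps: Array<[string, StepFn]>,
  input: unknown
): Promise<unknown> {
  let value = input;
  for (const [index, [stepName, execute]] of steps.entries()) {
    try {
      value = await execute(value);
    } catch (error) {
      // AbortError is re-thrown as-is so callers can check error.name.
      if (error instanceof Error && error.name === 'AbortError') throw error;
      throw new StepError(pipelineName, stepName, index, steps.length, error);
    }
  }
  return value;
}
```

Keeping cancellations unwrapped means a single error.name === 'AbortError' check works whether the abort surfaced inside a step or between steps.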

Examples

RAG Ingest Pipeline

import { chunk, createPipeline, pipelineEmbedManyStep } from '@localmode/core';

const ingestPipeline = createPipeline('rag-ingest')
  // Split the document into overlapping chunks and keep only their text
  .step('chunk', async (text: string) => {
    const chunks = chunk(text, { strategy: 'recursive', size: 512, overlap: 50 });
    return chunks.map((c) => c.text);
  })
  // Embed all chunk texts in one batch
  .addStep(pipelineEmbedManyStep(embeddingModel))
  // Store one document per embedding and return the number stored
  .step('store', async (embedResult) => {
    const docs = embedResult.embeddings.map((vec, i) => ({
      id: crypto.randomUUID(),
      vector: vec,
      metadata: { text: embedResult.values?.[i] ?? '' },
    }));
    await db.addMany(docs);
    return docs.length;
  })
  .build();

const { result, durationMs } = await ingestPipeline.run(documentText, {
  onProgress: (p) => console.log(`${p.currentStep}...`),
});
console.log(`Ingested ${result} chunks in ${durationMs}ms`);

Classify-Then-Summarize Pipeline

import { createPipeline, pipelineClassifyStep, summarize } from '@localmode/core';

const pipeline = createPipeline('classify-summarize')
  .addStep(pipelineClassifyStep(classifierModel))
  .step('summarize-if-relevant', async (classifyResult, signal) => {
    // Only summarize when the top label is a confident "relevant"
    const topLabel = classifyResult.labels[0];
    if (topLabel.label === 'relevant' && topLabel.score > 0.8) {
      return summarize({ model: summarizerModel, text: classifyResult.text, abortSignal: signal });
    }
    // Otherwise return a placeholder with the same shape as a summary result
    return { summary: 'Not relevant', usage: { tokens: 0 } };
  })
  .build();

React Integration

Use the usePipeline hook from @localmode/react for pipelines in React components:

import { createPipeline } from '@localmode/core';
import { usePipeline, embedStep, searchStep } from '@localmode/react';

// Build the pipeline once at module scope so it is not recreated on every render
const pipeline = createPipeline('search')
  .addStep(embedStep(model))
  .addStep(searchStep(db, { k: 5 }))
  .build();

function SearchComponent() {
  const { execute, data, isLoading, error, cancel } = usePipeline({ pipeline });

  return (
    <button onClick={() => execute('search query')} disabled={isLoading}>
      {isLoading ? 'Searching...' : 'Search'}
    </button>
  );
}

See the React Hooks documentation for more details.
