threads
persistent conversation storage that automatically manages message history
threads let you maintain stateful conversations across multiple exchanges. each thread has an id and a storage backend. when you send a message, the thread loads history, runs your workflow, then saves the updated history back to storage.
basic usage
```ts
import { getOrCreateThread } from "@threaded/ai";

const thread = getOrCreateThread("user-123");

await thread.message("hello, i'm building a todo app");
await thread.message("what should i name it?");
```
here's what happens:

- `getOrCreateThread("user-123")` creates or retrieves the thread with id "user-123"
- first `message()` call:
  - loads history (empty on first call)
  - adds your message to history
  - calls `model()` with default settings (`openai/gpt-4o-mini` at time of writing)
  - saves updated history (your message + model response)
- second `message()` call:
  - loads history (now contains the previous exchange)
  - adds the new message to history
  - calls the model with full conversation context, so the model remembers the previous conversation
  - saves updated history

default model: when you don't specify a workflow, `thread.message()` uses `model()`, which defaults to `openai/gpt-4o-mini`.
specifying which model to use
pass a workflow with the model you want
```ts
import { getOrCreateThread, model } from "@threaded/ai";

const thread = getOrCreateThread("user-123");

await thread.message(
  "explain quantum entanglement",
  model({ model: "openai/gpt-4o" })
);
```
the model is used only for this message; the next call falls back to the default unless you specify one again.
using same model for all messages
create a reusable workflow
```ts
import { getOrCreateThread, model } from "@threaded/ai";

const thread = getOrCreateThread("user-123");
const gpt4 = model({ model: "openai/gpt-4o" });

await thread.message("first message", gpt4);
await thread.message("second message", gpt4);
await thread.message("third message", gpt4);
```
all three messages use gpt-4o.
different models for different threads
```ts
import { getOrCreateThread, model } from "@threaded/ai";

const fastThread = getOrCreateThread("quick-questions");
const smartThread = getOrCreateThread("complex-analysis");

const quick = model({ model: "openai/gpt-4o-mini" });
const smart = model({ model: "anthropic/claude-opus-4-20250514" });

await fastThread.message("what's 2+2?", quick);
await smartThread.message("analyze this research paper...", smart);
```
each thread maintains separate history with different models.
with tools and system prompts
use compose and scope for complex workflows
```ts
import { getOrCreateThread, compose, model, scope } from "@threaded/ai";

const thread = getOrCreateThread("user-123");

const workflow = compose(
  scope(
    {
      system: "you are a helpful coding assistant",
      tools: [readFile, writeFile, searchWeb],
    },
    model({ model: "anthropic/claude-sonnet-4-5-20250929" }),
  ),
);

await thread.message("help me debug this function", workflow);
await thread.message("now add error handling", workflow);
```
the workflow defines the behavior: system prompt, tools, and model choice. the thread maintains the history.
in-memory storage
default storage keeps messages in memory
conversations are lost when the process exits. useful for:

- development and testing
- temporary sessions
- stateless deployments where you don't need persistence
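conceptually, the default behaves like a Map-backed store. here's a minimal sketch of that idea (`createMemoryStore` is a hypothetical helper for illustration, and the `{ role, content }` message shape is an assumption; the library's built-in store may be implemented differently):

```ts
type Message = { role: string; content: string };

type Store = {
  get(id: string): Promise<Message[]>;
  set(id: string, messages: Message[]): Promise<void>;
};

// hypothetical sketch of an in-memory store: history lives in a Map
// and disappears when the process exits.
function createMemoryStore(): Store {
  const threads = new Map<string, Message[]>();
  return {
    async get(id) {
      // return a copy so callers can't mutate stored history
      return [...(threads.get(id) ?? [])];
    },
    async set(id, messages) {
      threads.set(id, [...messages]);
    },
  };
}
```

this is the same `get`/`set` interface that custom storage uses below, which is why swapping in a real database is a two-method change.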
custom storage
implement two methods to persist conversations to any database
```ts
import { getOrCreateThread, model } from "@threaded/ai";
import Database from "better-sqlite3";

const db = new Database("threads.db");
db.exec(`
  CREATE TABLE IF NOT EXISTS threads (
    id TEXT PRIMARY KEY,
    messages TEXT,
    updated_at INTEGER
  )
`);

const thread = getOrCreateThread("user-123", {
  async get(id) {
    const row = db.prepare("SELECT messages FROM threads WHERE id = ?").get(id);
    return row ? JSON.parse(row.messages) : [];
  },
  async set(id, messages) {
    db.prepare(
      "INSERT OR REPLACE INTO threads (id, messages, updated_at) VALUES (?, ?, ?)"
    ).run(id, JSON.stringify(messages), Date.now());
  },
});

await thread.message("hello");
```
now conversations persist across restarts. works with any storage:
postgres:
```ts
const thread = getOrCreateThread("user-123", {
  async get(id) {
    // assumes a jsonb messages column, so the driver returns parsed JSON
    const result = await pool.query(
      "SELECT messages FROM threads WHERE id = $1",
      [id]
    );
    return result.rows[0]?.messages || [];
  },
  async set(id, messages) {
    await pool.query(
      "INSERT INTO threads (id, messages) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET messages = $2",
      [id, JSON.stringify(messages)]
    );
  },
});
```
redis:
```ts
const thread = getOrCreateThread("user-123", {
  async get(id) {
    const data = await redis.get(`thread:${id}`);
    return data ? JSON.parse(data) : [];
  },
  async set(id, messages) {
    await redis.set(`thread:${id}`, JSON.stringify(messages));
  },
});
```
thread methods
message
adds user message to history, runs workflow, saves result
```ts
const result = await thread.message("what's the weather?", workflow);
console.log(result.lastResponse.content);
```
parameters:
- content (string): user message to add to history
- workflow (optional): workflow to run. defaults to model() with openai/gpt-4o-mini
returns: ConversationContext with full history and model response
when to use: normal user interactions where you want to add their message to history
generate
runs workflow without adding user message
parameters:
- workflow: workflow to run with current history
returns: ConversationContext with updated history
when to use:

- autonomous agents that act without user input
- scheduled tasks that analyze conversation history
- background processing that updates thread state
example - scheduled summary:
```ts
import { getOrCreateThread, model } from "@threaded/ai";

const thread = getOrCreateThread("support-ticket-123");

setInterval(async () => {
  const history = await thread.store.get("support-ticket-123");
  if (history.length > 20) {
    await thread.generate(
      model({
        system: "summarize the conversation and add it to history as an assistant message",
      })
    );
  }
}, 60000);
```
the model generates a summary based on the history without needing a user message.
accessing thread history directly
read history without running a workflow
```ts
const thread = getOrCreateThread("user-123");
const history = await thread.store.get("user-123");
console.log(history);
```
useful for displaying conversation history in ui, analytics, etc.
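for example, a ui layer might flatten the history into display lines. a quick sketch (the `{ role, content }` message shape and the `formatHistory` helper are assumptions for illustration; check the actual shape your store returns):

```ts
type Message = { role: string; content: string };

// hypothetical helper: turn thread history into printable lines
// for a chat ui, a cli transcript, or a log.
function formatHistory(history: Message[]): string[] {
  return history.map((m) => `${m.role}: ${m.content}`);
}
```

since the store is just `get`/`set`, the same read path also works for analytics jobs that never run a workflow.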
real-world pattern - web app
complete example with express and persistent storage
```ts
import express from "express";
import { getOrCreateThread, compose, model, scope } from "@threaded/ai";
import Database from "better-sqlite3";

const app = express();
app.use(express.json());

const db = new Database("threads.db");
db.exec(`
  CREATE TABLE IF NOT EXISTS threads (
    id TEXT PRIMARY KEY,
    messages TEXT,
    updated_at INTEGER
  )
`);

const createThread = (id) => getOrCreateThread(id, {
  async get(id) {
    const row = db.prepare("SELECT messages FROM threads WHERE id = ?").get(id);
    return row ? JSON.parse(row.messages) : [];
  },
  async set(id, messages) {
    db.prepare(
      "INSERT OR REPLACE INTO threads (id, messages, updated_at) VALUES (?, ?, ?)"
    ).run(id, JSON.stringify(messages), Date.now());
  },
});

const workflow = compose(
  scope(
    {
      system: "you are a helpful assistant",
      tools: [calculator, weather],
    },
    model({ model: "openai/gpt-4o" }),
  ),
);

app.post("/chat/:threadId", async (req, res) => {
  const thread = createThread(req.params.threadId);
  const result = await thread.message(req.body.message, workflow);
  res.json({
    response: result.lastResponse.content,
  });
});

app.get("/history/:threadId", async (req, res) => {
  const thread = createThread(req.params.threadId);
  const history = await thread.store.get(req.params.threadId);
  res.json({ history });
});

app.listen(3000);
```
each user gets their own thread, history persists in sqlite, and all messages use the same workflow with tools.
next: composition