tools

give models functions to execute

basic tool definition

const calculator = {
  name: "calculate",
  description: "Perform basic math operations",
  schema: {
    operation: {
      type: "string",
      description: "The operation to perform",
      enum: ["add", "subtract", "multiply", "divide"],
    },
    a: {
      type: "number",
      description: "First number",
    },
    b: {
      type: "number",
      description: "Second number",
    },
  },
  execute: async ({ operation, a, b }) => {
    switch (operation) {
      case "add": return a + b;
      case "subtract": return a - b;
      case "multiply": return a * b;
      case "divide": return a / b;
      default: return "invalid operation";
    }
  },
};

schema defines parameters, execute runs the function
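because tools are plain objects, execute is an ordinary async function you can call directly. that makes tool logic easy to unit-test without a model in the loop. a standalone copy of the calculator above (schema elided):

```javascript
// standalone copy of the calculator's execute logic (schema elided);
// execute is a plain async function, so it can be tested directly
const calculator = {
  name: "calculate",
  execute: async ({ operation, a, b }) => {
    switch (operation) {
      case "add": return a + b;
      case "subtract": return a - b;
      case "multiply": return a * b;
      case "divide": return a / b;
      default: return "invalid operation";
    }
  },
};

const sum = await calculator.execute({ operation: "add", a: 15, b: 23 });
// sum === 38
```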

using tools

import { compose, model, scope } from "@threaded/ai";

const workflow = compose(
  scope(
    {
      tools: [calculator],
    },
    model(),
  ),
);

const result = await workflow("what is 15 * 23?");

model calls tool automatically when needed

zod schemas

use zod for type-safe schemas

import { z } from "zod";

const weather = {
  name: "get_weather",
  description: "Get weather for a city",
  schema: z.object({
    city: z.string().describe("City name"),
    units: z.enum(["celsius", "fahrenheit"]).optional(),
  }),
  execute: async ({ city, units = "celsius" }) => {
    return { city, temp: 22, units };
  },
};

library converts zod schemas automatically
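for reference, the zod schema above corresponds to JSON-schema-style parameters roughly like the following (the exact shape depends on the library's converter, so treat this as an approximation):

```json
{
  "type": "object",
  "properties": {
    "city": { "type": "string", "description": "City name" },
    "units": { "type": "string", "enum": ["celsius", "fahrenheit"] }
  },
  "required": ["city"]
}
```

note that `.optional()` fields are dropped from `required`, which is how the model knows `units` may be omitted.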

tool limits

limit how many times a tool can be called to prevent infinite loops or excessive api usage.

permanent limit on tool definition

add _maxCalls directly to the tool

const search = {
  name: "web_search",
  description: "Search the web",
  schema: {
    query: { type: "string", description: "Search query" },
  },
  execute: async ({ query }) => {
    const response = await fetch(`https://api.search.com?q=${encodeURIComponent(query)}`);
    return await response.json();
  },
  _maxCalls: 3,
};

this tool object will use a limit of 3 calls whenever it's added to a scope. if you pass this tool object to multiple workflows, they all use the same limit (3 calls per workflow execution).

dynamic limit per workflow

use maxCalls() helper to set different limits for different workflows

import { maxCalls, compose, model, scope } from "@threaded/ai";

const search = {
  name: "web_search",
  description: "Search the web",
  schema: {
    query: { type: "string", description: "Search query" },
  },
  execute: async ({ query }) => {
    const response = await fetch(`https://api.search.com?q=${encodeURIComponent(query)}`);
    return await response.json();
  },
};

const limitedWorkflow = compose(
  scope(
    {
      tools: [maxCalls(search, 2)],
    },
    model(),
  ),
);

const generousWorkflow = compose(
  scope(
    {
      tools: [maxCalls(search, 10)],
    },
    model(),
  ),
);

maxCalls() wraps the tool and adds a limit for that specific workflow. useful when different contexts need different limits.

when to use each:

- _maxCalls on tool definition: sets the default limit on the tool object. all scopes using this tool object see the same limit.
- maxCalls() wrapper: creates a new tool object with a different limit. lets you use the same base tool with different limits in different workflows.
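the library's implementation isn't shown here, but given the _maxCalls field above, maxCalls() plausibly just copies the tool with the limit attached, leaving the original untouched. a hypothetical sketch:

```javascript
// hypothetical sketch of maxCalls: clone the tool and attach the
// per-workflow limit; the original tool object is not mutated
const maxCalls = (tool, limit) => ({ ...tool, _maxCalls: limit });

const search = {
  name: "web_search",
  _maxCalls: 3,
  execute: async () => "results",
};

const limited = maxCalls(search, 2);
// limited._maxCalls === 2, while search._maxCalls is still 3
```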

parallel execution

execute multiple tool calls at once

scope(
  {
    tools: [weather, calculator, search],
    toolConfig: {
      parallel: true,
    },
  },
  model(),
);

default is sequential execution
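conceptually, parallel: true switches dispatch from awaiting each tool call in turn to starting them all at once, as with Promise.all. a sketch with stand-in calls, not the library's internals:

```javascript
// two stand-in tool executions with artificial delays
const delay = (ms, value) => new Promise((resolve) => setTimeout(() => resolve(value), ms));
const calls = [
  () => delay(50, "weather result"),
  () => delay(50, "search result"),
];

// sequential: each call waits for the previous one (~100ms total)
const sequential = [];
for (const call of calls) sequential.push(await call());

// parallel: all calls start at once (~50ms total)
const parallel = await Promise.all(calls.map((call) => call()));
```

either way the results arrive in call order; only the total wall-clock time differs.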

tool retry

retry failed tool calls

scope(
  {
    tools: [unreliableApi],
    toolConfig: {
      retryCount: 2,
    },
  },
  model(),
);

retries tool execution up to 2 times on failure
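the retry behavior amounts to re-running the tool's execute when it throws, up to retryCount extra attempts. a self-contained sketch (withRetries and flaky are illustrative names, not library exports):

```javascript
// hypothetical sketch of retryCount: re-run execute up to
// retryCount extra times when it throws, then rethrow the last error
const withRetries = async (execute, args, retryCount) => {
  let lastError;
  for (let attempt = 0; attempt <= retryCount; attempt++) {
    try {
      return await execute(args);
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
};

// a flaky tool that fails twice, then succeeds
let attempts = 0;
const flaky = async () => {
  attempts++;
  if (attempts < 3) throw new Error("transient failure");
  return "ok";
};

const result = await withRetries(flaky, {}, 2);
// result === "ok" after 3 attempts (1 initial + 2 retries)
```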

streaming tool events

stream callbacks let you react to tool execution in real-time. useful for showing progress in uis or logging tool usage.

web app streaming

import express from "express";
import { getOrCreateThread, compose, model, scope } from "@threaded/ai";

const app = express();
app.use(express.json());

const weatherTool = {
  name: "get_weather",
  description: "Get weather for a city",
  schema: {
    city: { type: "string", description: "City name" },
  },
  execute: async ({ city }) => {
    const response = await fetch(`https://wttr.in/${city}?format=j1`);
    const data = await response.json();
    return {
      city,
      temp: data.current_condition[0].temp_C,
      condition: data.current_condition[0].weatherDesc[0].value,
    };
  },
};

app.post("/chat/:threadId", async (req, res) => {
  const { threadId } = req.params;
  const { message } = req.body;

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const thread = getOrCreateThread(threadId);

  const workflow = compose(
    scope(
      {
        tools: [weatherTool],
        toolConfig: {
          parallel: true,
        },
        stream: (event) => {
          switch (event.type) {
            case "content":
              res.write(`data: ${JSON.stringify({ type: "content", content: event.content })}\n\n`);
              break;
            case "tool_calls_ready":
              res.write(`data: ${JSON.stringify({ type: "tool_calls_ready", calls: event.calls.map(c => c.function.name) })}\n\n`);
              break;
            case "tool_executing":
              res.write(`data: ${JSON.stringify({ type: "tool_executing", name: event.call.function.name, args: JSON.parse(event.call.function.arguments) })}\n\n`);
              break;
            case "tool_complete":
              res.write(`data: ${JSON.stringify({ type: "tool_complete", name: event.call.function.name, result: event.result })}\n\n`);
              break;
            case "tool_error":
              res.write(`data: ${JSON.stringify({ type: "tool_error", name: event.call.function.name, error: event.error })}\n\n`);
              break;
          }
        },
      },
      model(),
    ),
  );

  await thread.message(message, workflow);
  res.write(`data: ${JSON.stringify({ type: "complete" })}\n\n`);
  res.end();
});

app.listen(3000);

stream callback receives events during model execution and sends them to client via sse.

const chatForm = document.getElementById("chat-form");
const messagesDiv = document.getElementById("messages");
const toolStatusDiv = document.getElementById("tool-status");

chatForm.addEventListener("submit", async (e) => {
  e.preventDefault();
  const message = e.target.message.value;

  const response = await fetch("/chat/user-123", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;

    // a read can end mid-event, so buffer and only parse complete frames
    buffer += decoder.decode(value, { stream: true });
    const frames = buffer.split("\n\n");
    buffer = frames.pop();

    for (const line of frames) {
      if (!line.trim() || !line.startsWith("data: ")) continue;

      const data = JSON.parse(line.replace("data: ", ""));

      if (data.type === "content") {
        messagesDiv.textContent += data.content;
      }

      if (data.type === "tool_calls_ready") {
        toolStatusDiv.textContent = `calling: ${data.calls.join(", ")}`;
      }

      if (data.type === "tool_executing") {
        toolStatusDiv.textContent = `executing ${data.name}...`;
      }

      if (data.type === "tool_complete") {
        console.log(`${data.name} result:`, data.result);
        toolStatusDiv.textContent = "";
      }

      if (data.type === "tool_error") {
        toolStatusDiv.textContent = `error: ${data.error}`;
      }

      if (data.type === "complete") {
        toolStatusDiv.textContent = "";
      }
    }
  }
});

client listens to sse stream, updates ui based on event types.
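the parsing step can be factored into a small helper. this standalone sketch shows the frame format the server above emits (`data: <json>` terminated by a blank line) independent of any ui code:

```javascript
// minimal parser for the SSE frames the server writes:
// each event is a "data: <json>" line followed by a blank line
const parseSseChunk = (chunk) =>
  chunk
    .split("\n\n")
    .filter((frame) => frame.startsWith("data: "))
    .map((frame) => JSON.parse(frame.slice("data: ".length)));

const chunk =
  'data: {"type":"tool_executing","name":"get_weather"}\n\n' +
  'data: {"type":"content","content":"it is "}\n\n';

const events = parseSseChunk(chunk);
// events[0].type === "tool_executing", events[1].content === "it is "
```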

cli streaming

import { compose, model, scope } from "@threaded/ai";

const workflow = compose(
  scope(
    {
      tools: [calculator, weather, search],
      stream: (event) => {
        switch (event.type) {
          case "content":
            process.stdout.write(event.content);
            break;
          case "tool_calls_ready":
            console.log(`\n[tools queued: ${event.calls.map(c => c.function.name).join(", ")}]`);
            break;
          case "tool_executing":
            console.log(`[executing: ${event.call.function.name}]`);
            break;
          case "tool_complete":
            console.log(`[${event.call.function.name} complete]`);
            break;
          case "tool_error":
            console.log(`[${event.call.function.name} failed: ${event.error}]`);
            break;
        }
      },
    },
    model(),
  ),
);

await workflow("what's the weather in tokyo and what's 15 * 23?");

shows tool execution progress in terminal.

tool approval with streaming

when building interactive applications, you often want user approval before executing tools. streaming and approval work together: stream events let the ui show which tools are being called, and approval lets users decide whether to allow them.

see tool approval for the full frontend/backend approval flow with sse.

next: schemas