Skip to main content

Command Palette

Search for a command to run...

πŸ’¬ Streaming AI Agents and an Interactive CLI: Real-Time MCP in TypeScript

See the AI think token-by-token, watch tool calls fire live, and chat with your agent in a real interactive loop β€” all with the Anthropic streaming API

Published
β€’10 min read
πŸ’¬ Streaming AI Agents and an Interactive CLI: Real-Time MCP in TypeScript
T

Hi πŸ‘‹, I'm Tushar Patil. Currently I am working as Frontend Developer (Angular) and also have expertise with .Net Core and Framework.


This is Part 4 of the AI Engineering with TypeScript series. Prerequisites: Part 1 Β· Part 2 Β· Part 3 β€” Building an AI Agent Stack: Node.js 20+ Β· @anthropic-ai/sdk Β· @modelcontextprotocol/sdk v1.x Β· TypeScript 5.x


πŸ—ΊοΈ What we'll cover

In Part 3 we built a complete agent loop β€” the model called tools in sequence and synthesized a final answer. But there was a catch: the user saw nothing until the entire response was ready. For a task that takes 3–4 tool calls and 10 seconds of thinking, that is a terrible experience.

In Part 4 we fix that with streaming. We'll also wrap everything in an interactive CLI so you can chat with your agent like a real app.

By the end you'll have:

  • ⚑ A streaming agent loop that prints tokens as they arrive
  • πŸ”§ Live tool-call display β€” the user sees tool calls fire in real time
  • πŸ’¬ An interactive multi-turn CLI powered by Node.js readline
  • πŸ” Conversation memory β€” the agent remembers what was said earlier in the session
  • 🧹 A clean graceful shutdown so the MCP server process exits cleanly

⚑ Part 1: Why Streaming Matters for Agents

Without streaming, your agent UX looks like this:

User asks question β†’ ....10 seconds of silence.... β†’ Full answer appears

With streaming:

User asks question β†’ "Let me check the weather..." β†’ πŸ”§ get_weather() β†’ "Pune is 31Β°C..." β†’ final answer

Streaming makes two things visible that were previously hidden: the model's reasoning text (before a tool call) and the tool calls themselves as they are decided. This transforms a black-box wait into a transparent thought process β€” and users trust it far more. 🎯


⚑ Part 2: Streaming with the Anthropic SDK

The Anthropic SDK exposes a .stream() method that returns an async event stream. Each event is typed and tells you exactly what is happening:

import Anthropic from "@anthropic-ai/sdk";

const anthropic = new Anthropic();

const stream = await anthropic.messages.stream({
  model: "claude-sonnet-4-20250514",
  max_tokens: 4096,
  tools: myTools,
  messages: myMessages,
});

for await (const event of stream) {
  switch (event.type) {
    case "content_block_start":
      // a new block started β€” text or tool_use
      break;
    case "content_block_delta":
      // a chunk of a block arrived
      if (event.delta.type === "text_delta") {
        process.stdout.write(event.delta.text); // stream text live
      } else if (event.delta.type === "input_json_delta") {
        // tool input JSON is streaming in β€” accumulate it
      }
      break;
    case "content_block_stop":
      // block finished
      break;
    case "message_stop":
      // full response done
      break;
  }
}

const finalMessage = await stream.finalMessage();

The key insight: stream.finalMessage() gives you the complete assembled Message object after the stream ends β€” the same shape as a non-streaming response. You use the streaming events for display, and finalMessage() for the logic that continues the agent loop. No need to manually reassemble tool inputs from deltas. βœ…


πŸ”§ Part 3: A Streaming Agent Loop

Here is the full streaming agent loop. Compare it to Part 3's non-streaming version β€” the structure is almost identical, but now text appears token by token:

// src/streaming-agent.ts
import Anthropic from "@anthropic-ai/sdk";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";

const anthropic = new Anthropic();

export async function runStreamingAgent(
  client: Client,
  tools: Anthropic.Tool[],
  messages: Anthropic.MessageParam[]
): Promise<string> {
  while (true) {
    process.stdout.write("\nπŸ€– Agent: ");

    const stream = await anthropic.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 4096,
      tools,
      messages,
    });

    // Stream text tokens live as they arrive
    for await (const event of stream) {
      if (
        event.type === "content_block_delta" &&
        event.delta.type === "text_delta"
      ) {
        process.stdout.write(event.delta.text);
      }
    }

    // Get the fully assembled response
    const response = await stream.finalMessage();

    // Append the assistant turn to history
    messages.push({ role: "assistant", content: response.content });

    if (response.stop_reason === "end_turn") {
      process.stdout.write("\n");
      // Extract and return the final text
      return response.content
        .filter((b): b is Anthropic.TextBlock => b.type === "text")
        .map((b) => b.text)
        .join("\n");
    }

    if (response.stop_reason === "tool_use") {
      const toolResults: Anthropic.ToolResultBlockParam[] = [];

      for (const block of response.content) {
        if (block.type !== "tool_use") continue;

        process.stdout.write(
          `\n  πŸ”§ [tool_use] \({block.name}(\){JSON.stringify(block.input)})\n`
        );

        const resultText = await callMcpTool(client, block.name, block.input as Record<string, unknown>);

        process.stdout.write(`  βœ… [result] ${resultText.slice(0, 80)}...\n`);

        toolResults.push({
          type: "tool_result",
          tool_use_id: block.id,
          content: resultText,
        });
      }

      messages.push({ role: "user", content: toolResults });
    }
  }
}

async function callMcpTool(
  client: Client,
  toolName: string,
  toolInput: Record<string, unknown>
): Promise<string> {
  try {
    const result = await client.callTool({ name: toolName, arguments: toolInput });
    const text = result.content
      .filter((c) => c.type === "text")
      .map((c) => (c as { type: "text"; text: string }).text)
      .join("\n");
    return result.isError ? `Error: ${text}` : text;
  } catch (err) {
    return `Tool \({toolName} failed: \){err instanceof Error ? err.message : String(err)}`;
  }
}

The streaming and tool-call logic are completely decoupled: streaming only affects what you display. The agent loop logic β€” collecting tool results, appending to history β€” stays identical to Part 3. 🎯


πŸ’¬ Part 4: Building the Interactive CLI

Now let's wrap the agent in a proper interactive session. We use Node.js's built-in readline module β€” no extra dependencies:

// src/cli.ts
import readline from "readline";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import Anthropic from "@anthropic-ai/sdk";
import { runStreamingAgent } from "./streaming-agent.js";

const SYSTEM_PROMPT = `You are a helpful weather assistant with access to real-time weather data via MCP tools.
Be concise, friendly, and always use the available tools to give accurate answers.
When checking weather for events, proactively mention relevant details like rain, UV index, or wind.`;

export async function startInteractiveCLI(
  client: Client,
  tools: Anthropic.Tool[]
) {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
    terminal: true,
  });

  // Conversation history persists across turns
  const messages: Anthropic.MessageParam[] = [];

  console.log("\n🌀️  Weather Agent ready! Type your question or 'exit' to quit.\n");

  const askQuestion = () => {
    rl.question("You: ", async (userInput) => {
      const trimmed = userInput.trim();

      if (!trimmed) {
        askQuestion();
        return;
      }

      if (trimmed.toLowerCase() === "exit") {
        console.log("\nπŸ‘‹ Goodbye!\n");
        rl.close();
        return;
      }

      // Add user message to history
      messages.push({ role: "user", content: trimmed });

      try {
        await runStreamingAgent(client, tools, messages);
      } catch (err) {
        console.error("\n❌ Agent error:", err instanceof Error ? err.message : err);
      }

      // Loop β€” ask for the next input
      askQuestion();
    });
  };

  askQuestion();

  // Return a promise that resolves when the user exits
  return new Promise<void>((resolve) => {
    rl.on("close", resolve);
  });
}

Three things to notice here. The messages array is declared outside askQuestion() β€” this is your conversation memory. Every turn appends to it, so the agent knows what was said before. Calling askQuestion() recursively at the end of each turn creates the interactive loop without blocking the event loop. The readline.close promise lets main.ts await clean shutdown. βœ…


πŸ” Part 5: Conversation Memory in Action

Because we persist the messages array across turns, the agent can answer follow-up questions that reference earlier context:

You: Should I plan a cricket match in Pune on Saturday?

πŸ€– Agent: Let me check the weather...
  πŸ”§ [tool_use] get_current_weather({"city":"Pune"})
  βœ… [result] 31 degrees C, Partly Cloudy
  πŸ”§ [tool_use] get_forecast({"city":"Pune"})
  βœ… [result] Sat: 29Β°C, Light Rain Likely
Saturday has light rain forecast β€” I'd pick Friday instead! 🏏

You: What about the UV index on Friday?

πŸ€– Agent: Based on Friday's partly cloudy forecast for Pune...
  πŸ”§ [tool_use] get_forecast({"city":"Pune"})
  βœ… [result] Fri: 32Β°C, UV Index 7 (High)
Friday's UV index is 7 (High). Sunscreen and hats are a must for an outdoor match!

The agent did not ask "which city?" on the second turn β€” it remembered Pune from earlier. That is conversation memory at work. 🎯


🧹 Part 6: Graceful Shutdown

When the user types exit, we need to close the MCP server process cleanly β€” otherwise it lingers as an orphan process. Here is the full main.ts with proper cleanup:

// src/main.ts
import { createMcpClient } from "./client.js";
import { discoverTools } from "./agent.js";
import { startInteractiveCLI } from "./cli.js";
import path from "path";
import { fileURLToPath } from "url";

const __dirname = path.dirname(fileURLToPath(import.meta.url));

async function main() {
  const serverPath = path.resolve(
    __dirname,
    "../../weather-server/dist/index.js"
  );

  console.log("πŸ”Œ Connecting to Weather MCP server...");
  const client = await createMcpClient("node", [serverPath]);

  const tools = await discoverTools(client);

  // Run the interactive CLI β€” this awaits until the user types 'exit'
  await startInteractiveCLI(client, tools);

  // Clean up: close the MCP client (kills the server process)
  await client.close();
  console.log("βœ… MCP server disconnected. Bye!");
  process.exit(0);
}

main().catch((err) => {
  console.error("Fatal error:", err);
  process.exit(1);
});

client.close() sends a proper JSON-RPC shutdown to the server and then terminates the child process. Always call it on exit β€” never rely on process garbage collection to clean up child processes. 🚨


πŸƒ Part 7: Running It

# Build everything
npm run build

# Start the agent
node dist/main.js

# Sample session:
🌀️  Weather Agent ready! Type your question or 'exit' to quit.

You: What is the weather like in Mumbai today?

πŸ€– Agent: Let me check that for you...
  πŸ”§ [tool_use] get_current_weather({"city":"Mumbai","country":"IN"})
  βœ… [result] 34Β°C, Humid, Mostly Sunny
Mumbai is hot and humid today at 34Β°C with mostly sunny skies. Stay hydrated if you're heading out! β˜€οΈ

You: And Pune?

πŸ€– Agent:
  πŸ”§ [tool_use] get_current_weather({"city":"Pune","country":"IN"})
  βœ… [result] 31Β°C, Partly Cloudy
Pune is a bit cooler at 31Β°C with partly cloudy skies β€” more pleasant than Mumbai today! 🌀️

You: exit
πŸ‘‹ Goodbye!

πŸ› οΈ Part 8: Tips for Production CLI Agents

Handle Ctrl+C gracefully.

Add a SIGINT handler so the user can quit with Ctrl+C in addition to typing "exit":

process.on("SIGINT", async () => {
  console.log("\n\nCaught SIGINT β€” shutting down...");
  await client.close();
  process.exit(0);
});

Cap conversation history length.

The messages array grows forever in a long session and will eventually overflow the context window. A simple fix is to keep only the last N turns:

const MAX_HISTORY_TURNS = 20;

if (messages.length > MAX_HISTORY_TURNS * 2) {
  // Keep the first message (system context if any) + recent turns
  messages.splice(1, messages.length - MAX_HISTORY_TURNS * 2);
}

Show a spinner during tool calls.

Tool calls can take a second or two. A simple spinner prevents the terminal from feeling frozen:

const spinner = ["|", "/", "-", "\\"];
let i = 0;
const interval = setInterval(() => {
  process.stdout.write(`\r  ⏳ ${spinner[i++ % spinner.length]}`);
}, 100);

const result = await client.callTool({ name: toolName, arguments: toolInput });

clearInterval(interval);
process.stdout.write("\r");

Persist conversation to disk for resumable sessions.

import fs from "fs";

function saveSession(messages: Anthropic.MessageParam[]) {
  fs.writeFileSync("session.json", JSON.stringify(messages, null, 2));
}

function loadSession(): Anthropic.MessageParam[] {
  if (fs.existsSync("session.json")) {
    return JSON.parse(fs.readFileSync("session.json", "utf-8"));
  }
  return [];
}

🎯 Summary

In Part 4 you upgraded the agent from Part 3 with:

  • ⚑ Streaming β€” text appears token by token using .stream() and finalMessage()
  • πŸ”§ Live tool-call display β€” users see exactly which tools fire and with what inputs
  • πŸ’¬ Interactive CLI β€” a proper readline loop with clean shutdown
  • πŸ” Conversation memory β€” the agent remembers earlier turns in the session
  • 🧹 Graceful shutdown β€” client.close() cleans up the MCP server process

In Part 5 we'll move beyond stdio and build a production-ready MCP server with Streamable HTTP transport β€” so your server can run as a proper web service, support OAuth authentication, and be deployed in a Docker container. 🐳


πŸ“š Further Reading