
Implementing Streaming with HTTP SSE

This guide explains how to implement Server-Sent Events (SSE) for streaming responses from your MCP server.

Introduction

Streaming responses provide a more interactive experience for users by sending data incrementally as it becomes available, rather than waiting for the entire response to be generated. In the context of AI applications, streaming allows users to see AI-generated content as it's being produced.

The Model Context Protocol supports streaming using HTTP Server-Sent Events (SSE), a standard for server-to-client streaming that's well-supported across browsers and environments.

Understanding SSE

Server-Sent Events (SSE) is a web standard that lets a server push updates to a client over a single HTTP connection. Unlike WebSockets, SSE:

  • Uses a standard HTTP connection
  • Is unidirectional (server to client only)
  • Reconnects automatically when the connection drops (via the browser EventSource API)
  • Has built-in support in most browsers and HTTP clients
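
On the wire, each SSE event is one or more "field: value" lines followed by a blank line. The server implementation below sends only data: lines carrying JSON, so a streamed response looks roughly like this:

data: {"type":"chunk","content":"Hello"}

data: {"type":"chunk","content":" world"}

data: {"type":"done"}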

Implementing SSE in MCP Servers

Server-Side Implementation

Here's how to implement SSE streaming in your MCP server:

// src/controllers/streaming-controller.js
import { AIService } from '../services/ai-service';

export async function streamResponse(req, res) {
  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // CORS headers if needed
  res.setHeader('Access-Control-Allow-Origin', '*');

  // Function to send an event
  const sendEvent = (data) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
    // Flush the response stream (important for some environments)
    if (res.flush) res.flush();
  };

  try {
    // Get conversation context
    const { contextId } = req.params;
    const { message } = req.body;

    // Initialize AI service
    const aiService = new AIService();

    // Start streaming response
    const stream = await aiService.generateStreamingResponse(message, contextId);

    // Listen for chunks and send them as events
    stream.on('data', (chunk) => {
      sendEvent({
        type: 'chunk',
        content: chunk.content,
      });
    });

    // Handle completion
    stream.on('end', () => {
      sendEvent({
        type: 'done',
      });
      res.end();
    });

    // Handle errors
    stream.on('error', (error) => {
      sendEvent({
        type: 'error',
        error: error.message,
      });
      res.end();
    });

    // Handle client disconnect: stop reacting to further events
    // (the stream is a plain EventEmitter, so there is no destroy())
    req.on('close', () => {
      stream.removeAllListeners();
    });
  } catch (error) {
    sendEvent({
      type: 'error',
      error: error.message,
    });
    res.end();
  }
}
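
Proxies and load balancers often close connections that stay silent for too long. A minimal heartbeat guard, added near the top of streamResponse, keeps the connection alive during long generations (the 15-second interval here is an arbitrary choice):

// Send an SSE comment line periodically; lines starting with ':' are
// ignored by clients but keep intermediaries from closing the connection
const heartbeat = setInterval(() => {
  res.write(': ping\n\n');
  if (res.flush) res.flush();
}, 15000);

// Stop the heartbeat once the response ends or the client disconnects
res.on('close', () => clearInterval(heartbeat));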

Configuring Routes

Add the streaming endpoint to your routes:

// src/routes/streaming-routes.js
import express from 'express';
import { streamResponse } from '../controllers/streaming-controller';
import { authenticateUser } from '../middleware/auth';

const router = express.Router();

// Apply authentication middleware
router.use(authenticateUser);

// Streaming endpoint
router.post('/conversations/:contextId/stream', streamResponse);

export default router;

Register the routes in your main application:

// src/index.js
import express from 'express';
import streamingRoutes from './routes/streaming-routes';

const app = express();
app.use(express.json());

// Register streaming routes
app.use('/api', streamingRoutes);

// Start server
app.listen(3000, () => {
  console.log('Server running on port 3000');
});

Integrating with AI Services

OpenAI Integration

Here's an example of integrating with OpenAI's streaming API:

// src/services/ai-service.js
import { OpenAI } from 'openai';
import { EventEmitter } from 'events';

export class AIService {
  constructor() {
    this.openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    });
  }

  async generateStreamingResponse(message, contextId) {
    // Create an event emitter to handle the stream
    const eventEmitter = new EventEmitter();

    // Consume the OpenAI stream in the background so the emitter is
    // returned before the first chunk arrives; awaiting the loop here
    // would fire every event before the caller attaches its listeners
    (async () => {
      try {
        // Get conversation history from your storage
        const conversationHistory = await this.getConversationHistory(contextId);

        // Prepare messages for OpenAI
        const messages = [
          { role: 'system', content: 'You are a helpful assistant.' },
          ...conversationHistory,
          { role: 'user', content: message },
        ];

        // Request streaming response from OpenAI
        const stream = await this.openai.chat.completions.create({
          model: 'gpt-4',
          messages,
          stream: true,
        });

        // Accumulate the full response while forwarding chunks
        let accumulatedContent = '';

        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content || '';
          if (content) {
            accumulatedContent += content;
            eventEmitter.emit('data', { content });
          }
        }

        // Save the complete response to your storage
        await this.saveResponse(contextId, accumulatedContent);

        // Signal completion
        eventEmitter.emit('end');
      } catch (error) {
        // Forward errors to the consumer
        eventEmitter.emit('error', error);
      }
    })();

    return eventEmitter;
  }

  async getConversationHistory(contextId) {
    // Implement this method to retrieve conversation history
    // from your database or storage
    return [];
  }

  async saveResponse(contextId, content) {
    // Implement this method to save the response
    // to your database or storage
  }
}

Anthropic Integration

For Anthropic's Claude model:

import Anthropic from '@anthropic-ai/sdk';

// In the AIService class
async generateStreamingResponse(message, contextId) {
  const eventEmitter = new EventEmitter();

  // As with the OpenAI version, consume the stream in the background
  // so listeners can be attached before events start firing
  (async () => {
    try {
      const anthropic = new Anthropic({
        apiKey: process.env.ANTHROPIC_API_KEY,
      });

      const history = await this.getConversationHistory(contextId);

      const stream = await anthropic.messages.create({
        model: 'claude-2',
        messages: [...history, { role: 'user', content: message }],
        max_tokens: 1000,
        stream: true,
      });

      let fullContent = '';

      for await (const chunk of stream) {
        if (chunk.type === 'content_block_delta' && chunk.delta.text) {
          fullContent += chunk.delta.text;
          eventEmitter.emit('data', { content: chunk.delta.text });
        }
      }

      await this.saveResponse(contextId, fullContent);
      eventEmitter.emit('end');
    } catch (error) {
      eventEmitter.emit('error', error);
    }
  })();

  return eventEmitter;
}

Client-Side Implementation

Browser Implementation

Here's how to consume SSE streams in a browser application. Because the endpoint expects a POST body, the example reads the response with fetch and the Streams API instead of the EventSource API, which only supports GET requests:

// client-side.js
async function streamConversation(message, contextId) {
  // Create result container
  const resultElement = document.getElementById('result');
  resultElement.textContent = '';

  try {
    // Make the streaming request
    const response = await fetch(`https://your-mcp-server.trmx.ai/api/conversations/${contextId}/stream`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${getToken()}`,
      },
      body: JSON.stringify({ message }),
    });

    // Read the response body as a stream
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    // Process the stream
    while (true) {
      const { done, value } = await reader.read();

      if (done) break;

      // Decode the chunk and buffer it, since a single read may end
      // in the middle of an SSE event
      buffer += decoder.decode(value, { stream: true });

      // SSE events are separated by a blank line
      const events = buffer.split('\n\n');
      buffer = events.pop(); // keep any incomplete event for the next read

      for (const line of events) {
        if (line.startsWith('data: ')) {
          try {
            const event = JSON.parse(line.substring(6));

            if (event.type === 'chunk') {
              // Append content to the result
              resultElement.textContent += event.content;
            } else if (event.type === 'error') {
              console.error('Stream error:', event.error);
              resultElement.textContent += `\nError: ${event.error}`;
            }
          } catch (e) {
            // Handle parsing errors
            console.error('Error parsing SSE event:', e);
          }
        }
      }
    }
  } catch (error) {
    console.error('Stream request failed:', error);
    resultElement.textContent = `Error: ${error.message}`;
  }
}

// Helper function to get auth token
function getToken() {
  return localStorage.getItem('authToken');
}

Node.js Client Implementation

For Node.js clients, the same fetch-based approach works. The eventsource package is not an option here, since EventSource only supports GET requests and cannot send a POST body:

// node-client.js
import fetch from 'node-fetch';

async function streamConversation(message, contextId) {
  const url = `https://your-mcp-server.trmx.ai/api/conversations/${contextId}/stream`;

  const response = await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${getToken()}`,
    },
    body: JSON.stringify({ message }),
  });

  let fullResponse = '';
  let buffer = '';

  // node-fetch exposes the body as a Node.js readable stream,
  // which can be consumed with for await
  for await (const chunk of response.body) {
    buffer += chunk.toString('utf8');

    // SSE events are separated by a blank line
    const events = buffer.split('\n\n');
    buffer = events.pop(); // keep any incomplete event in the buffer

    for (const event of events) {
      if (!event.startsWith('data: ')) continue;
      const data = JSON.parse(event.substring(6));

      if (data.type === 'chunk') {
        fullResponse += data.content;
        process.stdout.write(data.content); // print chunk to console
      } else if (data.type === 'done') {
        return fullResponse;
      } else if (data.type === 'error') {
        throw new Error(data.error);
      }
    }
  }

  return fullResponse;
}

// Helper function to get auth token
function getToken() {
  return process.env.AUTH_TOKEN;
}

Using the MCP SDK for Streaming

The TRMX AI SDK provides built-in support for streaming:

import { MCPClient } from '@trmx/client';

async function streamWithMCPClient() {
  // Initialize the client
  const client = new MCPClient({
    serverUrl: 'https://your-mcp-server.trmx.ai',
    apiKey: 'your-api-key',
  });

  // Create a streaming conversation
  const conversation = client.createConversation({
    contextId: 'user-123',
    streaming: true, // Enable streaming
  });

  // Send a message with streaming response
  const stream = await conversation.sendMessage('Generate a story about space exploration');

  // Handle the stream
  stream.on('data', (chunk) => {
    // Process each chunk as it arrives
    process.stdout.write(chunk.content);
  });

  stream.on('end', () => {
    console.log('\n\nStream complete!');
  });

  stream.on('error', (error) => {
    console.error('Stream error:', error);
  });
}

Performance Considerations

When implementing streaming in production:

  1. Buffer Management: Implement proper buffer management to avoid memory issues
  2. Timeouts: Set appropriate timeouts so abandoned connections don't linger (see the sketch after this list)
  3. Rate Limiting: Apply rate limiting to prevent abuse (also covered in the sketch below)
  4. Error Handling: Implement robust error handling and recovery
  5. Load Testing: Test your streaming implementation under load
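
Here is a minimal sketch of items 2 and 3, assuming the Express router from earlier and the express-rate-limit package; the package choice and the specific limits are illustrative, not requirements:

// src/routes/streaming-routes.js (illustrative additions)
import rateLimit from 'express-rate-limit'; // assumed dependency

// Rate limiting: allow at most 10 streaming requests per client per minute
const streamLimiter = rateLimit({ windowMs: 60 * 1000, max: 10 });

// Timeout: end streams that stay open longer than two minutes
function streamTimeout(req, res, next) {
  res.setTimeout(2 * 60 * 1000, () => res.end());
  next();
}

router.post(
  '/conversations/:contextId/stream',
  streamLimiter,
  streamTimeout,
  streamResponse
);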

Example: Real-time AI Assistant

Here's a complete example of implementing a real-time AI assistant with streaming:

// server.js
import express from 'express';
import cors from 'cors';
import { MCPServer } from '@trmx/server';
import { streamingController } from './controllers';

const app = express();

// Configure middleware
app.use(cors());
app.use(express.json());

// Create MCP server with streaming support
const mcpServer = new MCPServer({
  app,
  port: 3000,
  streaming: true, // Enable streaming support
});

// Configure streaming endpoint
app.post('/api/chat/stream', streamingController.streamResponse);

// Start the server
mcpServer.start().then(() => {
  console.log('MCP server with streaming support running on port 3000');
});

// client.js
import { MCPClient } from '@trmx/client';

class ChatUI {
  constructor() {
    this.client = new MCPClient({
      serverUrl: 'http://localhost:3000',
    });

    this.conversation = this.client.createConversation({
      contextId: 'chat-ui-demo',
      streaming: true,
    });

    this.setupEventListeners();
  }

  setupEventListeners() {
    const form = document.getElementById('chat-form');
    const input = document.getElementById('chat-input');

    form.addEventListener('submit', async (e) => {
      e.preventDefault();
      const message = input.value.trim();
      if (!message) return;

      // Add user message to the UI
      this.appendMessage('user', message);
      input.value = '';

      // Create assistant response container
      const responseId = Date.now();
      this.appendMessage('assistant', '', responseId);

      try {
        // Send message and get streaming response
        const stream = await this.conversation.sendMessage(message);
        let fullResponse = '';

        // Process the stream
        stream.on('data', (chunk) => {
          fullResponse += chunk.content;
          this.updateMessage(responseId, fullResponse);
        });

        stream.on('end', () => {
          console.log('Stream complete');
        });

        stream.on('error', (error) => {
          console.error('Stream error:', error);
          this.updateMessage(responseId, `Error: ${error.message}`);
        });
      } catch (error) {
        console.error('Error sending message:', error);
        this.appendMessage('system', `Error: ${error.message}`);
      }
    });
  }

  appendMessage(role, content, id = null) {
    const output = document.getElementById('chat-output');
    const messageEl = document.createElement('div');
    messageEl.className = `message ${role}`;
    if (id) messageEl.id = `message-${id}`;

    const contentEl = document.createElement('div');
    contentEl.className = 'content';
    contentEl.textContent = content;

    messageEl.appendChild(contentEl);
    output.appendChild(messageEl);
    output.scrollTop = output.scrollHeight;
  }

  updateMessage(id, content) {
    const messageEl = document.getElementById(`message-${id}`);
    if (messageEl) {
      const contentEl = messageEl.querySelector('.content');
      contentEl.textContent = content;
      const output = document.getElementById('chat-output');
      output.scrollTop = output.scrollHeight;
    }
  }
}

// Initialize chat UI
document.addEventListener('DOMContentLoaded', () => {
  new ChatUI();
});

Next Steps