WebSocket Integration

AgenticFleet provides WebSocket support for real-time communication between agents and clients.

Setup

Install the required dependencies:

pip install "agentic-fleet[websockets]"

Basic Usage

Create a WebSocket server:

import asyncio

from agentic_fleet import Agent
from agentic_fleet.integrations.websockets import WSServer

# Create agent
agent = Agent(
    name="assistant",
    model="gpt-4"
)

# Create server
server = WSServer(agent)


async def main():
    """Start the WebSocket server on localhost:8765."""
    # `await` is only valid inside a coroutine, so the start call lives here
    # rather than at module top level (where it is a SyntaxError in a script).
    # NOTE(review): assumes start() keeps serving until stopped; if it returns
    # as soon as the server is listening, keep the event loop alive here
    # (e.g. `await asyncio.Future()`) -- confirm against WSServer.
    await server.start("localhost", 8765)


# Start server
asyncio.run(main())

Client Connection

import asyncio
import websockets

async def connect():
    """Connect to the local server, send one greeting and print the reply."""
    async with websockets.connect("ws://localhost:8765") as conn:
        # One request/response round trip; the connection is closed when the
        # `async with` block exits.
        await conn.send("Hello!")
        response = await conn.recv()
        print(f"Response: {response}")


asyncio.run(connect())

Message Handling

Server-side

from agentic_fleet.integrations.websockets import (
    WSMessage,
    WSHandler
)

class CustomHandler(WSHandler):
    """Message handler that answers every incoming message via the agent."""

    async def handle_message(self, message: WSMessage, websocket):
        """Process one incoming message and send the result back.

        Args:
            message: Incoming message; only its ``content`` is used.
            websocket: Connection the reply is sent on.

        NOTE(review): ``self.agent`` is not set in this class -- presumably
        provided by ``WSHandler`` or by the server; confirm.
        """
        reply = await self.agent.process(message.content)
        await websocket.send(reply)


# Register the custom handler with the server
server = WSServer(agent, handler=CustomHandler())

Client-side

class WSClient:
    """Small asynchronous WebSocket client wrapper around one connection.

    Call :meth:`connect` before :meth:`send` / :meth:`receive`, or use the
    instance as an async context manager::

        async with WSClient("ws://localhost:8765") as client:
            await client.send("Hello!")
            print(await client.receive())

    Attributes:
        uri: WebSocket URI to connect to.
        ws: The open connection, or ``None`` while disconnected.
    """

    def __init__(self, uri: str):
        self.uri = uri
        # Defined up front so the attribute always exists, even before
        # connect() has been awaited.
        self.ws = None

    async def connect(self):
        """Open a connection to ``self.uri`` and store it on ``self.ws``."""
        # Drop any previous connection first so reconnecting does not leak it.
        await self.close()
        self.ws = await websockets.connect(self.uri)

    async def send(self, message: str):
        """Send ``message`` over the open connection.

        Raises:
            RuntimeError: If the client is not connected.
        """
        await self._require_connection().send(message)

    async def receive(self):
        """Wait for and return the next message from the server.

        Raises:
            RuntimeError: If the client is not connected.
        """
        return await self._require_connection().recv()

    async def close(self):
        """Close the connection if one is open.

        Safe to call before ``connect()`` and safe to call repeatedly.
        """
        # Clear the attribute before awaiting so a second close() is a no-op.
        ws, self.ws = self.ws, None
        if ws is not None:
            await ws.close()

    async def __aenter__(self):
        """Connect on entering ``async with`` and return the client."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the connection on leaving ``async with``."""
        await self.close()

    def _require_connection(self):
        """Return the open connection or raise a clear error if there is none."""
        if self.ws is None:
            raise RuntimeError(
                "WSClient is not connected; call connect() first"
            )
        return self.ws

Broadcasting

# Server-side broadcasting
await server.broadcast(
    message="System update starting..."
)

# Send to specific clients
await server.send_to(
    client_id="user123",
    message="Private message"
)

Authentication

from agentic_fleet.integrations.websockets import WSAuth

class CustomAuth(WSAuth):
    """Authenticator that checks the token in the ``Authorization`` header."""

    async def authenticate(
        self,
        websocket
    ) -> bool:
        """Decide whether the connecting client is allowed in.

        Args:
            websocket: Incoming connection; its ``headers`` mapping is read
                for the ``Authorization`` entry.

        Returns:
            ``False`` when the header is missing or empty, otherwise the
            result of ``self.verify_token(token)``.

        NOTE(review): ``verify_token`` is not defined in this class --
        presumably inherited from ``WSAuth``; confirm.
        """
        token = websocket.headers.get("Authorization")
        if not token:
            # No credentials supplied: reject without handing None (or an
            # empty string) to the token verifier.
            return False
        return await self.verify_token(token)

# Use authentication
server = WSServer(
    agent,
    auth=CustomAuth()
)

Connection Management

from agentic_fleet.integrations.websockets import WSManager

# Create connection manager
manager = WSManager()


# Connection lifecycle hooks, registered through the server's decorators
@server.on_connect
async def handle_connect(websocket):
    """Hand a newly connected websocket to the manager."""
    await manager.connect(websocket)


@server.on_disconnect
async def handle_disconnect(websocket):
    """Tell the manager that this websocket has disconnected."""
    await manager.disconnect(websocket)

Error Handling

from agentic_fleet.exceptions import WSError

try:
    await server.start()
except WSError as e:
    print(f"WebSocket error: {e}")

# Client-side error handling
try:
    async with websockets.connect(uri) as ws:
        await ws.send("message")
except websockets.exceptions.WebSocketException as e:
    print(f"Connection error: {e}")

Advanced Features

Heartbeat

# Configure heartbeat (interval of 30 -- presumably seconds; confirm against WSServer)
server = WSServer(agent, heartbeat_interval=30)

# Client-side heartbeat
async def maintain_connection():
    """Ping the server every 30 seconds, forever.

    Uses a ``websocket`` connection object from the enclosing scope; it is
    not a parameter of this coroutine. The loop never exits on its own.
    """
    ping_period = 30
    while True:
        await websocket.ping()
        await asyncio.sleep(ping_period)

SSL/TLS Support

import ssl

# Build a server-side TLS context and load the certificate and private key
ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile="cert.pem", keyfile="key.pem")

# Start secure server: pass the context to WSServer
server = WSServer(agent, ssl=ssl_context)

Best Practices

  1. Implement proper authentication
  2. Handle connection errors
  3. Use heartbeat for stability
  4. Enable SSL/TLS (wss://) in production
  5. Manage connections properly
  6. Handle disconnections gracefully
  7. Monitor connection health
  8. Implement retry logic
  9. Document message formats