Fleet Chat

Fleet Chat is the core communication system that enables agents to collaborate and coordinate within AgenticFleet. It provides structured message passing, state management, and coordination patterns.

Chat Architecture

Core Components

Message System

1. Message Types

class ChatMessageType(str, Enum):
    # Basic message types
    TEXT = "text"
    CODE = "code"
    FUNCTION = "function"
    TOOL = "tool"
    
    # Control messages
    SYSTEM = "system"
    STATUS = "status"
    ERROR = "error"
    
    # Task-specific
    TASK = "task"
    RESULT = "result"
    FEEDBACK = "feedback"

2. Message Format

@dataclass
class ChatMessage:
    id: str
    type: ChatMessageType
    content: Any
    sender: str
    receiver: str
    timestamp: datetime
    metadata: Dict[str, Any]
    thread_id: Optional[str] = None
    parent_id: Optional[str] = None
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "type": self.type.value,
            "content": self.content,
            "sender": self.sender,
            "receiver": self.receiver,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
            "thread_id": self.thread_id,
            "parent_id": self.parent_id
        }

Chat Patterns

1. Sequential Chat

async def sequential_chat(
    fleet: Fleet,
    agents: List[Agent],
    initial_message: str
) -> ChatHistory:
    history = ChatHistory()
    current_message = initial_message
    
    for agent in agents:
        # Process message
        response = await agent.process_message(current_message)
        
        # Record in history
        history.add_message(
            sender=agent.name,
            content=response,
            type=ChatMessageType.TEXT
        )
        
        # Update for next agent
        current_message = response
        
    return history

2. Swarm Chat

async def swarm_chat(
    fleet: Fleet,
    agents: List[Agent],
    task: Task
) -> List[ChatMessage]:
    # Create message queue
    queue = asyncio.Queue()
    
    # Initialize with task
    await queue.put(ChatMessage(
        type=ChatMessageType.TASK,
        content=task,
        sender="system"
    ))
    
    # Process in parallel
    async def process_agent(agent: Agent):
        while True:
            msg = await queue.get()
            if msg.type == ChatMessageType.TASK:
                result = await agent.process_task(msg.content)
                await queue.put(ChatMessage(
                    type=ChatMessageType.RESULT,
                    content=result,
                    sender=agent.name
                ))
    
    # Start all agents
    tasks = [
        asyncio.create_task(process_agent(agent))
        for agent in agents
    ]
    
    # Wait for completion
    results = await asyncio.gather(*tasks)
    return results

3. Routed Chat

class RoutedChat:
    def __init__(
        self,
        fleet: Fleet,
        routing_rules: Dict[str, List[str]]
    ):
        self.fleet = fleet
        self.rules = routing_rules
        self.state = {}
        
    async def route_message(
        self,
        message: ChatMessage
    ) -> Optional[str]:
        """Route message to next agent based on content."""
        content_type = self.analyze_content(message.content)
        next_agents = self.rules.get(content_type, [])
        
        if not next_agents:
            return None
            
        # Select best agent
        return self.select_agent(next_agents, message)
        
    def analyze_content(
        self,
        content: str
    ) -> str:
        """Analyze message content to determine type."""
        if "review" in content.lower():
            return "code_review"
        elif "implement" in content.lower():
            return "implementation"
        return "general"

State Management

1. Chat State

@dataclass
class ChatState:
    fleet_id: str
    active_agents: Set[str]
    message_count: int
    start_time: datetime
    last_message_time: datetime
    metadata: Dict[str, Any]
    
    def is_active(self) -> bool:
        """Check if chat is still active."""
        if not self.active_agents:
            return False
            
        # Check timeout
        timeout = timedelta(minutes=30)
        if datetime.now() - self.last_message_time > timeout:
            return False
            
        return True

2. History Management

class ChatHistory:
    def __init__(self):
        self.messages: List[ChatMessage] = []
        self.threads: Dict[str, List[ChatMessage]] = {}
        
    def add_message(
        self,
        message: ChatMessage
    ) -> None:
        """Add message to history."""
        self.messages.append(message)
        
        # Handle threading
        if message.thread_id:
            if message.thread_id not in self.threads:
                self.threads[message.thread_id] = []
            self.threads[message.thread_id].append(message)
            
    def get_thread(
        self,
        thread_id: str
    ) -> List[ChatMessage]:
        """Get all messages in a thread."""
        return self.threads.get(thread_id, [])

Chat API

The Chat API enables real-time communication with agents and fleets through a WebSocket interface.

Start Chat Session

Start a new chat session with an agent or fleet.

from agentic_fleet import Chat

chat = Chat.create(
    target_id="flt_789xyz",  # Fleet or agent ID
    target_type="fleet",     # "fleet" or "agent"
    config={
        "max_turns": 10,
        "temperature": 0.7
    }
)

WebSocket Connection

wss://api.agenticfleet.com/v1/chat

Connection Parameters

{
  "session_id": "cht_123abc",
  "target_id": "flt_789xyz",
  "target_type": "fleet",
  "auth_token": "your_auth_token"
}

Response

{
  "session_id": "cht_123abc",
  "status": "connected",
  "target": {
    "id": "flt_789xyz",
    "type": "fleet",
    "name": "code_development"
  }
}

Send Message

Send a message in an active chat session.

response = chat.send(
    message="Can you help me with code review?",
    context={
        "code_file": "main.py",
        "language": "python"
    }
)

WebSocket Message

{
  "type": "message",
  "session_id": "cht_123abc",
  "content": "Can you help me with code review?",
  "context": {
    "code_file": "main.py",
    "language": "python"
  }
}

Response

{
  "type": "response",
  "message_id": "msg_456def",
  "content": "I'd be happy to help with your code review...",
  "timestamp": "2025-02-24T15:30:00Z"
}

Stream Response

Get streaming responses from agents.

async for chunk in chat.stream_response():
    print(chunk.content)

WebSocket Stream

{
  "type": "stream",
  "message_id": "msg_456def",
  "chunk": "I'd be happy",
  "is_complete": false
}
{
  "type": "stream",
  "message_id": "msg_456def",
  "chunk": " to help with your code review...",
  "is_complete": true
}

End Chat Session

End an active chat session.

chat.end()

WebSocket Message

{
  "type": "end_session",
  "session_id": "cht_123abc"
}

Response

{
  "type": "session_ended",
  "session_id": "cht_123abc",
  "timestamp": "2025-02-24T16:00:00Z"
}

List Chat Sessions

Retrieve a list of chat sessions.

sessions = Chat.list(limit=10, offset=0)

HTTP Request

GET /v1/chat/sessions

Query Parameters

ParameterTypeDescription
limitintegerMaximum number of sessions to return
offsetintegerNumber of sessions to skip
statusstringFilter by session status

Response

{
  "sessions": [
    {
      "session_id": "cht_123abc",
      "target_id": "flt_789xyz",
      "target_type": "fleet",
      "status": "active",
      "created_at": "2025-02-24T15:30:00Z"
    }
  ],
  "total": 1,
  "has_more": false
}

Get Chat History

Retrieve chat history for a session.

history = chat.get_history(
    session_id="cht_123abc",
    limit=50
)

HTTP Request

GET /v1/chat/sessions/{session_id}/history

Query Parameters

ParameterTypeDescription
limitintegerMaximum number of messages to return
beforestringReturn messages before this timestamp
afterstringReturn messages after this timestamp

Response

{
  "messages": [
    {
      "message_id": "msg_456def",
      "type": "user",
      "content": "Can you help me with code review?",
      "timestamp": "2025-02-24T15:30:00Z"
    },
    {
      "message_id": "msg_789xyz",
      "type": "assistant",
      "content": "I'd be happy to help with your code review...",
      "timestamp": "2025-02-24T15:30:05Z"
    }
  ],
  "has_more": false
}

Best Practices

1. Message Handling

  • Use appropriate message types
  • Include necessary metadata
  • Implement proper error handling
  • Handle message timeouts

2. State Management

  • Track chat state properly
  • Implement cleanup for inactive chats
  • Handle agent disconnections
  • Monitor chat metrics

3. Performance

  • Use async/await for I/O operations
  • Implement message batching
  • Use proper indexing for history
  • Cache frequently accessed data

4. Security

  • Validate all messages
  • Implement rate limiting
  • Handle sensitive data properly
  • Audit message history

Next Steps