Integration Guide

Learn how to integrate AgenticFleet with your existing infrastructure.

Overview

AgenticFleet can be integrated with:

  • Web frameworks (FastAPI, Django, Flask)
  • Message brokers (RabbitMQ, Redis, Kafka)
  • Databases (PostgreSQL, MongoDB, Redis)
  • LLM providers (OpenAI, Anthropic, Google)
  • Monitoring systems (Prometheus, Grafana)

Web Framework Integration

FastAPI Integration

from fastapi import FastAPI, Depends
from agentic_fleet import FleetChat
from agentic_fleet.integrations.fastapi import (
    FleetMiddleware,
    get_current_fleet
)

# Create the application and install the fleet middleware.
app = FastAPI()
app.add_middleware(FleetMiddleware)


# NOTE(review): FastAPI reads a bare `str` parameter from the query string;
# confirm whether the message is meant to arrive in the request body instead.
@app.post("/chat")
async def chat(message: str, fleet: FleetChat = Depends(get_current_fleet)):
    """Forward ``message`` to the injected fleet and return its reply.

    Args:
        message: Text to send to the fleet.
        fleet: Fleet instance resolved through ``get_current_fleet``.

    Returns:
        The awaited result of ``fleet.send_message(message)``.
    """
    return await fleet.send_message(message)

Django Integration

# settings.py
# Register the AgenticFleet Django integration app.
# NOTE(review): in a real settings.py, keep your existing entries in this list.
INSTALLED_APPS = [
    'agentic_fleet.integrations.django',
]

# AgenticFleet settings: the API key and the default model name.
# NOTE(review): 'your-api-key' is a placeholder; presumably the real key should
# come from the environment or a secret store, not from source control.
AGENTIC_FLEET = {
    'API_KEY': 'your-api-key',
    'DEFAULT_MODEL': 'gpt-4',
}

# views.py
from agentic_fleet.integrations.django import FleetView

class ChatView(FleetView):
    """Chat view built on ``FleetView``."""

    async def handle_message(self, message):
        """Send ``message`` to ``self.fleet`` and return the awaited reply."""
        reply = await self.fleet.send_message(message)
        return reply

Flask Integration

from flask import Flask, request
from agentic_fleet.integrations.flask import (
    init_fleet,
    fleet_required
)

app = Flask(__name__)
init_fleet(app)


# NOTE: async view functions require Flask to be installed with the
# ``async`` extra (``pip install "flask[async]"``).
@app.route('/chat', methods=['POST'])
@fleet_required
async def chat(fleet):
    """Send the ``message`` field of the posted JSON body to the fleet.

    Args:
        fleet: Fleet instance passed in by the ``fleet_required`` decorator
            (presumably injected per request; confirm against the
            integration).

    Returns:
        The awaited result of ``fleet.send_message``.
    """
    # ``request`` is Flask's request proxy; it must be imported from
    # ``flask`` or this lookup fails with NameError.
    message = request.json['message']
    return await fleet.send_message(message)

Message Broker Integration

RabbitMQ Integration

from agentic_fleet.integrations.rabbitmq import RabbitMQBroker

broker = RabbitMQBroker(
    url="amqp://localhost",
    queue="fleet_tasks"
)

# Producer
await broker.publish(
    routing_key="tasks",
    message={
        "type": "code_review",
        "content": "Review this PR"
    }
)

# Consumer
@broker.consume("tasks")
async def handle_task(message):
    """Hand a consumed task message over to the fleet.

    Args:
        message: Mapping that provides ``"type"`` and ``"content"`` entries.
    """
    await fleet.process_task(message["type"], message["content"])

Kafka Integration

from agentic_fleet.integrations.kafka import KafkaBroker

broker = KafkaBroker(
    bootstrap_servers="localhost:9092",
    topic="fleet_events"
)

# Producer
await broker.produce(
    topic="events",
    message={
        "type": "code_generated",
        "code": "def hello(): pass"
    }
)

# Consumer
@broker.consume("events")
async def handle_event(event):
    """Pass ``event`` on to ``fleet.handle_event``.

    Registered through ``broker.consume("events")``.
    """
    await fleet.handle_event(event)

Database Integration

PostgreSQL Integration

from agentic_fleet.integrations.postgres import PostgresStorage

storage = PostgresStorage(
    url="postgresql://user:pass@localhost/db"
)

# Store fleet state
await storage.save_fleet_state(
    fleet_id="fleet_123",
    state={
        "agents": ["architect", "developer"],
        "status": "active"
    }
)

# Query fleet state
state = await storage.get_fleet_state("fleet_123")

MongoDB Integration

from agentic_fleet.integrations.mongo import MongoStorage

storage = MongoStorage(
    url="mongodb://localhost",
    database="agentic_fleet"
)

# Store agent memory
await storage.save_memory(
    agent_id="agent_123",
    memory={
        "key": "project_info",
        "value": "API project"
    }
)

# Query memory
memory = await storage.get_memory("agent_123")

LLM Provider Integration

OpenAI Integration

from agentic_fleet.integrations.openai import OpenAIProvider

# Provider configured with an API key and a model name.
# NOTE(review): "your-key" is a placeholder; presumably the real key should be
# loaded from the environment or a secret store.
provider = OpenAIProvider(
    api_key="your-key",
    model="gpt-4"
)

# Generate response
# The prompt and a sampling temperature are passed as keyword arguments.
response = await provider.generate(
    prompt="Design a REST API",
    temperature=0.7
)

Anthropic Integration

from agentic_fleet.integrations.anthropic import AnthropicProvider

# Provider configured with an API key and a model name.
# NOTE(review): "your-key" is a placeholder, and "claude-2" looks like a
# legacy model identifier; confirm against the provider's current model list.
provider = AnthropicProvider(
    api_key="your-key",
    model="claude-2"
)

# Generate response
# The prompt and a max_tokens limit are passed as keyword arguments.
response = await provider.generate(
    prompt="Analyze this code",
    max_tokens=1000
)

Monitoring Integration

Prometheus Integration

from agentic_fleet.integrations.prometheus import (
    PrometheusMetrics,
    FleetMetricsCollector
)

# Metrics object exposing the `message_counter` and `response_time` metrics used below.
metrics = PrometheusMetrics()

# Add metrics
# Increment the message counter once.
metrics.message_counter.inc()
# Record one observation of 0.5 (presumably seconds; confirm the unit).
metrics.response_time.observe(0.5)

# Custom collector
class FleetCollector(FleetMetricsCollector):
    """Custom collector that emits a single agents gauge."""

    def collect_metrics(self):
        """Yield the ``fleet_agents_total`` gauge built via ``self.gauge``."""
        agents_gauge = self.gauge(
            'fleet_agents_total',
            'Total number of agents',
        )
        yield agents_gauge

Grafana Integration

from agentic_fleet.integrations.grafana import (
    GrafanaDashboard,
    FleetPanel
)

# Dashboard with two panels: message throughput and 95th-percentile latency.
dashboard = GrafanaDashboard(
    title="Fleet Metrics",
    panels=[
        FleetPanel(
            title="Message Rate",
            query="rate(fleet_messages_total[5m])"
        ),
        FleetPanel(
            title="Response Time",
            # histogram_quantile() works on the rated per-bucket series of a
            # histogram, not on the bare metric name.
            # Assumes fleet_response_time is exported as a classic Prometheus
            # histogram (TODO confirm).
            query="histogram_quantile(0.95, rate(fleet_response_time_bucket[5m]))"
        )
    ]
)

Best Practices

  1. Error Handling
try:
    await fleet.process_message(message)
except IntegrationError as e:
    logger.error(f"Integration error: {e}")
    await notify_admin(e)
  2. Connection Management
# Run the task inside the manager's async context so that setup and teardown
# are handled by the context manager (presumably the connection is released
# on exit; confirm against ConnectionManager).
async with ConnectionManager() as conn:
    await conn.execute_task(task)
  3. Retry Logic
from agentic_fleet.utils import retry_with_backoff

@retry_with_backoff(max_retries=3)
async def send_message(message):
    """Send ``message`` through the broker, wrapped by ``retry_with_backoff``.

    NOTE(review): presumably the call is retried up to three times with
    backoff; which exceptions trigger a retry is decided by the decorator.
    Confirm that ``broker.send`` is the right method for the broker in use.
    """
    await broker.send(message)
  4. Health Checks
@app.get("/health")
async def health_check():
    """Report the result of each dependency check.

    Returns:
        Dict with the awaited results of ``check_db``, ``check_broker`` and
        ``check_llm`` under the keys ``"database"``, ``"broker"`` and
        ``"llm"``.
    """
    # The checks are awaited one after another, in this order.
    database_status = await check_db()
    broker_status = await check_broker()
    llm_status = await check_llm()
    return {
        "database": database_status,
        "broker": broker_status,
        "llm": llm_status,
    }

Security Considerations

  1. API Security

    • Use HTTPS
    • Implement rate limiting
    • Validate inputs
  2. Data Security

    • Encrypt sensitive data
    • Regular backups
    • Access control
  3. Monitoring

    • Log security events
    • Set up alerts
    • Regular audits