Přeskočit na obsah
_CORE
AI & Agentic Systems Core Informační Systémy Cloud & Platform Engineering Data Platforma & Integrace Security & Compliance QA, Testing & Observability IoT, Automatizace & Robotika Mobile & Digital Banky & Finance Pojišťovnictví Veřejná správa Obrana & Bezpečnost Zdravotnictví Energetika & Utility Telco & Média Průmysl & Výroba Logistika & E-commerce Retail & Loyalty
Reference Technologie Blog Knowledge Base O nás Spolupráce Kariéra
Pojďme to probrat

Multi-Agent orchestrace

01. 01. 2024 4 min čtení intermediate

Multi-agent orchestrace představuje pokročilou metodu koordinace několika AI agentů založených na velkých jazykových modelech (LLM). Tato technologie umožňuje vytvářet komplexní systémy, kde různí specializovaní agenti spolupracují na řešení složitých úloh.

Co je Multi-Agent orchestrace

Multi-Agent orchestrace představuje koordinaci několika AI agentů, kteří spolupracují na řešení komplexních úloh. Každý agent má specializované schopnosti a společně tvoří inteligentní systém schopný zpracovat úkoly, které by jednotlivý agent nezvládl efektivně.

Klíčové výhody tohoto přístupu zahrnují modularitu, škálovatelnost a možnost paralelního zpracování. Místo jednoho monolitického agenta máme ekosystém specializovaných komponent, které mohou být nezávisle vyvíjeny a optimalizovány.

Architektury orchestrace

Centralizovaná orchestrace

V centralizovaném modelu existuje hlavní orchestrátor, který koordinuje práci všech agentů. Tento přístup je jednodušší na implementaci a poskytuje lepší kontrolu nad celým workflow.

class CentralOrchestrator:
    def __init__(self):
        self.agents = {
            'analyzer': DataAnalyzerAgent(),
            'processor': DataProcessorAgent(),
            'formatter': OutputFormatterAgent()
        }

    async def execute_workflow(self, task):
        # 1. Analýza dat
        analysis = await self.agents['analyzer'].analyze(task.data)

        # 2. Zpracování na základě analýzy
        processed = await self.agents['processor'].process(
            task.data, analysis.parameters
        )

        # 3. Formátování výstupu
        return await self.agents['formatter'].format(processed)

Decentralizovaná orchestrace

Decentralizovaný model umožňuje agentům komunikovat přímo mezi sebou pomocí message passing nebo event-driven architektury. Každý agent má vlastní rozhodovací logiku o tom, kdy a s kým komunikovat.

class DecentralizedAgent:
    def __init__(self, agent_id, message_bus):
        self.id = agent_id
        self.message_bus = message_bus
        self.capabilities = set()

    async def handle_message(self, message):
        if self.can_handle(message.task_type):
            result = await self.process(message.payload)

            # Najdi dalšího vhodného agenta
            next_agent = self.find_next_agent(message.workflow)
            if next_agent:
                await self.message_bus.send(next_agent, result)
        else:
            # Přeposlat jinému agentovi
            suitable_agent = self.find_suitable_agent(message.task_type)
            await self.message_bus.forward(suitable_agent, message)

Implementace s využitím LangChain

LangChain poskytuje robustní framework pro tvorbu multi-agent systémů. Zde je praktický příklad koordinace agentů pro analýzu a zpracování dokumentů:

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

class DocumentAnalysisOrchestrator:
    def __init__(self):
        self.llm = ChatOpenAI(temperature=0)
        self.setup_agents()

    def setup_agents(self):
        # Specialized document reader agent
        self.reader_agent = self.create_reader_agent()

        # Content analyzer agent  
        self.analyzer_agent = self.create_analyzer_agent()

        # Report generator agent
        self.generator_agent = self.create_generator_agent()

    def create_reader_agent(self):
        def read_document(file_path: str) -> str:
            """Extract text content from document"""
            # Implementation for various file types
            return extracted_text

        reader_tool = Tool(
            name="document_reader",
            description="Reads and extracts text from documents",
            func=read_document
        )

        return create_openai_functions_agent(
            self.llm, [reader_tool], 
            "You are specialized in reading and extracting document content."
        )

    async def orchestrate_analysis(self, document_path: str):
        # Step 1: Document reading
        content = await self.reader_agent.ainvoke({
            "input": f"Read document: {document_path}"
        })

        # Step 2: Content analysis
        analysis = await self.analyzer_agent.ainvoke({
            "input": f"Analyze this content: {content['output']}"
        })

        # Step 3: Report generation
        report = await self.generator_agent.ainvoke({
            "input": f"Generate report from: {analysis['output']}"
        })

        return report['output']

Komunikační vzory mezi agenty

Request-Response pattern

Nejjednodušší vzor, kde jeden agent vyšle požadavek a čeká na odpověď. Vhodný pro synchronní operace s jasně definovaným vstupem a výstupem.

class RequestResponseCommunication:
    async def request(self, target_agent, request_data, timeout=30):
        message = {
            'id': self.generate_message_id(),
            'sender': self.agent_id,
            'target': target_agent,
            'payload': request_data,
            'timestamp': time.time()
        }

        response = await asyncio.wait_for(
            self.send_and_wait(message), 
            timeout=timeout
        )

        return response.payload

Event-driven communication

Agenti publikují události a ostatní se přihlašují k odběru relevantních událostí. Umožňuje asynchronní a loosely-coupled komunikaci.

class EventDrivenAgent:
    def __init__(self, event_bus):
        self.event_bus = event_bus
        self.subscriptions = {}

    def subscribe(self, event_type, handler):
        if event_type not in self.subscriptions:
            self.subscriptions[event_type] = []
        self.subscriptions[event_type].append(handler)

    async def publish_event(self, event_type, data):
        event = {
            'type': event_type,
            'data': data,
            'timestamp': time.time(),
            'sender': self.agent_id
        }

        await self.event_bus.publish(event)

    async def handle_event(self, event):
        if event.type in self.subscriptions:
            for handler in self.subscriptions[event.type]:
                await handler(event.data)

Správa stavu a koordinace

V multi-agent systémech je kritické udržovat konzistentní stav a koordinovat přístup ke sdíleným zdrojům. Redis nebo podobné in-memory databáze jsou často využívány pro sdílení stavu mezi agenty.

import redis.asyncio as redis
import json

class SharedState:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    async def set_workflow_state(self, workflow_id, state):
        await self.redis.hset(
            f"workflow:{workflow_id}", 
            "state", 
            json.dumps(state)
        )

    async def get_workflow_state(self, workflow_id):
        state_data = await self.redis.hget(
            f"workflow:{workflow_id}", 
            "state"
        )
        return json.loads(state_data) if state_data else None

    async def acquire_lock(self, resource_id, timeout=10):
        lock_key = f"lock:{resource_id}"
        acquired = await self.redis.set(
            lock_key, 
            self.agent_id, 
            nx=True, 
            ex=timeout
        )
        return acquired

Monitoring a debugování

Multi-agent systémy mohou být složité na debugging. Je důležité implementovat comprehensive logging a metrics collection:

import structlog
from opentelemetry import trace

class AgentMonitoring:
    def __init__(self, agent_id):
        self.logger = structlog.get_logger()
        self.tracer = trace.get_tracer(__name__)
        self.agent_id = agent_id

    async def log_agent_interaction(self, interaction_type, target_agent, payload):
        with self.tracer.start_as_current_span("agent_interaction") as span:
            span.set_attribute("agent.source", self.agent_id)
            span.set_attribute("agent.target", target_agent)
            span.set_attribute("interaction.type", interaction_type)

            self.logger.info(
                "Agent interaction",
                source_agent=self.agent_id,
                target_agent=target_agent,
                interaction_type=interaction_type,
                payload_size=len(str(payload))
            )

Praktické případy použití

Multi-agent orchestrace najde uplatnění v mnoha oblastech:

  • Document processing pipelines - jeden agent extrahuje text, druhý analyzuje sentiment, třetí generuje summary
  • Data ETL workflows - specializované agenty pro extraction, transformation a loading
  • Customer service automation - routing agent určí kategorii, specialized agent řeší konkrétní problém
  • Code review systems - různí agenti kontrolují style, security, performance a logic

Shrnutí

Multi-Agent orchestrace představuje mocný nástroj pro tvorbu škálovatelných AI systémů. Klíčem k úspěchu je správný návrh komunikačních vzorů, efektivní správa stavu a robustní monitoring. Při implementaci je důležité zvážit trade-offy mezi centralizovanou a decentralizovanou architekturou podle konkrétních požadavků projektu. S rostoucí složitostí AI aplikací se multi-agent přístupy stávají nezbytností pro udržení modularity a možnosti nezávislého vývoje jednotlivých komponent.

multi-agentorchestraceai agenti