> ## Documentation Index
> Fetch the complete documentation index at: https://docs.duohub.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Use with Pipecat

> Integrate with the Pipecat framework from Daily.co for creating conversational voice AI agents with duohub memory

# Integrating duohub memory with Pipecat

Learn how to enhance your Pipecat conversational AI agents with duohub memory capabilities for more contextual and persistent conversations.

<CardGroup cols={1}>
  <Card title="Examples GitHub Repository" icon="github" href="https://github.com/duohub-ai/examples/tree/main/pipecat">
    See how to create a voice AI bot with Pipecat and duohub memory on GitHub
  </Card>
</CardGroup>

## Overview

Pipecat is a framework from Daily.co for building voice AI agents. By integrating duohub memory, your agents can maintain context across conversations and access relevant information from past interactions.

## Prerequisites

```bash theme={null}
# Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -

# Initialize a new Poetry project (if starting from scratch)
poetry init

# Add required dependencies
poetry add pipecat duohub openai cartesia
```

## Window Class Setup

First, create a Window class to manage conversation context and memory:

```python theme={null}
from duohub import Duohub
from openai.types.chat import ChatCompletionMessageParam
from typing import List

class Window:
    def __init__(
        self,
        messages: List[ChatCompletionMessageParam] | None = None,
        memory_id: str | None = None,
        api_key: str | None = None,
        system_prompt: str = "You are a helpful assistant."
    ):
        self.api_key = api_key
        self.system_message = {
            "role": "system",
            "content": system_prompt
        }
        self.messages = messages or []
        self.duohub_client = Duohub(api_key=self.api_key)
        self.memory_id = memory_id

    def add_message(self, message: ChatCompletionMessageParam):
        # Add message to conversation history
        self.messages.append(message)

        # Query duohub when user speaks
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )

            # Add retrieved context to conversation
            if duohub_response and isinstance(duohub_response, dict) and 'payload' in duohub_response:
                context_message = {
                    "role": "system",
                    "content": f"Context from graph: {duohub_response['payload']}"
                }
                self.messages.append(context_message)

    def get_messages(self) -> List[ChatCompletionMessageParam]:
        messages = [self.system_message]
        history_messages = self.messages[-10:] if len(self.messages) > 10 else self.messages
        messages.extend(history_messages)
        return messages
```

## Integrating with Pipecat Bot

Here's how to set up your bot with the Window class:

```python theme={null}
async def main():
    async with aiohttp.ClientSession() as session:
        # Configure transport
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                camera_out_enabled=True,
                vad_enabled=True,
                transcription_enabled=True
            ),
        )

        # Initialize conversation context with duohub memory
        initial_messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot..."
            }
        ]

        context = Window(
            messages=initial_messages,
            memory_id='your-memory-id',  # Replace with your duohub memory ID
            api_key=os.getenv("DUOHUB_API_KEY")
        )

        # Initialize services
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id"
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4"
        )

        # Create context aggregator
        context_aggregator = llm.create_context_aggregator(context)

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ])

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        # Handle new participants
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([LLMMessagesFrame(initial_messages)])

        # Run the pipeline
        runner = PipelineRunner()
        await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())
```

## Key Components Explained

### Window Class

* Manages conversation history
* Integrates with duohub client
* Retrieves relevant context for each user message
* Maintains conversation window size (latest 10 messages)

### Context Aggregator

* Handles message flow between the Window and pipeline
* Ensures proper message ordering
* Manages user and assistant message handling

### Pipeline Configuration

The pipeline is set up with the following flow:

1. Input from Daily.co transport
2. User context processing
3. LLM processing
4. Text-to-speech conversion
5. Output to transport
6. Assistant context processing

## Memory ID Configuration

You can configure your duohub memory ID in several ways:

1. **Environment Variable**:

```bash theme={null}
export DUOHUB_MEMORY_ID=your-memory-id
```

2. **Direct Assignment**:

```python theme={null}
context = Window(
    messages=initial_messages,
    memory_id="your-memory-id",
    api_key=os.getenv("DUOHUB_API_KEY")
)
```

## Best Practices

1. **Memory Management**

   * Keep memory IDs organized by conversation topic or user
   * Regularly clean up unused memories
   * Monitor memory usage and implement retention policies

2. **Context Retrieval**

   * Implement retry logic for duohub queries
   * Consider rate limiting for large conversation volumes
   * Cache frequently accessed context

3. **Error Handling**

```python theme={null}
def add_message(self, message: ChatCompletionMessageParam):
    try:
        self.messages.append(message)
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )
            # Process response...
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        # Implement appropriate fallback behavior
```

4. **Performance Optimization**
   * Implement message pruning for long conversations
   * Use async/await for network operations
   * Consider batching for multiple context queries

## Debugging Tips

1. Enable debug logging:

```python theme={null}
import logging
logging.basicConfig(level=logging.DEBUG)
```

2. Monitor duohub responses:

```python theme={null}
logger.debug(f"duohub response: {duohub_response}")
```

3. Track message flow:

```python theme={null}
logger.debug(f"Message added: {message['role']} - {message['content'][:50]}...")
```

## Common Issues and Solutions

1. **Missing Context**

   * Verify memory ID is correct
   * Check duohub API key permissions
   * Ensure queries are properly formatted

2. **Performance Issues**

   * Implement caching for frequent queries
   * Optimize message window size
   * Use connection pooling for API calls

3. **Memory Usage**
   * Implement message cleanup
   * Monitor context size
   * Use streaming for large responses
