Integrating duohub memory with Pipecat

Learn how to enhance your Pipecat conversational AI agents with duohub memory capabilities for more contextual and persistent conversations.

Overview

Pipecat is an open-source framework from Daily.co for building voice and multimodal conversational AI agents. By integrating duohub memory, your agents can maintain context across conversations and access relevant information from past interactions.

Prerequisites

You will need Python 3.10+ (the examples use the X | None union syntax) and API keys for duohub, OpenAI, and Cartesia, plus a Daily room for the transport. Install the dependencies with Poetry:

# Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -

# Initialize a new Poetry project (if starting from scratch)
poetry init

# Add required dependencies
poetry add "pipecat-ai[daily,openai,cartesia]" duohub
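
You can sanity-check your duohub credentials with a standalone query before wiring anything into Pipecat. A minimal sketch, assuming DUOHUB_API_KEY is exported and the memory ID placeholder is replaced with one of your own:

import os

from duohub import Duohub

client = Duohub(api_key=os.getenv("DUOHUB_API_KEY"))

# Same query signature the Window class below uses
response = client.query(
    query="What do you know about this user?",
    memoryID="your-memory-id",  # placeholder: replace with your memory ID
    assisted=True
)
print(response)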

Window Class Setup

First, create a Window class to manage conversation context and memory:

from duohub import Duohub
from openai.types.chat import ChatCompletionMessageParam
from typing import List

class Window:
    def __init__(
        self,
        messages: List[ChatCompletionMessageParam] | None = None,
        memory_id: str | None = None,
        api_key: str | None = None,
        system_prompt: str = "You are a helpful assistant."
    ):
        self.api_key = api_key
        self.system_message = {
            "role": "system",
            "content": system_prompt
        }
        self.messages = messages or []
        self.duohub_client = Duohub(api_key=self.api_key)
        self.memory_id = memory_id

    def add_message(self, message: ChatCompletionMessageParam):
        # Add message to conversation history
        self.messages.append(message)

        # Query duohub for relevant context whenever the user speaks
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True  # assisted mode returns a ready-to-use 'payload'
            )

            # Add retrieved context to conversation
            if duohub_response and isinstance(duohub_response, dict) and 'payload' in duohub_response:
                context_message = {
                    "role": "system",
                    "content": f"Context from graph: {duohub_response['payload']}"
                }
                self.messages.append(context_message)

    def get_messages(self) -> List[ChatCompletionMessageParam]:
        # Return the system prompt followed by the most recent 10 messages
        return [self.system_message, *self.messages[-10:]]
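
As a quick smoke test, the Window can be exercised on its own before handing it to Pipecat. This sketch assumes os is imported, DUOHUB_API_KEY is set, and uses a placeholder memory ID:

window = Window(
    memory_id="your-memory-id",  # placeholder: use your duohub memory ID
    api_key=os.getenv("DUOHUB_API_KEY")
)

window.add_message({"role": "user", "content": "Where did we leave off last time?"})

# get_messages() returns the system prompt, the user message, and any
# context message retrieved from duohub
for message in window.get_messages():
    print(message["role"], "->", str(message["content"])[:80])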

Integrating with Pipecat Bot

Here’s how to set up your bot with the Window class:

import asyncio
import os

import aiohttp

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from runner import configure  # room/token helper from Pipecat's Daily examples


async def main():
    async with aiohttp.ClientSession() as session:
        # Configure transport
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                camera_out_enabled=True,
                vad_enabled=True,
                transcription_enabled=True
            ),
        )

        # Initialize conversation context with duohub memory
        initial_messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot..."
            }
        ]

        context = Window(
            messages=initial_messages,
            memory_id='your-memory-id',  # Replace with your duohub memory ID
            api_key=os.getenv("DUOHUB_API_KEY")
        )

        # Initialize services
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id"  # Replace with your Cartesia voice ID
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4"
        )

        # Create context aggregator
        context_aggregator = llm.create_context_aggregator(context)

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ])

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        # Handle new participants
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([LLMMessagesFrame(initial_messages)])

        # Run the pipeline
        runner = PipelineRunner()
        await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())

Key Components Explained

Window Class

  • Manages conversation history
  • Integrates with duohub client
  • Retrieves relevant context for each user message
  • Maintains conversation window size (latest 10 messages)

Context Aggregator

  • Handles message flow between the Window and pipeline
  • Ensures proper message ordering
  • Manages user and assistant message handling

Pipeline Configuration

The pipeline is set up with the following flow:

  1. Input from Daily.co transport
  2. User context processing
  3. LLM processing
  4. Text-to-speech conversion
  5. Output to transport
  6. Assistant context processing

Memory ID Configuration

You can configure your duohub memory ID in several ways:

  1. Environment Variable:

export DUOHUB_MEMORY_ID=your-memory-id

  2. Direct Assignment:

context = Window(
    messages=initial_messages,
    memory_id="your-memory-id",
    api_key=os.getenv("DUOHUB_API_KEY")
)
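
With the environment variable set, read it when constructing the Window (assuming os is imported):

context = Window(
    messages=initial_messages,
    memory_id=os.getenv("DUOHUB_MEMORY_ID"),
    api_key=os.getenv("DUOHUB_API_KEY")
)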

Best Practices

  1. Memory Management

    • Keep memory IDs organized by conversation topic or user
    • Regularly clean up unused memories
    • Monitor memory usage and implement retention policies
  2. Context Retrieval

    • Implement retry logic for duohub queries (see the retry sketch after this list)
    • Consider rate limiting for large conversation volumes
    • Cache frequently accessed context
  3. Error Handling

import logging

logger = logging.getLogger(__name__)

def add_message(self, message: ChatCompletionMessageParam):
    # Drop-in replacement for Window.add_message with basic error handling
    try:
        self.messages.append(message)
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )
            # Process response...
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        # Implement appropriate fallback behavior, e.g. continue the turn
        # with conversation history alone
  4. Performance Optimization

    • Implement message pruning for long conversations
    • Use async/await for network operations
    • Consider batching for multiple context queries
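
For the retry logic mentioned in item 2, a minimal sketch with exponential backoff (query_with_retry is a hypothetical helper; the backoff values are illustrative):

import time
import logging

logger = logging.getLogger(__name__)

def query_with_retry(client, query, memory_id, retries=3, backoff=0.5):
    # Retry transient duohub failures; re-raise after the final attempt
    # so the caller can fall back to conversation history alone
    for attempt in range(retries):
        try:
            return client.query(query=query, memoryID=memory_id, assisted=True)
        except Exception as e:
            if attempt == retries - 1:
                raise
            delay = backoff * (2 ** attempt)
            logger.warning(f"duohub query failed ({e}); retrying in {delay}s")
            time.sleep(delay)

Inside the async pipeline, prefer an async variant that awaits asyncio.sleep so retries don't block the event loop.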

Debugging Tips

  1. Enable debug logging:

import logging
logging.basicConfig(level=logging.DEBUG)

  2. Monitor duohub responses:

logger.debug(f"duohub response: {duohub_response}")

  3. Track message flow:

logger.debug(f"Message added: {message['role']} - {message['content'][:50]}...")

Common Issues and Solutions

  1. Missing Context

    • Verify memory ID is correct
    • Check duohub API key permissions
    • Ensure queries are properly formatted
  2. Performance Issues

    • Implement caching for frequent queries (see the cache sketch after this list)
    • Optimize message window size
    • Use connection pooling for API calls
  3. Memory Usage

    • Implement message cleanup
    • Monitor context size
    • Use streaming for large responses
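
For the caching suggestion above, one option is a small in-memory wrapper keyed by memory ID and query text (CachedDuohub is a hypothetical helper, not part of the duohub SDK):

class CachedDuohub:
    # Memoizes duohub query results for repeated identical queries.
    # Add a TTL or size bound before using this in long-running bots.
    def __init__(self, client):
        self.client = client
        self._cache = {}

    def query(self, query, memory_id):
        key = (memory_id, query)
        if key not in self._cache:
            self._cache[key] = self.client.query(
                query=query, memoryID=memory_id, assisted=True
            )
        return self._cache[key]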