Integrating duohub memory with Pipecat

Learn how to enhance your Pipecat conversational AI agents with duohub memory capabilities for more contextual and persistent conversations.

Overview

Pipecat is a framework from Daily.co for building voice AI agents. By integrating duohub memory, your agents can maintain context across conversations and access relevant information from past interactions.

Prerequisites

Install Pipecat and the supporting SDKs with Poetry:

# Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -

# Initialize a new Poetry project (if starting from scratch)
poetry init

# Add required dependencies (Pipecat is published on PyPI as pipecat-ai;
# the daily, openai, and cartesia extras pull in the transport and service SDKs)
poetry add "pipecat-ai[daily,openai,cartesia]" duohub
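
The examples below read DUOHUB_API_KEY, OPENAI_API_KEY, and CARTESIA_API_KEY from environment variables, so export those keys before running the bot.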

Window Class Setup

First, create a Window class to manage conversation context and memory:
from duohub import Duohub
from openai.types.chat import ChatCompletionMessageParam
from typing import List

class Window:
    def __init__(
        self,
        messages: List[ChatCompletionMessageParam] | None = None,
        memory_id: str | None = None,
        api_key: str | None = None,
        system_prompt: str = "You are a helpful assistant."
    ):
        self.api_key = api_key
        self.system_message = {
            "role": "system",
            "content": system_prompt
        }
        self.messages = messages or []
        self.duohub_client = Duohub(api_key=self.api_key)
        self.memory_id = memory_id

    def add_message(self, message: ChatCompletionMessageParam):
        # Add message to conversation history
        self.messages.append(message)

        # Query duohub when user speaks
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )

            # Add retrieved context to conversation
            if duohub_response and isinstance(duohub_response, dict) and 'payload' in duohub_response:
                context_message = {
                    "role": "system",
                    "content": f"Context from graph: {duohub_response['payload']}"
                }
                self.messages.append(context_message)

    def get_messages(self) -> List[ChatCompletionMessageParam]:
        # System prompt plus the most recent 10 messages; slicing already
        # handles histories shorter than the window
        return [self.system_message] + self.messages[-10:]
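
Before wiring the class into a pipeline, you can exercise it on its own. A minimal sketch, assuming valid DUOHUB_MEMORY_ID and DUOHUB_API_KEY environment variables:

import os

window = Window(
    memory_id=os.getenv("DUOHUB_MEMORY_ID"),
    api_key=os.getenv("DUOHUB_API_KEY")
)

# A user message triggers a duohub lookup; any retrieved context is
# appended to the history as an extra system message
window.add_message({"role": "user", "content": "What did we talk about last time?"})

for message in window.get_messages():
    print(message["role"], "-", str(message["content"])[:60])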

Integrating with Pipecat Bot

Here’s how to set up your bot with the Window class:
import asyncio
import os

import aiohttp

# Pipecat import paths as of recent releases; they occasionally move between versions
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from runner import configure  # room/token helper from the Pipecat examples

async def main():
    async with aiohttp.ClientSession() as session:
        # Configure transport
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                camera_out_enabled=True,
                vad_enabled=True,
                transcription_enabled=True
            ),
        )

        # Initialize conversation context with duohub memory
        initial_messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot..."
            }
        ]

        context = Window(
            messages=initial_messages,
            memory_id='your-memory-id',  # Replace with your duohub memory ID
            api_key=os.getenv("DUOHUB_API_KEY")
        )

        # Initialize services
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id"
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4"
        )

        # Create context aggregator
        context_aggregator = llm.create_context_aggregator(context)

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ])

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        # Handle new participants
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([LLMMessagesFrame(initial_messages)])

        # Run the pipeline
        runner = PipelineRunner()
        await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())
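
With the dependencies installed and the environment variables set, run the script with Poetry (assuming you saved it as bot.py): poetry run python bot.py. The bot joins the configured Daily room and responds to speech.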

Key Components Explained

Window Class

  • Manages conversation history
  • Integrates with duohub client
  • Retrieves relevant context for each user message
  • Maintains conversation window size (latest 10 messages)

Context Aggregator

  • Handles message flow between the Window and pipeline
  • Ensures proper message ordering
  • Manages user and assistant message handling

Pipeline Configuration

The pipeline is set up with the following flow:
  1. Input from Daily.co transport
  2. User context processing
  3. LLM processing
  4. Text-to-speech conversion
  5. Output to transport
  6. Assistant context processing

Memory ID Configuration

You can configure your duohub memory ID in two ways:
  1. Environment Variable:
export DUOHUB_MEMORY_ID=your-memory-id
  2. Direct Assignment:
context = Window(
    messages=initial_messages,
    memory_id="your-memory-id",
    api_key=os.getenv("DUOHUB_API_KEY")
)
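
If you use the environment variable, read it back when constructing the Window:

import os

context = Window(
    messages=initial_messages,
    memory_id=os.getenv("DUOHUB_MEMORY_ID"),
    api_key=os.getenv("DUOHUB_API_KEY")
)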

Best Practices

  1. Memory Management
    • Keep memory IDs organized by conversation topic or user
    • Regularly clean up unused memories
    • Monitor memory usage and implement retention policies
  2. Context Retrieval
    • Implement retry logic for duohub queries (a sketch follows this list)
    • Consider rate limiting for large conversation volumes
    • Cache frequently accessed context
  3. Error Handling
# Assumes a module-level logger, e.g. logger = logging.getLogger(__name__)
def add_message(self, message: ChatCompletionMessageParam):
    try:
        self.messages.append(message)
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )
            # Process response...
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        # Implement appropriate fallback behavior
  4. Performance Optimization
    • Implement message pruning for long conversations
    • Use async/await for network operations
    • Consider batching for multiple context queries
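
The retry advice under Context Retrieval can be wrapped in a small helper. A minimal sketch; safe_query and the backoff values are illustrative, not part of the duohub SDK:

import time
import logging

logger = logging.getLogger(__name__)

def safe_query(client, query, memory_id, retries=3, backoff=0.5):
    # Retry transient duohub failures with exponential backoff;
    # return None if every attempt fails
    for attempt in range(retries):
        try:
            return client.query(query=query, memoryID=memory_id, assisted=True)
        except Exception as e:
            logger.warning(f"duohub query failed (attempt {attempt + 1}): {e}")
            time.sleep(backoff * 2 ** attempt)
    return None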

Debugging Tips

  1. Enable debug logging:
import logging
logging.basicConfig(level=logging.DEBUG)
  2. Monitor duohub responses:
logger.debug(f"duohub response: {duohub_response}")
  3. Track message flow:
logger.debug(f"Message added: {message['role']} - {message['content'][:50]}...")

Common Issues and Solutions

  1. Missing Context
    • Verify memory ID is correct
    • Check duohub API key permissions
    • Ensure queries are properly formatted
  2. Performance Issues
    • Implement caching for frequent queries (see the caching sketch after this list)
    • Optimize message window size
    • Use connection pooling for API calls
  3. Memory Usage
    • Implement message cleanup
    • Monitor context size
    • Use streaming for large responses
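
For the caching suggestion above, a simple in-memory TTL cache is often enough. A sketch; cached_query and the 60-second TTL are illustrative choices:

import time

_cache = {}

def cached_query(client, query, memory_id, ttl=60):
    # Serve repeated duohub queries from memory for ttl seconds
    key = (memory_id, query)
    now = time.time()
    if key in _cache and now - _cache[key][0] < ttl:
        return _cache[key][1]
    result = client.query(query=query, memoryID=memory_id, assisted=True)
    _cache[key] = (now, result)
    return result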