Integrating duohub memory with Pipecat
Learn how to enhance your Pipecat conversational AI agents with duohub memory capabilities for more contextual and persistent conversations.
Overview
Pipecat is an open-source framework from Daily for building voice and multimodal AI agents. By integrating duohub memory, your agents can maintain context across conversations and draw on relevant information from past interactions.
Prerequisites
# Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -

# Initialize a new Poetry project (if starting from scratch)
poetry init

# Add required dependencies (Pipecat is published on PyPI as pipecat-ai;
# the extras pull in the Daily, OpenAI, and Cartesia integrations)
poetry add "pipecat-ai[daily,openai,cartesia]" duohub
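The examples below read credentials from environment variables. The variable names here match the `os.getenv` calls used later; adjust them to your setup:

```bash
# API keys read via os.getenv in the code below
export DUOHUB_API_KEY=your-duohub-key
export OPENAI_API_KEY=your-openai-key
export CARTESIA_API_KEY=your-cartesia-key
```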
Window Class Setup
First, create a Window class to manage conversation context and memory:
from duohub import Duohub
from openai.types.chat import ChatCompletionMessageParam
from typing import List

class Window:
    def __init__(
        self,
        messages: List[ChatCompletionMessageParam] | None = None,
        memory_id: str | None = None,
        api_key: str | None = None,
        system_prompt: str = "You are a helpful assistant."
    ):
        self.api_key = api_key
        self.system_message = {
            "role": "system",
            "content": system_prompt
        }
        self.messages = messages or []
        self.duohub_client = Duohub(api_key=self.api_key)
        self.memory_id = memory_id

    def add_message(self, message: ChatCompletionMessageParam):
        # Add message to conversation history
        self.messages.append(message)

        # Query duohub when user speaks
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )

            # Add retrieved context to conversation
            if duohub_response and isinstance(duohub_response, dict) and 'payload' in duohub_response:
                context_message = {
                    "role": "system",
                    "content": f"Context from graph: {duohub_response['payload']}"
                }
                self.messages.append(context_message)

    def get_messages(self) -> List[ChatCompletionMessageParam]:
        messages = [self.system_message]
        history_messages = self.messages[-10:] if len(self.messages) > 10 else self.messages
        messages.extend(history_messages)
        return messages
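As a quick sanity check, the class can be exercised on its own before wiring it into Pipecat. The memory ID below is a placeholder, and a valid one will trigger a live duohub query:

```python
import os

window = Window(
    memory_id="your-memory-id",  # placeholder; replace with your duohub memory ID
    api_key=os.getenv("DUOHUB_API_KEY"),
    system_prompt="You are a helpful assistant."
)

window.add_message({"role": "user", "content": "What did we discuss last time?"})
window.add_message({"role": "assistant", "content": "We talked about your travel plans."})

# System prompt first, then history (capped at the last 10 messages),
# including any context messages retrieved from duohub
for message in window.get_messages():
    print(message["role"], "-", str(message["content"])[:60])
```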
Integrating with Pipecat Bot
Here’s how to set up your bot with the Window class:
import asyncio
import os

import aiohttp

# Import paths may vary slightly between Pipecat versions
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from runner import configure  # room/token helper from Pipecat's example scripts

async def main():
    async with aiohttp.ClientSession() as session:
        # Configure transport
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                camera_out_enabled=True,
                vad_enabled=True,
                transcription_enabled=True
            ),
        )

        # Initialize conversation context with duohub memory
        initial_messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot..."
            }
        ]

        context = Window(
            messages=initial_messages,
            memory_id='your-memory-id',  # Replace with your duohub memory ID
            api_key=os.getenv("DUOHUB_API_KEY")
        )

        # Initialize services
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id"
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4"
        )

        # Create context aggregator
        context_aggregator = llm.create_context_aggregator(context)

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ])

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        # Handle new participants
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([LLMMessagesFrame(initial_messages)])

        # Run the pipeline
        runner = PipelineRunner()
        await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())
Key Components Explained
Window Class
- Manages conversation history
- Integrates with duohub client
- Retrieves relevant context for each user message
- Maintains conversation window size (latest 10 messages)
Context Aggregator
- Handles message flow between the Window and pipeline
- Ensures proper message ordering
- Manages user and assistant message handling
Pipeline Configuration
The pipeline is set up with the following flow:
- Input from Daily.co transport
- User context processing
- LLM processing
- Text-to-speech conversion
- Output to transport
- Assistant context processing
Memory ID Configuration
You can configure your duohub memory ID in either of two ways:
- Environment Variable:
export DUOHUB_MEMORY_ID=your-memory-id
- Direct Assignment:
context = Window(
    messages=initial_messages,
    memory_id="your-memory-id",
    api_key=os.getenv("DUOHUB_API_KEY")
)
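If you use the environment variable, read it at startup so nothing is hard-coded. A minimal sketch; when `DUOHUB_MEMORY_ID` is unset, `memory_id` is `None` and the `Window` class above simply skips duohub queries:

```python
import os

# DUOHUB_MEMORY_ID matches the variable exported above
context = Window(
    messages=initial_messages,
    memory_id=os.getenv("DUOHUB_MEMORY_ID"),
    api_key=os.getenv("DUOHUB_API_KEY")
)
```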
Best Practices
- **Memory Management**
  - Keep memory IDs organized by conversation topic or user
  - Regularly clean up unused memories
  - Monitor memory usage and implement retention policies
- **Context Retrieval**
  - Implement retry logic for duohub queries (see the sketch below)
  - Consider rate limiting for large conversation volumes
  - Cache frequently accessed context
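Here is one way retry logic might look: a minimal sketch with exponential backoff. `query_with_retry` is a hypothetical helper, not part of the duohub SDK, and the delay values are illustrative:

```python
import logging
import time

logger = logging.getLogger(__name__)

def query_with_retry(client, query, memory_id, retries=3, base_delay=0.5):
    """Retry a duohub query with exponential backoff; returns None if all attempts fail."""
    for attempt in range(retries):
        try:
            return client.query(query=query, memoryID=memory_id, assisted=True)
        except Exception as e:
            logger.warning(f"duohub query failed (attempt {attempt + 1}/{retries}): {e}")
            time.sleep(base_delay * (2 ** attempt))
    return None
```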
- **Error Handling**: wrap memory operations so a failed duohub query degrades gracefully instead of breaking the conversation turn:
def add_message(self, message: ChatCompletionMessageParam):
    try:
        self.messages.append(message)
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )
            # Process response...
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        # Implement appropriate fallback behavior
- **Performance Optimization**
  - Implement message pruning for long conversations
  - Use async/await for network operations (see the sketch after this list)
  - Consider batching for multiple context queries
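Pipecat runs on asyncio, and the duohub call in `add_message` is synchronous, so it blocks the event loop while it waits. One way to offload it, sketched here as a hypothetical async variant using `asyncio.to_thread` (Python 3.9+):

```python
import asyncio

class AsyncWindow(Window):
    """Hypothetical async variant; keeps the blocking duohub query off the event loop."""

    async def add_message_async(self, message):
        self.messages.append(message)
        if message['role'] == 'user' and self.memory_id:
            # Run the synchronous SDK call in a worker thread
            duohub_response = await asyncio.to_thread(
                self.duohub_client.query,
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True,
            )
            if duohub_response and isinstance(duohub_response, dict) and 'payload' in duohub_response:
                self.messages.append({
                    "role": "system",
                    "content": f"Context from graph: {duohub_response['payload']}"
                })
```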
Debugging Tips
- Enable debug logging:
import logging
logging.basicConfig(level=logging.DEBUG)
- Monitor duohub responses:
logger.debug(f"duohub response: {duohub_response}")
- Track message flow:
logger.debug(f"Message added: {message['role']} - {message['content'][:50]}...")
Common Issues and Solutions
- **Missing Context**
  - Verify the memory ID is correct
  - Check duohub API key permissions
  - Ensure queries are properly formatted
- **Performance Issues**
  - Implement caching for frequent queries (see the sketch below)
  - Optimize the message window size
  - Use connection pooling for API calls
- **Memory Usage**
  - Implement message cleanup
  - Monitor context size
  - Use streaming for large responses
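For the caching suggestions above, a small in-memory TTL cache is often enough to start. `TTLCache` is a hypothetical helper, and the sizes and TTL are illustrative; swap in `functools.lru_cache` or an external cache as your volume grows:

```python
import time

class TTLCache:
    """Tiny in-memory cache with per-entry expiry for duohub query results."""

    def __init__(self, ttl_seconds=300, max_entries=256):
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]
            return None
        return value

    def set(self, key, value):
        if len(self._store) >= self.max_entries:
            # Evict the oldest entry; fine for a sketch, use an LRU in production
            self._store.pop(next(iter(self._store)))
        self._store[key] = (value, time.monotonic() + self.ttl)

# Usage inside Window.add_message (sketch):
#   cached = cache.get(message['content'])
#   duohub_response = cached or self.duohub_client.query(...)
#   if not cached:
#       cache.set(message['content'], duohub_response)
```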