Integrating duohub memory with Pipecat
Learn how to enhance your Pipecat conversational AI agents with duohub memory capabilities for more contextual and persistent conversations.
Examples: see the GitHub repository for a voice AI bot built with Pipecat and duohub memory.
Overview
Pipecat is a framework from Daily.co for building voice AI agents. By integrating duohub memory, your agents can maintain context across conversations and access relevant information from past interactions.
Prerequisites
# Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -
# Initialize a new Poetry project (if starting from scratch)
poetry init
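# Add required dependencies (Pipecat is published on PyPI as pipecat-ai;
# the extras pull in the Daily, OpenAI and Cartesia integrations used below)
poetry add "pipecat-ai[daily,openai,cartesia]" duohub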
Window Class Setup
First, create a Window class to manage conversation context and memory:
from typing import List

from duohub import Duohub
from openai.types.chat import ChatCompletionMessageParam


class Window:
    def __init__(
        self,
        messages: List[ChatCompletionMessageParam] | None = None,
        memory_id: str | None = None,
        api_key: str | None = None,
        system_prompt: str = "You are a helpful assistant.",
    ):
        self.api_key = api_key
        self.system_message = {
            "role": "system",
            "content": system_prompt,
        }
        self.messages = messages or []
        self.duohub_client = Duohub(api_key=self.api_key)
        self.memory_id = memory_id

    def add_message(self, message: ChatCompletionMessageParam):
        # Add message to conversation history
        self.messages.append(message)

        # Query duohub when user speaks
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True,
            )

            # Add retrieved context to conversation
            if duohub_response and isinstance(duohub_response, dict) and 'payload' in duohub_response:
                context_message = {
                    "role": "system",
                    "content": f"Context from graph: {duohub_response['payload']}",
                }
                self.messages.append(context_message)

    def get_messages(self) -> List[ChatCompletionMessageParam]:
        # Return the system prompt followed by the 10 most recent messages
        messages = [self.system_message]
        messages.extend(self.messages[-10:])
        return messages
Integrating with Pipecat Bot
Here’s how to set up your bot with the Window class:
import asyncio
import os

import aiohttp

# Import paths below match Pipecat releases from around the time this guide
# was written; they may differ in your installed version.
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

# Assumption: `configure` is the Daily room helper from the Pipecat examples (runner.py).
from runner import configure


async def main():
    async with aiohttp.ClientSession() as session:
        # Configure transport
        (room_url, token) = await configure(session)
        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                camera_out_enabled=True,
                vad_enabled=True,
                transcription_enabled=True,
            ),
        )

        # Initialize conversation context with duohub memory
        initial_messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot...",
            }
        ]
        context = Window(
            messages=initial_messages,
            memory_id='your-memory-id',  # Replace with your duohub memory ID
            api_key=os.getenv("DUOHUB_API_KEY"),
        )

        # Initialize services
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id",
        )
        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4",
        )

        # Create context aggregator
        context_aggregator = llm.create_context_aggregator(context)

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ])
        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        # Handle new participants
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([LLMMessagesFrame(initial_messages)])

        # Run the pipeline
        runner = PipelineRunner()
        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
Key Components Explained
Window Class
Manages conversation history
Integrates with duohub client
Retrieves relevant context for each user message
Maintains conversation window size (latest 10 messages)
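The windowing behaviour can be looked at in isolation. The following is a minimal sketch rather than the Window class itself: `build_window` is a hypothetical helper that mirrors what `get_messages` does, with the window size turned into a parameter.

```python
from typing import Dict, List

Message = Dict[str, str]


def build_window(system_message: Message, history: List[Message], size: int = 10) -> List[Message]:
    """Return the system message followed by the last `size` history messages."""
    # Slicing already handles histories shorter than `size`.
    recent = history[-size:] if size > 0 else []
    return [system_message] + recent


system = {"role": "system", "content": "You are a helpful assistant."}
history = [{"role": "user", "content": f"message {i}"} for i in range(25)]

window = build_window(system, history)
print(len(window))           # 11: one system message plus ten history messages
print(window[1]["content"])  # message 15
```

Note that in the Window class the context messages retrieved from duohub are appended to the same history, so they count toward the ten-message limit and the number of actual user and assistant turns in the window can be smaller.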
Context Aggregator
Handles message flow between the Window and pipeline
Ensures proper message ordering
Manages user and assistant message handling
Pipeline Configuration
The pipeline is set up with the following flow:
1. Input from Daily.co transport
2. User context processing
3. LLM processing
4. Text-to-speech conversion
5. Output to transport
6. Assistant context processing
Memory ID Configuration
You can configure your duohub memory ID in several ways:
Environment Variable:
export DUOHUB_MEMORY_ID=your-memory-id
Direct Assignment:
context = Window(
    messages=initial_messages,
    memory_id="your-memory-id",
    api_key=os.getenv("DUOHUB_API_KEY"),
)
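The two options can be combined. As a minimal sketch, the hypothetical helper `resolve_memory_id` below prefers an explicitly passed ID and otherwise reads the `DUOHUB_MEMORY_ID` environment variable; the helper name is an assumption for illustration and is not part of duohub or Pipecat.

```python
import os
from typing import Optional


def resolve_memory_id(explicit: Optional[str] = None) -> Optional[str]:
    """Prefer an explicitly passed ID, then fall back to DUOHUB_MEMORY_ID."""
    if explicit:
        return explicit
    # Yields None when the variable is unset or empty.
    return os.environ.get("DUOHUB_MEMORY_ID") or None


memory_id = resolve_memory_id()
```

The result can then be passed as `memory_id=resolve_memory_id()` when constructing the Window. If it is `None`, the `add_message` method shown earlier simply skips the duohub query.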
Best Practices
Memory Management
Keep memory IDs organized by conversation topic or user
Regularly clean up unused memories
Monitor memory usage and implement retention policies
Context Retrieval
Implement retry logic for duohub queries
Consider rate limiting for large conversation volumes
Cache frequently accessed context
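Retry logic can be sketched independently of the duohub client. The example below is a minimal illustration under stated assumptions: `call_with_retry` and `flaky_query` are hypothetical names, and the retry counts and delays are placeholders rather than recommended values.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call fn, retrying on any exception with exponential backoff.

    Returns None if every attempt fails, so the caller can carry on without context.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                return None
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * (2 ** attempt))
    return None


# Stand-in for a query that fails twice before succeeding.
calls = {"count": 0}


def flaky_query() -> dict:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"payload": "example context"}


result = call_with_retry(flaky_query, sleep=lambda _: None)
print(result, calls["count"])
```

Inside `add_message`, the existing call could be wrapped as `call_with_retry(lambda: self.duohub_client.query(query=..., memoryID=..., assisted=True))`. For a voice agent, keeping the number of attempts and the delays small is probably the safer choice, since every retry adds latency before the reply.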
Error Handling
import logging

logger = logging.getLogger(__name__)


# Method of the Window class
def add_message(self, message: ChatCompletionMessageParam):
    try:
        self.messages.append(message)
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True,
            )
            # Process response...
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        # Implement appropriate fallback behavior
Performance Optimization
Implement message pruning for long conversations
Use async/await for network operations
Consider batching for multiple context queries
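The `query` call in the Window class is synchronous, so calling it directly inside an async pipeline blocks the event loop while the request is in flight. One possible approach, sketched below, is to move the blocking call to a worker thread with `asyncio.to_thread` (Python 3.9+) and bound it with a timeout. `blocking_query` is a hypothetical stand-in for the real client call, and the timeout value is an assumption.

```python
import asyncio
import time
from typing import Optional


def blocking_query(query: str) -> dict:
    """Stand-in for a synchronous client call such as the duohub query."""
    time.sleep(0.05)  # simulate network latency
    return {"payload": f"context for: {query}"}


async def query_async(query: str, timeout: float = 2.0) -> Optional[dict]:
    """Run the blocking call in a worker thread; return None after `timeout` seconds."""
    try:
        return await asyncio.wait_for(asyncio.to_thread(blocking_query, query), timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> list:
    # Both queries run concurrently without blocking the event loop.
    return await asyncio.gather(query_async("first"), query_async("second"))


results = asyncio.run(demo())
print(results)
```

On a timeout the worker thread still runs to completion in the background; only the awaiting coroutine gives up and proceeds without context.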
Debugging Tips
Enable debug logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Monitor duohub responses:
logger.debug(f"duohub response: {duohub_response}")
Track message flow:
logger.debug(f"Message added: {message['role']} - {message['content'][:50]}...")
Common Issues and Solutions
Missing Context
Verify memory ID is correct
Check duohub API key permissions
Ensure queries are properly formatted
Performance Issues
Implement caching for frequent queries
Optimize message window size
Use connection pooling for API calls
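Caching for frequent queries can be as simple as an in-memory dictionary with an expiry time. The following is a minimal sketch, assuming that identical queries against the same memory may reuse a recent response; `TTLCache` and `fake_fetch` are hypothetical names, and the 60-second lifetime is a placeholder.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Tiny in-memory cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._store: Dict[Tuple[str, str], Tuple[float, Any]] = {}

    def get_or_fetch(self, memory_id: str, query: str, fetch: Callable[[], Any]) -> Any:
        # Normalize the query so trivial differences still hit the cache.
        key = (memory_id, query.strip().lower())
        now = self.clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]
        value = fetch()
        self._store[key] = (now, value)
        return value


fetches = []


def fake_fetch() -> dict:
    fetches.append(1)
    return {"payload": "example context"}


cache = TTLCache(ttl=60.0)
first = cache.get_or_fetch("memory-1", "What is duohub?", fake_fetch)
second = cache.get_or_fetch("memory-1", "what is duohub? ", fake_fetch)
print(len(fetches))  # 1: the second lookup is served from the cache
```

The cache is unbounded, which is acceptable for a short-lived session but would need an eviction policy in a long-running service.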
Memory Usage
Implement message cleanup
Monitor context size
Use streaming for large responses
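Context size can be kept in check by pruning on total content length rather than message count, since a single retrieved context message may be much longer than a user turn. The sketch below is one possible approach; `prune_to_budget` is a hypothetical helper, and character count is only a rough proxy for token usage.

```python
from typing import Dict, List

Message = Dict[str, str]


def prune_to_budget(messages: List[Message], max_chars: int) -> List[Message]:
    """Keep the most recent messages whose combined content fits in max_chars."""
    kept: List[Message] = []
    used = 0
    # Walk backwards so the newest messages are kept first.
    for message in reversed(messages):
        size = len(message.get("content", ""))
        if used + size > max_chars:
            break
        kept.append(message)
        used += size
    kept.reverse()
    return kept


messages = [
    {"role": "user", "content": "a" * 40},
    {"role": "system", "content": "Context from graph: " + "b" * 80},
    {"role": "user", "content": "c" * 30},
]
pruned = prune_to_budget(messages, max_chars=140)
print([m["role"] for m in pruned])  # ['system', 'user']
```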