Integrate duohub memory with Pipecat, Daily.co's framework for building conversational voice AI agents.
# Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -

# Initialize a new Poetry project (if starting from scratch)
poetry init

# Add required dependencies.
# NOTE: the Pipecat framework is published on PyPI as "pipecat-ai";
# the package named "pipecat" is unrelated.
poetry add pipecat-ai duohub openai cartesia
from duohub import Duohub
from openai.types.chat import ChatCompletionMessageParam
from typing import List


class Window:
    """Conversation window that augments user turns with duohub graph memory.

    Maintains a running message history, always prepends a fixed system
    prompt, and — when a ``memory_id`` is configured — queries duohub on
    every user message, injecting the retrieved context as an extra system
    message so the LLM can use it on the current turn.
    """

    def __init__(
        self,
        messages: List[ChatCompletionMessageParam] | None = None,
        memory_id: str | None = None,
        api_key: str | None = None,
        system_prompt: str = "You are a helpful assistant."
    ):
        """Create a window.

        Args:
            messages: Optional seed history; defaults to an empty list.
            memory_id: duohub memory to query on user turns; when ``None``,
                no memory lookups are performed.
            api_key: duohub API key passed to the client.
            system_prompt: Content of the fixed leading system message.
        """
        self.api_key = api_key
        # Fixed system message; get_messages() always places it first.
        self.system_message = {
            "role": "system",
            "content": system_prompt
        }
        self.messages = messages or []
        self.duohub_client = Duohub(api_key=self.api_key)
        self.memory_id = memory_id

    def add_message(self, message: ChatCompletionMessageParam):
        """Append a message; on user turns, also fetch and append memory context."""
        self.messages.append(message)
        # Only user turns trigger a memory lookup, and only when configured.
        if message['role'] == 'user' and self.memory_id:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )
            # Inject retrieved graph context as a system message so the LLM
            # sees it alongside the user's question.
            if duohub_response and isinstance(duohub_response, dict) and 'payload' in duohub_response:
                context_message = {
                    "role": "system",
                    "content": f"Context from graph: {duohub_response['payload']}"
                }
                self.messages.append(context_message)

    def get_messages(self) -> List[ChatCompletionMessageParam]:
        """Return the system prompt followed by the most recent 10 messages.

        Slicing with ``[-10:]`` already returns the whole list when it holds
        fewer than 10 items, so no explicit length check is needed (the
        previous conditional was redundant).
        """
        return [self.system_message, *self.messages[-10:]]
async def main():
    """Wire a Daily room, duohub-backed context, and the Pipecat pipeline together.

    NOTE(review): assumes aiohttp, os, asyncio, configure, DailyTransport,
    DailyParams, Window, CartesiaTTSService, OpenAILLMService, Pipeline,
    PipelineTask, PipelineParams, PipelineRunner, and LLMMessagesFrame are
    imported elsewhere in the file — confirm against the full source.
    """
    async with aiohttp.ClientSession() as session:
        # Configure transport: obtain a Daily room URL and auth token.
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                audio_out_enabled=True,
                camera_out_enabled=True,
                vad_enabled=True,
                transcription_enabled=True
            ),
        )

        # Initialize conversation context with duohub memory.
        initial_messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot..."
            }
        ]

        context = Window(
            messages=initial_messages,
            memory_id='your-memory-id',  # Replace with your duohub memory ID
            api_key=os.getenv("DUOHUB_API_KEY")
        )

        # Initialize services: Cartesia for speech synthesis, OpenAI for the LLM.
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="your-voice-id"
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4"
        )

        # Create context aggregator so user/assistant turns flow through `context`.
        context_aggregator = llm.create_context_aggregator(context)

        # Build pipeline — order matters: audio in -> user aggregation -> LLM
        # -> TTS -> audio out -> assistant aggregation.
        pipeline = Pipeline([
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ])

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        # Handle new participants: start transcription and kick off the
        # conversation with the initial system prompt.
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([LLMMessagesFrame(initial_messages)])

        # Run the pipeline until the task completes.
        runner = PipelineRunner()
        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
# Make the duohub memory ID available to the application via the environment.
export DUOHUB_MEMORY_ID=your-memory-id
# Read the memory ID from the DUOHUB_MEMORY_ID environment variable instead
# of hard-coding the placeholder string — the variable is exported in the
# shell setup step but was never read here.
context = Window(
    messages=initial_messages,
    memory_id=os.getenv("DUOHUB_MEMORY_ID"),
    api_key=os.getenv("DUOHUB_API_KEY")
)
def add_message(self, message: ChatCompletionMessageParam):
    """Append a message; on user turns, query duohub memory with error handling.

    The ``try`` is kept minimal: only the network call to duohub can
    realistically fail, so the unconditional ``append`` and the role check
    stay outside it. A failed memory lookup is logged and the conversation
    continues without injected context (best-effort fallback).
    """
    self.messages.append(message)
    if message['role'] == 'user' and self.memory_id:
        try:
            duohub_response = self.duohub_client.query(
                query=message['content'],
                memoryID=self.memory_id,
                assisted=True
            )
            # Process response...
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            # Implement appropriate fallback behavior
# Enable verbose logging to trace duohub queries and pipeline events.
import logging

logging.basicConfig(level=logging.DEBUG)
# Lazy %-style logging args: the message is only formatted when DEBUG is
# enabled, instead of paying f-string formatting cost on every call.
logger.debug("duohub response: %s", duohub_response)
# Lazy %-style logging args defer formatting until DEBUG is enabled; the
# 50-character content preview from the original is preserved.
logger.debug("Message added: %s - %s...", message['role'], message['content'][:50])