"""Research Agent Behaviour for Olas Framework"""
import asyncio
import json
import os
from typing import Any, Dict, List

import gradio as gr
import PyPDF2
import chromadb
import httpx
from langchain_text_splitters import RecursiveCharacterTextSplitter
import hashlib
from pathlib import Path
from openai import OpenAI
from loguru import logger
from fastapi import FastAPI
import uvicorn
import time

# Olas framework imports (these would be the actual Olas imports)
# from aea.behaviours.base import BaseBehaviour
# from aea.skills.base import Skill


class ResearchAgentBehaviour:
    """Main behaviour for the Research Agent.

    Owns the FastAPI app (health check endpoint), the Gradio UI (PDF upload,
    semantic search, RAG chat) and all document-processing logic backed by a
    persistent ChromaDB collection under ``./chroma_db``.
    """

    def __init__(self):
        self.logger = logger
        # Rotate the on-disk log so it never grows past 50 MB.
        self.logger.add(
            "log.txt",
            format="{time} {level} Research Agent {message}",
            level="INFO",
            rotation="50 MB",
        )
        self.app = FastAPI()
        self.demo = None  # populated by setup_gradio_interface()
        self.setup_fastapi_routes()
        self.setup_gradio_interface()

    def setup_fastapi_routes(self):
        """Setup FastAPI health check and other routes."""

        @self.app.get("/healthcheck")
        async def health():
            # Reachability of the Anura models endpoint doubles as the
            # liveness signal for the "transitioning fast" flag.
            models = await self.call_anura_api_for_models()
            seconds_since_last_transition = 30

            # Decide if the agent is "transitioning fast" or stuck
            is_transitioning_fast = (
                models is not None
                and len(models) > 0
                and len(models.get('data', [])) > 0
            )

            health_check_data = {
                "seconds_since_last_transition": f"{seconds_since_last_transition:.2f}",
                "period": 30,
                "reset_pause_duration": 120,
                "rounds": ["pdf_processing_round", "search_round", "rag_chat_round"],
                "is_transitioning_fast": is_transitioning_fast,
                "agent_health": {
                    "is_making_on_chain_transactions": False,
                    "is_staking_kpi_met": False,
                    "has_required_funds": True,
                    "staking_status": "healthy",
                },
                "rounds_info": {
                    "pdf_processing_round": {
                        "name": "PDF Processing Round",
                        "description": "Processing and indexing PDF documents",
                        "transitions": {
                            "documents_processed": "search_round",
                        },
                    },
                    "search_round": {
                        "name": "Document Search Round",
                        "description": "Performing semantic search on indexed documents",
                        "transitions": {
                            "search_completed": "rag_chat_round",
                        },
                    },
                    "rag_chat_round": {
                        "name": "RAG Chat Round",
                        "description": "Interactive AI-powered research chat",
                        "transitions": {
                            "chat_session_end": "pdf_processing_round",
                        },
                    },
                },
                "env_var_status": {
                    "needs_update": False,
                    "env_vars": {
                        "ANURA_API_KEY": os.getenv("CONNECTION_CONFIGS_CONFIG_ANURA_API_KEY", "not_set"),
                        "SERVER_HOST": os.getenv("CONNECTION_CONFIGS_CONFIG_SERVER_HOST", "0.0.0.0"),
                        "SERVER_PORT": os.getenv("CONNECTION_CONFIGS_CONFIG_SERVER_PORT", "8716"),
                    },
                },
            }
            return health_check_data

    def setup_gradio_interface(self):
        """Setup the Gradio interface for the research assistant.

        Builds three tabs (PDF Upload, Document Search, RAG Chat) and wires
        their widgets to the processing methods on this instance.
        """
        with gr.Blocks(title="AI-Enhanced Research Assistant") as self.demo:
            gr.Markdown("# šŸ“š AI-Enhanced Research Assistant")
            gr.Markdown("Upload PDF files and search through them with AI-powered analysis using Anura API integration")

            with gr.Tabs():
                with gr.TabItem("PDF Upload"):
                    with gr.Row():
                        with gr.Column():
                            pdf_input = gr.File(
                                label="Upload PDF Files",
                                file_types=[".pdf"],
                                file_count="multiple",
                                type="filepath"
                            )
                            upload_btn = gr.Button("Process PDFs", variant="primary")
                        with gr.Column():
                            pdf_output = gr.Textbox(
                                label="Upload Status",
                                lines=10,
                                interactive=False
                            )
                    # Both the explicit button and the drop event trigger processing.
                    upload_btn.click(fn=self.process_pdf, inputs=pdf_input, outputs=pdf_output)
                    pdf_input.upload(fn=self.process_pdf, inputs=pdf_input, outputs=pdf_output)

                with gr.TabItem("Document Search"):
                    with gr.Row():
                        with gr.Column():
                            search_input = gr.Textbox(
                                label="Search Query",
                                placeholder="Enter your search query here...",
                                lines=2
                            )
                            num_results = gr.Slider(
                                minimum=1,
                                maximum=10,
                                value=5,
                                step=1,
                                label="Number of Results"
                            )
                            anura_api_key = gr.Textbox(
                                label="Anura API Key (Optional)",
                                placeholder="Enter your Anura API key for AI-enhanced analysis...",
                                lines=1,
                                type="password"
                            )
                            search_btn = gr.Button("Search Documents", variant="primary")
                        with gr.Column():
                            search_output = gr.Textbox(
                                label="Search Results",
                                lines=20,
                                interactive=False
                            )
                    search_btn.click(
                        fn=self.search_documents,
                        inputs=[search_input, num_results, anura_api_key],
                        outputs=search_output
                    )
                    search_input.submit(
                        fn=self.search_documents,
                        inputs=[search_input, num_results, anura_api_key],
                        outputs=search_output
                    )

                with gr.TabItem("RAG Chat"):
                    gr.Markdown("### šŸ¤– AI Research Chat with Document Analysis")
                    gr.Markdown("Have a conversation with AI about your documents. Chat history is maintained for context.")
                    with gr.Row():
                        with gr.Column(scale=1):
                            rag_num_results = gr.Slider(
                                minimum=1,
                                maximum=10,
                                value=5,
                                step=1,
                                label="Documents per Query"
                            )
                            rag_anura_api_key = gr.Textbox(
                                label="Anura API Key",
                                placeholder="Enter your Anura API key for chat functionality...",
                                lines=1,
                                type="password"
                            )
                            clear_btn = gr.Button("Clear Chat History", variant="secondary")
                        with gr.Column(scale=3):
                            chatbot = gr.Chatbot(
                                label="AI Research Assistant Chat",
                                height=400,
                                show_label=True
                            )
                            with gr.Row():
                                msg = gr.Textbox(
                                    label="",
                                    placeholder="Ask questions about your documents...",
                                    lines=1,
                                    scale=4,
                                    container=False
                                )
                                submit_btn = gr.Button("Send", variant="primary", scale=1)

                    chat_history = gr.State([])

                    def user_input(message, history):
                        # Stage the user's message with an empty assistant slot;
                        # bot_response fills it in on the chained .then() call.
                        if message.strip():
                            return "", history + [[message, ""]]
                        return message, history

                    def bot_response(history, num_results, api_key):
                        # Only respond when the last turn is still pending
                        # (assistant slot empty) — prevents double answers.
                        if history and history[-1][1] == "":
                            user_message = history[-1][0]
                            history_without_pending = history[:-1]
                            updated_history, _ = self.advanced_rag_search_chat(
                                user_message, history_without_pending, num_results, api_key
                            )
                            return updated_history
                        return history

                    msg.submit(user_input, [msg, chatbot], [msg, chatbot]).then(
                        bot_response, [chatbot, rag_num_results, rag_anura_api_key], [chatbot]
                    )
                    submit_btn.click(user_input, [msg, chatbot], [msg, chatbot]).then(
                        bot_response, [chatbot, rag_num_results, rag_anura_api_key], [chatbot]
                    )
                    clear_btn.click(self.clear_chat_history, outputs=[chatbot, chat_history])

    def process_pdf(self, pdf_files):
        """Process uploaded PDF files, extract text, split into paragraphs, and store in ChromaDB.

        :param pdf_files: list of file paths from the Gradio File component (or None).
        :return: human-readable status report string.
        """
        if pdf_files is None or len(pdf_files) == 0:
            return "No files uploaded. Please select PDF files."

        try:
            self.logger.info(f"Processing {len(pdf_files)} PDF files")
            client = chromadb.PersistentClient(path="./chroma_db")
            # get_or_create avoids the previous bare try/except around
            # get_collection/create_collection.
            collection = client.get_or_create_collection("pdf_documents")

            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                separators=["\n\n", "\n", ". ", "! ", "? ", " "]
            )

            results = []
            total_size = 0
            valid_files = 0
            total_paragraphs = 0

            for i, pdf_file_path in enumerate(pdf_files, 1):
                file_name = os.path.basename(pdf_file_path)
                file_size = os.path.getsize(pdf_file_path)

                if not file_name.lower().endswith('.pdf'):
                    results.append(f"āŒ File {i}: {file_name} - Error: Not a PDF file")
                    continue

                try:
                    with open(pdf_file_path, 'rb') as pdf_file:
                        pdf_reader = PyPDF2.PdfReader(pdf_file)
                        text_content = ""
                        for page_num in range(len(pdf_reader.pages)):
                            page = pdf_reader.pages[page_num]
                            text_content += page.extract_text() + "\n"

                    if not text_content.strip():
                        results.append(f"āŒ File {i}: {file_name} - Error: No extractable text found")
                        continue

                    paragraphs = text_splitter.split_text(text_content)
                    if not paragraphs:
                        results.append(f"āŒ File {i}: {file_name} - Error: No paragraphs extracted")
                        continue

                    # NOTE(review): IDs are derived from the file NAME only, so
                    # re-processing the same file (or two files sharing a name)
                    # produces duplicate IDs — consider hashing content or using
                    # upsert. Kept as-is to preserve behavior; verify.
                    file_hash = hashlib.md5(file_name.encode()).hexdigest()[:8]
                    paragraph_ids = [f"{file_hash}_para_{j}" for j in range(len(paragraphs))]
                    metadatas = [
                        {
                            "source_file": file_name,
                            "file_size": file_size,
                            "paragraph_index": j,
                            "total_paragraphs": len(paragraphs),
                            "file_path": pdf_file_path
                        }
                        for j in range(len(paragraphs))
                    ]
                    collection.add(
                        documents=paragraphs,
                        ids=paragraph_ids,
                        metadatas=metadatas
                    )

                    results.append(f"āœ… File {i}: {file_name} ({file_size:,} bytes) - {len(paragraphs)} paragraphs stored in ChromaDB")
                    total_size += file_size
                    valid_files += 1
                    total_paragraphs += len(paragraphs)

                except Exception as e:
                    # Per-file failures are reported inline and must not abort
                    # the remaining uploads.
                    results.append(f"āŒ File {i}: {file_name} - Error: {str(e)}")
                    continue

            summary = f"\nšŸ“Š Summary:\n"
            summary += f"• Total files uploaded: {len(pdf_files)}\n"
            summary += f"• Valid PDF files processed: {valid_files}\n"
            summary += f"• Total size: {total_size:,} bytes\n"
            summary += f"• Total paragraphs stored in ChromaDB: {total_paragraphs}\n"
            summary += f"• ChromaDB collection: pdf_documents\n"
            summary += f"• Status: Ready for research!\n"

            return "\n".join(results) + summary

        except Exception as e:
            self.logger.error(f"Error processing files: {str(e)}")
            return f"Error processing files: {str(e)}"

    def search_documents(self, query, num_results=5, anura_api_key=""):
        """Search the ChromaDB collection for relevant paragraphs and enhance with LLM analysis.

        :param query: free-text search query.
        :param num_results: number of hits to return (capped at 10).
        :param anura_api_key: optional key for the LLM enhancement step.
        :return: formatted results text, optionally with an AI analysis section.
        """
        if not query.strip():
            return "Please enter a search query."

        try:
            self.logger.info(f"Searching documents for query: '{query}' with num_results={num_results}")
            client = chromadb.PersistentClient(path="./chroma_db")
            try:
                collection = client.get_collection("pdf_documents")
            except Exception:
                # Collection absent => nothing has been indexed yet.
                return "No documents found. Please upload and process some PDF files first."

            results = collection.query(
                query_texts=[query],
                n_results=min(num_results, 10)
            )

            if not results['documents'] or not results['documents'][0]:
                return f"No relevant documents found for query: '{query}'"

            search_results = []
            search_results.append(f"šŸ” Search Results for: '{query}'\n")

            documents = results['documents'][0]
            metadatas = results['metadatas'][0] if results['metadatas'] else []
            distances = results['distances'][0] if results['distances'] else []

            for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances), 1):
                source_file = metadata.get('source_file', 'Unknown') if metadata else 'Unknown'
                para_index = metadata.get('paragraph_index', 'N/A') if metadata else 'N/A'
                # Map a distance in [0, 2] onto a 0-100% similarity score,
                # clamped at zero for larger distances.
                similarity_score = max(0, (2 - distance) / 2 * 100)
                cleaned_doc = ' '.join(doc.split())

                search_results.append(f"šŸ“„ Result {i} (Similarity: {similarity_score:.1f}%):")
                search_results.append(f"   Source: {source_file} (Paragraph {para_index})")
                search_results.append(f"   Content: {cleaned_doc[:500]}{'...' if len(cleaned_doc) > 500 else ''}")
                search_results.append("")

            search_results_text = "\n".join(search_results)
            enhanced_results = self.enhance_search_with_llm(query, search_results_text, anura_api_key)
            return enhanced_results

        except Exception as e:
            self.logger.error(f"Error searching documents: {str(e)}")
            return f"Error searching documents: {str(e)}"

    def call_anura_api(self, prompt, anura_api_key):
        """Call Anura API to enhance search results with LLM analysis.

        Falls back to the CONNECTION_CONFIGS_CONFIG_ANURA_API_KEY environment
        variable when no key is supplied; returns None when no key is
        available, and an "Error calling Anura API: ..." string on failure.
        """
        if not anura_api_key or not anura_api_key.strip():
            # Try to get from environment variable
            anura_api_key = os.getenv("CONNECTION_CONFIGS_CONFIG_ANURA_API_KEY")
            if not anura_api_key or not anura_api_key.strip():
                return None

        try:
            # Anura exposes an OpenAI-compatible chat completions endpoint.
            client = OpenAI(
                base_url=os.getenv("CONNECTION_CONFIGS_CONFIG_ANURA_API_BASE_URL", "https://anura-testnet.lilypad.tech/api/v1"),
                api_key=anura_api_key
            )
            completion = client.chat.completions.create(
                model=os.getenv("CONNECTION_CONFIGS_CONFIG_ANURA_MODEL", "llama3.1:8b"),
                messages=[
                    {"role": "system", "content": "You are a helpful AI research assistant. Analyze the search results and provide insights, summaries, or answer questions based on the context provided."},
                    {"role": "user", "content": prompt}
                ]
            )
            return completion.choices[0].message.content
        except Exception as e:
            self.logger.error(f"Error calling Anura API with exception: {str(e)}")
            return f"Error calling Anura API: {str(e)}"

    async def call_anura_api_for_models(self):
        """Call Anura API to retrieve the list of models.

        :return: parsed JSON payload, or None on any error.
        """
        try:
            # async with ensures the HTTP client (and its connection pool)
            # is closed — the previous version leaked the client.
            async with httpx.AsyncClient() as client:
                base_url = os.getenv("CONNECTION_CONFIGS_CONFIG_ANURA_API_BASE_URL", "https://anura-testnet.lilypad.tech/api/v1")
                response = await client.get(f"{base_url}/models")
                response.raise_for_status()
                return response.json()
        except Exception as e:
            self.logger.error(f"Error calling Anura API for models with exception: {str(e)}")
            return None

    def enhance_search_with_llm(self, query, search_results_text, anura_api_key):
        """Enhance search results using Anura API LLM.

        Returns the raw search results unchanged when no API key is supplied.
        """
        if not anura_api_key or not anura_api_key.strip():
            return search_results_text

        enhancement_prompt = f"""
Based on the user's query: "{query}"

And the following search results from documents:
{search_results_text}

Please provide:
1. A concise summary of the key findings
2. Direct answers to the user's question if possible
3. Any insights or connections between the different search results
4. Suggestions for further research if needed

Format your response clearly with headers and bullet points where appropriate.
"""

        llm_enhancement = self.call_anura_api(enhancement_prompt, anura_api_key)
        if llm_enhancement and not llm_enhancement.startswith("Error"):
            return f"{search_results_text}\n\nšŸ¤– **AI Analysis & Summary:**\n{llm_enhancement}"
        else:
            return f"{search_results_text}\n\nšŸ¤– **AI Analysis:** {llm_enhancement or 'Unable to generate analysis'}"

    def advanced_rag_search_chat(self, message, history, num_results=5, anura_api_key=""):
        """Advanced RAG search with chat interface and history tracking.

        :param message: current user message.
        :param history: list of [user, assistant] turn pairs (mutated in place).
        :param num_results: documents to retrieve per query (fetches 2x, capped at 20).
        :param anura_api_key: Anura API key (required; env fallback applied).
        :return: (history, history) — duplicated for the two Gradio outputs.
        """
        if not message.strip():
            return history, history

        if not anura_api_key or not anura_api_key.strip():
            anura_api_key = os.getenv("CONNECTION_CONFIGS_CONFIG_ANURA_API_KEY")
            if not anura_api_key or not anura_api_key.strip():
                error_response = "Advanced RAG search requires an Anura API key. Please provide your API key."
                history.append([message, error_response])
                return history, history

        try:
            client = chromadb.PersistentClient(path="./chroma_db")
            try:
                collection = client.get_collection("pdf_documents")
            except Exception:
                error_response = "No documents found. Please upload and process some PDF files first."
                history.append([message, error_response])
                self.logger.error(error_response)
                return history, history

            # Over-fetch (2x) to give the LLM a richer context window.
            results = collection.query(
                query_texts=[message],
                n_results=min(num_results * 2, 20)
            )

            if not results['documents'] or not results['documents'][0]:
                error_response = f"No relevant documents found for query: '{message}'"
                history.append([message, error_response])
                self.logger.error(error_response)
                return history, history

            documents = results['documents'][0]
            metadatas = results['metadatas'][0] if results['metadatas'] else []
            distances = results['distances'][0] if results['distances'] else []

            context_chunks = []
            for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)):
                source_file = metadata.get('source_file', 'Unknown') if metadata else 'Unknown'
                para_index = metadata.get('paragraph_index', 'N/A') if metadata else 'N/A'
                similarity_score = max(0, (2 - distance) / 2 * 100)
                context_chunks.append(f"[Source: {source_file}, Paragraph {para_index}, Similarity: {similarity_score:.1f}%]\n{doc}")

            comprehensive_context = "\n\n---\n\n".join(context_chunks)

            # Include only the last 3 turns to bound the prompt size.
            conversation_context = ""
            if history:
                conversation_context = "\n\nPrevious conversation:\n"
                for user_msg, assistant_msg in history[-3:]:
                    conversation_context += f"User: {user_msg}\nAssistant: {assistant_msg}\n\n"

            rag_prompt = f"""
You are an expert research assistant analyzing documents to answer user queries in a conversational manner.
{conversation_context}
Current User Query: "{message}"

Document Context:
{comprehensive_context}

Please provide a comprehensive analysis that includes:

1. **Direct Answer**: If the documents contain a direct answer to the user's question, provide it clearly.

2. **Key Findings**: Summarize the most relevant information from the documents related to the query.

3. **Supporting Evidence**: Quote specific passages from the documents that support your findings (include source references).

4. **Analysis & Insights**: Provide deeper analysis, connections between different sources, and implications.

5. **Context Awareness**: If this relates to previous questions in our conversation, acknowledge the connection and build upon earlier discussion.

6. **Follow-up Questions**: Suggest relevant follow-up questions the user might want to explore.

Format your response conversationally but with clear structure. Always cite your sources when making claims.
"""

            llm_analysis = self.call_anura_api(rag_prompt, anura_api_key)
            if llm_analysis and not llm_analysis.startswith("Error"):
                response = f"{llm_analysis}\n\n---\n\nšŸ“Š **Search Statistics:**\n• Documents analyzed: {len(documents)}\n• Sources: {len(set(m.get('source_file', 'Unknown') for m in metadatas))}\n• Analysis powered by Anura API"
            else:
                response = f"Error in advanced analysis: {llm_analysis or 'Unable to generate comprehensive analysis'}"

            history.append([message, response])
            return history, history

        except Exception as e:
            error_response = f"Error in advanced RAG search: {str(e)}"
            history.append([message, error_response])
            self.logger.error(error_response)
            return history, history

    def clear_chat_history(self):
        """Clear the chat history (resets both the chatbot widget and state)."""
        return [], []

    def start_server(self):
        """Start the research agent server."""
        # Mount the gradio app to the FastAPI app.
        # NOTE(review): root_path is meant for reverse-proxy path prefixes;
        # passing "/healthcheck" here looks unintentional — confirm against
        # the deployment setup before changing.
        self.app = gr.mount_gradio_app(self.app, self.demo, path="/", root_path="/healthcheck")
        host = os.getenv("CONNECTION_CONFIGS_CONFIG_SERVER_HOST", "0.0.0.0")
        port = int(os.getenv("CONNECTION_CONFIGS_CONFIG_SERVER_PORT", "8716"))
        self.logger.info(f"Starting Research Agent server on {host}:{port}")
        uvicorn.run(self.app, host=host, port=port, reload=False)


# Main execution
if __name__ == "__main__":
    agent = ResearchAgentBehaviour()
    agent.start_server()