# src/create_database.py
"""Parse data-update.txt into sections, attach keywords from the keyword
directory, and load each new section into the agent's semantic memory DB."""

import logging
import os
from typing import Dict, List

from src.memory import MemoryManager  # Corrected import path

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def parse_data_update(file_path: str) -> List[Dict[str, str]]:
    """Split the data-update file into sections, one per recognized header line."""
    if not os.path.exists(file_path):
        logging.error(f"File not found: {file_path}")
        return []

    with open(file_path, 'r', encoding='utf-8') as file_obj:
        content = file_obj.read()
    content = content.lower()  # Normalize to lowercase so header matching is case-insensitive

    headers = ("chronique #", "flash info fl-", "chronique-faq #")
    sections: List[Dict[str, str]] = []
    current_section = None
    current_content: List[str] = []

    for line in content.split('\n'):
        if line.strip().startswith(headers):
            # A new header closes the previous section, if any.
            if current_section:
                sections.append({
                    "concept": current_section,
                    "description": "\n".join(current_content),
                })
                logging.info(f"Parsed section: {current_section}")
            current_section = line.strip()
            current_content = []
        else:
            current_content.append(line)

    # Flush the final section.
    if current_section:
        sections.append({
            "concept": current_section,
            "description": "\n".join(current_content),
        })
        logging.info(f"Parsed section: {current_section}")

    return sections


def get_keywords(number: str, keyword_dir: str) -> List[str]:
    """Return the tag list for a section number, trying each known file naming scheme."""
    candidates = [
        os.path.join(keyword_dir, f"FL-{number}-KEYWORD.txt"),
        os.path.join(keyword_dir, f"INFO-{number}-KEYWORD.txt"),
        os.path.join(keyword_dir, f"CHRONIQUE{number}-KEYWORD.txt"),
    ]
    keyword_file = next((path for path in candidates if os.path.exists(path)), None)
    if keyword_file is None:
        logging.warning(f"No keyword file found for {number} in {keyword_dir}")
        return []

    with open(keyword_file, 'r', encoding='utf-8') as file_obj:
        content = file_obj.read()

    # Keyword files may carry a "KEYWORD = tag1, tag2" prefix; keep only the tag list.
    if 'KEYWORD = ' in content:
        content = content.split('KEYWORD = ')[1]
    tags = [tag.strip() for tag in content.split(', ') if tag.strip()]  # Drop empty tags
    logging.info(f"Keywords for {number}: {tags}")
    return tags


def load_and_process_dataset(data_update_path: str, keyword_dir: str, db_path: str) -> None:
    """Parse the data update and store each not-yet-seen section in semantic memory."""
    memory_manager = MemoryManager(db_path)
    sections = parse_data_update(data_update_path)

    for section in sections:
        concept = section['concept']
        description = section['description']

        # Extract the reference number from the header: "chronique #12 ..." and
        # "chronique-faq #3 ..." carry it after '#', while "flash info fl-34 ..."
        # carries it after "fl-" (headers were lowercased during parsing).
        if '#' in concept:
            number = concept.split('#')[1].split()[0]
        else:
            number = concept.split('fl-')[1].split()[0]
        tags = get_keywords(number, keyword_dir)

        # Skip sections that already exist in the database.
        if not memory_manager.section_exists(concept):
            memory_manager.add_semantic_memory(concept, description, tags=tags)
            logging.info(f"Added section: {concept}")
        else:
            logging.info(f"Section already exists: {concept}")


if __name__ == "__main__":
    data_update_path = "data-update.txt"
    keyword_dir = "keyword"  # Directory holding the *-KEYWORD.txt files
    db_path = "agent.db"
    load_and_process_dataset(data_update_path, keyword_dir, db_path)
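

# --- Hedged smoke test: not part of the original script. The sample file
# layouts below are an assumption inferred from the parsing logic above.
# Exercises parse_data_update() and get_keywords() against throwaway files,
# so the parsing can be checked without touching a real agent.db
# (MemoryManager is never instantiated here). Run manually, e.g.:
#   python -c "from src.create_database import _smoke_test; _smoke_test()"
def _smoke_test() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        data_path = os.path.join(tmp, "data-update.txt")
        kw_dir = os.path.join(tmp, "keyword")
        os.makedirs(kw_dir)

        # Two headers in styles the parser recognizes, each with a body line.
        with open(data_path, 'w', encoding='utf-8') as f:
            f.write("CHRONIQUE #12 Sample title\nFirst body line.\n"
                    "FLASH INFO FL-34\nSecond body line.\n")
        # Keyword file in the "KEYWORD = tag1, tag2" form handled by get_keywords().
        with open(os.path.join(kw_dir, "FL-34-KEYWORD.txt"), 'w', encoding='utf-8') as f:
            f.write("KEYWORD = alpha, beta")

        sections = parse_data_update(data_path)
        assert len(sections) == 2, sections
        assert sections[0]["concept"] == "chronique #12 sample title"
        assert get_keywords("34", kw_dir) == ["alpha", "beta"]
        logging.info("Smoke test passed.")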