# src/create_database.py
import os
import logging
from typing import List, Dict

from src.memory import MemoryManager

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def parse_data_update(file_path: str) -> List[Dict[str, str]]:
    """Split the data-update file into sections, one per recognized heading line."""
    if not os.path.exists(file_path):
        logging.error(f"File not found: {file_path}")
        return []

    with open(file_path, 'r') as file_obj:
        content = file_obj.read()

    content = content.lower()  # Normalize to lowercase so heading matching is case-insensitive
    sections = []
    current_section = None
    current_content = []

    for line in content.split('\n'):
        stripped = line.strip()
        # A recognized heading starts a new section
        if stripped.startswith(("chronique #", "flash info fl-", "chronique-faq #")):
            if current_section:
                sections.append({
                    "concept": current_section,
                    "description": "\n".join(current_content),
                })
                logging.info(f"Parsed section: {current_section}")
            current_section = stripped
            current_content = []
        else:
            current_content.append(line)

    # Flush the last open section, if any
    if current_section:
        sections.append({
            "concept": current_section,
            "description": "\n".join(current_content),
        })
        logging.info(f"Parsed section: {current_section}")

    return sections
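
# Illustrative shape of data-update.txt, inferred from the heading prefixes the
# parser matches above (the real file may differ):
#
#   CHRONIQUE #12 Some title
#   Body lines of the chronicle...
#
#   FLASH INFO FL-042
#   Body lines of the flash info...
#
# Uppercase headings still match because the whole file is lowercased first.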

def get_keywords(number: str, keyword_dir: str) -> List[str]:
    """Return the tags for a section number, trying each keyword-file naming scheme."""
    candidates = [
        os.path.join(keyword_dir, f"FL-{number}-KEYWORD.txt"),
        os.path.join(keyword_dir, f"INFO-{number}-KEYWORD.txt"),
        os.path.join(keyword_dir, f"CHRONIQUE{number}-KEYWORD.txt"),
    ]
    keyword_file = next((path for path in candidates if os.path.exists(path)), None)
    if keyword_file is None:
        logging.warning(f"No keyword file found for section {number} in {keyword_dir}")
        return []

    with open(keyword_file, 'r') as file_obj:
        content = file_obj.read()

    # Keep only what follows the "KEYWORD = " marker, if present
    if 'KEYWORD = ' in content:
        content = content.split('KEYWORD = ')[1]

    tags = [tag.strip() for tag in content.split(',') if tag.strip()]  # Drop empty tags
    logging.info(f"Keywords for {number}: {tags}")
    return tags
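
# Illustrative keyword file content, inferred from the parsing above (the real
# files may differ):
#
#   KEYWORD = banking, savings, interest rates
#
# for which get_keywords would return ['banking', 'savings', 'interest rates'].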

def load_and_process_dataset(data_update_path: str, keyword_dir: str, db_path: str):
    """Parse the data-update file and store each new section in the memory database."""
    memory_manager = MemoryManager(db_path)
    sections = parse_data_update(data_update_path)

    for section in sections:
        concept = section['concept']
        description = section['description']

        # Extract the section number from the heading: it follows '#' in
        # "chronique #N ..." and "chronique-faq #N ..." headings, and 'fl-' in
        # "flash info fl-N ..." headings
        if '#' in concept:
            number = concept.split('#')[1].split()[0]
        else:
            number = concept.split('fl-', 1)[1].split()[0]

        tags = get_keywords(number, keyword_dir)

        # Only add sections that are not already in the database
        if not memory_manager.section_exists(concept):
            memory_manager.add_semantic_memory(concept, description, tags=tags)
            logging.info(f"Added section: {concept}")
        else:
            logging.info(f"Section already exists: {concept}")

if __name__ == "__main__":
    data_update_path = "data-update.txt"
    keyword_dir = "keyword"  # Directory containing the *-KEYWORD.txt files
    db_path = "agent.db"
    load_and_process_dataset(data_update_path, keyword_dir, db_path)
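
# For reference: a minimal sketch of the MemoryManager interface this script
# relies on (section_exists, add_semantic_memory). src/memory.py is not shown
# here, so the class below (its name, signatures, and SQLite schema) is an
# assumption for illustration, not the actual implementation.
import sqlite3


class MemoryManagerSketch:
    """Hypothetical stand-in for src.memory.MemoryManager."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS semantic_memory "
            "(concept TEXT PRIMARY KEY, description TEXT, tags TEXT)"
        )

    def section_exists(self, concept: str) -> bool:
        cur = self.conn.execute(
            "SELECT 1 FROM semantic_memory WHERE concept = ?", (concept,)
        )
        return cur.fetchone() is not None

    def add_semantic_memory(self, concept: str, description: str, tags=None) -> None:
        self.conn.execute(
            "INSERT OR IGNORE INTO semantic_memory (concept, description, tags) "
            "VALUES (?, ?, ?)",
            (concept, description, ", ".join(tags or [])),
        )
        self.conn.commit()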