import pandas as pd
from datetime import datetime

from appStore.prep_utils import create_chunks
from appStore.search import hybrid_search

path_to_data = "./docStore/"
def convert_to_date(val):
    """
    Normalize a raw end-date value to a "YYYY-MM-DD" string.

    Accepts epoch timestamps in milliseconds (numeric or numeric string) as well as
    strings already formatted as "YYYY-MM-DD"; anything else returns "Unknown".
    """
    try:
        # If val is a string, first check whether it represents a numeric value.
        if isinstance(val, str):
            val_str = val.strip()
            try:
                # Try converting the string to a float (i.e. it is an epoch in string form).
                num = float(val_str)
                return datetime.utcfromtimestamp(num / 1000).strftime("%Y-%m-%d")
            except ValueError:
                # Not a numeric string; assume it is already a "YYYY-MM-DD" date string
                # and validate it before returning.
                datetime.strptime(val_str, "%Y-%m-%d")
                return val_str
        elif isinstance(val, (int, float)):
            return datetime.utcfromtimestamp(val / 1000).strftime("%Y-%m-%d")
        else:
            return "Unknown"
    except Exception:
        return "Unknown"
def process_iati():
    """
    Read the IATI CSV files, merge them into a single projects dataframe,
    and split the project texts into chunks for embedding.
    """
    orgas_df = pd.read_csv(f"{path_to_data}iati_files/project_orgas.csv")
    region_df = pd.read_csv(f"{path_to_data}iati_files/project_region.csv")
    sector_df = pd.read_csv(f"{path_to_data}iati_files/project_sector.csv")
    status_df = pd.read_csv(f"{path_to_data}iati_files/project_status.csv")
    texts_df = pd.read_csv(f"{path_to_data}iati_files/project_texts.csv")

    projects_df = pd.merge(orgas_df, region_df, on='iati_id', how='inner')
    projects_df = pd.merge(projects_df, sector_df, on='iati_id', how='inner')
    projects_df = pd.merge(projects_df, status_df, on='iati_id', how='inner')
    projects_df = pd.merge(projects_df, texts_df, on='iati_id', how='inner')

    # Keep only BMZ projects and drop columns that are not needed downstream.
    projects_df = projects_df[projects_df.client.str.contains('bmz')].reset_index(drop=True)
    projects_df.drop(columns=['orga_abbreviation', 'client',
                              'orga_full_name', 'country',
                              'country_flag', 'crs_5_code', 'crs_3_code', 'country_code_list',
                              'sgd_pred_code', 'crs_5_name', 'crs_3_name', 'sgd_pred_str'], inplace=True)

    # Chunk the concatenated title and description for embedding.
    projects_df['text_size'] = projects_df.apply(lambda x: len((x['title_main'] + x['description_main']).split()), axis=1)
    projects_df['chunks'] = projects_df.apply(lambda x: create_chunks(x['title_main'] + x['description_main']), axis=1)
    projects_df = projects_df.explode(column=['chunks'], ignore_index=True)
    projects_df['source'] = 'IATI'
    projects_df.rename(columns={'iati_id': 'id', 'iati_orga_id': 'org'}, inplace=True)
    return projects_df
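# Minimal usage sketch (assumes the IATI CSVs above exist under ./docStore/iati_files/;
# the column selection here is illustrative, not a contract):
#
#   iati_df = process_iati()
#   print(iati_df[['id', 'org', 'chunks', 'source']].head())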
def process_giz_worldwide():
    """
    Read the new giz_worldwide file and prepare the data for embedding.

    Adjustments made:
    - Reads the file 'giz_worldwide_api_download_23_02_2025.json'
    - Renames 'name.en' to 'project_name'
    - Uses the 'merged_text' column for embedding the whole text (no chunking)
    - Creates an empty 'url' column (the new dataset does not provide URLs)
    - Renames 'country' to 'countries'
    - Renames 'duration.project.start' to 'start_year' and 'duration.project.end' to 'end_year'
    """
    # Read the new JSON file
    giz_df = pd.read_json(f'{path_to_data}giz_worldwide/giz_worldwide_api_download_23_02_2025.json')

    # Optionally sample random rows for quick embeddings (seed set for reproducibility)
    # giz_df = giz_df.sample(n=100, random_state=42)

    # Reset the index so that create_documents can iterate using integer indices
    giz_df = giz_df.reset_index(drop=True)

    # Rename columns per the new dataset's requirements
    giz_df = giz_df.rename(columns={
        'name.en': 'project_name',
        'country': 'countries',
        'duration.project.start': 'start_year',
        'duration.project.end': 'end_year'
    })
    giz_df['end_year'] = giz_df['end_year'].apply(convert_to_date)

    # Create an empty 'url' column, as the new dataset does not provide URLs
    giz_df['url'] = ''

    # Compute text_size based on merged_text and assign the full text to the 'chunks' column
    giz_df['text_size'] = giz_df['merged_text'].apply(lambda text: len(text.split()) if isinstance(text, str) else 0)

    # Use the full merged_text for embedding (no chunking).
    # If downstream code expects a list, wrap it instead:
    # giz_df['chunks'] = giz_df['merged_text'].apply(lambda text: [text] if isinstance(text, str) else [])
    giz_df['chunks'] = giz_df['merged_text']
    giz_df['source'] = 'GIZ_WORLDWIDE'
    return giz_df
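# Minimal usage sketch (assumes the JSON file above is present; 'merged_text',
# 'project_name' and the renamed date columns come from that dataset):
#
#   giz_df = process_giz_worldwide()
#   print(giz_df[['project_name', 'countries', 'start_year', 'end_year', 'text_size']].head())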
# Legacy version kept for reference: the previous GIZ worldwide pipeline that
# chunked 'project_name' + 'project_description' from data_giz_website.json.
# def process_giz_worldwide():
#     """
#     this will read the giz_worldwide files and create the chunks
#     """
#     giz_df = pd.read_json(f'{path_to_data}giz_worldwide/data_giz_website.json')
#     giz_df = giz_df.rename(columns={'content': 'project_description'})
#     # Sample random rows for quick embeddings (seed set for reproducibility)
#     giz_df = giz_df.sample(n=5, random_state=42)
#     giz_df['text_size'] = giz_df.apply(lambda x: len((x['project_name'] + x['project_description']).split()), axis=1)
#     giz_df['chunks'] = giz_df.apply(lambda x: create_chunks(x['project_name'] + x['project_description']), axis=1)
#     print("initial df length:", len(giz_df))
#     giz_df = giz_df.explode(column=['chunks'], ignore_index=True)
#     print("new df length:", len(giz_df))
#     print(giz_df.columns)
#     # giz_df.drop(columns=['filename', 'url', 'name', 'mail',
#     #                      'language', 'start_year', 'end_year', 'poli_trager'], inplace=True)
#     giz_df['source'] = 'GIZ_WORLDWIDE'
#     return giz_df
def remove_duplicates(results_list):
    """
    Return a new list of results with duplicates removed,
    based on the 'id' field in each result's metadata.
    """
    unique_results = []
    seen_ids = set()
    for r in results_list:
        # Safely get the id from the result's metadata
        result_id = r.payload['metadata'].get('id', None)
        if result_id not in seen_ids:
            seen_ids.add(result_id)
            unique_results.append(r)
    return unique_results
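# Usage sketch: results_list is expected to hold Qdrant-style scored points whose
# payload carries a 'metadata' dict (the shape returned by hybrid_search in this app);
# that payload layout is an assumption, and the variable names below are hypothetical.
#
#   deduped = remove_duplicates(semantic_hits + lexical_hits)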
def extract_year(date_str):
    """Extract the year from a "YYYY-MM-DD" date string; return "Unknown" on failure."""
    try:
        return str(datetime.strptime(date_str, "%Y-%m-%d").year)
    except Exception:
        return "Unknown"
def get_max_end_year(_client, collection_name):
    """
    Return the maximum 'end_year' across the entire collection,
    so the slider's max_value can be set dynamically.
    """
    # For safety, fetch a large pool of items
    all_res = hybrid_search(_client, "", collection_name, limit=2000)
    big_list = all_res[0] + all_res[1]
    years = []
    for r in big_list:
        metadata = r.payload.get('metadata', {})
        year_str = metadata.get('end_year', None)
        if year_str:
            try:
                years.append(float(year_str))
            except ValueError:
                pass
    if not years:
        # Fallback if no valid end years are found
        return 2030
    return int(max(years))
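# Usage sketch, assuming a Streamlit front end consumes this value for a year slider
# (the client and collection name below are hypothetical placeholders):
#
#   max_year = get_max_end_year(client, "giz_worldwide")
#   end_year_range = st.slider("Filter by end year", 2010, max_year, (2010, max_year))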