import json
import os

import faiss
import numpy as np
import pandas as pd
import torch

from src.pytorch_modules.datasets.schema_string_dataset import SchemaStringDataset


class UtilsSearch:
    def __init__(self, config):
        self.config = config

    @staticmethod
    def dataframe_to_index(df):
        """Build a FAISS index over the normalized 'embeddings' column of df."""
        embeddings = np.stack(df['embeddings'].to_numpy())
        # L2-normalize so that L2 distance ranks results the same way as cosine similarity
        norm_embeddings = np.ascontiguousarray(embeddings / np.linalg.norm(embeddings, axis=1)[:, None])
        dimension = norm_embeddings.shape[1]
        index = faiss.IndexFlatL2(dimension)
        index.add(norm_embeddings)
        return index
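
    # Usage sketch (hypothetical data): assumes an 'embeddings' column of
    # equal-length float32 vectors, e.g. produced by a sentence-transformers model.
    #
    #   df = pd.DataFrame({"embeddings": list(np.random.rand(8, 384).astype(np.float32))})
    #   index = UtilsSearch.dataframe_to_index(df)
    #   index.ntotal  # -> 8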

    def retrieve(self, query, df, model, index, top_k=100, api=False):
        """
        Search the DataFrame for the given query and return a sorted DataFrame based on similarity.

        :param query: The search query string.
        :param df: The input DataFrame containing embeddings.
        :param model: The model used to encode the query.
        :param index: The FAISS index to search.
        :param top_k: The number of top results to return.
        :param api: If True, embed the query through an embeddings API client instead of a local model.
        :return: A new DataFrame sorted by similarity to the query, with a 'similarities' column.
        """
        # BGE-style models expect this retrieval instruction prepended to the query
        query = "Represent this sentence for searching relevant passages: " + query
        if not api:
            # Check if CUDA is available and set the device accordingly
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.to(device)
            # Compute and normalize the query embedding
            query_vector = model.encode(query, convert_to_tensor=True, device=device).cpu().numpy()
            query_vector /= np.linalg.norm(query_vector)
        else:
            res = model.embeddings(
                input=[query],
                model=self.config["sentence_transformer_name"],
                prompt=None,
            )
            query_vector = np.array([entry.embedding for entry in res.data][0]).astype(np.float32)
            # Normalize the query vector to match the normalized index
            query_vector /= np.linalg.norm(query_vector)
        # Perform the search
        distances, indices = index.search(np.array([query_vector]), top_k)
        # Retrieve the matching rows and attach the distances as a 'similarities' column
        retrieved_df = df.iloc[indices[0]]
        retrieved_df = retrieved_df.assign(similarities=distances[0])
        # Lower L2 distance between normalized vectors means higher similarity
        retrieved_df = retrieved_df.sort_values(by='similarities', ascending=True)
        # Reset the index so the result serializes cleanly
        retrieved_df = retrieved_df.reset_index(drop=True)
        return retrieved_df
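
    # Usage sketch (illustrative names; assumes a local sentence-transformers
    # model, i.e. the api=False path):
    #
    #   from sentence_transformers import SentenceTransformer
    #   model = SentenceTransformer("BAAI/bge-small-en-v1.5")
    #   utils = UtilsSearch(config)
    #   index = UtilsSearch.dataframe_to_index(df)
    #   top_100 = utils.retrieve("wireless headphones", df, model, index, top_k=100)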

    def rerank(self, query, df_top_100, cross_encoder, index):
        """Re-rank the retrieved rows with a cross-encoder and return the best matches."""
        config = self.config
        df_copy = df_top_100.copy().reset_index(drop=True)
        # Convert the top retrieved records to a list of dictionaries for processing
        records = df_copy.to_dict(orient='records')[:100]
        dataset_str = SchemaStringDataset(records, config)
        # Extract documents from the dataset, truncating each to its first 256 characters
        documents = [batch["inputs"][:256] for batch in dataset_str]
        # Rank documents against the query and keep the ten best matches
        ids = [item["corpus_id"] for item in cross_encoder.rank(query, documents, top_k=10)]
        # Use the ids to filter and reorder the original DataFrame
        df_sorted_by_relevance = df_copy.loc[ids]
        return df_sorted_by_relevance
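
    # Usage sketch (illustrative model name; assumes a sentence-transformers
    # CrossEncoder, whose rank() returns dicts with a 'corpus_id' key):
    #
    #   from sentence_transformers import CrossEncoder
    #   cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    #   top_10 = utils.rerank("wireless headphones", top_100, cross_encoder, index)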

    def search(self, query, df, model, cross_encoder, index):
        """Retrieve with the bi-encoder index, then rerank with the cross-encoder."""
        sorted_df = self.retrieve(query, df, model, index)
        return self.rerank(query, sorted_df, cross_encoder, index)
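
    # End-to-end sketch: retrieve() narrows the corpus with the fast bi-encoder
    # index, then rerank() re-orders the survivors with the slower cross-encoder.
    #
    #   results = utils.search("wireless headphones", df, model, cross_encoder, index)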

    @staticmethod
    def top_10_common_values(df, column_name):
        """
        Return the ten most common non-null values of the given column as a list.
        """
        # Drop nulls, count occurrences, and keep the ten most frequent values
        value_counts_list = df[column_name].dropna().value_counts().head(10).index.tolist()
        return value_counts_list
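
    # Example (hypothetical column):
    #
    #   df = pd.DataFrame({"brand": ["a", "b", "a", None, "c", "a"]})
    #   UtilsSearch.top_10_common_values(df, "brand")  # -> ['a', 'b', 'c'] (most frequent first)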

    @staticmethod
    def filter_dataframe(df, config, top_k_programmatic=100):
        """
        Filter a DataFrame based on scalar and discrete column configurations, with type handling and null filtering.

        Parameters:
        - df: pandas.DataFrame to filter.
        - config: Dictionary containing 'scalar_columns' and 'discrete_columns' configurations.
        - top_k_programmatic: Maximum number of rows to return.

        Returns:
        - Filtered pandas.DataFrame.
        """
        scalar_columns = config.get('scalar_columns', [])
        discrete_columns = config.get('discrete_columns', [])
        # Work on a copy so the caller's DataFrame is not mutated
        df = df.copy()
        # Combine all column names and drop rows where any of them are null
        all_columns = [col["column_name"] for col in scalar_columns] + [col["column_name"] for col in discrete_columns]
        df = df.dropna(subset=all_columns)
        # Filter on scalar columns: coerce to numeric and keep rows within [min_value, max_value]
        for col in scalar_columns:
            column_name = col["column_name"]
            min_value = float(col["min_value"])
            max_value = float(col["max_value"])
            df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
            df = df[df[column_name].between(min_value, max_value)]
        # Filter on discrete columns: keep rows whose value is among the defaults, if any are given
        for col in discrete_columns:
            column_name = col["column_name"]
            default_values = col["default_values"]
            if len(default_values) > 0:
                df = df[df[column_name].isin(default_values)]
        # 'similarities' holds L2 distances, so ascending order puts the best matches first
        if 'similarities' in df.columns:
            df = df.sort_values(by='similarities', ascending=True)
        # Return at most the top_k_programmatic best-matching rows
        return df.head(top_k_programmatic)
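
    # Example config (hypothetical keys and values, matching the schema read above):
    #
    #   config = {
    #       "scalar_columns": [{"column_name": "price", "min_value": "0", "max_value": "100"}],
    #       "discrete_columns": [{"column_name": "brand", "default_values": ["apple"]}],
    #   }
    #   filtered = UtilsSearch.filter_dataframe(df, config, top_k_programmatic=100)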

    @staticmethod
    def drop_columns(df, config):
        """Drop the columns listed under 'columns_to_drop' in the config."""
        columns_to_drop = config.get('columns_to_drop', [])
        df_dropped = df.drop(columns_to_drop, axis=1)
        return df_dropped
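
    # Example (hypothetical config):
    #
    #   slim_df = UtilsSearch.drop_columns(df, {"columns_to_drop": ["embeddings"]})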