# search_demo/src/utils_search.py
import json
import os

import faiss
import numpy as np
import pandas as pd
import torch

from src.pytorch_modules.datasets.schema_string_dataset import SchemaStringDataset
class UtilsSearch:
    def __init__(self, config):
        self.config = config
    @staticmethod
    def dataframe_to_index(df):
        # Stack the per-row embeddings into a single (n, d) matrix and
        # L2-normalize each row so that L2 distance ranks like cosine similarity
        embeddings = np.stack(df['embeddings'].to_numpy())
        norm_embeddings = np.ascontiguousarray(
            (embeddings / np.linalg.norm(embeddings, axis=1)[:, None]).astype(np.float32)  # FAISS expects float32
        )
        # Create a FAISS index over the normalized embeddings
        dimension = norm_embeddings.shape[1]
        index = faiss.IndexFlatL2(dimension)
        index.add(norm_embeddings)
        return index
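    # Illustrative usage sketch: given a DataFrame whose 'embeddings' column
    # holds equal-length float vectors (e.g. produced by a sentence-transformer),
    # the index is built once and reused for every query. The dimensions and
    # row count below are made up for the example:
    #
    #   df = pd.DataFrame({"embeddings": [np.random.rand(1024).astype("float32")
    #                                     for _ in range(1000)]})
    #   index = UtilsSearch.dataframe_to_index(df)   # index.ntotal == 1000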
    def retrieve(self, query, df, model, index, top_k=100, api=False):
        """
        Search the DataFrame for the given query and return a sorted DataFrame based on similarity.

        :param query: The search query string.
        :param df: The input DataFrame containing embeddings.
        :param model: The model to encode the query and compute embeddings.
        :param index: The search index for querying.
        :param top_k: The number of top results to return.
        :param api: If True, use the remote embeddings endpoint instead of a local model.
        :return: A new DataFrame sorted by similarity to the query, with a 'similarities' column.
        """
        # Prepend the retrieval prompt expected by the embedding model
        query = "Represent this sentence for searching relevant passages: " + query
        if not api:
            # Check if CUDA is available and set the device accordingly
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.to(device)
            # Compute and normalize the query embedding
            query_vector = model.encode(query, convert_to_tensor=True, device=device).cpu().numpy()
            query_vector /= np.linalg.norm(query_vector)
        else:
            res = model.embeddings(
                input=[query],
                model=self.config["sentence_transformer_name"],
                prompt=None,
            )
            query_vector = np.array([entry.embedding for entry in res.data][0]).astype(np.float32)
            # Normalize the query vector
            query_vector /= np.linalg.norm(query_vector)
        # Perform the search
        distances, indices = index.search(np.array([query_vector]), top_k)
        # Retrieve the rows from the DataFrame corresponding to the indices
        retrieved_df = df.iloc[indices[0]]
        # Attach the distances as a new column named 'similarities';
        # its length matches the number of retrieved rows
        retrieved_df = retrieved_df.assign(similarities=distances[0])
        # Smaller L2 distance means higher similarity, so sort ascending
        retrieved_df = retrieved_df.sort_values(by='similarities', ascending=True)
        # Reset the index so the result serializes cleanly and preserves rank order
        retrieved_df = retrieved_df.reset_index(drop=True)
        return retrieved_df
    def rerank(self, query, df_top_100, cross_encoder, index):
        config = self.config
        # Work on a copy so the original DataFrame is left untouched
        df_copy = df_top_100.copy().reset_index(drop=True)
        # Convert the top 100 records to a list of dictionaries for processing
        records = df_copy.to_dict(orient='records')[:100]
        dataset_str = SchemaStringDataset(records, config)
        # Extract the (truncated) document strings from the dataset
        documents = [batch["inputs"][:256] for batch in dataset_str]
        # Rank documents against the query with the cross-encoder and keep the top 10
        ids = [item["corpus_id"] for item in cross_encoder.rank(query, documents, top_k=10)]
        # Use the ids to filter and reorder the original DataFrame
        df_sorted_by_relevance = df_copy.loc[ids]
        return df_sorted_by_relevance
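    # Note on rerank(): this assumes a sentence-transformers CrossEncoder, whose
    # rank() method returns a list of dicts sorted by decreasing relevance, e.g.
    #   [{"corpus_id": 42, "score": 8.1}, {"corpus_id": 7, "score": 5.4}, ...]
    # so df_copy.loc[ids] above yields rows ordered from most to least relevant.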
    def search(self, query, df, model, cross_encoder, index):
        sorted_df = self.retrieve(query, df, model, index)
        return self.rerank(query, sorted_df, cross_encoder, index)
    @staticmethod
    def top_10_common_values(df, column_name):
        """
        This function takes a pandas DataFrame and a column name,
        and returns the top 10 most common non-null values of that column as a list.
        """
        # Drop null values, count occurrences of each value, and convert the
        # index of the resulting Series (which contains the values) to a list
        value_counts_list = df[column_name].dropna().value_counts().head(10).index.tolist()
        return value_counts_list
    @staticmethod
    def filter_dataframe(df, config, top_k_programmatic=100):
        """
        Filters a DataFrame based on scalar and discrete column configurations,
        with type handling and null filtering.

        Parameters:
        - df: pandas.DataFrame to filter.
        - config: Dictionary containing 'scalar_columns' and 'discrete_columns' configurations.
        - top_k_programmatic: Maximum number of rows to return after filtering.

        Returns:
        - Filtered pandas.DataFrame.
        """
        scalar_columns = config.get('scalar_columns', [])
        discrete_columns = config.get('discrete_columns', [])
        # Combine all column names to check for nulls
        all_columns = [col["column_name"] for col in scalar_columns] + [col["column_name"] for col in discrete_columns]
        # Drop rows where any of the specified columns have null values
        df = df.dropna(subset=all_columns)
        # Filtering based on scalar columns
        for col in scalar_columns:
            column_name = col["column_name"]
            # Ensure min_value and max_value are numeric
            min_value = float(col["min_value"])
            max_value = float(col["max_value"])
            # Convert the DataFrame column to a numeric type to avoid comparison issues
            df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
            df = df[df[column_name].between(min_value, max_value)]
        # Filtering based on discrete columns
        for col in discrete_columns:
            column_name = col["column_name"]
            default_values = col["default_values"]
            if len(default_values) > 0:
                df = df[df[column_name].isin(default_values)]
        # 'similarities' holds L2 distances, so smaller values are better matches
        if 'similarities' in df.columns:
            df = df.sort_values(by='similarities', ascending=True)
        # Return at most the top_k_programmatic best-matching rows
        return df.head(top_k_programmatic)
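    # Example of the config shape consumed by filter_dataframe (field names are
    # taken from the code above; the column names and bounds are hypothetical):
    #
    #   config = {
    #       "scalar_columns": [
    #           {"column_name": "price", "min_value": 0, "max_value": 500},
    #       ],
    #       "discrete_columns": [
    #           {"column_name": "brand", "default_values": ["acme", "globex"]},
    #       ],
    #   }
    #   filtered = UtilsSearch.filter_dataframe(df, config)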
    @staticmethod
    def drop_columns(df, config):
        # Drop the columns listed under 'columns_to_drop' in the config, if any
        columns_to_drop = config.get('columns_to_drop', [])
        df_dropped = df.drop(columns_to_drop, axis=1)
        return df_dropped
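

# Minimal end-to-end sketch of how this class is typically wired together.
# The model name, config contents, and toy DataFrame below are assumptions for
# illustration only; the demo app builds these from its own configuration, and
# search()/rerank() additionally need a CrossEncoder plus the SchemaStringDataset
# config, which are omitted here.
if __name__ == "__main__":
    from sentence_transformers import SentenceTransformer

    config = {"sentence_transformer_name": "mixedbread-ai/mxbai-embed-large-v1"}  # assumed
    utils = UtilsSearch(config)
    model = SentenceTransformer(config["sentence_transformer_name"])

    # Toy corpus; in the demo the DataFrame comes from a pre-embedded dataset
    docs = ["a red mountain bike", "a carbon road bike", "a folding city bike"]
    df = pd.DataFrame({"text": docs, "embeddings": list(model.encode(docs))})

    index = UtilsSearch.dataframe_to_index(df)
    results = utils.retrieve("lightweight bike for commuting", df, model, index, top_k=3)
    print(results[["text", "similarities"]])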