|
import hashlib
import logging
from collections import defaultdict
from typing import Callable, List, Optional, Tuple

import numpy as np
from qdrant_client import QdrantClient
from qdrant_client.http.models import PointStruct

from aimakerspace.openai_utils.embedding import EmbeddingModel
|
|
|
def cosine_similarity(vector_a: np.array, vector_b: np.array) -> float:
    """Compute the cosine similarity between two vectors.

    Args:
        vector_a: First vector (any 1-D array-like accepted by NumPy).
        vector_b: Second vector, with the same length as ``vector_a``.

    Returns:
        The dot product of the vectors divided by the product of their
        Euclidean norms. When either vector has zero magnitude the angle is
        undefined, so ``0.0`` is returned instead of dividing by zero.
    """
    norm_product = np.linalg.norm(vector_a) * np.linalg.norm(vector_b)
    # A zero-length vector has no direction; avoid 0/0 -> nan and the
    # accompanying NumPy RuntimeWarning.
    if norm_product == 0:
        return 0.0
    return np.dot(vector_a, vector_b) / norm_product
|
|
|
|
|
class QdrantDatabase:
    """Text-keyed vector store backed by a single Qdrant collection.

    Each inserted text becomes one Qdrant point whose ID is derived from the
    text, whose payload is ``{"text": <text>}`` and whose embedding is stored
    under the named vector ``VECTOR_NAME``.
    """

    # Name of the named vector every point is written under. Searches must
    # target the same name, otherwise Qdrant cannot match the query against
    # the stored vectors.
    VECTOR_NAME = "default"

    # Maximum number of points sent in one upsert request when bulk loading.
    UPSERT_BATCH_SIZE = 256

    _logger = logging.getLogger(__name__)

    def __init__(self, qdrant_client: "QdrantClient", collection_name: str, embedding_model=None):
        """Bind the store to a Qdrant client and collection.

        Args:
            qdrant_client: Client used for every Qdrant request.
            collection_name: Collection that points are written to and read from.
            embedding_model: Object providing ``get_embedding`` and
                ``async_get_embeddings``. A default ``EmbeddingModel`` is
                created when omitted.
        """
        self.qdrant_client = qdrant_client
        self.collection_name = collection_name
        self.embedding_model = EmbeddingModel() if embedding_model is None else embedding_model
        # Optional in-memory overrides consulted first by ``retrieve_from_key``.
        # A plain dict: a ``defaultdict(np.array)`` factory cannot be called
        # without arguments and would raise on the first missing key.
        self.vectors = {}

    def string_to_int_id(self, s: str) -> int:
        """Map a string to a deterministic integer point ID in ``[0, 10**8)``.

        NOTE(review): the ID space is small, so distinct strings can collide
        and overwrite each other's points. Widening it would orphan points
        already stored under the current scheme, so it is left unchanged.
        """
        return int(hashlib.sha256(s.encode('utf-8')).hexdigest(), 16) % (10**8)

    def get_test_vector(self):
        """Fetch the point stored under the key ``"test_key"`` from this collection.

        Returns:
            Whatever ``qdrant_client.retrieve`` returns for that ID (an empty
            result when no such point exists).
        """
        return self.qdrant_client.retrieve(
            collection_name=self.collection_name,
            ids=[self.string_to_int_id("test_key")],
            with_vectors=True,
        )

    def _make_point(self, key: str, vector) -> "PointStruct":
        """Build the Qdrant point that represents ``key`` and its embedding."""
        return PointStruct(
            id=self.string_to_int_id(key),
            vector={self.VECTOR_NAME: np.asarray(vector).tolist()},
            payload={"text": key},
        )

    def insert(self, key: str, vector: np.array) -> None:
        """Upsert one text/embedding pair into the collection.

        Args:
            key: The text; used both to derive the point ID and as payload.
            vector: Embedding of ``key`` (array or sequence of floats).
        """
        point = self._make_point(key, vector)
        self.qdrant_client.upsert(
            collection_name=self.collection_name,
            points=[point],
        )
        self._logger.debug("Upserted vector for key %r with ID %s", key, point.id)

    def list_vectors(self):
        """Print the collection's metadata as reported by Qdrant."""
        collection_info = self.qdrant_client.get_collection(self.collection_name)
        print(f"Collection info: {collection_info}")

    def search(
        self,
        query_vector: np.array,
        k: int,
        distance_measure: Callable = None,
    ) -> List[Tuple[str, float]]:
        """Return the ``k`` stored texts closest to ``query_vector``.

        Args:
            query_vector: Query embedding (array or sequence of floats).
            k: Maximum number of results.
            distance_measure: Accepted for interface compatibility and
                ignored; scoring is done by Qdrant.

        Returns:
            ``(text, score)`` tuples in the order Qdrant returned them.
        """
        query = np.asarray(query_vector).tolist()
        self._logger.debug(
            "Searching collection %r for the top %s matches", self.collection_name, k
        )
        hits = self.qdrant_client.search(
            collection_name=self.collection_name,
            # Points are stored under a named vector, so the query has to
            # name the same vector.
            query_vector=(self.VECTOR_NAME, query),
            limit=k,
        )
        return [(hit.payload['text'], hit.score) for hit in hits]

    def search_by_text(
        self,
        query_text: str,
        k: int,
        distance_measure: Callable = None,
        return_as_text: bool = False,
    ) -> List[Tuple[str, float]]:
        """Embed ``query_text`` and search the collection with the result.

        Args:
            query_text: Text to embed and search for.
            k: Maximum number of results.
            distance_measure: Forwarded to ``search``, which ignores it.
            return_as_text: When true, return only the matched texts.

        Returns:
            ``(text, score)`` tuples, or just the texts when
            ``return_as_text`` is true.
        """
        query_vector = self.embedding_model.get_embedding(query_text)
        results = self.search(query_vector, k, distance_measure)
        return [text for text, _ in results] if return_as_text else results

    def retrieve_from_key(self, key: str) -> Optional[np.ndarray]:
        """Return the embedding stored for ``key``, or ``None`` if there is none.

        ``self.vectors`` is checked first; otherwise the point is read back
        from Qdrant by its derived ID.
        """
        cached = self.vectors.get(key)
        if cached is not None:
            return cached

        records = self.qdrant_client.retrieve(
            collection_name=self.collection_name,
            ids=[self.string_to_int_id(key)],
            with_vectors=True,
        )
        for record in records:
            payload = record.payload or {}
            # Different texts can share an ID; only accept the point that
            # was actually stored for this key.
            if payload.get("text") != key:
                continue
            stored = record.vector
            if isinstance(stored, dict):
                stored = stored.get(self.VECTOR_NAME)
            if stored is not None:
                return np.array(stored)
        return None

    async def abuild_from_list(self, list_of_text: List[str]) -> "QdrantDatabase":
        """Embed every text and store the results in the collection.

        Args:
            list_of_text: Texts to embed and insert.

        Returns:
            This instance, to allow chaining.
        """
        embeddings = await self.embedding_model.async_get_embeddings(list_of_text)

        # Key by ID so a repeated text keeps only its last point, matching
        # the end state of upserting the items one by one.
        points_by_id = {}
        for text, embedding in zip(list_of_text, embeddings):
            point = self._make_point(text, embedding)
            points_by_id[point.id] = point

        points = list(points_by_id.values())
        batch_size = self.UPSERT_BATCH_SIZE
        for start in range(0, len(points), batch_size):
            self.qdrant_client.upsert(
                collection_name=self.collection_name,
                points=points[start:start + batch_size],
            )
        self._logger.debug(
            "Upserted %d points into collection %r", len(points), self.collection_name
        )
        return self
|
|