File size: 4,483 Bytes
0614fbf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import numpy as np
from collections import defaultdict
from typing import List, Tuple, Callable
from aimakerspace.openai_utils.embedding import EmbeddingModel
import hashlib
from qdrant_client import QdrantClient
from qdrant_client.http.models import PointStruct

def cosine_similarity(vector_a: np.array, vector_b: np.array) -> float:
    """Return the cosine similarity between two vectors.

    Args:
        vector_a: First vector (anything ``np.dot`` / ``np.linalg.norm`` accept).
        vector_b: Second vector, same length as ``vector_a``.

    Returns:
        ``dot(a, b) / (||a|| * ||b||)``. When either vector has zero norm the
        similarity is undefined; ``0.0`` is returned instead of dividing by
        zero (which would produce ``nan`` and a NumPy RuntimeWarning).
    """
    norm_product = np.linalg.norm(vector_a) * np.linalg.norm(vector_b)
    if norm_product == 0:
        # A zero vector has no direction; treat it as "not similar".
        return 0.0
    return np.dot(vector_a, vector_b) / norm_product


class QdrantDatabase:
    """Vector store backed by a Qdrant collection.

    Each text key is stored as one Qdrant point whose payload is
    ``{"text": key}`` and whose ID is derived from the key by
    ``string_to_int_id``. Vectors inserted through this object are also kept
    in ``self.vectors`` so ``retrieve_from_key`` can answer locally.

    Several methods print debugging information to stdout.
    """

    # Class-level fallback so the attribute exists even if __init__ is bypassed.
    vector_name = "default"

    def __init__(
        self,
        qdrant_client: QdrantClient,
        collection_name: str,
        embedding_model=None,
        vector_name: str = "default",
    ):
        """Bind the wrapper to a client and a collection.

        Args:
            qdrant_client: Client used for every Qdrant call.
            collection_name: Collection that all methods operate on.
            embedding_model: Object providing ``get_embedding`` and
                ``async_get_embeddings``; a new ``EmbeddingModel()`` is
                created when a falsy value is given.
            vector_name: Name of the vector slot used for both upserts and
                searches. Pass ``""`` to use a single unnamed vector.
        """
        self.qdrant_client = qdrant_client
        self.collection_name = collection_name
        self.embedding_model = embedding_model or EmbeddingModel()
        self.vector_name = vector_name
        # Local key -> vector cache, filled by insert(). A plain dict is used
        # because there is no meaningful default vector for a missing key.
        self.vectors = {}

    def string_to_int_id(self, s: str) -> int:
        """Map a string to a deterministic integer ID in ``[0, 10**8)``.

        NOTE(review): the ID space is small, so distinct keys can collide and
        overwrite each other on upsert. It is left unchanged because widening
        it would change the IDs of points that are already stored.
        """
        return int(hashlib.sha256(s.encode('utf-8')).hexdigest(), 16) % (10**8)

    def get_test_vector(self):
        """Retrieve the point stored under the key ``"test_key"``.

        Returns:
            Whatever ``qdrant_client.retrieve`` returns for that ID in this
            object's collection.
        """
        return self.qdrant_client.retrieve(
            collection_name=self.collection_name,
            ids=[self.string_to_int_id("test_key")],
        )

    def insert(self, key: str, vector: np.array) -> None:
        """Upsert ``vector`` under ``key`` and remember it locally.

        Args:
            key: Text used both as the payload and as the source of the ID.
            vector: Embedding for ``key``; any array-like is accepted.
        """
        point_id = self.string_to_int_id(key)
        as_array = np.asarray(vector)
        values = as_array.tolist()

        point = PointStruct(
            id=point_id,
            # Must match the vector slot that search() queries.
            vector={self.vector_name: values} if self.vector_name else values,
            payload={"text": key},
        )
        print(f"Inserting vector for key: {key}, ID: {point_id}")
        # Qdrant expects a list of PointStruct.
        self.qdrant_client.upsert(
            collection_name=self.collection_name,
            points=[point],
        )
        # Cache only after the upsert succeeded so the cache mirrors Qdrant.
        self.vectors[key] = as_array
        print(f"Inserted vector for key: {key} with ID: {point_id}")
        # Debug read-back of the point that was just written.
        retrieved_vector = self.qdrant_client.retrieve(
            collection_name=self.collection_name,
            ids=[point_id],
        )
        print(f"Inserted vector with ID: {point_id}, retrieved: {retrieved_vector}")
        self.list_vectors()

    def list_vectors(self):
        """Print the collection info reported by Qdrant (debugging aid)."""
        collection_info = self.qdrant_client.get_collection(self.collection_name)
        print(f"Collection info: {collection_info}")

    def search(
        self,
        query_vector: np.array,
        k: int,
        distance_measure: Callable = None,
    ) -> List[Tuple[str, float]]:
        """Return the ``k`` nearest stored texts to ``query_vector``.

        Args:
            query_vector: Query embedding; any array-like is accepted.
            k: Maximum number of results requested from Qdrant.
            distance_measure: Unused; kept for interface compatibility.

        Returns:
            ``(text, score)`` pairs in the order Qdrant returned them.

        Raises:
            KeyError / TypeError: if a hit has no ``"text"`` payload entry.
        """
        query_vector = np.asarray(query_vector)
        values = query_vector.tolist()
        print(self.collection_name)
        print(f"Searching in collection: {self.collection_name} with vector: {query_vector}")
        self.list_vectors()

        search_results = self.qdrant_client.search(
            collection_name=self.collection_name,
            # Query the same vector slot that insert() writes to; a
            # (name, vector) tuple selects a named vector.
            query_vector=(self.vector_name, values) if self.vector_name else values,
            limit=k,
        )

        print(f"Search results: {search_results}")
        return [(result.payload['text'], result.score) for result in search_results]

    def search_by_text(
        self,
        query_text: str,
        k: int,
        distance_measure: Callable = None,
        return_as_text: bool = False,
    ) -> List[Tuple[str, float]]:
        """Embed ``query_text`` and search with the resulting vector.

        Args:
            query_text: Text to embed with ``self.embedding_model``.
            k: Maximum number of results.
            distance_measure: Forwarded to ``search`` (unused there).
            return_as_text: When true, return only the matched texts.

        Returns:
            ``(text, score)`` pairs, or a list of texts when
            ``return_as_text`` is true.
        """
        self.list_vectors()
        query_vector = self.embedding_model.get_embedding(query_text)
        results = self.search(query_vector, k, distance_measure)
        return [result[0] for result in results] if return_as_text else results

    def retrieve_from_key(self, key: str) -> np.array:
        """Return the locally cached vector for ``key``, or ``None``.

        Only vectors inserted through this object are cached; Qdrant is not
        queried.
        """
        return self.vectors.get(key)

    async def abuild_from_list(self, list_of_text: List[str]) -> "QdrantDatabase":
        """Embed every text asynchronously and insert each one.

        Args:
            list_of_text: Texts to embed and store.

        Returns:
            This instance, to allow chaining.
        """
        embeddings = await self.embedding_model.async_get_embeddings(list_of_text)
        for text, embedding in zip(list_of_text, embeddings):
            self.insert(text, np.array(embedding))
        return self