File size: 3,422 Bytes
7431481
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a24940e
 
 
 
 
 
7431481
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from transformers import AutoTokenizer, AutoModel
import torch
import faiss
import gradio as gr
import json

class FaissTextRetrieval:
    def __init__(self, model_name):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).eval()
        self.device = "cpu"

        self.all_index = faiss.read_index("data/all.index")
        with open("data/all.json", "r") as f:
            self.all_id2label = {int(k):v for k, v in json.load(f).items()}

        self.general_index = faiss.read_index("data/general.index")
        with open("data/general.json", "r") as f:
            self.general_id2label = {int(k):v for k, v in json.load(f).items()}

        self.character_index = faiss.read_index("data/character.index")
        with open("data/character.json", "r") as f:
            self.character_id2label = {int(k):v for k, v in json.load(f).items()}
        
    def to(self, device, dtype=torch.float32):
        self.device = device
        self.dtype = dtype if "cuda" in device else torch.float32
        self.model.to(device, dtype=dtype)
    
    @torch.no_grad()
    def average_pool(self, last_hidden_states, attention_mask):
        last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
        return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]
    
    @torch.no_grad()
    def get_embeddings(self, input_texts: list):
        batch_dict = self.tokenizer(input_texts, max_length=512, padding=True, truncation=True, return_tensors='pt')
        input_ids = batch_dict["input_ids"].to(self.device)
        attention_mask = batch_dict["attention_mask"].to(self.device)
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        embeddings = self.average_pool(outputs.last_hidden_state, attention_mask)
        embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
        
        return embeddings
    
    def search(self, query, top_k: int = 5, search_type = "all") -> list:
        query = "query:" + query
        query_embeddings = self.get_embeddings([query]).float().cpu().numpy()

        if search_type == "all":
            index = self.all_index
            id2label = self.all_id2label
        elif search_type == "general":
            index = self.general_index
            id2label = self.general_id2label
        elif search_type == "character":
            index = self.character_index
            id2label = self.character_id2label
        
        distances, indices = index.search(query_embeddings, top_k)
        results = {id2label[idx]:distances[0][j] for j, idx in enumerate(indices[0])}

        return results
    
    def reset(self):
        self.passage_texts = []
        self.index = None

def main():
    rag = FaissTextRetrieval("intfloat/multilingual-e5-large")

    def search(query, search_type):
        return rag.search(query, top_k=50, search_type=search_type)
    
    description = """Model:[intfloat/multilingual-e5-large](https://huggingface.co/intfloat/multilingual-e5-large)
    
Tag:[SmilingWolf/wd-eva02-large-tagger-v3](https://huggingface.co/SmilingWolf/wd-eva02-large-tagger-v3)
"""
    
    gr.Interface(search, inputs=("textarea", gr.Radio(["all", "general", "character"], value="all")), outputs="label", title="Tag Search", description=description).launch()

if __name__ == "__main__":
    main()