|
import gradio as gr |
|
import os |
|
import random |
|
import string |
|
from PIL import Image |
|
from src.GameMaster import GameMaster |
|
from collections import deque |
|
import time |
|
import threading |
|
import queue |
|
|
|
from src.Founder import Founder |
|
|
|
import os |
|
|
|
|
|
|
|
# Shared, process-wide services used by every request handler below.
game_master = GameMaster()
founder_base = Founder()

# Path of the placeholder picture (not referenced by the code visible here).
blank_image_path = "datas/blank_item.jpg"

# Seed the "recently identified" panel with 12 entries from the game master.
# The deque keeps at most 12 entries: appending a 13th drops the oldest one.
random_data = game_master.random_image_text_data(12)
recent_generated_items = deque(maxlen=12)
recent_generated_items.extend(random_data)
|
|
|
def refresh_contribution_ladder():
    """Build the 20 text lines shown on the contribution ladder tab.

    For each of the top-ranked founders returned by
    ``founder_base.get_top_rank(top_k=20)``, up to five of their items are
    sampled at random and shown under their ``name_in_cultivation`` when the
    text database has one, otherwise under the raw item keyword.

    Returns:
        A list of strings, one per ladder textbox, padded with empty strings
        up to 20 entries.
    """
    ladder = []

    for founder_name, items in founder_base.get_top_rank(top_k=20):
        sample_size = min(5, len(items))
        display_names = []

        for keyword in random.sample(items, sample_size):
            record = game_master.textdb.search_by_en_keyword(keyword)
            has_name = record is not None and "name_in_cultivation" in record
            # Fall back to the raw keyword when no in-game name is recorded.
            display_names.append(record['name_in_cultivation'] if has_name else keyword)

        items_description = ', '.join(display_names)
        ladder.append(f"""道友姓名: {founder_name}\n发现了{items_description}等物品""")

    # One string is needed per output textbox; pad the tail with blanks.
    ladder.extend([""] * (20 - len(ladder)))
    return ladder
|
|
|
|
|
def expensive_generating(save_path, image_feature, backup_results, game_master, founder_name=None):
    """Run the slow identification of an uploaded image and publish the result.

    The verdict is written into the module-level ``recent_generated_items``
    deque: a "please wait" placeholder is appended first and later replaced
    by the final text for the same ``save_path``.

    Args:
        save_path: Path of the saved upload.
        image_feature: Feature used for the initial duplicate check against
            ``game_master.imgdb``.
        backup_results: Accepted for call compatibility; it is replaced by the
            value returned from ``game_master.search_with_path`` below.
        game_master: Object providing ``imgdb``, ``minimal_image_threshold``,
            ``search_with_path`` and ``generate_cultivation_data``.
        founder_name: Name typed by the uploader, or ``None`` to skip founder
            bookkeeping entirely.

    Returns:
        None.
    """
    # Nothing to do when the image database already holds a close-enough match.
    image_search_result = game_master.imgdb.top_k_search(image_feature, top_k=1)
    if (image_search_result
            and image_search_result[0]['similarity'] > game_master.minimal_image_threshold):
        return

    # Show a placeholder right away so the upload is visible while we work.
    recent_generated_items.append((save_path, "天机阁长老正在鉴定,道友请耐心等待。。"))

    # The search is repeated here; its results supersede the passed-in
    # image_feature / backup_results.
    search_result, backup_results, image_feature = game_master.search_with_path(save_path)
    if search_result is None:
        cultivation_data = game_master.generate_cultivation_data(
            save_path, image_feature, backup_results)
        description = cultivation_data["description_in_cultivation"]
        print("鉴定得到新物品:", description)
    else:
        cultivation_data = search_result
        description = cultivation_data["description_in_cultivation"]
        print("鉴定这是记载过的物品,但是第一次发现:", description)

    suffix = ""
    if founder_name is not None:
        if "translated_word" in cultivation_data:
            translated_word = cultivation_data["translated_word"]
        else:
            translated_word = cultivation_data["name_in_cultivation"]

        # Look the founder up once and reuse the answer.
        existing_founder = founder_base.get_founder(translated_word)
        if existing_founder is None:
            # Only a non-blank name can claim the discovery.
            if founder_name.strip() != "":
                founder_base.set_founder(translated_word, founder_name)
                suffix = f" 由道友 {founder_name} 发现"
        else:
            suffix = f" 由道友 {existing_founder} 发现"

    # Build the final label exactly once. Doing this inside the loop below
    # would prepend the name again for every entry that matches save_path.
    if "name_in_cultivation" in cultivation_data:
        description = cultivation_data["name_in_cultivation"] + suffix + "--" + description

    replaced = False
    for index, item in enumerate(recent_generated_items):
        if item[0] == save_path:
            recent_generated_items[index] = (save_path, description)
            replaced = True

    # The placeholder may have been pushed out of the bounded deque meanwhile.
    if not replaced:
        recent_generated_items.append((save_path, description))

    return
|
|
|
|
|
|
|
|
|
# Jobs for the background identifier: 4-tuples of
# (save_path, image_feature, backup_results, founder_name).
task_queue = queue.Queue()


def worker():
    """Consume identification jobs from ``task_queue`` until a stop sentinel.

    A job whose ``save_path`` is ``None`` stops the loop. Every dequeued job,
    including the sentinel and jobs that fail, is acknowledged with
    ``task_done()`` so ``task_queue.join()`` cannot hang. A failing job is
    logged and skipped instead of killing the only worker thread.
    """
    import traceback

    while True:
        save_path, image_feature, backup_results, founder_name = task_queue.get()
        try:
            if save_path is None:
                break
            expensive_generating(save_path, image_feature, backup_results, game_master, founder_name)
        except Exception:
            # Keep serving later uploads even if this one blew up.
            traceback.print_exc()
        finally:
            task_queue.task_done()


# Daemon thread: it does not keep the process alive on shutdown.
thread = threading.Thread(target=worker, daemon=True)
thread.start()
|
|
|
|
|
|
|
def similarity2level(sim, max_val, min_val, level_num=3):
    """Map a similarity score onto an integer level between 0 and ``level_num``.

    The score is rescaled linearly so that ``min_val`` maps to 0 and
    ``max_val`` maps to ``level_num``; the result is truncated to an integer
    and clamped into ``[0, level_num]``.

    Args:
        sim: Similarity score to convert.
        max_val: Score that corresponds to the top level.
        min_val: Score that corresponds to level 0.
        level_num: Highest level that can be returned (default 3, i.e. four
            possible levels 0..3).

    Returns:
        The integer level.

    Raises:
        ZeroDivisionError: If ``max_val`` equals ``min_val``.
    """
    scaled = (sim - min_val) / (max_val - min_val) * level_num
    return max(0, min(level_num, int(scaled)))
|
|
|
from src.get_comments_from_level import get_comments_from_level |
|
|
|
def is_empty_name(founder_name):
    """Tell whether ``founder_name`` should be treated as "no name given".

    Blank input, ``None`` and the placeholder names listed below all count
    as empty. Surrounding whitespace is ignored.

    Args:
        founder_name: Name typed by the user, or ``None``.

    Returns:
        ``True`` when the name is empty or a placeholder, ``False`` otherwise.
    """
    if founder_name is None:
        return True
    empty_names = ("", "测试", "鲁鲁道祖")
    return founder_name.strip() in empty_names
|
|
|
|
|
def process_image(image, founder_name):
    """Identify an uploaded picture and produce the three UI outputs.

    The picture is saved under ``temp_images/`` with a random name and looked
    up through ``game_master.search_with_path``. When the best image match is
    above ``game_master.minimal_image_threshold`` the stored entry is shown;
    otherwise the first backup candidate is shown as a preliminary guess and
    the slow identification is queued on ``task_queue``.

    Args:
        image: Picture handed over by the image component; it is passed to
            ``Image.fromarray``. ``None`` means nothing was uploaded.
        founder_name: Name typed by the user; ``None`` is treated as blank.

    Returns:
        A ``(name, description, comments)`` tuple of strings. Name and
        description are empty when there is nothing to show yet.
    """
    os.makedirs('temp_images', exist_ok=True)

    founder_name = (founder_name or "").strip()

    prefix = ""
    if is_empty_name(founder_name):
        prefix = "道友,请在下面留下您的尊姓大名,提供大量珍宝鉴定的道友,天机阁将在后面送出灵泉玉液(珍珠奶茶)。"

    nothing_to_identify = ("", "", prefix + "道友你有什么物品要来鉴定吗?")
    if image is None:
        return nothing_to_identify

    random_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=12)) + '.jpg'
    save_path = os.path.join('temp_images', random_name)
    try:
        img = Image.fromarray(image)
        # JPEG cannot store alpha or palette data; convert such images first.
        if img.mode not in ("RGB", "L"):
            img = img.convert("RGB")
        # Cap the height at 480 px while keeping the aspect ratio.
        img.thumbnail((img.width, 480))
        img.save(save_path)
    except Exception as exc:
        # Best effort: an unusable upload is answered like "nothing uploaded".
        print("failed to save uploaded image:", exc)
        return nothing_to_identify

    # NOTE(review): threshold=0 presumably always yields a best match, since
    # search_result is indexed unconditionally below -- confirm.
    search_result, backup_results, image_feature = game_master.search_with_path(save_path, threshold=0)

    image_similarity = search_result['similarity']
    print("image_similarity:", image_similarity)

    inbase_similarity_level = similarity2level(image_similarity, max_val=0.99, min_val=0.80)

    suffix = ""

    if image_similarity > game_master.minimal_image_threshold:
        # The picture is already in the image database.
        result = search_result
        inlibrary_similarity_level = inbase_similarity_level

        if "translated_word" in search_result:
            translated_word = search_result["translated_word"]
        else:
            translated_word = search_result["name_in_cultivation"]

        find_founder = founder_base.get_founder(translated_word)

        if find_founder is None:
            suffix = "这个物品很早就在天机阁中记载了,道友还有什么物品要鉴定吗?"
        elif find_founder == founder_name:
            suffix = "感谢道友送来这个物品进行鉴定,天机阁的天梯上已经记录了你的贡献。"
        else:
            suffix = f"这个物品是由道友{find_founder}发现的,道友还有什么物品要鉴定吗?"
    else:
        # Unknown picture: hand the slow identification to the worker thread.
        task_queue.put((save_path, image_feature, backup_results, founder_name))

        suffix = "道友请移步天机阁内阁查询长老的鉴定结果。"

        # NOTE(review): this reminder is assumed to belong to the new-item
        # branch only -- confirm against the intended flow.
        if is_empty_name(founder_name):
            suffix += "另外,只有留下姓名的道友,才能最终被记录在天梯获取天机阁奖励。"

        if not backup_results:
            # No preliminary guess available; just point at the pending result.
            return "", "", prefix + suffix

        result = backup_results[0]

        # Default to the lowest level when the candidate carries no score.
        inlibrary_similarity_level = 0
        if "similarity" in result:
            text_similarity = result['similarity']
            print("text_similarity:", text_similarity)
            inlibrary_similarity_level = similarity2level(text_similarity, max_val=0.79, min_val=0.35)

    name = result["name_in_cultivation"]
    description_in_cultivation = result["description_in_cultivation"]

    comments = get_comments_from_level(inbase_similarity_level, inlibrary_similarity_level)
    comments = prefix + comments.format(name=name) + suffix

    return name, description_in_cultivation, comments
|
|
|
|
|
|
|
def refresh_recent_items():
    """Collect thumbnails and texts for the "recently identified" panel.

    Entries are listed newest first. The result always has one image slot
    and one text slot per deque position (``recent_generated_items.maxlen``),
    so the number of returned values stays constant even when fewer items
    are stored.

    Returns:
        A flat list: all thumbnails (``None`` for empty or unreadable slots)
        followed by all descriptions (``""`` for empty slots).
    """
    # Work on a snapshot: the worker thread may append while images load.
    snapshot = list(recent_generated_items)
    snapshot.reverse()

    thumbnails = []
    descriptions = []
    for img_path, desc in snapshot:
        try:
            img = Image.open(img_path)
            img.thumbnail((200, 200))
        except OSError as exc:
            # A missing or broken file must not blank the whole panel.
            print("failed to load recent image:", img_path, exc)
            img = None
        thumbnails.append(img)
        descriptions.append(desc)

    slots = recent_generated_items.maxlen or len(snapshot)
    missing = slots - len(snapshot)
    thumbnails.extend([None] * missing)
    descriptions.extend([""] * missing)

    return thumbnails + descriptions
|
|
|
|
|
# Every .jpg under this folder is offered as a clickable example in the UI.
example_images_dir = "datas/example_images"
example_images = [
    os.path.join(example_images_dir, file_name)
    for file_name in filter(lambda entry: entry.endswith('.jpg'), os.listdir(example_images_dir))
]

# Markdown rendered at the bottom of the first tab.
TODO_list = """
# TODO

- [ ] 增加天梯匹配系统

- [ ] 增加readme

- [ ] 部署到gitee
"""
|
|
|
|
|
# Gradio front end: three tabs (identify, recent results, contribution ladder).
# NOTE(review): widget nesting was reconstructed from a copy without
# indentation -- confirm the layout matches the intended one.
with gr.Blocks() as demo:
    gr.Markdown("# 这个系统是智障,错把日常当修仙\n\nby [李鲁鲁](https://github.com/LC1332)")

    with gr.Tab("鉴定"):
        with gr.Row():
            with gr.Column(scale=1):
                image_input = gr.Image(label="上传图片")

            with gr.Column(scale=2):
                comments = gr.Text("道友你有什么物品要来鉴定吗?", label="")
                name_output = gr.Textbox(label="鉴定物品名称")
                description_output = gr.Textbox(label="物品描述")
                submit_button = gr.Button("鉴定")

        with gr.Row():
            founder_name = gr.Textbox(label="道友尊姓大名?", interactive=True)

        # Uploading a picture and pressing the button trigger the same handler.
        image_input.upload(process_image, inputs=[image_input, founder_name], outputs=[name_output, description_output, comments])
        submit_button.click(process_image, inputs=[image_input, founder_name], outputs=[name_output, description_output, comments])

        gr.Examples(examples=example_images, inputs=image_input, label="选择一个示例图片")
        gr.Markdown(TODO_list)

    with gr.Tab("天机阁最新鉴定"):
        refresh_button = gr.Button("询问长老最新鉴定")
        recent_images = []
        recent_descriptions = []

        # 4 rows x 3 columns = 12 slots, filled row by row.
        grid_rows, grid_columns = 4, 3
        for i in range(grid_rows):
            with gr.Row():
                for j in range(grid_columns):
                    with gr.Column():
                        # Number the slots 1..12 consecutively across rows.
                        recent_image = gr.Image(label=f"物品 {i * grid_columns + j + 1}")
                        recent_description = gr.Textbox(label="描述", interactive=False)
                    recent_images.append(recent_image)
                    recent_descriptions.append(recent_description)

        # The handler returns all images first, then all descriptions.
        refresh_button.click(refresh_recent_items, outputs=recent_images + recent_descriptions)

    with gr.Tab("贡献天梯"):
        refresh_ladder_button = gr.Button("刷新天梯")
        contribution_textboxes = [gr.Textbox(label=f"贡献者 {i + 1}", interactive=False) for i in range(20)]

        refresh_ladder_button.click(refresh_contribution_ladder, outputs=contribution_textboxes)

demo.launch(share=True)
|
|
|
|