"""Moderation API server.

Serves ``POST /v1/moderations`` on an ``openedai.OpenAIStub`` app. Each input
text is tokenized, scored by an ONNX sequence-classification model, and every
label's score is compared against a per-label threshold.
"""
from gevent import pywsgi
import dotenv

# Kept ahead of the remaining imports (original ordering) so that values from
# .env are already in the environment when those libraries are imported.
dotenv.load_dotenv(override=True)

import sys
import time
import argparse
import uvicorn
from typing import Union
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import openedai
import numpy as np
import onnxruntime as ort
import asyncio

app = openedai.OpenAIStub()
moderation = None

# Only used in the startup banner. The original expression
# `"cpu" if torch.cuda.is_available() else "cpu"` produced "cpu" on both
# branches, so it is written out directly.
device = "cpu"

# Label order must match the order of the model's outputs: label i is read
# from output column i in `moderations`.
labels = [
    'hate',
    'hate_threatening',
    'harassment',
    'harassment_threatening',
    'self_harm',
    'self_harm_intent',
    'self_harm_instructions',
    'sexual',
    'sexual_minors',
    'violence',
    'violence_graphic',
]

label2id = {l: i for i, l in enumerate(labels)}
id2label = {i: l for i, l in enumerate(labels)}

# A category is flagged when its score is strictly greater than its threshold.
# Built once at import time instead of on every request.
THRESHOLDS = {
    "sexual": 0.1,
    "hate": 0.25,
    "harassment": 0.5,
    "self_harm": 0.25,
    "sexual_minors": 0.5,
    "hate_threatening": 0.2,
    "violence_graphic": 0.25,
    "self_harm_intent": 0.2,
    "self_harm_instructions": 0.25,
    "harassment_threatening": 0.1,
    "violence": 0.25,
}

model_name = "/root/autodl-tmp/moderation_0703_deberta_v3_small_onnx"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Inference goes through ONNX Runtime; an earlier, disabled variant loaded the
# same checkpoint via AutoModelForSequenceClassification instead.
model = ort.InferenceSession(model_name + "/model.onnx")

torch.set_num_threads(1)


class ModerationsRequest(BaseModel):
    """Request body for ``POST /v1/moderations``."""

    model: str = "text-moderation-latest"  # or "text-moderation-stable"
    input: Union[str, list[str]]


@app.post("/v1/moderations")
async def moderations(request: ModerationsRequest):
    """Score every input text and report which categories are flagged.

    Args:
        request: Parsed request body; ``input`` is a single string or a list
            of strings.

    Returns:
        A dict with ``id``, ``model`` and ``results``. ``results`` holds one
        entry per input text with ``flagged`` (True if any category is
        flagged), ``categories`` (label -> bool) and ``category_scores``
        (label -> float).
    """
    results = {
        # time.time_ns() gives an exact integer nanosecond timestamp; the
        # float product int(time.time()*1e9) cannot resolve nanoseconds.
        "id": f"modr-{time.time_ns()}",
        "model": "text-moderation-005",
        "results": [],
    }

    # Normalise to a list without mutating the request model.
    texts = [request.input] if isinstance(request.input, str) else request.input

    for text in texts:
        predictions = await predict(text, model, tokenizer)
        # Row 0 is the only row: one text is scored per call.
        row = predictions[0]
        category_scores = {label: row[i].item() for i, label in enumerate(labels)}
        detect = {key: score > THRESHOLDS[key] for key, score in category_scores.items()}
        detected = any(detect.values())

        results['results'].append({
            'flagged': detected,
            'categories': detect,
            'category_scores': category_scores,
        })

    return results


def sigmoid(x):
    """Return the logistic sigmoid ``1 / (1 + exp(-x))`` of *x* (NumPy)."""
    return 1/(1 + np.exp(-x))


def parse_args(argv):
    """Parse command-line arguments.

    Args:
        argv: Argument list, without the program name.

    Returns:
        Namespace with ``host`` (default ``'0.0.0.0'``), ``port`` (default
        ``5002``) and ``test_load`` (True when ``--test-load`` is given).
    """
    parser = argparse.ArgumentParser(description='Moderation API')
    parser.add_argument('--host', type=str, default='0.0.0.0')
    parser.add_argument('--port', type=int, default=5002)
    parser.add_argument('--test-load', action='store_true')
    return parser.parse_args(argv)


async def predict(text, ort_session, tokenizer):
    """Tokenize *text* and run it through the ONNX session.

    Args:
        text: The text to score.
        ort_session: ONNX Runtime inference session. Its first declared input
            receives the token ids and its second the attention mask.
        tokenizer: Tokenizer providing ``encode_plus``.

    Returns:
        A torch tensor holding the sigmoid of the session's first output.
        The caller indexes it as ``predictions[0][i]`` (presumably shape
        ``(1, num_labels)`` - confirm against the exported model).
    """
    # Encode the input as NumPy arrays, which is what ONNX Runtime consumes.
    # NOTE(review): no truncation is requested, so very long texts are passed
    # to the model at full length - confirm the exported model accepts that.
    encoding = tokenizer.encode_plus(
        text,
        return_tensors='np'
    )

    input_ids = encoding['input_ids']
    attention_mask = encoding['attention_mask']

    def _predict():
        """Blocking inference call, meant to run in a worker thread."""
        session_inputs = ort_session.get_inputs()
        ort_inputs = {
            session_inputs[0].name: input_ids,
            session_inputs[1].name: attention_mask,
        }
        ort_outs = ort_session.run(None, ort_inputs)
        # Convert the first output to a torch tensor and apply sigmoid.
        return torch.sigmoid(torch.from_numpy(ort_outs[0]))

    # Run the blocking ONNX call in the loop's default executor so the event
    # loop is not blocked while the model runs.
    loop = asyncio.get_running_loop()
    predictions = await loop.run_in_executor(None, _predict)

    return predictions


# Main
if __name__ == "__main__":
    args = parse_args(sys.argv[1:])

    # start API
    print(f'Starting moderations[{device}] API on {args.host}:{args.port}', file=sys.stderr)

    app.register_model('text-moderations-latest', 'text-moderations-stable')
    app.register_model('text-moderations-005', 'text-moderations-ifmain')

    # With --test-load the process exits after loading, without serving.
    if not args.test_load:
        uvicorn.run(app, host=args.host, port=args.port)