Spaces:
Building
Building
File size: 4,748 Bytes
7e8a8d2 913b6ab fccf15f 913b6ab bc665a5 0af7a49 913b6ab 0af7a49 913b6ab 0af7a49 913b6ab 0af7a49 913b6ab 0af7a49 913b6ab 0af7a49 9f52d4a f24e251 0d399a4 a47ff91 9f52d4a ebb0ff0 7e8a8d2 0d399a4 a47ff91 0d399a4 a47ff91 ebb0ff0 a47ff91 ebb0ff0 a47ff91 ebb0ff0 0d399a4 7e8a8d2 ebb0ff0 7e8a8d2 0d399a4 bc665a5 0d399a4 bc665a5 0d399a4 bc665a5 7e8a8d2 bc665a5 7e8a8d2 ebb0ff0 43a10de ebb0ff0 0d399a4 ebb0ff0 bc665a5 ebb0ff0 7e8a8d2 0d399a4 f24e251 0d399a4 f24e251 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
from flask import jsonify
import logging
import os
logger = logging.getLogger(__name__)
request_counts = {}
password = os.environ['ProxyPassword']
def authenticate_request(request):
auth_header = request.headers.get('Authorization')
if not auth_header:
return False, jsonify({'error': '缺少Authorization请求头'}), 401
try:
auth_type, pass_word = auth_header.split(' ', 1)
except ValueError:
return False, jsonify({'error': 'Authorization请求头格式错误'}), 401
if auth_type.lower() != 'bearer':
return False, jsonify({'error': 'Authorization类型必须为Bearer'}), 401
if pass_word != password:
return False, jsonify({'error': '未授权'}), 401
return True, None, None
def process_messages_for_gemini(messages, use_system_prompt=False):
gemini_history = []
errors = []
system_instruction_text = ""
is_system_phase = use_system_prompt
for i, message in enumerate(messages):
role = message.get('role')
content = message.get('content')
if isinstance(content, str):
if is_system_phase and role == 'system':
if system_instruction_text:
system_instruction_text += "\n" + content
else:
system_instruction_text = content
else:
is_system_phase = False
if role in ['user', 'system']:
role_to_use = 'user'
elif role == 'assistant':
role_to_use = 'model'
else:
errors.append(f"Invalid role: {role}")
continue
if gemini_history and gemini_history[-1]['role'] == role_to_use:
gemini_history[-1]['parts'].append({"text": content})
else:
gemini_history.append({"role": role_to_use, "parts": [{"text": content}]})
elif isinstance(content, list):
parts = []
for item in content:
if item.get('type') == 'text':
parts.append({"text": item.get('text')})
elif item.get('type') == 'image_url':
image_data = item.get('image_url', {}).get('url', '')
if image_data.startswith('data:image/'):
try:
mime_type, base64_data = image_data.split(';')[0].split(':')[1], image_data.split(',')[1]
parts.append({
"inline_data": {
"mime_type": mime_type,
"data": base64_data
}
})
except (IndexError, ValueError):
errors.append(f"Invalid data URI for image: {image_data}")
else:
errors.append(f"Invalid image URL format for item: {item}")
elif item.get('type') == 'file_url':
file_data = item.get('file_url', {}).get('url', '')
if file_data.startswith('data:'):
try:
mime_type, base64_data = file_data.split(';')[0].split(':')[1], file_data.split(',')[1]
parts.append({
"inline_data": {
"mime_type": mime_type,
"data": base64_data
}
})
except (IndexError, ValueError):
errors.append(f"Invalid data URI for file: {file_data}")
else:
errors.append(f"Invalid file URL format for item: {item}")
if parts:
if role in ['user', 'system']:
role_to_use = 'user'
elif role == 'assistant':
role_to_use = 'model'
else:
errors.append(f"Invalid role: {role}")
continue
if gemini_history and gemini_history[-1]['role'] == role_to_use:
gemini_history[-1]['parts'].extend(parts)
else:
gemini_history.append({"role": role_to_use, "parts": parts})
if errors:
return gemini_history, {"parts": [{"text": system_instruction_text}]}, (jsonify({'error': errors}), 400)
else:
return gemini_history, {"parts": [{"text": system_instruction_text}]}, None |