up_fail / app.py
DmitrMakeev's picture
Update app.py
377d960 verified
from flask import Flask, request, render_template_string, render_template, jsonify, Response, send_from_directory, render_template
import sqlite3
import os
import random
import requests
import time
import re
import json
import logging
import csv
import io
from urllib.parse import quote, urlparse, parse_qsl, urlencode, parse_qs, unquote, urlencode
import string
from datetime import datetime
import pytz
import socket
from unidecode import unidecode
import uuid
import shutil
import psutil
import hashlib
import hmac
from base64 import b64encode
from collections import OrderedDict
from hashlib import sha256
from hmac import HMAC
from base import replace_null_with_empty_string
from webhook_handler import handle_webhook
# Замените на ваш реальный ключ Системы
api_key_sys = os.getenv('api_key_sys')
ALLOWED_ORIGIN = "https://diamonik7777-up-fail.hf.space"
# Глобальные переменные для хранения настроек
api_key_auth = ''
api_key_apps_vk = ''
vk_api_key = ''
vk_st_alone = ''
key_callback_vk = ''
senler_token = ''
wa_ak = ''
wa_api_key = ''
curators = ''
call_api_key = ''
# Замените на ваш реальный access_token СЕНДЛЕРА
import logging
logging.basicConfig(level=logging.DEBUG)
# Глобальная переменная для управления верификацией
current_curator_index = 0
verifikation_start = "1" # Глобальная переменная для управления верификацией
curator_on_off = "0" # Глобальная переменная для управления назначением куратора
# curators = ["Anna", "Ekaterina", "Ivan", "Maria", "Sergey", "Olga", "Alex", "Natalia", "Dmitry", "Elena"]
# Глобальная переменная для
wa_url = os.getenv('wa_url')
ws_url_mes = "/sendMessage/"
ws_url_ver = "/checkWhatsapp/"
app = Flask(__name__, template_folder="./")
app.config['DEBUG'] = True
UPLOAD_FOLDER = 'static'
UPLOAD_FOLDER_VK = 'static'
HTML_FOLDER = 'html'
HTML_FOLDER_VK = 'html'
# Создание директорий, если они не существуют
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
if not os.path.exists(UPLOAD_FOLDER_VK):
os.makedirs(UPLOAD_FOLDER_VK)
if not os.path.exists(HTML_FOLDER):
os.makedirs(HTML_FOLDER)
if not os.path.exists(HTML_FOLDER_VK):
os.makedirs(HTML_FOLDER_VK)
DATABASES = ['data_gc.db', 'site_data.db', 'ws_data.db', 'vk_data.db', 'tg_data.db', 'gk_data.db']
SETTINGS_DB = 'settings.db'
def init_db(db_name):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
phone TEXT,
email TEXT,
vk_id TEXT,
chat_id TEXT,
ws_st TEXT,
ws_stop TEXT,
web_st INTEGER,
fin_prog INTEGER,
b_city TEXT,
b_fin TEXT,
b_ban TEXT,
b_ign TEXT,
b_baners TEXT,
b_butt TEXT,
b_mess TEXT,
orders TEXT,
curator TEXT,
bonus TEXT,
shop_status TEXT,
answers TEXT,
quiz TEXT,
kassa TEXT,
gc_url TEXT,
key_pr TEXT,
n_con TEXT,
canal TEXT,
data_on TEXT,
data_t TEXT,
utm_source TEXT,
utm_medium TEXT,
utm_campaign TEXT,
utm_term TEXT,
utm_content TEXT,
gcpc TEXT
)
''')
conn.commit()
conn.close()
def init_settings_db():
conn = sqlite3.connect(SETTINGS_DB)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
api_key_auth TEXT,
api_key_apps_vk TEXT,
vk_api_key TEXT,
vk_st_alone TEXT,
key_callback_vk TEXT,
senler_token TEXT,
wa_ak TEXT,
wa_api_key TEXT,
curators TEXT,
call_api_key TEXT
)
''')
conn.commit()
conn.close()
for db in DATABASES:
init_db(db)
init_settings_db()
DATABASE_NEW = 'data_gc.db'
def load_settings():
global api_key_auth, api_key_apps_vk, vk_api_key, vk_st_alone, key_callback_vk
global senler_token, wa_ak, wa_api_key, curators, call_api_key
default_settings = {
'api_key_auth': '',
'api_key_apps_vk': '',
'vk_api_key': '',
'vk_st_alone': '',
'key_callback_vk': '',
'senler_token': '',
'wa_ak': '',
'wa_api_key': '',
'curators': '',
'call_api_key': ''
}
# Загрузка данных из базы
with sqlite3.connect(SETTINGS_DB) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM settings')
settings = cursor.fetchone()
if settings is None:
settings = default_settings
else:
settings = {key: settings[i + 1] or '' for i, key in enumerate(default_settings)}
# Заполняем глобальные переменные
api_key_auth = settings['api_key_auth']
api_key_apps_vk = settings['api_key_apps_vk']
vk_api_key = settings['vk_api_key']
vk_st_alone = settings['vk_st_alone']
key_callback_vk = settings['key_callback_vk']
senler_token = settings['senler_token']
wa_ak = settings['wa_ak']
wa_api_key = settings['wa_api_key']
curators = settings['curators']
call_api_key = settings['call_api_key']
# Выводим значения в консоль
print(f"wa_api_key: {wa_api_key}")
print(f"wa_ak: {wa_ak}")
print(f"vk_api_key: {vk_api_key}")
print(f"vk_st_alone: {vk_st_alone}")
print(f"key_callback_vk: {key_callback_vk}")
print(f"senler_token: {senler_token}")
print(f"api_key_auth: {api_key_auth}")
print(f"api_key_apps_vk: {api_key_apps_vk}")
print(f"curators: {curators}")
print(f"call_api_key: {call_api_key}")
# Возвращаем настройки, чтобы использовать в API-роуте
return settings
# Запуск функции для инициализации глобальных переменных при старте сервера
load_settings()
def save_settings(settings_dict):
global api_key_auth, api_key_apps_vk, vk_api_key, vk_st_alone, key_callback_vk
global senler_token, wa_ak, wa_api_key, curators, call_api_key
# Удаляем api_key_sys из словаря перед сохранением
if 'api_key_sys' in settings_dict:
del settings_dict['api_key_sys']
conn = sqlite3.connect(SETTINGS_DB)
cursor = conn.cursor()
# Проверка существования записи
cursor.execute('SELECT id FROM settings LIMIT 1')
settings_exist = cursor.fetchone() is not None
if settings_exist:
# Обновляем запись
cursor.execute('''
UPDATE settings SET
api_key_auth = ?, api_key_apps_vk = ?, vk_api_key = ?, vk_st_alone = ?, key_callback_vk = ?,
senler_token = ?, wa_ak = ?, wa_api_key = ?, curators = ?, call_api_key = ?
''', (
settings_dict.get('api_key_auth', ''),
settings_dict.get('api_key_apps_vk', ''),
settings_dict.get('vk_api_key', ''),
settings_dict.get('vk_st_alone', ''),
settings_dict.get('key_callback_vk', ''),
settings_dict.get('senler_token', ''),
settings_dict.get('wa_ak', ''),
settings_dict.get('wa_api_key', ''),
settings_dict.get('curators', ''),
settings_dict.get('call_api_key', '')
))
else:
# Создаем новую запись
cursor.execute('''
INSERT INTO settings (
api_key_auth, api_key_apps_vk, vk_api_key, vk_st_alone, key_callback_vk, senler_token,
wa_ak, wa_api_key, curators, call_api_key
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
settings_dict.get('api_key_auth', ''),
settings_dict.get('api_key_apps_vk', ''),
settings_dict.get('vk_api_key', ''),
settings_dict.get('vk_st_alone', ''),
settings_dict.get('key_callback_vk', ''),
settings_dict.get('senler_token', ''),
settings_dict.get('wa_ak', ''),
settings_dict.get('wa_api_key', ''),
settings_dict.get('curators', ''),
settings_dict.get('call_api_key', '')
))
conn.commit()
conn.close()
# Обновляем глобальные переменные
api_key_auth = settings_dict.get('api_key_auth', '')
api_key_apps_vk = settings_dict.get('api_key_apps_vk', '')
vk_api_key = settings_dict.get('vk_api_key', '')
vk_st_alone = settings_dict.get('vk_st_alone', '')
key_callback_vk = settings_dict.get('key_callback_vk', '')
senler_token = settings_dict.get('senler_token', '')
wa_ak = settings_dict.get('wa_ak', '')
wa_api_key = settings_dict.get('wa_api_key', '')
curators = settings_dict.get('curators', '')
call_api_key = settings_dict.get('call_api_key', '')
# Выводим значения в консоль
print(f"wa_api_key: {wa_api_key}")
print(f"wa_ak: {wa_ak}")
print(f"vk_api_key: {vk_api_key}")
print(f"vk_st_alone: {vk_st_alone}")
print(f"key_callback_vk: {key_callback_vk}")
print(f"senler_token: {senler_token}")
print(f"api_key_auth: {api_key_auth}")
print(f"api_key_apps_vk: {api_key_apps_vk}")
print(f"curators: {curators}")
print(f"call_api_key: {call_api_key}")
@app.route('/load_settings', methods=['POST'])
def get_settings():
# Получаем ключ авторизации из запроса
client_api_key_sys = request.json.get('api_key_sys')
# Проверка ключа авторизации
if client_api_key_sys != os.getenv('api_key_sys'):
return jsonify({"error": "Unauthorized access"}), 403
# Загружаем настройки из базы данных
settings = load_settings()
return jsonify(settings)
@app.route('/save_settings', methods=['POST'])
def save_settings_route():
# Получаем ключ авторизации из запроса
client_api_key_sys = request.json.get('api_key_sys')
# Проверка ключа авторизации
if client_api_key_sys != os.getenv('api_key_sys'):
return jsonify({"error": "Unauthorized access"}), 403
data = request.json.get('data', {})
if data:
# Выводим полученные данные в консоль сервера
print("Received data from page:", data)
save_settings(data)
return jsonify({'status': 'success'})
else:
return jsonify({'status': 'error', 'message': 'No data provided'}), 400
@app.route('/set')
def index_set():
return render_template('settings.html')
# Имя базы данных
db_name = 'data_gc.db'
@app.route('/vk_webhook', methods=['POST'])
def vk_webhook():
data = request.json
# Логируем полученные данные для отладки
print(f"Received data: {data}")
# Обрабатываем вебхук с помощью функции из webhook_handler.py
response = handle_webhook(data, key_callback_vk, db_name, vk_st_alone)
# Возвращаем ответ ВКонтакте
return response
# Пример вызова функции handle_webhook с передачей vk_st_alone
def process_webhook(data):
response = handle_webhook(data, key_callback_vk, 'your_db_name.db', vk_st_alone)
print(response)
# Пример вызова функции process_webhook с данными из вебхука
# Этот код будет выполняться, когда ВКонтакте вызовет ваш вебхук
# data будет содержать данные, пришедшие из вебхука
# process_webhook(data)
mapping_template = {
"username": "name",
"phone": "phone",
"email": "email",
"city": "b_city",
"finished": "b_fin",
"ban": "b_ban",
"ignore": "b_ign",
"banners": "b_baners",
"buttons": "b_butt",
"messages": "b_mess"
}
def get_db_connection_user():
conn = sqlite3.connect(DATABASE_NEW)
conn.row_factory = sqlite3.Row
return conn
@app.route('/user_db_set', methods=['GET'])
def get_user():
# Проверка API-ключа
api_key_sys_control = request.args.get('api_sys')
if api_key_sys_control != api_key_sys:
return jsonify({"error": "Invalid API key"}), 403
email = request.args.get('email')
vk_id = request.args.get('vk_id')
phone = request.args.get('phone')
if not email and not vk_id and not phone:
return jsonify({"error": "Either email, vk_id, or phone must be provided"}), 400
conn = get_db_connection_user()
cursor = conn.cursor()
query = "SELECT * FROM contacts WHERE "
params = []
if email:
query += "email = ?"
params.append(email)
if email and (vk_id or phone):
query += " OR "
if vk_id:
query += "vk_id = ?"
params.append(vk_id)
if vk_id and phone:
query += " OR "
if phone:
query += "phone = ?"
params.append(phone)
cursor.execute(query, params)
user = cursor.fetchone()
conn.close()
if user:
user_dict = dict(user)
# Преобразование строки "orders" в JSON-объект
if 'orders' in user_dict and isinstance(user_dict['orders'], str):
try:
user_dict['orders'] = json.loads(user_dict['orders'])
except json.JSONDecodeError:
user_dict['orders'] = {} # Если не удалось преобразовать, устанавливаем пустой объект
# Преобразование строки "bonus" в JSON-объект
if 'bonus' in user_dict and isinstance(user_dict['bonus'], str):
try:
user_dict['bonus'] = json.loads(user_dict['bonus'])
except json.JSONDecodeError:
user_dict['bonus'] = {} # Если не удалось преобразовать, устанавливаем пустой объект
return jsonify(user_dict)
else:
return jsonify({"error": "User not found"}), 404
# Отдаем дату онлайн
@app.route('/get_current_time', methods=['GET'])
def get_current_time():
utc_now = datetime.utcnow()
msk_tz = pytz.timezone('Europe/Moscow')
msk_now = utc_now.replace(tzinfo=pytz.utc).astimezone(msk_tz)
current_time = msk_now.isoformat(timespec='microseconds')
return jsonify({'current_time': current_time})
# Функция для очистки номера телефона
def clean_phone_number_ss(phone_number):
return re.sub(r'\D', '', phone_number)
# Добавляем пользователя
mt_site = {
'name': 'name',
'phone': 'phone',
'email': 'email',
'utm_campaign': 'utm_campaign',
'utm_content': 'utm_content',
'utm_medium': 'utm_medium',
'utm_source': 'utm_source',
'utm_term': 'utm_term',
'gcpc': 'gcpc'
}
mt_vk = {
'name': 'name',
'phone': 'phone',
'email': 'email',
'vk_id': 'vk_id',
'utm_campaign': 'utm_campaign',
'utm_content': 'utm_content',
'utm_medium': 'utm_medium',
'utm_source': 'utm_source',
'utm_term': 'utm_term',
'gcpc': 'gcpc'
}
mt_tg = {
'name': 'name',
'phone': 'phone',
'email': 'email',
'chat_id': 'chat_id',
'utm_campaign': 'utm_campaign',
'utm_content': 'utm_content',
'utm_medium': 'utm_medium',
'utm_source': 'utm_source',
'utm_term': 'utm_term',
'gcpc': 'gcpc'
}
mt_gc = {
'name': 'name',
'phone': 'phone',
'email': 'email',
'vk_id': 'vk_id',
'gc_url': 'gc_url',
'utm_campaign': 'utm_campaign',
'utm_content': 'utm_content',
'utm_medium': 'utm_medium',
'utm_source': 'utm_source',
'utm_term': 'utm_term',
'gcpc': 'gcpc'
}
mt_pass = {
'name': 'name',
'phone': 'phone',
'email': 'email',
'kol': 'pr1',
'pr2': 'pr2',
'gen_pass': 'pr5',
'utm_campaign': 'utm_campaign',
'utm_content': 'utm_content',
'utm_medium': 'utm_medium',
'utm_source': 'utm_source',
'utm_term': 'utm_term',
'gcpc': 'gcpc'
}
tl_quest = {
'name': 'name',
'phone': 'phone',
'email': 'email',
'pr2': 'pr2',
'utm_campaign': 'utm_campaign',
'utm_content': 'utm_content',
'utm_medium': 'utm_medium',
'utm_source': 'utm_source',
'utm_term': 'utm_term',
'gcpc': 'gcpc'
}
mapp_templates = {
'site': mt_site,
'vk': mt_vk,
'tg': mt_tg,
'gc': mt_gc,
'tilda': mt_pass,
'quest': tl_quest
}
def verify_phone_number(phone_number):
full_url_ver = f"{wa_url}{wa_ak}{ws_url_ver}{wa_api_key}"
payload = {"phoneNumber": phone_number}
headers = {'Content-Type': 'application/json'}
response = requests.post(full_url_ver, headers=headers, json=payload)
if response.status_code == 200:
response_body = response.json()
return response_body.get('existsWhatsapp', 'false')
else:
return "false"
def generate_password(length=8):
letters_and_digits = string.ascii_letters + string.digits
return ''.join(random.choice(letters_and_digits) for i in range(length))
def add_or_update_contact(contact_data, db_name):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
email = contact_data.get('email')
if not email:
logging.error(f"Missing email in contact data: {contact_data}")
return
utc_now = datetime.utcnow()
msk_tz = pytz.timezone('Europe/Moscow')
msk_now = utc_now.replace(tzinfo=pytz.utc).astimezone(msk_tz)
contact_data['data_t'] = msk_now.strftime('%Y-%m-%d %H:%M:%S')
fields = [
'name', 'phone', 'email', 'vk_id', 'chat_id', 'ws_st', 'ws_stop', 'web_st', 'fin_prog',
'b_city', 'b_fin', 'b_ban', 'b_ign', 'b_baners', 'b_butt', 'b_mess', 'orders', 'curator',
'bonus', 'shop_status', 'answers', 'quiz', 'kassa', 'gc_url', 'key_pr', 'n_con', 'canal', 'data_on', 'data_t', 'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'gcpc'
]
for field in fields:
if field not in contact_data:
contact_data[field] = ''
cursor.execute("SELECT id FROM contacts WHERE email = ?", (email,))
contact = cursor.fetchone()
if contact:
update_fields = [f"{field} = ?" for field in fields if contact_data[field] != '']
update_values = [contact_data[field] for field in fields if contact_data[field] != '']
update_values.append(contact[0])
update_query = f"UPDATE contacts SET {', '.join(update_fields)} WHERE id = ?"
cursor.execute(update_query, update_values)
else:
insert_query = f"INSERT INTO contacts ({', '.join(fields)}) VALUES ({', '.join(['?' for _ in fields])})"
cursor.execute(insert_query, tuple(contact_data[field] for field in fields))
conn.commit()
conn.close()
@app.route('/add_user_home', methods=['GET'])
def add_user_home():
global current_curator_index
veref_on_off = request.args.get('ver', '0')
curator_on_off = request.args.get('cur', '0')
db_name = request.args.get('db', 'data_gc.db') # Получаем имя базы данных из запроса
template_key = request.args.get('player', 'site')
mapping_template_cur = mapp_templates.get(template_key, mt_site)
user_data = {mapping_template_cur[key]: request.args.get(key, "") for key in mapping_template_cur}
logging.debug(f"Received data: {user_data}")
if curator_on_off == "1":
user_data['curator'] = curators[current_curator_index]
current_curator_index = (current_curator_index + 1) % len(curators)
else:
user_data['curator'] = user_data.get('curator', '')
if veref_on_off == "1":
phone_number = user_data.get('phone', '')
if not phone_number:
logging.error("Phone number is empty")
return jsonify({'status': 'error', 'message': 'Phone number is empty'}), 400
phone_verification_response = verify_phone_number(phone_number)
if phone_verification_response is not None:
user_data['ws_st'] = '1' if phone_verification_response else '0'
else:
user_data['ws_st'] = user_data.get('ws_st', '')
try:
add_or_update_contact(user_data, db_name)
return jsonify({'status': 'success', 'message': f'User added {user_data.get("curator", "not assigned")}'})
except Exception as e:
logging.error(f"Error adding user: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/user', methods=['GET'])
def user():
global current_curator_index
veref_on_off = request.args.get('ver', '0')
curator_on_off = request.args.get('cur', '0')
db_name = request.args.get('db', 'data_gc.db') # Получаем имя базы данных из запроса
template_key = request.args.get('player', 'site')
mapping_template_cur = mapp_templates.get(template_key, mt_site)
user_data = {mapping_template_cur[key]: request.args.get(key, "") for key in mapping_template_cur}
logging.debug(f"Received data: {user_data}")
if curator_on_off == "1":
user_data['curator'] = curators[current_curator_index]
current_curator_index = (current_curator_index + 1) % len(curators)
else:
user_data['curator'] = user_data.get('curator', '')
if veref_on_off == "1":
phone_number = user_data.get('phone', '')
if not phone_number:
logging.error("Phone number is empty")
return jsonify({'status': 'error', 'message': 'Phone number is empty'}), 400
phone_verification_response = verify_phone_number(phone_number)
if phone_verification_response is not None:
user_data['ws_st'] = '1' if phone_verification_response else '0'
else:
user_data['ws_st'] = user_data.get('ws_st', '')
try:
add_or_update_contact(user_data, db_name)
return jsonify({'status': 'success', 'message': f'User added {user_data.get("curator", "not assigned")}'})
except Exception as e:
logging.error(f"Error adding user: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/add_user_mess', methods=['GET'])
def add_user_mess():
global current_curator_index
veref_on_off = request.args.get('ver', '0')
curator_on_off = request.args.get('cur', '0')
db_name = request.args.get('db', 'site_data.db') # Получаем имя базы данных из запроса
template_key = request.args.get('player', 'site')
mapping_template_cur = mapp_templates.get(template_key, mt_site)
user_data = {mapping_template_cur[key]: request.args.get(key, "") for key in mapping_template_cur}
logging.debug(f"Received data: {user_data}")
if curator_on_off == "1":
user_data['curator'] = curators[current_curator_index]
current_curator_index = (current_curator_index + 1) % len(curators)
else:
user_data['curator'] = user_data.get('curator', '')
if veref_on_off == "1":
phone_number = user_data.get('phone', '')
if not phone_number:
logging.error("Phone number is empty")
return jsonify({'status': 'error', 'message': 'Phone number is empty'}), 400
phone_verification_response = verify_phone_number(phone_number)
if phone_verification_response is not None:
user_data['ws_st'] = '1' if phone_verification_response else '0'
else:
user_data['ws_st'] = user_data.get('ws_st', '')
try:
add_or_update_contact(user_data, db_name)
return jsonify({'status': 'success', 'message': f'User added {user_data.get("curator", "not assigned")}'})
except Exception as e:
logging.error(f"Error adding user: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# Работа с ордером из сайта
@app.route('/wr_order', methods=['GET','POST'])
def shop_order_new():
try:
logging.debug("Starting shop_order_new")
api_sys_control = request.args.get('api_sys')
if api_sys_control != api_key_sys:
logging.warning("Unauthorized access attempt")
return json.dumps({"error": "Unauthorized access"}), 403
name = request.args.get('name', '')
email = request.args.get('email', '')
phone = request.args.get('phone', '').lstrip('+')
order = request.args.get('order', '')
status = request.args.get('status', '')
del_flag = request.args.get('del', '')
if not email or not phone:
logging.error("Email and phone are required")
return json.dumps({"error": "Email and phone are required"}), 400
phone = clean_phone_number_ss(phone)
conn = sqlite3.connect(DATABASE_NEW)
cursor = conn.cursor()
cursor.execute("SELECT * FROM contacts WHERE email = ? OR phone = ?", (email, phone))
result = cursor.fetchone()
if result:
shop_st = result[17] if result[17] else '{}'
shop_st_data = json.loads(shop_st)
logging.debug(f"Existing record found. Loaded JSON: {shop_st_data}")
else:
shop_st_data = {}
if del_flag == '1':
if order in shop_st_data:
del shop_st_data[order]
elif order and status:
shop_st_data[order] = status
shop_st_json = json.dumps(shop_st_data)
utc_now = datetime.utcnow()
msk_tz = pytz.timezone('Europe/Moscow')
msk_now = utc_now.replace(tzinfo=pytz.utc).astimezone(msk_tz)
data_on = msk_now.strftime('%Y-%m-%d %H:%M:%S')
columns_to_update = ['name', 'phone', 'email', 'orders', 'data_on']
values_to_update = [name, phone, email, shop_st_json, data_on]
if result:
set_clause = ', '.join([f"{col} = ?" for col in columns_to_update])
query = f"UPDATE contacts SET {set_clause} WHERE email = ? OR phone = ?"
cursor.execute(query, values_to_update + [email, phone])
else:
query = f"INSERT INTO contacts ({', '.join(columns_to_update)}) VALUES ({', '.join(['?' for _ in columns_to_update])})"
cursor.execute(query, values_to_update)
conn.commit()
replace_null_with_empty_string(conn)
conn.close()
return json.dumps(shop_st_data), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return json.dumps({"error": str(e)}), 500
# Данные формы из ВК
@app.route('/wr_form_vk', methods=['POST'])
def wr_form_vk():
try:
logging.debug("Starting wr_form_vk")
# Читаем параметры из POST-запроса (формы)
api_sys_control = request.form.get('api_sys')
if api_sys_control != api_key_sys:
logging.warning("Unauthorized access attempt")
return json.dumps({"error": "Unauthorized access"}), 403
name = request.form.get('name', '')
email = request.form.get('email', '')
phone = request.form.get('phone', '').lstrip('+')
vkid = request.form.get('vk_id', '')
if not email or not phone:
logging.error("Email and phone are required")
return json.dumps({"error": "Email and phone are required"}), 400
phone = clean_phone_number_ss(phone)
conn = sqlite3.connect(DATABASE_NEW)
cursor = conn.cursor()
# Ищем по email, phone или vk_id
cursor.execute("SELECT * FROM contacts WHERE email = ? OR phone = ? OR vk_id = ?", (email, phone, vkid))
result = cursor.fetchone()
columns_to_update = ['name', 'phone', 'email', 'vk_id']
values_to_update = [name, phone, email, vkid]
if result:
set_clause = ', '.join([f"{col} = ?" for col in columns_to_update])
query = f"UPDATE contacts SET {set_clause} WHERE email = ? OR phone = ? OR vk_id = ?"
cursor.execute(query, values_to_update + [email, phone, vkid])
else:
query = f"INSERT INTO contacts ({', '.join(columns_to_update)}) VALUES ({', '.join(['?' for _ in columns_to_update])})"
cursor.execute(query, values_to_update)
conn.commit()
replace_null_with_empty_string(conn)
conn.close()
return json.dumps({"success": True}), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return json.dumps({"error": str(e)}), 500
# Запись ордера по ключу и ВК ИД
@app.route('/set_order_vk', methods=['POST'])
def set_order_vk():
try:
logging.debug("Starting wr_order_vk")
# Читаем параметры из POST-запроса
api_sys_control = request.form.get('api_sys')
if api_sys_control != api_key_sys:
logging.warning("Unauthorized access attempt")
return json.dumps({"error": "Unauthorized access"}), 403
vkid = request.form.get('vk_id', '')
order = request.form.get('order', '')
status = request.form.get('status', '')
del_flag = request.form.get('del', '')
n_con_flag = request.form.get('n_con', '')
if not vkid:
logging.error("VK ID is required")
return json.dumps({"error": "VK ID is required"}), 400
conn = sqlite3.connect(DATABASE_NEW)
cursor = conn.cursor()
cursor.execute("SELECT * FROM contacts WHERE vk_id = ?", (vkid,))
result = cursor.fetchone()
if result:
shop_st = result[17] if result[17] else '{}'
shop_st_data = json.loads(shop_st)
logging.debug(f"Existing record found. Loaded JSON: {shop_st_data}")
else:
shop_st_data = {}
if del_flag == '1':
if order in shop_st_data:
del shop_st_data[order]
elif order and status:
shop_st_data[order] = status
shop_st_json = json.dumps(shop_st_data)
utc_now = datetime.utcnow()
msk_tz = pytz.timezone('Europe/Moscow')
msk_now = utc_now.replace(tzinfo=pytz.utc).astimezone(msk_tz)
data_on = msk_now.strftime('%Y-%m-%d %H:%M:%S')
columns_to_update = ['vk_id', 'orders', 'n_con', 'data_on']
values_to_update = [vkid, shop_st_json, n_con_flag, data_on]
if result:
set_clause = ', '.join([f"{col} = ?" for col in columns_to_update])
query = f"UPDATE contacts SET {set_clause} WHERE vk_id = ?"
cursor.execute(query, values_to_update + [vkid])
else:
query = f"INSERT INTO contacts ({', '.join(columns_to_update)}) VALUES ({', '.join(['?' for _ in columns_to_update])})"
cursor.execute(query, values_to_update)
conn.commit()
replace_null_with_empty_string(conn)
conn.close()
return json.dumps(shop_st_data), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return json.dumps({"error": str(e)}), 500
# Чтение ордера
@app.route('/get_order_monitop', methods=['GET'])
def get_order_monitop():
try:
logging.debug("Starting set_order_monitop")
# Читаем параметры из GET-запроса
vkid = request.args.get('vk_id', '')
order = request.args.get('order', '')
if not vkid or not order:
logging.error("VK ID and order are required")
return json.dumps({"error": "VK ID and order are required"}), 400
conn = sqlite3.connect(DATABASE_NEW)
cursor = conn.cursor()
# Ищем запись по vk_id
cursor.execute("SELECT orders FROM contacts WHERE vk_id = ?", (vkid,))
result = cursor.fetchone()
# Получаем текущую дату и время на сервере
utc_now = datetime.utcnow()
msk_tz = pytz.timezone('Europe/Moscow')
msk_now = utc_now.replace(tzinfo=pytz.utc).astimezone(msk_tz)
current_time = msk_now.isoformat(timespec='microseconds')
# Если запись по vk_id не найдена, возвращаем значение "not" для ордера
if not result:
logging.error(f"VK ID {vkid} not found")
response = {order: 'not', 'online_date': current_time}
return jsonify(response), 200
shop_st = result[0] if result[0] else '{}'
shop_st_data = json.loads(shop_st)
logging.debug(f"Existing record found. Loaded JSON: {shop_st_data}")
# Ищем значение по ключу order
value = shop_st_data.get(order, 'not')
# Возвращаем данные из столбца и текущую дату и время
response = {order: value, 'online_date': current_time}
return jsonify(response), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return json.dumps({"error": str(e)}), 500
# Функция для валидации подписи ВК приложения ОБЩАЯ
def is_valid(*, query: dict, secret: str) -> bool:
"""Check VK Apps signature"""
# Отфильтровываем параметры, начинающиеся с "vk_"
vk_subset = OrderedDict(sorted((k, v) for k, v in query.items() if k.startswith("vk_")))
logging.debug(f"Filtered VK params: {vk_subset}")
# Объединяем параметры в строку
encoded_params = urlencode(vk_subset, doseq=True)
logging.debug(f"Encoded params: {encoded_params}")
# Вычисляем хеш-код с использованием HMAC и SHA256
hash_code = b64encode(HMAC(secret.encode(), encoded_params.encode(), sha256).digest())
decoded_hash_code = hash_code.decode('utf-8')[:-1].replace('+', '-').replace('/', '_')
logging.debug(f"Calculated signature: {decoded_hash_code}")
# Сравниваем с переданной подписью
return query.get("sign") == decoded_hash_code
# Функция для работы с базой данных
def get_order_from_db(vkid):
conn = sqlite3.connect(DATABASE_NEW)
cursor = conn.cursor()
# Ищем запись по vk_id
cursor.execute("SELECT orders FROM contacts WHERE vk_id = ?", (vkid,))
result = cursor.fetchone()
logging.debug(f"Database result: {result}")
# Если запись по vk_id не найдена, возвращаем значение "not" для ордера
if not result:
logging.error(f"VK ID {vkid} not found")
return None
shop_st = result[0] if result[0] else '{}'
logging.debug(f"Shop_st: {shop_st}")
shop_st_data = json.loads(shop_st)
logging.debug(f"Existing record found. Loaded JSON: {shop_st_data}")
return shop_st_data
# Чтение ордера по ключу и ВК ИД для приложения
@app.route('/get_order', methods=['POST'])
def get_order():
try:
logging.debug("Starting get_order")
# Читаем параметры из POST-запроса
vkid = request.form.get('vk_id', '')
order = request.form.get('order', '')
apps_id = request.form.get('apps_id', '') # Сюда придёт ИД ВК приложения
fullUrl = request.form.get('fullUrl', '') # Полный URL, который выдаёт ВКонтакте
logging.debug(f"Received data: vk_id={vkid}, order={order}, apps_id={apps_id}, fullUrl={fullUrl}")
# Преобразуем строку в JSON
try:
api_key_apps_vk_dict = json.loads(api_key_apps_vk)
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON: {e}")
return jsonify({"status": "invalid"}), 200
# Проверка подписи для приложения
if str(apps_id) not in api_key_apps_vk_dict: # Приводим apps_id к строке
logging.error("Invalid apps_id")
return json.dumps({"error": "Invalid apps_id"}), 400
secret = api_key_apps_vk_dict[str(apps_id)] # Приводим apps_id к строке
logging.debug(f"Using secret: {secret}")
# Парсим полный URL для получения параметров запроса
query_params = dict(parse_qsl(urlparse(fullUrl).query, keep_blank_values=True))
logging.debug(f"Query params for signature check: {query_params}")
# Проверяем подпись
if not is_valid(query=query_params, secret=secret):
logging.error("Invalid signature")
return json.dumps({"error": "Invalid signature"}), 400
# Получаем данные из базы данных
shop_st_data = get_order_from_db(vkid)
if not shop_st_data:
response = {order: 'not'}
return jsonify(response), 200
# Ищем значение по ключу order
value = shop_st_data.get(order, 'not')
logging.debug(f"Value for order {order}: {value}")
# Возвращаем данные из столбца
response = {order: value}
return jsonify(response), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return json.dumps({"error": str(e)}), 500
# Функция для работы с базой данных - Чтение значения разрешена ли рассылка ВК
def get_lo_mess_from_db(vkid):
conn = sqlite3.connect(DATABASE_NEW)
cursor = conn.cursor()
# Ищем запись по vk_id
cursor.execute("SELECT canal FROM contacts WHERE vk_id = ?", (vkid,))
result = cursor.fetchone()
logging.debug(f"Database result: {result}")
# Если запись по vk_id не найдена, возвращаем None
if not result:
logging.error(f"VK ID {vkid} not found")
return None
canal_data = result[0] if result[0] else '{}'
logging.debug(f"Canal data: {canal_data}")
canal_data_json = json.loads(canal_data)
logging.debug(f"Existing record found. Loaded JSON: {canal_data_json}")
return canal_data_json
# Чтение значения разрешена ли рассылка ВК из базы
@app.route('/get_lo_mess', methods=['POST'])
def getlo_mess():
try:
logging.debug("Starting get_lo_mess")
# Читаем параметры из POST-запроса
vkid = request.form.get('vk_id', '')
grup_id = request.form.get('grup_id', '') # Сюда придёт номер сообщества, который является ключом для поиска
apps_id = request.form.get('apps_id', '') # Сюда придёт ИД ВК приложения
fullUrl = request.form.get('fullUrl', '') # Полный URL, который выдаёт ВКонтакте
logging.debug(f"Received data: vk_id={vkid}, grup_id={grup_id}, apps_id={apps_id}, fullUrl={fullUrl}")
# Преобразуем строку в JSON
try:
api_key_apps_vk_dict = json.loads(api_key_apps_vk)
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON: {e}")
return jsonify({"status": "invalid"}), 200
# Проверка подписи для приложения
if str(apps_id) not in api_key_apps_vk_dict: # Приводим apps_id к строке
logging.error("Invalid apps_id")
return json.dumps({"error": "Invalid apps_id"}), 400
secret = api_key_apps_vk_dict[str(apps_id)] # Приводим apps_id к строке
logging.debug(f"Using secret: {secret}")
# Парсим полный URL для получения параметров запроса
query_params = dict(parse_qsl(urlparse(fullUrl).query, keep_blank_values=True))
logging.debug(f"Query params for signature check: {query_params}")
# Проверяем подпись
if not is_valid(query=query_params, secret=secret):
logging.error("Invalid signature")
return json.dumps({"error": "Invalid signature"}), 400
# Получаем данные из базы данных
canal_data_json = get_lo_mess_from_db(vkid)
if not canal_data_json:
response = {"status": "not"}
return jsonify(response), 200
# Ищем значение по ключу grup_id
value = canal_data_json.get(grup_id, 'not')
logging.debug(f"Value for grup_id {grup_id}: {value}")
# Возвращаем данные из столбца
response = {"status": value}
return jsonify(response), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return json.dumps({"error": str(e)}), 500
# Работа с бонусами из сайта без VK_ID
@app.route('/wr_bonus', methods=['GET','POST'])
def shop_bonus_new():
try:
logging.debug("Starting shop_bonus_new")
api_sys_control = request.args.get('api_sys')
if api_sys_control != api_key_sys:
logging.warning("Unauthorized access attempt")
return json.dumps({"error": "Unauthorized access"}), 403
name = request.args.get('name', '')
email = request.args.get('email', '')
phone = request.args.get('phone', '').lstrip('+')
bonus = request.args.get('bonus', '')
status = request.args.get('status', '')
del_flag = request.args.get('del', '')
if not email or not phone:
logging.error("Email and phone are required")
return json.dumps({"error": "Email and phone are required"}), 400
phone = clean_phone_number_ss(phone)
conn = sqlite3.connect(DATABASE_NEW)
cursor = conn.cursor()
cursor.execute("SELECT * FROM contacts WHERE email = ? OR phone = ?", (email, phone))
result = cursor.fetchone()
if result:
bonus_st = result[19] if result[19] else '{}'
bonus_st_data = json.loads(bonus_st)
logging.debug(f"Existing record found. Loaded JSON: {bonus_st_data}")
else:
bonus_st_data = {}
if del_flag == '1':
if bonus in bonus_st_data:
del bonus_st_data[bonus]
elif bonus and status:
bonus_st_data[bonus] = status
bonus_st_json = json.dumps(bonus_st_data)
utc_now = datetime.utcnow()
msk_tz = pytz.timezone('Europe/Moscow')
msk_now = utc_now.replace(tzinfo=pytz.utc).astimezone(msk_tz)
data_on = msk_now.strftime('%Y-%m-%d %H:%M:%S')
columns_to_update = ['name', 'phone', 'email', 'bonus', 'data_on']
values_to_update = [name, phone, email, bonus_st_json, data_on]
if result:
set_clause = ', '.join([f"{col} = ?" for col in columns_to_update])
query = f"UPDATE contacts SET {set_clause} WHERE email = ? OR phone = ?"
cursor.execute(query, values_to_update + [email, phone])
else:
query = f"INSERT INTO contacts ({', '.join(columns_to_update)}) VALUES ({', '.join(['?' for _ in columns_to_update])})"
cursor.execute(query, values_to_update)
conn.commit()
replace_null_with_empty_string(conn)
conn.close()
return json.dumps(bonus_st_data), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return json.dumps({"error": str(e)}), 500
# Добавление пользователя в группу СЕНДЛЕРА
@app.route('/add_user_senler', methods=['POST'])
def add_user_senler():
# Получаем параметры из POST-запроса, если параметр отсутствует, устанавливаем значение пустой строки
vk_user_id = request.form.get('vk_user_id', "")
vk_group_id = request.form.get('vk_group_id', "")
subscription_id = request.form.get('sub_id', "")
utm_source = request.form.get('utm_source', "")
utm_medium = request.form.get('utm_medium', "")
utm_campaign = request.form.get('utm_campaign', "")
utm_content = request.form.get('utm_content', "")
utm_term = request.form.get('utm_term', "")
# Формируем данные для запроса к API Senler
senler_data = {
'vk_user_id': vk_user_id,
'vk_group_id': vk_group_id,
'subscription_id': subscription_id,
'utm_source': utm_source,
'utm_medium': utm_medium,
'utm_campaign': utm_campaign,
'utm_content': utm_content,
'utm_term': utm_term,
'access_token': senler_token,
'v': 2
}
# Отправляем запрос к API Senler
response = requests.post('https://senler.ru/api/subscribers/add', data=senler_data)
# Проверяем успешность запроса
success = response.status_code == 200 and response.json().get('success', False)
# Возвращаем результат
return jsonify({
'success': success
})
# Добавление пользователя в группу СЕНДЛЕРА с данными
@app.route('/add_user_senler_full', methods=['POST'])
def add_user_senler_full():
# Получаем параметры из POST-запроса, если параметр отсутствует, устанавливаем значение пустой строки
vk_user_id = request.form.get('vk_user_id', "")
vk_group_id = request.form.get('vk_group_id', "")
subscription_id = request.form.get('sub_id', "")
utm_source = request.form.get('utm_source', "")
utm_medium = request.form.get('utm_medium', "")
utm_campaign = request.form.get('utm_campaign', "")
utm_content = request.form.get('utm_content', "")
utm_term = request.form.get('utm_term', "")
name = request.form.get('name', "")
email = request.form.get('email', "")
phone = request.form.get('phone', "")
utms = request.form.get('utms', "")
# Формируем данные для запроса к API Senler для добавления пользователя
add_data = {
'vk_user_id': vk_user_id,
'vk_group_id': vk_group_id,
'subscription_id': subscription_id,
'utm_source': utm_source,
'utm_medium': utm_medium,
'utm_campaign': utm_campaign,
'utm_content': utm_content,
'utm_term': utm_term,
'access_token': senler_token,
'v': 2
}
# Отправляем запрос к API Senler для добавления пользователя
add_response = requests.post('https://senler.ru/api/subscribers/add', data=add_data)
print("Add Data:", add_data)
print("Add User Error Response:", add_response.json())
# Проверяем успешность добавления пользователя
if add_response.json().get('success'):
# Формируем данные для запроса к API Senler для установки name
name_data = {
'vk_user_id': vk_user_id,
'vk_group_id': vk_group_id,
'name': 'gb_name',
'value': name,
'access_token': senler_token,
'v': 2
}
# Формируем данные для запроса к API Senler для установки email
email_data = {
'vk_user_id': vk_user_id,
'vk_group_id': vk_group_id,
'name': 'gb_email',
'value': email,
'access_token': senler_token,
'v': 2
}
# Формируем данные для запроса к API Senler для установки телефона
phone_data = {
'vk_user_id': vk_user_id,
'vk_group_id': vk_group_id,
'name': 'gb_phone',
'value': phone,
'access_token': senler_token,
'v': 2
}
# Отправляем запрос к API Senler для установки name
name_response = requests.post('https://senler.ru/api/vars/set', data=name_data)
# Отправляем запрос к API Senler для установки email
email_response = requests.post('https://senler.ru/api/vars/set', data=email_data)
# Отправляем запрос к API Senler для установки телефона
phone_response = requests.post('https://senler.ru/api/vars/set', data=phone_data)
# Возвращаем результат
return jsonify({
'add_response': add_response.json(),
'name_response': name_response.json(),
'email_response': email_response.json(),
'phone_response': phone_response.json()
})
else:
# Возвращаем ошибку добавления пользователя
return jsonify(add_response.json())
# Проверка групп СЕНДЛЕРА на рассылку
@app.route('/get_Lo_Mess_senler', methods=['POST'])
def get_Lo_Mess_senler():
try:
# Получаем параметры из POST-запроса, если параметр отсутствует, устанавливаем значение пустой строки
vk_user_id = request.form.get('vk_user_id', "")
vk_group_id = request.form.get('vk_group_id', "") # Добавляем параметр vk_group_id
subscription_id = request.form.get('sub_id', "")
# Проверяем, что все необходимые параметры переданы
if not vk_user_id or not vk_group_id or not subscription_id:
return jsonify({"status": "error", "message": "Missing required parameters"}), 400
payload = {
"vk_user_id": [vk_user_id],
'vk_group_id': vk_group_id,
"access_token": senler_token,
"v": 2
}
# Выводим данные запроса для отладки
logging.debug(f"Request payload: {payload}")
# Выполняем запрос к API Senler
response = requests.post('https://senler.ru/api/subscribers/get', data=payload)
# Проверяем статус ответа
if response.status_code != 200:
logging.error(f"Failed to fetch data from Senler API: {response.status_code} - {response.text}")
return jsonify({"status": "error", "message": "Failed to fetch data from Senler API"}), 500
# Парсим ответ
data = response.json()
# Выводим полный ответ от сервера Senler в консоль
logging.debug(f"Senler API response: {data}")
# Проверяем, что ответ содержит данные
if not data.get('success'):
return jsonify({"status": "error", "message": "Failed to fetch data from Senler API"}), 500
# Проверяем, что пользователь подписан на указанную группу
user_subscriptions = data.get('items', [])
if not user_subscriptions:
return jsonify({"status": "not"}), 200
for user in user_subscriptions:
subscriptions = user.get('subscriptions', [])
for sub in subscriptions:
if sub.get('subscription_id') == int(subscription_id):
return jsonify({"status": "1"}), 200
# Если группа не найдена
return jsonify({"status": "not"}), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
# Отписка пользователя от группы СЕНДЛЕРА
@app.route('/del_user_senler', methods=['POST'])
def del_user_senler():
try:
# Получаем параметры из POST-запроса, если параметр отсутствует, устанавливаем значение пустой строки
vk_user_id = request.form.get('vk_user_id', "")
vk_group_id = request.form.get('vk_group_id', "") # Добавляем параметр vk_group_id
subscription_id = request.form.get('sub_id', "")
# Проверяем, что все необходимые параметры переданы
if not vk_user_id or not vk_group_id or not subscription_id:
return jsonify({"status": "error", "message": "Missing required parameters"}), 400
# Преобразуем параметры в строки, если это необходимо
vk_user_id = int(vk_user_id)
vk_group_id = int(vk_group_id)
subscription_id = int(subscription_id)
payload = {
"vk_user_id": vk_user_id,
'vk_group_id': vk_group_id,
"subscription_id": subscription_id,
"access_token": senler_token,
"v": 2
}
# Выводим данные запроса для отладки
logging.debug(f"Request payload: {payload}")
# Выполняем запрос к API Senler для удаления подписки
response = requests.post('https://senler.ru/api/subscribers/del', data=payload)
# Проверяем статус ответа
if response.status_code != 200:
logging.error(f"Failed to delete subscription from Senler API: {response.status_code} - {response.text}")
return jsonify({"status": "error", "message": "Failed to delete subscription from Senler API"}), 500
# Парсим ответ
data = response.json()
# Выводим полный ответ от сервера Senler в консоль
logging.debug(f"Senler API response: {data}")
# Проверяем, что ответ содержит данные
if not data.get('success'):
return jsonify({"status": "error", "message": "Failed to delete subscription from Senler API"}), 500
# Возвращаем успешный ответ
return jsonify({"status": "success", "message": "Subscription deleted successfully"}), 200
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
# ЗАПИСЫВАЕМ ДАННЫЕ В ВК СТОРИДЖ
# Функция для обработки ошибок
def error_finish(message):
return jsonify({'success': False, 'error_type': 'Setup error', 'error_msg': message}), 400
# Функция для отправки запроса
def sky_request(url, params):
response = requests.post(url, data=params)
return response.json()
@app.route('/vk_s_set', methods=['POST'])
def vk_s_set():
# Читаем контрольную переменную
api_sys_control = request.form.get('api_sys')
if api_sys_control != api_key_sys:
return "EUR 22", 200
# Читаем параметры
vk_id = request.form.get('vk_id', '')
vk_key = request.form.get('vk_key', '')
vk_value = request.form.get('vk_value', '')
# Проверка на наличие обязательных параметров
if not vk_id or not vk_key:
return "Missing required parameters", 400
# Формируем URL для вызова метода storage.set
url = f"https://api.vk.com/method/storage.set"
params = {
'access_token': vk_api_key,
'v': '5.131', # Версия API
'key': vk_key,
'value': vk_value,
'user_id': vk_id
}
# Выполняем запрос к API ВКонтакте
data = sky_request(url, params)
# Обрабатываем результат
if 'response' in data and data['response'] == 1:
return jsonify({vk_key: vk_value}), 200
elif 'error' in data:
error_code = data['error']['error_code']
error_msg = data['error']['error_msg']
return jsonify({"error": f"Error {error_code}: {error_msg}"}), 400
else:
return jsonify({"error": "Unknown error"}), 500
# ЗАПИСЫВАЕМ ДАННЫЕ В ВК СТОРИДЖ
@app.route('/vk_s_get', methods=['GET','POST'])
def vk_s_get():
api_sys_control = request.args.get('api_sys')
if api_sys_control != api_key_sys:
return "EUR 22", 200
# Читаем параметры
vk_id = request.args.get('vk_id', '')
vk_key = request.args.get('vk_key', '')
# Проверка на наличие обязательных параметров
if not vk_id or not vk_key:
return "Missing required parameters", 400
# Формируем URL для вызова метода storage.get
url = f"https://api.vk.com/method/storage.get"
params = {
'access_token': vk_api_key,
'v': '5.131', # Версия API
'key': vk_key,
'user_id': vk_id
}
# Выполняем запрос к API ВКонтакте
response = requests.get(url, params=params)
data = response.json()
# Обрабатываем результат
if 'response' in data and data['response']:
value = data['response'][0]['value']
return jsonify({vk_key: value}), 200
elif 'error' in data:
error_code = data['error']['error_code']
error_msg = data['error']['error_msg']
return f"Error {error_code}: {error_msg}", 400
else:
return "Unknown error", 500
# Поднятие страницы с таблицей
@app.route('/data_gc_tab', methods=['GET'])
def data_gc_tab():
api_sys_control = request.args.get('api_sys')
if api_sys_control != api_key_sys:
return "EUR 22", 200
return render_template('data_gc_tab.html')
# Данные в таблицу
@app.route('/data_gc_tab_out', methods=['GET'])
def data_gc_tab_out():
try:
api_sys_control = request.args.get('api_sys')
if api_sys_control != api_key_sys:
return "EUR 22", 200
conn = sqlite3.connect('data_gc.db')
cursor = conn.cursor()
cursor.execute('''
SELECT id, name, phone, email, vk_id, chat_id, ws_st, ws_stop, web_st, fin_prog,
b_city, b_fin, b_ban, b_ign, b_baners, b_butt, b_mess, orders, curator,
bonus, shop_status, answers, quiz, kassa, gc_url, key_pr, n_con, canal, data_on, data_t, utm_source, utm_medium, utm_campaign, utm_term, utm_content, gcpc
FROM contacts
''')
contacts = cursor.fetchall()
conn.close()
contacts_json = [{
'id': contact[0], 'name': contact[1], 'phone': contact[2], 'email': contact[3],
'vk_id': contact[4], 'chat_id': contact[5], 'ws_st': contact[6], 'ws_stop': contact[7],
'web_st': contact[8], 'fin_prog': contact[9], 'b_city': contact[10], 'b_fin': contact[11],
'b_ban': contact[12], 'b_ign': contact[13], 'b_baners': contact[14], 'b_butt': contact[15],
'b_mess': contact[16], 'orders': contact[17], 'curator': contact[18], 'bonus': contact[19],
'shop_status': contact[20], 'answers': contact[21], 'quiz': contact[22], 'kassa': contact[23],
'gc_url': contact[24], 'key_pr': contact[25], 'n_con': contact[26], 'canal': contact[27],'data_on': contact[28],
'data_t': contact[29],'utm_source': contact[30], 'utm_medium': contact[31], 'utm_campaign': contact[32],
'utm_term': contact[33], 'utm_content': contact[34], 'gcpc': contact[34]
} for contact in contacts]
return jsonify(contacts_json), 200
except Exception as e:
error_message = f"Error getting data from data_gc: {e}"
print(error_message)
return error_message, 500
# Поднимаем страницу обновления базы
@app.route('/biz_v', methods=['GET'])
def biz_v():
api_sys_control = request.args.get('api_sys')
if api_sys_control != api_key_sys:
return "EUR 22", 200
return render_template('biz_v.html')
# ОБНОВЛЯЕМ CSV-файла
DATABASE2 = 'data_gc.db'
def parse_csv_data(data):
parsed_data = []
for item in data:
for key, value in item.items():
headers = key.split(';')
row = value.split(';')
parsed_data.append(dict(zip(headers, row)))
return parsed_data
def insert_data(data, verify_phone, add_curator):
global current_curator_index
with sqlite3.connect(DATABASE2) as conn:
cursor = conn.cursor()
for row in data:
name = row.get('Name', '')
phone = row.get('Phone', '').lstrip('+')
email = row.get('Email', '')
data_t = row.get('Date', '').strip('"')
cursor.execute("SELECT 1 FROM contacts WHERE email = ? OR phone = ?", (email, phone))
user_exists = cursor.fetchone()
if user_exists:
print(f"User with email {email} or phone {phone} already exists. Skipping insert.")
continue
if add_curator == "1":
curator = curators[current_curator_index]
current_curator_index = (current_curator_index + 1) % len(curators)
else:
curator = row.get('curator', '')
if verify_phone == "1":
ws_st = verify_phone_number(phone)
else:
ws_st = row.get('ws_st', '')
columns = ['name', 'phone', 'email', 'vk_id', 'chat_id', 'ws_st', 'ws_stop', 'web_st', 'fin_prog', 'b_city', 'b_fin', 'b_ban', 'b_ign', 'b_baners', 'b_butt', 'b_mess', 'orders', 'curator', 'bonus', 'shop_status', 'answers', 'quiz', 'kassa', 'gc_url', 'key_pr', 'n_con', 'canal', 'data_on', 'data_t', 'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'gcpc']
values = [name, phone, email, row.get('vk_id', ''), row.get('chat_id', ''), ws_st, row.get('ws_stop', ''), row.get('web_st', 0), row.get('fin_prog', 0), row.get('b_city', ''), row.get('b_fin', ''), row.get('b_ban', ''), row.get('b_ign', ''), row.get('b_baners', ''), row.get('b_butt', ''), row.get('b_mess', ''), row.get('orders', ''), curator, row.get('bonus', ''), row.get('shop_status', ''), row.get('answers', ''), row.get('quiz', ''), row.get('kassa', ''), row.get('gc_url', ''), row.get('key_pr', ''), row.get('n_con', ''), row.get('canal', ''), row.get('data_on', ''), row.get('data_t', ''), row.get('utm_source', ''), row.get('utm_medium', ''), row.get('utm_campaign', ''), row.get('utm_term', ''), row.get('utm_content', ''), row.get('gcpc', '')]
placeholders = ', '.join(['?' for _ in columns])
columns_str = ', '.join(columns)
query = f'''
INSERT INTO contacts ({columns_str})
VALUES ({placeholders})
'''
try:
cursor.execute(query, values)
except Exception as e:
print(f"Error inserting row: {row}")
print(f"Error message: {str(e)}")
conn.rollback()
raise
conn.commit()
@app.route('/upload_csv', methods=['POST'])
def upload_csv():
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
if file and file.filename.endswith('.csv'):
stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
csv_input = csv.DictReader(stream)
data = [row for row in csv_input]
parsed_data = parse_csv_data(data)
verify_phone = request.form.get('verify_phone', '0')
add_curator = request.form.get('add_curator', '0')
print(f"Verify Phone: {verify_phone}")
print(f"Add Curator: {add_curator}")
insert_data(parsed_data, verify_phone, add_curator)
return jsonify({"message": "Data uploaded and inserted successfully"})
return jsonify({"error": "Invalid file format"}), 400
# ОБНОВЛЯЕМ JSON-файла
DATABASE = 'data_gc.db'
# Функция для очистки номера телефона
def clean_phone_number_j(phone_number):
return re.sub(r'\D', '', phone_number)
# Функция для вставки данных в базу данных
def insert_data_j(data):
conn = sqlite3.connect(DATABASE) # Подключаемся к базе данных
cursor = conn.cursor()
for row in data:
name = row.get('name', '')
phone = row.get('phone', '').lstrip('+')
email = row.get('email', '')
data_t = row.get('data_t', '').strip('"')
# Очистка номера телефона
phone = clean_phone_number_j(phone)
cursor.execute("SELECT 1 FROM contacts WHERE email = ? OR phone = ?", (email, phone))
user_exists = cursor.fetchone()
if user_exists:
print(f"User with email {email} or phone {phone} already exists. Skipping insert.")
continue
columns = ['name', 'phone', 'email', 'vk_id', 'chat_id', 'ws_st', 'ws_stop', 'web_st', 'fin_prog', 'b_city', 'b_fin', 'b_ban', 'b_ign', 'b_baners', 'b_butt', 'b_mess', 'orders', 'curator', 'bonus', 'shop_status', 'answers', 'quiz', 'kassa', 'gc_url', 'key_pr', 'n_con', 'canal', 'data_on', 'data_t', 'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'gcpc']
values = [name, phone, email, row.get('vk_id', ''), row.get('chat_id', ''), row.get('ws_st', ''), row.get('ws_stop', ''), row.get('web_st', 0), row.get('fin_prog', 0), row.get('b_city', ''), row.get('b_fin', ''), row.get('b_ban', ''), row.get('b_ign', ''), row.get('b_baners', ''), row.get('b_butt', ''), row.get('b_mess', ''), row.get('orders', ''), row.get('curator', ''), row.get('bonus', ''), row.get('shop_status', ''), row.get('answers', ''), row.get('quiz', ''), row.get('kassa', ''), row.get('gc_url', ''), row.get('key_pr', ''), row.get('n_con', ''), row.get('canal', ''), row.get('data_on', ''), row.get('data_t', ''), row.get('utm_source', ''), row.get('utm_medium', ''), row.get('utm_campaign', ''), row.get('utm_term', ''), row.get('utm_content', ''), row.get('gcpc', '')]
placeholders = ', '.join(['?' for _ in columns])
columns_str = ', '.join(columns)
query = f'''
INSERT INTO contacts ({columns_str})
VALUES ({placeholders})
'''
try:
cursor.execute(query, values)
except Exception as e:
print(f"Error inserting row: {row}")
print(f"Error message: {str(e)}")
conn.rollback()
continue
conn.commit()
conn.close()
# Маршрут для загрузки JSON-файла
@app.route('/upload_json', methods=['POST'])
def upload_json():
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
if file and file.filename.endswith('.json'):
data = json.load(file)
insert_data_j(data)
return jsonify({"message": "Data uploaded and inserted successfully"})
return jsonify({"error": "Invalid file format"}), 400
# ОБНОВЛЯЕМ Бизон 365
DATABASE_NAME = 'data_gc.db'
def update_or_insert_user(db_name, user_data, mapping_template):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# Получение email пользователя из данных
email = user_data.get('email')
if not email:
logging.error(f"User data missing email: {user_data}")
return
logging.debug(f"Processing user with email: {email}")
# Проверка существования пользователя в базе данных по email
cursor.execute("SELECT web_st, ws_st, b_mess FROM contacts WHERE email = ?", (email,))
user = cursor.fetchone()
logging.debug(f"User found: {user}")
# Вынесение увеличения значения web_st в отдельный блок
web_st_value = 1 # Инициализация значения web_st
if user:
# Проверка текущего значения web_st и его инкрементация
current_web_st = user[0] if user[0] is not None and user[0] != "" else 0
web_st_value = int(current_web_st) + 1
logging.debug(f"Calculated web_st_value: {web_st_value}")
# Обновление значения web_st
cursor.execute("UPDATE contacts SET web_st = ? WHERE email = ?", (web_st_value, email))
conn.commit()
conn.close()
logging.debug(f"User {email} web_st updated to {web_st_value}")
else:
conn.close()
logging.debug(f"User {email} not found, proceeding with insert")
# Открываем соединение снова для остальных операций
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# Преобразование данных пользователя на основе шаблона сопоставления
transformed_data = {}
for json_key, db_column in mapping_template.items():
value = user_data.get(json_key, "")
if isinstance(value, list):
# Проверяем тип элементов списка
if all(isinstance(item, str) for item in value):
transformed_data[db_column] = "; ".join(value) # Сохраняем сообщения в строку
else:
logging.error(f"Expected list of strings for key {json_key}, but got: {value}")
transformed_data[db_column] = ""
else:
transformed_data[db_column] = str(value)
logging.debug(f"Transformed data: {transformed_data}")
# Заполнение обязательных полей значениями по умолчанию
required_fields = [
"vk_id", "chat_id", "ws_st", "ws_stop", "web_st", "fin_prog",
"b_city", "b_fin", "b_ban", "b_ign", "b_baners", "b_butt", "b_mess",
"orders", "curator", "bonus", "shop_status", "answers", "quiz", "kassa", "gc_url",
"key_pr", "n_con", "canal", "data_on", "data_t", 'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'gcpc'
]
for field in required_fields:
if field not in transformed_data:
transformed_data[field] = ""
logging.debug(f"Transformed data after adding required fields: {transformed_data}")
# Обработка номера телефона, если он есть
if 'phone' in user_data:
phone = user_data['phone']
if phone.startswith('+'):
phone = phone[1:]
transformed_data['phone'] = phone
logging.debug(f"Transformed data after phone processing: {transformed_data}")
# Добавление значения web_st в данные для вставки
transformed_data['web_st'] = web_st_value
# Обновление данных пользователя в базе данных
if user:
# Объединение новых сообщений с существующими
if 'b_mess' in transformed_data and user[2]:
transformed_data['b_mess'] = user[2] + "; " + transformed_data['b_mess']
update_query = "UPDATE contacts SET "
update_values = []
for column, value in transformed_data.items():
if column != 'ws_st' or not user[1]: # Проверка на наличие существующего ws_st
update_query += f"{column} = ?, "
update_values.append(value)
update_query = update_query.rstrip(", ") + " WHERE email = ?"
update_values.append(email)
logging.debug(f"Update query: {update_query} with values: {update_values}")
cursor.execute(update_query, update_values)
else:
columns = ', '.join(transformed_data.keys())
placeholders = ', '.join('?' for _ in transformed_data)
insert_query = f"INSERT INTO contacts ({columns}) VALUES ({placeholders})"
insert_values = list(transformed_data.values())
logging.debug(f"Insert query: {insert_query} with values: {insert_values}")
cursor.execute(insert_query, insert_values)
# Подтверждение изменений и закрытие соединения
conn.commit()
conn.close()
logging.debug(f"User with email {email} processed successfully")
@app.route('/send_request', methods=['POST'])
def send_request():
token = request.form.get('token')
min_date = request.form.get('minDate')
type = request.form.get('type')
url = f'https://online.bizon365.ru/api/v1/webinars/reports/getlist?minDate={min_date}&limit=100&type={type}'
response = requests.get(url, headers={'X-Token': token})
if response.status_code == 200:
data = response.json()
webinar_ids = [item['webinarId'] for item in data['list']]
return jsonify(webinar_ids)
else:
return jsonify({'error': 'Failed to fetch data from the API'}), response.status_code
@app.route('/send_get_request', methods=['GET'])
def send_get_request():
token = request.args.get('token')
webinarId = request.args.get('webinarId')
url = f'https://online.bizon365.ru/api/v1/webinars/reports/get?webinarId={webinarId}'
try:
response = requests.get(url, headers={'X-Token': token})
response.raise_for_status() # Проверка на ошибки HTTP
data = response.json()
# Убедитесь, что report существует в данных
if data is None or 'report' not in data:
return jsonify({'error': 'No report data found'}), 500
report = data.get('report', {})
messages = data.get('messages', {})
# Проверка на None перед использованием
if report is None:
return jsonify({'error': 'No report data found in the response'}), 500
report_json_str = report.get('report', '{}')
try:
report_json = json.loads(report_json_str)
except json.JSONDecodeError:
report_json = {}
messages_json_str = report.get('messages', '{}')
try:
messages_json = json.loads(messages_json_str)
except json.JSONDecodeError:
messages_json = {}
users_meta = report_json.get('usersMeta', {})
processed_emails = set()
for user_id, user_data in users_meta.items():
user_messages = messages_json.get(user_id, [])
user_data['messages'] = user_messages
email = user_data.get('email')
if email and email not in processed_emails:
update_or_insert_user(DATABASE_NAME, user_data, mapping_template)
processed_emails.add(email)
return jsonify({'status': 'User data saved successfully'})
except requests.exceptions.RequestException as e:
return jsonify({'error': f'API request failed: {str(e)}'}), 500
@app.route('/webhookbz', methods=['POST'])
def webhookbz():
api_sys_control = request.args.get('api_sys')
if api_sys_control != api_key_sys:
return "EUR 22", 200
data = request.json
webinar_id = data.get('webinarId')
if not webinar_id:
return jsonify({'error': 'webinarId is required'}), 400
url = f'https://online.bizon365.ru/api/v1/webinars/reports/get?webinarId={webinar_id}'
response = requests.get(url, headers={'X-Token': api_key_sys})
if response.status_code == 200:
data = response.json()
report = data.get('report', {})
messages = data.get('messages', {})
report_json_str = report.get('report', '{}')
try:
report_json = json.loads(report_json_str)
except json.JSONDecodeError:
report_json = {}
messages_json_str = report.get('messages', '{}')
try:
messages_json = json.loads(messages_json_str)
except json.JSONDecodeError:
messages_json = {}
users_meta = report_json.get('usersMeta', {})
processed_emails = set()
for user_id, user_data in users_meta.items():
user_messages = messages_json.get(user_id, [])
user_data['messages'] = user_messages
email = user_data.get('email')
if email and email not in processed_emails:
update_or_insert_user(DATABASE_NAME, user_data, mapping_template)
processed_emails.add(email)
return jsonify({'status': 'User data saved successfully'})
else:
return jsonify({'error': 'Failed to fetch data from the API'}), response.status_code
# Отправка в НС1 в раб. дни нужно поправить нас групп
@app.route('/add_ns', methods=['GET'])
def handle_in1():
name = request.args.get('name')
email = request.args.get('email')
phone = request.args.get('phone')
base_url = 'https://api.notisend.ru/v1'
token = request.args.get('token')
list_id = request.args.get('list_id')
phone_id = request.args.get('phone_id')
name_id = request.args.get('name_id')
# Проверка наличия всех необходимых параметров
if not all([name, email, phone, token, list_id, phone_id, name_id]):
return jsonify({'error': 'Missing required parameters'}), 400
# Отправляем запросы в три разных места
response_ns = send_ns(base_url, token, list_id, email, phone, name, phone_id, name_id)
# Возвращаем список ответов
return jsonify({'responses': [response_ns]})
@app.route('/ns_info', methods=['GET'])
def ns_info():
return render_template('ns_info.html')
@app.route('/api/group/<int:group_id>/parameters', methods=['GET'])
def get_group_parameters(group_id):
api_token = request.args.get('apiToken')
if not api_token:
return jsonify({'error': 'API Token is required'}), 400
url = f'https://api.notisend.ru/v1/email/lists/{group_id}/parameters'
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_token}'
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
return jsonify(data)
except requests.RequestException as e:
return jsonify({'error': str(e)}), 500
@app.route('/upload', methods=['POST'])
def upload_file():
# Получаем ключ авторизации из запроса
api_sys_control = request.form.get('api_key_sys')
# Проверка ключа авторизации
if api_sys_control != api_key_sys:
return jsonify({"error": "Unauthorized access"}), 403
# Проверяем, что файл был отправлен
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
# Если пользователь не выбрал файл, браузер может отправить пустой файл без имени
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
# Генерация уникального имени файла
unique_filename = str(uuid.uuid4()) + os.path.splitext(file.filename)[1]
save_path = os.path.join(UPLOAD_FOLDER, unique_filename)
file.save(save_path)
# Возвращаем полный URL загруженного файла с протоколом https
full_url = request.url_root.replace('http://', 'https://') + 'uploads/' + unique_filename
return jsonify({"message": "File uploaded successfully", "url": full_url}), 200
@app.route('/uploads/<filename>', methods=['GET'])
def uploaded_file(filename):
return send_from_directory(UPLOAD_FOLDER, filename)
@app.route('/up_fa', methods=['GET'])
def up_fa():
return render_template('up_fa.html')
@app.route('/upload_vk', methods=['POST'])
def upload_file_vk():
# Получаем ключ авторизации из запроса
api_sys_control = request.form.get('api_key_sys')
# Проверка ключа авторизации
if api_sys_control != api_key_sys:
return jsonify({"error": "Unauthorized access"}), 403
# Проверяем, что файл был отправлен
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
# Если пользователь не выбрал файл, браузер может отправить пустой файл без имени
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
# Генерация уникального имени файла
unique_filename = str(uuid.uuid4()) + os.path.splitext(file.filename)[1]
save_path = os.path.join('static', unique_filename)
file.save(save_path)
# Возвращаем полный URL загруженного файла с протоколом https
full_url = request.url_root.replace('http://', 'https://') + 'uploads_vk/' + unique_filename
return jsonify({"message": "File uploaded successfully", "url": full_url}), 200
@app.route('/uploads_vk/<filename>', methods=['GET'])
def uploaded_file_vk(filename):
return send_from_directory('static', filename)
@app.route('/up_fa_vk', methods=['GET'])
def up_fa_vk():
return render_template('up_fa_vk.html')
@app.route('/up_page', methods=['POST'])
def upload_page():
# Получаем ключ авторизации из запроса
api_sys_control = request.form.get('api_key_sys')
# Проверка ключа авторизации
if api_sys_control != api_key_sys:
return jsonify({"error": "Unauthorized access"}), 403
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
filename = request.form.get('filename')
if not filename:
return jsonify({"error": "Filename is required"}), 400
save_path = os.path.join(HTML_FOLDER, filename + '.html')
file.save(save_path)
# Возвращаем полный URL загруженного файла с протоколом https
full_url = request.url_root.replace('http://', 'https://') + filename
return jsonify({"message": "Page uploaded successfully", "url": full_url}), 200
@app.route('/<path:filename>', methods=['GET'])
def serve_html(filename):
if not filename.endswith('.html'):
filename += '.html'
return send_from_directory(HTML_FOLDER, filename)
@app.route('/up_page', methods=['GET'])
def up_page():
return render_template('up_page.html')
# Дублированный маршрут для загрузки страницы через POST-запрос
@app.route('/up_page_vk', methods=['POST'])
def upload_page_vk():
# Получаем ключ авторизации из запроса
api_sys_control = request.form.get('api_key_sys')
# Проверка ключа авторизации
if api_sys_control != api_key_sys:
return jsonify({"error": "Unauthorized access"}), 403
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
filename = request.form.get('filename')
if not filename:
return jsonify({"error": "Filename is required"}), 400
save_path = os.path.join(HTML_FOLDER_VK, filename + '.html')
file.save(save_path)
# Возвращаем полный URL загруженного файла с протоколом https
full_url = request.url_root.replace('http://', 'https://') + 'page_vk/' + filename
return jsonify({"message": "Page uploaded successfully", "url": full_url}), 200
# Дублированный маршрут для обслуживания загруженных страниц
@app.route('/page_vk/<path:filename>', methods=['GET'])
def serve_html_vk(filename):
try:
# Получаем параметры из GET-запроса
apps_id = request.args.get('apps_id')
in_url = request.args.get('fullUrl')
# Проверяем, есть ли параметры
if not apps_id or not in_url:
return jsonify({"error": "Access denied"}), 400
# Декодируем URL перед парсингом
decoded_url = unquote(in_url)
# Получаем базовую часть URL (до знака вопроса)
parsed_url = urlparse(decoded_url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
logging.debug(f"Decoded URL: {decoded_url}")
logging.debug(f"Parsed base URL: {base_url}")
# Парсим параметры из декодированного URL
query_params = dict(parse_qsl(parsed_url.query, keep_blank_values=True))
# Добавляем все возможные параметры, даже если они пустые
all_params = {
'vk_access_token_settings': query_params.get('vk_access_token_settings', ''),
'vk_app_id': query_params.get('vk_app_id', ''),
'vk_are_notifications_enabled': query_params.get('vk_are_notifications_enabled', ''),
'vk_is_app_user': query_params.get('vk_is_app_user', ''),
'vk_is_favorite': query_params.get('vk_is_favorite', ''),
'vk_language': query_params.get('vk_language', ''),
'vk_platform': query_params.get('vk_platform', ''),
'vk_ref': query_params.get('vk_ref', ''),
'vk_ts': query_params.get('vk_ts', ''),
'vk_user_id': query_params.get('vk_user_id', ''),
'vk_are_notifications_enabled': query_params.get('vk_are_notifications_enabled', ''),
'vk_chat_id': query_params.get('vk_chat_id', ''),
'vk_group_id': query_params.get('vk_group_id', ''),
'vk_has_profile_button': query_params.get('vk_has_profile_button', ''),
'vk_is_favorite': query_params.get('vk_is_favorite', ''),
'vk_is_recommended': query_params.get('vk_is_recommended', ''),
'vk_is_play_machine': query_params.get('vk_is_play_machine', ''),
'vk_is_widescreen': query_params.get('vk_is_widescreen', ''),
'vk_profile_id': query_params.get('vk_profile_id', ''),
'vk_testing_group_id': query_params.get('vk_testing_group_id', ''),
'vk_viewer_group_role': query_params.get('vk_viewer_group_role', ''),
'sign': query_params.get('sign', '')
}
# Очищаем параметры от пустых значений, кроме vk_access_token_settings
cleaned_params = {key: value for key, value in all_params.items() if value or key == 'vk_access_token_settings'}
# Формирование URL с использованием f-строк
fullUrl = f"{base_url}?{urlencode(cleaned_params)}"
logging.debug(f"Received params: fullUrl={fullUrl}")
# Преобразуем строку в JSON
try:
api_key_apps_vk_dict = json.loads(api_key_apps_vk)
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON: {e}")
return jsonify({"status": "invalid"}), 200
logging.debug(f"api_key_apps_vk_dict: {api_key_apps_vk_dict}")
# Проверка подписи для приложения
if str(apps_id) not in api_key_apps_vk_dict: # Приводим apps_id к строке
logging.error("Invalid apps_id")
return jsonify({"error": "Invalid apps_id"}), 400
secret = api_key_apps_vk_dict[str(apps_id)] # Приводим apps_id к строке
logging.debug(f"Using secret: {secret}")
# Парсим полный URL для получения параметров запроса
query_params = dict(parse_qsl(urlparse(fullUrl).query, keep_blank_values=True))
logging.debug(f"Query params for signature check: {query_params}")
# Проверяем подпись
if not is_valid(query=query_params, secret=secret):
logging.error("Invalid signature")
return jsonify({"error": "Invalid signature"}), 400
# Если верификация прошла успешно, отдаём файл
if not filename.endswith('.html'):
filename += '.html'
return send_from_directory(HTML_FOLDER_VK, filename)
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return jsonify({"error": str(e)}), 500
# Дублированный маршрут для отображения страницы загрузки
@app.route('/up_page_vk', methods=['GET'])
def up_page_vk():
return render_template('up_page_vk.html')
@app.route('/monitor', methods=['GET'])
def monitor():
# Получаем информацию о загруженных файлах
files = os.listdir(UPLOAD_FOLDER)
html_files = os.listdir(HTML_FOLDER)
# Получаем информацию о дисковом пространстве
total, used, free = shutil.disk_usage("/")
# Преобразуем байты в гигабайты для удобства чтения
total_gb = total // (2**30)
used_gb = used // (2**30)
free_gb = free // (2**30)
# Получаем информацию об использовании оперативной памяти
memory = psutil.virtual_memory()
memory_total_gb = memory.total // (2**30)
memory_used_gb = memory.used // (2**30)
memory_free_gb = memory.free // (2**30)
# Получаем информацию о количестве процессоров
cpu_count = psutil.cpu_count(logical=True)
return render_template('monitor.html',
uploaded_files=files,
uploaded_html_files=html_files,
disk_space={
'total': f"{total_gb} GB",
'used': f"{used_gb} GB",
'free': f"{free_gb} GB"
},
memory_usage={
'total': f"{memory_total_gb} GB",
'used': f"{memory_used_gb} GB",
'free': f"{memory_free_gb} GB"
},
cpu_count=cpu_count)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))