psy2 / app.py
DmitrMakeev's picture
Update app.py
668afb7 verified
raw
history blame
8.19 kB
from flask import Flask, request, render_template_string, send_from_directory, jsonify
from flask import render_template
import sqlite3
import os
import uuid
import json
import base64
import unittest
own_url = os.getenv('own_url')
key_d = os.getenv('gc_api')
test_url = os.getenv('gc_api')
import json
from datetime import datetime
import whatsapp_api_webhook_server_python.webhooksHandler as handler
app = Flask(__name__, template_folder="./")
app.config['DEBUG'] = True
UPLOAD_FOLDER = 'static'
# Создание директории, если она не существует
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# Создание базы данных и таблицы
def init_db():
try:
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
phone TEXT NOT NULL,
email TEXT NOT NULL
)
''')
conn.commit()
conn.close()
except Exception as e:
print(f"Error initializing database: {e}")
# Вызов функции для инициализации базы данных
init_db()
@app.route('/settings', methods=['GET'])
def settings():
return render_template('settings.html')
@app.route('/online', methods=['GET'])
def onli():
return render_template('online.html')
@app.route('/ver', methods=['GET'])
def veref():
return render_template('ver.html')
@app.route('/se_mes', methods=['GET'])
def se_mes():
return render_template('se_mes.html')
@app.route('/se_mes_im', methods=['GET'])
def se_mes_im():
return render_template('se_mes_im.html')
@app.route('/se_mes_im2', methods=['GET'])
def se_mes_im2():
return render_template('se_mes_im2.html')
@app.route('/se_mes_f', methods=['GET'])
def se_mes_f():
return render_template('se_mes_f.html')
@app.route('/up_gr', methods=['GET'])
def up_gr():
return render_template('up_gr.html')
@app.route('/up_user_gp', methods=['GET'])
def up_user_gp():
return render_template('up_user_gp.html')
@app.route('/del_user_gp', methods=['GET'])
def del_user_gp():
return render_template('del_user_gp.html')
@app.route('/up_ad', methods=['GET'])
def up_ad():
return render_template('up_ad.html')
@app.route('/del_ad', methods=['GET'])
def del_ad():
return render_template('del_ad.html')
@app.route('/se_opr', methods=['GET'])
def se_opr():
return render_template('se_opr.html')
@app.route('/online', methods=['GET'])
def online():
return render_template('online.html')
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return "No file part", 400
file = request.files['file']
if file.filename == '':
return "No selected file", 400
# Генерация уникального имени файла
unique_filename = str(uuid.uuid4()) + os.path.splitext(file.filename)[1]
save_path = os.path.join(UPLOAD_FOLDER, unique_filename)
file.save(save_path)
# Возвращаем полный URL загруженного файла с протоколом https
full_url = request.url_root.replace('http://', 'https://') + 'uploads/' + unique_filename
return f"File uploaded successfully and saved to {full_url}", 200
@app.route('/uploads/<filename>', methods=['GET'])
def uploaded_file(filename):
return send_from_directory(UPLOAD_FOLDER, filename)
@app.route('/up_fa', methods=['GET'])
def up_fa():
return render_template('up_fa.html')
# Маршрут для обработки GET-запроса из gc
@app.route('/add_contact', methods=['GET'])
def add_contact():
try:
name = request.args.get('name')
phone = request.args.get('phone')
email = request.args.get('email')
if not name or not phone or not email:
return "Parameters 'name', 'phone', and 'email' are required.", 400
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute('INSERT INTO contacts (name, phone, email) VALUES (?, ?, ?)', (name, phone, email))
conn.commit()
conn.close()
return f"Contact added: {name} - {phone} - {email}", 200
except Exception as e:
print(f"Error adding contact: {e}")
return "Internal Server Error", 500
# Маршрут для отображения таблицы контактов из gc
@app.route('/contacts')
def show_contacts():
try:
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute('SELECT name, phone, email FROM contacts')
contacts = cursor.fetchall()
conn.close()
# HTML-шаблон для отображения таблицы
html = '''
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>Contacts</title>
<style>
table {
width: 70%;
border-collapse: collapse;
}
th, td {
border: 1px solid black;
padding: 8px;
text-align: left;
}
th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<h1>Contacts</h1>
<table>
<tr>
<th>Name</th>
<th>Phone</th>
<th>Email</th>
</tr>
{% for contact in contacts %}
<tr>
<td>{{ contact[0] }}</td>
<td>{{ contact[1] }}</td>
<td>{{ contact[2] }}</td>
</tr>
{% endfor %}
</table>
</body>
</html>
'''
return render_template_string(html, contacts=contacts)
except Exception as e:
print(f"Error showing contacts: {e}")
return "Internal Server Error", 500
# Переменные с данными
action_d = "add"
params_d = ""
name_d = ""
email_d = ""
phone_d = ""
pr1_d = ""
pr2_d = ""
pr3_d = ""
@app.route('/getcurse_db', methods=['GET'])
def getcurse_db():
# Чтение параметров из GET-запроса
name_d = request.args.get('name', '')
email_d = request.args.get('email', '')
phone_d = request.args.get('phone', '')
pr1_d = request.args.get('pr1', '')
pr2_d = request.args.get('pr2', '')
pr3_d = request.args.get('pr3', '')
# Формирование JSON
json_data = {
"user": {
"email": email_d,
"phone": phone_d,
"first_name": name_d,
"addfields": {
"pr1": pr1_d,
"pr2": pr2_d,
"pr3": pr3_d
}
},
"system": {
"refresh_if_exists": 1
},
"session": {
"utm_source": "",
"utm_medium": "",
"utm_content": "",
"utm_campaign": "",
"utm_group": "",
"gcpc": "",
"gcao": "",
"referer": ""
}
}
# Конвертация JSON в Base64
json_str = json.dumps(json_data)
params_d = base64.b64encode(json_str.encode('utf-8')).decode('utf-8')
# Данные для отправки в теле запроса
data = {
'key': key_d,
'action': action_d,
'params': params_d
}
# Отправка POST-запроса с данными в формате "form-data"
response = requests.post(test_url, data=data)
# Возвращаем ответ от тестового адреса
return {
'status_code': response.status_code,
'response_body': response.text
}
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))