#!/usr/bin/env python # coding: utf-8 # # Bibliothek #!pip install PyMySQL import pprint import json import pymysql.cursors from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse, HTMLResponse import time from dbutils.pooled_db import PooledDB import os # Create a connection pool db_config = { 'host': os.environ['MURMEL_DB_HOST'], 'user': os.environ['MURMEL_DB_USER'], 'password': os.environ['MURMEL_DB_PASSWORD'], 'database': 'murmel', 'cursorclass': pymysql.cursors.DictCursor } pool = PooledDB(pymysql, maxconnections=5, **db_config) app = FastAPI() def execute_query(sql, params=None, max_retries=3, retry_delay=1): for attempt in range(max_retries): try: connection = pool.connection() with connection.cursor() as cursor: cursor.execute(sql, params or ()) result = cursor.fetchall() connection.commit() return result except pymysql.OperationalError as e: if attempt == max_retries - 1: raise HTTPException(status_code=500, detail="Database connection error") time.sleep(retry_delay) finally: if 'connection' in locals(): connection.close() @app.get("/groups") def get_groups(): sql = "SELECT Gruppe, idGruppe FROM `gruppe` WHERE aktuell is True ORDER BY idGruppe ASC;" return execute_query(sql) @app.get("/students/{idGruppe}") def get_students(idGruppe): if idGruppe == 'all': sql = """ SELECT DISTINCT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind` FROM `kind` `ki` JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind` JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr` WHERE `sch`.`aktuell` = 1 ORDER BY 1 """ return execute_query(sql) else: sql = """ SELECT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind` FROM `kind` `ki` JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind` JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr` JOIN `gruppe` `gr` ON `gr`.`idGruppe` = `kgs`.`x_gruppe` WHERE `sch`.`aktuell` = 1 AND (`gr`.`Gruppe` = %s OR `gr`.`idGruppe` = %s) ORDER BY 1 """ return execute_query(sql, (idGruppe, idGruppe)) @app.get("/return/{idBuch}") def rueckgabe(idBuch, grund='rueckgabe'): """ Updates the database to mark a book as returned. Parameters: - idBuch (int): The ID of the book to be returned. - grund (str): The reason for the return (default: 'rueckgabe'). Returns: - int: 0 if the book was not found or already returned. """ sql = "SELECT `idBuch`, `idKind`, `ausleihe`, `rueckgabe`, `rueckGrund` FROM `ausleihe_test` WHERE `idBuch` = %s AND `rueckgabe` is NULL;" result = execute_query(sql, (idBuch,)) if len(result) == 0: return 0 # return the book sql = "UPDATE `ausleihe_test` SET `rueckgabe` = NOW(), `rueckGrund` = %s WHERE `idBuch` = %s AND `rueckgabe` is NULL;" execute_query(sql, (grund, idBuch)) #pprint.pprint(result) # return if the book was returned or not and who had it before return result @app.get("/borrow/{idBuch}/{idKind}") def ausleihe(idBuch, idKind): """ Performs a book loan operation by inserting a new record into the 'ausleihe_test' table. Parameters: - idBuch (int): The ID of the book being loaned. - idKind (int): The ID of the child borrowing the book. Returns: - dict: A dictionary containing the result of the borrowing operation. """ rueckgabe_result = rueckgabe(idBuch, grund="neu-ausleihe") message = "Buch erfolgreich ausgeliehen" if rueckgabe_result: # Get the name of the previous borrower prev_borrower_id = rueckgabe_result[0]['idKind'] sql = "SELECT CONCAT(Vorname, ' ', Nachnamen) AS full_name FROM kind WHERE idKind = %s;" prev_borrower_name = execute_query(sql, (prev_borrower_id,))[0]['full_name'] message += f". Zuvor ausgeliehen von {prev_borrower_name}" # Insert new borrowing record sql = "INSERT INTO `ausleihe_test` (`idBuch`, `idKind`, `ausleihe`) VALUES (%s, %s, NOW());" execute_query(sql, (idBuch, idKind)) return {"message": message} @app.get("/borrowed/{idKind}") def ausgeliehen(idKind): """ Retrieves the books that are currently borrowed by a specific child. Args: idKind (int): The ID of the child. Returns: list: A list of tuples containing the book ID and the borrowing date for each book that is currently borrowed by the child. """ sql = "SELECT `idBuch`, `ausleihe` FROM `ausleihe_test` WHERE `idKind` = %s AND `rueckgabe` IS NULL;" result = execute_query(sql, (idKind,)) #pprint.pprint(result) return result @app.get("/", response_class=HTMLResponse) async def read_root(): with open("index.html", "r") as f: content = f.read() return HTMLResponse(content=content) # run the app if __name__ == '__main__': import uvicorn uvicorn.run(app, host='localhost', port=5000) # %%