{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "6d0008865049c12d", "metadata": { "collapsed": false }, "outputs": [], "source": [ "import subprocess\n", "import threading\n", "import time\n", "import mysql.connector as mysql\n", "import pandas as pd\n", "import warnings\n", "import pickle\n", "import numpy as np\n", "\n", "# Specify the file of the model for blackjack strategy\n", "filename = './blackjack.pkl'\n", "warnings.filterwarnings('ignore')" ] }, { "cell_type": "code", "outputs": [], "source": [ "# Load the model for blackjack strategy\n", "loaded_model = pickle.load(open(filename, 'rb'))" ], "metadata": { "collapsed": false }, "id": "d1e3920bbcfae571", "execution_count": null }, { "cell_type": "code", "outputs": [], "source": [ "try:\n", " # Connect to the database\n", " conn = mysql.connect(\n", " host='localhost',\n", " user='root',\n", " password='19731005',\n", " database='schema_blackjack'\n", " )\n", " cursor = conn.cursor()\n", "\n", " def enqueue_output(process, output_list, detected_classes_list, previous_detected_classes_list):\n", " \n", " # Read the output line by line\n", " for line in iter(process.stdout.readline, b''):\n", " detected_classes_list.clear()\n", " # Append the line to the output list\n", " output_list.append(line.rstrip('\\n'))\n", " # Add the result of the process_output function to the detected_classes_list\n", " detected_classes_list.extend(process_output(line,previous_detected_classes_list))\n", "\n", " process.stdout.close()\n", "\n", " def process_output(output, previous_detected_classes):\n", " detected_classes = []\n", " current_detected_classes = []\n", " \n", " # Split the output by comma and space\n", " result_parts = output.split(\", \")\n", " \n", " for part in result_parts:\n", " split_part = part.split(\" \")\n", " # Check if the part has at least 2 elements and the second last element is a digit\n", " if len(split_part) >= 2 and split_part[-2].isdigit():\n", " count = int(split_part[-2])\n", " class_name = split_part[-1]\n", " \n", " # Check if the count is equal to 4 detection of card number\n", " if count in [4]:\n", " # Append the class name to the current detected classes\n", " current_detected_classes.append(class_name)\n", "\n", " for class_name in current_detected_classes:\n", " # Check if the class name is not in the detected classes\n", " if class_name not in detected_classes:\n", " detected_classes.append(class_name)\n", "\n", " if not previous_detected_classes:\n", " # Append the current detected classes to the previous detected classes\n", " previous_detected_classes.extend(current_detected_classes)\n", " detected_classes = []\n", " else:\n", " # If the length is the same between the two lists, it means that the detection is the same and not changed\n", " if len(previous_detected_classes) == len(current_detected_classes):\n", " detected_classes = []\n", " else:\n", " # Get the last index of the previous detected classes\n", " index = len(previous_detected_classes)\n", " # Take the class_name end of previous detected classes for the first index of the current detected classes\n", " for class_name in current_detected_classes[index:]:\n", " # Check if the class name is not in the detected classes\n", " if class_name not in detected_classes:\n", " detected_classes.append(class_name)\n", " for class_name in detected_classes:\n", " if class_name not in current_detected_classes[index:]:\n", " detected_classes.remove(class_name)\n", "\n", " # Clear the previous detected classes and append the current detected classes\n", " previous_detected_classes.clear()\n", " previous_detected_classes.extend(current_detected_classes.copy())\n", " \n", " # Clear the current detected classes\n", " results_detection = detected_classes.copy()\n", " detected_classes.clear()\n", " current_detected_classes.clear()\n", "\n", " return results_detection\n", "\n", " # Run the script for player and dealer\n", " def run_script(script_name, source):\n", " # Run the script with the source of camera\n", " process = subprocess.Popen([\"python\", script_name, source], stdout=subprocess.PIPE, text=True)\n", " \n", " # Initialize the list needed for the detection\n", " output_list = []\n", " detected_classes_list = []\n", " previous_detected_classes_list = []\n", " \n", " # Start the thread for the detection using enqueue_output function\n", " t = threading.Thread(target=enqueue_output, args=(process, output_list, detected_classes_list, previous_detected_classes_list))\n", " \n", " # Set the thread as daemon\n", " t.daemon = True\n", " t.start()\n", " return process, output_list, detected_classes_list, previous_detected_classes_list\n", "\n", " # Run the player and dealer scripts\n", " player_process, player_output_list, player_detected_classes_list, player_previous_detected_classes_list = run_script(\"player.py\", \"0\") # 7\n", " dealer_process, dealer_output_list, dealer_detected_classes_list, dealer_previous_detected_classes_list = run_script(\"dealer.py\", \"5\") # 12\n", "\n", " # Initialize the variables needed for the detection\n", " player_previous_output = None\n", " dealer_previous_output = None\n", "\n", " player_card = None\n", " dealer_card = None\n", "\n", " detected_player = False\n", " detected_dealer = False\n", "\n", " print(\"Starting the detection...\")\n", " \n", " # Loop until the player and dealer cards are detected\n", " while True:\n", " time.sleep(1)\n", "\n", " # Check if the player and dealer processes are terminated\n", " if player_process.poll() is not None and dealer_process.poll() is not None:\n", " break\n", "\n", " # Check if the player and dealer output lists are not empty and the detected classes are not the same as the previous detected classes\n", " if player_output_list and player_detected_classes_list != player_previous_detected_classes_list and detected_player == False:\n", " # Check if the player has 2 cards\n", " if len(player_detected_classes_list) == 2:\n", " print(\"Player card detected!\")\n", " detected_player = True\n", " \n", " previous_output1 = player_detected_classes_list.copy()\n", " player_output_list.clear()\n", " \n", " # Join the detected classes with comma\n", " player_card = ', '.join(map(str, player_detected_classes_list))\n", "\n", " if dealer_output_list and dealer_detected_classes_list != dealer_previous_detected_classes_list and detected_dealer == False:\n", " # Check if the dealer has 1 card\n", " if len(dealer_detected_classes_list) == 1:\n", " print(\"Dealer card detected!\")\n", " detected_dealer = True\n", " previous_output2 = dealer_detected_classes_list.copy()\n", " dealer_output_list.clear()\n", "\n", " dealer_card = ', '.join(map(str, dealer_detected_classes_list))\n", "\n", " # Check if the player and dealer cards are detected correctly\n", " if player_card is not None and dealer_card is not None and ',' in player_card and dealer_card != '' and player_card != '':\n", " \n", " # Insert the player and dealer cards to the database\n", " query = f\"INSERT INTO classes (player, dealer) VALUES ('{player_card}', '{dealer_card}')\"\n", " \n", " cursor.execute(query)\n", " conn.commit()\n", " \n", " player_card = None\n", " dealer_card = None\n", " \n", " # Remove all unnecessary rows from the database\n", " cursor.execute(\"DELETE FROM classes WHERE player IS NULL OR player = ''\")\n", " cursor.execute(\"DELETE FROM classes WHERE dealer IS NULL OR dealer = ''\")\n", "\n", " # Get the last row from the database to get the last detected player and dealer cards\n", " query = \"SELECT player, dealer FROM classes ORDER BY id DESC LIMIT 1\"\n", " cursor.execute(query)\n", " player_cards, dealer_cards = cursor.fetchone()\n", "\n", " # See if the number of cards in last detection are correct\n", " if len(player_cards[0].split(',')) == 2 and len(dealer_cards[0].split(',')) == 1 and player_cards[0] != '' and dealer_cards[0] != '':\n", " print(\"Actualizing the database...\")\n", " time.sleep(2)\n", " break\n", " \n", " conn.commit()\n", " \n", "except mysql.errors.InterfaceError as e:\n", " error_code = e.errno\n", " if error_code == 2006:\n", " print(\"The MySQL server has gone away. Trying to reconnect...\")\n", "\n", " conn = mysql.connect(\n", " host='localhost',\n", " user='root',\n", " password='19731005',\n", " database='schema_blackjack'\n", " )\n", "\n", " cursor = conn.cursor()\n", " conn.commit()\n", " else:\n", " raise\n", "finally:\n", " cursor.close()\n", " conn.close()" ], "metadata": { "collapsed": true }, "id": "initial_id", "execution_count": null }, { "cell_type": "code", "outputs": [], "source": [ "# Calculate the quality of the game\n", "def calculate_run_count(row):\n", " # Take firstly the player cards and then the dealer card\n", " card_list = [row[1], row[2], row[3]]\n", " run_count = 0\n", " for card in card_list:\n", " if card in [2, 3, 4, 5, 6]:\n", " run_count += 1\n", " elif card in [10, 'J', 'Q', 'K', 'A']:\n", " run_count -= 1\n", " return run_count\n", "\n", "# Calculate the quality of the game link to the deck count\n", "def calculate_true_count(row):\n", " if row[0] != 0:\n", " deck_count = row[0] / 52\n", " true_count = row[1] / deck_count\n", " else:\n", " true_count = 0\n", " return true_count" ], "metadata": { "collapsed": false }, "id": "89559821fd638f43", "execution_count": null }, { "cell_type": "code", "outputs": [], "source": [ "def get_card_numbers(card_list):\n", " number_list = []\n", " \n", " # Get the number of the cards\n", " for card in card_list:\n", " number = card[:-2].strip()\n", " if number.isdigit():\n", " number_list.append(int(number))\n", " # For blackjack, the value is 10\n", " elif number in ['J', 'Q', 'K']:\n", " number_list.append(10)\n", " # For ace, the value is 1 or 11\n", " elif number == 'A':\n", " number_list.append(11)\n", " return numbers\n", "\n", "prediction = None\n", "\n", "try:\n", " conn = mysql.connect(\n", " host='localhost',\n", " user='root',\n", " password='19731005',\n", " database='schema_blackjack'\n", " )\n", "\n", " cursor = conn.cursor()\n", "\n", " # Get the last detected player and dealer cards\n", " query = \"SELECT CONCAT(player, ', ', dealer) as combined FROM classes ORDER BY id DESC LIMIT 1\"\n", " cursor.execute(query)\n", " result = cursor.fetchone()\n", " \n", " cards = result[0].split(', ')\n", " numbers = get_card_numbers(cards)\n", " print(numbers)\n", " \n", " conn.commit()\n", "\n", " # Get copy of the numbers\n", " prediction = numbers.copy()\n", " \n", " #TODO: Implement the blackjack detection and deck count\n", " \n", " # Insert value for deck count\n", " prediction.insert(0, 178)\n", " \n", " # Insert value for run and true count\n", " prediction.insert(1, calculate_run_count(prediction))\n", " prediction.insert(2, calculate_true_count(prediction))\n", " \n", " # If the dealer has blackjack\n", " prediction.insert(-1, 0)\n", " \n", " # If the player has blackjack\n", " prediction.insert(-2, 0)\n", "except mysql.errors.InterfaceError as e:\n", " error_code = e.errno\n", " if error_code == 2006:\n", " print(\"The MySQL server has gone away. Trying to reconnect...\")\n", "\n", " conn = mysql.connect(\n", " host='localhost',\n", " user='root',\n", " password='19731005',\n", " database='schema_blackjack'\n", " )\n", "\n", " cursor = conn.cursor()\n", " conn.commit()\n", " else:\n", " raise\n", "finally:\n", " cursor.close()\n", " conn.close()" ], "metadata": { "collapsed": false }, "id": "c093430eb1f42fa8", "execution_count": null }, { "cell_type": "code", "outputs": [], "source": [ "# Define the feature names\n", "feature_names = ['cards_remaining','run_count', 'true_count', 'player_card_1', 'player_card_2', 'dealer_card_1', 'is_blackjack_dealer', 'is_blackjack_player']\n", "\n", "# Define the action dictionary\n", "action_dict = {0: 'No Action', 1: 'Stand', 2: 'Hit', 3: 'Double', 4: 'Split', 5: 'Surrender', 6: 'No Insurance'}" ], "metadata": { "collapsed": false }, "id": "d907df9075dcf582", "execution_count": null }, { "cell_type": "code", "outputs": [], "source": [ "#TODO: Implement the interface for the action\n", "\n", "if prediction is not None:\n", " # Reshape the prediction\n", " prediction_array = np.array(prediction)\n", " prediction_reshaped = prediction_array.reshape(1, -1)\n", " prediction_df = pd.DataFrame(prediction_reshaped, columns=feature_names)\n", " \n", " # Predict the strategy\n", " strategy = loaded_model.predict(prediction_df)\n", "\n", " # Get the action name of the strategy\n", " action_name = [action_dict[i] for i in strategy[0] if i != 0]\n", " \n", " # Print the action\n", " print(f\"The action is: {action_name}\")" ], "metadata": { "collapsed": false }, "id": "aadd4af70c552995", "execution_count": null } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 5 }