Spaces:
Sleeping
Sleeping
| """ | |
| FileName: app.py | |
| Author: Benhao Huang | |
| Create Date: 2023/11/18 | |
| Description: this file is used to display our project and add visualization elements to the game, using Streamlit | |
| """ | |
| import time | |
| import pandas as pd | |
| from copy import deepcopy | |
| # import torch | |
| import numpy as np | |
| import streamlit as st | |
| from scipy.signal import convolve # this is used to check if any player wins | |
| from streamlit import session_state | |
| from streamlit_server_state import server_state, server_state_lock | |
| from Gomoku_MCTS import MCTSpure, alphazero, Board, PolicyValueNet | |
| import matplotlib.pyplot as plt | |
| from const import ( | |
| _BLACK, # 1, for human | |
| _WHITE, # 2 , for AI | |
| _BLANK, | |
| _PLAYER_COLOR, | |
| _PLAYER_SYMBOL, | |
| _ROOM_COLOR, | |
| _VERTICAL, | |
| _HORIZONTAL, | |
| _DIAGONAL_UP_LEFT, | |
| _DIAGONAL_UP_RIGHT, | |
| _BOARD_SIZE, | |
| _BOARD_SIZE_1D | |
| ) | |
| # Utils | |
| class Room: | |
| def __init__(self, room_id) -> None: | |
| self.ROOM_ID = room_id | |
| # self.BOARD = np.zeros(shape=(_BOARD_SIZE, _BOARD_SIZE), dtype=int) | |
| self.BOARD = Board(width=_BOARD_SIZE, height=_BOARD_SIZE, n_in_row=5, players=[_BLACK, _WHITE]) | |
| self.PLAYER = _BLACK | |
| self.TURN = self.PLAYER | |
| self.HISTORY = (0, 0) | |
| self.WINNER = _BLANK | |
| self.TIME = time.time() | |
| self.MCTS = MCTSpure(c_puct=5, n_playout=10) | |
| self.COORDINATE_1D = [_BOARD_SIZE_1D + 1] | |
| self.current_move = -1 | |
| self.simula_time_list = [] | |
| def change_turn(cur): | |
| return cur % 2 + 1 | |
| # Initialize the game | |
| if "ROOM" not in session_state: | |
| session_state.ROOM = Room("local") | |
| if "OWNER" not in session_state: | |
| session_state.OWNER = False | |
| # Check server health | |
| if "ROOMS" not in server_state: | |
| with server_state_lock["ROOMS"]: | |
| server_state.ROOMS = {} | |
| # # Layout | |
| # Main | |
| TITLE = st.empty() | |
| TITLE.header("๐ค AI 3603 Gomoku") | |
| ROUND_INFO = st.empty() | |
| st.markdown("<br>", unsafe_allow_html=True) | |
| BOARD_PLATE = [ | |
| [cell.empty() for cell in st.columns([1 for _ in range(_BOARD_SIZE)])] for _ in range(_BOARD_SIZE) | |
| ] | |
| LOG = st.empty() | |
| # Sidebar | |
| SCORE_TAG = st.sidebar.empty() | |
| SCORE_PLATE = st.sidebar.columns(2) | |
| # History scores | |
| SCORE_TAG.subheader("Scores") | |
| PLAY_MODE_INFO = st.sidebar.container() | |
| MULTIPLAYER_TAG = st.sidebar.empty() | |
| with st.sidebar.container(): | |
| ANOTHER_ROUND = st.empty() | |
| RESTART = st.empty() | |
| EXIT = st.empty() | |
| GAME_INFO = st.sidebar.container() | |
| message = st.empty() | |
| PLAY_MODE_INFO.write("---\n\n**You are Black, AI agent is White.**") | |
| GAME_INFO.markdown( | |
| """ | |
| --- | |
| # <span style="color:black;">Freestyle Gomoku game. ๐ฒ</span> | |
| - no restrictions ๐ซ | |
| - no regrets ๐ | |
| - swap players after one round is over ๐ | |
| Powered by an AlphaZero approach with our own improvements! ๐ For the specific details, please check out our <a href="insert_report_link_here" style="color:blue;">report</a>. | |
| ##### Adapted and improved by us! ๐ <a href="https://github.com/Lijiaxin0111/AI_3603_BIGHOME" style="color:blue;">Our Github repo</a> | |
| """, | |
| unsafe_allow_html=True, | |
| ) | |
| def restart() -> None: | |
| """ | |
| Restart the game. | |
| """ | |
| session_state.ROOM = Room(session_state.ROOM.ROOM_ID) | |
| RESTART.button( | |
| "Reset", | |
| on_click=restart, | |
| help="Clear the board as well as the scores", | |
| ) | |
| # Draw the board | |
| def gomoku(): | |
| """ | |
| Draw the board. | |
| Handle the main logic. | |
| """ | |
| # Restart the game | |
| # Continue new round | |
| def another_round() -> None: | |
| """ | |
| Continue new round. | |
| """ | |
| session_state.ROOM = deepcopy(session_state.ROOM) | |
| session_state.ROOM.BOARD = Board(width=_BOARD_SIZE, height=_BOARD_SIZE, n_in_row=5) | |
| session_state.ROOM.PLAYER = session_state.ROOM.PLAYER | |
| session_state.ROOM.TURN = session_state.ROOM.PLAYER | |
| session_state.ROOM.WINNER = _BLANK # 0 | |
| session_state.ROOM.COORDINATE_1D = [_BOARD_SIZE_1D + 1] | |
| # Room status sync | |
| def sync_room() -> bool: | |
| room_id = session_state.ROOM.ROOM_ID | |
| if room_id not in server_state.ROOMS.keys(): | |
| session_state.ROOM = Room("local") | |
| return False | |
| elif server_state.ROOMS[room_id].TIME == session_state.ROOM.TIME: | |
| return False | |
| elif server_state.ROOMS[room_id].TIME < session_state.ROOM.TIME: | |
| # Only acquire the lock when writing to the server state | |
| with server_state_lock["ROOMS"]: | |
| server_rooms = server_state.ROOMS | |
| server_rooms[room_id] = session_state.ROOM | |
| server_state.ROOMS = server_rooms | |
| return True | |
| else: | |
| session_state.ROOM = server_state.ROOMS[room_id] | |
| return True | |
| # Check if winner emerge from move | |
| def check_win() -> int: | |
| """ | |
| Use convolution to check if any player wins. | |
| """ | |
| vertical = convolve( | |
| session_state.ROOM.BOARD.board_map, | |
| _VERTICAL, | |
| mode="same", | |
| ) | |
| horizontal = convolve( | |
| session_state.ROOM.BOARD.board_map, | |
| _HORIZONTAL, | |
| mode="same", | |
| ) | |
| diagonal_up_left = convolve( | |
| session_state.ROOM.BOARD.board_map, | |
| _DIAGONAL_UP_LEFT, | |
| mode="same", | |
| ) | |
| diagonal_up_right = convolve( | |
| session_state.ROOM.BOARD.board_map, | |
| _DIAGONAL_UP_RIGHT, | |
| mode="same", | |
| ) | |
| if ( | |
| np.max( | |
| [ | |
| np.max(vertical), | |
| np.max(horizontal), | |
| np.max(diagonal_up_left), | |
| np.max(diagonal_up_right), | |
| ] | |
| ) | |
| == 5 * _BLACK | |
| ): | |
| winner = _BLACK | |
| elif ( | |
| np.min( | |
| [ | |
| np.min(vertical), | |
| np.min(horizontal), | |
| np.min(diagonal_up_left), | |
| np.min(diagonal_up_right), | |
| ] | |
| ) | |
| == 5 * _WHITE | |
| ): | |
| winner = _WHITE | |
| else: | |
| winner = _BLANK | |
| return winner | |
| # Triggers the board response on click | |
| def handle_click(x, y): | |
| """ | |
| Controls whether to pass on / continue current board / may start new round | |
| """ | |
| if session_state.ROOM.BOARD.board_map[x][y] != _BLANK: | |
| pass | |
| elif ( | |
| session_state.ROOM.ROOM_ID in server_state.ROOMS.keys() | |
| and _ROOM_COLOR[session_state.OWNER] | |
| != server_state.ROOMS[session_state.ROOM.ROOM_ID].TURN | |
| ): | |
| sync_room() | |
| # normal play situation | |
| elif session_state.ROOM.WINNER == _BLANK: | |
| # session_state.ROOM = deepcopy(session_state.ROOM) | |
| print("View of human player: ", session_state.ROOM.BOARD.board_map) | |
| move = session_state.ROOM.BOARD.location_to_move((x, y)) | |
| session_state.ROOM.current_move = move | |
| session_state.ROOM.BOARD.do_move(move) | |
| session_state.ROOM.BOARD.board_map[x][y] = session_state.ROOM.TURN | |
| session_state.ROOM.COORDINATE_1D.append(x * _BOARD_SIZE + y) | |
| session_state.ROOM.TURN = change_turn(session_state.ROOM.TURN) | |
| win, winner = session_state.ROOM.BOARD.game_end() | |
| if win: | |
| session_state.ROOM.WINNER = winner | |
| session_state.ROOM.HISTORY = ( | |
| session_state.ROOM.HISTORY[0] | |
| + int(session_state.ROOM.WINNER == _WHITE), | |
| session_state.ROOM.HISTORY[1] | |
| + int(session_state.ROOM.WINNER == _BLACK), | |
| ) | |
| session_state.ROOM.TIME = time.time() | |
| def forbid_click(x, y): | |
| # st.warning('This posistion has been occupied!!!!', icon="โ ๏ธ") | |
| st.error("({}, {}) has been occupied!!)".format(x, y), icon="๐จ") | |
| print("asdas") | |
| # Draw board | |
| def draw_board(response: bool): | |
| """construct each buttons for all cells of the board""" | |
| if response and session_state.ROOM.TURN == _BLACK: # human turn | |
| print("Your turn") | |
| # construction of clickable buttons | |
| for i, row in enumerate(session_state.ROOM.BOARD.board_map): | |
| # print("row:", row) | |
| for j, cell in enumerate(row): | |
| if ( | |
| i * _BOARD_SIZE + j | |
| in (session_state.ROOM.COORDINATE_1D) | |
| ): | |
| # disable click for GPT choices | |
| BOARD_PLATE[i][j].button( | |
| _PLAYER_SYMBOL[cell], | |
| key=f"{i}:{j}", | |
| args=(i, j), | |
| on_click=forbid_click | |
| ) | |
| else: | |
| # enable click for other cells available for human choices | |
| BOARD_PLATE[i][j].button( | |
| _PLAYER_SYMBOL[cell], | |
| key=f"{i}:{j}", | |
| on_click=handle_click, | |
| args=(i, j), | |
| ) | |
| elif response and session_state.ROOM.TURN == _WHITE: # AI turn | |
| message.empty() | |
| with st.spinner('๐ฎโจ Waiting for AI response... โณ๐'): | |
| time.sleep(0.1) | |
| print("AI's turn") | |
| print("Below are current board under AI's view") | |
| print(session_state.ROOM.BOARD.board_map) | |
| move, simul_time = session_state.ROOM.MCTS.get_action(session_state.ROOM.BOARD) | |
| session_state.ROOM.simula_time_list.append(simul_time) | |
| print("AI takes move: ", move) | |
| session_state.ROOM.current_move = move | |
| gpt_response = move | |
| gpt_i, gpt_j = gpt_response // _BOARD_SIZE, gpt_response % _BOARD_SIZE | |
| print("AI's move is located at ({}, {}) :".format(gpt_i, gpt_j)) | |
| move = session_state.ROOM.BOARD.location_to_move((gpt_i, gpt_j)) | |
| print("Location to move: ", move) | |
| session_state.ROOM.BOARD.do_move(move) | |
| # session_state.ROOM.BOARD[gpt_i][gpt_j] = session_state.ROOM.TURN | |
| session_state.ROOM.COORDINATE_1D.append(gpt_i * _BOARD_SIZE + gpt_j) | |
| # construction of clickable buttons | |
| for i, row in enumerate(session_state.ROOM.BOARD.board_map): | |
| # print("row:", row) | |
| for j, cell in enumerate(row): | |
| if ( | |
| i * _BOARD_SIZE + j | |
| in (session_state.ROOM.COORDINATE_1D) | |
| ): | |
| # disable click for GPT choices | |
| BOARD_PLATE[i][j].button( | |
| _PLAYER_SYMBOL[cell], | |
| key=f"{i}:{j}", | |
| args=(i, j), | |
| on_click=forbid_click | |
| ) | |
| else: | |
| # enable click for other cells available for human choices | |
| BOARD_PLATE[i][j].button( | |
| _PLAYER_SYMBOL[cell], | |
| key=f"{i}:{j}", | |
| on_click=handle_click, | |
| args=(i, j), | |
| ) | |
| message.markdown( | |
| 'AI agent has calculated its strategy, which takes <span style="color: blue; font-size: 20px;">{:.3e}</span>s per simulation.'.format( | |
| simul_time), | |
| unsafe_allow_html=True | |
| ) | |
| LOG.subheader("Logs") | |
| # change turn | |
| session_state.ROOM.TURN = change_turn(session_state.ROOM.TURN) | |
| # session_state.ROOM.WINNER = check_win() | |
| win, winner = session_state.ROOM.BOARD.game_end() | |
| if win: | |
| session_state.ROOM.WINNER = winner | |
| session_state.ROOM.HISTORY = ( | |
| session_state.ROOM.HISTORY[0] | |
| + int(session_state.ROOM.WINNER == _WHITE), | |
| session_state.ROOM.HISTORY[1] | |
| + int(session_state.ROOM.WINNER == _BLACK), | |
| ) | |
| session_state.ROOM.TIME = time.time() | |
| if not response or session_state.ROOM.WINNER != _BLANK: | |
| print("Game over") | |
| for i, row in enumerate(session_state.ROOM.BOARD.board_map): | |
| for j, cell in enumerate(row): | |
| BOARD_PLATE[i][j].write( | |
| _PLAYER_SYMBOL[cell], | |
| key=f"{i}:{j}", | |
| ) | |
| # Game process control | |
| def game_control(): | |
| if session_state.ROOM.WINNER != _BLANK: | |
| draw_board(False) | |
| else: | |
| draw_board(True) | |
| if session_state.ROOM.WINNER != _BLANK or 0 not in session_state.ROOM.BOARD.board_map: | |
| ANOTHER_ROUND.button( | |
| "Play Next round!", | |
| on_click=another_round, | |
| help="Clear board and swap first player", | |
| ) | |
| # Infos | |
| def update_info() -> None: | |
| # Additional information | |
| SCORE_PLATE[0].metric("Gomoku-Agent", session_state.ROOM.HISTORY[0]) | |
| SCORE_PLATE[1].metric("Black", session_state.ROOM.HISTORY[1]) | |
| if session_state.ROOM.WINNER != _BLANK: | |
| st.balloons() | |
| ROUND_INFO.write( | |
| f"#### **{_PLAYER_COLOR[session_state.ROOM.WINNER]} WIN!**\n**Click buttons on the left for more plays.**" | |
| ) | |
| # elif 0 not in session_state.ROOM.BOARD.board_map: | |
| # ROUND_INFO.write("#### **Tie**") | |
| # else: | |
| # ROUND_INFO.write( | |
| # f"#### **{_PLAYER_SYMBOL[session_state.ROOM.TURN]} {_PLAYER_COLOR[session_state.ROOM.TURN]}'s turn...**" | |
| # ) | |
| # draw the plot for simulation time | |
| # ๅๅปบไธไธช DataFrame | |
| print(session_state.ROOM.simula_time_list) | |
| st.markdown("<br>", unsafe_allow_html=True) | |
| st.markdown("<br>", unsafe_allow_html=True) | |
| chart_data = pd.DataFrame(session_state.ROOM.simula_time_list, columns=["Simulation Time"]) | |
| st.line_chart(chart_data) | |
| # The main game loop | |
| game_control() | |
| update_info() | |
| if __name__ == "__main__": | |
| gomoku() | |