"""
FileName: app.py
Author: Benhao Huang
Create Date: 2023/11/19
Description: this file is used to display our project and add visualization elements to the game, using Streamlit
"""
import time
import pandas as pd
from copy import deepcopy
# import torch
import numpy as np
import streamlit as st
from scipy.signal import convolve # this is used to check if any player wins
from streamlit import session_state
from streamlit_server_state import server_state, server_state_lock
from Gomoku_MCTS import MCTSpure, alphazero, Board, PolicyValueNet
import matplotlib.pyplot as plt
from const import (
_BLACK, # 1, for human
_WHITE, # 2 , for AI
_BLANK,
_PLAYER_COLOR,
_PLAYER_SYMBOL,
_ROOM_COLOR,
_VERTICAL,
_HORIZONTAL,
_DIAGONAL_UP_LEFT,
_DIAGONAL_UP_RIGHT,
_BOARD_SIZE,
_BOARD_SIZE_1D
)
# Utils
class Room:
def __init__(self, room_id) -> None:
self.ROOM_ID = room_id
# self.BOARD = np.zeros(shape=(_BOARD_SIZE, _BOARD_SIZE), dtype=int)
self.BOARD = Board(width=_BOARD_SIZE, height=_BOARD_SIZE, n_in_row=5, players=[_BLACK, _WHITE])
self.PLAYER = _BLACK
self.TURN = self.PLAYER
self.HISTORY = (0, 0)
self.WINNER = _BLANK
self.TIME = time.time()
self.MCTS = MCTSpure(c_puct=5, n_playout=10)
self.MCTS = alphazero(PolicyValueNet(_BOARD_SIZE, _BOARD_SIZE, 'Gomoku_MCTS/checkpoints/best_policy_8_8_5_2torch.pth').policy_value_fn, c_puct=5, n_playout=100)
self.COORDINATE_1D = [_BOARD_SIZE_1D + 1]
self.current_move = -1
self.simula_time_list = []
def change_turn(cur):
return cur % 2 + 1
# Initialize the game
if "ROOM" not in session_state:
session_state.ROOM = Room("local")
if "OWNER" not in session_state:
session_state.OWNER = False
if "USE_AIAID" not in session_state:
session_state.USE_AIAID = False
# Check server health
if "ROOMS" not in server_state:
with server_state_lock["ROOMS"]:
server_state.ROOMS = {}
# # Layout
# Main
TITLE = st.empty()
TITLE.header("๐ค AI 3603 Gomoku")
ROUND_INFO = st.empty()
st.markdown("
", unsafe_allow_html=True)
BOARD_PLATE = [
[cell.empty() for cell in st.columns([1 for _ in range(_BOARD_SIZE)])] for _ in range(_BOARD_SIZE)
]
LOG = st.empty()
# Sidebar
SCORE_TAG = st.sidebar.empty()
SCORE_PLATE = st.sidebar.columns(2)
# History scores
SCORE_TAG.subheader("Scores")
PLAY_MODE_INFO = st.sidebar.container()
MULTIPLAYER_TAG = st.sidebar.empty()
with st.sidebar.container():
ANOTHER_ROUND = st.empty()
RESTART = st.empty()
AIAID = st.empty()
EXIT = st.empty()
GAME_INFO = st.sidebar.container()
message = st.empty()
PLAY_MODE_INFO.write("---\n\n**You are Black, AI agent is White.**")
GAME_INFO.markdown(
"""
---
# Freestyle Gomoku game. ๐ฒ
- no restrictions ๐ซ
- no regrets ๐
- swap players after one round is over ๐
Powered by an AlphaZero approach with our own improvements! ๐ For the specific details, please check out our report.
##### Adapted and improved by us! ๐ Our Github repo
""",
unsafe_allow_html=True,
)
def restart() -> None:
"""
Restart the game.
"""
session_state.ROOM = Room(session_state.ROOM.ROOM_ID)
RESTART.button(
"Reset",
on_click=restart,
help="Clear the board as well as the scores",
)
# Draw the board
def gomoku():
"""
Draw the board.
Handle the main logic.
"""
# Restart the game
# Continue new round
def another_round() -> None:
"""
Continue new round.
"""
session_state.ROOM = deepcopy(session_state.ROOM)
session_state.ROOM.BOARD = Board(width=_BOARD_SIZE, height=_BOARD_SIZE, n_in_row=5)
session_state.ROOM.PLAYER = session_state.ROOM.PLAYER
session_state.ROOM.TURN = session_state.ROOM.PLAYER
session_state.ROOM.WINNER = _BLANK # 0
session_state.ROOM.COORDINATE_1D = [_BOARD_SIZE_1D + 1]
# Room status sync
def sync_room() -> bool:
room_id = session_state.ROOM.ROOM_ID
if room_id not in server_state.ROOMS.keys():
session_state.ROOM = Room("local")
return False
elif server_state.ROOMS[room_id].TIME == session_state.ROOM.TIME:
return False
elif server_state.ROOMS[room_id].TIME < session_state.ROOM.TIME:
# Only acquire the lock when writing to the server state
with server_state_lock["ROOMS"]:
server_rooms = server_state.ROOMS
server_rooms[room_id] = session_state.ROOM
server_state.ROOMS = server_rooms
return True
else:
session_state.ROOM = server_state.ROOMS[room_id]
return True
# Check if winner emerge from move
def check_win() -> int:
"""
Use convolution to check if any player wins.
"""
vertical = convolve(
session_state.ROOM.BOARD.board_map,
_VERTICAL,
mode="same",
)
horizontal = convolve(
session_state.ROOM.BOARD.board_map,
_HORIZONTAL,
mode="same",
)
diagonal_up_left = convolve(
session_state.ROOM.BOARD.board_map,
_DIAGONAL_UP_LEFT,
mode="same",
)
diagonal_up_right = convolve(
session_state.ROOM.BOARD.board_map,
_DIAGONAL_UP_RIGHT,
mode="same",
)
if (
np.max(
[
np.max(vertical),
np.max(horizontal),
np.max(diagonal_up_left),
np.max(diagonal_up_right),
]
)
== 5 * _BLACK
):
winner = _BLACK
elif (
np.min(
[
np.min(vertical),
np.min(horizontal),
np.min(diagonal_up_left),
np.min(diagonal_up_right),
]
)
== 5 * _WHITE
):
winner = _WHITE
else:
winner = _BLANK
return winner
def ai_aid() -> None:
"""
Use AI Aid.
"""
session_state.USE_AIAID = not session_state.USE_AIAID
print('Use AI Aid: ', session_state.USE_AIAID)
draw_board(False)
# Triggers the board response on click
def handle_click(x, y):
"""
Controls whether to pass on / continue current board / may start new round
"""
if session_state.ROOM.BOARD.board_map[x][y] != _BLANK:
pass
elif (
session_state.ROOM.ROOM_ID in server_state.ROOMS.keys()
and _ROOM_COLOR[session_state.OWNER]
!= server_state.ROOMS[session_state.ROOM.ROOM_ID].TURN
):
sync_room()
# normal play situation
elif session_state.ROOM.WINNER == _BLANK:
# session_state.ROOM = deepcopy(session_state.ROOM)
# print("View of human player: ", session_state.ROOM.BOARD.board_map)
move = session_state.ROOM.BOARD.location_to_move((x, y))
session_state.ROOM.current_move = move
session_state.ROOM.BOARD.do_move(move)
session_state.ROOM.BOARD.board_map[x][y] = session_state.ROOM.TURN
session_state.ROOM.COORDINATE_1D.append(x * _BOARD_SIZE + y)
session_state.ROOM.TURN = change_turn(session_state.ROOM.TURN)
win, winner = session_state.ROOM.BOARD.game_end()
if win:
session_state.ROOM.WINNER = winner
session_state.ROOM.HISTORY = (
session_state.ROOM.HISTORY[0]
+ int(session_state.ROOM.WINNER == _WHITE),
session_state.ROOM.HISTORY[1]
+ int(session_state.ROOM.WINNER == _BLACK),
)
session_state.ROOM.TIME = time.time()
def forbid_click(x, y):
# st.warning('This posistion has been occupied!!!!', icon="โ ๏ธ")
st.error("({}, {}) has been occupied!!)".format(x, y), icon="๐จ")
print("asdas")
# Draw board
def draw_board(response: bool):
"""construct each buttons for all cells of the board"""
if session_state.USE_AIAID and session_state.ROOM.WINNER == _BLANK and session_state.ROOM.TURN == _BLACK:
copy_mcts = deepcopy(session_state.ROOM.MCTS.mcts)
_, acts, probs, simul_mean_time = copy_mcts.get_move_probs(session_state.ROOM.BOARD)
sorted_acts_probs = sorted(zip(acts, probs), key=lambda x: x[1], reverse=True)
top_five_acts = [act for act, prob in sorted_acts_probs[:5]]
top_five_probs = [prob for act, prob in sorted_acts_probs[:5]]
if response and session_state.ROOM.TURN == _BLACK: # human turn
print("Your turn")
# construction of clickable buttons
for i, row in enumerate(session_state.ROOM.BOARD.board_map):
# print("row:", row)
for j, cell in enumerate(row):
if (
i * _BOARD_SIZE + j
in (session_state.ROOM.COORDINATE_1D)
):
# disable click for GPT choices
BOARD_PLATE[i][j].button(
_PLAYER_SYMBOL[cell],
key=f"{i}:{j}",
args=(i, j),
on_click=forbid_click
)
else:
if session_state.USE_AIAID and i * _BOARD_SIZE + j in top_five_acts:
# enable click for other cells available for human choices
prob = top_five_probs[top_five_acts.index(i * _BOARD_SIZE + j)]
BOARD_PLATE[i][j].button(
_PLAYER_SYMBOL[cell] + f"({round(prob, 2)})",
key=f"{i}:{j}",
on_click=handle_click,
args=(i, j),
)
else:
# enable click for other cells available for human choices
BOARD_PLATE[i][j].button(
_PLAYER_SYMBOL[cell],
key=f"{i}:{j}",
on_click=handle_click,
args=(i, j),
)
elif response and session_state.ROOM.TURN == _WHITE: # AI turn
message.empty()
with st.spinner('๐ฎโจ Waiting for AI response... โณ๐'):
time.sleep(0.1)
print("AI's turn")
print("Below are current board under AI's view")
# print(session_state.ROOM.BOARD.board_map)
move, simul_time = session_state.ROOM.MCTS.get_action(session_state.ROOM.BOARD, return_time=True)
session_state.ROOM.simula_time_list.append(simul_time)
print("AI takes move: ", move)
session_state.ROOM.current_move = move
gpt_response = move
gpt_i, gpt_j = gpt_response // _BOARD_SIZE, gpt_response % _BOARD_SIZE
print("AI's move is located at ({}, {}) :".format(gpt_i, gpt_j))
move = session_state.ROOM.BOARD.location_to_move((gpt_i, gpt_j))
print("Location to move: ", move)
session_state.ROOM.BOARD.do_move(move)
# session_state.ROOM.BOARD[gpt_i][gpt_j] = session_state.ROOM.TURN
session_state.ROOM.COORDINATE_1D.append(gpt_i * _BOARD_SIZE + gpt_j)
copy_mcts = deepcopy(session_state.ROOM.MCTS.mcts)
_, acts, probs, simul_mean_time = copy_mcts.get_move_probs(session_state.ROOM.BOARD)
sorted_acts_probs = sorted(zip(acts, probs), key=lambda x: x[1], reverse=True)
top_five_acts = [act for act, prob in sorted_acts_probs[:5]]
top_five_probs = [prob for act, prob in sorted_acts_probs[:5]]
# construction of clickable buttons
for i, row in enumerate(session_state.ROOM.BOARD.board_map):
# print("row:", row)
for j, cell in enumerate(row):
if (
i * _BOARD_SIZE + j
in (session_state.ROOM.COORDINATE_1D)
):
# disable click for GPT choices
BOARD_PLATE[i][j].button(
_PLAYER_SYMBOL[cell],
key=f"{i}:{j}",
args=(i, j),
on_click=forbid_click
)
else:
if session_state.USE_AIAID and i * _BOARD_SIZE + j in top_five_acts:
# enable click for other cells available for human choices
prob = top_five_probs[top_five_acts.index(i * _BOARD_SIZE + j)]
BOARD_PLATE[i][j].button(
_PLAYER_SYMBOL[cell] + f"({round(prob, 2)})",
key=f"{i}:{j}",
on_click=handle_click,
args=(i, j),
)
else:
# enable click for other cells available for human choices
BOARD_PLATE[i][j].button(
_PLAYER_SYMBOL[cell],
key=f"{i}:{j}",
on_click=handle_click,
args=(i, j),
)
message.markdown(
'AI agent has calculated its strategy, which takes {:.3e}s per simulation.'.format(
simul_time),
unsafe_allow_html=True
)
LOG.subheader("Logs")
# change turn
session_state.ROOM.TURN = change_turn(session_state.ROOM.TURN)
# session_state.ROOM.WINNER = check_win()
win, winner = session_state.ROOM.BOARD.game_end()
if win:
session_state.ROOM.WINNER = winner
session_state.ROOM.HISTORY = (
session_state.ROOM.HISTORY[0]
+ int(session_state.ROOM.WINNER == _WHITE),
session_state.ROOM.HISTORY[1]
+ int(session_state.ROOM.WINNER == _BLACK),
)
session_state.ROOM.TIME = time.time()
if not response or session_state.ROOM.WINNER != _BLANK:
if session_state.ROOM.WINNER != _BLANK:
print("Game over")
for i, row in enumerate(session_state.ROOM.BOARD.board_map):
for j, cell in enumerate(row):
BOARD_PLATE[i][j].write(
_PLAYER_SYMBOL[cell],
# key=f"{i}:{j}",
)
# Game process control
def game_control():
if session_state.ROOM.WINNER != _BLANK:
draw_board(False)
else:
draw_board(True)
if session_state.ROOM.WINNER != _BLANK or 0 not in session_state.ROOM.BOARD.board_map:
ANOTHER_ROUND.button(
"Play Next round!",
on_click=another_round,
help="Clear board and swap first player",
)
# Infos
def update_info() -> None:
# Additional information
SCORE_PLATE[0].metric("Gomoku-Agent", session_state.ROOM.HISTORY[0])
SCORE_PLATE[1].metric("Black", session_state.ROOM.HISTORY[1])
if session_state.ROOM.WINNER != _BLANK:
st.balloons()
ROUND_INFO.write(
f"#### **{_PLAYER_COLOR[session_state.ROOM.WINNER]} WIN!**\n**Click buttons on the left for more plays.**"
)
# elif 0 not in session_state.ROOM.BOARD.board_map:
# ROUND_INFO.write("#### **Tie**")
# else:
# ROUND_INFO.write(
# f"#### **{_PLAYER_SYMBOL[session_state.ROOM.TURN]} {_PLAYER_COLOR[session_state.ROOM.TURN]}'s turn...**"
# )
# draw the plot for simulation time
# ๅๅปบไธไธช DataFrame
print(session_state.ROOM.simula_time_list)
st.markdown("
", unsafe_allow_html=True)
st.markdown("
", unsafe_allow_html=True)
chart_data = pd.DataFrame(session_state.ROOM.simula_time_list, columns=["Simulation Time"])
st.line_chart(chart_data)
# The main game loop
AIAID.button(
"Use AI Aid",
on_click=ai_aid,
help="Use AI Aid to help you make moves",
)
game_control()
update_info()
if __name__ == "__main__":
gomoku()