|
import streamlit as st |
|
import random |
|
import json |
|
import os |
|
import boto3 |
|
from pymongo import MongoClient |
|
from bson.objectid import ObjectId |
|
|
|
|
|
# Board geometry and the emoji shortcodes stored in board cells.
BOARD_SIZE = 10
EMPTY = ':ocean:'
HIT = ':boom:'
MISS = ':heavy_multiplication_x:'

# Fleet definition: each ship has a display name, a length in cells, and the
# shortcode written into every cell it occupies.
SHIPS = [
    {"name": name, "size": size, "symbol": symbol}
    for name, size, symbol in (
        ("Carrier", 5, ":ship:"),
        ("Battleship", 4, ":motor_boat:"),
        ("Cruiser", 3, ":boat:"),
        ("Submarine", 3, ":ferry:"),
        ("Destroyer", 2, ":speedboat:"),
    )
]
|
|
|
|
|
# MongoDB handles: the connection string comes from the MONGODB_ATLAS_URI
# environment variable; game documents live in battleship.games.
client = MongoClient(os.getenv('MONGODB_ATLAS_URI'))
db = client.battleship
games = db.games
|
|
|
|
|
# Bedrock runtime client used to invoke the Claude models; credentials are
# read from the AWS_ACCESS_KEY / AWS_SECRET_KEY environment variables.
bedrock_runtime = boto3.client(
    'bedrock-runtime',
    region_name="us-west-2",
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY'),
    aws_secret_access_key=os.getenv('AWS_SECRET_KEY'),
)
|
|
|
def get_bedrock_claude_move(board, openent_moves=None):
    """Ask Claude (through AWS Bedrock) for the opponent's next shot.

    Args:
        board: Board state shown to the model; it is interpolated into the
            prompt as text.
        openent_moves: Moves the opponent has made so far. ``None`` or an
            empty value is sent to the model as the text ``None``. (The
            parameter spelling is kept as-is because callers pass it by
            keyword.)

    Returns:
        A ``(row, col, comment)`` tuple read from the ``row``, ``col`` and
        ``entertainment_comment`` fields of the model's JSON reply. The
        comment is ``''`` when the model omits it.

    Raises:
        json.JSONDecodeError: If the reply contains no parseable JSON.
        KeyError: If the reply lacks ``row`` or ``col``.
    """
    print("inside get_bedrock_claude_move")
    claude_body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 5000,
        "temperature": 0,
        "system": f"Please provide the next move as an opponent in the battleship game , the board is {BOARD_SIZE} X {BOARD_SIZE} . Be smart and stratigic. Respond only in JSON format strictly: 'row' : ... , 'col' : ... , 'entertainment_comment' and nothing more . ",
        "messages": [{
            "role": "user",
            "content": [
                {"type": "text", "text": f"The current board: {board} and your moves were: {openent_moves if openent_moves else 'None'}"}
            ]
        }]
    })

    # "Expert" difficulty uses the newer Sonnet revision; anything else uses
    # the earlier one.
    model_id = "anthropic.claude-3-5-sonnet-20241022-v2:0" if st.session_state.difficulty == "Expert" else "anthropic.claude-3-5-sonnet-20240620-v1:0"

    response = bedrock_runtime.invoke_model(
        body=claude_body,
        modelId=model_id,
        accept="application/json",
        contentType="application/json",
    )

    response_body = json.loads(response.get("body").read())
    reply_text = response_body["content"][0]['text']

    # Models sometimes wrap the JSON in a code fence or add a sentence around
    # it despite the instructions; parse only the outermost {...} span when
    # one is present. A clean JSON reply is unaffected by the slicing.
    first_brace = reply_text.find('{')
    last_brace = reply_text.rfind('}')
    if first_brace != -1 and last_brace > first_brace:
        reply_text = reply_text[first_brace:last_brace + 1]
    json_response = json.loads(reply_text)

    # The comment is purely cosmetic, so its absence must not abort the turn.
    return (
        json_response['row'],
        json_response['col'],
        json_response.get('entertainment_comment', ''),
    )
|
|
|
|
|
def create_empty_board():
    """Build a fresh BOARD_SIZE x BOARD_SIZE grid with every cell set to EMPTY.

    Each row is its own list, so writing to one row never affects another.
    """
    grid = []
    for _ in range(BOARD_SIZE):
        grid.append([EMPTY] * BOARD_SIZE)
    return grid
|
|
|
def is_valid_position(row, col):
    """Return True when (row, col) lies inside the BOARD_SIZE x BOARD_SIZE grid."""
    # Guard on the row first; the column is only examined for in-range rows.
    if not 0 <= row < BOARD_SIZE:
        return False
    return 0 <= col < BOARD_SIZE
|
|
|
def check_adjacent_cells(board, row, col):
    """Report whether every neighbour of (row, col) is free.

    Looks at the up-to-eight surrounding cells (diagonals included, the cell
    itself excluded), skipping positions that fall off the board.

    Returns:
        True when all in-bounds neighbours hold EMPTY; False as soon as one
        holds anything else.
    """
    neighbours = (
        (row + d_row, col + d_col)
        for d_row in (-1, 0, 1)
        for d_col in (-1, 0, 1)
        if (d_row, d_col) != (0, 0)
    )
    return not any(
        is_valid_position(n_row, n_col) and board[n_row][n_col] != EMPTY
        for n_row, n_col in neighbours
    )
|
|
|
def can_place_ship(board, row, col, size, direction):
    """Decide whether a ship fits at (row, col) without touching another ship.

    Args:
        board: Grid of cell values; free cells hold EMPTY.
        row: Row of the ship's first cell.
        col: Column of the ship's first cell.
        size: Ship length in cells.
        direction: ``'horizontal'`` extends the ship to the right; any other
            value extends it downward.

    Returns:
        True when the start cell is on the board, the ship does not run past
        the board edge, and every in-bounds cell of the ship's footprint plus
        a one-cell border around it is EMPTY. False otherwise.
    """
    # A start cell off the board (negative or too large) must be rejected
    # up front: the footprint scan below skips off-board cells, so it could
    # otherwise approve a placement that place_ship cannot perform safely.
    if not is_valid_position(row, col):
        return False

    if direction == 'horizontal':
        if col + size > BOARD_SIZE:
            return False
        row_span = range(row - 1, row + 2)
        col_span = range(col - 1, col + size + 1)
    else:
        if row + size > BOARD_SIZE:
            return False
        row_span = range(row - 1, row + size + 1)
        col_span = range(col - 1, col + 2)

    # Scan the footprint and its border; cells that fall off the board are
    # simply ignored.
    return not any(
        is_valid_position(check_row, check_col)
        and board[check_row][check_col] != EMPTY
        for check_row in row_span
        for check_col in col_span
    )
|
|
|
def place_ships_randomly(board):
    """Drop every ship from SHIPS onto ``board`` at a random legal spot.

    For each ship, random start cells and orientations are drawn until
    ``can_place_ship`` accepts one. The board is modified in place and also
    returned.
    """
    orientations = ['horizontal', 'vertical']
    for ship in SHIPS:
        placed = False
        while not placed:
            start_row = random.randint(0, BOARD_SIZE - 1)
            start_col = random.randint(0, BOARD_SIZE - 1)
            orientation = random.choice(orientations)
            if not can_place_ship(board, start_row, start_col, ship['size'], orientation):
                continue
            place_ship(board, start_row, start_col, ship['size'], orientation, ship['symbol'])
            placed = True
    return board
|
|
|
def place_ship(board, row, col, size, direction, symbol):
    """Write ``symbol`` into ``size`` consecutive cells starting at (row, col).

    ``'horizontal'`` advances along the row; any other ``direction`` value
    advances down the column. The board is modified in place and nothing is
    returned. No bounds or overlap checks are performed here.
    """
    # Express the orientation as a per-step (row, col) delta.
    step_row, step_col = (0, 1) if direction == 'horizontal' else (1, 0)
    for offset in range(size):
        board[row + offset * step_row][col + offset * step_col] = symbol
|
|
|
def initialize_game():
    """Create the session's game state if needed and sync it to MongoDB.

    On first use this seeds ``st.session_state.difficulty`` and
    ``st.session_state.game_state`` (two randomly populated ship boards, two
    empty attack boards, hit counters, turn marker and status message) and
    assigns the game a fresh ``ObjectId``. It then upserts one ``player`` and
    one ``opponent`` document for that game id with the current boards.
    """
    if 'difficulty' not in st.session_state:
        st.session_state.difficulty = "Beginner"

    if 'game_state' not in st.session_state:
        # Derive the number of ship cells from the fleet definition so the
        # counters stay correct if SHIPS is ever changed.
        total_ship_cells = sum(ship['size'] for ship in SHIPS)
        st.session_state.game_state = {
            'player_board': place_ships_randomly(create_empty_board()),
            'opponent_board': place_ships_randomly(create_empty_board()),
            'player_attacks': create_empty_board(),
            'player_hits_left': total_ship_cells,
            'opponent_attacks': create_empty_board(),
            'openent_moves': [],
            'opponent_hits_left': total_ship_cells,
            'current_player': 'player',
            'game_state': 'not_started',
            'game_over': False,
            'message': ''
        }

    game_state = st.session_state.game_state
    if 'game_id' not in game_state:
        game_state['game_id'] = ObjectId()
    gameId = game_state['game_id']

    # Upsert both sides' documents. The 'oponent_moves' field name (sic) is
    # the one the rest of the file reads back, so it is kept unchanged.
    games.update_one({'game_id': gameId, 'type': 'player'},
                     {"$set": {'game_id': gameId,
                               'board': game_state['player_board'],
                               'attacking_board': game_state['player_attacks']}},
                     upsert=True)
    games.update_one({'game_id': gameId, 'type': 'opponent'},
                     {"$set": {'game_id': gameId,
                               'board': game_state['opponent_board'],
                               'oponent_moves': game_state['openent_moves'],
                               'attacking_board': game_state['opponent_attacks']}},
                     upsert=True)
|
|
|
def check_game_over():
    """Detect the end of the game and record the outcome in the game state.

    ``player_hits_left`` counts the player's own ship cells that are still
    afloat (it is decremented when the opponent scores a hit), and
    ``opponent_hits_left`` is the same for the opponent's fleet. When either
    reaches zero, ``game_over`` is set and ``message`` announces the result.

    Returns:
        True when the game has ended, False otherwise.
    """
    game_state = st.session_state.game_state

    if game_state['player_hits_left'] == 0:
        # All of the player's ship cells have been hit: the player lost.
        game_state['message'] = "You lost!"
        game_state['game_over'] = True
        return True
    if game_state['opponent_hits_left'] == 0:
        # All of the opponent's ship cells have been hit: the player won.
        game_state['message'] = "You won!"
        game_state['game_over'] = True
        return True
    return False
|
|
|
def render_board(board, is_opponent=False):
    """Draw ``board`` as a grid of Streamlit buttons, one column set per row.

    On the opponent view (``is_opponent=True``) each EMPTY cell becomes a
    wave button whose click calls ``attack`` with that cell's coordinates.
    Every other cell is drawn as a button labelled with the cell's content:
    'secondary' style for EMPTY cells and for non-HIT/MISS cells on the
    opponent view, 'primary' style otherwise.
    """
    key_prefix = 'opp' if is_opponent else 'player'
    for row_idx, row_cells in enumerate(board):
        columns = st.columns(BOARD_SIZE)
        for col_idx, cell in enumerate(row_cells):
            with columns[col_idx]:
                if is_opponent and cell == EMPTY:
                    # Coordinates are bound as lambda defaults so each button
                    # attacks its own cell rather than the last one rendered.
                    st.button(
                        '🌊',
                        key=f'opp_{row_idx}_{col_idx}',
                        on_click=lambda r=row_idx, c=col_idx: attack(r, c),
                    )
                    continue

                use_secondary = cell == EMPTY or (is_opponent and cell not in [HIT, MISS])
                button_color = 'secondary' if use_secondary else 'primary'
                st.button(cell, key=f'{key_prefix}_{row_idx}_{col_idx}', type=button_color)
|
|
|
def attack(row, col):
    """Resolve the player's shot at (row, col) on the opponent's board.

    Marks the cell on ``player_attacks`` as HIT or MISS, updates the status
    message and the opponent's remaining-hit counter, persists the player's
    document, and hands the turn to the opponent.

    The call is ignored when the game has already ended or the cell has
    already been shot, so late or repeated clicks cannot change the counters
    or trigger another opponent move.
    """
    game_state = st.session_state.game_state

    if game_state['game_over'] or game_state['player_attacks'][row][col] != EMPTY:
        return

    if game_state['opponent_board'][row][col] != EMPTY:
        game_state['player_attacks'][row][col] = HIT
        game_state['message'] = "You hit a ship!"
        game_state['opponent_hits_left'] -= 1
    else:
        game_state['player_attacks'][row][col] = MISS
        game_state['message'] = "You missed!"

    update_database('player')
    game_state['current_player'] = 'opponent'
|
|
|
def update_database(type):
    """Persist one side's attack board and the opponent move log to MongoDB.

    Args:
        type: Which document to update: ``'player'`` or ``'opponent'``. (The
            name shadows the builtin but is kept so existing callers are
            unaffected.)

    The ``attacking_board`` written is the board belonging to that side:
    ``opponent_attacks`` for ``'opponent'`` and ``player_attacks`` otherwise.
    Previously the player's board was written to both documents.
    """
    game_state = st.session_state.game_state
    gameId = game_state['game_id']
    board_key = 'opponent_attacks' if type == 'opponent' else 'player_attacks'
    games.update_one({'game_id': gameId, 'type': type},
                     {'$set': {'attacking_board': game_state[board_key],
                               'oponent_moves': game_state['openent_moves']}})
|
|
|
def opponent_turn():
    """Let the AI opponent take one shot at the player's board.

    Loads the opponent's document from MongoDB, asks Claude for a move,
    applies it to ``player_board`` / ``opponent_attacks``, appends it to the
    move log, updates the message and the player's remaining-hit counter,
    persists the result, and returns the turn to the player.

    The model's coordinates are treated as untrusted: if they are not
    integers, fall outside the board, or repeat a cell that was already
    shot, a random not-yet-shot cell is used instead. This prevents index
    errors and stops a repeated shot on a HIT cell from being counted as a
    new hit.
    """
    game_state = st.session_state.game_state
    attacks = game_state['opponent_attacks']

    with st.spinner("Opponent is making a move..."):
        opponent_doc = games.find_one({'game_id': ObjectId(game_state['game_id']), 'type': 'opponent'})
        print("opponent_doc:", opponent_doc)
        # find_one returns None when no document matches; fall back to the
        # in-session copies rather than failing on a missing document.
        known_board = opponent_doc['attacking_board'] if opponent_doc else attacks
        past_moves = opponent_doc['oponent_moves'] if opponent_doc else game_state['openent_moves']
        row, col, comment = get_bedrock_claude_move(known_board, openent_moves=past_moves)

    try:
        row, col = int(row), int(col)
    except (TypeError, ValueError):
        row, col = -1, -1  # forces the fallback below

    if not is_valid_position(row, col) or attacks[row][col] != EMPTY:
        untouched = [
            (r, c)
            for r in range(BOARD_SIZE)
            for c in range(BOARD_SIZE)
            if attacks[r][c] == EMPTY
        ]
        if not untouched:
            # Nothing left to shoot at; just give the turn back.
            game_state['current_player'] = 'player'
            return
        row, col = random.choice(untouched)

    # Anything other than water or an earlier shot marker is a ship cell.
    was_hit = game_state['player_board'][row][col] not in (EMPTY, HIT, MISS)
    marker = HIT if was_hit else MISS

    game_state['openent_moves'].append({'row': row, 'col': col, 'status': 'hit' if was_hit else 'miss'})
    attacks[row][col] = marker
    game_state['player_board'][row][col] = marker
    if was_hit:
        game_state['player_hits_left'] -= 1
        game_state['message'] = f"Opponent hit your ship! Cell row {row} col {col} Opponent: {comment}"
    else:
        game_state['message'] = f"Opponent missed! Cell row {row} col {col}, Opponent: {comment}"

    update_database('opponent')
    game_state['current_player'] = 'player'
|
|
|
def main():
    """Render the Battleship page and drive the turn loop.

    Draws the difficulty selector, both boards, the fleet checklist, the
    opponent's move history, the status message and a reset button. When it
    is the opponent's turn, the AI moves and the page is rerun so the result
    is displayed. A debug section at the bottom lists the opponent's ship
    board.
    """
    st.title('Battleship Game')

    if 'difficulty' not in st.session_state:
        st.session_state.difficulty = "Beginner"

    st.session_state.difficulty = st.selectbox(
        "Select Difficulty",
        ["Beginner", "Expert"],
        index=0 if st.session_state.difficulty == "Beginner" else 1
    )

    initialize_game()
    game_state = st.session_state.game_state

    if game_state['game_over']:
        st.subheader('Game Over! You Lost!' if game_state['player_hits_left'] == 0 else 'You Won!')
    else:
        st.subheader('Your Turn')

    col1, col3, col2 = st.columns(3)
    with col1:
        st.subheader('Your Board')
        render_board(game_state['player_board'])

    with col3:
        st.subheader("Opponent Ships")
        for ship in SHIPS:
            my_ships, status = st.columns(2)
            with my_ships:
                st.write(f" {ship['symbol']} {ship['name']}, size: {ship['size']}")
            with status:
                st.checkbox("Sunk", key=f"{ship['name']}_sunk")

        st.markdown("#### Opponent history")
        container = st.container(height=250)
        for move in game_state['openent_moves']:
            container.write(f"Row: {move['row']} Col: {move['col']} Status: {move['status']}")

    with col2:
        st.subheader("Opponent's Board")
        render_board(game_state['player_attacks'], is_opponent=True)

    st.write(game_state['message'])

    if st.button('Reset Game'):
        st.session_state.clear()
        st.rerun()

    if game_state['current_player'] == 'opponent':
        # The player's last shot may already have ended the game; only let
        # the opponent move when it has not.
        if not check_game_over():
            opponent_turn()
            check_game_over()
        if game_state['game_over']:
            game_state['current_player'] = 'game_over'
        # Rerun in every case so the boards, message and header drawn above
        # reflect the move (or the final result). After a game over the turn
        # marker is no longer 'opponent', so this cannot loop.
        st.rerun()

    # Debug aid: exposes the opponent's ship layout.
    st.header('Debug')
    expander = st.expander("Game State")
    for row in game_state['opponent_board']:
        expander.markdown(row)

    if st.button("Chat"):
        # NOTE(review): pop_chat is not defined in the visible part of this
        # file; confirm it exists elsewhere, otherwise this raises NameError.
        pop_chat()
|
|
|
# Script entry point: build and run the page when executed directly.
if __name__ == "__main__":
    main()
|
|