Pash1986's picture
Upload app.py
0464478 verified
import streamlit as st
import random
import json
import os
import boto3
from pymongo import MongoClient
from bson.objectid import ObjectId
# Constants
BOARD_SIZE = 10
EMPTY = ':ocean:'
HIT = ':boom:'
MISS = ':heavy_multiplication_x:'
SHIPS = [
{"name": "Carrier", "size": 5, "symbol": ":ship:"},
{"name": "Battleship", "size": 4, "symbol": ":motor_boat:"},
{"name": "Cruiser", "size": 3, "symbol": ":boat:"},
{"name": "Submarine", "size": 3, "symbol": ":ferry:"},
{"name": "Destroyer", "size": 2, "symbol": ":speedboat:"}
]
# MongoDB Atlas Connection
client = MongoClient(os.environ.get('MONGODB_ATLAS_URI'))
db = client['battleship']
games = db['games']
# AWS Bedrock Client Setup
bedrock_runtime = boto3.client('bedrock-runtime',
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY'),
aws_secret_access_key=os.environ.get('AWS_SECRET_KEY'),
region_name="us-west-2")
def get_bedrock_claude_move(board, openent_moves=None):
"""
Get the next move from Claude AI using AWS Bedrock.
"""
print("inside get_bedrock_claude_move")
claude_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 5000,
"temperature": 0,
"system": f"Please provide the next move as an opponent in the battleship game , the board is {BOARD_SIZE} X {BOARD_SIZE} . Be smart and stratigic. Respond only in JSON format strictly: 'row' : ... , 'col' : ... , 'entertainment_comment' and nothing more . ",
"messages": [{
"role": "user",
"content": [
{"type": "text", "text": f"The current board: {board} and your moves were: {openent_moves if openent_moves else 'None'}"}
]
}]
})
model_id = "anthropic.claude-3-5-sonnet-20241022-v2:0" if st.session_state.difficulty == "Expert" else "anthropic.claude-3-5-sonnet-20240620-v1:0"
response = bedrock_runtime.invoke_model(
body=claude_body,
modelId=model_id,
accept="application/json",
contentType="application/json",
)
response_body = json.loads(response.get("body").read())
json_response = json.loads(response_body["content"][0]['text'])
return json_response['row'], json_response['col'], json_response['entertainment_comment']
# Game Setup Functions
def create_empty_board():
"""Create an empty game board."""
return [[EMPTY for _ in range(BOARD_SIZE)] for _ in range(BOARD_SIZE)]
def is_valid_position(row, col):
"""Check if a position is within the board boundaries."""
return 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE
def check_adjacent_cells(board, row, col):
"""Check if any adjacent cells (including diagonals) contain a ship."""
for i in range(-1, 2):
for j in range(-1, 2):
if i == 0 and j == 0:
continue
new_row, new_col = row + i, col + j
if is_valid_position(new_row, new_col) and board[new_row][new_col] != EMPTY:
return False
return True
def can_place_ship(board, row, col, size, direction):
"""Check if a ship can be placed at the given position."""
if direction == 'horizontal':
if col + size > BOARD_SIZE:
return False
# Check the ship's cells and their adjacent cells
for i in range(-1, size + 1):
for j in range(-1, 2):
check_row = row + j
check_col = col + i
if is_valid_position(check_row, check_col):
if board[check_row][check_col] != EMPTY:
return False
else: # vertical
if row + size > BOARD_SIZE:
return False
# Check the ship's cells and their adjacent cells
for i in range(-1, 2):
for j in range(-1, size + 1):
check_row = row + j
check_col = col + i
if is_valid_position(check_row, check_col):
if board[check_row][check_col] != EMPTY:
return False
return True
def place_ships_randomly(board):
"""Place ships randomly on the board."""
for ship in SHIPS:
while True:
row = random.randint(0, BOARD_SIZE - 1)
col = random.randint(0, BOARD_SIZE - 1)
direction = random.choice(['horizontal', 'vertical'])
if can_place_ship(board, row, col, ship['size'], direction):
place_ship(board, row, col, ship['size'], direction, ship['symbol'])
break
return board
def place_ship(board, row, col, size, direction, symbol):
"""Place a ship on the board."""
if direction == 'horizontal':
for i in range(size):
board[row][col+i] = symbol
else:
for i in range(size):
board[row+i][col] = symbol
def initialize_game():
"""Initialize the game state."""
if 'difficulty' not in st.session_state:
st.session_state.difficulty = "Beginner"
if 'game_state' not in st.session_state:
st.session_state.game_state = {
'player_board': place_ships_randomly(create_empty_board()),
'opponent_board': place_ships_randomly(create_empty_board()),
'player_attacks': create_empty_board(),
'player_hits_left': 17,
'opponent_attacks': create_empty_board(),
'openent_moves': [],
'opponent_hits_left': 17,
'current_player': 'player',
'game_state': 'not_started',
'game_over': False,
'message': ''
}
if 'game_id' not in st.session_state.game_state:
st.session_state.game_state['game_id'] = ObjectId()
gameId = st.session_state.game_state['game_id']
# Initialize game data in MongoDB
games.update_one({'game_id': gameId, 'type': 'player'},
{"$set": {'game_id': gameId,
'board': st.session_state.game_state['player_board'],
'attacking_board': st.session_state.game_state['player_attacks']}},
upsert=True)
games.update_one({'game_id': gameId, 'type': 'opponent'},
{"$set": {'game_id': gameId,
'board': st.session_state.game_state['opponent_board'],
'oponent_moves': st.session_state.game_state['openent_moves'],
'attacking_board': st.session_state.game_state['opponent_attacks']}},
upsert=True)
def check_game_over():
"""Check if the game is over."""
game_state = st.session_state.game_state
if game_state['player_hits_left'] == 0:
game_state['message'] = "You won!"
game_state['game_over'] = True
return True
elif game_state['opponent_hits_left'] == 0:
game_state['message'] = "You lost!"
game_state['game_over'] = True
return True
def render_board(board, is_opponent=False):
"""Render the game board using Streamlit components."""
for i, row in enumerate(board):
cols = st.columns(BOARD_SIZE)
for j, cell in enumerate(row):
with cols[j]:
if is_opponent and cell == EMPTY:
if st.button('🌊', key=f'opp_{i}_{j}', on_click=lambda r=i, c=j: attack(r, c)):
pass
else:
# Determine the button color based on the cell content
if cell == EMPTY:
button_color = 'secondary'
elif cell in [HIT, MISS]:
button_color = 'primary' # Use default color for hits and misses
else:
button_color = 'primary' if not is_opponent else 'secondary'
# Use Streamlit's button with theme colors
st.button(cell, key=f'{"opp" if is_opponent else "player"}_{i}_{j}', type=button_color)
def attack(row, col):
"""Process a player's attack."""
game_state = st.session_state.game_state
if game_state['opponent_board'][row][col] != EMPTY:
game_state['player_attacks'][row][col] = HIT
game_state['message'] = "You hit a ship!"
game_state['opponent_hits_left'] -= 1
else:
game_state['player_attacks'][row][col] = MISS
game_state['message'] = "You missed!"
update_database('player')
game_state['current_player'] = 'opponent'
def update_database(type):
"""Update the game state in the database."""
game_state = st.session_state.game_state
gameId = game_state['game_id']
games.update_one({'game_id': gameId, 'type': type},
{'$set': {'attacking_board': game_state['player_attacks'],
'oponent_moves': game_state['openent_moves']}})
def opponent_turn():
"""Process the opponent's turn."""
game_state = st.session_state.game_state
with st.spinner("Opponent is making a move..."):
opponent_doc = games.find_one({'game_id': ObjectId(game_state['game_id']), 'type': 'opponent'})
print("opponent_doc:", opponent_doc)
row, col, comment = get_bedrock_claude_move(opponent_doc['attacking_board'], openent_moves=opponent_doc['oponent_moves'])
game_state['current_player'] = 'player'
if game_state['player_board'][row][col] != EMPTY and game_state['opponent_attacks'][row][col] != MISS:
game_state['openent_moves'].append({'row': row, 'col': col, 'status': 'hit'})
game_state['opponent_attacks'][row][col] = HIT
game_state['player_board'][row][col] = HIT
game_state['player_hits_left'] -= 1
game_state['message'] = f"Opponent hit your ship! Cell row {row} col {col} Opponent: {comment}"
else:
game_state['openent_moves'].append({'row': row, 'col': col, 'status': 'miss'})
game_state['opponent_attacks'][row][col] = MISS
game_state['player_board'][row][col] = MISS
game_state['message'] = f"Opponent missed! Cell row {row} col {col}, Opponent: {comment}"
update_database('opponent')
game_state['current_player'] = 'player'
def main():
"""Main function to run the Battleship game."""
st.title('Battleship Game')
# Initialize difficulty in session state if not present
if 'difficulty' not in st.session_state:
st.session_state.difficulty = "Beginner"
# Add difficulty selector
st.session_state.difficulty = st.selectbox(
"Select Difficulty",
["Beginner", "Expert"],
index=0 if st.session_state.difficulty == "Beginner" else 1
)
initialize_game()
if st.session_state.game_state['game_over']:
st.subheader('Game Over! You Lost!' if st.session_state.game_state['player_hits_left'] == 0 else 'You Won!')
else:
st.subheader('Your Turn')
col1, col3, col2 = st.columns(3)
with col1:
st.subheader('Your Board')
render_board(st.session_state.game_state['player_board'])
with col3:
st.subheader("Opponent Ships")
for ship in SHIPS:
my_ships, status = st.columns(2)
with my_ships:
st.write(f" {ship['symbol']} {ship['name']}, size: {ship['size']}")
with status:
st.checkbox("Sunk", key=f"{ship['name']}_sunk")
st.markdown("#### Opponent history")
container = st.container(height=250)
for move in st.session_state.game_state['openent_moves']:
container.write(f"Row: {move['row']} Col: {move['col']} Status: {move['status']}")
with col2:
st.subheader("Opponent's Board")
render_board(st.session_state.game_state['player_attacks'], is_opponent=True)
st.write(st.session_state.game_state['message'])
if st.button('Reset Game'):
st.session_state.clear()
st.rerun()
if st.session_state.game_state['current_player'] == 'opponent':
opponent_turn()
if check_game_over():
st.session_state.game_state['current_player'] = 'game_over'
else:
st.rerun()
st.header('Debug')
expander = st.expander("Game State")
for row in st.session_state.game_state['opponent_board']:
expander.markdown(row)
if st.button("Chat"):
pop_chat()
if __name__ == '__main__':
main()