ArtificialLife / app.py
Aluode's picture
Upload 3 files
ee0a63c verified
import torch
import torch.nn as nn
import torch.nn.functional as F # Added for activation functions
import numpy as np
from dataclasses import dataclass, field
import pygame
import gradio as gr
from typing import List, Tuple, Dict, Optional, Set
import random
import colorsys
import pymunk
import time
import threading
from queue import Queue
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from PIL import Image
import io
import logging # Added for logging
# ==============================
# Logging Configuration
# ==============================
# Root logger setup: records at INFO and above go to both the
# "simulation.log" file and the console stream.
logging.basicConfig(
    handlers=[logging.FileHandler("simulation.log"), logging.StreamHandler()],
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)
# ==============================
# Configuration Dataclasses
# ==============================
@dataclass
class SimulationConfig:
    """Top-level tunables for the simulation: window, population and energy."""

    # Window dimensions and frame-rate target.
    WIDTH: int = 1024
    HEIGHT: int = 768
    TARGET_FPS: int = 60

    # Population bounds (the cap is 50 organisms).
    MIN_ORGANISMS: int = 5
    MAX_ORGANISMS: int = 50

    # Evolution and energy parameters.
    MUTATION_RATE: float = 0.1
    REPRODUCTION_ENERGY: float = 150.0
    INITIAL_ENERGY: float = 100.0

    # Brain parameters; the update rate is expressed in Hz.
    BRAIN_UPDATE_RATE: int = 10
    MAX_NEURONS: int = 1000
    ENERGY_DECAY: float = 0.1
@dataclass
class NeuronState:
    """Mutable bookkeeping record attached to a single neuron."""

    activation: float = 0.0
    connections: int = 0
    energy: float = 100.0
    # Every instance gets its own fresh list of eight zeros.
    memory: List[float] = field(default_factory=lambda: [0.0 for _ in range(8)])
@dataclass
class VisualizationConfig:
    """Colour palette and drawing limits used by the renderer."""

    BACKGROUND_COLOR: Tuple[int, int, int] = (10, 10, 30)
    # Colour per neuron display state; built per instance so the dict is
    # never shared between configs.
    NEURON_COLORS: Dict[str, Tuple[int, int, int]] = field(
        default_factory=lambda: dict(
            active=(255, 255, 0),
            inactive=(100, 100, 100),
            connected=(0, 255, 255),
        )
    )
    # Four components, unlike the three-component colours above.
    CONNECTION_COLOR: Tuple[int, int, int, int] = (50, 50, 200, 100)
    ENERGY_COLOR: Tuple[int, int, int] = (0, 255, 0)
    MAX_NEURAL_CONNECTIONS: int = 50
@dataclass
class PhysicsConfig:
    """Constants consumed by the physics engine."""

    # Collision-type tag assigned to organism shapes.
    COLLISION_TYPE_ORGANISM: int = 1

    # Material properties applied to shapes, and the space-wide damping.
    ELASTICITY: float = 0.7
    FRICTION: float = 0.5
    DAMPING: float = 0.9

    # Interaction tuning values.
    INTERACTION_RADIUS: float = 50.0
    FORCE_SCALE: float = 100.0
# ==============================
# Neural Processing System
# ==============================
class FractalNeuron(nn.Module):
    """A small feed-forward unit that recursively owns child neurons.

    Each neuron holds:

    * ``synapse`` - Linear -> LeakyReLU -> Linear -> Tanh, mapping
      ``input_dim`` features to ``output_dim`` features;
    * ``memory_gate`` - Linear -> Sigmoid, mapping the synapse output
      concatenated with the 8-slot memory to a new 8-slot memory;
    * ``sub_neurons`` - ``max(1, 2 - depth)`` children while
      ``depth < max_depth``; their outputs are averaged;
    * ``state`` - a ``NeuronState`` (activation, connections, energy, memory).

    All signal processing runs under ``torch.no_grad()``; the weights change
    only through :meth:`mutate`.
    """

    def __init__(self, input_dim=16, output_dim=16, depth=0, max_depth=2):
        super().__init__()
        self.depth = depth
        self.max_depth = max_depth
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = max(input_dim // 2, 8)

        self.synapse = nn.Sequential(
            nn.Linear(input_dim, self.hidden_dim),
            nn.LeakyReLU(negative_slope=0.1, inplace=True),
            nn.Linear(self.hidden_dim, output_dim),
            nn.Tanh(),
        )
        self._init_linear_layers(self.synapse)

        # Bookkeeping state, with energy forced into [0, 100].
        self.state = NeuronState()
        self.state.activation = 0.0
        self.state.energy = min(100.0, max(0.0, self.state.energy))

        # The gate consumes the synapse output plus the 8 memory slots.
        self.memory_gate = nn.Sequential(
            nn.Linear(output_dim + 8, 8),
            nn.Sigmoid(),
        )
        self._init_linear_layers(self.memory_gate)

        # Children consume this neuron's output, so their input width equals
        # our output width.
        self.sub_neurons = nn.ModuleList([])
        if depth < max_depth:
            branching_factor = max(1, 2 - depth)
            for _ in range(branching_factor):
                self.sub_neurons.append(
                    FractalNeuron(
                        input_dim=output_dim,
                        output_dim=output_dim,
                        depth=depth + 1,
                        max_depth=max_depth,
                    )
                )

        # Switch to eval mode only after every submodule exists, so the flag
        # reaches memory_gate and the children as well.
        self.eval()

    @staticmethod
    def _init_linear_layers(layers):
        """Xavier-initialise the weights and zero the biases of Linear layers."""
        for layer in layers:
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0.0)

    @staticmethod
    def _mutate_linear_layers(layers, mutation_rate):
        """Add N(0, 0.1^2) noise to a random subset of Linear parameters.

        Each weight/bias element is perturbed with probability
        ``mutation_rate``.
        """
        for layer in layers:
            if not isinstance(layer, nn.Linear):
                continue
            mask = torch.rand_like(layer.weight) < mutation_rate
            mutations = torch.randn_like(layer.weight) * 0.1
            layer.weight.data[mask] += mutations[mask]
            if layer.bias is not None:
                mask = torch.rand_like(layer.bias) < mutation_rate
                mutations = torch.randn_like(layer.bias) * 0.1
                layer.bias.data[mask] += mutations[mask]

    def forward(self, x):
        """Forward pass for PyTorch module compatibility."""
        return self.process_signal(x)

    def process_signal(self, x, external_input=None):
        """Run ``x`` through the synapse, memory gate and child neurons.

        Args:
            x: Input tensor; a 1-D tensor gets a temporary batch dimension.
            external_input: Optional tensor concatenated to ``x`` along the
                last dimension before the synapse. The synapse still expects
                ``input_dim`` features, so the combined width must equal
                ``input_dim``; otherwise the synapse error path is taken.

        Returns:
            The output tensor with a leading batch dimension of size 1
            removed. On NaN input, synapse failure or any unexpected error a
            zero tensor of length ``output_dim`` is returned instead.
        """
        try:
            with torch.no_grad():
                self.eval()

                if len(x.shape) == 1:
                    x = x.unsqueeze(0)

                if torch.isnan(x).any():
                    logging.warning("NaN detected in input tensor. Returning zero tensor.")
                    return torch.zeros(self.output_dim)

                if external_input is not None:
                    if len(external_input.shape) == 1:
                        external_input = external_input.unsqueeze(0)
                    x = torch.cat([x, external_input], dim=-1)

                x = x.to(torch.float32)
                try:
                    x = self.synapse(x)
                except RuntimeError as e:
                    logging.error(f"Error in synapse processing: {e}")
                    return torch.zeros(self.output_dim)

                self._update_memory(x)

                # Activation is the mean synapse output, kept in [-1, 1].
                activation = float(torch.clamp(x.mean(), -1.0, 1.0))
                if not np.isnan(activation):
                    self.state.activation = activation

                x = self._process_children(x)

                # Deeper neurons pay more per call; the root (depth 0) pays nothing.
                energy_cost = 0.1 * self.depth
                self.state.energy = max(0.0, min(100.0, self.state.energy - energy_cost))

                if len(x.shape) == 2:
                    x = x.squeeze(0)
                return x
        except Exception as e:
            logging.error(f"Error in process_signal: {e}")
            return torch.zeros(self.output_dim)

    def _update_memory(self, x):
        """Refresh the 8-slot memory from the synapse output ``x``.

        Failures are logged and leave the previous memory in place.
        """
        try:
            memory_tensor = torch.tensor(self.state.memory).to(torch.float32)
            x_for_memory = x.unsqueeze(0) if len(x.shape) == 1 else x
            memory_input = torch.cat([x_for_memory, memory_tensor.unsqueeze(0)], dim=-1)
            new_memory = torch.clamp(self.memory_gate(memory_input), 0.0, 1.0)
            if not torch.isnan(new_memory).any():
                self.state.memory = new_memory[0].tolist()
        except Exception as e:
            logging.error(f"Error updating memory: {e}")

    def _process_children(self, x):
        """Return the clamped mean of the child outputs, or ``x`` if none are usable."""
        child_outputs = []
        for child in self.sub_neurons:
            try:
                child_input = x.squeeze(0) if len(x.shape) == 2 else x
                if child_input.shape[-1] != child.input_dim:
                    child_input = child_input[:child.input_dim]
                child_out = child.process_signal(child_input)
                if not torch.isnan(child_out).any():
                    if len(child_out.shape) == 1:
                        child_out = child_out.unsqueeze(0)
                    child_outputs.append(child_out)
            except Exception as e:
                logging.error(f"Error in child neuron processing: {e}")
                continue
        if child_outputs:
            x = torch.mean(torch.stack(child_outputs), dim=0)
            x = torch.clamp(x, -1.0, 1.0)
        return x

    def interact_with(self, other_neuron, strength=0.5):
        """Average activation and memory with ``other_neuron``.

        Both neurons end up with the same activation and equal (but
        independent) memory lists, and each connection counter is incremented
        up to a cap of 100.

        Args:
            other_neuron: The neuron to exchange state with.
            strength: Clamped to [0, 1] for validation; it does not currently
                influence the result.

        Returns:
            The shared activation, or 0.0 if anything fails.
        """
        try:
            strength = max(0.0, min(1.0, strength))

            shared_activation = (self.state.activation + other_neuron.state.activation) / 2
            shared_activation = float(shared_activation)
            if np.isnan(shared_activation):
                logging.warning("NaN detected in shared activation. Using default value.")
                shared_activation = 0.0
            self.state.activation = shared_activation
            other_neuron.state.activation = shared_activation

            shared_memory = [
                max(0.0, min(1.0, (float(a) + float(b)) / 2))
                for a, b in zip(self.state.memory, other_neuron.state.memory)
            ]
            # Give each neuron its own list so a later in-place edit of one
            # memory cannot leak into the other.
            self.state.memory = shared_memory
            other_neuron.state.memory = list(shared_memory)

            max_connections = 100
            self.state.connections = min(self.state.connections + 1, max_connections)
            other_neuron.state.connections = min(other_neuron.state.connections + 1, max_connections)
            return shared_activation
        except Exception as e:
            logging.error(f"Error in interact_with: {e}")
            return 0.0

    def save_state(self):
        """Return this neuron's ``NeuronState`` as a plain dict (memory copied)."""
        return {
            'activation': self.state.activation,
            'connections': self.state.connections,
            'energy': self.state.energy,
            'memory': self.state.memory.copy()
        }

    def load_state(self, state_dict):
        """Restore a dict produced by :meth:`save_state`; errors are logged."""
        try:
            self.state.activation = state_dict['activation']
            self.state.connections = state_dict['connections']
            self.state.energy = state_dict['energy']
            self.state.memory = state_dict['memory'].copy()
        except Exception as e:
            logging.error(f"Error loading neuron state: {e}")

    def _copy_state_tree_to(self, target):
        """Copy the ``NeuronState`` of this neuron and all descendants into ``target``."""
        target.load_state(self.save_state())
        for source_child, target_child in zip(self.sub_neurons, target.sub_neurons):
            source_child._copy_state_tree_to(target_child)

    def clone(self):
        """Create an independent deep copy of the neuron.

        The copy has the same architecture, the same weights (for this neuron
        and every descendant) and the same ``NeuronState`` values.

        Returns:
            The new ``FractalNeuron``, or ``None`` if cloning fails.
        """
        try:
            new_neuron = FractalNeuron(
                input_dim=self.input_dim,
                output_dim=self.output_dim,
                depth=self.depth,
                max_depth=self.max_depth
            )
            # The constructor arguments fully determine the layout, so the
            # parameter names and shapes line up with ours.
            new_neuron.load_state_dict(self.state_dict())
            self._copy_state_tree_to(new_neuron)
            return new_neuron
        except Exception as e:
            logging.error(f"Error cloning neuron: {e}")
            return None

    def mutate(self, mutation_rate=0.1):
        """Randomly perturb the weights of this neuron and all descendants.

        Args:
            mutation_rate: Per-element probability of receiving Gaussian noise
                with standard deviation 0.1.
        """
        try:
            with torch.no_grad():
                self._mutate_linear_layers(self.synapse, mutation_rate)
                self._mutate_linear_layers(self.memory_gate, mutation_rate)
                for child in self.sub_neurons:
                    child.mutate(mutation_rate)
        except Exception as e:
            logging.error(f"Error mutating neuron: {e}")
class FractalBrain:
    """Three chained ``FractalNeuron`` stages: vision -> thought -> action.

    The constructor caps ``input_dim`` at 32, ``hidden_dim`` at 64 and
    ``max_neurons`` at 1000. ``process_vision`` maps an ``input_dim``-wide
    vector to an ``input_dim``-wide action vector and costs 0.1 energy per call.
    """

    def __init__(self, input_dim=32, hidden_dim=64, max_neurons=1000):
        self.input_dim = min(input_dim, 32)
        self.hidden_dim = min(hidden_dim, 64)
        self.max_neurons = min(max_neurons, 1000)

        self.visual_cortex = FractalNeuron(self.input_dim, self.hidden_dim, max_depth=2)
        self.thought_processor = FractalNeuron(self.hidden_dim, self.hidden_dim, max_depth=2)
        self.action_generator = FractalNeuron(self.hidden_dim, self.input_dim, max_depth=2)

        self.total_neurons = self.count_neurons()
        self.total_energy = 100.0
        self.memories = []
        self.current_vision = None

    def _core_neurons(self):
        """Return the three top-level neurons in processing order."""
        return [self.visual_cortex, self.thought_processor, self.action_generator]

    def get_vitals(self):
        """Summarise the brain as a dict of bounded statistics.

        Returns:
            A dict with keys ``'neurons'``, ``'energy'`` (0..1000),
            ``'connections'`` (0..1000) and ``'activation'`` (-1..1). If
            anything unexpected fails, a safe default dict is returned.
        """
        try:
            # Non-finite activations are skipped; unreadable ones count as 0.
            activations = []
            for neuron in self._core_neurons():
                try:
                    activation = float(neuron.state.activation)
                    if np.isfinite(activation):
                        activations.append(activation)
                except (AttributeError, ValueError, TypeError):
                    activations.append(0.0)
            avg_activation = sum(activations) / max(len(activations), 1)
            avg_activation = max(-1.0, min(1.0, avg_activation))

            connections = []
            for neuron in self._core_neurons():
                try:
                    conn_count = int(neuron.state.connections)
                    if not np.isnan(conn_count) and not np.isinf(conn_count):
                        connections.append(conn_count)
                except (AttributeError, ValueError, TypeError):
                    connections.append(0)
            total_connections = sum(connections)

            return {
                'neurons': min(self.total_neurons, self.max_neurons),
                'energy': max(0.0, min(1000.0, float(self.total_energy))),
                'connections': max(0, min(1000, total_connections)),
                'activation': avg_activation
            }
        except Exception as e:
            logging.error(f"Exception in get_vitals: {e}. Returning default vitals.")
            return {
                'neurons': 1,
                'energy': 0.0,
                'connections': 0,
                'activation': 0.0
            }

    def _fit_input_width(self, visual_input):
        """Truncate or zero-pad the last dimension to ``self.input_dim``.

        The constructor caps ``input_dim`` at 32, so a caller built around a
        larger (or smaller) feature size would otherwise hit a shape error in
        the visual cortex on every step.
        """
        width = visual_input.shape[-1]
        if width > self.input_dim:
            return visual_input[..., :self.input_dim]
        if width < self.input_dim:
            padding = torch.zeros(
                *visual_input.shape[:-1],
                self.input_dim - width,
                dtype=visual_input.dtype,
                device=visual_input.device,
            )
            return torch.cat([visual_input, padding], dim=-1)
        return visual_input

    def process_vision(self, visual_input):
        """Turn a visual input vector into an action vector.

        Args:
            visual_input: Tensor of features. NaN input is replaced by zeros,
                values are clamped to [-10, 10], and the feature width is
                truncated or zero-padded to ``input_dim``.

        Returns:
            A tensor clamped to [-1, 1], with a leading batch dimension of 1
            removed. On an unexpected failure a zero tensor of length
            ``input_dim`` is returned.
        """
        try:
            with torch.no_grad():
                visual_input = visual_input.clone().detach()
                if len(visual_input.shape) == 1:
                    visual_input = visual_input.unsqueeze(0)
                if torch.isnan(visual_input).any():
                    logging.warning("NaN detected in visual_input. Replacing with zeros.")
                    visual_input = torch.zeros_like(visual_input)
                visual_input = torch.clamp(visual_input, -10.0, 10.0)
                visual_input = self._fit_input_width(visual_input)

                try:
                    visual_features = self.visual_cortex.process_signal(visual_input)
                    if len(visual_features.shape) == 1:
                        visual_features = visual_features.unsqueeze(0)
                except Exception as e:
                    logging.error(f"Exception in visual_cortex.process_signal: {e}. Using zero tensor.")
                    visual_features = torch.zeros((1, self.hidden_dim))

                try:
                    thoughts = self.thought_processor.process_signal(visual_features)
                    if len(thoughts.shape) == 1:
                        thoughts = thoughts.unsqueeze(0)
                except Exception as e:
                    logging.error(f"Exception in thought_processor.process_signal: {e}. Using zero tensor.")
                    thoughts = torch.zeros((1, self.hidden_dim))

                try:
                    actions = self.action_generator.process_signal(thoughts)
                except Exception as e:
                    logging.error(f"Exception in action_generator.process_signal: {e}. Using zero tensor.")
                    actions = torch.zeros(self.input_dim)

                if len(actions.shape) > 1:
                    actions = actions.squeeze(0)
                actions = torch.clamp(actions, -1.0, 1.0)

                # Thinking costs a fixed 0.1 energy per call.
                self.total_energy = max(0.0, min(1000.0, self.total_energy - 0.1))
                return actions
        except Exception as e:
            logging.error(f"Exception in process_vision: {e}. Returning zero actions.")
            return torch.zeros(self.input_dim)

    def interact_with(self, other_brain, strength=0.5):
        """Share neural state and move energy toward the poorer brain.

        Args:
            other_brain: The brain to interact with.
            strength: Clamped to [0, 1] and forwarded to each neuron pair.

        Returns:
            A tuple ``(shared_visual, shared_thoughts, shared_actions)`` of
            shared activations, or ``(0.0, 0.0, 0.0)`` on failure.
        """
        try:
            strength = max(0.0, min(1.0, strength))

            shared_visual = self.visual_cortex.interact_with(other_brain.visual_cortex, strength)
            shared_thoughts = self.thought_processor.interact_with(other_brain.thought_processor, strength)
            shared_actions = self.action_generator.interact_with(other_brain.action_generator, strength)

            # Move 10% of the energy gap, at most 10 units, from rich to poor.
            energy_diff = self.total_energy - other_brain.total_energy
            transfer = max(-10.0, min(10.0, energy_diff * 0.1))
            self.total_energy = max(0.0, min(1000.0, self.total_energy - transfer))
            other_brain.total_energy = max(0.0, min(1000.0, other_brain.total_energy + transfer))
            return shared_visual, shared_thoughts, shared_actions
        except Exception as e:
            logging.error(f"Exception in interact_with: {e}. Returning zeros.")
            return 0.0, 0.0, 0.0

    def count_neurons(self):
        """Count the three core neurons and all descendants, capped at ``max_neurons``.

        Returns 1 if counting fails.
        """
        try:
            def count_recursive(module):
                count = 1
                if hasattr(module, 'sub_neurons'):
                    for child in module.sub_neurons:
                        count += count_recursive(child)
                return min(count, self.max_neurons)

            total = sum(count_recursive(x) for x in self._core_neurons())
            return min(total, self.max_neurons)
        except Exception as e:
            logging.error(f"Exception in count_neurons: {e}. Returning 1.")
            return 1

    def can_grow(self):
        """Return True while under the neuron cap and holding more than 100 energy."""
        return (self.total_neurons < self.max_neurons and
                self.total_energy > 100.0)
# ==============================
# Organism Definition and Behavior
# ==============================
class FractalOrganism:
    """A simulated creature: a point mass driven by a ``FractalBrain``.

    Position, velocity and acceleration are ``pygame.math.Vector2`` values.
    The ``features`` tensor determines the colour, pattern and polygon shape.
    """

    # Names of the three brain stages, in processing order.
    _BRAIN_PARTS = ('visual_cortex', 'thought_processor', 'action_generator')

    def __init__(self, x, y, size=20, feature_dim=32, max_neurons=1000):
        # Physical properties
        self.pos = pygame.math.Vector2(x, y)
        self.vel = pygame.math.Vector2(0, 0)
        self.acc = pygame.math.Vector2(0, 0)
        self.size = size
        self.mass = size * 0.1

        # Neural system
        self.brain = FractalBrain(input_dim=feature_dim, hidden_dim=feature_dim * 2, max_neurons=max_neurons)
        self.feature_dim = feature_dim
        self.features = torch.randn(feature_dim)

        # Visual properties derived from the features
        self.color = self._features_to_color()
        self.pattern_type = self._determine_pattern_type()
        self.pattern_intensity = self._determine_pattern_intensity()
        self.shape_points = self._generate_shape()

        # Life properties
        self.alive = True
        self.age = 0

    def _validate_color_component(self, value):
        """Coerce ``value`` to an int in [0, 255]; unconvertible values give 0."""
        try:
            value = int(value)
            return max(0, min(255, value))
        except (ValueError, TypeError):
            return 0

    def update(self, screen_width, screen_height, organisms):
        """Advance the organism by one step.

        Integrates motion, runs the brain on the current surroundings, applies
        the resulting forces, wraps the position around the screen and checks
        the death conditions (no energy left, or age above 1000).
        """
        if not self.alive:
            return
        try:
            # Integration with light damping, then reset the acceleration so
            # forces only act for one step.
            self.vel += self.acc
            self.vel *= 0.98
            self.pos += self.vel
            self.acc.x = 0
            self.acc.y = 0

            visual_input = self._get_visual_input(organisms)
            actions = self.brain.process_vision(visual_input)
            if isinstance(actions, torch.Tensor) and not torch.isnan(actions).any():
                self._apply_action_forces(actions)

            # Wrap around the screen edges.
            self.pos.x = self.pos.x % screen_width
            self.pos.y = self.pos.y % screen_height

            self.age += 1
            vitals = self.brain.get_vitals()
            if vitals['energy'] <= 0 or self.age > 1000:
                self.alive = False
        except Exception as e:
            logging.error(f"Error updating organism {id(self)}: {e}")
            logging.debug(f"Organism state - Age: {self.age}, Alive: {self.alive}")

    def _get_visual_input(self, organisms):
        """Build the ``feature_dim``-long perception vector.

        Layout: indices 0-2 hold the organism's own colour scaled to [0, 1],
        index 3 the speed divided by 10, index 4 the heading divided by pi,
        and indices 5 and up a ring of direction bins for organisms within
        100 units (bin value = closeness; the following slot = that
        organism's energy / 1000).

        Returns a zero vector if anything fails.
        """
        try:
            visual_input = torch.zeros(self.feature_dim)

            color_tensor = torch.tensor([c / 255.0 for c in self.color])
            visual_input[:3] = color_tensor[:3]

            velocity_magnitude = np.sqrt(self.vel.x ** 2 + self.vel.y ** 2)
            velocity_direction = np.arctan2(self.vel.y, self.vel.x) / np.pi
            if 3 < len(visual_input):
                visual_input[3] = float(velocity_magnitude) / 10.0
            if 4 < len(visual_input):
                visual_input[4] = float(velocity_direction)

            for other in organisms:
                if other is self or not other.alive:
                    continue
                distance = self.pos.distance_to(other.pos)
                if distance >= 100:  # outside visual range
                    continue
                direction = (other.pos - self.pos)
                if direction.length() > 0:
                    direction = direction.normalize()
                angle = np.arctan2(direction.y, direction.x)

                # Map the angle (-pi..pi) onto the bins from index 5 upward.
                idx = int((angle + np.pi) / (2 * np.pi) * (self.feature_dim - 5)) + 5
                idx = min(max(5, idx), self.feature_dim - 1)

                intensity = 1.0 - min(1.0, distance / 100)
                visual_input[idx] = intensity

                if idx + 1 < self.feature_dim:
                    target_energy = float(other.brain.total_energy) / 1000.0
                    if not np.isnan(target_energy):
                        visual_input[idx + 1] = target_energy
            return visual_input
        except Exception as e:
            logging.error(f"Error in _get_visual_input: {e}")
            return torch.zeros(self.feature_dim)

    def _apply_action_forces(self, actions):
        """Translate the brain's action vector into forces and side effects.

        ``actions[0:2]`` become an x/y force, scaled by 20 and boosted by the
        visual cortex activation. ``actions[2]`` above 0.8 adds a little
        energy, and ``actions[3]`` clamped to [0, 1] is stored as
        ``self.interaction_strength``.
        """
        try:
            if not isinstance(actions, torch.Tensor):
                return
            if len(actions) < 2:
                return

            activation = float(self.brain.visual_cortex.state.activation)
            force_scale = 20.0
            boost = 1 + abs(activation)
            force_x = float(actions[0].item()) * force_scale * boost
            force_y = float(actions[1].item()) * force_scale * boost

            # Random jitter keeps weakly activated organisms exploring.
            if abs(activation) < 0.2:
                force_x += random.uniform(-2.0, 2.0)
                force_y += random.uniform(-2.0, 2.0)

            max_force = 40.0
            force_x = max(-max_force, min(max_force, force_x))
            force_y = max(-max_force, min(max_force, force_y))
            self.apply_force((force_x, force_y))

            if len(actions) >= 4:
                try:
                    energy_control = float(actions[2].item())
                    if energy_control > 0.8:
                        self.brain.total_energy += energy_control * 0.1
                    self.interaction_strength = max(0, min(1, float(actions[3].item())))
                except Exception as e:
                    logging.error(f"Error processing additional actions: {e}")
        except Exception as e:
            logging.error(f"Error in _apply_action_forces: {e}")

    def apply_force(self, force):
        """Set the acceleration from ``force``.

        Args:
            force: A tuple, list or ``pygame.math.Vector2`` with at least two
                components. Other inputs, and NaN components, are ignored.

        Each component is limited to +/-10 before dividing by the mass, and
        the resulting acceleration to +/-5. The acceleration is replaced, not
        accumulated, so the last force applied in a step wins.
        """
        try:
            # Vector2 must be accepted too: interact_with passes one.
            if not isinstance(force, (tuple, list, pygame.math.Vector2)) or len(force) < 2:
                return
            fx = float(force[0])
            fy = float(force[1])
            if np.isnan(fx) or np.isnan(fy):
                return

            max_force = 10.0
            fx = max(-max_force, min(max_force, fx))
            fy = max(-max_force, min(max_force, fy))
            new_acc = pygame.math.Vector2(fx, fy) / self.mass
            if not (np.isnan(new_acc.x) or np.isnan(new_acc.y)):
                self.acc.update(new_acc.x, new_acc.y)

            max_acc = 5.0
            self.acc.x = max(-max_acc, min(max_acc, self.acc.x))
            self.acc.y = max(-max_acc, min(max_acc, self.acc.y))
        except Exception as e:
            logging.error(f"Error in apply_force: {e}")

    def _features_to_color(self):
        """Map features 0-2 from [-1, 1] onto an ``(r, g, b)`` tuple in 0..255."""
        try:
            r = self._validate_color_component((self.features[0].item() + 1) / 2 * 255)
            g = self._validate_color_component((self.features[1].item() + 1) / 2 * 255)
            b = self._validate_color_component((self.features[2].item() + 1) / 2 * 255)
            return (r, g, b)
        except (IndexError, AttributeError) as e:
            logging.error(f"Error in _features_to_color: {e}. Defaulting to (100, 100, 100).")
            return (100, 100, 100)

    def _determine_pattern_type(self):
        """Choose 'stripes', 'spots' or 'gradient' from features 3 and 4.

        A sum above 1 gives stripes, below -1 spots, anything else (or fewer
        than five features) gradient.
        """
        try:
            if len(self.features) >= 5:
                feature_sum = float(self.features[3].item() + self.features[4].item())
                if feature_sum > 1:
                    return 'stripes'
                elif feature_sum < -1:
                    return 'spots'
                else:
                    return 'gradient'
            return 'gradient'
        except (IndexError, AttributeError, ValueError) as e:
            logging.error(f"Error in _determine_pattern_type: {e}. Defaulting to 'gradient'.")
            return 'gradient'

    def _determine_pattern_intensity(self):
        """Map feature 5 from [-1, 1] to [0, 1]; 0.5 when unavailable."""
        try:
            if len(self.features) >= 6:
                intensity = (float(self.features[5].item()) + 1) / 2
                return max(0.0, min(1.0, intensity))
            return 0.5
        except (IndexError, AttributeError, ValueError) as e:
            logging.error(f"Error in _determine_pattern_intensity: {e}. Defaulting to 0.5.")
            return 0.5

    def _fallback_triangle(self):
        """Return the basic triangle used when no polygon can be generated."""
        return [
            (-self.size, -self.size),
            (self.size, -self.size),
            (0, self.size)
        ]

    def _generate_shape(self):
        """Generate the polygon outline (relative to the centre) for the pattern type.

        'stripes' gives a 12-point star, 'spots' an 8-point bulged outline and
        anything else a regular hexagon.
        """
        try:
            points = []
            if self.pattern_type == 'stripes':
                # Alternate outer (1.2x) and inner (0.8x) radii.
                for angle in range(0, 360, 30):
                    rad = np.radians(angle)
                    x = self.size * np.cos(rad)
                    y = self.size * np.sin(rad)
                    if (angle // 30) % 2 == 0:
                        points.append((x * 1.2, y * 1.2))
                    else:
                        points.append((x * 0.8, y * 0.8))
            elif self.pattern_type == 'spots':
                for angle in range(0, 360, 45):
                    rad = np.radians(angle)
                    radius = self.size * (1 + 0.3 * np.sin(4 * rad))
                    points.append((radius * np.cos(rad), radius * np.sin(rad)))
            else:
                for angle in range(0, 360, 60):
                    rad = np.radians(angle)
                    points.append((self.size * np.cos(rad), self.size * np.sin(rad)))

            if len(points) < 3:
                points = self._fallback_triangle()
            return points
        except Exception as e:
            logging.error(f"Error in _generate_shape: {e}. Defaulting to basic triangle.")
            return self._fallback_triangle()

    def reproduce(self, mate, mutation_rate=0.1):
        """Create a child organism from this organism and ``mate``.

        Both parents need at least their brain's ``REPRODUCTION_ENERGY``
        (150.0 when the attribute is absent) and each pays 50.0 energy, which
        is charged only once the child has been built successfully.

        The child's features are the parents' average with random
        perturbations, clamped to [-1, 1]. The child gets its *own* brain:
        each stage inherits the weights of a randomly chosen parent and is
        then mutated.

        Returns:
            The new ``FractalOrganism``, or ``None`` if the energy requirement
            is not met or anything fails.
        """
        try:
            default_threshold = 150.0
            own_threshold = getattr(self.brain, 'REPRODUCTION_ENERGY', default_threshold)
            mate_threshold = getattr(mate.brain, 'REPRODUCTION_ENERGY', default_threshold)
            if self.brain.total_energy < own_threshold or mate.brain.total_energy < mate_threshold:
                return None

            # Blend the features, then perturb each one with probability mutation_rate.
            child_features = (self.features + mate.features) / 2
            for i in range(len(child_features)):
                if random.random() < mutation_rate:
                    child_features[i] += random.uniform(-0.1, 0.1)
            child_features = torch.clamp(child_features, -1.0, 1.0)

            child = FractalOrganism(
                x=(self.pos.x + mate.pos.x) / 2 + random.uniform(-10, 10),
                y=(self.pos.y + mate.pos.y) / 2 + random.uniform(-10, 10),
                size=self.size,
                feature_dim=self.feature_dim,
                max_neurons=self.brain.max_neurons
            )
            child.features = child_features
            child.color = child._features_to_color()
            child.pattern_type = child._determine_pattern_type()
            child.pattern_intensity = child._determine_pattern_intensity()
            child.shape_points = child._generate_shape()
            self._inherit_brain(child.brain, mate.brain, mutation_rate)

            # Charge the parents only after the child exists.
            self.brain.total_energy -= 50.0
            mate.brain.total_energy -= 50.0
            return child
        except Exception as e:
            logging.error(f"Error in reproduction: {e}")
            return None

    def _inherit_brain(self, child_brain, mate_brain, mutation_rate):
        """Fill ``child_brain`` with mutated parental weights, stage by stage.

        The parents' brains are only read, never modified or shared.
        """
        for part_name in self._BRAIN_PARTS:
            child_part = getattr(child_brain, part_name)
            donor_brain = random.choice((self.brain, mate_brain))
            try:
                child_part.load_state_dict(getattr(donor_brain, part_name).state_dict())
            except RuntimeError as e:
                # Layer shapes differ (e.g. parents with different feature
                # sizes); the stage keeps its own weights where copying failed.
                logging.warning(f"Could not inherit {part_name} weights: {e}")
            child_part.mutate(mutation_rate)
        self._mutate_brain(child_brain, mutation_rate)

    def _mutate_brain(self, brain, mutation_rate):
        """Jitter the activation of each brain stage by up to +/-0.1, in place.

        ``mutation_rate`` is accepted but not used here.

        Returns:
            The same ``brain`` object that was passed in.
        """
        try:
            for part_name in self._BRAIN_PARTS:
                state = getattr(brain, part_name).state
                state.activation += random.uniform(-0.1, 0.1)
            for part_name in self._BRAIN_PARTS:
                state = getattr(brain, part_name).state
                state.activation = max(-1.0, min(1.0, state.activation))
            return brain
        except Exception as e:
            logging.error(f"Error in brain mutation: {e}. Returning unmutated brain.")
            return brain

    def interact_with(self, other):
        """Interact with ``other`` when the two organisms overlap.

        Overlap means the centre distance is below the sum of the sizes. The
        brains then exchange state and the organisms are pushed apart along
        the line between them.

        Returns:
            True if an interaction took place, False otherwise or on error.
        """
        try:
            distance = self.pos.distance_to(other.pos)
            if distance >= self.size + other.size:
                return False

            interaction_strength = 1.0 - distance / (self.size + other.size)
            self.brain.interact_with(other.brain, interaction_strength)

            offset = self.pos - other.pos
            if offset.length() > 0:
                direction = offset.normalize()
            else:
                # Coincident centres have no separating direction, and
                # normalize() raises on a zero-length vector; pick one at random.
                angle = random.uniform(0.0, 2.0 * np.pi)
                direction = pygame.math.Vector2(float(np.cos(angle)), float(np.sin(angle)))

            force = direction * interaction_strength * 5
            self.apply_force(force)
            other.apply_force(-force)
            return True
        except Exception as e:
            logging.error(f"Error in organism interaction: {e}")
            return False

    def _blend_patterns(self, pattern1: str, pattern2: str) -> str:
        """Return ``pattern1`` if both match, else a random pick.

        The pick is drawn from both parent patterns plus the three known
        pattern names.
        """
        try:
            if pattern1 == pattern2:
                return pattern1
            return random.choice([pattern1, pattern2, 'stripes', 'spots', 'gradient'])
        except Exception as e:
            logging.error(f"Error in _blend_patterns: {e}. Defaulting to 'gradient'.")
            return 'gradient'
# ==============================
# Physics and Interaction Handling
# ==============================
class PhysicsEngine:
def __init__(self, width: int, height: int, config: PhysicsConfig):
    """Build the pymunk space, its boundary walls and the collision hook.

    Args:
        width: Width of the simulated area.
        height: Height of the simulated area.
        config: Physics constants (damping, elasticity, friction,
            collision type).
    """
    self.config = config

    # Simulation space with global damping taken from the config.
    self.space = pymunk.Space()
    self.space.damping = self.config.DAMPING

    # Static walls around the area.
    self.create_boundaries(width, height)

    # Organism-vs-organism collisions are routed to handle_collision.
    organism_collisions = self.space.add_collision_handler(
        self.config.COLLISION_TYPE_ORGANISM,
        self.config.COLLISION_TYPE_ORGANISM,
    )
    organism_collisions.begin = self.handle_collision

    # Pairs of organism ids that collided since interactions were last processed.
    self.current_interactions: Set[tuple] = set()

    self.width = width
    self.height = height
def update(self, dt: float):
    """Step the physics space and copy body state back onto the organisms.

    The space advances in fixed 1/60 s steps: at least one and at most four
    per call, which bounds the work done after a long frame. Afterwards every
    body carrying an ``organism`` attribute has its position (wrapped into
    the area) and velocity (each component limited to +/-200) written to the
    organism's ``pos`` and ``vel``. NaN positions are reset to the centre and
    NaN velocities to zero.

    Args:
        dt: Elapsed time in seconds since the previous call.
    """
    try:
        fixed_dt = 1.0 / 60.0
        steps = max(1, min(4, int(dt / fixed_dt)))
        for _ in range(steps):
            self.space.step(fixed_dt)

        for body in self.space.bodies:
            if not hasattr(body, 'organism'):
                continue
            organism = body.organism
            try:
                if not (np.isnan(body.position.x) or np.isnan(body.position.y)):
                    new_x = float(body.position.x % self.width)
                    new_y = float(body.position.y % self.height)
                    organism.pos.update(new_x, new_y)
                else:
                    body.position = self.width/2, self.height/2
                    organism.pos.update(self.width/2, self.height/2)

                if not (np.isnan(body.velocity.x) or np.isnan(body.velocity.y)):
                    max_velocity = 200.0
                    vx = max(-max_velocity, min(max_velocity, body.velocity.x))
                    vy = max(-max_velocity, min(max_velocity, body.velocity.y))
                    organism.vel.update(vx, vy)
                else:
                    body.velocity = (0, 0)
                    organism.vel.update(0, 0)
            except Exception as e:
                logging.error(f"Error updating organism physics state: {e}")
                # Put the body back into a known-safe state.
                body.position = self.width/2, self.height/2
                body.velocity = (0, 0)
                try:
                    organism.pos.update(self.width/2, self.height/2)
                    organism.vel.update(0, 0)
                except Exception as reset_error:
                    # Best effort only, but do not hide the failure and do
                    # not swallow KeyboardInterrupt/SystemExit.
                    logging.error(f"Error resetting organism state: {reset_error}")
    except Exception as e:
        logging.error(f"Error updating physics: {e}")
def create_boundaries(self, width: int, height: int):
    """Add four static wall segments around the ``width`` x ``height`` area.

    Each wall is a zero-radius segment attached to the space's static body,
    using the configured elasticity and friction. Failures are logged.
    """
    try:
        # Corners in order; consecutive pairs give top, right, bottom, left.
        corners = [(0, 0), (width, 0), (width, height), (0, height)]
        for start, end in zip(corners, corners[1:] + corners[:1]):
            segment = pymunk.Segment(self.space.static_body, start, end, 0)
            segment.elasticity = self.config.ELASTICITY
            segment.friction = self.config.FRICTION
            self.space.add(segment)
    except Exception as e:
        logging.error(f"Error creating boundaries: {e}")
def add_organism(self, organism: FractalOrganism) -> pymunk.Body:
    """Create a physics body and shape for ``organism`` and add them to the space.

    The body starts at the organism's position wrapped into the area, with
    each velocity component limited to +/-50. The shape is a polygon built
    from the organism's non-NaN shape points, or a circle of radius
    ``organism.size`` when fewer than three points are usable. Both the shape
    and the body receive an ``organism`` back-reference.

    Returns:
        The new body, or ``None`` if anything fails.
    """
    try:
        # Mass is floored at 0.1 so it is always positive.
        mass = max(0.1, organism.mass)
        body = pymunk.Body(mass, pymunk.moment_for_circle(mass, 0, organism.size))

        body.position = (
            float(organism.pos.x % self.width),
            float(organism.pos.y % self.height),
        )

        max_initial_velocity = 50.0
        body.velocity = (
            max(-max_initial_velocity, min(max_initial_velocity, organism.vel.x)),
            max(-max_initial_velocity, min(max_initial_velocity, organism.vel.y)),
        )

        # Keep only vertices whose coordinates are both real numbers.
        valid_vertices = [
            (float(px), float(py))
            for px, py in organism.shape_points
            if not (np.isnan(px) or np.isnan(py))
        ]

        if len(valid_vertices) >= 3:
            shape = pymunk.Poly(body, valid_vertices)
        else:
            logging.warning(f"Insufficient valid vertices for organism {id(organism)}, using circle shape")
            shape = pymunk.Circle(body, organism.size)

        shape.elasticity = self.config.ELASTICITY
        shape.friction = self.config.FRICTION
        shape.collision_type = self.config.COLLISION_TYPE_ORGANISM

        # Back-references used by the collision handler and by update().
        shape.organism = organism
        body.organism = organism

        self.space.add(body, shape)
        return body
    except Exception as e:
        logging.error(f"Error adding organism to physics: {e}")
        return None
def handle_collision(self, arbiter, space, data):
    """Collision callback: record the colliding pair and apply an impulse.

    The pair is stored in ``self.current_interactions`` as a sorted tuple of
    the two organisms' ``id()`` values. An impulse derived from the arbiter's
    total kinetic energy and the configured elasticity is clamped to +/-1000
    and applied along the collision normal, with opposite sign on each body,
    at the first contact point.

    Returns:
        ``True`` on success (also when the impulse is NaN and is skipped),
        ``False`` when any error occurs; the error is logged.
    """
    try:
        first_shape, second_shape = arbiter.shapes
        first_org, second_org = first_shape.organism, second_shape.organism

        # Order-independent key so (a, b) and (b, a) are the same interaction.
        pair_key = tuple(sorted((id(first_org), id(second_org))))
        self.current_interactions.add(pair_key)

        bounce = max(0, min(1, self.config.ELASTICITY))
        impulse = -(1 + bounce) * arbiter.total_ke / 2
        if np.isnan(impulse):
            # Nothing sensible to apply; the pair has still been recorded.
            return True

        limit = 1000.0
        impulse = max(-limit, min(limit, impulse))

        first_body, second_body = first_shape.body, second_shape.body
        direction = arbiter.normal
        contact = arbiter.contact_point_set.points[0].point_a

        first_body.apply_impulse_at_world_point(impulse * direction, contact)
        second_body.apply_impulse_at_world_point(-impulse * direction, contact)
        return True
    except Exception as e:
        logging.error(f"Error handling collision: {e}")
        return False
def process_interactions(self, organisms: "List[FractalOrganism]"):
    """Resolve this frame's collision pairs, then apply proximity forces.

    Phase 1 (collisions): for every id pair in ``self.current_interactions``
    whose two organisms are both present in ``organisms`` and alive, their
    brains interact and up to 10 energy units flow from the richer brain to
    the poorer one. The pair set is cleared afterwards.

    Phase 2 (proximity): every pair of living organisms closer than
    ``config.INTERACTION_RADIUS`` receives equal and opposite forces pointing
    away from each other, scaled by closeness and ``config.FORCE_SCALE`` and
    clamped per component to +/-100.

    Args:
        organisms: All organisms currently in the simulation.

    Any exception is logged and swallowed.
    """
    try:
        # Index organisms once so the recorded id pairs resolve in O(1)
        # instead of scanning the whole list for each side of each pair.
        organisms_by_id = {id(org): org for org in organisms}

        for org_a_id, org_b_id in self.current_interactions:
            org_a = organisms_by_id.get(org_a_id)
            org_b = organisms_by_id.get(org_b_id)
            if org_a is None or org_b is None:
                continue
            if not (org_a.alive and org_b.alive):
                continue

            # Neural interaction; the return value is not used here.
            org_a.brain.interact_with(org_b.brain)

            # Move 10% of the energy difference, capped at 10 units either way.
            energy_diff = org_a.brain.total_energy - org_b.brain.total_energy
            transfer = max(-10.0, min(10.0, energy_diff * 0.1))
            org_a.brain.total_energy = max(0, org_a.brain.total_energy - transfer)
            org_b.brain.total_energy = max(0, org_b.brain.total_energy + transfer)

        # Collision pairs are per-frame; start the next frame empty.
        self.current_interactions.clear()

        # Map each organism to its physics body once, rather than scanning
        # space.bodies inside the pair loop. The first body found wins.
        body_for = {}
        for body in self.space.bodies:
            if hasattr(body, 'organism'):
                body_for.setdefault(id(body.organism), body)

        max_force = 100.0
        for i, org_a in enumerate(organisms):
            if not org_a.alive:
                continue
            for org_b in organisms[i + 1:]:
                if not org_b.alive:
                    continue

                dx = org_b.pos.x - org_a.pos.x
                dy = org_b.pos.y - org_a.pos.y
                if np.isnan(dx) or np.isnan(dy):
                    continue

                distance = np.sqrt(dx * dx + dy * dy)
                if not distance < self.config.INTERACTION_RADIUS:
                    continue

                # Strength falls linearly from 1 at contact to 0 at the radius;
                # the neural field effect uses half of it.
                strength = 1.0 - (distance / self.config.INTERACTION_RADIUS)
                field_interaction = strength * 0.5
                force_magnitude = field_interaction * self.config.FORCE_SCALE
                force_angle = np.arctan2(dy, dx)
                if np.isnan(force_magnitude) or np.isnan(force_angle):
                    continue

                force_x = np.cos(force_angle) * force_magnitude
                force_y = np.sin(force_angle) * force_magnitude
                force_x = max(-max_force, min(max_force, force_x))
                force_y = max(-max_force, min(max_force, force_y))

                body_a = body_for.get(id(org_a))
                body_b = body_for.get(id(org_b))
                if body_a is not None and body_b is not None:
                    # The force is computed from world-space positions, so it
                    # must be applied in world coordinates at the body's
                    # position. apply_force_at_local_point would interpret the
                    # vector in the body's rotated frame.
                    body_a.apply_force_at_world_point((-force_x, -force_y), body_a.position)
                    body_b.apply_force_at_world_point((force_x, force_y), body_b.position)

                # NOTE(review): the original indentation was lost; this assumes
                # the direct organism forces apply even when a body is missing.
                org_a.apply_force((-force_x, -force_y))
                org_b.apply_force((force_x, force_y))
    except Exception as e:
        logging.error(f"Error processing interactions: {e}")
# ==============================
# Visualization with PyGame
# ==============================
class NeuralVisualizer:
    """Draw an organism's brain as rings of neurons using pygame.

    Two full-size scratch surfaces (neurons and connection lines) are cleared
    and redrawn for each organism and then blitted onto the caller's surface,
    together with a per-organism pattern overlay.
    """

    def __init__(self, width: int, height: int, config: VisualizationConfig):
        self.width = width
        self.height = height
        self.config = config
        self.neuron_surface = pygame.Surface((width, height), pygame.SRCALPHA)
        self.connection_surface = pygame.Surface((width, height), pygame.SRCALPHA)

    @staticmethod
    def _clamp_channel(value) -> int:
        """Coerce a colour or alpha component to an int within 0..255."""
        return max(0, min(255, int(value)))

    def _apply_pattern_overlay(self, organism, surface):
        """Apply visual pattern overlay based on organism type.

        Draws stripes, spots or (for any other ``pattern_type``) a radial
        gradient in the inverse of the organism's colour, with alpha taken
        from ``pattern_intensity``. Dead organisms are skipped. Errors are
        logged and swallowed.
        """
        try:
            if not organism.alive:
                return

            # Clamp every channel: an intensity above 1 or an out-of-range
            # organism colour would otherwise produce an invalid colour and
            # the whole overlay would be dropped.
            pattern_alpha = self._clamp_channel(255 * organism.pattern_intensity)
            pattern_color = (
                self._clamp_channel(255 - organism.color[0]),
                self._clamp_channel(255 - organism.color[1]),
                self._clamp_channel(255 - organism.color[2]),
                pattern_alpha,
            )

            pattern_surface = pygame.Surface((self.width, self.height), pygame.SRCALPHA)
            x, y = int(organism.pos.x), int(organism.pos.y)

            if organism.pattern_type == 'stripes':
                # Five horizontal stripes centred on the organism.
                stride = max(5, int(organism.size * 0.5))
                for i in range(-2, 3):
                    offset = i * stride
                    pygame.draw.line(pattern_surface, pattern_color,
                                     (x - organism.size, y + offset),
                                     (x + organism.size, y + offset), 2)
            elif organism.pattern_type == 'spots':
                # Eight spots on a circle at 70% of the organism's size.
                spot_size = max(2, int(organism.size * 0.2))
                for angle in range(0, 360, 45):
                    spot_x = x + int(np.cos(np.radians(angle)) * organism.size * 0.7)
                    spot_y = y + int(np.sin(np.radians(angle)) * organism.size * 0.7)
                    pygame.draw.circle(pattern_surface, pattern_color,
                                       (spot_x, spot_y), spot_size)
            else:  # gradient
                # Concentric rings whose alpha grows with the radius.
                max_radius = int(organism.size * 1.2)
                for radius in range(max_radius, 0, -2):
                    alpha = int((radius / max_radius) * pattern_alpha)
                    current_color = (*pattern_color[:3], alpha)
                    pygame.draw.circle(pattern_surface, current_color,
                                       (x, y), radius, 1)

            surface.blit(pattern_surface, (0, 0), special_flags=pygame.BLEND_ALPHA_SDL2)
        except Exception as e:
            logging.error(f"Error applying pattern overlay: {e}")

    def draw_brain_state(self, organism: FractalOrganism, surface: pygame.Surface):
        """Draw neural activity visualization with patterns and NaN handling.

        The three top-level brain modules are placed on a ring of radius
        ``2 * organism.size`` around the organism; their ``sub_neurons`` are
        drawn recursively on rings of half the radius, up to three levels.
        If NaN is found in the brain vitals or in a neuron's state the
        organism is marked dead (``alive = False``). Errors are logged and
        swallowed.
        """
        try:
            # The scratch surfaces are shared between organisms; reset them.
            self.neuron_surface.fill((0, 0, 0, 0))
            self.connection_surface.fill((0, 0, 0, 0))

            vitals = organism.brain.get_vitals()
            if any(np.isnan(value) for value in vitals.values() if isinstance(value, (int, float))):
                logging.warning(f"NaN detected in vitals for organism {id(organism)}. Marking for death.")
                organism.alive = False
                return

            def plot_neural_layer(center, radius, neurons, depth=0):
                if depth >= 3 or not neurons:
                    return
                # Accept a neuron object as well as a collection of neurons.
                if hasattr(neurons, 'sub_neurons'):
                    neurons = neurons.sub_neurons
                if not neurons or len(neurons) == 0:
                    return

                angle_step = 2 * np.pi / len(neurons)
                for i, neuron in enumerate(neurons):
                    try:
                        angle = i * angle_step
                        x = center[0] + radius * np.cos(angle)
                        y = center[1] + radius * np.sin(angle)
                        if np.isnan(x) or np.isnan(y):
                            logging.warning(f"NaN coordinates detected for neuron {i}. Skipping.")
                            continue

                        # Check for NaN while the values are still floats:
                        # int(nan) raises, which would bypass this check and
                        # leave the organism alive.
                        activation = float(neuron.state.activation)
                        raw_connections = float(neuron.state.connections)
                        if np.isnan(activation) or np.isnan(raw_connections):
                            logging.warning(f"NaN detected in neuron state. Marking organism for death.")
                            organism.alive = False
                            return
                        connections = int(raw_connections)

                        x_pos = int(np.clip(x, 0, self.width))
                        y_pos = int(np.clip(y, 0, self.height))

                        color = self._get_neuron_color(activation, connections)
                        pygame.draw.circle(self.neuron_surface, color, (x_pos, y_pos), 5)

                        # Line back to the ring centre; opacity grows with the
                        # connection count.
                        if connections > 0:
                            alpha = int(255 * min(connections / self.config.MAX_NEURAL_CONNECTIONS, 1))
                            connection_color = (*self.config.CONNECTION_COLOR[:3], alpha)
                            pygame.draw.line(
                                self.connection_surface,
                                connection_color,
                                (x_pos, y_pos),
                                (int(center[0]), int(center[1])),
                                2
                            )

                        if hasattr(neuron, 'sub_neurons') and neuron.sub_neurons:
                            child_radius = radius * 0.5
                            child_center = (x, y)
                            plot_neural_layer(child_center, child_radius, neuron.sub_neurons, depth + 1)
                    except Exception as e:
                        logging.error(f"Error plotting neuron {i}: {e}")
                        continue

            try:
                center = (organism.pos.x, organism.pos.y)
                if not (np.isnan(center[0]) or np.isnan(center[1])):
                    plot_neural_layer(center, organism.size * 2,
                                      [organism.brain.visual_cortex,
                                       organism.brain.thought_processor,
                                       organism.brain.action_generator])
            except Exception as e:
                logging.error(f"Error plotting neural network: {e}")

            try:
                self._apply_pattern_overlay(organism, surface)
            except Exception as e:
                logging.error(f"Error applying pattern overlay: {e}")

            # Connections go underneath the neurons.
            surface.blit(self.connection_surface, (0, 0))
            surface.blit(self.neuron_surface, (0, 0))
        except Exception as e:
            logging.error(f"Error in draw_brain_state: {e}")

    def _get_neuron_color(self, activation: float, connections: int) -> Tuple[int, int, int]:
        """Generate color based on neuron state.

        Activation (clamped to [-1, 1]) selects hue and brightness; the
        connection count relative to ``MAX_NEURAL_CONNECTIONS`` (clamped to
        [0, 1]) selects saturation. Returns an RGB tuple of ints in 0..255,
        or gray ``(100, 100, 100)`` if anything goes wrong (e.g. NaN).
        """
        try:
            if np.isnan(activation):
                # min()/max() would silently absorb NaN; keep the gray fallback.
                raise ValueError("activation is NaN")
            # Clamp so that out-of-range inputs cannot push a channel past 255.
            level = max(-1.0, min(1.0, activation))
            hue = (level + 1) / 2  # Map -1,1 to 0,1
            saturation = max(0.0, min(connections / self.config.MAX_NEURAL_CONNECTIONS, 1))
            value = 0.8 + 0.2 * level
            rgb = colorsys.hsv_to_rgb(hue, saturation, value)
            return tuple(int(255 * x) for x in rgb)
        except Exception as e:
            logging.error(f"Error in _get_neuron_color: {e}. Defaulting to gray.")
            return (100, 100, 100)
class SimulationVisualizer:
    """Own the pygame window and compose each frame from layered surfaces.

    Layers, bottom to top: background, organism bodies, neural/interaction
    overlay, statistics text.
    """

    def __init__(self, width: int, height: int, config: VisualizationConfig):
        pygame.init()
        self.width = width
        self.height = height
        self.config = config

        # Display surface requested with double buffering and vsync.
        self.screen = pygame.display.set_mode(
            (width, height),
            pygame.DOUBLEBUF | pygame.HWSURFACE | pygame.SCALED,
            vsync=1
        )
        pygame.display.set_caption("Fractal Life Simulation")

        # Off-screen surfaces.
        self.buffer = pygame.Surface((width, height), pygame.SRCALPHA)
        self.neural_viz = NeuralVisualizer(width, height, config)
        self.background = pygame.Surface((width, height))
        self.background.fill(config.BACKGROUND_COLOR)
        self.organism_surface = pygame.Surface((width, height), pygame.SRCALPHA)
        self.interaction_surface = pygame.Surface((width, height), pygame.SRCALPHA)
        self.stats_surface = pygame.Surface((width, height), pygame.SRCALPHA)

        # Created lazily on first use and reused, instead of once per frame.
        self._stats_font = None

    def _validate_color(self, color):
        """Validate and ensure color values are proper RGB integers.

        Returns the first three components clamped to 0..255, or gray
        ``(100, 100, 100)`` if the input is too short or not convertible.
        """
        try:
            if len(color) >= 3:
                return (
                    max(0, min(255, int(color[0]))),
                    max(0, min(255, int(color[1]))),
                    max(0, min(255, int(color[2])))
                )
            return (100, 100, 100)
        except Exception as e:
            logging.error(f"Error validating color: {e}")
            return (100, 100, 100)

    def draw_frame(self, organisms: List[FractalOrganism], stats: Dict):
        """Draw complete frame with all visualizations.

        Living organisms are drawn with their brain overlay, the statistics
        are rendered, the layers are blitted onto the display surface and the
        display is flipped. Errors are logged and swallowed.
        """
        try:
            for layer in (self.organism_surface, self.interaction_surface, self.stats_surface):
                layer.fill((0, 0, 0, 0))

            for organism in organisms:
                if organism.alive:
                    self._draw_organism(organism, self.organism_surface)
                    self.neural_viz.draw_brain_state(organism, self.interaction_surface)

            self._draw_stats(stats, self.stats_surface)

            self.screen.blit(self.background, (0, 0))
            self.screen.blit(self.organism_surface, (0, 0))
            self.screen.blit(self.interaction_surface, (0, 0))
            self.screen.blit(self.stats_surface, (0, 0))

            pygame.display.flip()
        except Exception as e:
            logging.error(f"Error in draw_frame: {e}")

    def _draw_organism(self, organism: FractalOrganism, surface: pygame.Surface):
        """Draw organism body with color and pattern.

        The outline is ``shape_points`` offset by the organism's position;
        points that come out NaN are skipped. An energy bar (full at 1000
        energy) is drawn above the body.
        """
        try:
            safe_color = self._validate_color(organism.color)

            points = []
            for x, y in organism.shape_points:
                try:
                    px = organism.pos.x + x
                    py = organism.pos.y + y
                    if not (np.isnan(px) or np.isnan(py)):
                        points.append((px, py))
                    else:
                        logging.warning(f"NaN detected in shape points for organism {id(organism)}. Skipping point.")
                except Exception as e:
                    logging.error(f"Error processing shape points for organism {id(organism)}: {e}")
                    continue

            if len(points) >= 3:
                pygame.draw.polygon(surface, safe_color, points)
            else:
                logging.warning(f"Insufficient valid points to draw organism {id(organism)}. Skipping drawing.")

            # Energy bar: dark track with a green fill proportional to energy.
            energy_percentage = min(max(organism.brain.total_energy / 1000.0, 0), 1)
            bar_width = organism.size * 2
            bar_height = 4
            bar_pos = (organism.pos.x - bar_width / 2, organism.pos.y - organism.size - 10)
            pygame.draw.rect(surface, (50, 50, 50), (*bar_pos, bar_width, bar_height))
            pygame.draw.rect(surface, (0, 255, 0),
                             (*bar_pos, bar_width * energy_percentage, bar_height))
        except Exception as e:
            logging.error(f"Error in _draw_organism: {e}")

    def _draw_stats(self, stats: Dict, surface: pygame.Surface):
        """Draw simulation statistics as one "Key: value" line per entry."""
        try:
            # Building a Font every frame is wasted work; reuse one instance.
            font = getattr(self, '_stats_font', None)
            if font is None:
                font = pygame.font.Font(None, 24)
                self._stats_font = font

            y_pos = 10
            for key, value in stats.items():
                text = font.render(f"{key.capitalize()}: {value}", True, (255, 255, 255))
                surface.blit(text, (10, y_pos))
                y_pos += 25
        except Exception as e:
            logging.error(f"Error in _draw_stats: {e}")

    def cleanup(self):
        """Clean up pygame resources."""
        try:
            # Drop the cached font so it is not reused after pygame shuts down.
            self._stats_font = None
            pygame.quit()
        except Exception as e:
            logging.error(f"Error during pygame cleanup: {e}")
# ==============================
# Interaction Field (Optional Enhancement)
# ==============================
class InteractionField:
    """Coarse grid that accumulates a decaying trace of organism activity.

    Each cell holds three values in [0, 1], written from an organism's
    ``activation``, ``energy / 1000`` and ``connections / 100`` vitals.
    """

    def __init__(self, width: int, height: int, resolution: int = 50):
        self.width = max(1, width)
        self.height = max(1, height)
        self.resolution = max(10, min(resolution, 100))  # Bound resolution

        # At least one cell per axis, even for tiny dimensions.
        self.grid_w = max(1, self.width // self.resolution)
        self.grid_h = max(1, self.height // self.resolution)
        self.field = np.zeros((self.grid_w, self.grid_h, 3))

        # The falloff kernel is the same for every organism and every frame.
        self._stamp = self._build_stamp()

    @staticmethod
    def _build_stamp(reach: int = 2, falloff: float = 3.0):
        """Return ``(dx, dy, intensity)`` offsets of the 5x5 falloff kernel."""
        stamp = []
        for dx in range(-reach, reach + 1):
            for dy in range(-reach, reach + 1):
                distance = np.sqrt(dx * dx + dy * dy)
                if distance < falloff:
                    intensity = max(0.0, min(1.0, (falloff - distance) / falloff))
                    stamp.append((dx, dy, intensity))
        return stamp

    def update(self, organisms: "List[FractalOrganism]"):
        """Update field based on organism neural activity with safety checks.

        The field first decays by a factor of 0.9. Every living organism then
        adds its clamped vitals to the cells around its position, weighted by
        the falloff kernel and wrapping around the grid edges. The field is
        clipped to [0, 1] afterwards. An organism whose position or vitals
        cannot be read is logged and skipped without affecting the others.
        """
        try:
            self.field *= 0.9
            np.clip(self.field, 0, 1, out=self.field)

            for org in organisms:
                if not org.alive:
                    continue
                try:
                    pos_x = max(0, min(float(org.pos.x), self.width))
                    pos_y = max(0, min(float(org.pos.y), self.height))
                    grid_x = int((pos_x / self.width) * (self.grid_w - 1))
                    grid_y = int((pos_y / self.height) * (self.grid_h - 1))

                    vitals = org.brain.get_vitals()
                    activity_color = np.array([
                        max(0.0, min(1.0, float(vitals['activation']))),
                        max(0.0, min(1.0, float(vitals['energy']) / 1000.0)),
                        max(0.0, min(1.0, float(vitals['connections']) / 100.0))
                    ])

                    for dx, dy, intensity in self._stamp:
                        x = (grid_x + dx) % self.grid_w
                        y = (grid_y + dy) % self.grid_h
                        self.field[x, y] += activity_color * intensity
                except (ValueError, TypeError, ZeroDivisionError,
                        KeyError, AttributeError) as e:
                    # KeyError/AttributeError cover vitals with missing keys or
                    # a malformed organism; without them one bad organism
                    # would abort the update for all the others.
                    logging.error(f"Error updating field for organism {id(org)}: {e}")
                    continue  # Skip problematic organisms

            np.clip(self.field, 0, 1, out=self.field)
        except Exception as e:
            logging.error(f"Error in InteractionField.update: {e}")

    def get_field_at(self, x: float, y: float) -> np.ndarray:
        """Safely get field value at position.

        The position is clamped to the field's extent. Returns the 3-element
        cell (a view into the field array), or zeros if the position cannot
        be converted or indexed.
        """
        try:
            x = max(0, min(float(x), self.width))
            y = max(0, min(float(y), self.height))
            grid_x = int((x / self.width) * (self.grid_w - 1))
            grid_y = int((y / self.height) * (self.grid_h - 1))
            return self.field[grid_x, grid_y]
        except (ValueError, TypeError, IndexError) as e:
            logging.error(f"Error in get_field_at for position ({x}, {y}): {e}")
            return np.zeros(3)
# ==============================
# Simulation State Tracking
# ==============================
class SimulationState:
    """Mutable record of where the simulation currently stands.

    Holds the run/pause flags, a step counter, the latest aggregate
    statistics and the organism currently selected (if any).
    """

    def __init__(self):
        # Lifecycle flags and step counter start in the "idle" position.
        self.running, self.paused = False, False
        self.step_count = 0

        # Aggregate statistics, zeroed until they are first computed.
        self.stats = dict(
            population=0,
            avg_energy=0.0,
            avg_neurons=0.0,
            avg_connections=0.0,
            total_interactions=0,
        )

        self.selected_organism: Optional[FractalOrganism] = None
# ==============================
# Main Simulation Class
# ==============================
class FractalLifeSimulation:
    """Run the organism simulation loop on a background thread.

    Owns the organisms, the physics engine, the interaction field and the
    pygame visualizer, and publishes aggregate statistics into
    ``shared_data['stats']`` under ``shared_lock``.
    """

    def __init__(self, config: SimulationConfig, shared_data: Dict):
        self.config = config
        self.state = SimulationState()

        # Dictionary shared with the Gradio interface; guarded by shared_lock.
        self.shared_data = shared_data
        self.shared_lock = threading.Lock()

        visualization_config = VisualizationConfig()
        self.visualizer = SimulationVisualizer(config.WIDTH, config.HEIGHT, visualization_config)
        self.physics = PhysicsEngine(config.WIDTH, config.HEIGHT, PhysicsConfig())
        self.field = InteractionField(config.WIDTH, config.HEIGHT, resolution=50)

        # Queues for communication with other threads.
        self.event_queue = Queue()
        self.stats_queue = Queue()

        # Number of collision pairs seen in the most recent step. Captured
        # before process_interactions() clears the set so the statistic is
        # not always zero.
        self._frame_interactions = 0

        self.organisms: List[FractalOrganism] = []
        self._init_organisms()

        self.running = False
        self.thread = None

    def _spawn_organism(self):
        """Create one organism at a random position and register it with physics."""
        organism = FractalOrganism(
            x=random.uniform(0, self.config.WIDTH),
            y=random.uniform(0, self.config.HEIGHT),
            feature_dim=32,
            max_neurons=self.config.MAX_NEURONS
        )
        self.organisms.append(organism)
        self.physics.add_organism(organism)
        return organism

    def _init_organisms(self):
        """Initialize starting organisms (``config.MIN_ORGANISMS`` of them)."""
        try:
            for _ in range(self.config.MIN_ORGANISMS):
                self._spawn_organism()
        except Exception as e:
            logging.error(f"Error initializing organisms: {e}")

    def _process_reproduction(self):
        """Handle organism reproduction.

        A living organism whose brain energy exceeds
        ``config.REPRODUCTION_ENERGY`` mates with a randomly chosen other
        organism that also exceeds it, as long as the population stays below
        ``config.MAX_ORGANISMS``. Children join the population after the pass.
        """
        try:
            threshold = self.config.REPRODUCTION_ENERGY
            new_organisms = []
            for org in self.organisms:
                if not org.alive or len(self.organisms) + len(new_organisms) >= self.config.MAX_ORGANISMS:
                    continue
                if not org.brain.total_energy > threshold:
                    continue

                # Simple random mate selection among eligible organisms.
                potential_mates = [
                    o for o in self.organisms
                    if o != org and o.alive and o.brain.total_energy > threshold
                ]
                if not potential_mates:
                    continue

                mate = random.choice(potential_mates)
                child = org.reproduce(mate, mutation_rate=self.config.MUTATION_RATE)
                if child:
                    new_organisms.append(child)
                    self.physics.add_organism(child)

            self.organisms.extend(new_organisms)
        except Exception as e:
            logging.error(f"Error processing reproduction: {e}")

    def _update_stats(self):
        """Update simulation statistics and publish a copy to ``shared_data``."""
        try:
            living_organisms = [org for org in self.organisms if org.alive]
            population = len(living_organisms)

            if population > 0:
                self.state.stats.update({
                    'population': population,
                    'avg_energy': sum(org.brain.total_energy for org in living_organisms) / population,
                    'avg_neurons': sum(org.brain.total_neurons for org in living_organisms) / population,
                    'avg_connections': sum(sum(n.state.connections for n in [org.brain.visual_cortex,
                                                                            org.brain.thought_processor,
                                                                            org.brain.action_generator])
                                           for org in living_organisms) / population,
                    # Reading physics.current_interactions here would always
                    # give 0: the set has already been cleared for this frame.
                    'total_interactions': self._frame_interactions
                })
            else:
                self.state.stats.update({
                    'population': 0,
                    'avg_energy': 0.0,
                    'avg_neurons': 0.0,
                    'avg_connections': 0.0,
                    'total_interactions': 0
                })

            with self.shared_lock:
                self.shared_data['stats'] = self.state.stats.copy()
        except Exception as e:
            logging.error(f"Error updating statistics: {e}")

    def _step(self, dt: float):
        """Advance the world by one frame of ``dt`` seconds."""
        self.physics.update(dt)
        self.field.update(self.organisms)

        for org in self.organisms:
            if org.alive:
                org.update(self.config.WIDTH, self.config.HEIGHT, self.organisms)
                # TODO: feed self.field.get_field_at(org.pos.x, org.pos.y)
                # into the organism once it can process field input.
                org.brain.total_energy = max(0.0, org.brain.total_energy - self.config.ENERGY_DECAY)

        # Remember how many collisions this frame produced before they are
        # consumed and cleared.
        self._frame_interactions = len(self.physics.current_interactions)
        self.physics.process_interactions(self.organisms)

        self._process_reproduction()

        # Drop the dead, then top the population back up to the minimum.
        self.organisms = [org for org in self.organisms if org.alive]
        while len(self.organisms) < self.config.MIN_ORGANISMS and len(self.organisms) < self.config.MAX_ORGANISMS:
            self._spawn_organism()

        # Drives the time axis of the history plots.
        self.state.step_count += 1

    def _main_loop(self):
        """Main simulation loop; runs until ``self.running`` becomes false."""
        try:
            clock = pygame.time.Clock()
            while self.running:
                dt = clock.tick(self.config.TARGET_FPS) / 1000.0  # Delta time in seconds

                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        self.stop()

                # Events queued by other threads (e.g. the web interface).
                while not self.event_queue.empty():
                    self._handle_event(self.event_queue.get())

                if not self.state.paused:
                    self._step(dt)

                # NOTE(review): the original indentation was lost; this assumes
                # stats and drawing also run while paused.
                self._update_stats()
                self.visualizer.draw_frame(self.organisms, self.state.stats)
        except Exception as e:
            logging.error(f"Exception in main loop: {e}")
        finally:
            # Make the flags reflect reality even if the loop died on an error.
            self.running = False
            self.state.running = False
            self.visualizer.cleanup()

    def start(self):
        """Start simulation in separate thread (no-op if already running)."""
        if not self.running:
            self.running = True
            self.state.running = True
            self.thread = threading.Thread(target=self._main_loop)
            self.thread.start()
            logging.info("Simulation started.")

    def stop(self):
        """Stop simulation and wait for the loop thread to finish.

        Safe to call from the loop thread itself (as happens on
        ``pygame.QUIT``): a thread cannot join itself, so the join is skipped
        there and the loop exits on its next iteration.
        """
        self.running = False
        self.state.running = False
        worker = self.thread
        if worker and worker.is_alive() and worker is not threading.current_thread():
            worker.join()
        logging.info("Simulation stopped.")

    def pause(self):
        """Pause/unpause simulation."""
        self.state.paused = not self.state.paused
        logging.info(f"Simulation {'paused' if self.state.paused else 'resumed'}.")

    def _handle_event(self, event: Dict):
        """Handle external events.

        Supported ``event['type']`` values: ``'select_organism'`` (needs
        ``'organism_id'``), ``'add_energy'`` (needs ``'amount'``) and
        ``'modify_neurons'`` (currently only logged).
        """
        try:
            if event['type'] == 'select_organism':
                organism_id = event['organism_id']
                self.state.selected_organism = next(
                    (org for org in self.organisms if id(org) == organism_id),
                    None
                )
                logging.info(f"Organism {organism_id} selected.")
            elif event['type'] == 'add_energy':
                if self.state.selected_organism:
                    self.state.selected_organism.brain.total_energy += event['amount']
                    logging.info(f"Added {event['amount']} energy to organism {id(self.state.selected_organism)}.")
            elif event['type'] == 'modify_neurons':
                if self.state.selected_organism:
                    # Placeholder for neuron modification logic
                    logging.info(f"Modify neurons event received for organism {id(self.state.selected_organism)}.")
        except Exception as e:
            logging.error(f"Error handling event {event}: {e}")
# ==============================
# Gradio Interface
# ==============================
class FractalLifeInterface:
    """Gradio front end for starting, observing and poking the simulation.

    Keeps a rolling history of population statistics for plotting and the id
    of the organism currently selected in the UI.
    """

    def __init__(self):
        self.simulation: Optional[FractalLifeSimulation] = None
        self.history = {
            'population': [],
            'avg_energy': [],
            'avg_neurons': [],
            'time': []
        }
        self.selected_organism_id = None
        self.frame_image = None
        self.DEFAULT_WIDTH = 1024
        self.DEFAULT_HEIGHT = 768
        self.shared_data = {
            'stats': {
                'population': 0,
                'avg_energy': 0.0,
                'avg_neurons': 0.0,
                'avg_connections': 0.0,
                'total_interactions': 0
            }
        }

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _blank_frame(self):
        """Return a black image of the default size."""
        return Image.new('RGB', (self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT), (0, 0, 0))

    def _record_history(self, stats, step, max_history=1000):
        """Append one sample to the history and keep only the newest entries."""
        self.history['time'].append(step)
        self.history['population'].append(stats['population'])
        self.history['avg_energy'].append(stats['avg_energy'])
        self.history['avg_neurons'].append(stats['avg_neurons'])
        if len(self.history['time']) > max_history:
            for key in self.history:
                self.history[key] = self.history[key][-max_history:]

    def _selected_organism(self, sim):
        """Return the selected organism of a running ``sim``, or ``None``."""
        if not (self.selected_organism_id and sim and sim.running):
            return None
        return next(
            (org for org in sim.organisms if id(org) == self.selected_organism_id),
            None
        )

    @staticmethod
    def _brain_layers(org):
        """Return the three top-level brain modules of ``org``."""
        return [org.brain.visual_cortex,
                org.brain.thought_processor,
                org.brain.action_generator]

    @staticmethod
    def _pad_layer_activities(layers):
        """Turn per-layer activation lists into a rectangular float array.

        Layers can have different numbers of sub-neurons, and ``np.array`` on
        ragged lists raises on current NumPy. Shorter rows are padded with NaN.
        """
        width = max((len(row) for row in layers), default=0)
        grid = np.full((len(layers), width), np.nan, dtype=float)
        for row_index, row in enumerate(layers):
            for col_index, value in enumerate(row):
                grid[row_index, col_index] = float(value)
        return grid

    # ------------------------------------------------------------------
    # Display
    # ------------------------------------------------------------------
    def get_frame(self):
        """Retrieve the latest frame from Pygame as an RGB PIL image.

        Returns a black image when no simulation is running or on error.
        """
        try:
            # Read the attribute once: the Stop handler may reset it to None
            # while this method is running.
            sim = self.simulation
            if sim and sim.running:
                with sim.shared_lock:
                    screen = sim.visualizer.screen
                    width = screen.get_width()
                    height = screen.get_height()

                    # Copy the display onto an RGBA surface, then to PIL.
                    surf_alpha = pygame.Surface((width, height), pygame.SRCALPHA)
                    surf_alpha.blit(screen, (0, 0))
                    data_string = pygame.image.tostring(surf_alpha, 'RGBA')
                    image = Image.frombytes('RGBA', (width, height), data_string)
                    return image.convert('RGB')
            return self._blank_frame()
        except Exception as e:
            logging.error(f"Error in get_frame: {e}")
            return self._blank_frame()

    def update_display(self):
        """Update display by retrieving the latest frame and statistics.

        Returns a list ``[image, stats_figure, neural_figure, dropdown,
        vitals]`` matching the outputs wired in ``create_interface``. On error
        a blank image, empty figures, an empty dropdown and ``None`` are
        returned.
        """
        try:
            image = self.get_frame()

            sim = self.simulation
            running = bool(sim and sim.running)

            # History only grows while a simulation is running.
            if running:
                with sim.shared_lock:
                    stats = self.shared_data['stats']
                    self._record_history(stats, sim.state.step_count)

            stats_fig = self._create_stats_plot()
            neural_fig = self._create_neural_plot()

            organism_list = []
            if running:
                organism_list = [
                    (f"Organism {id(org)}", id(org))
                    for org in sim.organisms
                    if org.alive
                ]

            vitals = None
            org = self._selected_organism(sim)
            if org:
                vitals = {
                    'energy': org.brain.total_energy,
                    'neurons': org.brain.total_neurons,
                    'age': org.age,
                    'connections': sum(n.state.connections for n in self._brain_layers(org))
                }

            # Create new Dropdown choices instead of using update
            dropdown = gr.Dropdown(
                choices=organism_list,
                label="Select Organism",
                interactive=True
            )

            return [
                image,
                stats_fig,
                neural_fig,
                dropdown,
                vitals
            ]
        except Exception as e:
            logging.error(f"Error in update_display: {e}")
            blank_image = self._blank_frame()
            empty_fig = go.Figure()
            empty_dropdown = gr.Dropdown(choices=[])
            return [blank_image, empty_fig, empty_fig, empty_dropdown, None]

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def create_interface(self):
        """Build and return the Gradio ``Blocks`` interface."""
        with gr.Blocks(title="Fractal Life Simulator") as interface:
            gr.Markdown("# 🧬 Fractal Life Simulator")

            with gr.Row():
                # Main simulation view and controls
                with gr.Column(scale=2):
                    canvas = gr.Image(label="Simulation View", interactive=False)

                    with gr.Row():
                        start_btn = gr.Button("Start Simulation", variant="primary")
                        pause_btn = gr.Button("Pause")
                        stop_btn = gr.Button("Stop")

                    with gr.Row():
                        population_slider = gr.Slider(
                            minimum=5, maximum=100,
                            value=10, step=1,
                            label="Initial Population"
                        )
                        mutation_rate = gr.Slider(
                            minimum=0, maximum=1, value=0.1, step=0.01,
                            label="Mutation Rate"
                        )
                        max_population_slider = gr.Slider(
                            minimum=50, maximum=500, value=50, step=10,
                            label="Max Population"
                        )

                # Statistics and organism details
                with gr.Column(scale=1):
                    stats_plot = gr.Plot(label="Population Statistics")
                    neural_plot = gr.Plot(label="Neural Activity")

                    with gr.Group():
                        gr.Markdown("### Selected Organism")
                        organism_dropdown = gr.Dropdown(
                            label="Select Organism",
                            choices=[],
                            interactive=True
                        )
                        vitals_json = gr.JSON(label="Organism Vitals")

                        with gr.Row():
                            add_energy_btn = gr.Button("Add Energy")
                            add_neurons_btn = gr.Button("Add Neurons")

            # Advanced settings tab
            with gr.Tab("Advanced Settings"):
                with gr.Row():
                    with gr.Column():
                        brain_update_rate = gr.Slider(
                            minimum=1, maximum=30, value=10, step=1,
                            label="Brain Update Rate (Hz)"
                        )
                        max_neurons = gr.Slider(
                            minimum=100, maximum=5000, value=1000, step=100,
                            label="Max Neurons per Brain"
                        )
                        energy_decay = gr.Slider(
                            minimum=0, maximum=1, value=0.1, step=0.01,
                            label="Energy Decay Rate"
                        )

                    with gr.Column():
                        interaction_strength = gr.Slider(
                            minimum=0, maximum=1, value=0.5, step=0.01,
                            label="Interaction Strength"
                        )
                        field_resolution = gr.Slider(
                            minimum=10, maximum=100, value=50, step=5,
                            label="Field Resolution"
                        )

            # Event handlers
            def start_simulation(initial_population, mutation_rate_val, max_population_val,
                                 brain_update_rate_val, max_neurons_val, energy_decay_val,
                                 interaction_strength_val, field_resolution_val):
                try:
                    if self.simulation is None:
                        config = SimulationConfig()
                        config.MIN_ORGANISMS = int(initial_population)
                        config.MUTATION_RATE = mutation_rate_val
                        config.MAX_ORGANISMS = int(max_population_val)
                        config.BRAIN_UPDATE_RATE = int(brain_update_rate_val)
                        config.MAX_NEURONS = int(max_neurons_val)
                        config.ENERGY_DECAY = energy_decay_val

                        simulation = FractalLifeSimulation(config, self.shared_data)
                        # Honour the "Field Resolution" slider; the simulation
                        # itself always builds its field with resolution 50.
                        simulation.field = InteractionField(
                            config.WIDTH, config.HEIGHT,
                            resolution=int(field_resolution_val)
                        )
                        # NOTE(review): interaction_strength_val is still
                        # unused; nothing in this class consumes it.
                        self.simulation = simulation
                        self.simulation.start()
                        logging.info("Simulation started via interface.")
                        return "Simulation started"
                    else:
                        logging.warning("Simulation is already running.")
                        return "Simulation is already running."
                except Exception as e:
                    logging.error(f"Error starting simulation: {e}")
                    return "Failed to start simulation."

            def pause_simulation():
                try:
                    if self.simulation:
                        self.simulation.pause()
                        status = "paused" if self.simulation.state.paused else "resumed"
                        logging.info(f"Simulation {status} via interface.")
                        return f"Simulation {status}"
                    logging.warning("No simulation running to pause/resume.")
                    return "No simulation running"
                except Exception as e:
                    logging.error(f"Error pausing simulation: {e}")
                    return "Failed to pause simulation."

            def stop_simulation():
                try:
                    if self.simulation:
                        self.simulation.stop()
                        self.simulation = None
                        logging.info("Simulation stopped via interface.")
                        return "Simulation stopped"
                    logging.warning("No simulation running to stop.")
                    return "No simulation running"
                except Exception as e:
                    logging.error(f"Error stopping simulation: {e}")
                    return "Failed to stop simulation."

            def select_organism(organism_id):
                try:
                    self.selected_organism_id = organism_id
                    if self.simulation:
                        self.simulation.event_queue.put({
                            'type': 'select_organism',
                            'organism_id': organism_id
                        })
                        logging.info(f"Organism {organism_id} selected via interface.")
                except Exception as e:
                    logging.error(f"Error selecting organism: {e}")

            def add_energy_to_organism():
                try:
                    if self.simulation and self.selected_organism_id:
                        self.simulation.event_queue.put({
                            'type': 'add_energy',
                            'amount': 50.0
                        })
                        logging.info(f"Added energy to organism {self.selected_organism_id} via interface.")
                        return "Added energy to selected organism"
                    logging.warning("No organism selected or simulation not running to add energy.")
                    return "No organism selected or simulation not running"
                except Exception as e:
                    logging.error(f"Error adding energy to organism: {e}")
                    return "Failed to add energy"

            def add_neurons_to_organism():
                try:
                    if self.simulation and self.selected_organism_id:
                        self.simulation.event_queue.put({
                            'type': 'modify_neurons',
                            'amount': 10
                        })
                        logging.info(f"Added neurons to organism {self.selected_organism_id} via interface.")
                        return "Added neurons to selected organism"
                    logging.warning("No organism selected or simulation not running to add neurons.")
                    return "No organism selected or simulation not running"
                except Exception as e:
                    logging.error(f"Error adding neurons to organism: {e}")
                    return "Failed to add neurons"

            # Plain function wrapper around the bound method for Gradio.
            def bound_update_display():
                return self.update_display()

            # Connect events
            start_btn.click(
                start_simulation,
                inputs=[population_slider, mutation_rate, max_population_slider,
                        brain_update_rate, max_neurons, energy_decay,
                        interaction_strength, field_resolution],
                outputs=gr.Textbox()
            )
            pause_btn.click(pause_simulation, outputs=gr.Textbox())
            stop_btn.click(stop_simulation, outputs=gr.Textbox())
            organism_dropdown.change(select_organism, inputs=[organism_dropdown], outputs=None)
            add_energy_btn.click(add_energy_to_organism, outputs=gr.Textbox())
            add_neurons_btn.click(add_neurons_to_organism, outputs=gr.Textbox())

            # Periodic refresh of the view, plots, dropdown and vitals.
            interface.load(
                fn=bound_update_display,
                inputs=[],
                outputs=[canvas, stats_plot, neural_plot, organism_dropdown, vitals_json],
                every=1/30,  # 30 FPS updates
                queue=True
            )

        return interface

    # ------------------------------------------------------------------
    # Plots
    # ------------------------------------------------------------------
    def _create_stats_plot(self):
        """Create statistics plot using plotly (three stacked time series)."""
        try:
            fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
                                subplot_titles=("Population", "Average Energy", "Average Neurons"))
            series = (
                ('population', "Population"),
                ('avg_energy', "Avg Energy"),
                ('avg_neurons', "Avg Neurons"),
            )
            for row, (key, label) in enumerate(series, start=1):
                fig.add_trace(
                    go.Scatter(x=self.history['time'], y=self.history[key], name=label),
                    row=row, col=1
                )
            fig.update_layout(height=600, showlegend=True)
            return fig
        except Exception as e:
            logging.error(f"Error creating stats plot: {e}")
            return go.Figure()

    def _create_neural_plot(self):
        """Create neural activity plot for the selected organism.

        One heatmap row per top-level brain module, containing the activations
        of its sub-neurons (or its own activation if it has none). Rows of
        different length are padded with NaN. Returns an empty figure when no
        organism is selected or on error.
        """
        try:
            org = self._selected_organism(self.simulation)
            if org:
                layers = []
                for neuron in self._brain_layers(org):
                    if hasattr(neuron, 'sub_neurons') and neuron.sub_neurons:
                        layers.append([child.state.activation for child in neuron.sub_neurons])
                    else:
                        layers.append([neuron.state.activation])

                activities = self._pad_layer_activities(layers)
                fig = go.Figure(data=go.Heatmap(z=activities, colorscale='Viridis'))
                fig.update_layout(
                    title="Neural Activity",
                    xaxis_title="Neuron Index",
                    yaxis_title="Layer"
                )
                return fig
            return go.Figure()
        except Exception as e:
            logging.error(f"Error creating neural plot: {e}")
            return go.Figure()
# ==============================
# Entry Point
# ==============================
if __name__ == "__main__":
    # Build the Gradio UI around a fresh interface controller.
    app = FractalLifeInterface().create_interface()
    # Enable the queue, listen on all interfaces on port 7860 and request a
    # public share link.
    app.queue().launch(server_name="0.0.0.0", server_port=7860, share=True)