Spaces:
Runtime error
Runtime error
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F # Added for activation functions | |
import numpy as np | |
from dataclasses import dataclass, field | |
import pygame | |
import gradio as gr | |
from typing import List, Tuple, Dict, Optional, Set | |
import random | |
import colorsys | |
import pymunk | |
import time | |
import threading | |
from queue import Queue | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
from PIL import Image | |
import io | |
import logging # Added for logging | |
# ============================== | |
# Logging Configuration | |
# ============================== | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s [%(levelname)s] %(message)s', | |
handlers=[ | |
logging.FileHandler("simulation.log"), | |
logging.StreamHandler() | |
] | |
) | |
# ============================== | |
# Configuration Dataclasses | |
# ============================== | |
class SimulationConfig: | |
WIDTH: int = 1024 | |
HEIGHT: int = 768 | |
TARGET_FPS: int = 60 | |
MIN_ORGANISMS: int = 5 | |
MAX_ORGANISMS: int = 50 # Population cap set to 50 | |
MUTATION_RATE: float = 0.1 | |
REPRODUCTION_ENERGY: float = 150.0 | |
INITIAL_ENERGY: float = 100.0 | |
BRAIN_UPDATE_RATE: int = 10 # Hz | |
MAX_NEURONS: int = 1000 | |
ENERGY_DECAY: float = 0.1 | |
class NeuronState: | |
activation: float = 0.0 | |
connections: int = 0 | |
energy: float = 100.0 | |
memory: List[float] = field(default_factory=lambda: [0.0] * 8) | |
class VisualizationConfig: | |
BACKGROUND_COLOR: Tuple[int, int, int] = (10, 10, 30) | |
NEURON_COLORS: Dict[str, Tuple[int, int, int]] = field(default_factory=lambda: { | |
'active': (255, 255, 0), | |
'inactive': (100, 100, 100), | |
'connected': (0, 255, 255) | |
}) | |
CONNECTION_COLOR: Tuple[int, int, int, int] = (50, 50, 200, 100) | |
ENERGY_COLOR: Tuple[int, int, int] = (0, 255, 0) | |
MAX_NEURAL_CONNECTIONS: int = 50 | |
class PhysicsConfig: | |
COLLISION_TYPE_ORGANISM: int = 1 | |
ELASTICITY: float = 0.7 | |
FRICTION: float = 0.5 | |
DAMPING: float = 0.9 | |
INTERACTION_RADIUS: float = 50.0 | |
FORCE_SCALE: float = 100.0 | |
# ============================== | |
# Neural Processing System | |
# ============================== | |
class FractalNeuron(nn.Module): | |
def __init__(self, input_dim=16, output_dim=16, depth=0, max_depth=2): | |
super().__init__() | |
self.depth = depth | |
self.max_depth = max_depth | |
# Store dimensions | |
self.input_dim = input_dim | |
self.output_dim = output_dim | |
self.hidden_dim = max(input_dim // 2, 8) # Add explicit hidden_dim | |
# Enhanced neural processing layers with LeakyReLU | |
self.synapse = nn.Sequential( | |
nn.Linear(input_dim, self.hidden_dim), # First layer: input_dim to hidden_dim | |
nn.LeakyReLU(negative_slope=0.1, inplace=True), | |
nn.Linear(self.hidden_dim, output_dim), # Second layer: hidden_dim to output_dim | |
nn.Tanh() | |
) | |
# Initialize weights using Xavier uniform initialization | |
for layer in self.synapse: | |
if isinstance(layer, nn.Linear): | |
nn.init.xavier_uniform_(layer.weight) | |
if layer.bias is not None: | |
nn.init.constant_(layer.bias, 0.0) | |
# Set to eval mode to prevent BatchNorm issues | |
self.eval() | |
# State maintenance with bounded values | |
self.state = NeuronState() | |
self.state.activation = 0.0 | |
self.state.energy = min(100.0, max(0.0, self.state.energy)) | |
# Memory processing with correct dimensions | |
self.memory_gate = nn.Sequential( | |
nn.Linear(output_dim + 8, 8), | |
nn.Sigmoid() | |
) | |
# Initialize memory_gate weights | |
for layer in self.memory_gate: | |
if isinstance(layer, nn.Linear): | |
nn.init.xavier_uniform_(layer.weight) | |
if layer.bias is not None: | |
nn.init.constant_(layer.bias, 0.0) | |
# Child neurons with matching dimensions | |
self.sub_neurons = nn.ModuleList([]) | |
if depth < max_depth: | |
branching_factor = max(1, 2 - depth) | |
for _ in range(branching_factor): | |
child = FractalNeuron( | |
input_dim=output_dim, # Child's input_dim matches parent's output_dim | |
output_dim=output_dim, # Keep output_dim consistent | |
depth=depth + 1, | |
max_depth=max_depth | |
) | |
self.sub_neurons.append(child) | |
def forward(self, x): | |
"""Forward pass for PyTorch module compatibility""" | |
return self.process_signal(x) | |
def process_signal(self, x, external_input=None): | |
"""Process input signal through the neuron""" | |
try: | |
with torch.no_grad(): | |
# Ensure we're in eval mode | |
self.eval() | |
# Reshape input for processing | |
if len(x.shape) == 1: | |
x = x.unsqueeze(0) # Add batch dimension | |
# Check for NaNs in input | |
if torch.isnan(x).any(): | |
logging.warning("NaN detected in input tensor. Returning zero tensor.") | |
return torch.zeros(self.output_dim) | |
# Add external input if provided | |
if external_input is not None: | |
if len(external_input.shape) == 1: | |
external_input = external_input.unsqueeze(0) | |
x = torch.cat([x, external_input], dim=-1) | |
# Process through synapse with proper shapes | |
x = x.to(torch.float32) # Ensure float32 dtype | |
try: | |
x = self.synapse(x) | |
except RuntimeError as e: | |
logging.error(f"Error in synapse processing: {e}") | |
return torch.zeros(self.output_dim) | |
# Update memory with bounds checking | |
try: | |
memory_tensor = torch.tensor(self.state.memory).to(torch.float32) | |
if len(x.shape) == 1: | |
x_for_memory = x.unsqueeze(0) | |
else: | |
x_for_memory = x | |
memory_input = torch.cat([x_for_memory, memory_tensor.unsqueeze(0)], dim=-1) | |
new_memory = self.memory_gate(memory_input) | |
new_memory = torch.clamp(new_memory, 0.0, 1.0) | |
if not torch.isnan(new_memory).any(): | |
self.state.memory = new_memory[0].tolist() | |
except Exception as e: | |
logging.error(f"Error updating memory: {e}") | |
# Update activation with bounded value | |
activation = float(torch.clamp(x.mean(), -1.0, 1.0)) | |
if not np.isnan(activation): | |
self.state.activation = activation | |
# Process through children with error handling | |
if self.sub_neurons: | |
child_outputs = [] | |
for child in self.sub_neurons: | |
try: | |
# Ensure x has correct shape before passing to child | |
child_input = x.squeeze(0) if len(x.shape) == 2 else x | |
# Ensure input matches child's expected input dimension | |
if child_input.shape[-1] != child.input_dim: | |
child_input = child_input[:child.input_dim] | |
child_out = child.process_signal(child_input) | |
if not torch.isnan(child_out).any(): | |
# Ensure child output has correct shape for stacking | |
if len(child_out.shape) == 1: | |
child_out = child_out.unsqueeze(0) | |
child_outputs.append(child_out) | |
except Exception as e: | |
logging.error(f"Error in child neuron processing: {e}") | |
continue | |
if child_outputs: | |
child_outputs = torch.stack(child_outputs) | |
x = torch.mean(child_outputs, dim=0) | |
x = torch.clamp(x, -1.0, 1.0) | |
# Update energy with bounds | |
energy_cost = 0.1 * self.depth | |
self.state.energy = max(0.0, min(100.0, self.state.energy - energy_cost)) | |
# Remove batch dimension if it was added | |
if len(x.shape) == 2: | |
x = x.squeeze(0) | |
return x | |
except Exception as e: | |
logging.error(f"Error in process_signal: {e}") | |
return torch.zeros(self.output_dim) | |
def interact_with(self, other_neuron, strength=0.5): | |
"""Interact with another neuron""" | |
try: | |
# Bound strength value | |
strength = max(0.0, min(1.0, strength)) | |
# Share neural states with bounds | |
shared_activation = (self.state.activation + other_neuron.state.activation) / 2 | |
shared_activation = float(shared_activation) | |
if np.isnan(shared_activation): | |
logging.warning("NaN detected in shared activation. Using default value.") | |
shared_activation = 0.0 | |
self.state.activation = shared_activation | |
other_neuron.state.activation = shared_activation | |
# Share memories with bounds checking | |
shared_memory = [] | |
for a, b in zip(self.state.memory, other_neuron.state.memory): | |
shared_value = (float(a) + float(b)) / 2 | |
shared_value = max(0.0, min(1.0, shared_value)) | |
shared_memory.append(shared_value) | |
self.state.memory = shared_memory | |
other_neuron.state.memory = shared_memory | |
# Update connections with bounds | |
max_connections = 100 | |
self.state.connections = min(self.state.connections + 1, max_connections) | |
other_neuron.state.connections = min(other_neuron.state.connections + 1, max_connections) | |
return shared_activation | |
except Exception as e: | |
logging.error(f"Error in interact_with: {e}") | |
return 0.0 | |
def save_state(self): | |
"""Save the current state of the neuron""" | |
return { | |
'activation': self.state.activation, | |
'connections': self.state.connections, | |
'energy': self.state.energy, | |
'memory': self.state.memory.copy() | |
} | |
def load_state(self, state_dict): | |
"""Load a previously saved state""" | |
try: | |
self.state.activation = state_dict['activation'] | |
self.state.connections = state_dict['connections'] | |
self.state.energy = state_dict['energy'] | |
self.state.memory = state_dict['memory'].copy() | |
except Exception as e: | |
logging.error(f"Error loading neuron state: {e}") | |
def clone(self): | |
"""Create a deep copy of the neuron""" | |
try: | |
new_neuron = FractalNeuron( | |
input_dim=self.input_dim, | |
output_dim=self.output_dim, | |
depth=self.depth, | |
max_depth=self.max_depth | |
) | |
new_neuron.load_state(self.save_state()) | |
return new_neuron | |
except Exception as e: | |
logging.error(f"Error cloning neuron: {e}") | |
return None | |
def mutate(self, mutation_rate=0.1): | |
"""Apply random mutations to the neuron""" | |
try: | |
with torch.no_grad(): | |
# Mutate weights | |
for layer in self.synapse: | |
if isinstance(layer, nn.Linear): | |
mask = torch.rand_like(layer.weight) < mutation_rate | |
mutations = torch.randn_like(layer.weight) * 0.1 | |
layer.weight.data[mask] += mutations[mask] | |
if layer.bias is not None: | |
mask = torch.rand_like(layer.bias) < mutation_rate | |
mutations = torch.randn_like(layer.bias) * 0.1 | |
layer.bias.data[mask] += mutations[mask] | |
# Mutate memory gate | |
for layer in self.memory_gate: | |
if isinstance(layer, nn.Linear): | |
mask = torch.rand_like(layer.weight) < mutation_rate | |
mutations = torch.randn_like(layer.weight) * 0.1 | |
layer.weight.data[mask] += mutations[mask] | |
if layer.bias is not None: | |
mask = torch.rand_like(layer.bias) < mutation_rate | |
mutations = torch.randn_like(layer.bias) * 0.1 | |
layer.bias.data[mask] += mutations[mask] | |
# Recursively mutate child neurons | |
for child in self.sub_neurons: | |
child.mutate(mutation_rate) | |
except Exception as e: | |
logging.error(f"Error mutating neuron: {e}") | |
class FractalBrain: | |
def __init__(self, input_dim=32, hidden_dim=64, max_neurons=1000): | |
self.input_dim = min(input_dim, 32) # Limit input dimension | |
self.hidden_dim = min(hidden_dim, 64) # Limit hidden dimension | |
self.max_neurons = min(max_neurons, 1000) # Limit maximum neurons | |
# Core neural network components with reduced complexity | |
self.visual_cortex = FractalNeuron(self.input_dim, self.hidden_dim, max_depth=2) | |
self.thought_processor = FractalNeuron(self.hidden_dim, self.hidden_dim, max_depth=2) | |
self.action_generator = FractalNeuron(self.hidden_dim, self.input_dim, max_depth=2) | |
# State tracking with bounds | |
self.total_neurons = self.count_neurons() | |
self.total_energy = 100.0 # Reduced initial energy | |
self.memories = [] | |
self.current_vision = None | |
def get_vitals(self): | |
"""Get vital statistics of the brain with safety checks""" | |
try: | |
# Calculate average activation safely | |
activations = [] | |
for neuron in [self.visual_cortex, self.thought_processor, self.action_generator]: | |
try: | |
activation = float(neuron.state.activation) | |
if not np.isnan(activation) and not np.isinf(activation): | |
activations.append(activation) | |
except (AttributeError, ValueError, TypeError): | |
activations.append(0.0) | |
avg_activation = sum(activations) / max(len(activations), 1) | |
avg_activation = max(-1.0, min(1.0, avg_activation)) | |
# Get connection counts safely | |
connections = [] | |
for neuron in [self.visual_cortex, self.thought_processor, self.action_generator]: | |
try: | |
conn_count = int(neuron.state.connections) | |
if not np.isnan(conn_count) and not np.isinf(conn_count): | |
connections.append(conn_count) | |
except (AttributeError, ValueError, TypeError): | |
connections.append(0) | |
total_connections = sum(connections) | |
return { | |
'neurons': min(self.total_neurons, self.max_neurons), | |
'energy': max(0.0, min(1000.0, float(self.total_energy))), | |
'connections': max(0, min(1000, total_connections)), | |
'activation': avg_activation | |
} | |
except Exception as e: | |
logging.error(f"Exception in get_vitals: {e}. Returning default vitals.") | |
# Return safe default values if anything goes wrong | |
return { | |
'neurons': 1, | |
'energy': 0.0, | |
'connections': 0, | |
'activation': 0.0 | |
} | |
def process_vision(self, visual_input): | |
try: | |
with torch.no_grad(): | |
# Ensure input is valid and properly shaped | |
visual_input = visual_input.clone().detach() | |
if len(visual_input.shape) == 1: | |
visual_input = visual_input.unsqueeze(0) # Add batch dimension | |
if torch.isnan(visual_input).any(): | |
logging.warning("NaN detected in visual_input. Replacing with zeros.") | |
visual_input = torch.zeros_like(visual_input) | |
visual_input = torch.clamp(visual_input, -10.0, 10.0) | |
# Process through neural components with shape handling | |
try: | |
visual_features = self.visual_cortex.process_signal(visual_input) | |
if len(visual_features.shape) == 1: | |
visual_features = visual_features.unsqueeze(0) | |
except Exception as e: | |
logging.error(f"Exception in visual_cortex.process_signal: {e}. Using zero tensor.") | |
visual_features = torch.zeros((1, self.hidden_dim)) | |
try: | |
thoughts = self.thought_processor.process_signal(visual_features) | |
if len(thoughts.shape) == 1: | |
thoughts = thoughts.unsqueeze(0) | |
except Exception as e: | |
logging.error(f"Exception in thought_processor.process_signal: {e}. Using zero tensor.") | |
thoughts = torch.zeros((1, self.hidden_dim)) | |
try: | |
actions = self.action_generator.process_signal(thoughts) | |
except Exception as e: | |
logging.error(f"Exception in action_generator.process_signal: {e}. Using zero tensor.") | |
actions = torch.zeros(self.input_dim) | |
# Remove batch dimension from final output if present | |
if len(actions.shape) > 1: | |
actions = actions.squeeze(0) | |
# Ensure outputs are bounded | |
actions = torch.clamp(actions, -1.0, 1.0) | |
# Energy consumption with bounds | |
self.total_energy = max(0.0, min(1000.0, self.total_energy - 0.1)) | |
return actions | |
except Exception as e: | |
logging.error(f"Exception in process_vision: {e}. Returning zero actions.") | |
return torch.zeros(self.input_dim) | |
def interact_with(self, other_brain, strength=0.5): | |
try: | |
# Bound strength value | |
strength = max(0.0, min(1.0, strength)) | |
# Neural interactions with error handling | |
shared_visual = self.visual_cortex.interact_with(other_brain.visual_cortex, strength) | |
shared_thoughts = self.thought_processor.interact_with(other_brain.thought_processor, strength) | |
shared_actions = self.action_generator.interact_with(other_brain.action_generator, strength) | |
# Energy transfer with bounds | |
energy_diff = self.total_energy - other_brain.total_energy | |
transfer = max(-10.0, min(10.0, energy_diff * 0.1)) | |
self.total_energy = max(0.0, min(1000.0, self.total_energy - transfer)) | |
other_brain.total_energy = max(0.0, min(1000.0, other_brain.total_energy + transfer)) | |
return shared_visual, shared_thoughts, shared_actions | |
except Exception as e: | |
logging.error(f"Exception in interact_with: {e}. Returning zeros.") | |
return 0.0, 0.0, 0.0 | |
def count_neurons(self): | |
"""Safely count neurons with error handling""" | |
try: | |
def count_recursive(module): | |
count = 1 | |
if hasattr(module, 'sub_neurons'): | |
for child in module.sub_neurons: | |
count += count_recursive(child) | |
return min(count, self.max_neurons) # Limit total count | |
total = sum(count_recursive(x) for x in [ | |
self.visual_cortex, | |
self.thought_processor, | |
self.action_generator | |
]) | |
return min(total, self.max_neurons) | |
except Exception as e: | |
logging.error(f"Exception in count_neurons: {e}. Returning 1.") | |
return 1 # Return minimum count if counting fails | |
def can_grow(self): | |
"""Check if brain can grow new neurons""" | |
return (self.total_neurons < self.max_neurons and | |
self.total_energy > 100.0) | |
# ============================== | |
# Organism Definition and Behavior | |
# ============================== | |
class FractalOrganism: | |
def __init__(self, x, y, size=20, feature_dim=32, max_neurons=1000): | |
# Physical properties | |
self.pos = pygame.math.Vector2(x, y) | |
self.vel = pygame.math.Vector2(0, 0) | |
self.acc = pygame.math.Vector2(0, 0) | |
self.size = size | |
self.mass = size * 0.1 | |
# Neural system | |
self.brain = FractalBrain(input_dim=feature_dim, hidden_dim=feature_dim*2, max_neurons=max_neurons) | |
self.feature_dim = feature_dim | |
self.features = torch.randn(feature_dim) | |
# Visual properties with validation | |
self.color = self._features_to_color() | |
self.pattern_type = self._determine_pattern_type() | |
self.pattern_intensity = self._determine_pattern_intensity() | |
self.shape_points = self._generate_shape() | |
# Life properties | |
self.alive = True | |
self.age = 0 | |
def _validate_color_component(self, value): | |
"""Ensure color component is a valid integer between 0 and 255""" | |
try: | |
value = int(value) | |
return max(0, min(255, value)) | |
except (ValueError, TypeError): | |
return 0 | |
def update(self, screen_width, screen_height, organisms): | |
"""Update organism state""" | |
if not self.alive: | |
return | |
try: | |
# Physics integration | |
# Update velocity with acceleration | |
self.vel += self.acc | |
# Apply friction/damping | |
self.vel *= 0.98 # Slight damping to prevent infinite movement | |
# Update position with velocity | |
self.pos += self.vel | |
# Clear acceleration for next frame | |
self.acc.x = 0 | |
self.acc.y = 0 | |
# Get visual input and process through brain | |
visual_input = self._get_visual_input(organisms) | |
actions = self.brain.process_vision(visual_input) | |
# Apply neural network outputs as forces if valid | |
if isinstance(actions, torch.Tensor) and not torch.isnan(actions).any(): | |
self._apply_action_forces(actions) | |
# Wrap around screen edges | |
self.pos.x = self.pos.x % screen_width | |
self.pos.y = self.pos.y % screen_height | |
# Update life properties | |
self.age += 1 | |
vitals = self.brain.get_vitals() | |
# Death conditions | |
if vitals['energy'] <= 0 or self.age > 1000: | |
self.alive = False | |
except Exception as e: | |
logging.error(f"Error updating organism {id(self)}: {e}") | |
logging.debug(f"Organism state - Age: {self.age}, Alive: {self.alive}") | |
def _get_visual_input(self, organisms): | |
"""Create visual input tensor from surroundings""" | |
try: | |
visual_input = torch.zeros(self.feature_dim) | |
# Add self-perception (first 3 features are color) | |
color_tensor = torch.tensor([c/255.0 for c in self.color]) | |
visual_input[:3] = color_tensor[:3] | |
# Add velocity perception (helps with movement learning) | |
if hasattr(self, 'vel'): | |
velocity_magnitude = np.sqrt(self.vel.x**2 + self.vel.y**2) | |
velocity_direction = np.arctan2(self.vel.y, self.vel.x) / np.pi | |
if 3 < len(visual_input): | |
visual_input[3] = float(velocity_magnitude) / 10.0 # Normalize velocity | |
if 4 < len(visual_input): | |
visual_input[4] = float(velocity_direction) | |
# Add perception of nearby organisms | |
for other in organisms: | |
if other != self and other.alive: | |
distance = self.pos.distance_to(other.pos) | |
if distance < 100: # Visual range | |
direction = (other.pos - self.pos) | |
if direction.length() > 0: | |
direction = direction.normalize() | |
angle = np.arctan2(direction.y, direction.x) | |
# Map angle to feature index | |
idx = int((angle + np.pi) / (2 * np.pi) * (self.feature_dim - 5)) + 5 | |
idx = min(max(5, idx), self.feature_dim - 1) | |
# Set feature value based on distance and target's properties | |
intensity = 1.0 - min(1.0, distance / 100) | |
visual_input[idx] = intensity | |
# Add information about target's energy level if visible | |
if idx + 1 < self.feature_dim: | |
target_energy = float(other.brain.total_energy) / 1000.0 | |
if not np.isnan(target_energy): | |
visual_input[idx + 1] = target_energy | |
return visual_input | |
except Exception as e: | |
logging.error(f"Error in _get_visual_input: {e}") | |
return torch.zeros(self.feature_dim) | |
def _apply_action_forces(self, actions): | |
"""Convert neural actions to physical forces with better control""" | |
try: | |
if not isinstance(actions, torch.Tensor): | |
return | |
# Get first two action dimensions for movement control | |
if len(actions) >= 2: | |
# Scale force based on neural network activation | |
activation = float(self.brain.visual_cortex.state.activation) | |
force_scale = 20.0 # Increased for more visible movement | |
# Convert actions to directional movement | |
force_x = float(actions[0].item()) * force_scale * (1 + abs(activation)) | |
force_y = float(actions[1].item()) * force_scale * (1 + abs(activation)) | |
# Add some randomness for exploration when activation is low | |
if abs(activation) < 0.2: | |
force_x += random.uniform(-2.0, 2.0) # Increased random movement | |
force_y += random.uniform(-2.0, 2.0) | |
# Clamp forces but allow for stronger movement | |
max_force = 40.0 # Increased maximum force | |
force_x = max(-max_force, min(max_force, force_x)) | |
force_y = max(-max_force, min(max_force, force_y)) | |
# Apply the forces | |
self.apply_force((force_x, force_y)) | |
# Additional actions for other behaviors | |
if len(actions) >= 4: | |
try: | |
# Action 3: Energy usage control | |
energy_control = float(actions[2].item()) | |
if energy_control > 0.8: | |
self.brain.total_energy += energy_control * 0.1 | |
# Action 4: Interaction strength | |
interaction_strength = max(0, min(1, float(actions[3].item()))) | |
if not hasattr(self, 'interaction_strength'): | |
self.__dict__['interaction_strength'] = interaction_strength | |
else: | |
self.interaction_strength = interaction_strength | |
except Exception as e: | |
logging.error(f"Error processing additional actions: {e}") | |
except Exception as e: | |
logging.error(f"Error in _apply_action_forces: {e}") | |
def apply_force(self, force): | |
"""Apply physics force with validation""" | |
try: | |
if isinstance(force, (tuple, list)) and len(force) >= 2: | |
fx = float(force[0]) | |
fy = float(force[1]) | |
# Check for NaN | |
if np.isnan(fx) or np.isnan(fy): | |
return | |
# Limit maximum force | |
max_force = 10.0 | |
fx = max(-max_force, min(max_force, fx)) | |
fy = max(-max_force, min(max_force, fy)) | |
force = pygame.math.Vector2(fx, fy) | |
# Validate acceleration before applying | |
new_acc = force / self.mass | |
if not (np.isnan(new_acc.x) or np.isnan(new_acc.y)): | |
self.acc.update(new_acc.x, new_acc.y) | |
# Clamp acceleration | |
max_acc = 5.0 | |
self.acc.x = max(-max_acc, min(max_acc, self.acc.x)) | |
self.acc.y = max(-max_acc, min(max_acc, self.acc.y)) | |
except Exception as e: | |
logging.error(f"Error in apply_force: {e}") | |
def _features_to_color(self): | |
"""Convert feature vector to RGB color with validation""" | |
try: | |
r = self._validate_color_component((self.features[0].item() + 1) / 2 * 255) | |
g = self._validate_color_component((self.features[1].item() + 1) / 2 * 255) | |
b = self._validate_color_component((self.features[2].item() + 1) / 2 * 255) | |
return (r, g, b) | |
except (IndexError, AttributeError) as e: | |
logging.error(f"Error in _features_to_color: {e}. Defaulting to (100, 100, 100).") | |
return (100, 100, 100) | |
def _determine_pattern_type(self): | |
"""Determine pattern type based on specific features""" | |
try: | |
# Use features 3 and 4 to determine pattern type safely | |
if len(self.features) >= 5: | |
feature_sum = float(self.features[3].item() + self.features[4].item()) | |
if feature_sum > 1: | |
return 'stripes' | |
elif feature_sum < -1: | |
return 'spots' | |
else: | |
return 'gradient' | |
return 'gradient' # Default pattern | |
except (IndexError, AttributeError, ValueError) as e: | |
logging.error(f"Error in _determine_pattern_type: {e}. Defaulting to 'gradient'.") | |
return 'gradient' # Fallback pattern | |
def _determine_pattern_intensity(self): | |
"""Determine pattern intensity based on specific features""" | |
try: | |
if len(self.features) >= 6: | |
intensity = (float(self.features[5].item()) + 1) / 2 | |
return max(0.0, min(1.0, intensity)) | |
return 0.5 # Default intensity | |
except (IndexError, AttributeError, ValueError) as e: | |
logging.error(f"Error in _determine_pattern_intensity: {e}. Defaulting to 0.5.") | |
return 0.5 # Fallback intensity | |
def _generate_shape(self): | |
"""Generate a polygon shape based on the pattern type""" | |
try: | |
points = [] | |
if self.pattern_type == 'stripes': | |
# Generate a star-like shape with protrusions | |
for angle in range(0, 360, 30): | |
rad = np.radians(angle) | |
x = self.size * np.cos(rad) | |
y = self.size * np.sin(rad) | |
# Alternate between outer and inner points for stripes | |
if (angle // 30) % 2 == 0: | |
points.append((x * 1.2, y * 1.2)) | |
else: | |
points.append((x * 0.8, y * 0.8)) | |
elif self.pattern_type == 'spots': | |
# Generate a more circular, smooth shape with bulges | |
for angle in range(0, 360, 45): | |
rad = np.radians(angle) | |
x = self.size * (1 + 0.3 * np.sin(4 * rad)) * np.cos(rad) | |
y = self.size * (1 + 0.3 * np.sin(4 * rad)) * np.sin(rad) | |
points.append((x, y)) | |
else: # 'gradient' or other patterns | |
# Simple regular polygon | |
for angle in range(0, 360, 60): | |
rad = np.radians(angle) | |
x = self.size * np.cos(rad) | |
y = self.size * np.sin(rad) | |
points.append((x, y)) | |
# Validate points and ensure we have at least a triangle | |
if len(points) < 3: | |
# Fallback to basic triangle | |
points = [ | |
(-self.size, -self.size), | |
(self.size, -self.size), | |
(0, self.size) | |
] | |
return points | |
except Exception as e: | |
logging.error(f"Error in _generate_shape: {e}. Defaulting to basic triangle.") | |
# Fallback to basic triangle if anything goes wrong | |
return [ | |
(-self.size, -self.size), | |
(self.size, -self.size), | |
(0, self.size) | |
] | |
def reproduce(self, mate, mutation_rate=0.1): | |
"""Reproduce with another organism to create a child organism with possible mutations""" | |
try: | |
# Check reproduction energy requirements | |
if not hasattr(self.brain, 'REPRODUCTION_ENERGY'): | |
self.brain.REPRODUCTION_ENERGY = 150.0 # Default value if not set | |
if self.brain.total_energy < self.brain.REPRODUCTION_ENERGY or mate.brain.total_energy < mate.brain.REPRODUCTION_ENERGY: | |
return None | |
# Deduct energy for reproduction | |
self.brain.total_energy -= 50.0 | |
mate.brain.total_energy -= 50.0 | |
# Blend features | |
child_features = (self.features + mate.features) / 2 | |
# Apply mutations | |
for i in range(len(child_features)): | |
if random.random() < mutation_rate: | |
child_features[i] += random.uniform(-0.1, 0.1) | |
# Clamp mutated features to prevent extreme values | |
child_features = torch.clamp(child_features, -1.0, 1.0) | |
# Create child organism | |
child = FractalOrganism( | |
x=(self.pos.x + mate.pos.x) / 2 + random.uniform(-10, 10), | |
y=(self.pos.y + mate.pos.y) / 2 + random.uniform(-10, 10), | |
size=self.size, | |
feature_dim=self.feature_dim, | |
max_neurons=self.brain.max_neurons | |
) | |
child.features = child_features | |
child.color = child._features_to_color() | |
child.pattern_type = child._determine_pattern_type() | |
child.pattern_intensity = child._determine_pattern_intensity() | |
child.shape_points = child._generate_shape() | |
child.brain = self._mutate_brain(mate.brain, mutation_rate) | |
return child | |
except Exception as e: | |
logging.error(f"Error in reproduction: {e}") | |
return None | |
def _mutate_brain(self, brain, mutation_rate): | |
"""Mutate the brain's neurons""" | |
try: | |
# For simplicity, we can randomly add connections or adjust activation | |
# Here, we'll randomly adjust activation levels | |
brain.visual_cortex.state.activation += random.uniform(-0.1, 0.1) | |
brain.thought_processor.state.activation += random.uniform(-0.1, 0.1) | |
brain.action_generator.state.activation += random.uniform(-0.1, 0.1) | |
# Ensure activations stay in valid range | |
brain.visual_cortex.state.activation = max(-1.0, min(1.0, brain.visual_cortex.state.activation)) | |
brain.thought_processor.state.activation = max(-1.0, min(1.0, brain.thought_processor.state.activation)) | |
brain.action_generator.state.activation = max(-1.0, min(1.0, brain.action_generator.state.activation)) | |
return brain | |
except Exception as e: | |
logging.error(f"Error in brain mutation: {e}. Returning unmutated brain.") | |
return brain | |
def interact_with(self, other): | |
"""Interact with another organism""" | |
try: | |
distance = self.pos.distance_to(other.pos) | |
if distance < self.size + other.size: | |
# Neural interaction | |
interaction_strength = 1.0 - distance / (self.size + other.size) | |
self.brain.interact_with(other.brain, interaction_strength) | |
# Physical interaction (simple collision) | |
direction = (self.pos - other.pos).normalize() | |
force = direction * interaction_strength * 5 | |
self.apply_force(force) | |
other.apply_force(-force) | |
return True | |
return False | |
except Exception as e: | |
logging.error(f"Error in organism interaction: {e}") | |
return False | |
def _blend_patterns(self, pattern1: str, pattern2: str) -> str: | |
"""Blend two pattern types to create a new pattern type""" | |
try: | |
if pattern1 == pattern2: | |
return pattern1 | |
else: | |
# Simple blending logic: randomly choose one of the parent patterns or a new pattern | |
return random.choice([pattern1, pattern2, 'stripes', 'spots', 'gradient']) | |
except Exception as e: | |
logging.error(f"Error in _blend_patterns: {e}. Defaulting to 'gradient'.") | |
return 'gradient' # Default pattern if anything goes wrong | |
# ============================== | |
# Physics and Interaction Handling | |
# ============================== | |
class PhysicsEngine: | |
def __init__(self, width: int, height: int, config: PhysicsConfig): | |
self.config = config | |
# Initialize pymunk space | |
self.space = pymunk.Space() | |
self.space.damping = self.config.DAMPING | |
# Create boundaries | |
self.create_boundaries(width, height) | |
# Collision handler for organisms | |
handler = self.space.add_collision_handler( | |
self.config.COLLISION_TYPE_ORGANISM, | |
self.config.COLLISION_TYPE_ORGANISM | |
) | |
handler.begin = self.handle_collision | |
# Track interactions | |
self.current_interactions: Set[tuple] = set() | |
# Store dimensions | |
self.width = width | |
self.height = height | |
def update(self, dt: float): | |
"""Update physics simulation""" | |
try: | |
# Pymunk works best with a fixed time step | |
fixed_dt = 1.0 / 60.0 | |
steps = max(1, min(4, int(dt / fixed_dt))) # Limit max steps to prevent spiral | |
for _ in range(steps): | |
self.space.step(fixed_dt) | |
# Update organism positions from physics bodies | |
for body in self.space.bodies: | |
if hasattr(body, 'organism'): | |
try: | |
organism = body.organism | |
# Validate positions | |
if not (np.isnan(body.position.x) or np.isnan(body.position.y)): | |
new_x = float(body.position.x % self.width) | |
new_y = float(body.position.y % self.height) | |
# Update pygame Vector2 position | |
organism.pos.update(new_x, new_y) | |
else: | |
# Reset to center if NaN | |
body.position = self.width/2, self.height/2 | |
organism.pos.update(self.width/2, self.height/2) | |
# Validate velocities | |
if not (np.isnan(body.velocity.x) or np.isnan(body.velocity.y)): | |
max_velocity = 200.0 | |
vx = max(-max_velocity, min(max_velocity, body.velocity.x)) | |
vy = max(-max_velocity, min(max_velocity, body.velocity.y)) | |
# Update pygame Vector2 velocity | |
organism.vel.update(vx, vy) | |
else: | |
body.velocity = (0, 0) | |
organism.vel.update(0, 0) | |
except Exception as e: | |
logging.error(f"Error updating organism physics state: {e}") | |
# Reset body to safe state | |
body.position = self.width/2, self.height/2 | |
body.velocity = (0, 0) | |
try: | |
organism.pos.update(self.width/2, self.height/2) | |
organism.vel.update(0, 0) | |
except: | |
pass | |
except Exception as e: | |
logging.error(f"Error updating physics: {e}") | |
def create_boundaries(self, width: int, height: int): | |
"""Create screen boundaries""" | |
try: | |
walls = [ | |
[(0, 0), (width, 0)], # Top | |
[(width, 0), (width, height)], # Right | |
[(width, height), (0, height)], # Bottom | |
[(0, height), (0, 0)] # Left | |
] | |
for wall in walls: | |
shape = pymunk.Segment(self.space.static_body, wall[0], wall[1], 0) | |
shape.elasticity = self.config.ELASTICITY | |
shape.friction = self.config.FRICTION | |
self.space.add(shape) | |
except Exception as e: | |
logging.error(f"Error creating boundaries: {e}") | |
def add_organism(self, organism: FractalOrganism) -> pymunk.Body: | |
"""Add organism to physics space""" | |
try: | |
# Validate mass | |
mass = max(0.1, organism.mass) # Ensure positive mass | |
moment = pymunk.moment_for_circle(mass, 0, organism.size) | |
body = pymunk.Body(mass, moment) | |
# Validate initial position and velocity | |
body.position = ( | |
float(organism.pos.x % self.width), | |
float(organism.pos.y % self.height) | |
) | |
# Clamp initial velocity | |
max_initial_velocity = 50.0 | |
vel_x = max(-max_initial_velocity, min(max_initial_velocity, organism.vel.x)) | |
vel_y = max(-max_initial_velocity, min(max_initial_velocity, organism.vel.y)) | |
body.velocity = (vel_x, vel_y) | |
# Validate shape points and create polygon | |
valid_vertices = [] | |
for x, y in organism.shape_points: | |
if not (np.isnan(x) or np.isnan(y)): | |
valid_vertices.append((float(x), float(y))) | |
# If insufficient valid vertices, create default circle shape | |
if len(valid_vertices) < 3: | |
logging.warning(f"Insufficient valid vertices for organism {id(organism)}, using circle shape") | |
shape = pymunk.Circle(body, organism.size) | |
else: | |
shape = pymunk.Poly(body, valid_vertices) | |
shape.elasticity = self.config.ELASTICITY | |
shape.friction = self.config.FRICTION | |
shape.collision_type = self.config.COLLISION_TYPE_ORGANISM | |
# Store reference to organism | |
shape.organism = organism | |
body.organism = organism | |
self.space.add(body, shape) | |
return body | |
except Exception as e: | |
logging.error(f"Error adding organism to physics: {e}") | |
return None | |
def handle_collision(self, arbiter, space, data): | |
"""Handle collision between organisms""" | |
try: | |
# Get colliding organisms | |
shape_a, shape_b = arbiter.shapes | |
org_a, org_b = shape_a.organism, shape_b.organism | |
# Add to interaction set | |
interaction_pair = tuple(sorted([id(org_a), id(org_b)])) | |
self.current_interactions.add(interaction_pair) | |
# Calculate collision response with validation | |
restitution = max(0, min(1, self.config.ELASTICITY)) | |
j = -(1 + restitution) * arbiter.total_ke / 2 | |
# Validate impulse | |
if not np.isnan(j): | |
# Clamp impulse to prevent extreme values | |
max_impulse = 1000.0 | |
j = max(-max_impulse, min(max_impulse, j)) | |
body_a = shape_a.body | |
body_b = shape_b.body | |
normal = arbiter.normal | |
point = arbiter.contact_point_set.points[0].point_a | |
# Apply impulse along the collision normal | |
body_a.apply_impulse_at_world_point(j * normal, point) | |
body_b.apply_impulse_at_world_point(-j * normal, point) | |
return True | |
except Exception as e: | |
logging.error(f"Error handling collision: {e}") | |
return False | |
def process_interactions(self, organisms: List[FractalOrganism]): | |
"""Process all current interactions""" | |
try: | |
# Process collision-based interactions | |
for org_a_id, org_b_id in self.current_interactions: | |
org_a = next((org for org in organisms if id(org) == org_a_id), None) | |
org_b = next((org for org in organisms if id(org) == org_b_id), None) | |
if org_a and org_b and org_a.alive and org_b.alive: | |
# Neural interaction | |
shared_thoughts = org_a.brain.interact_with(org_b.brain) | |
# Energy transfer based on neural activity | |
energy_diff = org_a.brain.total_energy - org_b.brain.total_energy | |
transfer = max(-10.0, min(10.0, energy_diff * 0.1)) # Limit transfer amount | |
org_a.brain.total_energy = max(0, org_a.brain.total_energy - transfer) | |
org_b.brain.total_energy = max(0, org_b.brain.total_energy + transfer) | |
# Clear interactions for next frame | |
self.current_interactions.clear() | |
# Process proximity-based interactions | |
for i, org_a in enumerate(organisms): | |
if not org_a.alive: | |
continue | |
for org_b in organisms[i+1:]: | |
if not org_b.alive: | |
continue | |
# Calculate distance with validation | |
dx = org_b.pos.x - org_a.pos.x | |
dy = org_b.pos.y - org_a.pos.y | |
if np.isnan(dx) or np.isnan(dy): | |
continue | |
distance = np.sqrt(dx*dx + dy*dy) | |
if distance < self.config.INTERACTION_RADIUS: | |
# Calculate interaction strength based on distance | |
strength = 1.0 - (distance / self.config.INTERACTION_RADIUS) | |
# Neural field effect with reduced strength | |
field_interaction = strength * 0.5 | |
# Calculate force with validation | |
force_magnitude = field_interaction * self.config.FORCE_SCALE | |
force_angle = np.arctan2(dy, dx) | |
if not (np.isnan(force_magnitude) or np.isnan(force_angle)): | |
force_x = np.cos(force_angle) * force_magnitude | |
force_y = np.sin(force_angle) * force_magnitude | |
# Clamp forces | |
max_force = 100.0 | |
force_x = max(-max_force, min(max_force, force_x)) | |
force_y = max(-max_force, min(max_force, force_y)) | |
# Apply forces through physics bodies | |
body_a = next((body for body in self.space.bodies | |
if hasattr(body, 'organism') and body.organism == org_a), None) | |
body_b = next((body for body in self.space.bodies | |
if hasattr(body, 'organism') and body.organism == org_b), None) | |
if body_a and body_b: | |
body_a.apply_force_at_local_point((-force_x, -force_y), (0, 0)) | |
body_b.apply_force_at_local_point((force_x, force_y), (0, 0)) | |
# Apply direct forces to organisms as well | |
org_a.apply_force((-force_x, -force_y)) | |
org_b.apply_force((force_x, force_y)) | |
except Exception as e: | |
logging.error(f"Error processing interactions: {e}") | |
# ============================== | |
# Visualization with PyGame | |
# ============================== | |
class NeuralVisualizer: | |
def __init__(self, width: int, height: int, config: VisualizationConfig): | |
self.width = width | |
self.height = height | |
self.config = config | |
self.neuron_surface = pygame.Surface((width, height), pygame.SRCALPHA) | |
self.connection_surface = pygame.Surface((width, height), pygame.SRCALPHA) | |
def _apply_pattern_overlay(self, organism, surface): | |
"""Apply visual pattern overlay based on organism type""" | |
try: | |
if not organism.alive: | |
return | |
pattern_alpha = int(255 * organism.pattern_intensity) | |
pattern_color = ( | |
255 - organism.color[0], | |
255 - organism.color[1], | |
255 - organism.color[2], | |
pattern_alpha | |
) | |
# Create pattern surface | |
pattern_surface = pygame.Surface((self.width, self.height), pygame.SRCALPHA) | |
if organism.pattern_type == 'stripes': | |
# Draw alternating stripes | |
stride = max(5, int(organism.size * 0.5)) | |
x, y = int(organism.pos.x), int(organism.pos.y) | |
for i in range(-2, 3): | |
offset = i * stride | |
pygame.draw.line(pattern_surface, pattern_color, | |
(x - organism.size, y + offset), | |
(x + organism.size, y + offset), 2) | |
elif organism.pattern_type == 'spots': | |
# Draw spots in a circular pattern | |
x, y = int(organism.pos.x), int(organism.pos.y) | |
spot_size = max(2, int(organism.size * 0.2)) | |
for angle in range(0, 360, 45): | |
spot_x = x + int(np.cos(np.radians(angle)) * organism.size * 0.7) | |
spot_y = y + int(np.sin(np.radians(angle)) * organism.size * 0.7) | |
pygame.draw.circle(pattern_surface, pattern_color, | |
(spot_x, spot_y), spot_size) | |
else: # gradient | |
# Draw radial gradient | |
x, y = int(organism.pos.x), int(organism.pos.y) | |
max_radius = int(organism.size * 1.2) | |
for radius in range(max_radius, 0, -2): | |
alpha = int((radius / max_radius) * pattern_alpha) | |
current_color = (*pattern_color[:3], alpha) | |
pygame.draw.circle(pattern_surface, current_color, | |
(x, y), radius, 1) | |
# Blend pattern with surface | |
surface.blit(pattern_surface, (0, 0), special_flags=pygame.BLEND_ALPHA_SDL2) | |
except Exception as e: | |
logging.error(f"Error applying pattern overlay: {e}") | |
def draw_brain_state(self, organism: FractalOrganism, surface: pygame.Surface): | |
"""Draw neural activity visualization with patterns and NaN handling""" | |
try: | |
# Clear previous state | |
self.neuron_surface.fill((0, 0, 0, 0)) | |
self.connection_surface.fill((0, 0, 0, 0)) | |
# Get brain vitals with NaN check | |
vitals = organism.brain.get_vitals() | |
if any(np.isnan(value) for value in vitals.values() if isinstance(value, (int, float))): | |
logging.warning(f"NaN detected in vitals for organism {id(organism)}. Marking for death.") | |
organism.alive = False | |
return | |
# Calculate neural positions based on fractal pattern | |
def plot_neural_layer(center, radius, neurons, depth=0): | |
if depth >= 3 or not neurons: | |
return | |
# Convert neurons to list if it's a ModuleList | |
if hasattr(neurons, 'sub_neurons'): | |
neurons = neurons.sub_neurons | |
if not neurons or len(neurons) == 0: | |
return | |
angle_step = 2 * np.pi / len(neurons) | |
for i, neuron in enumerate(neurons): | |
try: | |
angle = i * angle_step | |
x = center[0] + radius * np.cos(angle) | |
y = center[1] + radius * np.sin(angle) | |
# NaN check for coordinates | |
if np.isnan(x) or np.isnan(y): | |
logging.warning(f"NaN coordinates detected for neuron {i}. Skipping.") | |
continue | |
# Draw neuron | |
activation = float(neuron.state.activation) | |
connections = int(neuron.state.connections) | |
# Check for NaNs in neuron state | |
if np.isnan(activation) or np.isnan(connections): | |
logging.warning(f"NaN detected in neuron state. Marking organism for death.") | |
organism.alive = False | |
return | |
# Ensure coordinates are valid integers | |
x_pos = int(np.clip(x, 0, self.width)) | |
y_pos = int(np.clip(y, 0, self.height)) | |
color = self._get_neuron_color(activation, connections) | |
pygame.draw.circle(self.neuron_surface, color, (x_pos, y_pos), 5) | |
# Draw connections with safety checks | |
if connections > 0: | |
alpha = int(255 * min(connections / self.config.MAX_NEURAL_CONNECTIONS, 1)) | |
if not np.isnan(alpha): | |
connection_color = (*self.config.CONNECTION_COLOR[:3], alpha) | |
pygame.draw.line( | |
self.connection_surface, | |
connection_color, | |
(x_pos, y_pos), | |
(int(center[0]), int(center[1])), | |
2 | |
) | |
# Recursively draw sub-neurons | |
if hasattr(neuron, 'sub_neurons') and neuron.sub_neurons: | |
child_radius = radius * 0.5 | |
child_center = (x, y) | |
plot_neural_layer(child_center, child_radius, neuron.sub_neurons, depth + 1) | |
except Exception as e: | |
logging.error(f"Error plotting neuron {i}: {e}") | |
continue | |
# Draw neural network | |
try: | |
center = (organism.pos.x, organism.pos.y) | |
if not (np.isnan(center[0]) or np.isnan(center[1])): | |
plot_neural_layer(center, organism.size * 2, | |
[organism.brain.visual_cortex, | |
organism.brain.thought_processor, | |
organism.brain.action_generator]) | |
except Exception as e: | |
logging.error(f"Error plotting neural network: {e}") | |
# Apply pattern overlay with safety checks | |
try: | |
self._apply_pattern_overlay(organism, surface) | |
except Exception as e: | |
logging.error(f"Error applying pattern overlay: {e}") | |
# Combine surfaces with alpha blending | |
surface.blit(self.connection_surface, (0, 0)) | |
surface.blit(self.neuron_surface, (0, 0)) | |
except Exception as e: | |
logging.error(f"Error in draw_brain_state: {e}") | |
def _get_neuron_color(self, activation: float, connections: int) -> Tuple[int, int, int]: | |
"""Generate color based on neuron state""" | |
try: | |
# Use HSV color space for smooth transitions | |
hue = (activation + 1) / 2 # Map -1,1 to 0,1 | |
saturation = min(connections / self.config.MAX_NEURAL_CONNECTIONS, 1) | |
value = 0.8 + 0.2 * activation | |
# Convert to RGB | |
rgb = colorsys.hsv_to_rgb(hue, saturation, value) | |
return tuple(int(255 * x) for x in rgb) | |
except Exception as e: | |
logging.error(f"Error in _get_neuron_color: {e}. Defaulting to gray.") | |
return (100, 100, 100) | |
class SimulationVisualizer: | |
def __init__(self, width: int, height: int, config: VisualizationConfig): | |
pygame.init() | |
self.width = width | |
self.height = height | |
self.config = config | |
# Enable double buffering and vsync | |
self.screen = pygame.display.set_mode( | |
(width, height), | |
pygame.DOUBLEBUF | pygame.HWSURFACE | pygame.SCALED, | |
vsync=1 | |
) | |
pygame.display.set_caption("Fractal Life Simulation") | |
# Create off-screen surfaces for double buffering | |
self.buffer = pygame.Surface((width, height), pygame.SRCALPHA) | |
self.neural_viz = NeuralVisualizer(width, height, config) | |
self.background = pygame.Surface((width, height)) | |
self.background.fill(config.BACKGROUND_COLOR) | |
# Additional surfaces for layered rendering | |
self.organism_surface = pygame.Surface((width, height), pygame.SRCALPHA) | |
self.interaction_surface = pygame.Surface((width, height), pygame.SRCALPHA) | |
self.stats_surface = pygame.Surface((width, height), pygame.SRCALPHA) | |
def _validate_color(self, color): | |
"""Validate and ensure color values are proper RGB integers""" | |
try: | |
if len(color) >= 3: | |
return ( | |
max(0, min(255, int(color[0]))), | |
max(0, min(255, int(color[1]))), | |
max(0, min(255, int(color[2]))) | |
) | |
return (100, 100, 100) # Default fallback color | |
except Exception as e: | |
logging.error(f"Error validating color: {e}") | |
return (100, 100, 100) | |
def draw_frame(self, organisms: List[FractalOrganism], stats: Dict): | |
"""Draw complete frame with all visualizations""" | |
try: | |
# Clear all off-screen surfaces | |
self.organism_surface.fill((0, 0, 0, 0)) | |
self.interaction_surface.fill((0, 0, 0, 0)) | |
self.stats_surface.fill((0, 0, 0, 0)) | |
# Draw organisms and their neural states | |
for organism in organisms: | |
if organism.alive: | |
self._draw_organism(organism, self.organism_surface) | |
self.neural_viz.draw_brain_state(organism, self.interaction_surface) | |
# Draw statistics | |
self._draw_stats(stats, self.stats_surface) | |
# Blit off-screen surfaces to the main display surface | |
self.screen.blit(self.background, (0, 0)) # Background layer | |
self.screen.blit(self.organism_surface, (0, 0)) | |
self.screen.blit(self.interaction_surface, (0, 0)) | |
self.screen.blit(self.stats_surface, (0, 0)) | |
# Flip display buffers to avoid flickering | |
pygame.display.flip() | |
except Exception as e: | |
logging.error(f"Error in draw_frame: {e}") | |
def _draw_organism(self, organism: FractalOrganism, surface: pygame.Surface): | |
"""Draw organism body with color and pattern""" | |
try: | |
# Validate color before drawing | |
safe_color = self._validate_color(organism.color) | |
# Draw shape with validated color | |
points = [] | |
for x, y in organism.shape_points: | |
try: | |
px = organism.pos.x + x | |
py = organism.pos.y + y | |
if not (np.isnan(px) or np.isnan(py)): | |
points.append((px, py)) | |
else: | |
logging.warning(f"NaN detected in shape points for organism {id(organism)}. Skipping point.") | |
except Exception as e: | |
logging.error(f"Error processing shape points for organism {id(organism)}: {e}") | |
continue | |
if len(points) >= 3: | |
pygame.draw.polygon(surface, safe_color, points) | |
else: | |
logging.warning(f"Insufficient valid points to draw organism {id(organism)}. Skipping drawing.") | |
# Draw energy bar | |
energy_percentage = min(max(organism.brain.total_energy / 1000.0, 0), 1) | |
bar_width = organism.size * 2 | |
bar_height = 4 | |
bar_pos = (organism.pos.x - bar_width / 2, organism.pos.y - organism.size - 10) | |
pygame.draw.rect(surface, (50, 50, 50), (*bar_pos, bar_width, bar_height)) | |
pygame.draw.rect(surface, (0, 255, 0), # Using direct color value for energy bar | |
(*bar_pos, bar_width * energy_percentage, bar_height)) | |
except Exception as e: | |
logging.error(f"Error in _draw_organism: {e}") | |
def _draw_stats(self, stats: Dict, surface: pygame.Surface): | |
"""Draw simulation statistics""" | |
try: | |
font = pygame.font.Font(None, 24) | |
y_pos = 10 | |
for key, value in stats.items(): | |
text = font.render(f"{key.capitalize()}: {value}", True, (255, 255, 255)) | |
surface.blit(text, (10, y_pos)) | |
y_pos += 25 | |
except Exception as e: | |
logging.error(f"Error in _draw_stats: {e}") | |
def cleanup(self): | |
"""Clean up pygame resources""" | |
try: | |
pygame.quit() | |
except Exception as e: | |
logging.error(f"Error during pygame cleanup: {e}") | |
# ============================== | |
# Interaction Field (Optional Enhancement) | |
# ============================== | |
class InteractionField: | |
def __init__(self, width: int, height: int, resolution: int = 50): | |
self.width = max(1, width) | |
self.height = max(1, height) | |
self.resolution = max(10, min(resolution, 100)) # Bound resolution | |
# Create field grid with safe dimensions | |
self.grid_w = max(1, self.width // self.resolution) | |
self.grid_h = max(1, self.height // self.resolution) | |
self.field = np.zeros((self.grid_w, self.grid_h, 3)) | |
def update(self, organisms: List[FractalOrganism]): | |
"""Update field based on organism neural activity with safety checks""" | |
try: | |
# Decay field | |
self.field *= 0.9 | |
np.clip(self.field, 0, 1, out=self.field) | |
for org in organisms: | |
if not org.alive: | |
continue | |
try: | |
# Safe position to grid conversion | |
pos_x = max(0, min(float(org.pos.x), self.width)) | |
pos_y = max(0, min(float(org.pos.y), self.height)) | |
grid_x = int((pos_x / self.width) * (self.grid_w - 1)) | |
grid_y = int((pos_y / self.height) * (self.grid_h - 1)) | |
# Get neural activity with safety checks | |
vitals = org.brain.get_vitals() | |
activity_color = np.array([ | |
max(0.0, min(1.0, float(vitals['activation']))), | |
max(0.0, min(1.0, float(vitals['energy']) / 1000.0)), | |
max(0.0, min(1.0, float(vitals['connections']) / 100.0)) | |
]) | |
# Apply to field with falloff | |
for dx in range(-2, 3): | |
for dy in range(-2, 3): | |
x = (grid_x + dx) % self.grid_w | |
y = (grid_y + dy) % self.grid_h | |
distance = np.sqrt(dx * dx + dy * dy) | |
if distance < 3: | |
intensity = max(0.0, min(1.0, (3 - distance) / 3)) | |
self.field[x, y] += activity_color * intensity | |
except (ValueError, TypeError, ZeroDivisionError) as e: | |
logging.error(f"Error updating field for organism {id(org)}: {e}") | |
continue # Skip problematic organisms | |
# Ensure field values stay in valid range | |
np.clip(self.field, 0, 1, out=self.field) | |
except Exception as e: | |
logging.error(f"Error in InteractionField.update: {e}") | |
def get_field_at(self, x: float, y: float) -> np.ndarray: | |
"""Safely get field value at position""" | |
try: | |
x = max(0, min(float(x), self.width)) | |
y = max(0, min(float(y), self.height)) | |
grid_x = int((x / self.width) * (self.grid_w - 1)) | |
grid_y = int((y / self.height) * (self.grid_h - 1)) | |
return self.field[grid_x, grid_y] | |
except (ValueError, TypeError, IndexError) as e: | |
logging.error(f"Error in get_field_at for position ({x}, {y}): {e}") | |
return np.zeros(3) | |
# ============================== | |
# Simulation State Tracking | |
# ============================== | |
class SimulationState: | |
"""Tracks the current state of the simulation""" | |
def __init__(self): | |
self.running = False | |
self.paused = False | |
self.step_count = 0 | |
self.stats = { | |
'population': 0, | |
'avg_energy': 0.0, | |
'avg_neurons': 0.0, | |
'avg_connections': 0.0, | |
'total_interactions': 0 | |
} | |
self.selected_organism: Optional[FractalOrganism] = None | |
# ============================== | |
# Main Simulation Class | |
# ============================== | |
class FractalLifeSimulation: | |
def __init__(self, config: SimulationConfig, shared_data: Dict): | |
self.config = config | |
self.state = SimulationState() | |
# Shared data for Gradio interface | |
self.shared_data = shared_data | |
self.shared_lock = threading.Lock() | |
# Initialize systems | |
visualization_config = VisualizationConfig() | |
self.visualizer = SimulationVisualizer(config.WIDTH, config.HEIGHT, visualization_config) | |
self.physics = PhysicsEngine(config.WIDTH, config.HEIGHT, PhysicsConfig()) | |
self.field = InteractionField(config.WIDTH, config.HEIGHT, resolution=50) | |
# Event queues for thread communication | |
self.event_queue = Queue() | |
self.stats_queue = Queue() | |
# Initialize organisms | |
self.organisms: List[FractalOrganism] = [] | |
self._init_organisms() | |
# Pygame threading control | |
self.running = False | |
self.thread = None | |
def _init_organisms(self): | |
"""Initialize starting organisms""" | |
try: | |
for _ in range(self.config.MIN_ORGANISMS): | |
x = random.uniform(0, self.config.WIDTH) | |
y = random.uniform(0, self.config.HEIGHT) | |
organism = FractalOrganism( | |
x=x, y=y, | |
feature_dim=32, | |
max_neurons=self.config.MAX_NEURONS | |
) | |
self.organisms.append(organism) | |
self.physics.add_organism(organism) | |
except Exception as e: | |
logging.error(f"Error initializing organisms: {e}") | |
def _process_reproduction(self): | |
"""Handle organism reproduction""" | |
try: | |
new_organisms = [] | |
for org in self.organisms: | |
if not org.alive or len(self.organisms) + len(new_organisms) >= self.config.MAX_ORGANISMS: | |
continue | |
if org.brain.total_energy > self.config.REPRODUCTION_ENERGY: | |
# Find a mate (simple random selection for demonstration) | |
potential_mates = [o for o in self.organisms if o != org and o.alive and o.brain.total_energy > self.config.REPRODUCTION_ENERGY] | |
if potential_mates: | |
mate = random.choice(potential_mates) | |
child = org.reproduce(mate, mutation_rate=self.config.MUTATION_RATE) | |
if child: | |
new_organisms.append(child) | |
self.physics.add_organism(child) | |
self.organisms.extend(new_organisms) | |
except Exception as e: | |
logging.error(f"Error processing reproduction: {e}") | |
def _update_stats(self): | |
"""Update simulation statistics""" | |
try: | |
living_organisms = [org for org in self.organisms if org.alive] | |
population = len(living_organisms) | |
if population > 0: | |
self.state.stats.update({ | |
'population': population, | |
'avg_energy': sum(org.brain.total_energy for org in living_organisms) / population, | |
'avg_neurons': sum(org.brain.total_neurons for org in living_organisms) / population, | |
'avg_connections': sum(sum(n.state.connections for n in [org.brain.visual_cortex, | |
org.brain.thought_processor, | |
org.brain.action_generator]) | |
for org in living_organisms) / population, | |
'total_interactions': len(self.physics.current_interactions) | |
}) | |
else: | |
self.state.stats.update({ | |
'population': 0, | |
'avg_energy': 0.0, | |
'avg_neurons': 0.0, | |
'avg_connections': 0.0, | |
'total_interactions': 0 | |
}) | |
with self.shared_lock: | |
self.shared_data['stats'] = self.state.stats.copy() | |
except Exception as e: | |
logging.error(f"Error updating statistics: {e}") | |
def _main_loop(self): | |
"""Main simulation loop""" | |
try: | |
clock = pygame.time.Clock() | |
while self.running: | |
dt = clock.tick(self.config.TARGET_FPS) / 1000.0 # Delta time in seconds | |
for event in pygame.event.get(): | |
if event.type == pygame.QUIT: | |
self.stop() | |
# Process external events | |
while not self.event_queue.empty(): | |
event = self.event_queue.get() | |
self._handle_event(event) | |
if not self.state.paused: | |
# Update physics | |
self.physics.update(dt) | |
# Update neural field | |
self.field.update(self.organisms) | |
# Update organisms | |
for org in self.organisms: | |
if org.alive: | |
# Update organism state | |
org.update(self.config.WIDTH, self.config.HEIGHT, self.organisms) | |
# Process field interactions (placeholder for actual implementation) | |
field_value = self.field.get_field_at(org.pos.x, org.pos.y) | |
# org.process_field_input(field_value) # Implement if needed | |
# Energy decay | |
org.brain.total_energy = max(0.0, org.brain.total_energy - self.config.ENERGY_DECAY) | |
# Process physical interactions | |
self.physics.process_interactions(self.organisms) | |
# Handle reproduction | |
self._process_reproduction() | |
# Remove dead organisms | |
self.organisms = [org for org in self.organisms if org.alive] | |
# Maintain minimum population | |
while len(self.organisms) < self.config.MIN_ORGANISMS and len(self.organisms) < self.config.MAX_ORGANISMS: | |
x = random.uniform(0, self.config.WIDTH) | |
y = random.uniform(0, self.config.HEIGHT) | |
organism = FractalOrganism( | |
x=x, y=y, | |
feature_dim=32, | |
max_neurons=self.config.MAX_NEURONS | |
) | |
self.organisms.append(organism) | |
self.physics.add_organism(organism) | |
# Update statistics | |
self._update_stats() | |
# Draw frame | |
self.visualizer.draw_frame(self.organisms, self.state.stats) | |
except Exception as e: | |
logging.error(f"Exception in main loop: {e}") | |
finally: | |
# Cleanup when simulation stops | |
self.visualizer.cleanup() | |
def start(self): | |
"""Start simulation in separate thread""" | |
if not self.running: | |
self.running = True | |
self.thread = threading.Thread(target=self._main_loop) | |
self.thread.start() | |
logging.info("Simulation started.") | |
def stop(self): | |
"""Stop simulation""" | |
self.running = False | |
if self.thread and self.thread.is_alive(): | |
self.thread.join() | |
logging.info("Simulation stopped.") | |
def pause(self): | |
"""Pause/unpause simulation""" | |
self.state.paused = not self.state.paused | |
logging.info(f"Simulation {'paused' if self.state.paused else 'resumed'}.") | |
def _handle_event(self, event: Dict): | |
"""Handle external events""" | |
try: | |
if event['type'] == 'select_organism': | |
organism_id = event['organism_id'] | |
self.state.selected_organism = next( | |
(org for org in self.organisms if id(org) == organism_id), | |
None | |
) | |
logging.info(f"Organism {organism_id} selected.") | |
elif event['type'] == 'add_energy': | |
if self.state.selected_organism: | |
self.state.selected_organism.brain.total_energy += event['amount'] | |
logging.info(f"Added {event['amount']} energy to organism {id(self.state.selected_organism)}.") | |
elif event['type'] == 'modify_neurons': | |
if self.state.selected_organism: | |
# Placeholder for neuron modification logic | |
logging.info(f"Modify neurons event received for organism {id(self.state.selected_organism)}.") | |
except Exception as e: | |
logging.error(f"Error handling event {event}: {e}") | |
# ============================== | |
# Gradio Interface | |
# ============================== | |
class FractalLifeInterface: | |
def __init__(self): | |
self.simulation: Optional[FractalLifeSimulation] = None | |
self.history = { | |
'population': [], | |
'avg_energy': [], | |
'avg_neurons': [], | |
'time': [] | |
} | |
self.selected_organism_id = None | |
self.frame_image = None | |
self.DEFAULT_WIDTH = 1024 | |
self.DEFAULT_HEIGHT = 768 | |
self.shared_data = { | |
'stats': { | |
'population': 0, | |
'avg_energy': 0.0, | |
'avg_neurons': 0.0, | |
'avg_connections': 0.0, | |
'total_interactions': 0 | |
} | |
} | |
def get_frame(self): | |
"""Retrieve the latest frame from Pygame.""" | |
try: | |
if self.simulation and self.simulation.running: | |
with self.simulation.shared_lock: | |
# Get pygame surface | |
screen = self.simulation.visualizer.screen | |
# Get the size of the screen | |
width = screen.get_width() | |
height = screen.get_height() | |
# Create a new surface with alpha channel | |
surf_alpha = pygame.Surface((width, height), pygame.SRCALPHA) | |
surf_alpha.blit(screen, (0, 0)) | |
# Convert Pygame surface to PIL Image | |
data_string = pygame.image.tostring(surf_alpha, 'RGBA') | |
image = Image.frombytes('RGBA', (width, height), data_string) | |
# Convert to RGB | |
image = image.convert('RGB') | |
return image | |
else: | |
# Return a blank image | |
return Image.new('RGB', (self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT), (0, 0, 0)) | |
except Exception as e: | |
logging.error(f"Error in get_frame: {e}") | |
# Return a fallback image in case of error | |
return Image.new('RGB', (self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT), (0, 0, 0)) | |
def update_display(self): | |
"""Update display by retrieving the latest frame and statistics.""" | |
try: | |
# Get the frame as PIL Image | |
image = self.get_frame() | |
# Update statistics only if simulation is running | |
if self.simulation and self.simulation.running: | |
with self.simulation.shared_lock: | |
stats = self.shared_data['stats'] | |
# Update history | |
self.history['time'].append(self.simulation.state.step_count) | |
self.history['population'].append(stats['population']) | |
self.history['avg_energy'].append(stats['avg_energy']) | |
self.history['avg_neurons'].append(stats['avg_neurons']) | |
# Limit history length to prevent memory issues | |
max_history = 1000 | |
if len(self.history['time']) > max_history: | |
for key in self.history: | |
self.history[key] = self.history[key][-max_history:] | |
else: | |
stats = self.shared_data['stats'] | |
# Update plots | |
stats_fig = self._create_stats_plot() | |
neural_fig = self._create_neural_plot() | |
# Update organism list only if simulation is running | |
organism_list = [] | |
if self.simulation and self.simulation.running: | |
organism_list = [ | |
(f"Organism {id(org)}", id(org)) | |
for org in self.simulation.organisms | |
if org.alive | |
] | |
# Update selected organism vitals | |
vitals = None | |
if self.selected_organism_id and self.simulation and self.simulation.running: | |
org = next( | |
(org for org in self.simulation.organisms | |
if id(org) == self.selected_organism_id), | |
None | |
) | |
if org: | |
vitals = { | |
'energy': org.brain.total_energy, | |
'neurons': org.brain.total_neurons, | |
'age': org.age, | |
'connections': sum(n.state.connections for n in | |
[org.brain.visual_cortex, | |
org.brain.thought_processor, | |
org.brain.action_generator]) | |
} | |
# Create new Dropdown choices instead of using update | |
dropdown = gr.Dropdown( | |
choices=organism_list, | |
label="Select Organism", | |
interactive=True | |
) | |
return [ | |
image, # Return PIL Image directly | |
stats_fig, | |
neural_fig, | |
dropdown, | |
vitals | |
] | |
except Exception as e: | |
logging.error(f"Error in update_display: {e}") | |
# Return default values in case of error | |
blank_image = Image.new('RGB', (self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT), (0, 0, 0)) | |
empty_fig = go.Figure() | |
empty_dropdown = gr.Dropdown(choices=[]) | |
return [blank_image, empty_fig, empty_fig, empty_dropdown, None] | |
def create_interface(self): | |
with gr.Blocks(title="Fractal Life Simulator") as interface: | |
gr.Markdown("# 🧬 Fractal Life Simulator") | |
with gr.Row(): | |
# Main simulation view and controls | |
with gr.Column(scale=2): | |
canvas = gr.Image(label="Simulation View", interactive=False) | |
with gr.Row(): | |
start_btn = gr.Button("Start Simulation", variant="primary") | |
pause_btn = gr.Button("Pause") | |
stop_btn = gr.Button("Stop") | |
with gr.Row(): | |
population_slider = gr.Slider( | |
minimum=5, maximum=100, # Increased maximum for flexibility | |
value=10, step=1, | |
label="Initial Population" | |
) | |
mutation_rate = gr.Slider( | |
minimum=0, maximum=1, value=0.1, step=0.01, | |
label="Mutation Rate" | |
) | |
max_population_slider = gr.Slider( | |
minimum=50, maximum=500, value=50, step=10, | |
label="Max Population" | |
) | |
# Statistics and organism details | |
with gr.Column(scale=1): | |
stats_plot = gr.Plot(label="Population Statistics") | |
neural_plot = gr.Plot(label="Neural Activity") | |
with gr.Group(): | |
gr.Markdown("### Selected Organism") | |
organism_dropdown = gr.Dropdown( | |
label="Select Organism", | |
choices=[], | |
interactive=True | |
) | |
vitals_json = gr.JSON(label="Organism Vitals") | |
with gr.Row(): | |
add_energy_btn = gr.Button("Add Energy") | |
add_neurons_btn = gr.Button("Add Neurons") | |
# Advanced settings tab | |
with gr.Tab("Advanced Settings"): | |
with gr.Row(): | |
with gr.Column(): | |
brain_update_rate = gr.Slider( | |
minimum=1, maximum=30, value=10, step=1, | |
label="Brain Update Rate (Hz)" | |
) | |
max_neurons = gr.Slider( | |
minimum=100, maximum=5000, value=1000, step=100, | |
label="Max Neurons per Brain" | |
) | |
energy_decay = gr.Slider( | |
minimum=0, maximum=1, value=0.1, step=0.01, | |
label="Energy Decay Rate" | |
) | |
with gr.Column(): | |
interaction_strength = gr.Slider( | |
minimum=0, maximum=1, value=0.5, step=0.01, | |
label="Interaction Strength" | |
) | |
field_resolution = gr.Slider( | |
minimum=10, maximum=100, value=50, step=5, | |
label="Field Resolution" | |
) | |
# Event handlers | |
def start_simulation(initial_population, mutation_rate_val, max_population_val, | |
brain_update_rate_val, max_neurons_val, energy_decay_val, | |
interaction_strength_val, field_resolution_val): | |
try: | |
if self.simulation is None: | |
config = SimulationConfig() | |
config.MIN_ORGANISMS = int(initial_population) | |
config.MUTATION_RATE = mutation_rate_val | |
config.MAX_ORGANISMS = int(max_population_val) | |
config.BRAIN_UPDATE_RATE = int(brain_update_rate_val) | |
config.MAX_NEURONS = int(max_neurons_val) | |
config.ENERGY_DECAY = energy_decay_val | |
self.simulation = FractalLifeSimulation(config, self.shared_data) | |
self.simulation.start() | |
logging.info("Simulation started via interface.") | |
return "Simulation started" | |
else: | |
logging.warning("Simulation is already running.") | |
return "Simulation is already running." | |
except Exception as e: | |
logging.error(f"Error starting simulation: {e}") | |
return "Failed to start simulation." | |
def pause_simulation(): | |
try: | |
if self.simulation: | |
self.simulation.pause() | |
status = "paused" if self.simulation.state.paused else "resumed" | |
logging.info(f"Simulation {status} via interface.") | |
return f"Simulation {status}" | |
logging.warning("No simulation running to pause/resume.") | |
return "No simulation running" | |
except Exception as e: | |
logging.error(f"Error pausing simulation: {e}") | |
return "Failed to pause simulation." | |
def stop_simulation(): | |
try: | |
if self.simulation: | |
self.simulation.stop() | |
self.simulation = None | |
logging.info("Simulation stopped via interface.") | |
return "Simulation stopped" | |
logging.warning("No simulation running to stop.") | |
return "No simulation running" | |
except Exception as e: | |
logging.error(f"Error stopping simulation: {e}") | |
return "Failed to stop simulation." | |
def select_organism(organism_id): | |
try: | |
self.selected_organism_id = organism_id | |
if self.simulation: | |
self.simulation.event_queue.put({ | |
'type': 'select_organism', | |
'organism_id': organism_id | |
}) | |
logging.info(f"Organism {organism_id} selected via interface.") | |
except Exception as e: | |
logging.error(f"Error selecting organism: {e}") | |
def add_energy_to_organism(): | |
try: | |
if self.simulation and self.selected_organism_id: | |
self.simulation.event_queue.put({ | |
'type': 'add_energy', | |
'amount': 50.0 | |
}) | |
logging.info(f"Added energy to organism {self.selected_organism_id} via interface.") | |
return "Added energy to selected organism" | |
logging.warning("No organism selected or simulation not running to add energy.") | |
return "No organism selected or simulation not running" | |
except Exception as e: | |
logging.error(f"Error adding energy to organism: {e}") | |
return "Failed to add energy" | |
def add_neurons_to_organism(): | |
try: | |
if self.simulation and self.selected_organism_id: | |
self.simulation.event_queue.put({ | |
'type': 'modify_neurons', | |
'amount': 10 | |
}) | |
logging.info(f"Added neurons to organism {self.selected_organism_id} via interface.") | |
return "Added neurons to selected organism" | |
logging.warning("No organism selected or simulation not running to add neurons.") | |
return "No organism selected or simulation not running" | |
except Exception as e: | |
logging.error(f"Error adding neurons to organism: {e}") | |
return "Failed to add neurons" | |
# Create a bound method for update_display | |
def bound_update_display(): | |
return self.update_display() | |
# Connect events | |
start_btn.click( | |
start_simulation, | |
inputs=[population_slider, mutation_rate, max_population_slider, | |
brain_update_rate, max_neurons, energy_decay, | |
interaction_strength, field_resolution], | |
outputs=gr.Textbox() | |
) | |
pause_btn.click(pause_simulation, outputs=gr.Textbox()) | |
stop_btn.click(stop_simulation, outputs=gr.Textbox()) | |
organism_dropdown.change(select_organism, inputs=[organism_dropdown], outputs=None) | |
add_energy_btn.click(add_energy_to_organism, outputs=gr.Textbox()) | |
add_neurons_btn.click(add_neurons_to_organism, outputs=gr.Textbox()) | |
# Periodic update using Gradio's update mechanism | |
interface.load( | |
fn=bound_update_display, | |
inputs=[], | |
outputs=[canvas, stats_plot, neural_plot, organism_dropdown, vitals_json], | |
every=1/30, # 30 FPS updates | |
queue=True | |
) | |
return interface | |
def _create_stats_plot(self): | |
"""Create statistics plot using plotly""" | |
try: | |
fig = make_subplots(rows=3, cols=1, shared_xaxes=True, | |
subplot_titles=("Population", "Average Energy", "Average Neurons")) | |
fig.add_trace( | |
go.Scatter(x=self.history['time'], y=self.history['population'], | |
name="Population"), | |
row=1, col=1 | |
) | |
fig.add_trace( | |
go.Scatter(x=self.history['time'], y=self.history['avg_energy'], | |
name="Avg Energy"), | |
row=2, col=1 | |
) | |
fig.add_trace( | |
go.Scatter(x=self.history['time'], y=self.history['avg_neurons'], | |
name="Avg Neurons"), | |
row=3, col=1 | |
) | |
fig.update_layout(height=600, showlegend=True) | |
return fig | |
except Exception as e: | |
logging.error(f"Error creating stats plot: {e}") | |
return go.Figure() | |
def _create_neural_plot(self): | |
"""Create neural activity plot""" | |
try: | |
if self.selected_organism_id and self.simulation and self.simulation.running: | |
org = next( | |
(org for org in self.simulation.organisms | |
if id(org) == self.selected_organism_id), | |
None | |
) | |
if org: | |
# Create neural activity heatmap | |
neurons = [org.brain.visual_cortex, | |
org.brain.thought_processor, | |
org.brain.action_generator] | |
activities = [] | |
for neuron in neurons: | |
layer_activities = [child.state.activation for child in neuron.sub_neurons] if hasattr(neuron, 'sub_neurons') and neuron.sub_neurons else [neuron.state.activation] | |
activities.append(layer_activities) | |
activities = np.array(activities) | |
fig = go.Figure(data=go.Heatmap(z=activities, colorscale='Viridis')) | |
fig.update_layout( | |
title="Neural Activity", | |
xaxis_title="Neuron Index", | |
yaxis_title="Layer" | |
) | |
return fig | |
return go.Figure() | |
except Exception as e: | |
logging.error(f"Error creating neural plot: {e}") | |
return go.Figure() | |
# ============================== | |
# Entry Point | |
# ============================== | |
if __name__ == "__main__": | |
interface = FractalLifeInterface() | |
gr_interface = interface.create_interface() | |
# Enable queue and allow for public access | |
gr_interface.queue().launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=True # This creates a public link | |
) | |