import colorsys
import io
import logging  # Added for logging
import random
import threading
import time
from dataclasses import dataclass, field
from queue import Queue
from typing import Dict, List, Optional, Set, Tuple

import gradio as gr
import numpy as np
import plotly.graph_objects as go
import pygame
import pymunk
import torch
import torch.nn as nn
import torch.nn.functional as F  # Added for activation functions
from PIL import Image
from plotly.subplots import make_subplots

# ==============================
# Logging Configuration
# ==============================

# Log to both a file ("simulation.log" in the working directory) and stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler("simulation.log"),
        logging.StreamHandler()
    ]
)

# ==============================
# Configuration Dataclasses
# ==============================


@dataclass
class SimulationConfig:
    """Global tuning knobs for the simulation loop and population."""

    WIDTH: int = 1024
    HEIGHT: int = 768
    TARGET_FPS: int = 60
    MIN_ORGANISMS: int = 5
    MAX_ORGANISMS: int = 50  # Population cap set to 50
    MUTATION_RATE: float = 0.1
    REPRODUCTION_ENERGY: float = 150.0
    INITIAL_ENERGY: float = 100.0
    BRAIN_UPDATE_RATE: int = 10  # Hz
    MAX_NEURONS: int = 1000
    ENERGY_DECAY: float = 0.1


@dataclass
class NeuronState:
    """Mutable per-neuron bookkeeping kept outside the torch parameters."""

    activation: float = 0.0
    connections: int = 0
    energy: float = 100.0
    # Eight memory slots; default_factory gives every instance its own list.
    memory: List[float] = field(default_factory=lambda: [0.0] * 8)


@dataclass
class VisualizationConfig:
    """Colors and limits used when rendering organisms and their brains."""

    BACKGROUND_COLOR: Tuple[int, int, int] = (10, 10, 30)
    NEURON_COLORS: Dict[str, Tuple[int, int, int]] = field(default_factory=lambda: {
        'active': (255, 255, 0),
        'inactive': (100, 100, 100),
        'connected': (0, 255, 255)
    })
    CONNECTION_COLOR: Tuple[int, int, int, int] = (50, 50, 200, 100)  # RGBA
    ENERGY_COLOR: Tuple[int, int, int] = (0, 255, 0)
    MAX_NEURAL_CONNECTIONS: int = 50


@dataclass
class PhysicsConfig:
    """Parameters handed to the pymunk-based physics engine."""

    COLLISION_TYPE_ORGANISM: int = 1
    ELASTICITY: float = 0.7
    FRICTION: float = 0.5
    DAMPING: float = 0.9
    INTERACTION_RADIUS: float = 50.0
    FORCE_SCALE: float = 100.0


# ==============================
# Neural Processing System
# ==============================


class FractalNeuron(nn.Module):
    """Recursive neuron: a two-layer MLP, a gated 8-slot memory and child neurons.

    Each neuron owns up to ``max(1, 2 - depth)`` children until ``max_depth``
    is reached. Children take the parent's output as input and their averaged
    output replaces the parent's output.
    """

    def __init__(self, input_dim=16, output_dim=16, depth=0, max_depth=2):
        super().__init__()
        self.depth = depth
        self.max_depth = max_depth

        # Store dimensions
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = max(input_dim // 2, 8)

        # input_dim -> hidden_dim -> output_dim, squashed to [-1, 1] by Tanh
        self.synapse = nn.Sequential(
            nn.Linear(input_dim, self.hidden_dim),
            nn.LeakyReLU(negative_slope=0.1, inplace=True),
            nn.Linear(self.hidden_dim, output_dim),
            nn.Tanh()
        )
        self._init_linear_layers(self.synapse)

        # State maintenance with bounded values
        self.state = NeuronState()
        self.state.activation = 0.0
        self.state.energy = min(100.0, max(0.0, self.state.energy))

        # The gate sees the synapse output concatenated with the 8 memory slots
        self.memory_gate = nn.Sequential(
            nn.Linear(output_dim + 8, 8),
            nn.Sigmoid()
        )
        self._init_linear_layers(self.memory_gate)

        # Child neurons with matching dimensions
        self.sub_neurons = nn.ModuleList([])
        if depth < max_depth:
            branching_factor = max(1, 2 - depth)
            for _ in range(branching_factor):
                child = FractalNeuron(
                    input_dim=output_dim,   # child consumes the parent's output
                    output_dim=output_dim,  # keep output_dim consistent
                    depth=depth + 1,
                    max_depth=max_depth
                )
                self.sub_neurons.append(child)

        # Switch to eval mode only after every submodule is registered so the
        # flag reaches all of them.
        self.eval()

    @staticmethod
    def _init_linear_layers(layers):
        """Xavier-initialize the weights and zero the biases of Linear layers."""
        for layer in layers:
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0.0)

    @staticmethod
    def _mutate_linear_layers(layers, mutation_rate):
        """Add N(0, 0.1) noise to a random ``mutation_rate`` share of each Linear's weights."""
        for layer in layers:
            if isinstance(layer, nn.Linear):
                mask = torch.rand_like(layer.weight) < mutation_rate
                mutations = torch.randn_like(layer.weight) * 0.1
                layer.weight.data[mask] += mutations[mask]
                if layer.bias is not None:
                    mask = torch.rand_like(layer.bias) < mutation_rate
                    mutations = torch.randn_like(layer.bias) * 0.1
                    layer.bias.data[mask] += mutations[mask]

    def forward(self, x):
        """Forward pass for PyTorch module compatibility"""
        return self.process_signal(x)

    def process_signal(self, x, external_input=None):
        """Run ``x`` through the neuron and its children without tracking gradients.

        Args:
            x: Input tensor, 1-D or with a leading batch dimension.
            external_input: Optional tensor concatenated to ``x`` on the last
                axis before the synapse (the combined width must then equal
                ``input_dim`` or the synapse fails and zeros are returned).

        Returns:
            The output tensor with a leading axis of size 1 squeezed away, or
            ``torch.zeros(output_dim)`` on NaN input or any processing error.
        """
        try:
            with torch.no_grad():
                # Ensure we're in eval mode
                self.eval()

                # Add batch dimension if missing
                if len(x.shape) == 1:
                    x = x.unsqueeze(0)

                if torch.isnan(x).any():
                    logging.warning("NaN detected in input tensor. Returning zero tensor.")
                    return torch.zeros(self.output_dim)

                if external_input is not None:
                    if len(external_input.shape) == 1:
                        external_input = external_input.unsqueeze(0)
                    x = torch.cat([x, external_input], dim=-1)

                x = x.to(torch.float32)  # Ensure float32 dtype
                try:
                    x = self.synapse(x)
                except RuntimeError as e:
                    logging.error(f"Error in synapse processing: {e}")
                    return torch.zeros(self.output_dim)

                # Memory update is best-effort: a failure keeps the old memory.
                try:
                    memory_tensor = torch.tensor(self.state.memory).to(torch.float32)
                    x_for_memory = x.unsqueeze(0) if len(x.shape) == 1 else x
                    memory_input = torch.cat([x_for_memory, memory_tensor.unsqueeze(0)], dim=-1)
                    new_memory = self.memory_gate(memory_input)
                    new_memory = torch.clamp(new_memory, 0.0, 1.0)
                    if not torch.isnan(new_memory).any():
                        self.state.memory = new_memory[0].tolist()
                except Exception as e:
                    logging.error(f"Error updating memory: {e}")

                # Update activation with bounded value
                activation = float(torch.clamp(x.mean(), -1.0, 1.0))
                if not np.isnan(activation):
                    self.state.activation = activation

                # Children refine the signal; their mean replaces x.
                if self.sub_neurons:
                    child_outputs = []
                    for child in self.sub_neurons:
                        try:
                            child_input = x.squeeze(0) if len(x.shape) == 2 else x
                            if child_input.shape[-1] != child.input_dim:
                                # Truncate along the feature axis (the last one).
                                child_input = child_input[..., :child.input_dim]
                            child_out = child.process_signal(child_input)
                            if not torch.isnan(child_out).any():
                                if len(child_out.shape) == 1:
                                    child_out = child_out.unsqueeze(0)
                                child_outputs.append(child_out)
                        except Exception as e:
                            logging.error(f"Error in child neuron processing: {e}")
                            continue

                    if child_outputs:
                        child_outputs = torch.stack(child_outputs)
                        x = torch.mean(child_outputs, dim=0)
                        x = torch.clamp(x, -1.0, 1.0)

                # Deeper neurons pay more energy per evaluation (root pays none).
                energy_cost = 0.1 * self.depth
                self.state.energy = max(0.0, min(100.0, self.state.energy - energy_cost))

                # Remove batch dimension if it was added
                if len(x.shape) == 2:
                    x = x.squeeze(0)
                return x
        except Exception as e:
            logging.error(f"Error in process_signal: {e}")
            return torch.zeros(self.output_dim)

    def interact_with(self, other_neuron, strength=0.5):
        """Average activation and memory with ``other_neuron`` and count the link.

        Both neurons end up with the mean activation and the element-wise mean
        memory (clamped to [0, 1]); each connection counter is incremented up
        to a cap of 100.

        Returns:
            The shared activation, or 0.0 if anything fails.
        """
        try:
            # NOTE: strength is validated but does not influence the blend yet.
            strength = max(0.0, min(1.0, strength))

            shared_activation = (self.state.activation + other_neuron.state.activation) / 2
            shared_activation = float(shared_activation)
            if np.isnan(shared_activation):
                logging.warning("NaN detected in shared activation. Using default value.")
                shared_activation = 0.0
            self.state.activation = shared_activation
            other_neuron.state.activation = shared_activation

            shared_memory = [
                max(0.0, min(1.0, (float(a) + float(b)) / 2))
                for a, b in zip(self.state.memory, other_neuron.state.memory)
            ]
            # Give each neuron its own list so later in-place edits on one
            # cannot leak into the other.
            self.state.memory = shared_memory
            other_neuron.state.memory = list(shared_memory)

            max_connections = 100
            self.state.connections = min(self.state.connections + 1, max_connections)
            other_neuron.state.connections = min(other_neuron.state.connections + 1, max_connections)

            return shared_activation
        except Exception as e:
            logging.error(f"Error in interact_with: {e}")
            return 0.0

    def save_state(self):
        """Return a dict snapshot of this neuron's ``NeuronState`` (memory is copied)."""
        return {
            'activation': self.state.activation,
            'connections': self.state.connections,
            'energy': self.state.energy,
            'memory': self.state.memory.copy()
        }

    def load_state(self, state_dict):
        """Restore a snapshot produced by :meth:`save_state`; errors are logged, not raised."""
        try:
            self.state.activation = state_dict['activation']
            self.state.connections = state_dict['connections']
            self.state.energy = state_dict['energy']
            self.state.memory = state_dict['memory'].copy()
        except Exception as e:
            logging.error(f"Error loading neuron state: {e}")

    def _copy_runtime_state(self, target):
        """Copy the NeuronState of this neuron and all descendants into ``target``."""
        target.load_state(self.save_state())
        for source_child, target_child in zip(self.sub_neurons, target.sub_neurons):
            source_child._copy_runtime_state(target_child)

    def clone(self):
        """Create a deep copy of the neuron: weights, children and runtime state.

        Returns:
            A new independent ``FractalNeuron``, or ``None`` if cloning fails.
        """
        try:
            new_neuron = FractalNeuron(
                input_dim=self.input_dim,
                output_dim=self.output_dim,
                depth=self.depth,
                max_depth=self.max_depth
            )
            # The constructor draws fresh random weights; overwrite them with
            # ours (load_state_dict copies tensor data, so nothing is shared).
            new_neuron.load_state_dict(self.state_dict())
            self._copy_runtime_state(new_neuron)
            return new_neuron
        except Exception as e:
            logging.error(f"Error cloning neuron: {e}")
            return None

    def mutate(self, mutation_rate=0.1):
        """Randomly perturb the weights of this neuron and, recursively, its children."""
        try:
            with torch.no_grad():
                self._mutate_linear_layers(self.synapse, mutation_rate)
                self._mutate_linear_layers(self.memory_gate, mutation_rate)
                for child in self.sub_neurons:
                    child.mutate(mutation_rate)
        except Exception as e:
            logging.error(f"Error mutating neuron: {e}")


class FractalBrain:
    """Three-stage pipeline of FractalNeurons: vision -> thought -> action."""

    def __init__(self, input_dim=32, hidden_dim=64, max_neurons=1000):
        self.input_dim = min(input_dim, 32)        # Limit input dimension
        self.hidden_dim = min(hidden_dim, 64)      # Limit hidden dimension
        self.max_neurons = min(max_neurons, 1000)  # Limit maximum neurons

        # Core neural network components with reduced complexity
        self.visual_cortex = FractalNeuron(self.input_dim, self.hidden_dim, max_depth=2)
        self.thought_processor = FractalNeuron(self.hidden_dim, self.hidden_dim, max_depth=2)
        self.action_generator = FractalNeuron(self.hidden_dim, self.input_dim, max_depth=2)

        # State tracking with bounds
        self.total_neurons = self.count_neurons()
        self.total_energy = 100.0  # Reduced initial energy
        self.memories = []
        self.current_vision = None

    def get_vitals(self):
        """Get vital statistics of the brain with safety checks.

        Returns:
            Dict with ``neurons``, ``energy`` (0..1000), ``connections``
            (0..1000) and mean ``activation`` (-1..1) of the three top-level
            neurons. Safe defaults are returned if anything fails.
        """
        try:
            neurons = [self.visual_cortex, self.thought_processor, self.action_generator]

            activations = []
            for neuron in neurons:
                try:
                    activation = float(neuron.state.activation)
                    if not np.isnan(activation) and not np.isinf(activation):
                        activations.append(activation)
                except (AttributeError, ValueError, TypeError):
                    activations.append(0.0)
            avg_activation = sum(activations) / max(len(activations), 1)
            avg_activation = max(-1.0, min(1.0, avg_activation))

            connections = []
            for neuron in neurons:
                try:
                    conn_count = int(neuron.state.connections)
                    if not np.isnan(conn_count) and not np.isinf(conn_count):
                        connections.append(conn_count)
                except (AttributeError, ValueError, TypeError):
                    connections.append(0)
            total_connections = sum(connections)

            return {
                'neurons': min(self.total_neurons, self.max_neurons),
                'energy': max(0.0, min(1000.0, float(self.total_energy))),
                'connections': max(0, min(1000, total_connections)),
                'activation': avg_activation
            }
        except Exception as e:
            logging.error(f"Exception in get_vitals: {e}. Returning default vitals.")
            # Return safe default values if anything goes wrong
            return {
                'neurons': 1,
                'energy': 0.0,
                'connections': 0,
                'activation': 0.0
            }

    def process_vision(self, visual_input):
        """Turn a visual input tensor into an action tensor.

        The input is detached, NaNs are replaced by zeros, values are clamped
        to [-10, 10] and the result flows through the three neurons. Each call
        costs 0.1 energy.

        Returns:
            A tensor clamped to [-1, 1]; ``torch.zeros(input_dim)`` on failure.
        """
        try:
            with torch.no_grad():
                visual_input = visual_input.clone().detach()
                if len(visual_input.shape) == 1:
                    visual_input = visual_input.unsqueeze(0)  # Add batch dimension
                if torch.isnan(visual_input).any():
                    logging.warning("NaN detected in visual_input. Replacing with zeros.")
                    visual_input = torch.zeros_like(visual_input)
                visual_input = torch.clamp(visual_input, -10.0, 10.0)

                try:
                    visual_features = self.visual_cortex.process_signal(visual_input)
                    if len(visual_features.shape) == 1:
                        visual_features = visual_features.unsqueeze(0)
                except Exception as e:
                    logging.error(f"Exception in visual_cortex.process_signal: {e}. Using zero tensor.")
                    visual_features = torch.zeros((1, self.hidden_dim))

                try:
                    thoughts = self.thought_processor.process_signal(visual_features)
                    if len(thoughts.shape) == 1:
                        thoughts = thoughts.unsqueeze(0)
                except Exception as e:
                    logging.error(f"Exception in thought_processor.process_signal: {e}. Using zero tensor.")
                    thoughts = torch.zeros((1, self.hidden_dim))

                try:
                    actions = self.action_generator.process_signal(thoughts)
                except Exception as e:
                    logging.error(f"Exception in action_generator.process_signal: {e}. Using zero tensor.")
                    actions = torch.zeros(self.input_dim)

                # Remove batch dimension from final output if present
                if len(actions.shape) > 1:
                    actions = actions.squeeze(0)

                # Ensure outputs are bounded
                actions = torch.clamp(actions, -1.0, 1.0)

                # Energy consumption with bounds
                self.total_energy = max(0.0, min(1000.0, self.total_energy - 0.1))

                return actions
        except Exception as e:
            logging.error(f"Exception in process_vision: {e}. Returning zero actions.")
            return torch.zeros(self.input_dim)

    def interact_with(self, other_brain, strength=0.5):
        """Blend the three neuron pairs and move energy toward the poorer brain.

        Up to 10 energy units (10% of the difference) flow from the richer to
        the poorer brain; both totals stay within [0, 1000].

        Returns:
            Tuple of the three shared activations, or ``(0.0, 0.0, 0.0)`` on error.
        """
        try:
            strength = max(0.0, min(1.0, strength))

            shared_visual = self.visual_cortex.interact_with(other_brain.visual_cortex, strength)
            shared_thoughts = self.thought_processor.interact_with(other_brain.thought_processor, strength)
            shared_actions = self.action_generator.interact_with(other_brain.action_generator, strength)

            energy_diff = self.total_energy - other_brain.total_energy
            transfer = max(-10.0, min(10.0, energy_diff * 0.1))
            self.total_energy = max(0.0, min(1000.0, self.total_energy - transfer))
            other_brain.total_energy = max(0.0, min(1000.0, other_brain.total_energy + transfer))

            return shared_visual, shared_thoughts, shared_actions
        except Exception as e:
            logging.error(f"Exception in interact_with: {e}. Returning zeros.")
            return 0.0, 0.0, 0.0

    def count_neurons(self):
        """Count all neurons (including nested children), capped at ``max_neurons``."""
        try:
            def count_recursive(module):
                count = 1
                if hasattr(module, 'sub_neurons'):
                    for child in module.sub_neurons:
                        count += count_recursive(child)
                return min(count, self.max_neurons)  # Limit total count

            total = sum(count_recursive(x) for x in [
                self.visual_cortex,
                self.thought_processor,
                self.action_generator
            ])
            return min(total, self.max_neurons)
        except Exception as e:
            logging.error(f"Exception in count_neurons: {e}. Returning 1.")
            return 1  # Return minimum count if counting fails

    def can_grow(self):
        """Check if brain can grow new neurons"""
        return (self.total_neurons < self.max_neurons and
                self.total_energy > 100.0)


# ==============================
# Organism Definition and Behavior
# ==============================


class FractalOrganism:
    """A simulated creature: position/velocity, a feature vector and a FractalBrain."""

    def __init__(self, x, y, size=20, feature_dim=32, max_neurons=1000):
        # Physical properties
        self.pos = pygame.math.Vector2(x, y)
        self.vel = pygame.math.Vector2(0, 0)
        self.acc = pygame.math.Vector2(0, 0)
        self.size = size
        self.mass = size * 0.1

        # Neural system
        self.brain = FractalBrain(input_dim=feature_dim, hidden_dim=feature_dim * 2,
                                  max_neurons=max_neurons)
        self.feature_dim = feature_dim
        self.features = torch.randn(feature_dim)

        # Visual properties derived from the feature vector
        self.color = self._features_to_color()
        self.pattern_type = self._determine_pattern_type()
        self.pattern_intensity = self._determine_pattern_intensity()
        self.shape_points = self._generate_shape()

        # Life properties
        self.alive = True
        self.age = 0

    def _validate_color_component(self, value):
        """Ensure color component is a valid integer between 0 and 255"""
        try:
            value = int(value)
            return max(0, min(255, value))
        except (ValueError, TypeError):
            return 0

    def update(self, screen_width, screen_height, organisms):
        """Advance the organism by one frame.

        Integrates motion, lets the brain react to the surroundings, wraps the
        position around the screen and ages the organism. It dies when brain
        energy reaches zero or age exceeds 1000 frames.
        """
        if not self.alive:
            return

        try:
            # Physics integration
            self.vel += self.acc
            self.vel *= 0.98  # Slight damping to prevent infinite movement
            self.pos += self.vel

            # Clear acceleration for next frame
            self.acc.x = 0
            self.acc.y = 0

            # Get visual input and process through brain
            visual_input = self._get_visual_input(organisms)
            actions = self.brain.process_vision(visual_input)

            # Apply neural network outputs as forces if valid
            if isinstance(actions, torch.Tensor) and not torch.isnan(actions).any():
                self._apply_action_forces(actions)

            # Wrap around screen edges
            self.pos.x = self.pos.x % screen_width
            self.pos.y = self.pos.y % screen_height

            # Update life properties
            self.age += 1
            vitals = self.brain.get_vitals()

            # Death conditions
            if vitals['energy'] <= 0 or self.age > 1000:
                self.alive = False
        except Exception as e:
            logging.error(f"Error updating organism {id(self)}: {e}")
            logging.debug(f"Organism state - Age: {self.age}, Alive: {self.alive}")

    def _get_visual_input(self, organisms):
        """Build the ``feature_dim``-long perception vector.

        Layout: [0:3] own color, [3] speed / 10, [4] heading / pi, [5:] angular
        sectors holding the proximity of organisms within 100 units; the slot
        after a sector carries that organism's energy / 1000.
        """
        try:
            visual_input = torch.zeros(self.feature_dim)

            # Self-perception (first 3 features are color)
            color_tensor = torch.tensor([c / 255.0 for c in self.color])
            visual_input[:3] = color_tensor[:3]

            # Velocity perception (helps with movement learning)
            if hasattr(self, 'vel'):
                velocity_magnitude = np.sqrt(self.vel.x ** 2 + self.vel.y ** 2)
                velocity_direction = np.arctan2(self.vel.y, self.vel.x) / np.pi
                if 3 < len(visual_input):
                    visual_input[3] = float(velocity_magnitude) / 10.0  # Normalize velocity
                if 4 < len(visual_input):
                    visual_input[4] = float(velocity_direction)

            # Perception of nearby organisms
            for other in organisms:
                if other is self or not other.alive:
                    continue
                distance = self.pos.distance_to(other.pos)
                if distance >= 100:  # Visual range
                    continue

                direction = (other.pos - self.pos)
                if direction.length() > 0:
                    direction = direction.normalize()
                angle = np.arctan2(direction.y, direction.x)

                # Map angle in [-pi, pi] to a feature index in [5, feature_dim - 1]
                idx = int((angle + np.pi) / (2 * np.pi) * (self.feature_dim - 5)) + 5
                idx = min(max(5, idx), self.feature_dim - 1)

                # Closer organisms register more strongly
                intensity = 1.0 - min(1.0, distance / 100)
                visual_input[idx] = intensity

                # Add information about target's energy level if visible
                if idx + 1 < self.feature_dim:
                    target_energy = float(other.brain.total_energy) / 1000.0
                    if not np.isnan(target_energy):
                        visual_input[idx + 1] = target_energy

            return visual_input
        except Exception as e:
            logging.error(f"Error in _get_visual_input: {e}")
            return torch.zeros(self.feature_dim)

    def _apply_action_forces(self, actions):
        """Convert neural actions to physical forces.

        actions[0:2] steer the organism, actions[2] can grant a small energy
        bonus when above 0.8 and actions[3] sets ``interaction_strength``.
        """
        try:
            if not isinstance(actions, torch.Tensor):
                return

            # First two action dimensions control movement
            if len(actions) >= 2:
                activation = float(self.brain.visual_cortex.state.activation)
                force_scale = 20.0  # Increased for more visible movement

                force_x = float(actions[0].item()) * force_scale * (1 + abs(activation))
                force_y = float(actions[1].item()) * force_scale * (1 + abs(activation))

                # Add some randomness for exploration when activation is low
                if abs(activation) < 0.2:
                    force_x += random.uniform(-2.0, 2.0)
                    force_y += random.uniform(-2.0, 2.0)

                max_force = 40.0
                force_x = max(-max_force, min(max_force, force_x))
                force_y = max(-max_force, min(max_force, force_y))

                self.apply_force((force_x, force_y))

            # Additional actions for other behaviors
            if len(actions) >= 4:
                try:
                    # Action 3: Energy usage control
                    energy_control = float(actions[2].item())
                    if energy_control > 0.8:
                        self.brain.total_energy += energy_control * 0.1

                    # Action 4: Interaction strength
                    self.interaction_strength = max(0, min(1, float(actions[3].item())))
                except Exception as e:
                    logging.error(f"Error processing additional actions: {e}")
        except Exception as e:
            logging.error(f"Error in _apply_action_forces: {e}")

    def apply_force(self, force):
        """Set the acceleration for the next frame from a 2-D force.

        Args:
            force: Any indexable with at least two numeric components (tuple,
                list or ``pygame.math.Vector2``). Unusable or NaN input is
                ignored.

        Each component is clamped to +/-10 before dividing by mass, and the
        resulting acceleration to +/-5. The acceleration is overwritten, not
        accumulated, so the last force applied in a frame wins.
        """
        try:
            try:
                fx = float(force[0])
                fy = float(force[1])
            except (TypeError, IndexError, KeyError, ValueError):
                return  # Not a usable 2-D force

            if np.isnan(fx) or np.isnan(fy):
                return

            # Limit maximum force
            max_force = 10.0
            fx = max(-max_force, min(max_force, fx))
            fy = max(-max_force, min(max_force, fy))

            new_acc = pygame.math.Vector2(fx, fy) / self.mass
            if not (np.isnan(new_acc.x) or np.isnan(new_acc.y)):
                self.acc.update(new_acc.x, new_acc.y)

                # Clamp acceleration
                max_acc = 5.0
                self.acc.x = max(-max_acc, min(max_acc, self.acc.x))
                self.acc.y = max(-max_acc, min(max_acc, self.acc.y))
        except Exception as e:
            logging.error(f"Error in apply_force: {e}")

    def _features_to_color(self):
        """Map features 0..2 from [-1, 1] to an (r, g, b) tuple of 0..255 ints."""
        try:
            r = self._validate_color_component((self.features[0].item() + 1) / 2 * 255)
            g = self._validate_color_component((self.features[1].item() + 1) / 2 * 255)
            b = self._validate_color_component((self.features[2].item() + 1) / 2 * 255)
            return (r, g, b)
        except (IndexError, AttributeError) as e:
            logging.error(f"Error in _features_to_color: {e}. Defaulting to (100, 100, 100).")
            return (100, 100, 100)

    def _determine_pattern_type(self):
        """Pick 'stripes', 'spots' or 'gradient' from the sum of features 3 and 4."""
        try:
            if len(self.features) >= 5:
                feature_sum = float(self.features[3].item() + self.features[4].item())
                if feature_sum > 1:
                    return 'stripes'
                elif feature_sum < -1:
                    return 'spots'
                else:
                    return 'gradient'
            return 'gradient'  # Default pattern
        except (IndexError, AttributeError, ValueError) as e:
            logging.error(f"Error in _determine_pattern_type: {e}. Defaulting to 'gradient'.")
            return 'gradient'  # Fallback pattern

    def _determine_pattern_intensity(self):
        """Map feature 5 from [-1, 1] to an intensity in [0, 1] (0.5 if unavailable)."""
        try:
            if len(self.features) >= 6:
                intensity = (float(self.features[5].item()) + 1) / 2
                return max(0.0, min(1.0, intensity))
            return 0.5  # Default intensity
        except (IndexError, AttributeError, ValueError) as e:
            logging.error(f"Error in _determine_pattern_intensity: {e}. Defaulting to 0.5.")
            return 0.5  # Fallback intensity

    def _generate_shape(self):
        """Generate polygon vertices (relative to the centre) for the pattern type.

        'stripes' gives a 12-point star, 'spots' an 8-point bulged outline and
        anything else a regular hexagon. Falls back to a triangle on error.
        """
        try:
            points = []
            if self.pattern_type == 'stripes':
                # Star-like shape: alternate outer and inner radius
                for angle in range(0, 360, 30):
                    rad = np.radians(angle)
                    x = self.size * np.cos(rad)
                    y = self.size * np.sin(rad)
                    if (angle // 30) % 2 == 0:
                        points.append((x * 1.2, y * 1.2))
                    else:
                        points.append((x * 0.8, y * 0.8))
            elif self.pattern_type == 'spots':
                # Smooth outline with bulges
                for angle in range(0, 360, 45):
                    rad = np.radians(angle)
                    x = self.size * (1 + 0.3 * np.sin(4 * rad)) * np.cos(rad)
                    y = self.size * (1 + 0.3 * np.sin(4 * rad)) * np.sin(rad)
                    points.append((x, y))
            else:  # 'gradient' or other patterns
                # Simple regular polygon
                for angle in range(0, 360, 60):
                    rad = np.radians(angle)
                    x = self.size * np.cos(rad)
                    y = self.size * np.sin(rad)
                    points.append((x, y))

            # Ensure we have at least a triangle
            if len(points) < 3:
                points = [
                    (-self.size, -self.size),
                    (self.size, -self.size),
                    (0, self.size)
                ]
            return points
        except Exception as e:
            logging.error(f"Error in _generate_shape: {e}. Defaulting to basic triangle.")
            # Fallback to basic triangle if anything goes wrong
            return [
                (-self.size, -self.size),
                (self.size, -self.size),
                (0, self.size)
            ]

    def reproduce(self, mate, mutation_rate=0.1):
        """Create a child from this organism and ``mate``.

        Both parents need at least their brain's ``REPRODUCTION_ENERGY``
        (default 150.0) and each pays 50 energy. The child's features are the
        mutated parental mean. Its brain is a new object that inherits the
        mate's weights and is then mutated, so it never shares state with a
        parent.

        Returns:
            The child ``FractalOrganism``, or ``None`` if the energy
            requirement is not met or an error occurs.
        """
        try:
            # Make sure BOTH brains carry the threshold before reading it.
            for brain in (self.brain, mate.brain):
                if not hasattr(brain, 'REPRODUCTION_ENERGY'):
                    brain.REPRODUCTION_ENERGY = 150.0  # Default value if not set

            if (self.brain.total_energy < self.brain.REPRODUCTION_ENERGY or
                    mate.brain.total_energy < mate.brain.REPRODUCTION_ENERGY):
                return None

            # Deduct energy for reproduction
            self.brain.total_energy -= 50.0
            mate.brain.total_energy -= 50.0

            # Blend features
            child_features = (self.features + mate.features) / 2

            # Apply mutations
            for i in range(len(child_features)):
                if random.random() < mutation_rate:
                    child_features[i] += random.uniform(-0.1, 0.1)

            # Clamp mutated features to prevent extreme values
            child_features = torch.clamp(child_features, -1.0, 1.0)

            child = FractalOrganism(
                x=(self.pos.x + mate.pos.x) / 2 + random.uniform(-10, 10),
                y=(self.pos.y + mate.pos.y) / 2 + random.uniform(-10, 10),
                size=self.size,
                feature_dim=self.feature_dim,
                max_neurons=self.brain.max_neurons
            )
            child.features = child_features
            child.color = child._features_to_color()
            child.pattern_type = child._determine_pattern_type()
            child.pattern_intensity = child._determine_pattern_intensity()
            child.shape_points = child._generate_shape()

            # Inherit the mate's weights into the child's OWN brain instead of
            # handing over the mate's live brain object.
            try:
                for part in ('visual_cortex', 'thought_processor', 'action_generator'):
                    getattr(child.brain, part).load_state_dict(
                        getattr(mate.brain, part).state_dict()
                    )
            except RuntimeError as e:
                # Shape mismatch (different brain sizes): keep the fresh weights.
                logging.error(f"Error inheriting brain weights: {e}")
            child.brain = self._mutate_brain(child.brain, mutation_rate)

            return child
        except Exception as e:
            logging.error(f"Error in reproduction: {e}")
            return None

    def _mutate_brain(self, brain, mutation_rate):
        """Jitter the activations of ``brain``'s three neurons in place and return it.

        Each activation moves by up to +/-0.1 and is clamped to [-1, 1].
        ``mutation_rate`` is accepted for interface symmetry but currently unused.
        """
        try:
            for neuron in (brain.visual_cortex, brain.thought_processor, brain.action_generator):
                neuron.state.activation += random.uniform(-0.1, 0.1)

            # Ensure activations stay in valid range
            for neuron in (brain.visual_cortex, brain.thought_processor, brain.action_generator):
                neuron.state.activation = max(-1.0, min(1.0, neuron.state.activation))

            return brain
        except Exception as e:
            logging.error(f"Error in brain mutation: {e}. Returning unmutated brain.")
            return brain

    def interact_with(self, other):
        """Interact with ``other`` if the two organisms overlap.

        Overlapping organisms blend their brains and push each other apart
        with a force proportional to the overlap.

        Returns:
            True if an interaction happened, False otherwise or on error.
        """
        try:
            distance = self.pos.distance_to(other.pos)
            if distance < self.size + other.size:
                # Neural interaction
                interaction_strength = 1.0 - distance / (self.size + other.size)
                self.brain.interact_with(other.brain, interaction_strength)

                # Physical interaction (simple collision). Coincident organisms
                # have no defined separation direction, so skip the push.
                offset = self.pos - other.pos
                if offset.length() > 0:
                    force = offset.normalize() * interaction_strength * 5
                    self.apply_force((force.x, force.y))
                    other.apply_force((-force.x, -force.y))

                return True
            return False
        except Exception as e:
            logging.error(f"Error in organism interaction: {e}")
            return False

    def _blend_patterns(self, pattern1: str, pattern2: str) -> str:
        """Blend two pattern types: identical patterns are kept, otherwise pick at random."""
        try:
            if pattern1 == pattern2:
                return pattern1
            else:
                # Randomly choose one of the parent patterns or a known pattern
                return random.choice([pattern1, pattern2, 'stripes', 'spots', 'gradient'])
        except Exception as e:
            logging.error(f"Error in _blend_patterns: {e}. Defaulting to 'gradient'.")
            return 'gradient'  # Default pattern if anything goes wrong


# ==============================
# Physics and Interaction Handling
# ==============================


class PhysicsEngine:
    """pymunk-backed physics: owns the space, walls, bodies and interaction forces."""

    def __init__(self, width: int, height: int, config: "PhysicsConfig"):
        self.config = config

        # Initialize pymunk space
        self.space = pymunk.Space()
        self.space.damping = self.config.DAMPING

        # Create boundaries
        self.create_boundaries(width, height)

        # Collision handler for organisms
        handler = self.space.add_collision_handler(
            self.config.COLLISION_TYPE_ORGANISM,
            self.config.COLLISION_TYPE_ORGANISM
        )
        handler.begin = self.handle_collision

        # Pairs of organism ids that collided since the last processing pass
        self.current_interactions: Set[tuple] = set()

        # Store dimensions
        self.width = width
        self.height = height

    def update(self, dt: float):
        """Step the space and copy body positions/velocities back to the organisms.

        Args:
            dt: Elapsed time in seconds; converted to 1..4 fixed 1/60 s steps.
        """
        try:
            # Pymunk works best with a fixed time step
            fixed_dt = 1.0 / 60.0
            steps = max(1, min(4, int(dt / fixed_dt)))  # Limit max steps to prevent spiral

            for _ in range(steps):
                self.space.step(fixed_dt)

            # Update organism positions from physics bodies
            for body in self.space.bodies:
                organism = getattr(body, 'organism', None)
                if organism is None:
                    continue
                try:
                    # Validate positions
                    if not (np.isnan(body.position.x) or np.isnan(body.position.y)):
                        new_x = float(body.position.x % self.width)
                        new_y = float(body.position.y % self.height)
                        organism.pos.update(new_x, new_y)
                    else:
                        # Reset to center if NaN
                        body.position = self.width / 2, self.height / 2
                        organism.pos.update(self.width / 2, self.height / 2)

                    # Validate velocities
                    if not (np.isnan(body.velocity.x) or np.isnan(body.velocity.y)):
                        max_velocity = 200.0
                        vx = max(-max_velocity, min(max_velocity, body.velocity.x))
                        vy = max(-max_velocity, min(max_velocity, body.velocity.y))
                        organism.vel.update(vx, vy)
                    else:
                        body.velocity = (0, 0)
                        organism.vel.update(0, 0)
                except Exception as e:
                    logging.error(f"Error updating organism physics state: {e}")
                    # Reset body to safe state
                    body.position = self.width / 2, self.height / 2
                    body.velocity = (0, 0)
                    try:
                        organism.pos.update(self.width / 2, self.height / 2)
                        organism.vel.update(0, 0)
                    except Exception as reset_error:
                        # Best effort only, but leave a trace instead of hiding it.
                        logging.error(f"Error resetting organism physics state: {reset_error}")
        except Exception as e:
            logging.error(f"Error updating physics: {e}")

    def create_boundaries(self, width: int, height: int):
        """Add four static wall segments around the ``width`` x ``height`` area."""
        try:
            walls = [
                [(0, 0), (width, 0)],            # Top
                [(width, 0), (width, height)],   # Right
                [(width, height), (0, height)],  # Bottom
                [(0, height), (0, 0)]            # Left
            ]
            for wall in walls:
                shape = pymunk.Segment(self.space.static_body, wall[0], wall[1], 0)
                shape.elasticity = self.config.ELASTICITY
                shape.friction = self.config.FRICTION
                self.space.add(shape)
        except Exception as e:
            logging.error(f"Error creating boundaries: {e}")

    def add_organism(self, organism: FractalOrganism) -> "Optional[pymunk.Body]":
        """Create a body and shape for ``organism`` and add them to the space.

        Returns:
            The new body, or ``None`` if it could not be created.
        """
        try:
            mass = max(0.1, organism.mass)  # Ensure positive mass
            moment = pymunk.moment_for_circle(mass, 0, organism.size)
            body = pymunk.Body(mass, moment)

            # Wrap the initial position into the simulation area
            body.position = (
                float(organism.pos.x % self.width),
                float(organism.pos.y % self.height)
            )

            # Clamp initial velocity
            max_initial_velocity = 50.0
            vel_x = max(-max_initial_velocity, min(max_initial_velocity, organism.vel.x))
            vel_y = max(-max_initial_velocity, min(max_initial_velocity, organism.vel.y))
            body.velocity = (vel_x, vel_y)

            # Drop NaN vertices before building the polygon
            valid_vertices = [
                (float(x), float(y))
                for x, y in organism.shape_points
                if not (np.isnan(x) or np.isnan(y))
            ]

            # If insufficient valid vertices, create default circle shape
            if len(valid_vertices) < 3:
                logging.warning(f"Insufficient valid vertices for organism {id(organism)}, using circle shape")
                shape = pymunk.Circle(body, organism.size)
            else:
                shape = pymunk.Poly(body, valid_vertices)

            shape.elasticity = self.config.ELASTICITY
            shape.friction = self.config.FRICTION
            shape.collision_type = self.config.COLLISION_TYPE_ORGANISM

            # Back-references used by the collision handler and update()
            shape.organism = organism
            body.organism = organism

            self.space.add(body, shape)
            return body
        except Exception as e:
            logging.error(f"Error adding organism to physics: {e}")
            return None

    def handle_collision(self, arbiter, space, data):
        """pymunk ``begin`` callback: record the colliding pair and apply an impulse.

        Returns:
            True to let pymunk process the collision, False if handling failed.
        """
        try:
            # Get colliding organisms
            shape_a, shape_b = arbiter.shapes
            org_a, org_b = shape_a.organism, shape_b.organism

            # Sorted so (a, b) and (b, a) are the same interaction
            interaction_pair = tuple(sorted([id(org_a), id(org_b)]))
            self.current_interactions.add(interaction_pair)

            # Calculate collision response with validation
            restitution = max(0, min(1, self.config.ELASTICITY))
            j = -(1 + restitution) * arbiter.total_ke / 2

            if not np.isnan(j):
                # Clamp impulse to prevent extreme values
                max_impulse = 1000.0
                j = max(-max_impulse, min(max_impulse, j))

                body_a = shape_a.body
                body_b = shape_b.body
                normal = arbiter.normal
                point = arbiter.contact_point_set.points[0].point_a

                # Apply impulse along the collision normal
                body_a.apply_impulse_at_world_point(j * normal, point)
                body_b.apply_impulse_at_world_point(-j * normal, point)

            return True
        except Exception as e:
            logging.error(f"Error handling collision: {e}")
            return False

    def process_interactions(self, organisms: List[FractalOrganism]):
        """Resolve recorded collisions, then apply proximity forces between organisms.

        Collided pairs exchange neural state and energy. Any two living
        organisms closer than ``INTERACTION_RADIUS`` are pushed apart with a
        force that grows as they get closer.
        """
        try:
            # Build the id lookup once instead of scanning the list per pair.
            organisms_by_id = {id(org): org for org in organisms}

            # Process collision-based interactions
            for org_a_id, org_b_id in self.current_interactions:
                org_a = organisms_by_id.get(org_a_id)
                org_b = organisms_by_id.get(org_b_id)
                if org_a is None or org_b is None:
                    continue
                if org_a.alive and org_b.alive:
                    # Neural interaction (also moves some energy between brains)
                    org_a.brain.interact_with(org_b.brain)

                    # Energy transfer based on neural activity
                    energy_diff = org_a.brain.total_energy - org_b.brain.total_energy
                    transfer = max(-10.0, min(10.0, energy_diff * 0.1))  # Limit transfer amount
                    org_a.brain.total_energy = max(0, org_a.brain.total_energy - transfer)
                    org_b.brain.total_energy = max(0, org_b.brain.total_energy + transfer)

            # Clear interactions for next frame
            self.current_interactions.clear()

            # Map each organism to its (first) physics body once.
            bodies_by_organism = {}
            for body in self.space.bodies:
                if hasattr(body, 'organism'):
                    bodies_by_organism.setdefault(id(body.organism), body)

            # Process proximity-based interactions
            for i, org_a in enumerate(organisms):
                if not org_a.alive:
                    continue

                for org_b in organisms[i + 1:]:
                    if not org_b.alive:
                        continue

                    # Calculate distance with validation
                    dx = org_b.pos.x - org_a.pos.x
                    dy = org_b.pos.y - org_a.pos.y
                    if np.isnan(dx) or np.isnan(dy):
                        continue

                    distance = np.sqrt(dx * dx + dy * dy)
                    if distance >= self.config.INTERACTION_RADIUS:
                        continue

                    # Interaction strength falls off linearly with distance
                    strength = 1.0 - (distance / self.config.INTERACTION_RADIUS)

                    # Neural field effect with reduced strength
                    field_interaction = strength * 0.5

                    force_magnitude = field_interaction * self.config.FORCE_SCALE
                    force_angle = np.arctan2(dy, dx)
                    if np.isnan(force_magnitude) or np.isnan(force_angle):
                        continue

                    force_x = np.cos(force_angle) * force_magnitude
                    force_y = np.sin(force_angle) * force_magnitude

                    # Clamp forces
                    max_force = 100.0
                    force_x = max(-max_force, min(max_force, force_x))
                    force_y = max(-max_force, min(max_force, force_y))

                    # Apply forces through physics bodies (a is pushed away from b)
                    body_a = bodies_by_organism.get(id(org_a))
                    body_b = bodies_by_organism.get(id(org_b))
                    if body_a is not None and body_b is not None:
                        body_a.apply_force_at_local_point((-force_x, -force_y), (0, 0))
                        body_b.apply_force_at_local_point((force_x, force_y), (0, 0))

                    # Apply direct forces to organisms as well
                    org_a.apply_force((-force_x, -force_y))
                    org_b.apply_force((force_x, force_y))
        except Exception as e:
            logging.error(f"Error processing interactions: {e}")


# ==============================
# Visualization with PyGame
# ==============================


class NeuralVisualizer:
    """Renders organisms and their neural activity onto pygame surfaces."""

    def __init__(self, width: int, height: int, config: "VisualizationConfig"):
        self.width = width
        self.height = height
        self.config = config
        self.neuron_surface = pygame.Surface((width, height), pygame.SRCALPHA)
self.connection_surface = pygame.Surface((width, height), pygame.SRCALPHA) def _apply_pattern_overlay(self, organism, surface): """Apply visual pattern overlay based on organism type""" try: if not organism.alive: return pattern_alpha = int(255 * organism.pattern_intensity) pattern_color = ( 255 - organism.color[0], 255 - organism.color[1], 255 - organism.color[2], pattern_alpha ) # Create pattern surface pattern_surface = pygame.Surface((self.width, self.height), pygame.SRCALPHA) if organism.pattern_type == 'stripes': # Draw alternating stripes stride = max(5, int(organism.size * 0.5)) x, y = int(organism.pos.x), int(organism.pos.y) for i in range(-2, 3): offset = i * stride pygame.draw.line(pattern_surface, pattern_color, (x - organism.size, y + offset), (x + organism.size, y + offset), 2) elif organism.pattern_type == 'spots': # Draw spots in a circular pattern x, y = int(organism.pos.x), int(organism.pos.y) spot_size = max(2, int(organism.size * 0.2)) for angle in range(0, 360, 45): spot_x = x + int(np.cos(np.radians(angle)) * organism.size * 0.7) spot_y = y + int(np.sin(np.radians(angle)) * organism.size * 0.7) pygame.draw.circle(pattern_surface, pattern_color, (spot_x, spot_y), spot_size) else: # gradient # Draw radial gradient x, y = int(organism.pos.x), int(organism.pos.y) max_radius = int(organism.size * 1.2) for radius in range(max_radius, 0, -2): alpha = int((radius / max_radius) * pattern_alpha) current_color = (*pattern_color[:3], alpha) pygame.draw.circle(pattern_surface, current_color, (x, y), radius, 1) # Blend pattern with surface surface.blit(pattern_surface, (0, 0), special_flags=pygame.BLEND_ALPHA_SDL2) except Exception as e: logging.error(f"Error applying pattern overlay: {e}") def draw_brain_state(self, organism: FractalOrganism, surface: pygame.Surface): """Draw neural activity visualization with patterns and NaN handling""" try: # Clear previous state self.neuron_surface.fill((0, 0, 0, 0)) self.connection_surface.fill((0, 0, 0, 0)) # Get 
brain vitals with NaN check vitals = organism.brain.get_vitals() if any(np.isnan(value) for value in vitals.values() if isinstance(value, (int, float))): logging.warning(f"NaN detected in vitals for organism {id(organism)}. Marking for death.") organism.alive = False return # Calculate neural positions based on fractal pattern def plot_neural_layer(center, radius, neurons, depth=0): if depth >= 3 or not neurons: return # Convert neurons to list if it's a ModuleList if hasattr(neurons, 'sub_neurons'): neurons = neurons.sub_neurons if not neurons or len(neurons) == 0: return angle_step = 2 * np.pi / len(neurons) for i, neuron in enumerate(neurons): try: angle = i * angle_step x = center[0] + radius * np.cos(angle) y = center[1] + radius * np.sin(angle) # NaN check for coordinates if np.isnan(x) or np.isnan(y): logging.warning(f"NaN coordinates detected for neuron {i}. Skipping.") continue # Draw neuron activation = float(neuron.state.activation) connections = int(neuron.state.connections) # Check for NaNs in neuron state if np.isnan(activation) or np.isnan(connections): logging.warning(f"NaN detected in neuron state. 
Marking organism for death.") organism.alive = False return # Ensure coordinates are valid integers x_pos = int(np.clip(x, 0, self.width)) y_pos = int(np.clip(y, 0, self.height)) color = self._get_neuron_color(activation, connections) pygame.draw.circle(self.neuron_surface, color, (x_pos, y_pos), 5) # Draw connections with safety checks if connections > 0: alpha = int(255 * min(connections / self.config.MAX_NEURAL_CONNECTIONS, 1)) if not np.isnan(alpha): connection_color = (*self.config.CONNECTION_COLOR[:3], alpha) pygame.draw.line( self.connection_surface, connection_color, (x_pos, y_pos), (int(center[0]), int(center[1])), 2 ) # Recursively draw sub-neurons if hasattr(neuron, 'sub_neurons') and neuron.sub_neurons: child_radius = radius * 0.5 child_center = (x, y) plot_neural_layer(child_center, child_radius, neuron.sub_neurons, depth + 1) except Exception as e: logging.error(f"Error plotting neuron {i}: {e}") continue # Draw neural network try: center = (organism.pos.x, organism.pos.y) if not (np.isnan(center[0]) or np.isnan(center[1])): plot_neural_layer(center, organism.size * 2, [organism.brain.visual_cortex, organism.brain.thought_processor, organism.brain.action_generator]) except Exception as e: logging.error(f"Error plotting neural network: {e}") # Apply pattern overlay with safety checks try: self._apply_pattern_overlay(organism, surface) except Exception as e: logging.error(f"Error applying pattern overlay: {e}") # Combine surfaces with alpha blending surface.blit(self.connection_surface, (0, 0)) surface.blit(self.neuron_surface, (0, 0)) except Exception as e: logging.error(f"Error in draw_brain_state: {e}") def _get_neuron_color(self, activation: float, connections: int) -> Tuple[int, int, int]: """Generate color based on neuron state""" try: # Use HSV color space for smooth transitions hue = (activation + 1) / 2 # Map -1,1 to 0,1 saturation = min(connections / self.config.MAX_NEURAL_CONNECTIONS, 1) value = 0.8 + 0.2 * activation # Convert to RGB rgb = 
colorsys.hsv_to_rgb(hue, saturation, value) return tuple(int(255 * x) for x in rgb) except Exception as e: logging.error(f"Error in _get_neuron_color: {e}. Defaulting to gray.") return (100, 100, 100) class SimulationVisualizer: def __init__(self, width: int, height: int, config: VisualizationConfig): pygame.init() self.width = width self.height = height self.config = config # Enable double buffering and vsync self.screen = pygame.display.set_mode( (width, height), pygame.DOUBLEBUF | pygame.HWSURFACE | pygame.SCALED, vsync=1 ) pygame.display.set_caption("Fractal Life Simulation") # Create off-screen surfaces for double buffering self.buffer = pygame.Surface((width, height), pygame.SRCALPHA) self.neural_viz = NeuralVisualizer(width, height, config) self.background = pygame.Surface((width, height)) self.background.fill(config.BACKGROUND_COLOR) # Additional surfaces for layered rendering self.organism_surface = pygame.Surface((width, height), pygame.SRCALPHA) self.interaction_surface = pygame.Surface((width, height), pygame.SRCALPHA) self.stats_surface = pygame.Surface((width, height), pygame.SRCALPHA) def _validate_color(self, color): """Validate and ensure color values are proper RGB integers""" try: if len(color) >= 3: return ( max(0, min(255, int(color[0]))), max(0, min(255, int(color[1]))), max(0, min(255, int(color[2]))) ) return (100, 100, 100) # Default fallback color except Exception as e: logging.error(f"Error validating color: {e}") return (100, 100, 100) def draw_frame(self, organisms: List[FractalOrganism], stats: Dict): """Draw complete frame with all visualizations""" try: # Clear all off-screen surfaces self.organism_surface.fill((0, 0, 0, 0)) self.interaction_surface.fill((0, 0, 0, 0)) self.stats_surface.fill((0, 0, 0, 0)) # Draw organisms and their neural states for organism in organisms: if organism.alive: self._draw_organism(organism, self.organism_surface) self.neural_viz.draw_brain_state(organism, self.interaction_surface) # Draw statistics 
self._draw_stats(stats, self.stats_surface) # Blit off-screen surfaces to the main display surface self.screen.blit(self.background, (0, 0)) # Background layer self.screen.blit(self.organism_surface, (0, 0)) self.screen.blit(self.interaction_surface, (0, 0)) self.screen.blit(self.stats_surface, (0, 0)) # Flip display buffers to avoid flickering pygame.display.flip() except Exception as e: logging.error(f"Error in draw_frame: {e}") def _draw_organism(self, organism: FractalOrganism, surface: pygame.Surface): """Draw organism body with color and pattern""" try: # Validate color before drawing safe_color = self._validate_color(organism.color) # Draw shape with validated color points = [] for x, y in organism.shape_points: try: px = organism.pos.x + x py = organism.pos.y + y if not (np.isnan(px) or np.isnan(py)): points.append((px, py)) else: logging.warning(f"NaN detected in shape points for organism {id(organism)}. Skipping point.") except Exception as e: logging.error(f"Error processing shape points for organism {id(organism)}: {e}") continue if len(points) >= 3: pygame.draw.polygon(surface, safe_color, points) else: logging.warning(f"Insufficient valid points to draw organism {id(organism)}. 
Skipping drawing.") # Draw energy bar energy_percentage = min(max(organism.brain.total_energy / 1000.0, 0), 1) bar_width = organism.size * 2 bar_height = 4 bar_pos = (organism.pos.x - bar_width / 2, organism.pos.y - organism.size - 10) pygame.draw.rect(surface, (50, 50, 50), (*bar_pos, bar_width, bar_height)) pygame.draw.rect(surface, (0, 255, 0), # Using direct color value for energy bar (*bar_pos, bar_width * energy_percentage, bar_height)) except Exception as e: logging.error(f"Error in _draw_organism: {e}") def _draw_stats(self, stats: Dict, surface: pygame.Surface): """Draw simulation statistics""" try: font = pygame.font.Font(None, 24) y_pos = 10 for key, value in stats.items(): text = font.render(f"{key.capitalize()}: {value}", True, (255, 255, 255)) surface.blit(text, (10, y_pos)) y_pos += 25 except Exception as e: logging.error(f"Error in _draw_stats: {e}") def cleanup(self): """Clean up pygame resources""" try: pygame.quit() except Exception as e: logging.error(f"Error during pygame cleanup: {e}") # ============================== # Interaction Field (Optional Enhancement) # ============================== class InteractionField: def __init__(self, width: int, height: int, resolution: int = 50): self.width = max(1, width) self.height = max(1, height) self.resolution = max(10, min(resolution, 100)) # Bound resolution # Create field grid with safe dimensions self.grid_w = max(1, self.width // self.resolution) self.grid_h = max(1, self.height // self.resolution) self.field = np.zeros((self.grid_w, self.grid_h, 3)) def update(self, organisms: List[FractalOrganism]): """Update field based on organism neural activity with safety checks""" try: # Decay field self.field *= 0.9 np.clip(self.field, 0, 1, out=self.field) for org in organisms: if not org.alive: continue try: # Safe position to grid conversion pos_x = max(0, min(float(org.pos.x), self.width)) pos_y = max(0, min(float(org.pos.y), self.height)) grid_x = int((pos_x / self.width) * (self.grid_w - 1)) grid_y 
= int((pos_y / self.height) * (self.grid_h - 1)) # Get neural activity with safety checks vitals = org.brain.get_vitals() activity_color = np.array([ max(0.0, min(1.0, float(vitals['activation']))), max(0.0, min(1.0, float(vitals['energy']) / 1000.0)), max(0.0, min(1.0, float(vitals['connections']) / 100.0)) ]) # Apply to field with falloff for dx in range(-2, 3): for dy in range(-2, 3): x = (grid_x + dx) % self.grid_w y = (grid_y + dy) % self.grid_h distance = np.sqrt(dx * dx + dy * dy) if distance < 3: intensity = max(0.0, min(1.0, (3 - distance) / 3)) self.field[x, y] += activity_color * intensity except (ValueError, TypeError, ZeroDivisionError) as e: logging.error(f"Error updating field for organism {id(org)}: {e}") continue # Skip problematic organisms # Ensure field values stay in valid range np.clip(self.field, 0, 1, out=self.field) except Exception as e: logging.error(f"Error in InteractionField.update: {e}") def get_field_at(self, x: float, y: float) -> np.ndarray: """Safely get field value at position""" try: x = max(0, min(float(x), self.width)) y = max(0, min(float(y), self.height)) grid_x = int((x / self.width) * (self.grid_w - 1)) grid_y = int((y / self.height) * (self.grid_h - 1)) return self.field[grid_x, grid_y] except (ValueError, TypeError, IndexError) as e: logging.error(f"Error in get_field_at for position ({x}, {y}): {e}") return np.zeros(3) # ============================== # Simulation State Tracking # ============================== class SimulationState: """Tracks the current state of the simulation""" def __init__(self): self.running = False self.paused = False self.step_count = 0 self.stats = { 'population': 0, 'avg_energy': 0.0, 'avg_neurons': 0.0, 'avg_connections': 0.0, 'total_interactions': 0 } self.selected_organism: Optional[FractalOrganism] = None # ============================== # Main Simulation Class # ============================== class FractalLifeSimulation: def __init__(self, config: SimulationConfig, shared_data: Dict): 
self.config = config self.state = SimulationState() # Shared data for Gradio interface self.shared_data = shared_data self.shared_lock = threading.Lock() # Initialize systems visualization_config = VisualizationConfig() self.visualizer = SimulationVisualizer(config.WIDTH, config.HEIGHT, visualization_config) self.physics = PhysicsEngine(config.WIDTH, config.HEIGHT, PhysicsConfig()) self.field = InteractionField(config.WIDTH, config.HEIGHT, resolution=50) # Event queues for thread communication self.event_queue = Queue() self.stats_queue = Queue() # Initialize organisms self.organisms: List[FractalOrganism] = [] self._init_organisms() # Pygame threading control self.running = False self.thread = None def _init_organisms(self): """Initialize starting organisms""" try: for _ in range(self.config.MIN_ORGANISMS): x = random.uniform(0, self.config.WIDTH) y = random.uniform(0, self.config.HEIGHT) organism = FractalOrganism( x=x, y=y, feature_dim=32, max_neurons=self.config.MAX_NEURONS ) self.organisms.append(organism) self.physics.add_organism(organism) except Exception as e: logging.error(f"Error initializing organisms: {e}") def _process_reproduction(self): """Handle organism reproduction""" try: new_organisms = [] for org in self.organisms: if not org.alive or len(self.organisms) + len(new_organisms) >= self.config.MAX_ORGANISMS: continue if org.brain.total_energy > self.config.REPRODUCTION_ENERGY: # Find a mate (simple random selection for demonstration) potential_mates = [o for o in self.organisms if o != org and o.alive and o.brain.total_energy > self.config.REPRODUCTION_ENERGY] if potential_mates: mate = random.choice(potential_mates) child = org.reproduce(mate, mutation_rate=self.config.MUTATION_RATE) if child: new_organisms.append(child) self.physics.add_organism(child) self.organisms.extend(new_organisms) except Exception as e: logging.error(f"Error processing reproduction: {e}") def _update_stats(self): """Update simulation statistics""" try: living_organisms = 
[org for org in self.organisms if org.alive] population = len(living_organisms) if population > 0: self.state.stats.update({ 'population': population, 'avg_energy': sum(org.brain.total_energy for org in living_organisms) / population, 'avg_neurons': sum(org.brain.total_neurons for org in living_organisms) / population, 'avg_connections': sum(sum(n.state.connections for n in [org.brain.visual_cortex, org.brain.thought_processor, org.brain.action_generator]) for org in living_organisms) / population, 'total_interactions': len(self.physics.current_interactions) }) else: self.state.stats.update({ 'population': 0, 'avg_energy': 0.0, 'avg_neurons': 0.0, 'avg_connections': 0.0, 'total_interactions': 0 }) with self.shared_lock: self.shared_data['stats'] = self.state.stats.copy() except Exception as e: logging.error(f"Error updating statistics: {e}") def _main_loop(self): """Main simulation loop""" try: clock = pygame.time.Clock() while self.running: dt = clock.tick(self.config.TARGET_FPS) / 1000.0 # Delta time in seconds for event in pygame.event.get(): if event.type == pygame.QUIT: self.stop() # Process external events while not self.event_queue.empty(): event = self.event_queue.get() self._handle_event(event) if not self.state.paused: # Update physics self.physics.update(dt) # Update neural field self.field.update(self.organisms) # Update organisms for org in self.organisms: if org.alive: # Update organism state org.update(self.config.WIDTH, self.config.HEIGHT, self.organisms) # Process field interactions (placeholder for actual implementation) field_value = self.field.get_field_at(org.pos.x, org.pos.y) # org.process_field_input(field_value) # Implement if needed # Energy decay org.brain.total_energy = max(0.0, org.brain.total_energy - self.config.ENERGY_DECAY) # Process physical interactions self.physics.process_interactions(self.organisms) # Handle reproduction self._process_reproduction() # Remove dead organisms self.organisms = [org for org in self.organisms if 
org.alive] # Maintain minimum population while len(self.organisms) < self.config.MIN_ORGANISMS and len(self.organisms) < self.config.MAX_ORGANISMS: x = random.uniform(0, self.config.WIDTH) y = random.uniform(0, self.config.HEIGHT) organism = FractalOrganism( x=x, y=y, feature_dim=32, max_neurons=self.config.MAX_NEURONS ) self.organisms.append(organism) self.physics.add_organism(organism) # Update statistics self._update_stats() # Draw frame self.visualizer.draw_frame(self.organisms, self.state.stats) except Exception as e: logging.error(f"Exception in main loop: {e}") finally: # Cleanup when simulation stops self.visualizer.cleanup() def start(self): """Start simulation in separate thread""" if not self.running: self.running = True self.thread = threading.Thread(target=self._main_loop) self.thread.start() logging.info("Simulation started.") def stop(self): """Stop simulation""" self.running = False if self.thread and self.thread.is_alive(): self.thread.join() logging.info("Simulation stopped.") def pause(self): """Pause/unpause simulation""" self.state.paused = not self.state.paused logging.info(f"Simulation {'paused' if self.state.paused else 'resumed'}.") def _handle_event(self, event: Dict): """Handle external events""" try: if event['type'] == 'select_organism': organism_id = event['organism_id'] self.state.selected_organism = next( (org for org in self.organisms if id(org) == organism_id), None ) logging.info(f"Organism {organism_id} selected.") elif event['type'] == 'add_energy': if self.state.selected_organism: self.state.selected_organism.brain.total_energy += event['amount'] logging.info(f"Added {event['amount']} energy to organism {id(self.state.selected_organism)}.") elif event['type'] == 'modify_neurons': if self.state.selected_organism: # Placeholder for neuron modification logic logging.info(f"Modify neurons event received for organism {id(self.state.selected_organism)}.") except Exception as e: logging.error(f"Error handling event {event}: {e}") # 
# ==============================
# Gradio Interface
# ==============================
class FractalLifeInterface:
    """Gradio front end that starts, controls and displays a simulation."""

    def __init__(self):
        self.simulation: Optional[FractalLifeSimulation] = None
        # Time series shown in the statistics plot
        self.history = {
            'population': [],
            'avg_energy': [],
            'avg_neurons': [],
            'time': []
        }
        self.selected_organism_id = None
        self.frame_image = None
        self.DEFAULT_WIDTH = 1024
        self.DEFAULT_HEIGHT = 768
        # Handed to the simulation, which replaces the 'stats' entry
        self.shared_data = {
            'stats': {
                'population': 0,
                'avg_energy': 0.0,
                'avg_neurons': 0.0,
                'avg_connections': 0.0,
                'total_interactions': 0
            }
        }

    def get_frame(self):
        """Return the current pygame screen as an RGB PIL image.

        A black image of the default size is returned when no simulation is
        running or when the screen cannot be read.
        """
        try:
            # Snapshot: another handler may set self.simulation to None
            sim = self.simulation
            if sim and sim.running:
                with sim.shared_lock:
                    # Get pygame surface
                    screen = sim.visualizer.screen

                    # Get the size of the screen
                    width = screen.get_width()
                    height = screen.get_height()

                    # Create a new surface with alpha channel
                    surf_alpha = pygame.Surface((width, height), pygame.SRCALPHA)
                    surf_alpha.blit(screen, (0, 0))

                    # Convert Pygame surface to PIL Image
                    data_string = pygame.image.tostring(surf_alpha, 'RGBA')
                    image = Image.frombytes('RGBA', (width, height), data_string)

                    # Convert to RGB
                    image = image.convert('RGB')
                    return image
            else:
                # Return a blank image
                return Image.new('RGB', (self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT), (0, 0, 0))
        except Exception as e:
            logging.error(f"Error in get_frame: {e}")
            # Return a fallback image in case of error
            return Image.new('RGB', (self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT), (0, 0, 0))

    def update_display(self):
        """Collect everything the periodic UI refresh needs.

        Returns:
            A list ``[image, stats_fig, neural_fig, dropdown, vitals]`` in the
            order of the outputs wired up in ``create_interface``. ``vitals``
            is None unless a selected organism is found.
        """
        try:
            # Get the frame as PIL Image
            image = self.get_frame()

            # Snapshot: another handler may set self.simulation to None
            sim = self.simulation
            running = bool(sim and sim.running)

            # Update statistics only if simulation is running
            if running:
                with sim.shared_lock:
                    stats = self.shared_data['stats']

                # Update history
                self.history['time'].append(sim.state.step_count)
                self.history['population'].append(stats['population'])
                self.history['avg_energy'].append(stats['avg_energy'])
                self.history['avg_neurons'].append(stats['avg_neurons'])

                # Limit history length to prevent memory issues
                max_history = 1000
                if len(self.history['time']) > max_history:
                    for key in self.history:
                        self.history[key] = self.history[key][-max_history:]
            else:
                stats = self.shared_data['stats']

            # Update plots
            stats_fig = self._create_stats_plot()
            neural_fig = self._create_neural_plot()

            # Update organism list only if simulation is running
            organism_list = []
            if running:
                organism_list = [
                    (f"Organism {id(org)}", id(org))
                    for org in sim.organisms
                    if org.alive
                ]

            # Update selected organism vitals
            vitals = None
            if self.selected_organism_id and running:
                org = next(
                    (org for org in sim.organisms
                     if id(org) == self.selected_organism_id),
                    None
                )
                if org:
                    vitals = {
                        'energy': org.brain.total_energy,
                        'neurons': org.brain.total_neurons,
                        'age': org.age,
                        'connections': sum(
                            n.state.connections
                            for n in [org.brain.visual_cortex,
                                      org.brain.thought_processor,
                                      org.brain.action_generator]
                        )
                    }

            # Create new Dropdown choices instead of using update
            dropdown = gr.Dropdown(
                choices=organism_list,
                label="Select Organism",
                interactive=True
            )

            return [
                image,  # Return PIL Image directly
                stats_fig,
                neural_fig,
                dropdown,
                vitals
            ]
        except Exception as e:
            logging.error(f"Error in update_display: {e}")
            # Return default values in case of error
            blank_image = Image.new('RGB', (self.DEFAULT_WIDTH, self.DEFAULT_HEIGHT), (0, 0, 0))
            empty_fig = go.Figure()
            empty_dropdown = gr.Dropdown(choices=[])
            return [blank_image, empty_fig, empty_fig, empty_dropdown, None]

    def create_interface(self):
        """Build and return the Gradio ``Blocks`` application.

        Lays out the simulation view, controls, plots and organism panel,
        wires the button handlers and schedules the periodic display refresh.
        """
        with gr.Blocks(title="Fractal Life Simulator") as interface:
            gr.Markdown("# 🧬 Fractal Life Simulator")

            with gr.Row():
                # Main simulation view and controls
                with gr.Column(scale=2):
                    canvas = gr.Image(label="Simulation View", interactive=False)

                    with gr.Row():
                        start_btn = gr.Button("Start Simulation", variant="primary")
                        pause_btn = gr.Button("Pause")
                        stop_btn = gr.Button("Stop")

                    # One shared status line for all button handlers, instead
                    # of an anonymous textbox per handler
                    status_box = gr.Textbox(label="Status", interactive=False)

                    with gr.Row():
                        population_slider = gr.Slider(
                            minimum=5, maximum=100,  # Increased maximum for flexibility
                            value=10, step=1,
                            label="Initial Population"
                        )
                        mutation_rate = gr.Slider(
                            minimum=0, maximum=1, value=0.1, step=0.01,
                            label="Mutation Rate"
                        )
                        max_population_slider = gr.Slider(
                            minimum=50, maximum=500, value=50, step=10,
                            label="Max Population"
                        )

                # Statistics and organism details
                with gr.Column(scale=1):
                    stats_plot = gr.Plot(label="Population Statistics")
                    neural_plot = gr.Plot(label="Neural Activity")

                    with gr.Group():
                        gr.Markdown("### Selected Organism")
                        organism_dropdown = gr.Dropdown(
                            label="Select Organism",
                            choices=[],
                            interactive=True
                        )
                        vitals_json = gr.JSON(label="Organism Vitals")

                        with gr.Row():
                            add_energy_btn = gr.Button("Add Energy")
                            add_neurons_btn = gr.Button("Add Neurons")

            # Advanced settings tab
            with gr.Tab("Advanced Settings"):
                with gr.Row():
                    with gr.Column():
                        brain_update_rate = gr.Slider(
                            minimum=1, maximum=30, value=10, step=1,
                            label="Brain Update Rate (Hz)"
                        )
                        max_neurons = gr.Slider(
                            minimum=100, maximum=5000, value=1000, step=100,
                            label="Max Neurons per Brain"
                        )
                        energy_decay = gr.Slider(
                            minimum=0, maximum=1, value=0.1, step=0.01,
                            label="Energy Decay Rate"
                        )

                    with gr.Column():
                        interaction_strength = gr.Slider(
                            minimum=0, maximum=1, value=0.5, step=0.01,
                            label="Interaction Strength"
                        )
                        field_resolution = gr.Slider(
                            minimum=10, maximum=100, value=50, step=5,
                            label="Field Resolution"
                        )

            # Event handlers
            def start_simulation(initial_population, mutation_rate_val, max_population_val,
                                 brain_update_rate_val, max_neurons_val, energy_decay_val,
                                 interaction_strength_val, field_resolution_val):
                """Create and start a simulation from the slider values.

                NOTE: ``interaction_strength_val`` and ``field_resolution_val``
                are accepted but not applied to the simulation.
                """
                try:
                    # A simulation whose loop already exited (window closed
                    # or crashed) is stale: discard it so a new one can start.
                    if self.simulation is not None and not self.simulation.running:
                        self.simulation.stop()
                        self.simulation = None

                    if self.simulation is None:
                        config = SimulationConfig()
                        config.MIN_ORGANISMS = int(initial_population)
                        config.MUTATION_RATE = mutation_rate_val
                        config.MAX_ORGANISMS = int(max_population_val)
                        config.BRAIN_UPDATE_RATE = int(brain_update_rate_val)
                        config.MAX_NEURONS = int(max_neurons_val)
                        config.ENERGY_DECAY = energy_decay_val

                        self.simulation = FractalLifeSimulation(config, self.shared_data)
                        self.simulation.start()
                        logging.info("Simulation started via interface.")
                        return "Simulation started"
                    else:
                        logging.warning("Simulation is already running.")
                        return "Simulation is already running."
                except Exception as e:
                    logging.error(f"Error starting simulation: {e}")
                    return "Failed to start simulation."

            def pause_simulation():
                """Toggle pause on the current simulation."""
                try:
                    if self.simulation:
                        self.simulation.pause()
                        status = "paused" if self.simulation.state.paused else "resumed"
                        logging.info(f"Simulation {status} via interface.")
                        return f"Simulation {status}"
                    logging.warning("No simulation running to pause/resume.")
                    return "No simulation running"
                except Exception as e:
                    logging.error(f"Error pausing simulation: {e}")
                    return "Failed to pause simulation."

            def stop_simulation():
                """Stop and discard the current simulation."""
                try:
                    if self.simulation:
                        self.simulation.stop()
                        self.simulation = None
                        logging.info("Simulation stopped via interface.")
                        return "Simulation stopped"
                    logging.warning("No simulation running to stop.")
                    return "No simulation running"
                except Exception as e:
                    logging.error(f"Error stopping simulation: {e}")
                    return "Failed to stop simulation."

            def select_organism(organism_id):
                """Remember the selection and forward it to the simulation."""
                try:
                    self.selected_organism_id = organism_id
                    if self.simulation:
                        self.simulation.event_queue.put({
                            'type': 'select_organism',
                            'organism_id': organism_id
                        })
                        logging.info(f"Organism {organism_id} selected via interface.")
                except Exception as e:
                    logging.error(f"Error selecting organism: {e}")

            def add_energy_to_organism():
                """Queue an energy boost of 50 for the selected organism."""
                try:
                    if self.simulation and self.selected_organism_id:
                        self.simulation.event_queue.put({
                            'type': 'add_energy',
                            'amount': 50.0
                        })
                        logging.info(f"Added energy to organism {self.selected_organism_id} via interface.")
                        return "Added energy to selected organism"
                    logging.warning("No organism selected or simulation not running to add energy.")
                    return "No organism selected or simulation not running"
                except Exception as e:
                    logging.error(f"Error adding energy to organism: {e}")
                    return "Failed to add energy"

            def add_neurons_to_organism():
                """Queue a neuron-modification request for the selected organism."""
                try:
                    if self.simulation and self.selected_organism_id:
                        self.simulation.event_queue.put({
                            'type': 'modify_neurons',
                            'amount': 10
                        })
                        logging.info(f"Added neurons to organism {self.selected_organism_id} via interface.")
                        return "Added neurons to selected organism"
                    logging.warning("No organism selected or simulation not running to add neurons.")
                    return "No organism selected or simulation not running"
                except Exception as e:
                    logging.error(f"Error adding neurons to organism: {e}")
                    return "Failed to add neurons"

            # Create a bound method for update_display
            def bound_update_display():
                return self.update_display()

            # Connect events
            start_btn.click(
                start_simulation,
                inputs=[population_slider, mutation_rate, max_population_slider,
                        brain_update_rate, max_neurons, energy_decay,
                        interaction_strength, field_resolution],
                outputs=status_box
            )
            pause_btn.click(pause_simulation, outputs=status_box)
            stop_btn.click(stop_simulation, outputs=status_box)
            organism_dropdown.change(select_organism, inputs=[organism_dropdown], outputs=None)
            add_energy_btn.click(add_energy_to_organism, outputs=status_box)
            add_neurons_btn.click(add_neurons_to_organism, outputs=status_box)

            # Periodic update using Gradio's update mechanism
            interface.load(
                fn=bound_update_display,
                inputs=[],
                outputs=[canvas, stats_plot, neural_plot, organism_dropdown, vitals_json],
                every=1/30,  # 30 FPS updates
                queue=True
            )

        return interface

    def _create_stats_plot(self):
        """Create statistics plot using plotly"""
        try:
            fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
                                subplot_titles=("Population", "Average Energy", "Average Neurons"))

            fig.add_trace(
                go.Scatter(x=self.history['time'], y=self.history['population'],
                           name="Population"),
                row=1, col=1
            )
            fig.add_trace(
                go.Scatter(x=self.history['time'], y=self.history['avg_energy'],
                           name="Avg Energy"),
                row=2, col=1
            )
            fig.add_trace(
                go.Scatter(x=self.history['time'], y=self.history['avg_neurons'],
                           name="Avg Neurons"),
                row=3, col=1
            )

            fig.update_layout(height=600, showlegend=True)
            return fig
        except Exception as e:
            logging.error(f"Error creating stats plot: {e}")
            return go.Figure()

    def _create_neural_plot(self):
        """Create a heatmap of the selected organism's neural activations.

        One row per top-level brain module; each row holds the activations of
        that module's sub-neurons, or the module's own activation if it has
        none. Rows of different length are padded with NaN so the matrix is
        rectangular. Returns an empty figure when nothing is selected, the
        simulation is not running, or an error occurs.
        """
        try:
            # Snapshot: another handler may set self.simulation to None
            sim = self.simulation
            if self.selected_organism_id and sim and sim.running:
                org = next(
                    (org for org in sim.organisms
                     if id(org) == self.selected_organism_id),
                    None
                )
                if org:
                    # Create neural activity heatmap
                    neurons = [org.brain.visual_cortex,
                               org.brain.thought_processor,
                               org.brain.action_generator]
                    rows = []
                    for neuron in neurons:
                        children = getattr(neuron, 'sub_neurons', None)
                        if children:
                            rows.append([child.state.activation for child in children])
                        else:
                            rows.append([neuron.state.activation])

                    # Pad ragged rows; np.array() on ragged lists would fail
                    row_width = max(len(row) for row in rows)
                    activities = np.full((len(rows), row_width), np.nan)
                    for row_index, row in enumerate(rows):
                        activities[row_index, :len(row)] = row

                    fig = go.Figure(data=go.Heatmap(z=activities, colorscale='Viridis'))
                    fig.update_layout(
                        title="Neural Activity",
                        xaxis_title="Neuron Index",
                        yaxis_title="Layer"
                    )
                    return fig
            return go.Figure()
        except Exception as e:
            logging.error(f"Error creating neural plot: {e}")
            return go.Figure()


# ==============================
# Entry Point
# ==============================
if __name__ == "__main__":
    # Build the UI object first, then the Gradio app it describes
    app = FractalLifeInterface()
    demo = app.create_interface()

    # Launch settings: listen on all interfaces, fixed port, and
    # share=True so that a public link is created
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }

    # Enable queue and allow for public access
    demo.queue().launch(**launch_options)