Spaces:
Sleeping
Sleeping
import gradio as gr | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import requests | |
import tempfile | |
from datetime import datetime | |
import json | |
class WeatherPredictor: | |
def __init__(self): | |
# API configuration | |
self.API_KEY = "15ee959f02bd5393c8864541af1d5eb8" | |
self.BASE_URL = "https://api.openweathermap.org/data/2.5/weather" | |
# Model parameters | |
self.sigma = 8.0 # Reduced for more stable evolution | |
self.rho = 20.0 # Reduced to limit extreme variations | |
self.beta = 2.667 # Standard ratio | |
self.damping_factor = 0.95 | |
# Historical averages (example values - should be replaced with real data) | |
self.historical_means = { | |
'temperature': 15.0, | |
'pressure': 1013.0, | |
'humidity': 65.0 | |
} | |
def fetch_weather(self, city): | |
"""Fetch real-time weather data from OpenWeatherMap API.""" | |
params = { | |
"q": city, | |
"appid": self.API_KEY, | |
"units": "metric" | |
} | |
try: | |
response = requests.get(self.BASE_URL, params=params) | |
response.raise_for_status() | |
data = response.json() | |
return ( | |
data["main"]["temp"], | |
data["main"]["pressure"], | |
data["main"]["humidity"] | |
) | |
except Exception as e: | |
print(f"Error fetching weather data: {str(e)}") | |
return None, None, None | |
def normalize_weather_data(self, temperature, pressure, humidity): | |
"""Improved normalization for weather data.""" | |
# Scale temperature to [-0.5, 0.5] for better stability | |
temp_norm = (temperature + 50) / 200 # Range: -50°C to 150°C | |
# Scale pressure variations more significantly | |
pressure_norm = (pressure - 1013) / 50 # Centered around standard pressure | |
# Center humidity around zero | |
humidity_norm = (humidity - 50) / 100 # Range: [-0.5, 0.5] | |
return np.array([temp_norm, pressure_norm, humidity_norm]) | |
def denormalize_weather_data(self, state): | |
"""Denormalize with improved scaling.""" | |
temp_norm, pressure_norm, humidity_norm = state | |
temperature = temp_norm * 200 - 50 | |
pressure = pressure_norm * 50 + 1013 | |
humidity = humidity_norm * 100 + 50 | |
# Ensure realistic bounds | |
temperature = np.clip(temperature, -50, 50) | |
pressure = np.clip(pressure, 950, 1050) | |
humidity = np.clip(humidity, 0, 100) | |
return temperature, pressure, humidity | |
def lorenz_system(self, state, t): | |
"""Modified Lorenz system with dampening.""" | |
x, y, z = state | |
# Apply dampening to limit extreme predictions | |
dxdt = self.sigma * (y - x) * self.damping_factor | |
dydt = (x * (self.rho - z) - y) * self.damping_factor | |
dzdt = (x * y - self.beta * z) * self.damping_factor | |
return np.array([dxdt, dydt, dzdt]) | |
def integrate_rk4(self, initial_state, dt, steps): | |
"""RK4 integration with stability checks.""" | |
state = initial_state.copy() | |
trajectory = [state] | |
for _ in range(steps): | |
k1 = self.lorenz_system(state, 0) | |
k2 = self.lorenz_system(state + dt * k1 / 2, dt/2) | |
k3 = self.lorenz_system(state + dt * k2 / 2, dt/2) | |
k4 = self.lorenz_system(state + dt * k3, dt) | |
state = state + dt * (k1 + 2*k2 + 2*k3 + k4) / 6 | |
# Bound checking | |
state = np.clip(state, -2, 2) | |
trajectory.append(state.copy()) | |
return np.array(trajectory) | |
def ensemble_predict(self, initial_conditions, hours=3, n_ensemble=10): | |
"""Generate ensemble predictions with perturbations.""" | |
dt = 0.005 | |
steps = int(hours * 200) | |
predictions = [] | |
trajectories = [] | |
for _ in range(n_ensemble): | |
# Add small random perturbations | |
perturbed = initial_conditions + np.random.normal(0, 0.01, 3) | |
# Generate trajectory | |
trajectory = self.integrate_rk4(perturbed, dt, steps) | |
trajectories.append(trajectory) | |
# Get final prediction | |
final_state = trajectory[-1] | |
predictions.append(self.denormalize_weather_data(final_state)) | |
# Convert predictions to numpy array for easier computation | |
predictions = np.array(predictions) | |
# Calculate mean and standard deviation | |
mean_prediction = np.mean(predictions, axis=0) | |
std_prediction = np.std(predictions, axis=0) | |
return mean_prediction, std_prediction, trajectories | |
def blend_with_climatology(self, prediction): | |
"""Blend chaotic prediction with historical averages.""" | |
blend_factor = 0.7 # Weight for chaotic prediction | |
historical = np.array([ | |
self.historical_means['temperature'], | |
self.historical_means['pressure'], | |
self.historical_means['humidity'] | |
]) | |
return prediction * blend_factor + historical * (1 - blend_factor) | |
def create_visualization(self, trajectories, city): | |
"""Create 3D visualization of prediction trajectories.""" | |
plt.figure(figsize=(12, 8)) | |
ax = plt.axes(projection='3d') | |
# Plot each trajectory with decreasing opacity | |
n_trajectories = len(trajectories) | |
for i, traj in enumerate(trajectories): | |
alpha = 0.3 + 0.7 * (i / n_trajectories) # Varying opacity | |
ax.plot3D(traj[:, 0], traj[:, 1], traj[:, 2], alpha=alpha) | |
ax.set_xlabel('Temperature Component') | |
ax.set_ylabel('Pressure Component') | |
ax.set_zlabel('Humidity Component') | |
ax.set_title(f'Weather Prediction Trajectories for {city}') | |
# Save plot to temporary file | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png") | |
plt.savefig(temp_file.name) | |
plt.close() | |
return temp_file.name | |
def predict_weather(city, forecast_hours=3): | |
"""Main prediction interface.""" | |
predictor = WeatherPredictor() | |
# Fetch current weather | |
temperature, pressure, humidity = predictor.fetch_weather(city) | |
if temperature is None: | |
return {"error": "Could not fetch weather data for the city"}, None | |
# Normalize initial conditions | |
initial_conditions = predictor.normalize_weather_data(temperature, pressure, humidity) | |
# Generate ensemble predictions | |
mean_prediction, std_prediction, trajectories = predictor.ensemble_predict( | |
initial_conditions, hours=forecast_hours | |
) | |
# Blend with climatology | |
final_prediction = predictor.blend_with_climatology(mean_prediction) | |
# Create visualization | |
plot_path = predictor.create_visualization(trajectories, city) | |
# Prepare results | |
results = { | |
"Current Conditions": { | |
"Temperature (°C)": f"{temperature:.1f}", | |
"Pressure (hPa)": f"{pressure:.1f}", | |
"Humidity (%)": f"{humidity:.1f}" | |
}, | |
"Forecast": { | |
"Temperature (°C)": f"{final_prediction[0]:.1f} ± {std_prediction[0]:.1f}", | |
"Pressure (hPa)": f"{final_prediction[1]:.1f} ± {std_prediction[1]:.1f}", | |
"Humidity (%)": f"{final_prediction[2]:.1f} ± {std_prediction[2]:.1f}" | |
}, | |
"Forecast Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
} | |
return results, plot_path | |
# Gradio Interface | |
def gradio_interface(city, forecast_hours): | |
"""Gradio interface function.""" | |
results, plot_path = predict_weather(city, float(forecast_hours)) | |
return json.dumps(results, indent=2), plot_path | |
# Create Gradio UI | |
iface = gr.Interface( | |
fn=gradio_interface, | |
inputs=[ | |
gr.Textbox(label="City", placeholder="Enter city name (e.g., London)"), | |
gr.Number(value=3, label="Forecast Hours", minimum=1, maximum=24) | |
], | |
outputs=[ | |
gr.JSON(label="Weather Forecast"), | |
gr.Image(label="Forecast Visualization") | |
], | |
title="Enhanced Chaotic Weather Prediction System", | |
description="Weather prediction using modified Lorenz system with ensemble forecasting" | |
) | |
# Launch the interface | |
if __name__ == "__main__": | |
iface.launch() |