# Anupam251272's picture
# Create app.py
# 8b0caab verified
import gradio as gr
import numpy as np
import matplotlib.pyplot as plt
import requests
import tempfile
from datetime import datetime
import json
class WeatherPredictor:
    """Toy weather forecaster built on a damped Lorenz system.

    Current conditions are fetched from OpenWeatherMap, mapped to a
    three-component normalized state (temperature, pressure, humidity),
    integrated forward with RK4 as a randomly perturbed ensemble, and
    mapped back to physical units.
    """

    # Environment variable that, when set, overrides the embedded API key.
    API_KEY_ENV_VAR = "OPENWEATHER_API_KEY"
    # Seconds to wait for the weather API before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Set up API configuration, model parameters and climatology."""
        import os  # local import keeps this block self-contained

        # API configuration
        # SECURITY: the literal key is kept only as a backward-compatible
        # fallback; prefer setting OPENWEATHER_API_KEY and rotating this key.
        self.API_KEY = (
            os.environ.get(self.API_KEY_ENV_VAR)
            or "15ee959f02bd5393c8864541af1d5eb8"
        )
        self.BASE_URL = "https://api.openweathermap.org/data/2.5/weather"
        # Model parameters
        self.sigma = 8.0  # Reduced for more stable evolution
        self.rho = 20.0  # Reduced to limit extreme variations
        self.beta = 2.667  # Standard ratio
        self.damping_factor = 0.95
        # Historical averages (example values - should be replaced with real data)
        self.historical_means = {
            'temperature': 15.0,
            'pressure': 1013.0,
            'humidity': 65.0
        }

    def fetch_weather(self, city):
        """Fetch real-time weather data from OpenWeatherMap API.

        Args:
            city: City name, sent as the ``q`` query parameter.

        Returns:
            ``(temperature, pressure, humidity)`` read from the response's
            ``main`` object (requested with ``units=metric``), or
            ``(None, None, None)`` when the city is blank or the request or
            response parsing fails.
        """
        if not isinstance(city, str) or not city.strip():
            # Nothing to look up; avoid a pointless network round trip.
            print("Error fetching weather data: city name is empty")
            return None, None, None
        params = {
            "q": city.strip(),
            "appid": self.API_KEY,
            "units": "metric"
        }
        try:
            # A timeout keeps a stalled API call from hanging the caller.
            response = requests.get(
                self.BASE_URL, params=params, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            main = response.json()["main"]
            return main["temp"], main["pressure"], main["humidity"]
        except (requests.RequestException, KeyError, TypeError, ValueError) as e:
            # HTTP error messages embed the request URL, which contains the
            # API key; redact it before it reaches the logs.
            message = str(e).replace(self.API_KEY, "***")
            print(f"Error fetching weather data: {message}")
            return None, None, None

    def normalize_weather_data(self, temperature, pressure, humidity):
        """Map physical readings to the model's normalized state vector.

        Args:
            temperature: Temperature; -50 maps to 0 and 150 maps to 1.
            pressure: Pressure; 1013 maps to 0, each 50 units is 1.0.
            humidity: Relative humidity; 0..100 maps to -0.5..0.5.

        Returns:
            ``np.ndarray`` ``[temp_norm, pressure_norm, humidity_norm]``.
        """
        # Scale temperature to [0, 1] over the range -50°C to 150°C
        temp_norm = (temperature + 50) / 200
        # Centered around standard pressure
        pressure_norm = (pressure - 1013) / 50
        # Center humidity around zero, range [-0.5, 0.5]
        humidity_norm = (humidity - 50) / 100
        return np.array([temp_norm, pressure_norm, humidity_norm])

    def denormalize_weather_data(self, state):
        """Invert :meth:`normalize_weather_data` and clip to fixed bounds.

        Args:
            state: Sequence of three normalized components
                (temperature, pressure, humidity).

        Returns:
            ``(temperature, pressure, humidity)`` clipped to
            [-50, 50], [950, 1050] and [0, 100] respectively.
        """
        temp_norm, pressure_norm, humidity_norm = state
        temperature = temp_norm * 200 - 50
        pressure = pressure_norm * 50 + 1013
        humidity = humidity_norm * 100 + 50
        # Ensure realistic bounds. Temperature is capped at 50 even though
        # the normalization above nominally spans up to 150.
        temperature = np.clip(temperature, -50, 50)
        pressure = np.clip(pressure, 950, 1050)
        humidity = np.clip(humidity, 0, 100)
        return temperature, pressure, humidity

    def lorenz_system(self, state, t):
        """Evaluate the damped Lorenz derivatives at ``state``.

        Args:
            state: Sequence ``(x, y, z)``.
            t: Time; unused because the system is autonomous. Kept for the
                conventional ``f(state, t)`` signature.

        Returns:
            ``np.ndarray`` ``[dx/dt, dy/dt, dz/dt]``, each scaled by
            ``self.damping_factor``.
        """
        x, y, z = state
        # Apply dampening to limit extreme predictions
        dxdt = self.sigma * (y - x) * self.damping_factor
        dydt = (x * (self.rho - z) - y) * self.damping_factor
        dzdt = (x * y - self.beta * z) * self.damping_factor
        return np.array([dxdt, dydt, dzdt])

    def integrate_rk4(self, initial_state, dt, steps):
        """Integrate the Lorenz system with classic RK4, clipping each step.

        Args:
            initial_state: Starting state; any length-3 sequence or array.
            dt: Step size.
            steps: Number of RK4 steps to take.

        Returns:
            ``np.ndarray`` of shape ``(steps + 1, 3)``; row 0 is the
            (unclipped) initial state, later rows are clipped to [-2, 2].
        """
        # np.array copies, so the caller's data is never aliased, and plain
        # lists/tuples are accepted as well as ndarrays.
        state = np.array(initial_state, dtype=float)
        trajectory = [state]
        for _ in range(steps):
            k1 = self.lorenz_system(state, 0)
            k2 = self.lorenz_system(state + dt * k1 / 2, dt / 2)
            k3 = self.lorenz_system(state + dt * k2 / 2, dt / 2)
            k4 = self.lorenz_system(state + dt * k3, dt)
            state = state + dt * (k1 + 2 * k2 + 2 * k3 + k4) / 6
            # Bound checking; np.clip returns a fresh array each step.
            state = np.clip(state, -2, 2)
            trajectory.append(state)
        return np.array(trajectory)

    def ensemble_predict(self, initial_conditions, hours=3, n_ensemble=10):
        """Generate ensemble predictions with perturbations.

        Args:
            initial_conditions: Normalized starting state (length 3).
            hours: Forecast horizon; integrated at 200 steps per hour.
            n_ensemble: Number of perturbed members; must be >= 1.

        Returns:
            ``(mean_prediction, std_prediction, trajectories)`` where the
            first two are length-3 arrays of denormalized
            (temperature, pressure, humidity) statistics over the members'
            final states, and ``trajectories`` is the list of per-member
            normalized trajectories.

        Raises:
            ValueError: If ``n_ensemble`` is less than 1 (the statistics
                would otherwise be NaN).
        """
        if n_ensemble < 1:
            raise ValueError("n_ensemble must be at least 1")
        dt = 0.005
        steps = int(hours * 200)
        predictions = []
        trajectories = []
        for _ in range(n_ensemble):
            # Add small random perturbations
            perturbed = initial_conditions + np.random.normal(0, 0.01, 3)
            # Generate trajectory
            trajectory = self.integrate_rk4(perturbed, dt, steps)
            trajectories.append(trajectory)
            # Get final prediction
            predictions.append(self.denormalize_weather_data(trajectory[-1]))
        # Convert predictions to numpy array for easier computation
        predictions = np.array(predictions)
        # Calculate mean and standard deviation across ensemble members
        mean_prediction = np.mean(predictions, axis=0)
        std_prediction = np.std(predictions, axis=0)
        return mean_prediction, std_prediction, trajectories

    def blend_with_climatology(self, prediction):
        """Blend chaotic prediction with historical averages.

        Args:
            prediction: Array ``[temperature, pressure, humidity]``.

        Returns:
            ``0.7 * prediction + 0.3 * historical_means`` as an array.
        """
        blend_factor = 0.7  # Weight for chaotic prediction
        historical = np.array([
            self.historical_means['temperature'],
            self.historical_means['pressure'],
            self.historical_means['humidity']
        ])
        return prediction * blend_factor + historical * (1 - blend_factor)

    def create_visualization(self, trajectories, city):
        """Create 3D visualization of prediction trajectories.

        Args:
            trajectories: Iterable of ``(n, 3)`` arrays to plot.
            city: Name used in the plot title.

        Returns:
            Path of a PNG written to a temporary file. The file is created
            with ``delete=False``, so removing it is up to the caller.
        """
        fig = plt.figure(figsize=(12, 8))
        try:
            ax = plt.axes(projection='3d')
            # Plot each trajectory with increasing opacity
            n_trajectories = len(trajectories)
            for i, traj in enumerate(trajectories):
                alpha = 0.3 + 0.7 * (i / n_trajectories)  # Varying opacity
                ax.plot3D(traj[:, 0], traj[:, 1], traj[:, 2], alpha=alpha)
            ax.set_xlabel('Temperature Component')
            ax.set_ylabel('Pressure Component')
            ax.set_zlabel('Humidity Component')
            ax.set_title(f'Weather Prediction Trajectories for {city}')
            # Reserve a temporary path and close our handle before matplotlib
            # writes to it, so no file descriptor is leaked.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                plot_path = temp_file.name
            fig.savefig(plot_path)
        finally:
            # Always release the figure, even if plotting or saving failed.
            plt.close(fig)
        return plot_path
def predict_weather(city, forecast_hours=3):
    """Run the full fetch -> ensemble -> blend -> plot pipeline for a city.

    Args:
        city: City name passed to ``WeatherPredictor.fetch_weather``.
        forecast_hours: Horizon handed to ``ensemble_predict`` as ``hours``.

    Returns:
        ``(results, plot_path)``. ``results`` is a dict with
        "Current Conditions", "Forecast" and "Forecast Time" entries and
        ``plot_path`` is the image path from ``create_visualization``.
        When no weather data could be fetched, returns
        ``({"error": ...}, None)`` instead.
    """
    model = WeatherPredictor()

    # Current observations; a None temperature signals a failed fetch.
    temperature, pressure, humidity = model.fetch_weather(city)
    if temperature is None:
        return {"error": "Could not fetch weather data for the city"}, None

    # Normalized start state -> perturbed ensemble -> climatology blend.
    start_state = model.normalize_weather_data(temperature, pressure, humidity)
    ensemble_mean, ensemble_std, paths = model.ensemble_predict(
        start_state, hours=forecast_hours
    )
    blended = model.blend_with_climatology(ensemble_mean)

    image_path = model.create_visualization(paths, city)

    labels = ("Temperature (°C)", "Pressure (hPa)", "Humidity (%)")
    observed = (temperature, pressure, humidity)
    current = {label: f"{value:.1f}" for label, value in zip(labels, observed)}
    # The ± spread is the ensemble standard deviation as returned by
    # ensemble_predict, i.e. computed before the climatology blend.
    forecast = {
        label: f"{value:.1f} ± {spread:.1f}"
        for label, value, spread in zip(labels, blended, ensemble_std)
    }
    report = {
        "Current Conditions": current,
        "Forecast": forecast,
        # Timestamp of when this report was produced (local clock).
        "Forecast Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    return report, image_path
# Gradio Interface
def gradio_interface(city, forecast_hours):
    """Gradio interface function.

    Args:
        city: Text from the "City" textbox.
        forecast_hours: Value from the "Forecast Hours" number field; may be
            ``None`` when the user clears the field.

    Returns:
        ``(json_text, plot_path)``: the results of ``predict_weather``
        serialized with ``json.dumps(..., indent=2)``, and the plot path
        (``None`` when the weather lookup failed).
    """
    # A cleared number field arrives as None, and float(None) raises
    # TypeError; fall back to the UI's default horizon of 3 hours.
    hours = 3.0 if forecast_hours is None else float(forecast_hours)
    results, plot_path = predict_weather(city, hours)
    return json.dumps(results, indent=2), plot_path
# Create Gradio UI
# Input widgets, in the order gradio_interface receives them.
_city_box = gr.Textbox(label="City", placeholder="Enter city name (e.g., London)")
_hours_box = gr.Number(value=3, label="Forecast Hours", minimum=1, maximum=24)

# Output widgets, in the order gradio_interface returns them.
_forecast_json = gr.JSON(label="Weather Forecast")
_forecast_image = gr.Image(label="Forecast Visualization")

iface = gr.Interface(
    fn=gradio_interface,
    inputs=[_city_box, _hours_box],
    outputs=[_forecast_json, _forecast_image],
    title="Enhanced Chaotic Weather Prediction System",
    description="Weather prediction using modified Lorenz system with ensemble forecasting",
)

# Launch the interface only when run as a script, not on import.
if __name__ == "__main__":
    iface.launch()