# Spaces: Running
import gradio as gr | |
import requests | |
import pandas as pd | |
from sklearn.linear_model import LogisticRegression | |
from fastai.vision.all import * | |
import os | |
from datetime import datetime, timedelta | |
# Trained learner used for image-based fog classification.
learn = load_learner("fog_classifier.pkl")
# Class labels, read from the learner's DataLoaders vocabulary.
labels = learn.dls.vocab

# OpenWeather REST settings; the key comes from the environment and is
# None when OPENWEATHER_API_KEY is not set.
BASE_URL = "https://api.openweathermap.org/data/2.5/"
API_KEY = os.getenv("OPENWEATHER_API_KEY")
def predict_image(img):
    """Classify an image and return one confidence score per label.

    The input is converted with ``PILImage.create`` and resized to
    512x512 before it is passed to the module-level ``learn`` object.

    Returns:
        dict: ``{label: probability}`` with a float for every entry of
        the module-level ``labels`` vocabulary.
    """
    resized = PILImage.create(img).resize((512, 512))
    # learn.predict returns (decoded prediction, index, probabilities);
    # only the probabilities are needed here.
    _, _, probabilities = learn.predict(resized)
    return {label: float(probabilities[idx]) for idx, label in enumerate(labels)}
def calculate_fog_risk_score(weather_data):
    """Combine weather readings into a single weighted fog risk score.

    Args:
        weather_data: Mapping with the keys ``temperature``, ``humidity``,
            ``visibility``, ``wind_speed`` and ``pressure``.

    Returns:
        float: Weighted sum of five factor scores. The weights add up to
        1.0 (humidity 0.3, dew point spread 0.3, visibility 0.2,
        wind speed 0.1, pressure deviation 0.1).
    """
    temperature = weather_data['temperature']
    humidity = weather_data['humidity']

    # Dew point is estimated as T - (100 - RH) / 5; the spread between the
    # temperature and that estimate drives the dew point factor.
    estimated_dew_point = temperature - ((100 - humidity) / 5.0)
    spread = abs(temperature - estimated_dew_point)

    # (weight, score) pairs; each score is capped at 1 on the high side.
    weighted_factors = (
        (0.3, min(humidity / 100, 1)),
        # A spread of 5 or more contributes nothing; 0 contributes fully.
        (0.3, 1 - min(spread / 5, 1)),
        # Lower visibility -> higher score (10 or more scores 0).
        (0.2, 1 - min(weather_data['visibility'] / 10, 1)),
        # Calmer wind -> higher score (10 or more scores 0).
        (0.1, 1 - min(weather_data['wind_speed'] / 10, 1)),
        # Deviation from 1013.25, saturating at a difference of 50.
        (0.1, min(abs(weather_data['pressure'] - 1013.25) / 50, 1)),
    )

    fog_risk = 0
    for weight, score in weighted_factors:
        fog_risk = fog_risk + weight * score
    return fog_risk
def get_weather_data(location):
    """Fetch the current weather for ``location`` from the OpenWeather API.

    Args:
        location: Place name sent as the ``q`` query parameter.

    Returns:
        dict: ``temperature``, ``feels_like``, ``description``,
        ``wind_speed``, ``pressure``, ``humidity``, ``visibility`` (the API
        value divided by 1000, defaulting to 10.0) and ``timestamp`` (a
        naive local ``datetime`` built from the response's ``dt`` field).

    Raises:
        Exception: If the request fails, times out, returns an error
            status or the body cannot be decoded. The original
            ``requests`` exception is attached as ``__cause__``.
    """
    try:
        # Passing the query via ``params`` lets requests URL-encode the
        # user-supplied location (spaces, '&', non-ASCII names).
        response = requests.get(
            f'{BASE_URL}weather',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,  # never block the UI indefinitely on a stalled connection
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # requests embeds the full URL (including ``appid``) in its error
        # text; redact the key because this message is shown to the user.
        detail = str(e)
        if API_KEY:
            detail = detail.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch weather data: {detail}") from e

    main = data['main']
    return {
        'temperature': main.get('temp', 0),
        'feels_like': main.get('feels_like', 0),
        'description': data['weather'][0].get('description', ''),
        'wind_speed': data['wind'].get('speed', 0),
        'pressure': main.get('pressure', 0),
        'humidity': main.get('humidity', 0),
        'visibility': data.get('visibility', 10000) / 1000,
        'timestamp': datetime.fromtimestamp(data['dt']),
    }
def get_forecast_data(location):
    """Fetch the forecast entries for ``location`` from the OpenWeather API.

    Args:
        location: Place name sent as the ``q`` query parameter.

    Returns:
        list[dict]: One dict per item of the response's ``list`` field, in
        API order, with ``temperature``, ``humidity``, ``description``,
        ``wind_speed``, ``pressure``, ``visibility`` (the API value divided
        by 1000, defaulting to 10.0) and ``timestamp`` (a naive local
        ``datetime`` built from the item's ``dt`` field).

    Raises:
        Exception: If the request fails, times out, returns an error
            status or the body cannot be decoded. The original
            ``requests`` exception is attached as ``__cause__``.
    """
    try:
        # Passing the query via ``params`` lets requests URL-encode the
        # user-supplied location (spaces, '&', non-ASCII names).
        response = requests.get(
            f'{BASE_URL}forecast',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,  # never block the UI indefinitely on a stalled connection
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # requests embeds the full URL (including ``appid``) in its error
        # text; redact the key because this message is shown to the user.
        detail = str(e)
        if API_KEY:
            detail = detail.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch forecast data: {detail}") from e

    return [
        {
            'temperature': item['main'].get('temp', 0),
            'humidity': item['main'].get('humidity', 0),
            'description': item['weather'][0].get('description', ''),
            'wind_speed': item['wind'].get('speed', 0),
            'pressure': item['main'].get('pressure', 0),
            'visibility': item.get('visibility', 10000) / 1000,
            'timestamp': datetime.fromtimestamp(item['dt']),
        }
        for item in data['list']
    ]
def format_duration(duration):
    """Format a timedelta as a human-readable days/hours string.

    Partial hours are truncated. A negative duration is reported as
    ``"0 hours"`` rather than wrapping around to a misleading positive
    value (floor division and modulo on a negative hour count would
    otherwise print e.g. "23 hours" for minus one hour).

    Args:
        duration: ``datetime.timedelta`` to format.

    Returns:
        str: ``"<d> day(s) and <h> hour(s)"`` when at least one full day is
        covered, otherwise ``"<h> hour(s)"``. Units are singular for 1.
    """
    whole_hours = max(int(duration.total_seconds() // 3600), 0)
    days, hours = divmod(whole_hours, 24)

    hour_text = f"{hours} hour" if hours == 1 else f"{hours} hours"
    if days > 0:
        day_text = f"{days} day" if days == 1 else f"{days} days"
        return f"{day_text} and {hour_text}"
    return hour_text
def determine_transmission_power(image_prediction, weather_data, forecast_data=None):
    """Pick a transmission power level from the image, weather and forecast.

    Args:
        image_prediction: Mapping of label to confidence; the
            ``Dense_Fog`` and ``Moderate_Fog`` entries are read (missing
            entries count as 0).
        weather_data: Current-conditions mapping accepted by
            ``calculate_fog_risk_score``; its ``timestamp`` is the
            reference point for the duration.
        forecast_data: Optional list of forecast mappings, each with a
            ``timestamp``. Only the first 40 entries are scored.

    Returns:
        tuple: ``(power_level, duration, explanation)`` where
        ``power_level`` is "High", "Medium" or "Normal", ``duration`` is a
        ``timedelta`` (one hour unless the forecast contains a high-risk
        entry) and ``explanation`` is a list of message strings.
    """
    # Moderate fog is discounted to 60% of its confidence before being
    # compared with the dense fog confidence.
    dense_confidence = image_prediction.get('Dense_Fog', 0)
    discounted_moderate = image_prediction.get('Moderate_Fog', 0) * 0.6
    image_fog_confidence = max(dense_confidence, discounted_moderate)

    weather_risk = calculate_fog_risk_score(weather_data)

    # Weighted blend: 60% image evidence, 40% weather-derived risk.
    combined_fog_risk = (image_fog_confidence * 0.6) + (weather_risk * 0.4)

    explanation = []
    if combined_fog_risk > 0.7:
        power_level = "High"
        explanation.append(f"High fog risk detected (Risk score: {combined_fog_risk:.2f})")
    elif combined_fog_risk > 0.4:
        power_level = "Medium"
        explanation.append(f"Moderate fog risk detected (Risk score: {combined_fog_risk:.2f})")
    else:
        power_level = "Normal"
        explanation.append(f"Low fog risk detected (Risk score: {combined_fog_risk:.2f})")

    duration = timedelta(hours=1)  # default when the forecast adds nothing
    if not forecast_data:
        return power_level, duration, explanation

    # Flag each of the first 40 forecast entries whose risk exceeds 0.6.
    is_high_risk = [
        calculate_fog_risk_score(entry) > 0.6 for entry in forecast_data[:40]
    ]
    high_positions = [pos for pos, flagged in enumerate(is_high_risk) if flagged]

    if high_positions:
        # Duration runs from the current observation to the last flagged entry.
        final_position = high_positions[-1]
        duration = forecast_data[final_position]['timestamp'] - weather_data['timestamp']
        explanation.append(f"High fog risk predicted to continue for {format_duration(duration)}")

    # More than half of the scored entries flagged -> force the High level.
    if len(high_positions) / len(is_high_risk) > 0.5:
        power_level = "High"
        explanation.append("Power level set to High due to sustained fog risk in forecast")

    return power_level, duration, explanation
def integrated_prediction(image, location):
    """Run the full analysis and render it as a multi-line text report.

    Classifies the image, fetches current weather and forecast for
    ``location``, derives the transmission power recommendation and adds
    a per-day fog risk summary for the five calendar days starting at the
    current observation's date.

    Returns:
        str: The report, or ``"Error: <message>"`` if any step raises.
    """
    try:
        image_prediction = predict_image(image)
        current_weather = get_weather_data(location)
        forecast_data = get_forecast_data(location)

        power_level, duration, explanation = determine_transmission_power(
            image_prediction, current_weather, forecast_data
        )

        observed_at = current_weather['timestamp']
        report = [
            f"Current Conditions ({observed_at.strftime('%Y-%m-%d %H:%M')})",
            f"Temperature: {current_weather['temperature']:.1f}°C",
            f"Humidity: {current_weather['humidity']}%",
            f"Visibility: {current_weather['visibility']:.1f} km",
            f"Wind Speed: {current_weather['wind_speed']} m/s",
            "",
            "Analysis Results:",
        ]
        report.extend(explanation)
        report.append("")
        report.append(f"Recommended Power Level: {power_level}")
        report.append(f"Duration: {format_duration(duration)}")
        report.append("")
        report.append("5-Day Forecast Summary:")

        # Bucket the forecast entries by calendar date, keeping API order.
        entries_by_date = {}
        for entry in forecast_data:
            entries_by_date.setdefault(entry['timestamp'].date(), []).append(entry)

        first_day = observed_at.date()
        for offset in range(5):
            forecast_date = first_day + timedelta(days=offset)
            day_entries = entries_by_date.get(forecast_date)
            if not day_entries:
                continue  # no forecast entries fall on this date

            avg_risk = sum(
                calculate_fog_risk_score(item) for item in day_entries
            ) / len(day_entries)

            if avg_risk > 0.6:
                risk_label = 'High'
            elif avg_risk > 0.3:
                risk_label = 'Moderate'
            else:
                risk_label = 'Low'

            report.append(f"{forecast_date.strftime('%Y-%m-%d')}: "
                          f"Fog Risk: {risk_label} "
                          f"({avg_risk:.2f})")

        return "\n".join(report)
    except Exception as e:
        # UI boundary: surface any failure as text instead of raising.
        return f"Error: {str(e)}"
# Gradio UI: an image and a location go in, the text report comes out.
with gr.Blocks() as demo:
    gr.Markdown("# Enhanced Fog Prediction and Transmission Power System")

    # Both inputs share a row.
    with gr.Row():
        image_input = gr.Image(label="Upload Current Conditions Image")
        location_input = gr.Textbox(label="Enter Location")

    predict_button = gr.Button("Analyze and Determine Transmission Power")
    output = gr.Textbox(label="Analysis Results", lines=15)

    # Clicking the button runs the whole pipeline and fills the output box.
    predict_button.click(
        fn=integrated_prediction,
        inputs=[image_input, location_input],
        outputs=output,
    )

# Start the app as soon as the module is executed.
demo.launch()