import os
import ee
import json
import geojson
import geemap
import geemap.foliumap as gee_folium
import leafmap.foliumap as leaf_folium
import streamlit as st
import pandas as pd
import geopandas as gpd
from shapely.ops import transform
from functools import reduce
import plotly.express as px
# Configure the Streamlit page to use the "wide" layout.
st.set_page_config(layout="wide")
############################################
# Functions
############################################
def shape_3d_to_2d(shape):
    """Return ``shape`` without its Z coordinate.

    Args:
        shape: A geometry object exposing ``has_z`` (a Shapely geometry is
            expected, since the result is produced by ``shapely.ops.transform``).

    Returns:
        ``shape`` itself when ``shape.has_z`` is false; otherwise a new
        geometry built by ``transform`` that keeps only the (x, y) values.
    """
    if not shape.has_z:
        return shape
    # ``z`` is optional so the callback does not raise a TypeError if
    # ``transform`` hands it plain (x, y) coordinates, e.g. for a 2D member
    # of a collection that also contains 3D members.
    return transform(lambda x, y, z=None: (x, y), shape)
def preprocess_gdf(gdf):
    """Normalise an uploaded GeoDataFrame for the rest of the app.

    The frame is reprojected to EPSG:4326, reduced to the ``Name`` and
    ``geometry`` columns, and every geometry is passed through
    ``shape_3d_to_2d``.

    Args:
        gdf: GeoDataFrame that has at least ``Name`` and ``geometry`` columns.

    Returns:
        A new two-column GeoDataFrame; the input frame is not modified.
    """
    gdf = gdf.to_crs(epsg=4326)
    # Copy the column subset explicitly so the assignment below writes to an
    # independent frame rather than to a slice of the reprojected one
    # (this is what pandas' SettingWithCopyWarning complains about).
    gdf = gdf[["Name", "geometry"]].copy()
    gdf["geometry"] = gdf["geometry"].apply(shape_3d_to_2d)
    return gdf
def calculate_ndvi(image, nir_band, red_band):
    """Append an ``NDVI`` band, (NIR - Red) / (NIR + Red), to ``image``.

    Args:
        image: Image object providing ``select`` and ``addBands``.
        nir_band: Name of the near-infrared band to select.
        red_band: Name of the red band to select.

    Returns:
        The result of ``image.addBands`` with the band renamed to ``"NDVI"``.
    """
    near_infrared = image.select(nir_band)
    visible_red = image.select(red_band)
    difference = near_infrared.subtract(visible_red)
    total = near_infrared.add(visible_red)
    return image.addBands(difference.divide(total).rename("NDVI"))
def process_date(start_date, end_date, satellite):
    """Compute the NDVI mosaic and zonal statistics for one date interval.

    Reads the module-level ``satellites``, ``ee_geometry`` and
    ``ee_feature_collection``.

    Args:
        start_date: Start of the interval, passed to ``filterDate``.
        end_date: End of the interval, passed to ``filterDate``.
        satellite: Key into ``satellites`` selecting collection and scale.

    Returns:
        A ``(mosaic, mean_ndvi, cloud_mask_probability)`` tuple. The cloud
        value is ``MSK_CLDPRB / 100`` for ``COPERNICUS/S2_SR_HARMONIZED`` and
        ``None`` for any other satellite. If anything fails, the exception is
        printed and ``(None, None, None)`` is returned.
    """
    try:
        config = satellites[satellite]
        monthly = (
            config["collection"]
            .filterBounds(ee_geometry)
            .filterDate(start_date, end_date)
        )
        mosaic = monthly.qualityMosaic("NDVI")
        stats = geemap.zonal_stats(
            mosaic, ee_feature_collection, scale=config["scale"], return_fc=True
        ).getInfo()
        mean_ndvi = stats["features"][0]["properties"]["NDVI"]
        cloud_mask_probability = (
            stats["features"][0]["properties"]["MSK_CLDPRB"] / 100
            if satellite == "COPERNICUS/S2_SR_HARMONIZED"
            else None
        )
    except Exception as e:
        # Best effort: a failed interval is reported and yields empty values.
        print(e)
        return None, None, None
    return mosaic, mean_ndvi, cloud_mask_probability
def postprocess_df(df, name):
    """Reshape a one-row wide statistics frame into a per-date table.

    The input is expected to have a single row labelled ``0`` and columns
    named ``"<YYYY-MM>_NDVI"`` and ``"<YYYY-MM>_MSK_CLDPRB"``.

    Args:
        df: Wide frame as described above; it is not modified.
        name: Column name for the NDVI values; the cloud column is named
            ``f"{name}_cloud_proba"``.

    Returns:
        A frame with columns ``Date``, ``name`` and ``f"{name}_cloud_proba"``
        (cloud values divided by 100), produced by an outer merge on ``Date``.
    """
    long_df = df.T.reset_index()
    labels = long_df["index"]

    # ``.copy()`` makes each filtered subset an independent frame, so the
    # column assignments below do not write into a slice of ``long_df``.
    ndvi_df = long_df[labels.str.contains("NDVI")].copy()
    ndvi_df["index"] = pd.to_datetime(ndvi_df["index"], format="%Y-%m_NDVI")
    ndvi_df = ndvi_df.rename(columns={"index": "Date", 0: name})

    cloud_column = f"{name}_cloud_proba"
    cloud_df = long_df[labels.str.contains("MSK_CLDPRB")].copy()
    cloud_df["index"] = pd.to_datetime(cloud_df["index"], format="%Y-%m_MSK_CLDPRB")
    cloud_df = cloud_df.rename(columns={"index": "Date", 0: cloud_column})
    # normalize
    cloud_df[cloud_column] = cloud_df[cloud_column] / 100

    return pd.merge(ndvi_df, cloud_df, on="Date", how="outer")
def write_info(info):
    """Render ``info`` through ``st.write`` with HTML rendering enabled."""
    rendered = f"{info}"
    st.write(rendered, unsafe_allow_html=True)
############################################
# One time setup
############################################
def one_time_setup():
    """Prepare Earth Engine and cache static configuration in the session.

    Steps performed:
      1. Make sure a credentials file exists at
         ``~/.config/earthengine/credentials``; if it is missing, create it
         from the ``EE`` environment variable.
      2. Call ``ee.Initialize()``.
      3. Store the satellite configuration in ``st.session_state.satellites``
         and the parsed ``wayback_imagery.json`` in
         ``st.session_state.wayback_mapping``.

    Raises:
        ValueError: If the credentials file does not exist and ``EE`` is not
            set in the environment.
    """
    credentials_path = os.path.expanduser("~/.config/earthengine/credentials")
    if not os.path.exists(credentials_path):
        ee_credentials = os.environ.get("EE")
        if ee_credentials is None:
            raise ValueError(
                f"Earth Engine credentials not found at {credentials_path} or in the environment variable 'EE'"
            )
        os.makedirs(os.path.dirname(credentials_path), exist_ok=True)
        # The file holds a secret: create it readable/writable by the owner
        # only, and write it with an explicit encoding.
        fd = os.open(credentials_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(ee_credentials)

    ee.Initialize()

    # Each entry maps a collection id to the "scale" and the prepared
    # "collection" (bands renamed, NDVI band added) that process_date reads.
    # The Landsat entries are currently disabled.
    satellites = {
        # "LANDSAT/LC08/C02/T1_TOA": {
        #     "scale": 30,
        #     "collection": ee.ImageCollection("LANDSAT/LC08/C02/T1_TOA")
        #     .select(["B2", "B3", "B4", "B5"], ["B", "G", "R", "NIR"])
        #     .map(lambda image: calculate_ndvi(image, nir_band="NIR", red_band="R")),
        # },
        "COPERNICUS/S2_SR_HARMONIZED": {
            "scale": 10,
            "collection": ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
            .select(
                ["B4", "B8", "MSK_CLDPRB", "TCI_R", "TCI_G", "TCI_B"],
                ["Red", "NIR", "MSK_CLDPRB", "R", "G", "B"],
            )
            .map(lambda image: calculate_ndvi(image, nir_band="NIR", red_band="Red")),
        },
        # "LANDSAT/LC09/C02/T1_L2": {
        #     "scale": 30,
        #     "collection": ee.ImageCollection("LANDSAT/LC09/C02/T1_L2")
        #     .select(["SR_B2", "SR_B3", "SR_B4", "SR_B5"], ["B", "G", "R", "NIR"])
        #     .map(lambda image: calculate_ndvi(image, nir_band="NIR", red_band="R")),
        # },
        # "LANDSAT/LC08/C02/T1_L2": {
        #     "scale": 30,
        #     "collection": ee.ImageCollection("LANDSAT/LC08/C02/T1_L2")
        #     .select(["SR_B2", "SR_B3", "SR_B4", "SR_B5"], ["B", "G", "R", "NIR"])
        #     .map(lambda image: calculate_ndvi(image, nir_band="NIR", red_band="R")),
        # },
        # "LANDSAT/LE07/C02/T1_L2": {
        #     "scale": 30,
        #     "collection": ee.ImageCollection("LANDSAT/LE07/C02/T1_L2")
        #     .select(["SR_B2", "SR_B3", "SR_B4", "SR_B5"], ["B", "G", "R", "NIR"])
        #     .map(lambda image: calculate_ndvi(image, nir_band="NIR", red_band="R")),
        # },
    }
    st.session_state.satellites = satellites

    with open("wayback_imagery.json", encoding="utf-8") as f:
        st.session_state.wayback_mapping = json.load(f)
# Run the setup only when this session has not done it yet.
if "one_time_setup_done" not in st.session_state:
    one_time_setup()
    st.session_state.one_time_setup_done = True
# Bind the module-level names on every run, including the run that performed
# the setup; binding them only in an ``else`` branch left them undefined on
# that first run.
satellites = st.session_state.satellites
wayback_mapping = st.session_state.wayback_mapping
############################################
# App
############################################
# Title
# make title in center
st.markdown("\nNDVI Explorer\n", unsafe_allow_html=True)

# Input: date range (year/month selectors for start and end)
# Offer every year from 2014 up to the current one (never fewer than the
# original 2014-2023 list), keeping 2022 as the default selection.
years = list(range(2014, max(2023, pd.Timestamp.now().year) + 1))
months = list(range(1, 13))
default_year_index = years.index(2022)

col = st.columns(4)
start_year = col[0].selectbox("Start Year", years, index=default_year_index)
start_month = col[1].selectbox("Start Month", months, index=0)
end_year = col[2].selectbox("End Year", years, index=default_year_index)
end_month = col[3].selectbox("End Month", months, index=2)

# Both dates are "YYYY-MM" strings with a zero-padded month.
start_date = f"{start_year}-{start_month:02d}"
end_date = f"{end_year}-{end_month:02d}"
# Input: GeoJSON/KML file
uploaded_file = st.file_uploader("Upload KML/GeoJSON file", type=["geojson", "kml"])
if uploaded_file is None:
    # Nothing below can run until a file has been provided.
    st.stop()
gdf = preprocess_gdf(gpd.read_file(uploaded_file))

# Input: Geometry
# ``selected_geometry`` first holds the chosen name, then the geometry of the
# first row carrying that name.
selected_geometry = st.selectbox("Select the geometry", gdf.Name.values)
selected_geometry = gdf[gdf.Name == selected_geometry].iloc[0].geometry
# ``geom_type`` replaces the deprecated ``type`` attribute of Shapely geometries.
geometry_type = selected_geometry.geom_type
if geometry_type != "Polygon":
    st.error(
        f"Selected geometry is of type {geometry_type}. Please provide a polygon geometry."
    )
    st.stop()

# Derived Inputs
# From here on ``selected_geometry`` is the GeoJSON-like mapping of the polygon.
selected_geometry = selected_geometry.__geo_interface__
ee_geometry = ee.Geometry(selected_geometry)
ee_feature_collection = ee.FeatureCollection(ee_geometry)
feature_collection = geojson.FeatureCollection(
    [{"type": "Feature", "geometry": selected_geometry, "properties": {}}]
)
# Input: Satellite Sources
st.write("Select the satellite sources:")
# One checkbox per configured satellite, ticked by default; the mapping
# records which sources the user left enabled.
satellite_selected = {
    source: st.checkbox(source, value=True) for source in satellites
}

# Submit
submit = st.button("Submit", use_container_width=True)
if submit:
    if not any(satellite_selected.values()):
        st.error("Please select at least one satellite source")
        st.stop()

    # Create month range: consecutive month starts are paired into
    # (start, end) intervals, so the end month itself is not processed.
    dates = pd.date_range(start_date, end_date, freq="MS").strftime("%Y-%m").tolist()
    intervals = list(zip(dates[:-1], dates[1:]))
    if not intervals:
        # Without at least one interval nothing would be processed and an
        # empty result would be stored for the results section below.
        st.error("End date must be at least one month after the start date")
        st.stop()

    write_info(
        f"Start Date (inclusive): {start_date}, End Date (exclusive): {end_date}"
    )

    # Every configured satellite gets an entry; unselected ones stay empty.
    result = {key: {} for key in satellites}
    for satellite in satellites:
        if not satellite_selected[satellite]:
            continue
        with st.spinner(f"Processing {satellite} ..."):
            progress_bar = st.progress(0)
            for i, (start, end) in enumerate(intervals, start=1):
                mosaic, mean_ndvi, cloud_proba = process_date(start, end, satellite)
                # Results are keyed by the interval's start month.
                result[satellite][start] = {
                    "mosaic": mosaic,
                    "mean_ndvi": mean_ndvi,
                    "cloud_mask_probability": cloud_proba,
                }
                progress_bar.progress(i / len(intervals))

    # Keep the result in the session so it survives the reruns triggered by
    # the widgets below.
    st.session_state.result = result
if "result" in st.session_state:
    result = st.session_state.result

    # Build one frame per satellite: rows are months, columns are the
    # satellite-specific NDVI / Mosaic / Cloud values.
    df_list = []
    for satellite, satellite_result in result.items():
        satellite_df = pd.DataFrame(satellite_result).T
        satellite_df = satellite_df.rename(
            columns={
                "mean_ndvi": f"NDVI_{satellite}",
                "mosaic": f"Mosaic_{satellite}",
                "cloud_mask_probability": f"Cloud_{satellite}",
            }
        )
        # drop rows with all NaN values
        satellite_df = satellite_df.dropna(how="all")
        # drop columns with all NaN values
        satellite_df = satellite_df.dropna(axis=1, how="all")
        df_list.append(satellite_df)

    # merge outer on index of the dataframes
    df = reduce(
        lambda left, right: pd.merge(
            left, right, left_index=True, right_index=True, how="outer"
        ),
        df_list,
    )
    df.reset_index(inplace=True)
    df.index = pd.to_datetime(df["index"], format="%Y-%m")

    if df.empty:
        # Every month was dropped above, so there is nothing to plot or map.
        st.warning("No NDVI values could be computed for the selected period.")
        st.stop()

    # Convert the columns that hold numbers. Columns that cannot be parsed
    # (the month labels, the mosaic objects) are left untouched; this
    # replaces the deprecated ``errors="ignore"`` option of ``pd.to_numeric``.
    for column in df.columns:
        try:
            df[column] = pd.to_numeric(df[column])
        except (ValueError, TypeError):
            continue

    df_numeric = df.select_dtypes(include=["float64"])
    fig = px.line(df, y=df_numeric.columns, title="Mean NDVI", markers=True)
    fig.update_yaxes(range=[-1, 1])
    st.plotly_chart(fig)

    st.subheader("Visual Inspection")
    # Read the coordinates by key instead of relying on the order of the
    # values in the returned dictionary.
    lon, lat = ee_geometry.centroid().getInfo()["coordinates"]
    write_info(f"Centroid of the selected geometry (lat, lon): ({lat}, {lon})")

    cols = st.columns(2)
    df_dates = df.index.strftime("%Y-%m").tolist()
    with cols[0]:
        date_1 = st.selectbox("Month 1", df_dates, index=0)
    with cols[1]:
        date_2 = st.selectbox("Month 2", df_dates, index=len(df.index) - 1)

    # One NDVI map and one visual (RGB) map per satellite and selected month.
    for satellite in satellites:
        mosaic_column = f"Mosaic_{satellite}"
        if mosaic_column not in df.columns:
            # No mosaic was stored for this satellite.
            continue
        for col, date in zip(cols, [date_1, date_2]):
            mosaic = df.loc[pd.to_datetime(date), mosaic_column]
            with col:
                maps = [leaf_folium.Map(), leaf_folium.Map()]
                ndvi_layer = gee_folium.ee_tile_layer(
                    mosaic, {"bands": ["NDVI"], "min": 0, "max": 1}
                )
                if satellite == "COPERNICUS/S2_SR_HARMONIZED":
                    min_all = 0
                    max_all = 255
                else:
                    raise ValueError(f"Unknown satellite: {satellite}")
                visual_layer = gee_folium.ee_tile_layer(
                    mosaic, {"bands": ["R", "G", "B"], "min": min_all, "max": max_all}
                )
                maps[0].add_layer(ndvi_layer)
                maps[1].add_layer(visual_layer)
                for m, name in zip(maps, ["NDVI", "Visual"]):
                    m.add_geojson(feature_collection)
                    write_info(f"{name}: {satellite} - {date}")
                    m.to_streamlit()

    # Esri Wayback basemap whose date key is closest to each selected month.
    for col, date in zip(cols, [date_1, date_2]):
        target = pd.to_datetime(date)
        esri_date = min(
            wayback_mapping, key=lambda x: abs(pd.to_datetime(x) - target)
        )
        with col:
            m = leaf_folium.Map()
            m.add_tile_layer(
                wayback_mapping[esri_date],
                name=f"Esri Wayback Imagery - {esri_date}",
                attribution="Esri",
            )
            m.add_geojson(feature_collection)
            write_info(f"Visual Esri Wayback Basemap - {esri_date} (Closest to {date})")
            m.to_streamlit()