import datetime import json import logging import os import duckdb import ee import gradio as gr import pandas as pd import plotly.graph_objects as go import yaml import numpy as np from google.oauth2 import service_account from utils.js import get_window_url_params # Logging logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) # Define constants DATE = "2020-01-01" YEAR = 2020 LOCATION = [-74.653370, 5.845328] ROI_RADIUS = 20000 GEE_SERVICE_ACCOUNT = ( "climatebase-july-2023@ee-geospatialml-aquarry.iam.gserviceaccount.com" ) INDICES_FILE = "indices.yaml" START_YEAR = 2015 END_YEAR = 2022 class IndexGenerator: """ A class to generate indices and compute zonal means. Args: centroid (tuple): The centroid coordinates (latitude, longitude) of the region of interest. year (int): The year for which indices are generated. roi_radius (int, optional): The radius (in meters) for creating a buffer around the centroid as the region of interest. Defaults to 20000. project_name (str, optional): The name of the project. Defaults to "". map (geemap.Map, optional): Map object for mapping. Defaults to None (i.e. no map created) """ def __init__( self, centroid, roi_radius, year, indices_file, project_name="", map=None, ): self.indices = self._load_indices(indices_file) self.centroid = centroid self.roi = ee.Geometry.Point(*centroid).buffer(roi_radius) self.year = year self.start_date = str(datetime.date(self.year, 1, 1)) self.end_date = str(datetime.date(self.year, 12, 31)) self.daterange = [self.start_date, self.end_date] self.project_name = project_name self.map = map if self.map is not None: self.show = True else: self.show = False def _cloudfree(self, gee_path): """ Internal method to generate a cloud-free composite. Args: gee_path (str): The path to the Google Earth Engine (GEE) image or image collection. Returns: ee.Image: The cloud-free composite clipped to the region of interest. """ # Load a raw Landsat ImageCollection for a single year. collection = ( ee.ImageCollection(gee_path) .filterDate(*self.daterange) .filterBounds(self.roi) ) # Create a cloud-free composite with custom parameters for cloud score threshold and percentile. composite_cloudfree = ee.Algorithms.Landsat.simpleComposite( **{"collection": collection, "percentile": 75, "cloudScoreRange": 5} ) return composite_cloudfree.clip(self.roi) def _load_indices(self, indices_file): # Read index configurations with open(indices_file, "r") as stream: try: return yaml.safe_load(stream) except yaml.YAMLError as e: logging.error(e) return None def show_map(self, map=None): if map is not None: self.map = map self.show = True def disable_map(self): self.show = False def generate_index(self, index_config): """ Generates an index based on the provided index configuration. Args: index_config (dict): Configuration for generating the index. Returns: ee.Image: The generated index clipped to the region of interest. """ match index_config["gee_type"]: case "image": dataset = ee.Image(index_config["gee_path"]).clip(self.roi) if index_config.get("select"): dataset = dataset.select(index_config["select"]) case "image_collection": dataset = ( ee.ImageCollection(index_config["gee_path"]) .filterBounds(self.roi) .map(lambda image: image.clip(self.roi)) .mean() ) if index_config.get("select"): dataset = dataset.select(index_config["select"]) case "feature_collection": dataset = ( ee.Image() .float() .paint( ee.FeatureCollection(index_config["gee_path"]), index_config["select"], ) .clip(self.roi) ) case "algebraic": image = self._cloudfree(index_config["gee_path"]) dataset = image.normalizedDifference(["B4", "B3"]) case _: dataset = None if not dataset: raise Exception("Failed to generate dataset.") if self.show and index_config.get("show"): map.addLayer(dataset, index_config["viz"], index_config["name"]) logging.info(f"Generated index: {index_config['name']}") return dataset def zonal_mean_index(self, index_key): index_config = self.indices[index_key] dataset = self.generate_index(index_config) # zm = self._zonal_mean(single, index_config.get('bandname') or 'constant') out = dataset.reduceRegion( **{ "reducer": ee.Reducer.mean(), "geometry": self.roi, "scale": 200, # map scale } ).getInfo() if index_config.get("bandname"): return out[index_config.get("bandname")] return out def generate_composite_index_df(self, indices=[]): data = { "metric": indices, "year": self.year, "centroid": str(self.centroid), "project_name": self.project_name, "value": list(map(self.zonal_mean_index, indices)), "area": self.roi.area().getInfo(), # m^2 "geojson": str(self.roi.getInfo()), # to-do: coefficient } logging.info("data", data) df = pd.DataFrame(data) return df def set_up_duckdb(): logging.info("set up duckdb") # use `climatebase` db if not os.getenv("motherduck_token"): raise Exception( "No motherduck token found. Please set the `motherduck_token` environment variable." ) else: con = duckdb.connect("md:climatebase") con.sql("USE climatebase;") # load extensions con.sql("""INSTALL spatial; LOAD spatial;""") return con def authenticate_ee(ee_service_account): """ Huggingface Spaces does not support secret files, therefore authenticate with an environment variable containing the JSON. """ logging.info("authenticate_ee") credentials = ee.ServiceAccountCredentials( ee_service_account, key_data=os.environ["ee_service_account"] ) ee.Initialize(credentials) def load_indices(indices_file): # Read index configurations with open(indices_file, "r") as stream: try: return yaml.safe_load(stream) except yaml.YAMLError as e: logging.error(e) return None def create_dataframe(years, project_name): dfs = [] logging.info(years) indices = load_indices(INDICES_FILE) for year in years: logging.info(year) ig = IndexGenerator( centroid=LOCATION, roi_radius=ROI_RADIUS, year=year, indices_file=INDICES_FILE, project_name=project_name, ) df = ig.generate_composite_index_df(list(indices.keys())) dfs.append(df) return pd.concat(dfs) # h/t: https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/12 def get_plotting_zoom_level_and_center_coordinates_from_lonlat_tuples(longitudes=None, latitudes=None): """Function documentation:\n Basic framework adopted from Krichardson under the following thread: https://community.plotly.com/t/dynamic-zoom-for-mapbox/32658/7 # NOTE: # THIS IS A TEMPORARY SOLUTION UNTIL THE DASH TEAM IMPLEMENTS DYNAMIC ZOOM # in their plotly-functions associated with mapbox, such as go.Densitymapbox() etc. Returns the appropriate zoom-level for these plotly-mapbox-graphics along with the center coordinate tuple of all provided coordinate tuples. """ # Check whether both latitudes and longitudes have been passed, # or if the list lenghts don't match if ((latitudes is None or longitudes is None) or (len(latitudes) != len(longitudes))): # Otherwise, return the default values of 0 zoom and the coordinate origin as center point return 0, (0, 0) # Get the boundary-box b_box = {} b_box['height'] = latitudes.max()-latitudes.min() b_box['width'] = longitudes.max()-longitudes.min() b_box['center']= (np.mean(longitudes), np.mean(latitudes)) # get the area of the bounding box in order to calculate a zoom-level area = b_box['height'] * b_box['width'] # * 1D-linear interpolation with numpy: # - Pass the area as the only x-value and not as a list, in order to return a scalar as well # - The x-points "xp" should be in parts in comparable order of magnitude of the given area # - The zpom-levels are adapted to the areas, i.e. start with the smallest area possible of 0 # which leads to the highest possible zoom value 20, and so forth decreasing with increasing areas # as these variables are antiproportional zoom = np.interp(x=area, xp=[0, 5**-10, 4**-10, 3**-10, 2**-10, 1**-10, 1**-5], fp=[20, 15, 14, 13, 12, 7, 5]) # Finally, return the zoom level and the associated boundary-box center coordinates return zoom, b_box['center'] def show_project_map(project_name): prepared_statement = \ con.execute("SELECT geometry FROM project WHERE name = ? LIMIT 1", [project_name]).fetchall() features = \ json.loads(prepared_statement[0][0].replace("\'", "\""))['features'] geometry = features[0]['geometry'] longitudes = np.array(geometry["coordinates"])[0, :, 0] latitudes = np.array(geometry["coordinates"])[0, :, 1] zoom, bbox_center = get_plotting_zoom_level_and_center_coordinates_from_lonlat_tuples(longitudes, latitudes) fig = go.Figure(go.Scattermapbox( mode = "markers", lon = [bbox_center[0]], lat = [bbox_center[1]], marker = {'size': 20, 'color': ["cyan"]})) fig.update_layout( mapbox = { 'style': "stamen-terrain", 'center': { 'lon': bbox_center[0], 'lat': bbox_center[1]}, 'zoom': zoom, 'layers': [{ 'source': { 'type': "FeatureCollection", 'features': [{ 'type': "Feature", 'geometry': geometry }] }, 'type': "fill", 'below': "traces", 'color': "royalblue"}]}, margin = {'l':0, 'r':0, 'b':0, 't':0}) return fig # minMax.getInfo() def calculate_biodiversity_score(start_year, end_year, project_name): years = [] for year in range(start_year, end_year): row_exists = \ con.execute("SELECT COUNT(1) FROM bioindicator WHERE (year = ? AND project_name = ?)", [year, project_name]).fetchall()[0][0] if not row_exists: years.append(year) if len(years) > 0: df = create_dataframe(years, project_name) # Write score table to `_temptable` con.sql( "CREATE OR REPLACE TABLE _temptable AS SELECT *, (value * area) AS score FROM (SELECT year, project_name, AVG(value) AS value, area FROM df GROUP BY year, project_name, area ORDER BY project_name)" ) # Create `bioindicator` table IF NOT EXISTS. con.sql( """ USE climatebase; CREATE TABLE IF NOT EXISTS bioindicator (year BIGINT, project_name VARCHAR(255), value DOUBLE, area DOUBLE, score DOUBLE, CONSTRAINT unique_year_project_name UNIQUE (year, project_name)); """) # UPSERT project record con.sql( """ INSERT INTO bioindicator FROM _temptable ON CONFLICT (year, project_name) DO UPDATE SET value = excluded.value; """ ) logging.info("upsert records into motherduck") scores = \ con.execute("SELECT * FROM bioindicator WHERE (year >= ? AND year <= ? AND project_name = ?)", [start_year, end_year, project_name]).df() return scores def motherduck_list_projects(author_id): return \ con.execute("SELECT DISTINCT name FROM project WHERE authorId = ? AND geometry != 'null'", [author_id]).df() with gr.Blocks() as demo: # Environment setup authenticate_ee(GEE_SERVICE_ACCOUNT) con = set_up_duckdb() with gr.Column(): m1 = gr.Plot() with gr.Row(): project_name = gr.Dropdown([], label="Project", value="Select project") start_year = gr.Number(value=2017, label="Start Year", precision=0) end_year = gr.Number(value=2022, label="End Year", precision=0) with gr.Row(): view_btn = gr.Button(value="Show project map") calc_btn = gr.Button(value="Calculate!") # save_btn = gr.Button(value="Save") results_df = gr.Dataframe( headers=["Year", "Project Name", "Score"], datatype=["number", "str", "number"], label="Biodiversity scores by year", ) calc_btn.click( calculate_biodiversity_score, inputs=[start_year, end_year, project_name], outputs=results_df, ) view_btn.click( fn=show_project_map, inputs=[project_name], outputs=[m1], ) def update_project_dropdown_list(url_params): username = url_params.get("username", "default") projects = motherduck_list_projects(author_id=username) # to-do: filter projects based on user return gr.Dropdown.update(choices=projects["name"].tolist()) # Get url params url_params = gr.JSON({"username": "default"}, visible=False, label="URL Params") # Gradio has a bug # For dropdown to update by demo.load, dropdown value must be called downstream b1 = gr.Button("Hidden button that fixes bug.", visible=False) b1.click(lambda x: x, inputs=project_name, outputs=[]) # Update project dropdown list on page load demo.load( fn=update_project_dropdown_list, inputs=[url_params], outputs=[project_name], _js=get_window_url_params, queue=False, ) demo.launch()