calculator / utils /duckdb_queries.py
mbuuck's picture
Added coefficients/normalizations. Added rounding.
12fd562
raw
history blame
No virus
2.83 kB
import json
import os
import duckdb
# Configure DuckDB connection
if not os.getenv("motherduck_token"):
raise Exception(
"No motherduck token found. Please set the `motherduck_token` environment variable."
)
else:
con = duckdb.connect("md:climatebase")
con.sql("USE climatebase;")
# load extensions
con.sql("""INSTALL spatial; LOAD spatial;""")
# to-do: pass con through decorator
def list_projects_by_author(author_id):
return con.execute(
"SELECT DISTINCT name FROM project WHERE (authorId = ? OR authorId = 'default') AND (geometry IS NOT NULL)",
[author_id],
).df()
def get_project_geometry(project_name):
return con.execute(
"SELECT geometry FROM project WHERE name = ? LIMIT 1", [project_name]
).fetchall()
def get_project_centroid(project_name):
# Workaround to get centroid of project
# To-do: refactor to only use DuckDB spatial extension
_geom = get_project_geometry(project_name)
_polygon = json.dumps(json.loads(_geom[0][0])["features"][0]["geometry"])
return con.sql(
f"SELECT ST_X(ST_Centroid(ST_GeomFromGeoJSON('{_polygon}'))) AS longitude, ST_Y(ST_Centroid(ST_GeomFromGeoJSON('{_polygon}'))) AS latitude;"
).fetchall()[0]
def get_project_scores(project_name, start_year, end_year):
return con.execute(
"SELECT * FROM bioindicator WHERE (year >= ? AND year <= ? AND project_name = ?)",
[start_year, end_year, project_name],
).df()
def check_if_table_exists(table_name):
tables = con.execute("SHOW TABLES;").fetchall()
for i in range(len(tables)):
tables[i] = tables[i][0]
return table_name in tables
def check_if_project_exists_for_year(project_name, year):
return con.execute(
"SELECT COUNT(1) FROM bioindicator WHERE (year = ? AND project_name = ?)",
[year, project_name],
).fetchall()[0][0]
def write_score_to_temptable(df):
con.sql(
"CREATE OR REPLACE TABLE _temptable AS SELECT *, (value * area) AS score FROM (SELECT year, project_name, metric, AVG(value * coefficient) AS value, area FROM df GROUP BY year, project_name, metric, area ORDER BY project_name, metric)"
)
return True
def get_or_create_bioindicator_table():
con.sql(
"""
USE climatebase;
CREATE TABLE IF NOT EXISTS bioindicator (year BIGINT, project_name VARCHAR(255), metric VARCHAR(255), value DOUBLE, area DOUBLE, score DOUBLE, CONSTRAINT unique_year_project_name_metric UNIQUE (year, project_name, metric));
"""
)
return True
def upsert_project_record():
con.sql(
"""
INSERT INTO bioindicator FROM _temptable
ON CONFLICT (year, project_name, metric) DO UPDATE SET value = excluded.value;
"""
)
return True