alessandro trinca tornidor
feat: use everywhere a single app_logger instance defined in samgis_lisa_on_zero.__init__
bb6e1c0
raw
history blame
7.79 kB
"""lambda helper functions"""
from typing import Dict
from lisa_on_cuda.utils import session_logger
from lisa_on_cuda.utils.app_helpers import get_cleaned_input
from xyzservices import providers, TileProvider
from samgis_lisa_on_zero import app_logger
from samgis_lisa_on_zero.io_package.coordinates_pixel_conversion import get_latlng_to_pixel_coordinates
from samgis_lisa_on_zero.utilities.constants import COMPLETE_URL_TILES_MAPBOX, COMPLETE_URL_TILES_NEXTZEN
from samgis_lisa_on_zero.utilities.type_hints import (
ApiRequestBody, XYZTerrainProvidersNames, XYZDefaultProvidersNames, StringPromptApiRequestBody)
@session_logger.set_uuid_logging
def get_parsed_bbox_points_with_string_prompt(request_input: StringPromptApiRequestBody) -> Dict:
"""
Parse the raw input request into bbox, prompt string and zoom
Args:
request_input: input dict
Returns:
dict with bounding box, prompt string and zoom
"""
app_logger.info(f"try to parsing input request: {type(request_input)}, {request_input}...")
if isinstance(request_input, str):
app_logger.info(f"string/json input, parsing it to {type(StringPromptApiRequestBody)}...")
request_input = StringPromptApiRequestBody.model_validate_json(request_input)
app_logger.info(f"parsed input, now of type {type(request_input)}...")
bbox = request_input.bbox
app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
ne = bbox.ne
sw = bbox.sw
app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
ne_latlng = [float(ne.lat), float(ne.lng)]
sw_latlng = [float(sw.lat), float(sw.lng)]
new_zoom = int(request_input.zoom)
cleaned_prompt = get_cleaned_input(request_input.string_prompt)
app_logger.debug(f"bbox => {bbox}.")
app_logger.debug(f'request_input-prompt cleaned => {cleaned_prompt}.')
app_logger.info("unpacking elaborated request...")
return {
"bbox": [ne_latlng, sw_latlng],
"prompt": cleaned_prompt,
"zoom": new_zoom,
"source": get_url_tile(request_input.source_type),
"source_name": get_source_name(request_input.source_type)
}
@session_logger.set_uuid_logging
def get_parsed_bbox_points_with_dictlist_prompt(request_input: ApiRequestBody) -> Dict:
"""
Parse the raw input request into bbox, prompt and zoom
Args:
request_input: input dict
Returns:
dict with bounding box, prompt and zoom
"""
app_logger.info(f"try to parsing input request {request_input}...")
bbox = request_input.bbox
app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
ne = bbox.ne
sw = bbox.sw
app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
ne_latlng = [float(ne.lat), float(ne.lng)]
sw_latlng = [float(sw.lat), float(sw.lng)]
new_zoom = int(request_input.zoom)
new_prompt_list = _get_parsed_prompt_list(ne, sw, new_zoom, request_input.prompt)
app_logger.debug(f"bbox => {bbox}.")
app_logger.debug(f'request_input-prompt updated => {new_prompt_list}.')
app_logger.info("unpacking elaborated request...")
return {
"bbox": [ne_latlng, sw_latlng],
"prompt": new_prompt_list,
"zoom": new_zoom,
"source": get_url_tile(request_input.source_type),
"source_name": get_source_name(request_input.source_type)
}
@session_logger.set_uuid_logging
def _get_parsed_prompt_list(bbox_ne, bbox_sw, zoom, prompt_list):
new_prompt_list = []
for prompt in prompt_list:
app_logger.debug(f"current prompt: {type(prompt)}, value:{prompt}.")
new_prompt = {"type": prompt.type.value}
if prompt.type == "point":
new_prompt_data = _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom)
new_prompt["label"] = prompt.label.value
elif prompt.type == "rectangle":
new_prompt_data = _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom)
else:
msg = "Valid prompt type: 'point' or 'rectangle', not '{}'. Check ApiRequestBody parsing/validation."
raise TypeError(msg.format(prompt.type))
app_logger.debug(f"new_prompt_data: {type(new_prompt_data)}, value:{new_prompt_data}.")
new_prompt["data"] = new_prompt_data
new_prompt_list.append(new_prompt)
return new_prompt_list
@session_logger.set_uuid_logging
def _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom):
current_point = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data, zoom, prompt.type)
app_logger.debug(f"current prompt: {type(current_point)}, value:{current_point}, label: {prompt.label}.")
return [current_point['x'], current_point['y']]
@session_logger.set_uuid_logging
def _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom):
current_point_ne = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.ne, zoom, prompt.type)
app_logger.debug(
f"rectangle:: current_point_ne prompt: {type(current_point_ne)}, value:{current_point_ne}.")
current_point_sw = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.sw, zoom, prompt.type)
app_logger.debug(
f"rectangle:: current_point_sw prompt: {type(current_point_sw)}, value:{current_point_sw}.")
# correct order for rectangle prompt
return [
current_point_sw["x"],
current_point_ne["y"],
current_point_ne["x"],
current_point_sw["y"]
]
mapbox_terrain_rgb = TileProvider(
name=XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME,
url=COMPLETE_URL_TILES_MAPBOX,
attribution=""
)
nextzen_terrain_rgb = TileProvider(
name=XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME,
url=COMPLETE_URL_TILES_NEXTZEN,
attribution=""
)
@session_logger.set_uuid_logging
def get_url_tile(source_type: str):
try:
match source_type.lower():
case XYZDefaultProvidersNames.DEFAULT_TILES_NAME_SHORT:
return providers.query_name(XYZDefaultProvidersNames.DEFAULT_TILES_NAME)
case XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME:
return mapbox_terrain_rgb
case XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME:
app_logger.info("nextzen_terrain_rgb:", nextzen_terrain_rgb)
return nextzen_terrain_rgb
case _:
return providers.query_name(source_type)
except ValueError as ve:
from pydantic_core import ValidationError
app_logger.error("ve:", str(ve))
raise ValidationError(ve)
def check_source_type_is_terrain(source: str | TileProvider):
return isinstance(source, TileProvider) and source.name in list(XYZTerrainProvidersNames)
@session_logger.set_uuid_logging
def get_source_name(source: str | TileProvider) -> str | bool:
try:
match source.lower():
case XYZDefaultProvidersNames.DEFAULT_TILES_NAME_SHORT:
source_output = providers.query_name(XYZDefaultProvidersNames.DEFAULT_TILES_NAME)
case _:
source_output = providers.query_name(source)
if isinstance(source_output, str):
return source_output
try:
source_dict = dict(source_output)
app_logger.info(f"source_dict:{type(source_dict)}, {'name' in source_dict}, source_dict:{source_dict}.")
return source_dict["name"]
except KeyError as ke:
app_logger.error(f"ke:{ke}.")
except ValueError as ve:
app_logger.info(f"source name::{source}, ve:{ve}.")
app_logger.info(f"source name::{source}.")
return False