|
import zipfile |
|
from io import StringIO |
|
|
|
import bs4 |
|
import geopandas as gpd |
|
import lxml |
|
import pandas as pd |
|
from shapely.geometry import ( |
|
Point, |
|
LineString, |
|
Polygon, |
|
MultiPoint, |
|
MultiLineString, |
|
MultiPolygon, |
|
LinearRing, |
|
) |
|
|
|
|
|
def parse_descriptions_to_geodf(geodf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Parse the HTML tables stored in the ``Description`` column into attribute columns.

    Every entry of ``geodf["Description"]`` is handed to ``pandas.read_html``.
    The second table found is used when there is one, otherwise the first.
    The chosen table is transposed, its first row becomes the column labels
    and the remaining rows are kept as data.

    Parameters:
    - geodf: GeoDataFrame with a ``Description`` column of HTML strings and a
      ``geometry`` column.

    Returns:
    - GeoDataFrame holding the parsed attributes together with the geometries
      of ``geodf``, using the CRS of ``geodf``.

    Exceptions raised by ``pandas.read_html`` or ``pandas.concat`` are not
    handled here and propagate to the caller.
    """
    dataframes = []

    for desc in geodf["Description"]:
        parsed_html = pd.read_html(StringIO(desc))

        # Prefer the second table; fall back to the first when only one exists.
        try:
            temp_df = parsed_html[1].T
        except IndexError:
            temp_df = parsed_html[0].T

        # After transposing, row 0 carries the field names.
        temp_df.columns = temp_df.iloc[0]
        temp_df = temp_df.iloc[1:]

        dataframes.append(temp_df)

    combined_df = pd.concat(dataframes, ignore_index=True)

    # combined_df was renumbered 0..n-1 above. Column assignment aligns on
    # index labels, so renumber the geometries the same way; otherwise an
    # input frame with a non-default index (filtered, sorted, ...) would end
    # up with missing or mispaired geometries.
    combined_df["geometry"] = geodf["geometry"].reset_index(drop=True)

    return gpd.GeoDataFrame(combined_df, crs=geodf.crs)
|
|
|
|
|
def swap_coordinates(geometry):
    """
    Swap the first two ordinates (x and y) of every coordinate of a shapely
    Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon,
    or LinearRing geometry.

    Only the first two ordinates are carried over, so the result is always
    two-dimensional; any further ordinate (such as Z) is dropped.

    Parameters:
    - geometry: Shapely geometry (Point, LineString, Polygon, MultiPoint,
      MultiLineString, MultiPolygon, or LinearRing)

    Returns:
    - Shapely geometry of the same type with swapped coordinates

    Raises:
    - ValueError: if ``geometry`` is not one of the supported types
    """

    def swap_coords(coords):
        return [(coord[1], coord[0]) for coord in coords]

    if isinstance(geometry, Point):
        return Point([geometry.y, geometry.x])
    if isinstance(geometry, MultiPoint):
        return MultiPoint(
            [Point(swap_coords(point.coords)) for point in geometry.geoms],
        )
    # LinearRing is a subclass of LineString, so it has to be tested first;
    # otherwise a ring would be rebuilt as a plain LineString.
    if isinstance(geometry, LinearRing):
        return LinearRing(swap_coords(geometry.coords))
    if isinstance(geometry, LineString):
        return LineString(swap_coords(geometry.coords))
    if isinstance(geometry, MultiLineString):
        return MultiLineString(
            [LineString(swap_coords(line.coords)) for line in geometry.geoms],
        )
    if isinstance(geometry, Polygon):
        exterior_coords = swap_coords(geometry.exterior.coords)
        interior_coords = [
            swap_coords(interior.coords) for interior in geometry.interiors
        ]
        return Polygon(exterior_coords, interior_coords)
    if isinstance(geometry, MultiPolygon):
        # Each member is a Polygon, handled by the branch above.
        return MultiPolygon([swap_coordinates(poly) for poly in geometry.geoms])

    raise ValueError("Unsupported geometry type")
|
|
|
|
|
def load_kmz_as_geodf(file_path: str) -> gpd.GeoDataFrame:
    """Load the single KML document inside a KMZ archive into a GeoDataFrame.

    Parameters:
    - file_path: path to the KMZ (zip) archive.

    Returns:
    - GeoDataFrame read from the archive's KML member with the "KML" driver
      and the "pyogrio" engine.

    Raises:
    - IndexError: if the archive does not contain exactly one KML member.
    """
    with zipfile.ZipFile(file_path, "r") as kmz:
        # Compare the extension case-insensitively so that members such as
        # "DOC.KML" are found as well.
        kml_files = [
            member for member in kmz.namelist() if member.lower().endswith(".kml")
        ]

    if not kml_files:
        raise IndexError("KMZ contains no KML file.")
    if len(kml_files) > 1:
        raise IndexError(
            "KMZ contains more than one KML. Please extract or convert to multiple KMLs.",
        )

    return gpd.read_file(
        f"zip://{file_path}/{kml_files[0]}",
        driver="KML",
        engine="pyogrio",
    )
|
|
|
|
|
def load_ge_file(file_path: str) -> gpd.GeoDataFrame:
    """Load a KML or KMZ file and parse its feature descriptions into a GeoDataFrame.

    Parameters:
    - file_path: path ending in ``.kml`` or ``.kmz`` (any letter case).

    Returns:
    - GeoDataFrame produced by ``parse_descriptions_to_geodf`` from the
      loaded file.

    Raises:
    - ValueError: if the path has neither a ``.kml`` nor a ``.kmz`` extension.
    """
    # Compare case-insensitively so "FILE.KML" / "FILE.KMZ" are accepted too.
    lowered_path = file_path.lower()

    if lowered_path.endswith(".kml"):
        raw_geodf = gpd.read_file(file_path, driver="KML", engine="pyogrio")
    elif lowered_path.endswith(".kmz"):
        raw_geodf = load_kmz_as_geodf(file_path)
    else:
        raise ValueError("The file must have a .kml or .kmz extension.")

    return parse_descriptions_to_geodf(raw_geodf)
|
|
|
|
|
def extract_data_from_kml_code(kml_code: str) -> pd.DataFrame:
    """Build a DataFrame from the SimpleData fields found in KML source text.

    The text is parsed with BeautifulSoup's "html.parser" and searched for
    ``schemadata`` tags. Each one yields a row: ``Placemark_name`` is the
    text of the first ``name`` tag found under the tag's grandparent (or
    "[no name]" when there is none), followed by one column per
    ``simpledata`` child, keyed by its ``name`` attribute. Tables embedded
    in feature descriptions are not read.

    Parameters:
    - kml_code: KML document as a string.

    Returns:
    - DataFrame with one row per ``schemadata`` tag.
    """
    document = bs4.BeautifulSoup(kml_code, "html.parser")

    records = []
    for schema_tag in document.find_all("schemadata"):
        # Two levels up from the schemadata tag; presumably the enclosing
        # Placemark element -- TODO confirm against real files.
        name_tag = schema_tag.parent.parent.find("name")

        record = {"Placemark_name": name_tag.text if name_tag else "[no name]"}
        for field in schema_tag.find_all("simpledata"):
            record[field.get("name")] = field.text

        records.append(record)

    return pd.DataFrame(records)
|
|
|
|
|
def extract_kml_code_from_file(file_path: str) -> str:
    """Return the KML source text of a Google Earth file (KML or KMZ).

    Parameters:
    - file_path: path ending in ``.kml`` or ``.kmz`` (any letter case).

    Returns:
    - The KML document as a string, decoded as UTF-8.

    Raises:
    - ValueError: if the path has neither a ``.kml`` nor a ``.kmz`` extension.
    - IndexError: if a KMZ archive does not contain exactly one KML member.
    """
    # Test the real suffix, dot included, so a bare path named "kml" or
    # "kmz" is not mistaken for a file with that extension.
    lowered_path = file_path.lower()

    if lowered_path.endswith(".kml"):
        # Decode explicitly as UTF-8 instead of the platform's locale codec,
        # matching how the KMZ branch below decodes its member.
        with open(file_path, "r", encoding="utf-8") as kml_file:
            return kml_file.read()

    if lowered_path.endswith(".kmz"):
        with zipfile.ZipFile(file_path) as kmz_file:
            kml_files = [
                member
                for member in kmz_file.namelist()
                if member.lower().endswith(".kml")
            ]

            if not kml_files:
                raise IndexError("KMZ file contains no KML.")
            if len(kml_files) > 1:
                raise IndexError(
                    "KMZ file contains more than one KML. Please extract or convert to multiple KMLs.",
                )

            with kmz_file.open(kml_files[0]) as kml_file:
                return kml_file.read().decode("utf-8")

    raise ValueError("The input file must have a .kml or .kmz extension.")
|
|
|
|
|
def extract_data_from_ge_file(file_path: str) -> gpd.GeoDataFrame:
    """Extract SimpleData attributes and geometries from a Google Earth file.

    The attribute table comes from the file's SimpleData tags (tables
    embedded in feature descriptions are not read); the geometries come from
    reading the same file with geopandas. Every geometry is passed through
    ``swap_coordinates`` before the frame is returned.

    Parameters:
    - file_path: path to a KML or KMZ file.

    Returns:
    - GeoDataFrame of the SimpleData attributes with a ``geometry`` column,
      using the CRS reported by geopandas for the file.
    """
    data_df = extract_data_from_kml_code(extract_kml_code_from_file(file_path))

    # Compare case-insensitively so an upper-case ".KMZ" path is still opened
    # as an archive rather than handed to the plain-KML reader.
    if file_path.lower().endswith(".kmz"):
        ge_file_gdf = load_kmz_as_geodf(file_path)
    else:
        ge_file_gdf = gpd.read_file(file_path, driver="KML", engine="pyogrio")

    # NOTE(review): attributes and geometries are paired by row index, which
    # presumes both reads yield the features in the same order and number --
    # confirm for files where some placemarks carry no SchemaData.
    geo_df = gpd.GeoDataFrame(
        data_df,
        geometry=ge_file_gdf["geometry"],
        crs=ge_file_gdf.crs,
    )
    geo_df["geometry"] = geo_df["geometry"].apply(swap_coordinates)
    return geo_df
|
|
|
|
|
def load_ge_data(file_path: str) -> gpd.GeoDataFrame:
    """Extract data from a Google Earth file (KML or KMZ), retrying with a second loader on parse errors.

    When the KML source mentions SimpleData or SimpleField tags,
    ``extract_data_from_ge_file`` is tried first and ``load_ge_file`` is the
    fallback; otherwise the order is reversed. The fallback runs only if the
    first loader raises one of the parsing errors listed in the ``except``
    clause below; an exception raised by the fallback propagates.

    Parameters:
    - file_path: path to a KML or KMZ file.

    Returns:
    - GeoDataFrame produced by whichever loader succeeded.
    """
    # `import lxml` by itself does not guarantee that the `etree` submodule
    # is loaded; import it explicitly so the except clause can always
    # resolve the exception classes.
    import lxml.etree

    # Lower-case the document once instead of once per probed tag.
    kml_code = extract_kml_code_from_file(file_path).lower()
    has_simple_data = "<simpledata" in kml_code or "<simplefield" in kml_code

    if has_simple_data:
        primary_func, fallback_func = extract_data_from_ge_file, load_ge_file
    else:
        primary_func, fallback_func = load_ge_file, extract_data_from_ge_file

    try:
        return primary_func(file_path)
    except (
        pd.errors.ParserError,
        lxml.etree.ParserError,
        lxml.etree.XMLSyntaxError,
        ValueError,
    ):
        return fallback_func(file_path)
|
|