File size: 7,521 Bytes
d362ca8 e9d1bce d362ca8 e9d1bce d362ca8 a69a345 d362ca8 c6718c6 e9d1bce c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 a69a345 a489e19 a69a345 c6718c6 9d62f11 d362ca8 c6718c6 d362ca8 9d62f11 c6718c6 e9d1bce d362ca8 c6718c6 d362ca8 9d62f11 c6718c6 9d62f11 c6718c6 59879a5 9d62f11 c6718c6 c7a4712 c6718c6 d362ca8 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 e9d1bce c6718c6 e9d1bce 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 d362ca8 c6718c6 a69a345 c6718c6 9d62f11 c6718c6 9d62f11 c6718c6 9d62f11 d362ca8 c6718c6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
import zipfile
from io import StringIO
import bs4
import geopandas as gpd
import lxml # nosec
import pandas as pd
from shapely.geometry import (
Point,
LineString,
Polygon,
MultiPoint,
MultiLineString,
MultiPolygon,
LinearRing,
)
def parse_descriptions_to_geodf(geodf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Parse the HTML "Description" column of a Google Earth layer into columns.

    Every description is read with ``pandas.read_html``. The second table
    found in the HTML is used when there is one, otherwise the first. The
    chosen table is transposed, its first row (the original first column) is
    promoted to the header and the remaining rows are kept as data.

    Args:
        geodf: GeoDataFrame holding a "Description" column of HTML strings
            and a "geometry" column.

    Returns:
        A GeoDataFrame made of the parsed description data plus a "geometry"
        column, using the CRS of ``geodf``.
    """
    frames = []
    for desc in geodf["Description"]:
        tables = pd.read_html(StringIO(desc))
        # Prefer the second table; fall back to the first when only one exists.
        table = tables[1] if len(tables) > 1 else tables[0]
        transposed = table.T
        # The first transposed row carries the field names.
        transposed.columns = transposed.iloc[0]
        frames.append(transposed.iloc[1:])

    # The combined frame gets a fresh 0..n-1 index.
    combined_df = pd.concat(frames, ignore_index=True)

    # Attach geometries by position. Assigning the original series directly
    # would align on index labels, which silently misplaces or drops
    # geometries when ``geodf`` does not carry a default 0..n-1 index.
    combined_df["geometry"] = geodf["geometry"].reset_index(drop=True)

    return gpd.GeoDataFrame(combined_df, crs=geodf.crs)
def swap_coordinates(geometry):
    """
    Swap the first two ordinates (x/y, i.e. longitude/latitude) of a geometry.

    Only the first two ordinates of each coordinate are used, so a third (Z)
    value is not carried over to the result.

    Parameters:
    - geometry: Shapely geometry (Point, LineString, Polygon, MultiPoint,
    MultiLineString, MultiPolygon, or LinearRing)

    Returns:
    - Shapely geometry of the same type with swapped coordinates

    Raises:
    - ValueError: if the geometry is not one of the supported types
    """

    def swap_coords(coords):
        return [(coord[1], coord[0]) for coord in coords]

    if isinstance(geometry, Point):
        return Point([geometry.y, geometry.x])
    if isinstance(geometry, MultiPoint):
        return MultiPoint(
            [Point(swap_coords(point.coords)) for point in geometry.geoms],
        )
    # LinearRing is a subclass of LineString in shapely, so it has to be
    # tested first; otherwise a ring would be returned as a plain LineString.
    if isinstance(geometry, LinearRing):
        return LinearRing(swap_coords(geometry.coords))
    if isinstance(geometry, LineString):
        return LineString(swap_coords(geometry.coords))
    if isinstance(geometry, MultiLineString):
        return MultiLineString(
            [LineString(swap_coords(line.coords)) for line in geometry.geoms],
        )
    if isinstance(geometry, Polygon):
        exterior_coords = swap_coords(geometry.exterior.coords)
        interior_coords = [
            swap_coords(interior.coords) for interior in geometry.interiors
        ]
        return Polygon(exterior_coords, interior_coords)
    if isinstance(geometry, MultiPolygon):
        return MultiPolygon([swap_coordinates(poly) for poly in geometry.geoms])
    raise ValueError("Unsupported geometry type")
def load_kmz_as_geodf(file_path: str) -> gpd.GeoDataFrame:
    """Load a KMZ archive that contains exactly one KML file into a GeoDataFrame.

    Args:
        file_path: Path to the KMZ (zip) archive.

    Returns:
        The GeoDataFrame read from the single KML member of the archive.

    Raises:
        IndexError: If the archive contains no KML file or more than one.
    """
    with zipfile.ZipFile(file_path, "r") as kmz:
        # Match the extension case-insensitively, the same way
        # extract_kml_code_from_file does, so "DOC.KML" is found as well.
        kml_files = [
            member for member in kmz.namelist() if member.lower().endswith(".kml")
        ]

    if not kml_files:
        # Separate message: "more than one KML" would be misleading here.
        raise IndexError("KMZ contains no KML file.")
    if len(kml_files) > 1:
        raise IndexError(
            "KMZ contains more than one KML. Please extract or convert to multiple KMLs.",
        )

    # Read the KML member straight out of the archive.
    return gpd.read_file(
        f"zip://{file_path}/{kml_files[0]}",
        driver="KML",
        engine="pyogrio",
    )
def load_ge_file(file_path: str) -> gpd.GeoDataFrame:
    """Load a KML or KMZ file and parse its descriptions into a GeoDataFrame.

    Args:
        file_path: Path to a file ending in ".kml" or ".kmz" (any letter case).

    Returns:
        The result of parse_descriptions_to_geodf applied to the loaded layer.

    Raises:
        ValueError: If the path ends in neither ".kml" nor ".kmz".
    """
    # Compare case-insensitively, consistent with extract_kml_code_from_file,
    # so that e.g. "AREA.KMZ" is accepted too.
    lowered_path = file_path.lower()
    if lowered_path.endswith(".kml"):
        return parse_descriptions_to_geodf(
            gpd.read_file(file_path, driver="KML", engine="pyogrio"),
        )
    if lowered_path.endswith(".kmz"):
        return parse_descriptions_to_geodf(load_kmz_as_geodf(file_path))
    raise ValueError("The file must have a .kml or .kmz extension.")
def extract_data_from_kml_code(kml_code: str) -> pd.DataFrame:
    """Extract SimpleData fields from KML source code into a DataFrame.

    One row is produced per SchemaData element. Tables embedded in feature
    descriptions are not looked at.

    Args:
        kml_code: KML source code as text.

    Returns:
        A DataFrame with a "Placemark_name" column plus one column per
        SimpleData ``name`` attribute. "Placemark_name" is "[no name]" when
        no ``name`` element is found under the SchemaData's grandparent.
    """
    soup = bs4.BeautifulSoup(kml_code, features="xml")

    def has_name(wanted):
        # The "xml" parser keeps the case of tag names, and KML writes these
        # tags as "SchemaData" / "SimpleData"; a plain lower-case string
        # search would therefore find nothing. Compare case-insensitively.
        return lambda tag: tag.name.lower() == wanted

    rows = []
    for schema_data in soup.find_all(has_name("schemadata")):
        # The name element is looked up under the grandparent of SchemaData.
        name_tag = schema_data.parent.parent.find("name")
        row = {
            "Placemark_name": name_tag.text if name_tag is not None else "[no name]",
        }
        for field in schema_data.find_all(has_name("simpledata")):
            row[field.get("name")] = field.text
        rows.append(row)

    return pd.DataFrame(rows)
def extract_kml_code_from_file(file_path: str) -> str:
    """Extract KML source code from a Google Earth file (KML or KMZ).

    Args:
        file_path: Path to a file ending in ".kml" or ".kmz" (any letter case).

    Returns:
        The KML source as text. A KML file is read directly; for a KMZ the
        single KML member of the archive is read and decoded.

    Raises:
        IndexError: If a KMZ archive contains no KML file or more than one.
        ValueError: If the path ends in neither ".kml" nor ".kmz".
    """
    lowered_path = file_path.lower()

    if lowered_path.endswith(".kml"):
        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's default encoding and matches the KMZ branch below.
        with open(file_path, "r", encoding="utf-8") as kml_file:
            return kml_file.read()

    if lowered_path.endswith(".kmz"):
        with zipfile.ZipFile(file_path) as kmz_file:
            kml_files = [
                member
                for member in kmz_file.namelist()
                if member.lower().endswith(".kml")
            ]
            if not kml_files:
                # Separate message: "more than one KML" would be misleading.
                raise IndexError("KMZ file contains no KML file.")
            if len(kml_files) > 1:
                raise IndexError(
                    "KMZ file contains more than one KML. Please extract or convert to multiple KMLs.",
                )
            with kmz_file.open(kml_files[0]) as kml_file:
                # Archive members are read as bytes; decode them to text.
                return kml_file.read().decode("utf-8")

    raise ValueError("The input file must have a .kml or .kmz extension.")
def extract_data_from_ge_file(file_path: str) -> gpd.GeoDataFrame:
    """Extract SimpleData attributes and geometries from a KML or KMZ file.

    Attribute data comes from the SimpleData tags of the KML source (tables
    embedded in feature descriptions are not used). Geometries come from
    reading the same file with geopandas and are passed through
    swap_coordinates before being returned.

    Args:
        file_path: Path to the KML or KMZ file.

    Returns:
        A GeoDataFrame combining the extracted attributes with the
        coordinate-swapped geometries, using the CRS reported by the reader.
    """
    data_df = extract_data_from_kml_code(extract_kml_code_from_file(file_path))

    # Case-insensitive check, consistent with extract_kml_code_from_file;
    # otherwise "X.KMZ" would be handed to the plain KML reader.
    if file_path.lower().endswith(".kmz"):
        ge_file_gdf = load_kmz_as_geodf(file_path)
    else:
        ge_file_gdf = gpd.read_file(file_path, driver="KML", engine="pyogrio")

    geo_df = gpd.GeoDataFrame(
        data_df,
        geometry=ge_file_gdf["geometry"],
        crs=ge_file_gdf.crs,
    )
    geo_df["geometry"] = geo_df["geometry"].apply(swap_coordinates)
    return geo_df
def load_ge_data(file_path: str) -> gpd.GeoDataFrame:
    """Extract data from a Google Earth file (KML or KMZ), with a parsing fallback.

    When the KML source contains SimpleData or SimpleField tags,
    extract_data_from_ge_file is tried first and load_ge_file is the
    fallback; otherwise the order is reversed. The fallback runs only when
    the first attempt raises one of the parsing-related errors caught below;
    any error raised by the fallback itself propagates to the caller.

    Args:
        file_path: Path to the KML or KMZ file.

    Returns:
        The GeoDataFrame produced by whichever extraction function succeeded.
    """
    # A bare ``import lxml`` does not load the ``etree`` submodule, so import
    # it explicitly rather than relying on another package having done so.
    from lxml import etree  # nosec

    # Lower-case the source once for the case-insensitive tag search.
    lowered_code = extract_kml_code_from_file(file_path).lower()
    has_simple_tags = "<simpledata" in lowered_code or "<simplefield" in lowered_code

    if has_simple_tags:
        primary_func, fallback_func = extract_data_from_ge_file, load_ge_file
    else:
        primary_func, fallback_func = load_ge_file, extract_data_from_ge_file

    try:
        return primary_func(file_path)
    except (
        pd.errors.ParserError,
        etree.ParserError,
        etree.XMLSyntaxError,
        ValueError,
    ):
        return fallback_func(file_path)
|