File size: 7,520 Bytes
d362ca8
e9d1bce
d362ca8
 
 
e9d1bce
d362ca8
a69a345
 
 
 
 
 
 
 
 
d362ca8
 
c6718c6
 
e9d1bce
c6718c6
9d62f11
c6718c6
 
 
9d62f11
c6718c6
 
 
 
 
 
9d62f11
c6718c6
 
 
9d62f11
c6718c6
9d62f11
c6718c6
 
9d62f11
c6718c6
 
 
 
 
 
 
 
 
a69a345
 
a489e19
a69a345
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6718c6
 
 
 
 
 
 
 
 
9d62f11
d362ca8
c6718c6
d362ca8
9d62f11
c6718c6
 
 
e9d1bce
 
 
d362ca8
c6718c6
d362ca8
9d62f11
c6718c6
 
 
 
 
 
 
 
 
9d62f11
 
 
c6718c6
 
 
 
9d62f11
c6718c6
 
 
 
 
 
c7a4712
 
 
c6718c6
 
 
d362ca8
9d62f11
c6718c6
 
 
 
9d62f11
 
c6718c6
 
 
9d62f11
 
 
c6718c6
 
 
 
 
 
 
 
9d62f11
e9d1bce
 
c6718c6
e9d1bce
9d62f11
c6718c6
 
 
 
 
 
 
9d62f11
 
c6718c6
 
 
9d62f11
 
c6718c6
d362ca8
c6718c6
 
 
 
 
 
 
a69a345
c6718c6
 
9d62f11
c6718c6
 
9d62f11
c6718c6
 
 
 
 
 
 
 
9d62f11
d362ca8
c6718c6
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import zipfile
from io import StringIO

import bs4
import geopandas as gpd
import lxml  # nosec
import pandas as pd
from shapely.geometry import (
    Point,
    LineString,
    Polygon,
    MultiPoint,
    MultiLineString,
    MultiPolygon,
    LinearRing,
)


def parse_descriptions_to_geodf(geodf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Parse the HTML tables stored in the "Description" column of a Google Earth
    GeoDataFrame into a new GeoDataFrame.

    Each description is read with ``pandas.read_html``. The second table found
    is used when there is one, otherwise the first. The chosen table is
    transposed, its first row becomes the column header, and the remaining
    rows are collected.

    Parameters:
    - geodf: GeoDataFrame with a "Description" column and a "geometry" column

    Returns:
    - GeoDataFrame with the parsed rows, the geometries of ``geodf`` attached
      by position, and the CRS of ``geodf``
    """

    row_frames = []

    for description in geodf["Description"]:
        tables = pd.read_html(StringIO(description))

        # Prefer the second table; fall back to the first when only one exists.
        table = tables[1] if len(tables) > 1 else tables[0]

        # After transposing, the first row holds the field names.
        transposed = table.T
        transposed.columns = transposed.iloc[0]
        row_frames.append(transposed.iloc[1:])

    # Rows are renumbered 0..n-1 here.
    combined_df = pd.concat(row_frames, ignore_index=True)

    # Column assignment aligns on index labels. The combined frame is numbered
    # 0..n-1, so the source index is dropped to attach geometries by position;
    # otherwise a ``geodf`` without a default RangeIndex (e.g. a filtered one)
    # would end up with misaligned or missing geometries.
    combined_df["geometry"] = geodf["geometry"].reset_index(drop=True)

    return gpd.GeoDataFrame(combined_df, crs=geodf.crs)


def swap_coordinates(geometry):
    """
    Swap the first two ordinates (x and y, i.e. longitude and latitude) of a
    shapely Point, LineString, Polygon, MultiPoint, MultiLineString,
    MultiPolygon, or LinearRing geometry.

    Only the first two ordinates of every coordinate are carried over, so the
    returned geometry is always two-dimensional.

    Parameters:
    - geometry: Shapely geometry (Point, LineString, Polygon, MultiPoint,
                MultiLineString, MultiPolygon, or LinearRing)

    Returns:
    - Shapely geometry of the same type with swapped coordinates

    Raises:
    - ValueError: if the geometry is of any other type
    """

    def swap_coords(coords):
        return [(coord[1], coord[0]) for coord in coords]

    if isinstance(geometry, Point):
        return Point([geometry.y, geometry.x])
    if isinstance(geometry, MultiPoint):
        return MultiPoint(
            [Point(swap_coords(point.coords)) for point in geometry.geoms],
        )
    # LinearRing is a subclass of LineString, so it has to be tested first;
    # otherwise rings would be returned as plain LineStrings.
    if isinstance(geometry, LinearRing):
        return LinearRing(swap_coords(geometry.coords))
    if isinstance(geometry, LineString):
        return LineString(swap_coords(geometry.coords))
    if isinstance(geometry, MultiLineString):
        return MultiLineString(
            [LineString(swap_coords(line.coords)) for line in geometry.geoms],
        )
    if isinstance(geometry, Polygon):
        shell = swap_coords(geometry.exterior.coords)
        holes = [swap_coords(ring.coords) for ring in geometry.interiors]
        return Polygon(shell, holes)
    if isinstance(geometry, MultiPolygon):
        # Each member polygon is handled by the Polygon branch above.
        return MultiPolygon([swap_coordinates(poly) for poly in geometry.geoms])

    raise ValueError("Unsupported geometry type")


def load_kmz_as_geodf(file_path: str) -> gpd.GeoDataFrame:
    """
    Load a KMZ file into a GeoDataFrame, assuming the KMZ contains exactly one
    KML file.

    Parameters:
    - file_path: path to the KMZ (zip) archive

    Returns:
    - GeoDataFrame read from the single KML inside the archive

    Raises:
    - IndexError: if the archive contains no KML file or more than one
    """

    # Collect the archive members that are KML files. The comparison is
    # case-insensitive so that members such as "DOC.KML" are found too.
    with zipfile.ZipFile(file_path, "r") as kmz:
        kml_files = [
            name for name in kmz.namelist() if name.lower().endswith(".kml")
        ]

    # Exactly one KML is required; report the two failure modes separately.
    if not kml_files:
        raise IndexError("KMZ contains no KML file.")
    if len(kml_files) > 1:
        raise IndexError(
            "KMZ contains more than one KML. Please extract or convert to multiple KMLs.",
        )

    # Read the KML member straight out of the archive.
    return gpd.read_file(
        f"zip://{file_path}/{kml_files[0]}",
        driver="KML",
        engine="pyogrio",
    )


def load_ge_file(file_path: str) -> gpd.GeoDataFrame:
    """
    Load a KML or KMZ file and parse its descriptions into a GeoDataFrame.

    Parameters:
    - file_path: path to a file whose name ends in .kml or .kmz (any case)

    Returns:
    - GeoDataFrame produced by parse_descriptions_to_geodf

    Raises:
    - ValueError: if the file name has neither a .kml nor a .kmz extension
    """
    # Compare case-insensitively, matching extract_kml_code_from_file, so that
    # names such as "AREA.KML" are accepted here as well.
    lowered_path = file_path.lower()

    if lowered_path.endswith(".kml"):
        return parse_descriptions_to_geodf(
            gpd.read_file(file_path, driver="KML", engine="pyogrio"),
        )
    if lowered_path.endswith(".kmz"):
        return parse_descriptions_to_geodf(load_kmz_as_geodf(file_path))
    raise ValueError("The file must have a .kml or .kmz extension.")


def extract_data_from_kml_code(kml_code: str) -> pd.DataFrame:
    """
    Build a DataFrame from the SimpleData tags of KML source code, ignoring
    any tables embedded in feature descriptions.

    Parameters:
    - kml_code: KML source code

    Returns:
    - DataFrame with one row per SchemaData tag: a "Placemark_name" column
      followed by one column per SimpleData field name
    """

    # Tags are looked up by their lower-case names.
    document = bs4.BeautifulSoup(kml_code, "html.parser")

    records = []
    for schema_data in document.find_all("schemadata"):
        # The name is searched two levels above the SchemaData tag.
        name_tag = schema_data.parent.parent.find("name")
        record = {"Placemark_name": name_tag.text if name_tag else "[no name]"}

        # One entry per SimpleData field, keyed by its "name" attribute.
        for field in schema_data.find_all("simpledata"):
            record[field.get("name")] = field.text

        records.append(record)

    return pd.DataFrame(records)


def extract_kml_code_from_file(file_path: str) -> str:
    """
    Extract the KML source code from a Google Earth file (KML or KMZ).

    Parameters:
    - file_path: path to a .kml file or to a .kmz archive holding one KML
                 (the extension is matched case-insensitively)

    Returns:
    - The KML source code as a string, decoded as UTF-8

    Raises:
    - ValueError: if the file has neither a .kml nor a .kmz extension
    - IndexError: if a KMZ archive contains no KML file or more than one
    """

    file_extension = file_path.lower().split(".")[-1]

    if file_extension == "kml":
        # Read explicitly as UTF-8 so the result does not depend on the
        # platform's default encoding and agrees with the KMZ branch below,
        # where bytes.decode() also uses UTF-8.
        with open(file_path, "r", encoding="utf-8") as kml_file:
            return kml_file.read()

    if file_extension != "kmz":
        raise ValueError("The input file must have a .kml or .kmz extension.")

    with zipfile.ZipFile(file_path) as kmz_file:
        # Find all KML members of the archive.
        kml_files = [
            name for name in kmz_file.namelist() if name.lower().endswith(".kml")
        ]

        # Exactly one KML is required; report the two failure modes separately.
        if not kml_files:
            raise IndexError("KMZ file contains no KML.")
        if len(kml_files) > 1:
            raise IndexError(
                "KMZ file contains more than one KML. Please extract or convert to multiple KMLs.",
            )

        with kmz_file.open(kml_files[0]) as kml_file:
            # Archive members are read as bytes; decode them to a string.
            return kml_file.read().decode()


def extract_data_from_ge_file(file_path: str) -> gpd.GeoDataFrame:
    """
    Extract data from a Google Earth file (KML or KMZ) into a GeoDataFrame
    using SimpleData tags, excluding embedded tables in feature descriptions.

    Parameters:
    - file_path: path to a .kml or .kmz file

    Returns:
    - GeoDataFrame holding the SimpleData attributes, with the file's
      geometries passed through swap_coordinates and the file's CRS
    """
    data_df = extract_data_from_kml_code(extract_kml_code_from_file(file_path))

    # Match the extension case-insensitively, as extract_kml_code_from_file
    # does, so that "AREA.KMZ" is not sent down the plain-KML reader path.
    if file_path.lower().endswith(".kmz"):
        ge_file_gdf = load_kmz_as_geodf(file_path)
    else:
        ge_file_gdf = gpd.read_file(file_path, driver="KML", engine="pyogrio")

    # Combine the attribute table with the geometries read from the file.
    geo_df = gpd.GeoDataFrame(
        data_df,
        geometry=ge_file_gdf["geometry"],
        crs=ge_file_gdf.crs,
    )
    geo_df["geometry"] = geo_df["geometry"].apply(swap_coordinates)
    return geo_df


def load_ge_data(file_path: str) -> gpd.GeoDataFrame:
    """
    Extract data from a Google Earth file (KML or KMZ), retrying with the
    alternative extraction method when the first one fails to parse.

    When the KML source contains SimpleData or SimpleField tags,
    extract_data_from_ge_file is tried first and load_ge_file is the fallback;
    otherwise the order is reversed.

    Parameters:
    - file_path: path to a .kml or .kmz file

    Returns:
    - GeoDataFrame produced by whichever extraction method succeeded
    """
    # A bare ``import lxml`` does not load the ``etree`` submodule; import it
    # here so that the ``except`` clause below can always resolve
    # ``lxml.etree`` while an exception is being handled.
    import lxml.etree  # nosec

    lowered_code = extract_kml_code_from_file(file_path).lower()

    # Choose the extraction order based on the tags present in the KML code.
    if any(tag in lowered_code for tag in ("<simpledata", "<simplefield")):
        primary_func, fallback_func = extract_data_from_ge_file, load_ge_file
    else:
        primary_func, fallback_func = load_ge_file, extract_data_from_ge_file

    try:
        return primary_func(file_path)
    except (
        pd.errors.ParserError,
        lxml.etree.ParserError,
        lxml.etree.XMLSyntaxError,
        ValueError,
    ):
        # Errors raised by the fallback itself are propagated to the caller.
        return fallback_func(file_path)