Spaces:
Sleeping
Sleeping
File size: 5,876 Bytes
baf9496 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import geopandas as gpd
from shapely.ops import unary_union
import xml.etree.ElementTree as ET
import shapefile
from shapely.geometry import Polygon, MultiPolygon
import sys
# Input shapefile
input_shapefile = sys.argv[1]
# Input XML file
input_xml = sys.argv[2]
# Output shapefile
output_shapefile = sys.argv[3]
# Create shapefile writer for the output
w = shapefile.Writer(output_shapefile)
w.autoBalance = 1
# Step 1: Load the shapefile and get the merged polygon with exterior boundary
gdf = gpd.read_file(input_shapefile)
# Step 1.1: Identify and fix invalid geometries
gdf['valid'] = gdf.is_valid
# Print invalid geometries
invalid_geometries = gdf[~gdf['valid']]
if not invalid_geometries.empty:
print(f"Invalid geometries found: {len(invalid_geometries)}. Attempting to fix them.")
# Fix invalid geometries using buffer(0)
gdf['geometry'] = gdf['geometry'].buffer(0)
# Combine all polygons into one using unary_union
merged_polygon = unary_union(gdf.geometry)
# Step 1.2: Check if merged_polygon is a MultiPolygon or a single Polygon
if isinstance(merged_polygon, Polygon):
exterior_boundaries = [merged_polygon.exterior]
elif isinstance(merged_polygon, MultiPolygon):
exterior_boundaries = [poly.exterior for poly in merged_polygon.geoms]
else:
raise TypeError("Resulting geometry is neither a Polygon nor a MultiPolygon.")
# Define fields for shapefile
w.field('art', 'C') # Type of tag (Gemeinde, Bundesland, etc.)
w.field('name', 'C') # Name value
w.field('schluessel', 'C') # Schlüssel value
w.field('uebaname', 'C') # Uebaname value
w.field('ueobjekt', 'C') # Ueobjekt value
# Parse the XML file
tree = ET.parse(input_xml)
root = tree.getroot()
# Namespace map
ns = {'gml': 'http://www.opengis.net/gml/3.2',
'adv': 'http://www.adv-online.de/namespaces/adv/gid/6.0'}
# Store the tag names
tag_names = {
'AX_Gemeinde': 'Gemeinde',
'AX_Bundesland': 'Bundesland',
'AX_Regierungsbezirk': 'Regierungsbezirk',
'AX_KreisRegion': 'Kreis / kreisfreie Stadt'
}
# Track if "Gemeinde" and "Kreis / kreisfreie Stadt" have already been added
first_gemeinde_added = False
first_kreis_added = False
# Helper function to extract polygon coordinates
def extract_polygon(coords_text):
try:
coords = list(map(float, coords_text.split()))
return [(coords[i], coords[i+1]) for i in range(0, len(coords), 2)]
except Exception as e:
print(f"Error parsing coordinates: {e}")
return None
# Step 3: Add the extracted XML data to the shapefile along with the exterior boundary from the polygon
tags = ['AX_Gemeinde', 'AX_Bundesland', 'AX_Regierungsbezirk', 'AX_KreisRegion']
data = {}
for tag in tags:
for element in root.findall(f'.//adv:{tag}', ns):
tag_name = tag_names.get(tag, '<null>')
name_elem = element.find('.//adv:bezeichnung', ns)
name = name_elem.text if name_elem is not None else '<null>'
schluessel_elem = element.find('.//adv:schluesselGesamt', ns)
schluessel = schluessel_elem.text if schluessel_elem is not None else '<null>'
# Define uebaname based on tag
if tag == 'AX_Gemeinde':
uebaname = data.get('AX_KreisRegion', {}).get('name', '<null>')
elif tag == 'AX_Bundesland':
uebaname = '<null>'
elif tag == 'AX_Regierungsbezirk':
uebaname = data.get('AX_Bundesland', {}).get('name', '<null>')
elif tag == 'AX_KreisRegion':
uebaname = data.get('AX_Regierungsbezirk', {}).get('name', '<null>')
# Define ueobjekt based on schluessel
if tag == 'AX_Gemeinde':
ueobjekt = f"DE{schluessel[:5]}"
elif tag == 'AX_Bundesland':
ueobjekt = '<null>'
elif tag == 'AX_Regierungsbezirk':
ueobjekt = '<null>'
elif tag == 'AX_KreisRegion':
ueobjekt = f"DE{schluessel[:3]}"
data[tag] = {'name': name, 'schluessel': schluessel}
# Skip if first Gemeinde or Kreis / kreisfreie Stadt has already been added
if (tag == 'AX_Gemeinde' and first_gemeinde_added) or (tag == 'AX_KreisRegion' and first_kreis_added):
continue
# Set flags after first Gemeinde or Kreis / kreisfreie Stadt is added
if tag == 'AX_Gemeinde':
first_gemeinde_added = True
elif tag == 'AX_KreisRegion':
first_kreis_added = True
print("="*50)
print(tag_name)
print(name)
print(schluessel)
print(uebaname)
print(ueobjekt)
print("="*50)
# Add record to shapefile (with the tag data)
w.record(tag_name, name, schluessel, uebaname, ueobjekt)
# Step 4: Add the exterior boundaries to the shapefile
for boundary in exterior_boundaries:
w.poly([list(boundary.coords)])
# Define spatial reference (projection file)
with open(output_shapefile.replace('.shp', '.prj'), 'w') as prj_file:
prj_file.write('PROJCS["ETRS89 / UTM zone 32N",'
'GEOGCS["ETRS89",'
'DATUM["European_Terrestrial_Reference_System_1989",'
'SPHEROID["GRS 1980",6378137,298.257222101]],'
'PRIMEM["Greenwich",0],'
'UNIT["degree",0.0174532925199433]],'
'PROJECTION["Transverse_Mercator"],'
'PARAMETER["latitude_of_origin",0],'
'PARAMETER["central_meridian",9],'
'PARAMETER["scale_factor",0.9996],'
'PARAMETER["false_easting",500000],'
'PARAMETER["false_northing",0],'
'UNIT["metre",1]]')
# Save the shapefile
w.close() |