NAS-ALKIS-Conversion / flurstueck.py
shamim237's picture
Upload 11 files
baf9496 verified
import time
import geopandas as gpd
from shapely.geometry import Polygon
import xml.etree.ElementTree as ET
import multiprocessing as mp
import sys
# Parse the XML file correctly
def parse_xml(file_path):
tree = ET.parse(file_path) # Parse the entire XML file
root = tree.getroot() # Get the root element
return root
# Extract coordinates in bulk
def extract_coordinates(posList):
coordinates = [float(x) for x in posList.strip().split()]
return [(coordinates[i], coordinates[i + 1]) for i in range(0, len(coordinates), 2)]
# Create flurstnr based on zaehler and nenner
def create_flurstnr(zaehler, nenner=None):
return f"{zaehler}/{nenner}" if nenner else zaehler
# Create gmdschl by merging land, kreis, regierungsbezirk, and gemeinde
def create_gmdschl(land, regierungsbezirk, kreis, gemeinde):
return f"{land}{regierungsbezirk}{kreis}{gemeinde}"
# Find and extract kreis values
def find_kreis(root, namespaces):
kreis_dict = {}
for kreis_region in root.findall('.//adv:AX_KreisRegion', namespaces):
schluessel_gesamt = kreis_region.find('.//adv:schluesselGesamt', namespaces)
if schluessel_gesamt is not None:
kreis_dict[schluessel_gesamt.text] = kreis_region.find('.//adv:bezeichnung', namespaces).text
return kreis_dict
# Find and extract regbezirk values
def find_regbezirk(root, namespaces):
regbezirk_dict = {}
for regbezirk in root.findall('.//adv:AX_Regierungsbezirk', namespaces):
schluessel_gesamt = regbezirk.find('.//adv:schluesselGesamt', namespaces)
bezeichnung = regbezirk.find('.//adv:bezeichnung', namespaces)
if schluessel_gesamt is not None and bezeichnung is not None:
regbezirk_dict[schluessel_gesamt.text] = bezeichnung.text
return regbezirk_dict
# Create lookup dictionary
def create_lookup_dict(root, tag, key_path, value_path, namespaces):
lookup_dict = {}
for element in root.findall(f'.//adv:{tag}', namespaces):
key = element.find(key_path, namespaces)
value = element.find(value_path, namespaces)
if key is not None and value is not None:
lookup_dict[key.text] = value.text
return lookup_dict
# Create lagebeztxt lookup dictionary
def create_lagebeztxt_dict(root, namespaces):
lagebeztxt_dict = {}
for tag in ['AX_LagebezeichnungMitHausnummer', 'AX_LagebezeichnungOhneHausnummer']:
for element in root.findall(f'.//adv:{tag}', namespaces):
gml_id = element.get('{http://www.opengis.net/gml/3.2}id')
unverschluesselt = element.find('.//adv:unverschluesselt', namespaces)
hausnummer = element.find('.//adv:hausnummer', namespaces)
if unverschluesselt is not None:
if hausnummer is not None:
lagebeztxt_dict[gml_id] = f"{unverschluesselt.text} {hausnummer.text}"
else:
lagebeztxt_dict[gml_id] = unverschluesselt.text
else:
lagebeztxt_dict[gml_id] = "<null>"
return lagebeztxt_dict
# Process a single AX_Flurstueck element
def process_single_flurstueck(flurstueck, namespaces, lookup_dicts):
# Extracting coordinates in bulk
polygon_coords = [coord for posList in flurstueck.findall('.//gml:posList', namespaces)
for coord in extract_coordinates(posList.text)]
polygon = Polygon(polygon_coords)
# Extract attributes in a single pass
flaeche = flurstueck.find('.//adv:amtlicheFlaeche', namespaces)
flaeche = flaeche.text if flaeche is not None else None
flstkennz = flurstueck.find('.//adv:flurstueckskennzeichen', namespaces)
flstkennz = flstkennz.text if flstkennz is not None else None
zaehler = flurstueck.find('.//adv:zaehler', namespaces).text
nenner = flurstueck.find('.//adv:nenner', namespaces)
flurstnr = create_flurstnr(zaehler, nenner.text if nenner is not None else None)
# Gemeindeschlüssel extraction
gemeindekennzeichen = flurstueck.find('.//adv:AX_Gemeindekennzeichen', namespaces)
if gemeindekennzeichen is not None:
land = gemeindekennzeichen.find('.//adv:land', namespaces).text
kreis = gemeindekennzeichen.find('.//adv:kreis', namespaces).text
regierungsbezirk = gemeindekennzeichen.find('.//adv:regierungsbezirk', namespaces).text
gemeinde_code = gemeindekennzeichen.find('.//adv:gemeinde', namespaces).text
gmdschl = create_gmdschl(land, regierungsbezirk, kreis, gemeinde_code)
else:
gmdschl = None
land = None
regierungsbezirk = None
gemeinde_code = None
# Use the lookup dictionaries for faster data retrieval
merged_value = f"{land}{regierungsbezirk}{kreis}"
kreis_bezeichnung = lookup_dicts['kreis'].get(merged_value, "<null>")
regbezirk_key = f"{land}{regierungsbezirk}" if land and regierungsbezirk else None
regbezirk_bezeichnung = lookup_dicts['regbezirk'].get(regbezirk_key, "<null>")
gemeinde = lookup_dicts['gemeinde'].get(merged_value + gemeinde_code, "<null>") if gemeindekennzeichen is not None else "<null>"
land_name = lookup_dicts['land'].get(land, "<null>") if land else "<null>"
gemarkungsnummer = flurstueck.find('.//adv:AX_Gemarkung_Schluessel/adv:gemarkungsnummer', namespaces)
gemarkung = lookup_dicts['gemarkung'].get(f"{land}{gemarkungsnummer.text}", "<null>") if land is not None and gemarkungsnummer is not None else "<null>"
# Extract lagebeztxt using the lookup dictionary
weist_auf = flurstueck.find('.//adv:weistAuf[@xlink:href]', namespaces)
zeigt_auf = flurstueck.find('.//adv:zeigtAuf[@xlink:href]', namespaces)
if weist_auf is not None:
href = weist_auf.get('{http://www.w3.org/1999/xlink}href')
lagebeztxt = lookup_dicts['lagebeztxt'].get(href.split(":")[-1], "<null>")
elif zeigt_auf is not None:
href = zeigt_auf.get('{http://www.w3.org/1999/xlink}href')
lagebeztxt = lookup_dicts['lagebeztxt'].get(href.split(":")[-1], "<null>")
else:
lagebeztxt = "<null>"
# Return the extracted data as a dictionary
return {
'geometry': polygon,
'flaeche': flaeche,
'flstkennz': flstkennz,
'flur': 'Flur',
'flurstnr': flurstnr,
'gmdschl': gmdschl,
'regbezirk': regbezirk_bezeichnung,
'kreis': kreis_bezeichnung,
'gemeinde': gemeinde,
'land': land_name,
'gemarkung': gemarkung,
'lagebeztxt': lagebeztxt
}
# Process all AX_Flurstueck tags with optimizations
def process_flurstueck(root):
namespaces = {'gml': 'http://www.opengis.net/gml/3.2',
'adv': 'http://www.adv-online.de/namespaces/adv/gid/6.0',
'xlink': 'http://www.w3.org/1999/xlink'}
# Create lookup dictionaries
lookup_dicts = {
'kreis': find_kreis(root, namespaces),
'regbezirk': find_regbezirk(root, namespaces),
'gemeinde': create_lookup_dict(root, 'AX_Gemeinde', './/adv:schluesselGesamt', './/adv:bezeichnung', namespaces),
'land': create_lookup_dict(root, 'AX_Bundesland', './/adv:schluesselGesamt', './/adv:bezeichnung', namespaces),
'gemarkung': create_lookup_dict(root, 'AX_Gemarkung', './/adv:schluesselGesamt', './/adv:bezeichnung', namespaces),
'lagebeztxt': create_lagebeztxt_dict(root, namespaces)
}
# Use multiprocessing to process Flurstueck elements in parallel
with mp.Pool() as pool:
data = pool.starmap(
process_single_flurstueck,
[(flurstueck, namespaces, lookup_dicts) for flurstueck in root.findall('.//adv:AX_Flurstueck', namespaces)]
)
return data
# Main function
def main(xml_file, output_shapefile):
start_time = time.time()
root = parse_xml(xml_file)
data = process_flurstueck(root)
# Create a GeoDataFrame
gdf = gpd.GeoDataFrame(data)
gdf.set_crs(epsg=25832, inplace=True) # Set appropriate CRS
# Save to a shapefile
gdf.to_file(output_shapefile, driver='ESRI Shapefile')
# Save the .prj file with the specified projection
prj_content = ('PROJCS["ETRS89 / UTM zone 32N",'
'GEOGCS["ETRS89",'
'DATUM["European_Terrestrial_Reference_System_1989",'
'SPHEROID["GRS 1980",6378137,298.257222101]],'
'PRIMEM["Greenwich",0],'
'UNIT["degree",0.0174532925199433]],'
'PROJECTION["Transverse_Mercator"],'
'PARAMETER["latitude_of_origin",0],'
'PARAMETER["central_meridian",9],'
'PARAMETER["scale_factor",0.9996],'
'PARAMETER["false_easting",500000],'
'PARAMETER["false_northing",0],'
'UNIT["metre",1]]')
prj_file = output_shapefile.replace('.shp', '.prj')
with open(prj_file, 'w') as prj:
prj.write(prj_content)
end_time = time.time()
print(f"Processing complete. Shapefile saved as '{output_shapefile}'. Time taken: {end_time - start_time:.2f} seconds.")
# Example usage
if __name__ == "__main__":
xml_file = sys.argv[1]
output_shapefile = sys.argv[2]
main(xml_file, output_shapefile)