import os
import shutil
import filecmp
import logging
import datetime
from glob import glob
from pathlib import Path
from omegaconf import OmegaConf
from multiprocessing import cpu_count
from jinja2 import Environment, PackageLoader, select_autoescape
from wildfire_occurrence.model.config import Config
from wildfire_occurrence.model.common import read_config
from wildfire_occurrence.model.data_download.ncep_fnl import NCEP_FNL
from wildfire_occurrence.model.analysis.wrf_analysis import WRFAnalysis


class WRFPipeline(object):
    """
    End-to-end WRF forecast pipeline.

    Orchestrates input data download (NCEP FNL), the WPS preprocessing
    steps (geogrid, ungrib, metgrid), the real/wrf model execution, and
    postprocessing of the model output into one .tif raster per output
    timestep. All executables run inside a Singularity container, either
    on a single node (mpirun) or across nodes (srun), depending on the
    ``multi_node`` flag.
    """

    def __init__(
                self,
                config_filename: str,
                start_date: datetime.date,
                forecast_lenght: int,
                multi_node: bool = False
            ):
        """
        Args:
            config_filename: path to the OmegaConf configuration file,
                validated against ``Config``.
            start_date: forecast start date. NOTE: the original ``str``
                annotation was wrong — the value is used with
                ``datetime.timedelta`` arithmetic and ``.strftime``,
                so it must be a ``datetime.date``/``datetime``.
            forecast_lenght: forecast length in days (parameter name
                typo kept for backward compatibility with callers that
                pass it by keyword). Original ``str`` annotation was
                wrong — it is passed to ``timedelta(days=...)``.
            multi_node: if True, run executables through ``srun``
                across nodes instead of single-node ``mpirun``.
        """
        # Configuration file initialization
        self.conf = read_config(config_filename, Config)
        logging.info(f'Loaded configuration from {config_filename}')

        # Set value for forecast start and end date
        self.start_date = start_date
        self.end_date = self.start_date + datetime.timedelta(
            days=forecast_lenght)
        logging.info(f'WRF start: {self.start_date}, end: {self.end_date}')

        # Generate working directories
        os.makedirs(self.conf.working_dir, exist_ok=True)
        logging.info(f'Created working directory {self.conf.working_dir}')

        # Per-run simulation directory, named after the date range
        self.simulation_dir = os.path.join(
            self.conf.working_dir,
            f'{self.start_date.strftime("%Y-%m-%d")}_' +
            f'{self.end_date.strftime("%Y-%m-%d")}'
        )
        os.makedirs(self.simulation_dir, exist_ok=True)
        logging.info(f'Created model output directory {self.simulation_dir}')

        # Setup data_dir (download destination for NCEP FNL files)
        self.data_dir = os.path.join(self.simulation_dir, 'data')
        os.makedirs(self.data_dir, exist_ok=True)

        # Setup configuration directory (rendered namelists live here)
        self.conf_dir = os.path.join(self.simulation_dir, 'configs')
        os.makedirs(self.conf_dir, exist_ok=True)

        # Local working copies of WPS / WRF trees and output locations
        self.local_wps_path = os.path.join(self.simulation_dir, 'WPS')
        self.local_wrf_path = os.path.join(self.simulation_dir, 'em_real')
        self.local_wrf_output = os.path.join(self.simulation_dir, 'output')
        self.local_wrf_output_vars = os.path.join(
            self.simulation_dir, 'variables')

        # Canonical (rendered) configuration filenames
        self.wps_conf_filename = os.path.join(self.conf_dir, 'namelist.wps')
        self.wrf_conf_filename = os.path.join(self.conf_dir, 'namelist.input')

        # Symlink targets inside the WPS / WRF working directories
        self.local_wps_conf = os.path.join(
            self.local_wps_path, 'namelist.wps')
        self.local_wrf_conf = os.path.join(
            self.local_wrf_path, 'namelist.input')

        # setup multi_node variable (stored on the config object so all
        # pipeline steps can read it)
        self.conf.multi_node = multi_node

    # -------------------------------------------------------------------
    # setup
    # -------------------------------------------------------------------
    def setup(self) -> None:
        """
        Download the input data and render the WPS/WRF namelists.

        Must be run before any other pipeline step.
        """
        # Working on the setup of the project
        logging.info('Starting setup pipeline step')

        # Working on the setup of the project
        logging.info('Starting download from setup pipeline step')

        # Generate subdirectories to work with WRF
        os.makedirs(self.data_dir, exist_ok=True)
        logging.info(f'Created data directory {self.data_dir}')

        # Generate data downloader
        data_downloader = NCEP_FNL(
            self.data_dir,
            self.start_date,
            self.end_date
        )
        data_downloader.download()

        # Generate configuration files for WPS - namelist.wps
        self.setup_wps_config()

        # Generate configuration files for WRF - namelist.input
        self.setup_wrf_config()

        return

    # -------------------------------------------------------------------
    # geogrid
    # -------------------------------------------------------------------
    def geogrid(self) -> None:
        """
        Run geogrid.exe inside the container, copying the WPS tree into
        the simulation directory first if it is not already there.
        """
        logging.info('Preparing to run geogrid.exe')

        # setup WPS directory
        if not os.path.exists(self.local_wps_path):
            shutil.copytree(
                self.conf.wps_path, self.local_wps_path, dirs_exist_ok=True)
            logging.info(f'Done copying WPS to {self.local_wps_path}')

        # create configuration file symlink
        self._symlink_conf_file(self.wps_conf_filename, self.local_wps_conf)

        # go to WPS directory and run wps
        os.chdir(self.local_wps_path)
        logging.info(f'Changed working directory to {self.local_wps_path}')

        # setup geogrid command
        if not self.conf.multi_node:
            geogrid_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./geogrid.exe'
        else:
            # FIX: the original concatenation was missing the space
            # between '--oversubscribe' and 'singularity', producing the
            # invalid token '--oversubscribesingularity'.
            geogrid_cmd = \
                'mpirun -np 40 --host gpu016 --oversubscribe ' + \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/lscratch,/panfs/ccds02/nobackup/' + \
                'projects/ilab ' + \
                f'{self.conf.container_path} ./geogrid.exe'

        # run geogrid command
        os.system(geogrid_cmd)

        return

    # -------------------------------------------------------------------
    # ungrib
    # -------------------------------------------------------------------
    def ungrib(self) -> None:
        """
        Run ungrib.exe inside the container. Requires geogrid() to have
        created the local WPS directory first.
        """
        # FIX: original message wrongly said 'geogrid.exe'
        logging.info('Preparing to run ungrib.exe')

        # assert WPS directory
        assert os.path.exists(self.local_wps_path), \
            f'{self.local_wps_path} does not exist, please run geogrid first.'

        # create Vtable symlink (GFS variable table shipped with WPS)
        local_wps_vtable = os.path.join(self.local_wps_path, 'Vtable')
        if not os.path.lexists(local_wps_vtable):
            os.symlink(
                os.path.join(
                    self.local_wps_path,
                    'ungrib/Variable_Tables/Vtable.GFS'),
                local_wps_vtable
            )
            logging.info(f'Created Vtable symlink on {self.local_wps_path}')

        # go to WPS directory and run wps
        os.chdir(self.local_wps_path)
        logging.info(f'Changed working directory to {self.local_wps_path}')

        # find all files in directory, and extract common prefix
        common_prefix = os.path.commonprefix(
            glob(f'{self.data_dir}/{str(self.start_date.year)}/*'))

        # run link_grib
        os.system(f'./link_grib.csh {common_prefix}')
        logging.info('Done with link_grib.csh')

        # setup ungrib command (ungrib is serial: single task either way)
        if not self.conf.multi_node:
            ungrib_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ./ungrib.exe'
        else:
            ungrib_cmd = \
                'srun --mpi=pmix -N 1 -n 1 singularity exec -B ' + \
                '/explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './ungrib.exe'

        # run ungrib command
        os.system(ungrib_cmd)

        return

    # -------------------------------------------------------------------
    # metgrid
    # -------------------------------------------------------------------
    def metgrid(self) -> None:
        """
        Run metgrid.exe inside the container. Requires geogrid() to have
        created the local WPS directory first.
        """
        logging.info('Preparing to run metgrid.exe')

        # assert WPS directory
        assert os.path.exists(self.local_wps_path), \
            f'{self.local_wps_path} does not exist, please run geogrid first.'

        # go to WPS directory and run wps
        os.chdir(self.local_wps_path)
        logging.info(f'Changed working directory to {self.local_wps_path}')

        # setup metgrid command
        if not self.conf.multi_node:
            metgrid_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./metgrid.exe'
        else:
            metgrid_cmd = \
                f'srun --mpi=pmix -N 1 -n {cpu_count()} singularity ' + \
                'exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './metgrid.exe'

        # run metgrid command
        os.system(metgrid_cmd)

        return

    # -------------------------------------------------------------------
    # real
    # -------------------------------------------------------------------
    def real(self) -> None:
        """
        Run real.exe inside the container, copying the WRF em_real tree
        into the simulation directory and linking the met_em* files
        produced by metgrid first.
        """
        logging.info('Preparing to run em_real.exe')

        # setup WRF em_real directory
        if not os.path.exists(self.local_wrf_path):
            shutil.copytree(
                os.path.join(self.conf.wrf_path, 'test', 'em_real'),
                self.local_wrf_path, dirs_exist_ok=True)
            logging.info(f'Done copying WRF em_real to {self.local_wrf_path}')

        # link the created met_em* files into the em_real directory,
        # adding the .nc suffix real.exe expects
        for met_filename in glob(os.path.join(self.local_wps_path,
                                              'met_em*')):
            local_met_filename = os.path.join(
                self.local_wrf_path, f'{Path(met_filename).stem}.nc')
            if not os.path.exists(local_met_filename):
                os.symlink(met_filename, local_met_filename)

        # create configuration file symlink
        self._symlink_conf_file(self.wrf_conf_filename, self.local_wrf_conf)

        # go to WRF directory and run real
        os.chdir(self.local_wrf_path)
        logging.info(f'Changed working directory to {self.local_wrf_path}')

        # setup real command
        if not self.conf.multi_node:
            real_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./real.exe'
        else:
            real_cmd = \
                'srun --mpi=pmix -N 2 -n 80 singularity ' + \
                'exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './real.exe'

        # run real command (original comment wrongly said 'metgrid')
        os.system(real_cmd)

        return

    # -------------------------------------------------------------------
    # wrf
    # -------------------------------------------------------------------
    def wrf(self) -> None:
        """
        Run wrf.exe inside the container. Requires real() to have
        created the local em_real directory first.
        """
        logging.info('Preparing to run wrf.exe')

        # assert WRF directory
        assert os.path.exists(self.local_wrf_path), \
            f'{self.local_wrf_path} does not exist, please run real first.'

        # go to WRF directory and run wrf
        os.chdir(self.local_wrf_path)
        logging.info(f'Changed working directory to {self.local_wrf_path}')

        # setup wrf command
        if not self.conf.multi_node:
            wrf_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./wrf.exe'
        else:
            wrf_cmd = \
                'srun --mpi=pmix -N 2 -n 80 singularity ' + \
                'exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './wrf.exe'

        # run wrf command (original comment wrongly said 'metgrid')
        os.system(wrf_cmd)

        # TODO
        # move output files at the end to something like working_dir/results

        return

    # -------------------------------------------------------------------
    # postprocess
    # -------------------------------------------------------------------
    def postprocess(self) -> None:
        """
        Move the WRF output into a clean directory and extract the
        configured variables, one .tif file per output timestep.
        """
        logging.info('Preparing to postprocess and extract variables')

        # create output directory
        os.makedirs(self.local_wrf_output, exist_ok=True)
        logging.info(f'Created WRF output directory {self.local_wrf_output}')

        # if the files have not been moved, move them to the output dir
        if len(os.listdir(self.local_wrf_output)) == 0:

            # get filenames, make sure they exist
            wrf_output_filenames = \
                glob(os.path.join(self.local_wrf_path, 'auxhist24_d0*')) + \
                glob(os.path.join(self.local_wrf_path, 'wrfout_d0*'))
            assert len(wrf_output_filenames) > 0, \
                'WRF output (auxhist24_d0*, wrfout_d0*) not found. ' + \
                'Re-run WRF.'

            # move output to clean directory
            for filename in wrf_output_filenames:
                shutil.move(filename, self.local_wrf_output)
            logging.info(f'Moved WRF output to {self.local_wrf_output}')

        # Get WRF output filename; exactly one file must match
        wrf_output_filename = glob(
            os.path.join(self.local_wrf_output,
                         self.conf.wrf_output_filename))
        assert len(wrf_output_filename) == 1, \
            f'WRF output filename not found under {self.local_wrf_output}.'

        # Get first item from the list
        wrf_output_filename = wrf_output_filename[0]
        logging.info(f'Loading {wrf_output_filename}')

        # create WRFAnalysis object, stores wrf_dataset
        wrf_analysis = WRFAnalysis(wrf_output_filename)

        # variables output dir
        os.makedirs(self.local_wrf_output_vars, exist_ok=True)
        logging.info(
            f'Created WRF vars output directory {self.local_wrf_output_vars}')

        # NOTE: a multiprocessing.Pool version of this loop (via
        # _compute_variables) was removed because the netCDF dataset is
        # not picklable; extraction therefore runs serially.
        for t_idx, delta_time in \
                enumerate(wrf_analysis.wrf_dataset.Times.values):

            logging.info(f'Processing t_idx: {t_idx}, timestamp: {delta_time}')

            # setup output filename
            output_filename = os.path.join(
                self.local_wrf_output_vars,
                f"d02_{delta_time.astype(str).replace(':', '-')}.tif")

            # if the imagery does not exist
            if not os.path.isfile(output_filename):

                # compute WRF variables and output to disk
                wrf_analysis.compute_all_and_write(
                    timeidx=t_idx,
                    output_variables=OmegaConf.to_object(
                        self.conf.wrf_output_variables),
                    output_filename=output_filename
                )

    # -------------------------------------------------------------------
    # _compute_variables
    # -------------------------------------------------------------------
    def _compute_variables(self, t_idx, delta_time, wrf_output_filename):
        """
        Per-timestep variable extraction, safe to run in a worker
        process (kept for a future parallel postprocess implementation).
        """
        logging.info(f'Processing t_idx: {t_idx}, timestamp: {delta_time}')

        # setup output filename
        output_filename = os.path.join(
            self.local_wrf_output_vars,
            f"d02_{delta_time.astype(str).replace(':', '-')}.tif")

        # if the imagery does not exist
        if not os.path.isfile(output_filename):

            # unfortunately, netCDF object is not picklable, thus we need
            # to redefine it on every single process, hopefully there
            # will be an implementation on it at some point by them
            wrf_analysis = WRFAnalysis(wrf_output_filename)

            # compute WRF variables and output to disk
            wrf_analysis.compute_all_and_write(
                timeidx=t_idx,
                output_variables=OmegaConf.to_object(
                    self.conf.wrf_output_variables),
                output_filename=output_filename
            )
        return

    # -------------------------------------------------------------------
    # setup_wps_config
    # -------------------------------------------------------------------
    def setup_wps_config(
                self, template_filename: str = 'namelist.wps.jinja2'):
        """
        Render the WPS namelist from its Jinja2 template into
        ``self.wps_conf_filename``.
        """
        # Setup jinja2 Environment
        env = Environment(
            loader=PackageLoader("wildfire_occurrence"),
            autoescape=select_autoescape()
        )

        # Get the template of the environment for WPS
        template = env.get_template(template_filename)

        # Modify configuration to include start and end date
        self.conf.wps_config['start_date'] = \
            self.start_date.strftime("%Y-%m-%d_%H:%M:%S")
        self.conf.wps_config['end_date'] = \
            self.end_date.strftime("%Y-%m-%d_%H:%M:%S")

        # Fill in elements from the WPS environment and save filename
        template.stream(self.conf.wps_config).dump(self.wps_conf_filename)
        logging.info(f'Saved WPS configuration at {self.wps_conf_filename}')

        return

    # -------------------------------------------------------------------
    # setup_wrf_config
    # -------------------------------------------------------------------
    def setup_wrf_config(
                self, template_filename: str = 'namelist.input.jinja2'):
        """
        Render the WRF namelist from its Jinja2 template into
        ``self.wrf_conf_filename``.
        """
        # Setup jinja2 Environment
        env = Environment(
            loader=PackageLoader("wildfire_occurrence"),
            autoescape=select_autoescape()
        )

        # Get the template of the environment for WRF
        template = env.get_template(template_filename)

        # Modify configuration to include start and end date
        self.conf.wrf_config['start_year'] = self.start_date.year
        self.conf.wrf_config['end_year'] = self.end_date.year
        self.conf.wrf_config['start_month'] = self.start_date.strftime('%m')
        self.conf.wrf_config['end_month'] = self.end_date.strftime('%m')
        self.conf.wrf_config['start_day'] = self.start_date.strftime('%d')
        self.conf.wrf_config['end_day'] = self.end_date.strftime('%d')
        self.conf.wrf_config['start_hour'] = self.start_date.strftime('%H')
        self.conf.wrf_config['end_hour'] = self.end_date.strftime('%H')

        # Fill in elements from the WRF environment and save filename
        template.stream(self.conf.wrf_config).dump(self.wrf_conf_filename)
        logging.info(f'Saved WRF configuration at {self.wrf_conf_filename}')

        return

    # -------------------------------------------------------------------
    # _symlink_conf_file
    # -------------------------------------------------------------------
    def _symlink_conf_file(self, source, destination):
        """
        Ensure ``destination`` is a symlink to the rendered namelist at
        ``source``, replacing a stale link when the contents differ.
        """
        # condition 1: local configuration file does not exist, symlink it
        if not os.path.lexists(destination):

            # make sure configuration file exists
            assert os.path.isfile(source), \
                'Please run setup pipeline step before running real'

            # add symlink
            os.symlink(source, destination)
            logging.info(f'Created namelist.input symlink at {destination}')

        # condition #2: local configuration exists, but is not the latest one
        elif not filecmp.cmp(source, destination):

            # remove previous version
            os.remove(destination)

            # add symlink
            os.symlink(source, destination)
            logging.info(
                f'Removed old copy of {destination}. Created new '
                f'namelist.input symlink at {destination}')

        else:
            logging.info(
                f'namelist.input exists, {destination}, '
                'nothing to copy.')