# Author: jordancaraballo — "Fixing PEP8 compliance" (commit 5f0a394)
import os
import shutil
import filecmp
import logging
import datetime
from glob import glob
from pathlib import Path
# from itertools import repeat
from omegaconf import OmegaConf
from multiprocessing import cpu_count # , Pool
from jinja2 import Environment, PackageLoader, select_autoescape
from wildfire_occurrence.model.config import Config
from wildfire_occurrence.model.common import read_config
from wildfire_occurrence.model.data_download.ncep_fnl import NCEP_FNL
from wildfire_occurrence.model.analysis.wrf_analysis import WRFAnalysis
class WRFPipeline(object):
    """Driver for an end-to-end WRF weather simulation.

    Pipeline steps, intended to be run in order:
        setup -> geogrid -> ungrib -> metgrid -> real -> wrf -> postprocess

    Each WRF/WPS executable is launched inside a Singularity container via
    ``os.system``, either single-node (``mpirun``) or multi-node (``srun``)
    depending on the ``multi_node`` flag.
    """

    def __init__(
                self,
                config_filename: str,
                start_date: datetime.date,
                forecast_lenght: int,
                multi_node: bool = False
            ):
        """Read configuration and create the simulation directory tree.

        Args:
            config_filename: path to the OmegaConf YAML configuration.
            start_date: forecast start date (date/datetime object).
            forecast_lenght: forecast length in days. NOTE: the misspelled
                parameter name is kept for backward compatibility with
                existing callers that pass it by keyword.
            multi_node: run executables via srun across nodes when True.
        """
        # Configuration file initialization
        self.conf = read_config(config_filename, Config)
        logging.info(f'Loaded configuration from {config_filename}')

        # Set value for forecast start and end date
        self.start_date = start_date
        self.end_date = self.start_date + datetime.timedelta(
            days=forecast_lenght)
        logging.info(f'WRF start: {self.start_date}, end: {self.end_date}')

        # Generate working directories
        os.makedirs(self.conf.working_dir, exist_ok=True)
        logging.info(f'Created working directory {self.conf.working_dir}')

        # Simulation directory is keyed on the start/end dates so multiple
        # forecasts can coexist under the same working directory
        self.simulation_dir = os.path.join(
            self.conf.working_dir,
            f'{self.start_date.strftime("%Y-%m-%d")}_' +
            f'{self.end_date.strftime("%Y-%m-%d")}'
        )
        os.makedirs(self.simulation_dir, exist_ok=True)
        logging.info(f'Created model output directory {self.simulation_dir}')

        # Setup data_dir
        self.data_dir = os.path.join(self.simulation_dir, 'data')
        os.makedirs(self.data_dir, exist_ok=True)

        # Setup configuration directory
        self.conf_dir = os.path.join(self.simulation_dir, 'configs')
        os.makedirs(self.conf_dir, exist_ok=True)

        # Per-simulation copies of the WPS and WRF trees, plus output dirs
        self.local_wps_path = os.path.join(self.simulation_dir, 'WPS')
        self.local_wrf_path = os.path.join(self.simulation_dir, 'em_real')
        self.local_wrf_output = os.path.join(self.simulation_dir, 'output')
        self.local_wrf_output_vars = os.path.join(
            self.simulation_dir, 'variables')

        # Rendered configuration filenames (sources of truth)
        self.wps_conf_filename = os.path.join(self.conf_dir, 'namelist.wps')
        self.wrf_conf_filename = os.path.join(self.conf_dir, 'namelist.input')

        # Symlink targets local to the WPS/WRF run directories
        self.local_wps_conf = os.path.join(self.local_wps_path, 'namelist.wps')
        self.local_wrf_conf = os.path.join(
            self.local_wrf_path, 'namelist.input')

        # setup multi_node variable
        self.conf.multi_node = multi_node

    # -------------------------------------------------------------------------
    # setup
    # -------------------------------------------------------------------------
    def setup(self) -> None:
        """Download input data and render the WPS/WRF namelist files."""
        logging.info('Starting setup pipeline step')
        logging.info('Starting download from setup pipeline step')

        # Generate subdirectories to work with WRF
        os.makedirs(self.data_dir, exist_ok=True)
        logging.info(f'Created data directory {self.data_dir}')

        # Download NCEP FNL analysis data covering the forecast window
        data_downloader = NCEP_FNL(
            self.data_dir,
            self.start_date,
            self.end_date
        )
        data_downloader.download()

        # Generate configuration files for WPS - namelist.wps
        self.setup_wps_config()

        # Generate configuration files for WRF - namelist.input
        self.setup_wrf_config()
        return

    # -------------------------------------------------------------------------
    # geogrid
    # -------------------------------------------------------------------------
    def geogrid(self) -> None:
        """Run geogrid.exe inside the container to define model domains."""
        logging.info('Preparing to run geogrid.exe')

        # setup WPS directory; copy once, reuse on subsequent runs
        if not os.path.exists(self.local_wps_path):
            shutil.copytree(
                self.conf.wps_path, self.local_wps_path, dirs_exist_ok=True)
            logging.info(f'Done copying WPS to {self.local_wps_path}')

        # create configuration file symlink
        self._symlink_conf_file(self.wps_conf_filename, self.local_wps_conf)

        # geogrid.exe must run from the WPS directory
        os.chdir(self.local_wps_path)
        logging.info(f'Changed working directory to {self.local_wps_path}')

        # setup geogrid command
        if not self.conf.multi_node:
            geogrid_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./geogrid.exe'
        else:
            # BUGFIX: a space is required between '--oversubscribe' and
            # 'singularity'; without it the concatenated command is invalid
            geogrid_cmd = \
                'mpirun -np 40 --host gpu016 --oversubscribe ' + \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/lscratch,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ./geogrid.exe'

        # run geogrid command
        os.system(geogrid_cmd)
        return

    # -------------------------------------------------------------------------
    # ungrib
    # -------------------------------------------------------------------------
    def ungrib(self) -> None:
        """Run ungrib.exe to extract GRIB fields from the downloaded data."""
        logging.info('Preparing to run ungrib.exe')

        # WPS tree is created by geogrid(); fail early if it is missing
        assert os.path.exists(self.local_wps_path), \
            f'{self.local_wps_path} does not exist, please run geogrid first.'

        # create Vtable symlink (GFS variable table)
        local_wps_vtable = os.path.join(self.local_wps_path, 'Vtable')
        if not os.path.lexists(local_wps_vtable):
            os.symlink(
                os.path.join(
                    self.local_wps_path, 'ungrib/Variable_Tables/Vtable.GFS'),
                local_wps_vtable
            )
            logging.info(f'Created Vtable symlink on {self.local_wps_path}')

        # ungrib.exe must run from the WPS directory
        os.chdir(self.local_wps_path)
        logging.info(f'Changed working directory to {self.local_wps_path}')

        # find all files in directory, and extract common prefix
        common_prefix = os.path.commonprefix(
            glob(f'{self.data_dir}/{str(self.start_date.year)}/*'))

        # link_grib.csh creates the GRIBFILE.* symlinks ungrib consumes
        os.system(f'./link_grib.csh {common_prefix}')
        logging.info('Done with link_grib.csh')

        # setup ungrib command
        if not self.conf.multi_node:
            ungrib_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ./ungrib.exe'
        else:
            ungrib_cmd = \
                'srun --mpi=pmix -N 1 -n 1 singularity exec -B ' + \
                '/explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './ungrib.exe'

        # run ungrib command
        os.system(ungrib_cmd)
        return

    # -------------------------------------------------------------------------
    # metgrid
    # -------------------------------------------------------------------------
    def metgrid(self) -> None:
        """Run metgrid.exe to interpolate fields onto the model domains."""
        logging.info('Preparing to run metgrid.exe')

        # WPS tree is created by geogrid(); fail early if it is missing
        assert os.path.exists(self.local_wps_path), \
            f'{self.local_wps_path} does not exist, please run geogrid first.'

        # metgrid.exe must run from the WPS directory
        os.chdir(self.local_wps_path)
        logging.info(f'Changed working directory to {self.local_wps_path}')

        # setup metgrid command
        if not self.conf.multi_node:
            metgrid_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./metgrid.exe'
        else:
            metgrid_cmd = \
                f'srun --mpi=pmix -N 1 -n {cpu_count()} singularity ' + \
                'exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './metgrid.exe'

        # run metgrid command
        os.system(metgrid_cmd)
        return

    # -------------------------------------------------------------------------
    # real
    # -------------------------------------------------------------------------
    def real(self) -> None:
        """Run real.exe to produce WRF initial/boundary condition files."""
        logging.info('Preparing to run em_real.exe')

        # setup WRF em_real directory; copy once, reuse on subsequent runs
        if not os.path.exists(self.local_wrf_path):
            shutil.copytree(
                os.path.join(self.conf.wrf_path, 'test', 'em_real'),
                self.local_wrf_path, dirs_exist_ok=True)
            logging.info(f'Done copying WRF em_real to {self.local_wrf_path}')

        # symlink the met_em* files produced by metgrid into em_real,
        # adding the .nc suffix real.exe expects
        for met_filename in glob(os.path.join(self.local_wps_path, 'met_em*')):
            local_met_filename = os.path.join(
                self.local_wrf_path, f'{Path(met_filename).stem}.nc')
            if not os.path.exists(local_met_filename):
                os.symlink(met_filename, local_met_filename)

        # create configuration file symlink
        self._symlink_conf_file(self.wrf_conf_filename, self.local_wrf_conf)

        # real.exe must run from the WRF em_real directory
        os.chdir(self.local_wrf_path)
        logging.info(f'Changed working directory to {self.local_wrf_path}')

        # setup real command
        if not self.conf.multi_node:
            real_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./real.exe'
        else:
            real_cmd = \
                'srun --mpi=pmix -N 2 -n 80 singularity ' + \
                'exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './real.exe'

        # run real command
        os.system(real_cmd)
        return

    # -------------------------------------------------------------------------
    # wrf
    # -------------------------------------------------------------------------
    def wrf(self) -> None:
        """Run wrf.exe, the main forecast model integration."""
        logging.info('Preparing to run wrf.exe')

        # em_real tree is created by real(); fail early if it is missing
        assert os.path.exists(self.local_wrf_path), \
            f'{self.local_wrf_path} does not exist, please run real first.'

        # wrf.exe must run from the WRF em_real directory
        os.chdir(self.local_wrf_path)
        logging.info(f'Changed working directory to {self.local_wrf_path}')

        # setup wrf command
        if not self.conf.multi_node:
            wrf_cmd = \
                'singularity exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                f'mpirun -np {cpu_count()} --oversubscribe ./wrf.exe'
        else:
            wrf_cmd = \
                'srun --mpi=pmix -N 2 -n 80 singularity ' + \
                'exec -B /explore/nobackup/projects/ilab,' + \
                '$NOBACKUP,/panfs/ccds02/nobackup/projects/ilab ' + \
                f'{self.conf.container_path} ' + \
                './wrf.exe'

        # run wrf command
        os.system(wrf_cmd)

        # TODO
        # move output files at the end to something like working_dir/results
        return

    # -------------------------------------------------------------------------
    # postprocess
    # -------------------------------------------------------------------------
    def postprocess(self) -> None:
        """Move WRF output aside and extract per-timestep variable GeoTIFFs."""
        logging.info('Preparing to postprocess and extract variables')

        # create output directory
        os.makedirs(self.local_wrf_output, exist_ok=True)
        logging.info(f'Created WRF output directory {self.local_wrf_output}')

        # if the files have not been moved, move them to the output dir
        if len(os.listdir(self.local_wrf_output)) == 0:

            # get filenames, make sure they exist
            wrf_output_filenames = \
                glob(os.path.join(self.local_wrf_path, 'auxhist24_d0*')) + \
                glob(os.path.join(self.local_wrf_path, 'wrfout_d0*'))
            assert len(wrf_output_filenames) > 0, \
                'WRF output (auxhist24_d0*, wrfout_d0*) not found. Re-run WRF.'

            # move output to clean directory
            for filename in wrf_output_filenames:
                shutil.move(filename, self.local_wrf_output)
            logging.info(f'Moved WRF output to {self.local_wrf_output}')

        # Get WRF output filename; exactly one file must match
        wrf_output_filename = glob(
            os.path.join(self.local_wrf_output, self.conf.wrf_output_filename))
        assert len(wrf_output_filename) == 1, \
            f'WRF output filename not found under {self.local_wrf_output}.'

        # Get first item from the list
        wrf_output_filename = wrf_output_filename[0]
        logging.info(f'Loading {wrf_output_filename}')

        # create WRFAnalysis object, stores wrf_dataset
        wrf_analysis = WRFAnalysis(wrf_output_filename)

        # variables output dir
        os.makedirs(self.local_wrf_output_vars, exist_ok=True)
        logging.info(
            f'Created WRF vars output directory {self.local_wrf_output_vars}')

        # NOTE: extraction is serial because the netCDF dataset is not
        # picklable across processes (see _compute_variables, which exists
        # for a future multiprocessing implementation)
        for t_idx, delta_time in \
                enumerate(wrf_analysis.wrf_dataset.Times.values):

            logging.info(f'Processing t_idx: {t_idx}, timestamp: {delta_time}')

            # setup output filename; ':' is not filename-safe, replace it
            output_filename = os.path.join(
                self.local_wrf_output_vars,
                f"d02_{delta_time.astype(str).replace(':', '-')}.tif")

            # if the imagery does not exist
            if not os.path.isfile(output_filename):

                # compute WRF variables and output to disk
                wrf_analysis.compute_all_and_write(
                    timeidx=t_idx,
                    output_variables=OmegaConf.to_object(
                        self.conf.wrf_output_variables),
                    output_filename=output_filename
                )

    # -------------------------------------------------------------------------
    # _compute_variables
    # -------------------------------------------------------------------------
    def _compute_variables(self, t_idx, delta_time, wrf_output_filename):
        """Extract variables for one timestep (multiprocessing-safe worker)."""
        logging.info(f'Processing t_idx: {t_idx}, timestamp: {delta_time}')

        # setup output filename; ':' is not filename-safe, replace it
        output_filename = os.path.join(
            self.local_wrf_output_vars,
            f"d02_{delta_time.astype(str).replace(':', '-')}.tif")

        # if the imagery does not exist
        if not os.path.isfile(output_filename):

            # unfortunately, the netCDF object is not picklable, thus we need
            # to redefine it on every single process, hopefully there
            # will be an implementation on it at some point by them
            wrf_analysis = WRFAnalysis(wrf_output_filename)

            # compute WRF variables and output to disk
            wrf_analysis.compute_all_and_write(
                timeidx=t_idx,
                output_variables=OmegaConf.to_object(
                    self.conf.wrf_output_variables),
                output_filename=output_filename
            )
        return

    # -------------------------------------------------------------------------
    # setup_wps_config
    # -------------------------------------------------------------------------
    def setup_wps_config(self, template_filename: str = 'namelist.wps.jinja2'):
        """Render the WPS namelist from its jinja2 template.

        Args:
            template_filename: template name inside the package templates dir.
        """
        # Setup jinja2 Environment
        env = Environment(
            loader=PackageLoader("wildfire_occurrence"),
            autoescape=select_autoescape()
        )

        # Get the template of the environment for WPS
        template = env.get_template(template_filename)

        # Modify configuration to include start and end date
        self.conf.wps_config['start_date'] = \
            self.start_date.strftime("%Y-%m-%d_%H:%M:%S")
        self.conf.wps_config['end_date'] = \
            self.end_date.strftime("%Y-%m-%d_%H:%M:%S")

        # Fill in elements from the WPS environment and save filename
        template.stream(self.conf.wps_config).dump(self.wps_conf_filename)
        logging.info(f'Saved WPS configuration at {self.wps_conf_filename}')
        return

    # -------------------------------------------------------------------------
    # setup_wrf_config
    # -------------------------------------------------------------------------
    def setup_wrf_config(
                self, template_filename: str = 'namelist.input.jinja2'):
        """Render the WRF namelist from its jinja2 template.

        Args:
            template_filename: template name inside the package templates dir.
        """
        # Setup jinja2 Environment
        env = Environment(
            loader=PackageLoader("wildfire_occurrence"),
            autoescape=select_autoescape()
        )

        # Get the template of the environment for WRF
        template = env.get_template(template_filename)

        # Modify configuration to include start and end date components
        self.conf.wrf_config['start_year'] = self.start_date.year
        self.conf.wrf_config['end_year'] = self.end_date.year
        self.conf.wrf_config['start_month'] = self.start_date.strftime('%m')
        self.conf.wrf_config['end_month'] = self.end_date.strftime('%m')
        self.conf.wrf_config['start_day'] = self.start_date.strftime('%d')
        self.conf.wrf_config['end_day'] = self.end_date.strftime('%d')
        self.conf.wrf_config['start_hour'] = self.start_date.strftime('%H')
        self.conf.wrf_config['end_hour'] = self.end_date.strftime('%H')

        # Fill in elements from the WRF environment and save filename
        template.stream(self.conf.wrf_config).dump(self.wrf_conf_filename)
        logging.info(f'Saved WRF configuration at {self.wrf_conf_filename}')
        return

    # -------------------------------------------------------------------------
    # _symlink_conf_file
    # -------------------------------------------------------------------------
    def _symlink_conf_file(self, source, destination):
        """Symlink a rendered namelist into a run directory, refreshing it
        when the rendered copy has changed since the link was created."""
        # condition 1: local configuration file does not exist, symlink it
        if not os.path.lexists(destination):

            # make sure configuration file exists
            assert os.path.isfile(source), \
                'Please run setup pipeline step before running real'

            # add symlink
            os.symlink(source, destination)
            logging.info(f'Created namelist.input symlink at {destination}')

        # condition #2: local configuration exists, but is not the latest one
        elif not filecmp.cmp(source, destination):

            # remove previous version
            os.remove(destination)

            # add symlink
            os.symlink(source, destination)
            logging.info(
                f'Removed old copy of {destination}. Created new '
                f'namelist.input symlink at {destination}')

        else:
            logging.info(
                f'namelist.input exists, {destination}, '
                'nothing to copy.')