# -*- coding: utf-8 -*-
import argparse
import functools
import logging
import math
import multiprocessing
import numpy as np
import os
import signal
import sys
import time

# Log INFO-level messages so progress is visible on the console
logging.basicConfig(level=logging.INFO)

# Ensure subprocesses are killed on SIGTERM
def sigterm_handler(sig, frame):
    try:
        os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
    except Exception:
        pass
    sys.exit(1)
signal.signal(signal.SIGTERM, sigterm_handler)

# ***************** EDIT THESE VARIABLES FOR YOUR NEEDS ******************

# NUMBER OF PARALLEL WORKERS TO USE FOR LPMM
# *The domain will be split into N=thread_pool_size patches, with calculations performed for each patch in a separate worker process
thread_pool_size = 8

# Path for output file (binary data)
path_output = '/path/to/dir/test_lpmm.dat'

# Original grid spacing (km)
dx = 3.

# Original grid size
nx = 1799
ny = 1059

# Flag indicating whether to calculate local PMM (instead of grid-wide PMM)
# *If True, calculates LPMM
# *If False, calculates PMM
LOCAL_PMM_FLAG = True

# "Neighborhood" size for LPMM calculation
# *This is the "radius" of the square box surrounding each grid point where values are considered
# *A square is used instead of a circle for faster computation (easier to use numpy array slicing)
# *We use 110 km for SPC HREF QPF LPMM; i.e., LPMM is computed over a ~220x220 km box around each point
neighborhood_radius_km = 110.

# Max execution time before timeout (s)
MAX_EXECUTE_TIME = 3600

# Ensemble member list
# *Specifying names may be useful to formulate paths to data files in the code below that reads in member data
# *If not, just fill this list with dummy values to make its length equal to the ensemble size
ensemble_members = ['namnest.tl00', 'hrwnssl.tl00', 'hrwarw.tl00']

# ************************************************************************

# List of 2D numpy arrays, each containing field (e.g., QPF) data for an ensemble member
all_member_data = []

# Number of ensemble members successfully processed
processed_member_count = 0

# Neighborhood size, as a linear grid point count
nx_per_neighborhood = int(float(neighborhood_radius_km) / float(dx))

# Iterate over ensemble members
for member in ensemble_members:
    logging.info("Processing member " + str(member))
    try:
        # ***************** READ MEMBER DATA INTO field_tagg HERE ******************
        # *If calculating PMM of QPF, make sure field_tagg is a 2D numpy array of size (ny, nx)
        #  containing this member's QPF over the desired period by the end of this block.
        #  Modules like netCDF4 or pygrib are likely useful here, depending on the source data format.
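        # A minimal, commented-out sketch of one way to read a member's QPF with pygrib
        # is shown below. The file path pattern and the record selection are assumptions
        # for illustration only; adapt them to your data source.
        #
        # import pygrib
        # grbs = pygrib.open('/path/to/dir/' + member + '.grib2')   # hypothetical path
        # grb = grbs.select(name='Total Precipitation')[0]          # assumes one matching QPF record
        # field_tagg = np.array(grb.values)                         # 2D array of shape (ny, nx)
        # grbs.close()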
        field_tagg = np.zeros((ny, nx))
        # ***************************************************************************

        # Append 2D member data array to all_member_data list
        all_member_data.append(field_tagg)

        # Increment processed_member_count if data was successfully read
        processed_member_count += 1
    except Exception as e:
        logging.warning(e)
        logging.warning("Member " + str(member) + " was not successfully processed")

logging.info("Processed " + str(processed_member_count) + "/" + str(len(ensemble_members)) + " members")

# Create numpy array of all data (i.e., convert outer list to np.array)
all_member_data = np.array(all_member_data)

# Mean of all members
mean_data = np.mean(all_member_data, axis=0)

# Shape of mean data array
md_shape = mean_data.shape

# Helper function to calculate LPMM for a single patch of the domain
def patch_local_pmm(grid_bounds, **kwargs):
    member_grids = kwargs.get('member_grids')
    mean_grid = kwargs.get('mean_grid')
    i_bounds, j_bounds = grid_bounds
    i_start, i_end = i_bounds
    j_start, j_end = j_bounds

    # 2D array to hold LPMM for this patch
    local_pmm = np.zeros((j_end-j_start+1, i_end-i_start+1))

    # Iterate over j-values (y-axis)
    for j in range(j_start, j_end+1):
        # j-index within LPMM patch
        j_pmm = j-j_start
        # j-index of southernmost row in neighborhood
        j_0 = max(j-nx_per_neighborhood, 0)
        # j-index of northernmost row in neighborhood
        j_f = min(j+nx_per_neighborhood, ny-1)

        # Iterate over i-values (x-axis)
        for i in range(i_start, i_end+1):
            # Within this loop, we're calculating LPMM for "center point" (i, j)

            # i-index within LPMM patch
            i_pmm = i-i_start

            # Mean value at this grid point
            mean_value_ij = mean_grid[j, i]

            # If ensemble mean is 0 here, set LPMM to 0 and skip to next point
            if mean_value_ij == 0.:
                local_pmm[j_pmm, i_pmm] = 0.
                continue

            # i-index of westernmost column in neighborhood
            i_0 = max(i-nx_per_neighborhood, 0)
            # i-index of easternmost column in neighborhood
            i_f = min(i+nx_per_neighborhood, nx-1)

            # List of ensemble mean values for all grid points in neighborhood
            mean_nh_vals = mean_grid[j_0:j_f+1, i_0:i_f+1].flatten()
            # Sort list
            mean_nh_vals.sort()

            # Find ensemble mean rank of this grid point within its neighborhood
            rank_mean = np.searchsorted(mean_nh_vals, mean_value_ij)

            # All member values for grid points in the neighborhood, pooled and sorted
            member_values = member_grids[:, j_0:j_f+1, i_0:i_f+1]
            member_values = np.array(member_values.flatten(), dtype="float")
            member_values.sort()

            # PMM rank-matching step: assign the pooled member value whose rank corresponds
            # (proportionally) to the mean value's rank among the sorted neighborhood means
            rank_member = int(float(rank_mean) / float(len(mean_nh_vals)) * float(len(member_values)))
            rank_member = min(rank_member, len(member_values)-1)
            local_pmm[j_pmm, i_pmm] = member_values[rank_member]

    return local_pmm
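# Build the list of patch bounds handed to the workers. This is a minimal sketch that splits
# the grid into thread_pool_size strips along the j-axis; each entry has the form
# ((i_0, i_f), (j_0, j_f)). The strip-based split is only one reasonable choice; any set of
# patches that tiles the full grid works. (A fork-based multiprocessing start method, i.e.,
# Linux, is assumed, so workers inherit the module-level arrays and constants.)
patch_bounds_list = []
rows_per_patch = int(math.ceil(float(md_shape[0]) / float(thread_pool_size)))
for p in range(thread_pool_size):
    pj0 = p * rows_per_patch
    pjf = min((p + 1) * rows_per_patch - 1, md_shape[0] - 1)
    if pj0 > pjf:
        break
    patch_bounds_list.append(((0, md_shape[1] - 1), (pj0, pjf)))

# Pool of worker processes, one per patch
pool = multiprocessing.Pool(thread_pool_size)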
if (thread_pool_size > 1):
    logging.info("...Grid divided into " + str(thread_pool_size) + " patches, and each will be handled by its own thread")

try:
    # Fill multiprocessing pool with workers to compute patches' LPMM until all are complete
    async_call = pool.map_async(functools.partial(patch_local_pmm, **{'member_grids': all_member_data, 'mean_grid': mean_data}), patch_bounds_list)
    # Start pool (will time out after MAX_EXECUTE_TIME seconds)
    local_pmm_patches = async_call.get(MAX_EXECUTE_TIME)
# Catch KeyboardInterrupt, in case, e.g., Ctrl+C is used to interrupt this script
except KeyboardInterrupt:
    logging.warning("Caught KeyboardInterrupt; terminating workers and exiting")
    pool.terminate()
    sys.exit(1)
else:
    pool.close()
    pool.join()

# Main 2D array to contain LPMM values for the entire model grid
local_pmm = np.zeros(mean_data.shape)

# LPMM patch currently on
patch_on = 0

# Iterate over LPMM patches
for patch in patch_bounds_list:
    pib, pjb = patch
    # i_0, i_f of patch
    pi0, pif = pib
    # j_0, j_f of patch
    pj0, pjf = pjb

    # Fill appropriate points in main 2D array with patch array values
    local_pmm[pj0:pjf+1, pi0:pif+1] = local_pmm_patches[patch_on]

    # Increment patch_on
    patch_on += 1

# Set pmm grid dtype to "float32" for compact binary output (adjust if downstream readers
# expect a different precision) and write it to path_output
local_pmm = np.array(local_pmm, dtype="float32")
local_pmm.tofile(path_output)
logging.info("Wrote LPMM grid to " + path_output)
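# For reference, a commented-out sketch of reading the binary output back into a 2D grid
# (assuming the float32 dtype and the (ny, nx) grid shape used above):
#
# lpmm = np.fromfile(path_output, dtype="float32").reshape((ny, nx))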