1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
| import numpy as np from scipy import ndimage
from numpy.linalg import norm import cv2 import numpy as np import matplotlib.pyplot as plt np.set_printoptions(threshold=np.inf)
def compute_SAL(field_obs, field_fc, user_threshold=None): """SAL-score (Structure, Amplitude, Location)
Parameters ---------- field_obs : numpy.ndarray 2D observation (or reference) field field_fc : numpy.ndarray 2D forecast field (same shape as `field_obs`) user_threshold : float, optional If set, use this threshold for object separation. 如果已设置,请使用此阈值进行对象分离。 If not set, use the default method of Wernli is used: ``threshold = field_obs.max() * 1/15``
Returns ------- S : float Structure score A : float Amplitude score L : float Location score L1, L2 : float Location score components `L1` and `L2` NObjects_obs : float Number of thresholded objects in observation 观察中的阈值对象数 NObjects_mod : float Number of objects in model (forecast `fc`) 模型中的对象数(预测“fc”) R_max : float Maximum Rain-value of observations `field_obs` 观测值的最大降雨值` user_treshold : float Threshold that was actually used 实际使用的阈值 """ if (field_obs.shape != field_fc.shape): raise Exception("field_obs and field_fc need to have the same shape" +\ " (this function also assumes that they cover the same domain!)")
if (user_threshold is not None) and (type(user_threshold) != float): raise Exception("user_threshold must be a float")
R_mod = field_fc R_obs = field_obs
if np.any(R_mod < 0.): raise Exception("negative values in forecast." +\ "use different field or apply SAL.threshold_to_zero(field)")
if np.any(R_obs < 0.): raise Exception("negative values in forecast." +\ "use different field or apply SAL.threshold_to_zero(field)")
f_threshold = 1/15. R_max_obs = np.nanpercentile(np.where(R_obs>0.1, R_obs, np.nan), 95) threshold_obs = R_max_obs * f_threshold R_max_mod = np.nanpercentile(np.where(R_mod>0.1, R_mod,np.nan), 95) threshold_mod = R_max_mod * f_threshold
if (user_threshold is not None): threshold_obs = user_threshold threshold_mod = user_threshold
R_mod_thr = np.where(R_mod > threshold_mod, R_mod, 0.) R_obs_thr = np.where(R_obs > threshold_obs, R_obs, 0.)
D_R_mod = np.mean(R_mod) D_R_obs = np.mean(R_obs)
A = (D_R_mod - D_R_obs) / (0.5 * (D_R_mod + D_R_obs))
x_R_mod = np.array(ndimage.measurements.center_of_mass(R_mod)) x_R_obs = np.array(ndimage.measurements.center_of_mass(R_obs))
d_diagonal = norm( L1 = norm(x_R_mod - x_R_obs) / d_diagonal
labels_mod, NObjects_mod = ndimage.measurements.label(R_mod_thr) labels_obs, NObjects_obs = ndimage.measurements.label(R_obs_thr)
labels_list = [labels_mod, labels_obs] NObjects_list = [NObjects_mod, NObjects_obs]
R_list = [R_mod, R_obs] x_R_list = [x_R_mod, x_R_obs]
r_list = [0,0] V_list = [0,0]
for iList, NObjects in enumerate(NObjects_list): R_n_list = np.zeros(NObjects) V_n_list = np.zeros(NObjects) distance_n_list = np.zeros(NObjects)
for n in range(NObjects):
label = n + 1
object_n = np.where(labels_list[iList] == label, R_list[iList], 0) R_n_list[n] = object_n.sum() V_n_list[n] = R_n_list[n] / object_n.max() x_n = np.array(ndimage.measurements.center_of_mass(object_n)) distance_n_list[n] = norm(x_R_list[iList] - x_n)
R_n_list = np.array(R_n_list) distance_n_list = np.array(distance_n_list) r_list[iList] = (R_n_list*distance_n_list).sum() / R_n_list.sum() V_list[iList] = (R_n_list*V_n_list).sum() / R_n_list.sum()
L2 = 2.*norm(r_list[0] - r_list[1]) / d_diagonal L = L1 + L2 S = (V_list[0] - V_list[1]) / (0.5 * (V_list[0] + V_list[1])) return S, A, L, L1, L2, NObjects_obs, NObjects_mod, R_max_obs, R_max_mod, threshold_obs, threshold_mod
def PrintResultSAL(field_obs, field_fc, user_threshold=None): S, A, L, L1, L2, NObjects_obs, NObjects_mod, R_max_obs, R_max_mod, threshold_obs, threshold_mod = compute_SAL(field_obs,field_fc) print("S = ", S) print("A = ", A) print("L = ", L)
def matrix_SAL(filename1, filename2): EC_fst = np.load(filename1) file_fst = EC_fst['f'] result_fst = file_fst[0].reshape(210,211)
EC_obs = np.load(filename2) file_obs = EC_obs['l'] result_obs = file_obs[0].reshape(210,211) PrintResultSAL(result_fst,result_obs)
|