|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 ; B5 K4 R2 {4 _- o( g+ N# F0 x& f
R5 {! @7 [1 ?8 _1 U
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 * l% i0 y3 U3 l+ |+ ]3 G
& c1 g1 v) K6 y3 ^ 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
5 W! |) I7 j8 q+ ]8 `3 V) Z5 O) Y$ l2 q+ l/ z1 g( ~
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 # J- q" e/ T! w
其中几个关键点解释一下: ' t: Z r6 n- r2 I& M& k
(1)首先定义拟合正太分布的函数 ) K" j$ L, q0 w6 [) o3 X
def norm_fit(data):9 X4 p' u, n1 t; \$ m
loc, scale = st.norm.fit(data)5 |( d H' ~# J
return np.array([loc, scale]) % p+ |" |( I4 u! }/ _
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 ' {; ]5 H/ y: [9 M+ g
(2)xarray分块 7 [6 e( e3 I( f6 Q; R* Q/ b7 P& ]
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])- T9 o2 q0 J8 u$ S3 {. \# K
x = x.chunk({"lev":5, "lat":100,"lon":100}) 3 m; h2 c. @6 X0 O: R
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 - y6 A7 ^( G+ Q. ]9 g- z$ o
(3)xarray.apply_ufunc函数
. R9 B, ?) W9 H1 C# z: P result = xr.apply_ufunc(norm_fit,
( c* K2 N2 s' V- b# S: D' s x,
- @4 I" J6 H) j+ w: L. w/ y: Y$ N input_core_dims=[["time"]], X# ~' b" T( e' }9 o
output_core_dims=[["paras"]]," }) x% u0 f* s, m" f
dask="parallelized",# B9 @# B1 L8 j- f
output_dtypes=[np.float],$ [, J0 ?8 T3 A. w6 u7 Y$ i+ ^6 r; Q
dask_gufunc_kwargs={output_sizes:{"paras":2}}
* j2 U* Q8 G9 F% j$ b) U9 z- X) m ) ' s H, O9 b" p
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) * ~' a- ^" f# c/ S3 c
9 P8 Q- P/ d& M" S' P
以下是全部代码:
4 w: o% A' y( l3 i3 u5 C from scipy import stats as st
" K, _: u6 f. v0 D' K9 S4 X9 m4 n; }) X* d7 o* ~$ J
import xarray as xr
- I6 ^3 n& L- y- ~: Z import dask9 k' j2 j, U& m! A% _
import numpy as np" \! H6 u: V A' q# N) W. g) t) D
from dask.diagnostics import ProgressBar
' D6 M6 w& M9 I+ Z2 i4 ]* k7 @ @
def norm_fit(data):
; _5 Z- b4 k: C4 |9 r5 g loc, scale = st.norm.fit(data)
" Y$ N- Q. x7 x2 p; { return np.array([loc, scale])) Q' O m3 i, x- D' [
( w0 W( i8 T: p6 N6 M
rs = np.random.RandomState(0)
$ N) c! C/ W( w) u& f" w% g7 o2 s x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
+ k* {/ |$ k$ s, }4 {) Y3 g. T x = x.chunk({"lev":5, "lat":100,"lon":100})+ d+ X1 u; h. K2 e+ c) R+ g- V
% A7 b: M( V* x8 [ #使用apply_ufunc计算,并用dask的并行计算. \% H" e8 w& C
result = xr.apply_ufunc(norm_fit,
1 M& C% g% i( m& ?+ Z x,
: H* a4 n8 N) Q6 R input_core_dims=[["time"]],* r8 M \$ L& y4 f4 X z3 c
output_core_dims=[["paras"]],
5 o: v4 `' Y# ]0 E7 f- P dask="parallelized",
; ^3 l9 ?$ h$ z9 n8 ` output_dtypes=[np.float],, \/ s- J% s" f( h- \ N7 q+ j
dask_gufunc_kwargs={output_sizes:{"paras":2}}
1 V5 C W2 ~, B! ?& j4 x )
- ?( m2 `; y2 c% ^5 ]9 E
U. a( V8 M8 T. Y% r #compute进行真实的计算并显示进度" ~4 \! y6 r0 Y9 t' {2 }" T
with ProgressBar():* | s. _# }1 H4 v/ i1 E2 c5 L" m
result = results.compute()9 H ]- a) m, _" D/ T7 ~, E3 g% K) \
8 L- l8 L4 |" g6 N
#结果冲命名保存到nc文件
6 B. W+ r9 m) T' S9 T! x! ?- W% d4 u result = result.rename("norm_paras"), G9 {3 @9 N- H0 ^
result = result.to_netcdf("norm_fit_paras.nc",compute=False)- n+ B, B/ s# f9 P
with ProgressBar():# x3 p& k9 d0 u3 u% V
result.compute()
$ I$ H% g2 h3 \/ y, V 转自:气海同途
. s$ z% m' w$ l/ G2 K' f- z) X 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |