|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
% J3 b7 K2 M" @/ n 9 z! }% R% _" [0 [
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 # @/ V+ r- X, @1 l
6 D& t& Y* b0 s 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
6 V8 X/ n# J9 S0 O \) ]
2 X& }4 C6 K3 F* n, }, r$ | 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
* L5 S! v0 {. w 其中几个关键点解释一下:
$ c5 b& y' m9 X* D) A- I (1)首先定义拟合正太分布的函数
$ m8 @+ v( E( R- T! _( f8 n o def norm_fit(data):
7 V( p" ~4 d2 @% ^ loc, scale = st.norm.fit(data)
4 B3 `/ Y: d% [0 j+ \ return np.array([loc, scale])
0 F& N* U" W) O. P5 w& \3 G' E 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 8 U4 d" q8 ]! C: @& Z, k
(2)xarray分块
- N1 m0 e2 P' e1 h x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
) A! M# W( ?3 y; M, ^ x = x.chunk({"lev":5, "lat":100,"lon":100})
# X0 t7 {$ m2 N' F; u xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 / I% P% C$ b5 {7 T! h
(3)xarray.apply_ufunc函数
+ x' b- _+ K* |0 f; M3 U& ~( a result = xr.apply_ufunc(norm_fit,7 H2 {" m% q: y) L3 ~: ~
x,! c$ ]$ P4 c% J
input_core_dims=[["time"]],
6 I3 b: g0 c7 f% h; B: x output_core_dims=[["paras"]],7 R8 g1 [; {; ]5 U' H- b
dask="parallelized",; D/ ~7 }) _8 ]; G2 x
output_dtypes=[np.float],7 P; S, q( b$ M, {
dask_gufunc_kwargs={output_sizes:{"paras":2}}$ U- ^9 P K$ R; a
) F N# p4 t3 L; R5 ~
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
0 R0 p# o( ~8 {& b2 I, L4 n+ q% L; C5 C5 {. V" K# h
以下是全部代码:
+ n* {+ M, |0 p8 U from scipy import stats as st$ x2 Q+ ]/ j# X0 m, P" z( h- G
; K' a3 _) h* }" b3 q import xarray as xr4 c' J3 h' I; L: o }: y8 O
import dask
/ y' L6 ^3 `9 T$ W3 H/ W/ C' e. ] import numpy as np
- V K0 V0 _' ?9 U& `8 P4 j' e from dask.diagnostics import ProgressBar$ }/ J2 m9 _; h+ H" E1 X
0 w8 ~& d5 n: c; g: Y, A) r* ?
def norm_fit(data):1 |( A) M3 L5 X5 _9 \) z
loc, scale = st.norm.fit(data)- o( `# S: j- a
return np.array([loc, scale])
) ]5 z: P" f: M3 ~6 T1 z+ m F+ y% c( m( G; ^
rs = np.random.RandomState(0)
/ y& j! E" e- S& G! P x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
$ s, Q$ Q" g- s7 @) a x = x.chunk({"lev":5, "lat":100,"lon":100})
2 ^0 }4 m# e9 |- \0 v
$ c5 i! O# W9 N) d" r #使用apply_ufunc计算,并用dask的并行计算
4 {3 t$ E0 ^2 ` result = xr.apply_ufunc(norm_fit,4 r1 ] R [, ]" U8 R- l+ b
x,
- P( M2 K% k: Z input_core_dims=[["time"]], V9 v4 o' z3 k, U) W! ? f% U
output_core_dims=[["paras"]]," r1 o6 M2 Y0 ?7 ~7 X, c+ n8 t
dask="parallelized",
( ?- {1 e; ?6 _ output_dtypes=[np.float],! u9 m: i+ t9 K% ?6 b$ k' j* F
dask_gufunc_kwargs={output_sizes:{"paras":2}}' b& Z4 G# I* a" U% T1 X
)
' w8 [/ E M, T$ X* J' O
* K2 y* W# @+ M B& e6 z. H: p #compute进行真实的计算并显示进度
v3 R& y0 Z7 Q) a# p with ProgressBar():% }1 c$ R4 Z8 g; {, t' M
result = results.compute()
7 ?0 @( |8 D% H# h( Z- c- J4 \2 ?+ d
# I% V: L- ?' F4 C, }6 p M" x #结果冲命名保存到nc文件
' W3 D1 L% D; r+ @8 H% i result = result.rename("norm_paras")
5 H6 {6 [4 a8 ` result = result.to_netcdf("norm_fit_paras.nc",compute=False)
8 y! a4 f- v7 k4 \; m+ X; j5 d with ProgressBar():
$ H6 w. w% U% X+ S* j7 a! o) @ result.compute()
" }* A2 ], L# y1 M+ i0 F4 [ 转自:气海同途
6 A& [3 { L# j' z6 @) W* J 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |