收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

; B5 K4 R2 {4 _- o( g+ N# F0 x& f
R5 {! @7 [1 ?8 _1 U

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

* l% i0 y3 U3 l+ |+ ]3 G & c1 g1 v) K6 y3 ^

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

5 W! |) I7 j8 q+ ]8 `3 V) Z5 O) Y$ l2 q+ l/ z1 g( ~

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

# J- q" e/ T! w

其中几个关键点解释一下:

' t: Z r6 n- r2 I& M& k

(1)首先定义拟合正太分布的函数

) K" j$ L, q0 w6 [) o3 X
def norm_fit(data):9 X4 p' u, n1 t; \$ m loc, scale = st.norm.fit(data)5 |( d H' ~# J return np.array([loc, scale])
% p+ |" |( I4 u! }/ _

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

' {; ]5 H/ y: [9 M+ g

(2)xarray分块

7 [6 e( e3 I( f6 Q; R* Q/ b7 P& ]
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])- T9 o2 q0 J8 u$ S3 {. \# K x = x.chunk({"lev":5, "lat":100,"lon":100})
3 m; h2 c. @6 X0 O: R

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

- y6 A7 ^( G+ Q. ]9 g- z$ o

(3)xarray.apply_ufunc函数

. R9 B, ?) W9 H1 C# z: P
result = xr.apply_ufunc(norm_fit, ( c* K2 N2 s' V- b# S: D' s x, - @4 I" J6 H) j+ w: L. w/ y: Y$ N input_core_dims=[["time"]], X# ~' b" T( e' }9 o output_core_dims=[["paras"]]," }) x% u0 f* s, m" f dask="parallelized",# B9 @# B1 L8 j- f output_dtypes=[np.float],$ [, J0 ?8 T3 A. w6 u7 Y$ i+ ^6 r; Q dask_gufunc_kwargs={output_sizes:{"paras":2}} * j2 U* Q8 G9 F% j$ b) U9 z- X) m )
' s H, O9 b" p

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

* ~' a- ^" f# c/ S3 c 9 P8 Q- P/ d& M" S' P

以下是全部代码:

4 w: o% A' y( l3 i3 u5 C
from scipy import stats as st " K, _: u6 f. v0 D' K9 S4 X9 m4 n; }) X* d7 o* ~$ J import xarray as xr - I6 ^3 n& L- y- ~: Z import dask9 k' j2 j, U& m! A% _ import numpy as np" \! H6 u: V A' q# N) W. g) t) D from dask.diagnostics import ProgressBar ' D6 M6 w& M9 I+ Z2 i4 ]* k7 @ @ def norm_fit(data): ; _5 Z- b4 k: C4 |9 r5 g loc, scale = st.norm.fit(data) " Y$ N- Q. x7 x2 p; { return np.array([loc, scale])) Q' O m3 i, x- D' [ ( w0 W( i8 T: p6 N6 M rs = np.random.RandomState(0) $ N) c! C/ W( w) u& f" w% g7 o2 s x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) + k* {/ |$ k$ s, }4 {) Y3 g. T x = x.chunk({"lev":5, "lat":100,"lon":100})+ d+ X1 u; h. K2 e+ c) R+ g- V % A7 b: M( V* x8 [ #使用apply_ufunc计算,并用dask的并行计算. \% H" e8 w& C result = xr.apply_ufunc(norm_fit, 1 M& C% g% i( m& ?+ Z x, : H* a4 n8 N) Q6 R input_core_dims=[["time"]],* r8 M \$ L& y4 f4 X z3 c output_core_dims=[["paras"]], 5 o: v4 `' Y# ]0 E7 f- P dask="parallelized", ; ^3 l9 ?$ h$ z9 n8 ` output_dtypes=[np.float],, \/ s- J% s" f( h- \ N7 q+ j dask_gufunc_kwargs={output_sizes:{"paras":2}} 1 V5 C W2 ~, B! ?& j4 x ) - ?( m2 `; y2 c% ^5 ]9 E U. a( V8 M8 T. Y% r #compute进行真实的计算并显示进度" ~4 \! y6 r0 Y9 t' {2 }" T with ProgressBar():* | s. _# }1 H4 v/ i1 E2 c5 L" m result = results.compute()9 H ]- a) m, _" D/ T7 ~, E3 g% K) \ 8 L- l8 L4 |" g6 N #结果冲命名保存到nc文件 6 B. W+ r9 m) T' S9 T! x! ?- W% d4 u result = result.rename("norm_paras"), G9 {3 @9 N- H0 ^ result = result.to_netcdf("norm_fit_paras.nc",compute=False)- n+ B, B/ s# f9 P with ProgressBar():# x3 p& k9 d0 u3 u% V result.compute()
$ I$ H% g2 h3 \/ y, V

转自:气海同途

. s$ z% m' w$ l/ G2 K' f- z) X

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在5 天前
快速回复 返回顶部 返回列表