收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

% J3 b7 K2 M" @/ n
9 z! }% R% _" [0 [

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

# @/ V+ r- X, @1 l 6 D& t& Y* b0 s

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

6 V8 X/ n# J9 S0 O \) ] 2 X& }4 C6 K3 F* n, }, r$ |

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

* L5 S! v0 {. w

其中几个关键点解释一下:

$ c5 b& y' m9 X* D) A- I

(1)首先定义拟合正太分布的函数

$ m8 @+ v( E( R- T! _( f8 n o
def norm_fit(data): 7 V( p" ~4 d2 @% ^ loc, scale = st.norm.fit(data) 4 B3 `/ Y: d% [0 j+ \ return np.array([loc, scale])
0 F& N* U" W) O. P5 w& \3 G' E

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

8 U4 d" q8 ]! C: @& Z, k

(2)xarray分块

- N1 m0 e2 P' e1 h
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) ) A! M# W( ?3 y; M, ^ x = x.chunk({"lev":5, "lat":100,"lon":100})
# X0 t7 {$ m2 N' F; u

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

/ I% P% C$ b5 {7 T! h

(3)xarray.apply_ufunc函数

+ x' b- _+ K* |0 f; M3 U& ~( a
result = xr.apply_ufunc(norm_fit,7 H2 {" m% q: y) L3 ~: ~ x,! c$ ]$ P4 c% J input_core_dims=[["time"]], 6 I3 b: g0 c7 f% h; B: x output_core_dims=[["paras"]],7 R8 g1 [; {; ]5 U' H- b dask="parallelized",; D/ ~7 }) _8 ]; G2 x output_dtypes=[np.float],7 P; S, q( b$ M, { dask_gufunc_kwargs={output_sizes:{"paras":2}}$ U- ^9 P K$ R; a )
F N# p4 t3 L; R5 ~

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

0 R0 p# o( ~8 {& b2 I, L4 n+ q% L; C5 C5 {. V" K# h

以下是全部代码:

+ n* {+ M, |0 p8 U
from scipy import stats as st$ x2 Q+ ]/ j# X0 m, P" z( h- G ; K' a3 _) h* }" b3 q import xarray as xr4 c' J3 h' I; L: o }: y8 O import dask / y' L6 ^3 `9 T$ W3 H/ W/ C' e. ] import numpy as np - V K0 V0 _' ?9 U& `8 P4 j' e from dask.diagnostics import ProgressBar$ }/ J2 m9 _; h+ H" E1 X 0 w8 ~& d5 n: c; g: Y, A) r* ? def norm_fit(data):1 |( A) M3 L5 X5 _9 \) z loc, scale = st.norm.fit(data)- o( `# S: j- a return np.array([loc, scale]) ) ]5 z: P" f: M3 ~6 T1 z+ m F+ y% c( m( G; ^ rs = np.random.RandomState(0) / y& j! E" e- S& G! P x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) $ s, Q$ Q" g- s7 @) a x = x.chunk({"lev":5, "lat":100,"lon":100}) 2 ^0 }4 m# e9 |- \0 v $ c5 i! O# W9 N) d" r #使用apply_ufunc计算,并用dask的并行计算 4 {3 t$ E0 ^2 ` result = xr.apply_ufunc(norm_fit,4 r1 ] R [, ]" U8 R- l+ b x, - P( M2 K% k: Z input_core_dims=[["time"]], V9 v4 o' z3 k, U) W! ? f% U output_core_dims=[["paras"]]," r1 o6 M2 Y0 ?7 ~7 X, c+ n8 t dask="parallelized", ( ?- {1 e; ?6 _ output_dtypes=[np.float],! u9 m: i+ t9 K% ?6 b$ k' j* F dask_gufunc_kwargs={output_sizes:{"paras":2}}' b& Z4 G# I* a" U% T1 X ) ' w8 [/ E M, T$ X* J' O * K2 y* W# @+ M B& e6 z. H: p #compute进行真实的计算并显示进度 v3 R& y0 Z7 Q) a# p with ProgressBar():% }1 c$ R4 Z8 g; {, t' M result = results.compute() 7 ?0 @( |8 D% H# h( Z- c- J4 \2 ?+ d # I% V: L- ?' F4 C, }6 p M" x #结果冲命名保存到nc文件 ' W3 D1 L% D; r+ @8 H% i result = result.rename("norm_paras") 5 H6 {6 [4 a8 ` result = result.to_netcdf("norm_fit_paras.nc",compute=False) 8 y! a4 f- v7 k4 \; m+ X; j5 d with ProgressBar(): $ H6 w. w% U% X+ S* j7 a! o) @ result.compute()
" }* A2 ], L# y1 M+ i0 F4 [

转自:气海同途

6 A& [3 { L# j' z6 @) W* J

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表