收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

; ~; R2 y$ U4 Y) o
% Q! P0 e t% l/ c& i

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

2 M0 w. o/ g, U. ^, E/ l( z 2 f. I( a1 H O( @( j& q- N

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

7 G! G0 w3 e$ }- Z$ O b+ t h - s* e8 n' ~& I; o/ J

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

7 k# [% W% S4 U1 y

其中几个关键点解释一下:

! x0 Q% F% B9 c) k, ]& W

(1)首先定义拟合正太分布的函数

4 Q- F' g8 U3 j0 { W/ T$ Y
def norm_fit(data): 8 c# b8 E2 R+ g. ]8 ]; e loc, scale = st.norm.fit(data)1 h( Y* d) u- O& @ return np.array([loc, scale])
' E1 j: r9 L; y0 o* W! L, [6 w. U

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

: }( N( I( O. a7 T

(2)xarray分块

/ C6 ^1 L2 M- C" @8 Y& K4 v
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) / _/ a T3 w# E' T6 o2 c3 j x = x.chunk({"lev":5, "lat":100,"lon":100})
& y9 q( ?2 R- G" e% a

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

* ]3 j5 x3 ]4 s

(3)xarray.apply_ufunc函数

O1 w: F6 Z ]$ P: n/ G0 V
result = xr.apply_ufunc(norm_fit,! k) P5 F) H! y. V$ D! H x,4 D- o7 ]2 V& |& h* P input_core_dims=[["time"]],, v( ], J9 X+ J* y9 Q0 N$ k output_core_dims=[["paras"]],' t0 j7 w$ x$ i; T5 e dask="parallelized",& v+ Q+ ?( B, Y9 z% N output_dtypes=[np.float], $ _$ a( x) }. w$ W- @7 e) H F9 E dask_gufunc_kwargs={output_sizes:{"paras":2}}2 I* a. R0 W/ O6 h )
. N6 N8 i" q* T

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

1 l' B; t! v% N0 v9 G4 D5 Q/ r # Y2 ^, E: g1 J$ g/ J1 L4 ]

以下是全部代码:

' k5 R3 S; p" s+ X; \$ t$ M( b
from scipy import stats as st- o& X' `' s( C* V3 ^3 A % [$ f" U' X3 X, g) z import xarray as xr9 _" J' ^1 k6 n g7 ^; o: T0 v$ a import dask2 A$ i: \* N+ W import numpy as np- `2 b8 k* r! Q4 w5 O from dask.diagnostics import ProgressBar R4 y9 }% M. l( y' O9 o2 _+ r5 p 7 u, X% v8 N- D1 M6 i def norm_fit(data):" @4 _/ Q6 l: W- Q loc, scale = st.norm.fit(data)( f8 i! f/ K! Y0 u# _: Z: Y return np.array([loc, scale]) " _8 y# _* d/ U# E3 N% b7 g3 v! G# g2 L. E rs = np.random.RandomState(0)# ] k+ d) f! I- o! G" @ x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])/ g9 R: B" F# }7 T* L" j& N0 Q! H x = x.chunk({"lev":5, "lat":100,"lon":100}) + [$ R$ f5 p, B! E9 {- K$ u # l+ W( E8 |' i5 l O$ s #使用apply_ufunc计算,并用dask的并行计算' n# P7 O7 V- x. H' j" x, ] result = xr.apply_ufunc(norm_fit,) }! Z) T6 G9 y, O8 s x,* {" x5 Y& {7 A( H5 I6 v input_core_dims=[["time"]],/ S u% h& j! @9 I; o$ `; u output_core_dims=[["paras"]],, G4 G7 }7 i; d2 x2 \ dask="parallelized"," \0 }. u: } o2 I6 O output_dtypes=[np.float], o$ ]. ~2 { q/ I* D( p dask_gufunc_kwargs={output_sizes:{"paras":2}} 7 p# C; T: b( | ) . H/ @4 ~9 y4 x1 h' F2 N1 { Z- Z/ G& `$ g #compute进行真实的计算并显示进度! W9 B4 ~4 h( p with ProgressBar():; L6 Z# s, G" J8 D) q0 m result = results.compute() ( ~5 S" Z% D. _5 n0 r( A& F6 ^; h2 Y i0 h #结果冲命名保存到nc文件5 S* @# q- K ^& F" [9 T6 C* N result = result.rename("norm_paras") + ]- ~9 d4 Y; Q result = result.to_netcdf("norm_fit_paras.nc",compute=False) Q* T1 ?* I1 F8 h" J with ProgressBar():. M# `6 q1 ?# T4 [7 O9 F* a result.compute()
! S6 |3 {; h O

转自:气海同途

: ?: U1 m# ^8 }7 X4 ^" q

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表