|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
; ~; R2 y$ U4 Y) o
% Q! P0 e t% l/ c& i 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 2 M0 w. o/ g, U. ^, E/ l( z
2 f. I( a1 H O( @( j& q- N
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 7 G! G0 w3 e$ }- Z$ O b+ t h
- s* e8 n' ~& I; o/ J 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
7 k# [% W% S4 U1 y 其中几个关键点解释一下: ! x0 Q% F% B9 c) k, ]& W
(1)首先定义拟合正太分布的函数
4 Q- F' g8 U3 j0 { W/ T$ Y def norm_fit(data):
8 c# b8 E2 R+ g. ]8 ]; e loc, scale = st.norm.fit(data)1 h( Y* d) u- O& @
return np.array([loc, scale]) ' E1 j: r9 L; y0 o* W! L, [6 w. U
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
: }( N( I( O. a7 T (2)xarray分块
/ C6 ^1 L2 M- C" @8 Y& K4 v x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
/ _/ a T3 w# E' T6 o2 c3 j x = x.chunk({"lev":5, "lat":100,"lon":100}) & y9 q( ?2 R- G" e% a
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
* ]3 j5 x3 ]4 s (3)xarray.apply_ufunc函数 O1 w: F6 Z ]$ P: n/ G0 V
result = xr.apply_ufunc(norm_fit,! k) P5 F) H! y. V$ D! H
x,4 D- o7 ]2 V& |& h* P
input_core_dims=[["time"]],, v( ], J9 X+ J* y9 Q0 N$ k
output_core_dims=[["paras"]],' t0 j7 w$ x$ i; T5 e
dask="parallelized",& v+ Q+ ?( B, Y9 z% N
output_dtypes=[np.float],
$ _$ a( x) }. w$ W- @7 e) H F9 E dask_gufunc_kwargs={output_sizes:{"paras":2}}2 I* a. R0 W/ O6 h
)
. N6 N8 i" q* T apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) 1 l' B; t! v% N0 v9 G4 D5 Q/ r
# Y2 ^, E: g1 J$ g/ J1 L4 ] 以下是全部代码:
' k5 R3 S; p" s+ X; \$ t$ M( b from scipy import stats as st- o& X' `' s( C* V3 ^3 A
% [$ f" U' X3 X, g) z import xarray as xr9 _" J' ^1 k6 n g7 ^; o: T0 v$ a
import dask2 A$ i: \* N+ W
import numpy as np- `2 b8 k* r! Q4 w5 O
from dask.diagnostics import ProgressBar R4 y9 }% M. l( y' O9 o2 _+ r5 p
7 u, X% v8 N- D1 M6 i def norm_fit(data):" @4 _/ Q6 l: W- Q
loc, scale = st.norm.fit(data)( f8 i! f/ K! Y0 u# _: Z: Y
return np.array([loc, scale])
" _8 y# _* d/ U# E3 N% b7 g3 v! G# g2 L. E
rs = np.random.RandomState(0)# ] k+ d) f! I- o! G" @
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])/ g9 R: B" F# }7 T* L" j& N0 Q! H
x = x.chunk({"lev":5, "lat":100,"lon":100})
+ [$ R$ f5 p, B! E9 {- K$ u
# l+ W( E8 |' i5 l O$ s #使用apply_ufunc计算,并用dask的并行计算' n# P7 O7 V- x. H' j" x, ]
result = xr.apply_ufunc(norm_fit,) }! Z) T6 G9 y, O8 s
x,* {" x5 Y& {7 A( H5 I6 v
input_core_dims=[["time"]],/ S u% h& j! @9 I; o$ `; u
output_core_dims=[["paras"]],, G4 G7 }7 i; d2 x2 \
dask="parallelized"," \0 }. u: } o2 I6 O
output_dtypes=[np.float], o$ ]. ~2 { q/ I* D( p
dask_gufunc_kwargs={output_sizes:{"paras":2}}
7 p# C; T: b( | )
. H/ @4 ~9 y4 x1 h' F2 N1 { Z- Z/ G& `$ g
#compute进行真实的计算并显示进度! W9 B4 ~4 h( p
with ProgressBar():; L6 Z# s, G" J8 D) q0 m
result = results.compute()
( ~5 S" Z% D. _5 n0 r( A& F6 ^; h2 Y i0 h
#结果冲命名保存到nc文件5 S* @# q- K ^& F" [9 T6 C* N
result = result.rename("norm_paras")
+ ]- ~9 d4 Y; Q result = result.to_netcdf("norm_fit_paras.nc",compute=False)
Q* T1 ?* I1 F8 h" J with ProgressBar():. M# `6 q1 ?# T4 [7 O9 F* a
result.compute()
! S6 |3 {; h O 转自:气海同途
: ?: U1 m# ^8 }7 X4 ^" q 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |