|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 . @1 e; ~; u W. T0 Q7 c* }$ C+ N
M6 i3 P9 y$ ~+ \3 u& E
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 % A& E# P1 V* W
6 @! G; a5 x' x z$ H6 } 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 1 w3 w" \) E3 v2 u
1 F/ J' g% | x6 b# R
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 2 e/ v8 |; U+ v: ], a r
其中几个关键点解释一下:
6 c. V' `9 K: d# K R (1)首先定义拟合正太分布的函数 * W( } ~1 t( v9 u
def norm_fit(data):
9 r) L) _" h' \: K# |( s( K loc, scale = st.norm.fit(data)3 `% D4 R) O! J/ J
return np.array([loc, scale])
0 p" [% Y: @5 D 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 . Y+ H0 b) B# q. R" h5 v, E+ `
(2)xarray分块
2 ?3 e1 T7 p/ k+ K Q2 M' l x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])& [5 _: P3 u# n$ Z) w% ^; d
x = x.chunk({"lev":5, "lat":100,"lon":100}) 6 d6 K$ H6 [* ^( H2 ]
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
! M+ s" h* T$ g) T, l: V (3)xarray.apply_ufunc函数
f6 a2 ]* p1 Z2 Y result = xr.apply_ufunc(norm_fit,% @# J. Q. f6 m% o) D( F1 d
x,% I- H+ |5 w3 n: J# g
input_core_dims=[["time"]],% U; v' V7 f' L2 l' }% e: @/ c. S
output_core_dims=[["paras"]],# k4 V; |4 D9 f. r7 ~
dask="parallelized",5 A) Q/ _* B1 ^- Z$ }+ t& \4 E @
output_dtypes=[np.float],
* X; v: E' M3 z dask_gufunc_kwargs={output_sizes:{"paras":2}}, x" s8 c% w; d) R3 C+ c
) 5 t4 D- t* H0 J
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
! i. b- r% k8 h# T* Q0 H. F
, {* D! ?& P+ w 以下是全部代码: % V( G5 W$ o- G. R
from scipy import stats as st
; a$ R# K# Q+ [) o# x0 X- d7 Y* x- k4 a& b+ U) V( e! B, l1 \) x
import xarray as xr
" t$ {4 e3 S5 }/ P( N; ?$ N: h import dask
3 i) d7 U& R) I/ K; @ import numpy as np
" [4 z3 e, I+ l6 o2 e& M+ g from dask.diagnostics import ProgressBar$ B! {/ m# s }) \9 n6 ?5 J
/ v& p {1 ~; ^5 `* T2 B def norm_fit(data):7 S1 |" P. U2 O! p( }4 ]2 j
loc, scale = st.norm.fit(data)7 n7 s, l; n+ Y1 R p4 ?9 Y. v8 i5 `
return np.array([loc, scale])
2 w: ~( }- S' `: r: L3 y
8 {6 X- K7 S# b* v rs = np.random.RandomState(0)$ M% }; |9 Z4 Q/ z
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])# E3 n2 X4 m, {' V+ E8 U; ^
x = x.chunk({"lev":5, "lat":100,"lon":100})
! X; u$ ?, V+ `1 W" b6 i4 v
' k6 n" u2 A. Y5 Z5 a #使用apply_ufunc计算,并用dask的并行计算
6 L) @5 Q/ `# g4 ~0 Y0 Q# O9 L result = xr.apply_ufunc(norm_fit,
7 t. E5 W& e& [) D2 m) ` x,) r# J; p$ h6 I8 w0 \0 d' x
input_core_dims=[["time"]],
9 x4 m/ S& @. Z2 J3 x; @3 [ output_core_dims=[["paras"]],
% g" Y3 F8 w5 e dask="parallelized",/ M9 K9 ?2 E; Z, k, D0 B
output_dtypes=[np.float],. X, ~) y* ?% D* w' }- n
dask_gufunc_kwargs={output_sizes:{"paras":2}}5 K4 {6 O( i+ H
)0 C t, r. Q3 h+ Z6 s5 b7 t. A
; L1 b' g# D4 a9 D* c+ U
#compute进行真实的计算并显示进度
/ a! ?, Z0 X7 l. i8 \6 Y! }1 P with ProgressBar():
' Q8 z( X6 k0 u. J% b result = results.compute()
7 F; I* p$ O3 i; d/ J1 V. C6 h# o! b# ?
#结果冲命名保存到nc文件% }; e& a0 s! G
result = result.rename("norm_paras")% D6 t* X. e" _( J. S
result = result.to_netcdf("norm_fit_paras.nc",compute=False), L. H, a0 J$ G+ g J
with ProgressBar():+ w8 l& t' j; {7 ?! }: j. n% U
result.compute()
0 Y+ s8 o; w7 G 转自:气海同途 $ I/ I3 ~9 s' y# _
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |