diff --git a/nmc_met_io/retrieve_cassandraDB.py b/nmc_met_io/retrieve_cassandraDB.py index 283fcdf..3b1bba6 100644 --- a/nmc_met_io/retrieve_cassandraDB.py +++ b/nmc_met_io/retrieve_cassandraDB.py @@ -32,6 +32,19 @@ import nmc_met_io.config as CONFIG from nmc_met_io.read_radar import StandardData from nmc_met_io.read_satellite import resolve_awx_bytearray +from nmc_met_io.retrieve_shared import ( + collect_model_3d_grid, + collect_model_3d_grids, + collect_model_grids, + collect_station_dataset, + collect_xarray_dataset, + lzw_decompress, + parse_model_grid_bytearray, + parse_radar_mosaic_bytearray, + parse_station_data_bytearray, + parse_swan_radar_bytearray, + parse_tlogp_bytearray, +) try: # please install cassandra-driver first @@ -248,202 +261,14 @@ def get_model_grid(directory, filename=None, suffix="*.024", print('There is no data ' + filename + ' in ' + directory) return None - # define head information structure (278 bytes) - head_dtype = [('discriminator', 'S4'), ('type', 'i2'), - ('modelName', 'S20'), ('element', 'S50'), - ('description', 'S30'), ('level', 'f4'), - ('year', 'i4'), ('month', 'i4'), ('day', 'i4'), - ('hour', 'i4'), ('timezone', 'i4'), - ('period', 'i4'), ('startLongitude', 'f4'), - ('endLongitude', 'f4'), ('longitudeGridSpace', 'f4'), - ('longitudeGridNumber', 'i4'), - ('startLatitude', 'f4'), ('endLatitude', 'f4'), - ('latitudeGridSpace', 'f4'), - ('latitudeGridNumber', 'i4'), - ('isolineStartValue', 'f4'), - ('isolineEndValue', 'f4'), - ('isolineSpace', 'f4'), - ('perturbationNumber', 'i2'), - ('ensembleTotalNumber', 'i2'), - ('minute', 'i2'), ('second', 'i2'), - ('Extent', 'S92')] - - # read head information - head_info = np.frombuffer(byteArray[0:278], dtype=head_dtype) - - # get required grid information - data_type = head_info['type'][0] - nlon = head_info['longitudeGridNumber'][0] - nlat = head_info['latitudeGridNumber'][0] - nmem = head_info['ensembleTotalNumber'][0] - - # define data structure - if data_type == 4: - data_dtype = 
[('data', 'f4', (nlat, nlon))] - data_len = nlat * nlon * 4 - elif data_type == 11: - data_dtype = [('data', 'f4', (2, nlat, nlon))] - data_len = 2 * nlat * nlon * 4 - else: - raise Exception("Data type is not supported") - - # read data - if nmem == 0: - data = np.frombuffer(byteArray[278:], dtype=data_dtype) - data = np.squeeze(data['data']) - else: - if data_type == 4: - data = np.full((nmem, nlat, nlon), np.nan) - else: - data = np.full((nmem, 2, nlat, nlon), np.nan) - ind = 0 - for _ in range(nmem): - head_info_mem = np.frombuffer( - byteArray[ind:(ind+278)], dtype=head_dtype) - ind += 278 - data_mem = np.frombuffer( - byteArray[ind:(ind+data_len)], dtype=data_dtype) - ind += data_len - number = head_info_mem['perturbationNumber'][0] - if data_type == 4: - data[number, :, :] = np.squeeze(data_mem['data']) - else: - data[number, :, :, :] = np.squeeze(data_mem['data']) - - # scale and offset the data, if necessary. - if scale_off is not None: - data = data * scale_off[0] + scale_off[1] - - # construct longitude and latitude coordinates - slon = head_info['startLongitude'][0] - dlon = head_info['longitudeGridSpace'][0] - slat = head_info['startLatitude'][0] - dlat = head_info['latitudeGridSpace'][0] - lon = np.arange(nlon) * dlon + slon - lat = np.arange(nlat) * dlat + slat - level = np.array([head_info['level'][0]]) - - # construct initial time and forecast hour - init_time = datetime(head_info['year'][0], head_info['month'][0], - head_info['day'][0], head_info['hour'][0]) - fhour = np.array([head_info['period'][0]], dtype=np.float64) - time = init_time + timedelta(hours=fhour[0]) - init_time = np.array([init_time], dtype='datetime64[ms]') - time = np.array([time], dtype='datetime64[ms]') - - # define coordinates - time_coord = ('time', time) - lon_coord = ('lon', lon, { - 'long_name':'longitude', 'units':'degrees_east', - '_CoordinateAxisType':'Lon', "axis": "X"}) - lat_coord = ('lat', lat, { - 'long_name':'latitude', 'units':'degrees_north', - 
'_CoordinateAxisType':'Lat', 'axis': "Y"}) - if level[0] != 0: - level_coord = ('level', level, levattrs) - if nmem != 0: - number = np.arange(nmem) - number_coord = ('number', number, {'_CoordinateAxisType':'Ensemble'}) - - # create to xarray - if data_type == 4: - if nmem == 0: - if level[0] == 0: - data = data[np.newaxis, ...] - data = xr.Dataset({ - varname:(['time', 'lat', 'lon'], data, varattrs)}, - coords={ - 'time':time_coord, 'lat':lat_coord, 'lon':lon_coord}) - else: - data = data[np.newaxis, np.newaxis, ...] - data = xr.Dataset({ - varname:(['time', 'level', 'lat', 'lon'], data, varattrs)}, - coords={ - 'time':time_coord, 'level':level_coord, - 'lat':lat_coord, 'lon':lon_coord}) - else: - if level[0] == 0: - data = data[:, np.newaxis, ...] - data = xr.Dataset({ - varname:(['number', 'time', 'lat', 'lon'], data, varattrs)}, - coords={ - 'number':number_coord, 'time':time_coord, - 'lat':lat_coord, 'lon':lon_coord}) - else: - data = data[:, np.newaxis, np.newaxis, ...] - data = xr.Dataset({ - varname:(['number', 'time', 'level', 'lat', 'lon'], data, varattrs)}, - coords={ - 'number':number_coord, 'time':time_coord, 'level':level_coord, - 'lat':lat_coord, 'lon':lon_coord}) - elif data_type == 11: - speedattrs = {'long_name':'wind speed', 'units':'m/s'} - angleattrs = {'long_name':'wind angle', 'units':'degree'} - if nmem == 0: - speed = np.squeeze(data[0, :, :]) - angle = np.squeeze(data[1, :, :]) - # 原始数据文件中存储为: 西风为0度,南风为90度,东风为180度,北风为270度 - # 修改为气象风向常规定义: 北方为0度, 东风为90度, 南风为180度, 西方为270度 - angle = 270. - angle - angle[angle<0] = angle[angle<0] + 360. - if level[0] == 0: - speed = speed[np.newaxis, ...] - angle = angle[np.newaxis, ...] - data = xr.Dataset({ - 'speed': (['time', 'lat', 'lon'], speed, speedattrs), - 'angle': (['time', 'lat', 'lon'], angle, angleattrs)}, - coords={'lon': lon_coord, 'lat': lat_coord, 'time': time_coord}) - else: - speed = speed[np.newaxis, np.newaxis, ...] - angle = angle[np.newaxis, np.newaxis, ...] 
- data = xr.Dataset({ - 'speed': (['time', 'level', 'lat', 'lon'], speed, speedattrs), - 'angle': (['time', 'level', 'lat', 'lon'], angle, angleattrs)}, - coords={'lon': lon_coord, 'lat': lat_coord, - 'level': level_coord, 'time': time_coord}) - else: - speed = np.squeeze(data[:, 0, :, :]) - angle = np.squeeze(data[:, 1, :, :]) - # 原始数据文件中存储为: 西风为0度,南风为90度,东风为180度,北风为270度 - # 修改为气象风向常规定义: 北方为0度, 东风为90度, 南风为180度, 西方为270度 - angle = 270. - angle - angle[angle<0] = angle[angle<0] + 360. - if level[0] == 0: - speed = speed[:, np.newaxis, ...] - angle = angle[:, np.newaxis, ...] - data = xr.Dataset({ - 'speed': ( - ['number', 'time', 'lat', 'lon'], speed, speedattrs), - 'angle': ( - ['number', 'time', 'lat', 'lon'], angle, angleattrs)}, - coords={ - 'lon': lon_coord, 'lat': lat_coord, - 'number': number_coord, 'time': time_coord}) - else: - speed = speed[:, np.newaxis, np.newaxis, ...] - angle = angle[:, np.newaxis, np.newaxis, ...] - data = xr.Dataset({ - 'speed': ( - ['number', 'time', 'level', 'lat', 'lon'], - speed, speedattrs), - 'angle': ( - ['number', 'time', 'level', 'lat', 'lon'], - angle, angleattrs)}, - coords={ - 'lon': lon_coord, 'lat': lat_coord, 'level': level_coord, - 'number': number_coord, 'time': time_coord}) - - # add time coordinates - data.coords['forecast_reference_time'] = init_time[0] - data.coords['forecast_period'] = ('time', fhour, { - 'long_name':'forecast_period', 'units':'hour'}) - - # add attributes - data.attrs['Conventions'] = "CF-1.6" - data.attrs['Origin'] = 'MICAPS Cassandra DB' - - # sort latitude coordinates - data = data.loc[{'lat':sorted(data.coords['lat'].values)}] + data = parse_model_grid_bytearray( + byteArray, + varname=varname, + varattrs=varattrs, + scale_off=scale_off, + levattrs=levattrs, + origin='MICAPS Cassandra DB', + ) # cache data if cache: @@ -471,30 +296,15 @@ def get_model_grids(directory, filenames, allExists=True, pbar=False, **kargs): **kargs: key arguments passed to get_model_grid function. 
""" - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - - # get the file list for check - file_list = get_file_list(directory) - for filename in tqdm_filenames: - # check the file exists - if filename in file_list: - data = get_model_grid(directory, filename=filename, check_file_first=False, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') + return collect_model_grids( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_file_list_func=get_file_list, + get_model_grid_func=get_model_grid, + **kargs + ) def get_model_points(directory, filenames, points, **kargs): @@ -517,10 +327,9 @@ def get_model_points(directory, filenames, points, **kargs): """ data = get_model_grids(directory, filenames, **kargs) - if data: + if data is not None: return data.interp(lon=('points', points['lon']), lat=('points', points['lat'])) - else: - return None + return None def get_model_3D_grid(directory, filename, levels, allExists=True, pbar=False, **kargs): @@ -542,25 +351,15 @@ def get_model_3D_grid(directory, filename, levels, allExists=True, pbar=False, * >>> data = get_model_3D_grid(directory, filename, levels) """ - dataset = [] - if pbar: - tqdm_levels = tqdm(levels, desc=directory+": ") - else: - tqdm_levels = levels - for level in tqdm_levels: - if directory[-1] == '/': - dataDir = directory + str(int(level)).strip() - else: - dataDir = directory + '/' + str(int(level)).strip() - data = get_model_grid(dataDir, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(dataDir+'/'+filename)) - return None - - return xr.concat(dataset, dim='level') + return 
collect_model_3d_grid( + directory, + filename, + levels, + all_exists=allExists, + pbar=pbar, + get_model_grid_func=get_model_grid, + **kargs + ) def get_model_3D_grids(directory, filenames, levels, allExists=True, pbar=True, **kargs): @@ -583,28 +382,15 @@ def get_model_3D_grids(directory, filenames, levels, allExists=True, pbar=True, >>> data = get_model_3D_grids(directory, filenames, levels) """ - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory+": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - dataset_temp = [] - for level in levels: - if directory[-1] == '/': - dataDir = directory + str(int(level)).strip() - else: - dataDir = directory + '/' + str(int(level)).strip() - data = get_model_grid(dataDir, filename=filename, **kargs) - if data: - dataset_temp.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(dataDir+'/'+filename)) - return None - dataset.append(xr.concat(dataset_temp, dim='level')) - - return xr.concat(dataset, dim='time') + return collect_model_3d_grids( + directory, + filenames, + levels, + all_exists=allExists, + pbar=pbar, + get_model_grid_func=get_model_grid, + **kargs + ) def get_model_profiles(directory, filenames, levels, points, **kargs): @@ -627,10 +413,9 @@ def get_model_profiles(directory, filenames, levels, points, **kargs): """ data = get_model_3D_grids(directory, filenames, levels, **kargs) - if data: + if data is not None: return data.interp(lon=('points', points['lon']), lat=('points', points['lat'])) - else: - return None + return None def get_station_data(directory, filename=None, suffix="*.000", @@ -692,144 +477,7 @@ def get_station_data(directory, filename=None, suffix="*.000", print('There is no data ' + filename + ' in ' + directory) return None - # define head structure - head_dtype = [('discriminator', 'S4'), ('type', 'i2'), - ('description', 'S100'), - ('level', 'f4'), ('levelDescription', 'S50'), - ('year', 'i4'), ('month', 'i4'), 
('day', 'i4'), - ('hour', 'i4'), ('minute', 'i4'), ('second', 'i4'), - ('Timezone', 'i4'), ('id_type', 'i2'), ('extent', 'S98')] - - # read head information - head_info = np.frombuffer(byteArray[0:288], dtype=head_dtype) - id_type = head_info['id_type'][0] - ind = 288 - - # read the number of stations - station_number = np.frombuffer( - byteArray[ind:(ind+4)], dtype='i4')[0] - ind += 4 - - # read the number of elements - element_number = np.frombuffer( - byteArray[ind:(ind+2)], dtype='i2')[0] - ind += 2 - - # construct record structure - element_type_map = { - 1: 'b1', 2: 'i2', 3: 'i4', 4: 'i8', 5: 'f4', 6: 'f8', 7: 'S'} - element_map = {} - for i in range(element_number): - element_id = str( - np.frombuffer(byteArray[ind:(ind+2)], dtype='i2')[0]) - ind += 2 - element_type = np.frombuffer( - byteArray[ind:(ind+2)], dtype='i2')[0] - ind += 2 - element_map[element_id] = element_type_map[element_type] - - # loop every station to retrieve record - # id_type=0 保持不变 - if id_type==0: - record_head_dtype = [ - ('ID', 'i4'), ('lon', 'f4'), ('lat', 'f4'), ('numb', 'i2')] - records = [] - for i in range(station_number): - record_head = np.frombuffer( - byteArray[ind:(ind+14)], dtype=record_head_dtype) - ind += 14 - record = { - 'ID': record_head['ID'][0], 'lon': record_head['lon'][0], - 'lat': record_head['lat'][0]} - for j in range(record_head['numb'][0]): # the record element number is not same, missing value is not included. 
- element_id = str( - np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]) - ind += 2 - element_type = element_map[element_id] - if element_type == 'S': # if the element type is string, we need get the length of string - str_len = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - element_type = element_type + str(str_len) - element_len = int(element_type[1:]) - record[element_id] = np.frombuffer( - byteArray[ind:(ind + element_len)], - dtype=element_type)[0] - ind += element_len - records += [record] - else: # ==1 - record_head_dtype = [ - ('lon', 'f4'), ('lat', 'f4'), ('numb', 'i2')] - records = [] - for i in range(station_number): - # 原先为14个字节固定,现在改成前2字节定义接下来ID string长度 - ID_string_length = np.frombuffer(byteArray[ind:(ind + 2)], dtype="i2")[0] - record_ID = np.frombuffer(byteArray[ind + 2:(ind + 2 + ID_string_length)], - dtype="S" + str(ID_string_length))[0] - record_ID = record_ID.decode() - ind += (2 + ID_string_length) - record_head = np.frombuffer( - byteArray[ind:(ind+10)], dtype=record_head_dtype) - ind += 10 - record = { - 'ID': record_ID, 'lon': record_head['lon'][0], - 'lat': record_head['lat'][0]} - for j in range(record_head['numb'][0]): # the record element number is not same, missing value is not included. 
- element_id = str( - np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]) - ind += 2 - element_type = element_map[element_id] - if element_type == 'S': # if the element type is string, we need get the length of string - str_len = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - element_type = element_type + str(str_len) - element_len = int(element_type[1:]) - record[element_id] = np.frombuffer( - byteArray[ind:(ind + element_len)], - dtype=element_type)[0] - ind += element_len - records += [record] - - # convert to pandas data frame - records = pd.DataFrame(records) - records.set_index('ID') - - # get time - time = datetime( - head_info['year'][0], head_info['month'][0], - head_info['day'][0], head_info['hour'][0], - head_info['minute'][0], head_info['second'][0]) - records['time'] = time - - # change column name for common observation - records.rename(columns={'3': 'Alt', '4': 'Grade', '5': 'Type', '21': 'Name', - '201': 'Wind_angle', '203': 'Wind_speed', '205': 'Wind_angle_1m_avg', '207': 'Wind_speed_1m_avg', - '209': 'Wind_angle_2m_avg', '211': 'Wind_speed_2m_avg', '213': 'Wind_angle_10m_avg', '215': 'Wind_speed_10m_avg', - '217': 'Wind_angle_max', '219': 'Wind_speed_max', '221': 'Wind_angle_instant', '223': 'Wind_speed_instant', - '225': 'Gust_angle', '227': 'Gust_speed', '229': 'Gust_angle_6h', '231': 'Gust_speed_6h', - '233': 'Gust_angle_12h', '235': 'Gust_speed_12h', '237': 'Wind_power', - '401': 'Sea_level_pressure', '403': 'Pressure_3h_trend', '405': 'Pressure_24h_trend', - '407': 'Station_pressure', '409': 'Pressure_max', '411': 'Pressure_min', '413': 'Pressure', - '415': 'Pressure_day_avg', '417': 'SLP_day_avg', '419': 'Hight', '421': 'Geopotential_hight', - '601': 'Temp', '603': 'Temp_max', '605': 'Temp_min', '607': 'Temp_24h_trend', - '609': 'Temp_24h_max', '611':'Temp_24h_min', '613': 'Temp_dav_avg', - '801': 'Dewpoint', '803': 'Dewpoint_depression', '805': 'Relative_humidity', - '807': 'Relative_humidity_min', '809': 
'Relative_humidity_day_avg', - '811': 'Water_vapor_pressure', '813': 'Water_vapor_pressure_day_avg', - '1001': 'Rain', '1003': 'Rain_1h', '1005': 'Rain_3h', '1007': 'Rain_6h', - '1009': 'Rain_12h', '1011': 'Rain_24h', '1013': 'Rain_day', - '1015': 'Rain_20-08', '1017': 'Rain_08-20', '1019': 'Rain_20-20', '1021': 'Rain_08-08', - '1023': 'Evaporation', '1025': 'Evaporation_large', '1027': 'Precipitable_water', - '1201': 'Vis_1min', '1203': 'Vis_10min', '1205': 'Vis_min', '1207': 'Vis_manual', - '1401': 'Total_cloud_cover', '1403': 'Low_cloud_cover', '1405': 'Cloud_base_hight', - '1407': 'Low_cloud', '1409': 'Middle_cloud', '1411': 'High_cloud', - '1413': 'TCC_day_avg', '1415': 'LCC_day_avg', '1417': 'Cloud_cover', '1419': 'Cloud_type', - '1601': 'Weather_current', '1603': 'Weather_past_1', '1605': 'Weather_past_2', - '2001': 'Surface_temp', '2003': 'Surface_temp_max', '2005': 'Surface_temp_min'}, - inplace=True) - - # drop all NaN columns - if dropna: - records = records.dropna(axis=1, how='all') + records = parse_station_data_bytearray(byteArray, dropna=dropna) # cache records if cache: @@ -856,21 +504,14 @@ def get_station_dataset(directory, filenames, allExists=True, pbar=False, **karg **kargs: key arguments passed to get_fy_awx function. 
""" - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_station_data(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return pd.concat(dataset) + return collect_station_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_station_data_func=get_station_data, + **kargs + ) def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, cache_clear=True): @@ -956,21 +597,15 @@ def get_fy_awxs(directory, filenames, allExists=True, pbar=False, **kargs): **kargs: key arguments passed to get_fy_awx function. """ - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_fy_awx(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') + return collect_xarray_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_data_func=get_fy_awx, + concat_dim='time', + **kargs + ) def _lzw_decompress(compressed): @@ -978,26 +613,7 @@ def _lzw_decompress(compressed): refer to https://stackoverflow.com/questions/6834388/basic-lzw-compression-help-in-python. """ - # Build the dictionary. - dict_size = 256 - dictionary = {chr(i): chr(i) for i in range(dict_size)} - - w = result = compressed.pop(0) - for k in compressed: - if k in dictionary: - entry = dictionary[k] - elif k == dict_size: - entry = w + w[0] - else: - raise ValueError('Bad compressed k: %s' % k) - result += entry - - # Add w+entry[0] to the dictionary. 
- dictionary[dict_size] = w + entry[0] - dict_size += 1 - - w = entry - return result + return lzw_decompress(compressed) def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache_clear=True): @@ -1058,214 +674,12 @@ def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache print('There is no data ' + filename + ' in ' + directory) return None - # check data format version - if byteArray[0:3] == b'MOC': - # the new V3 format - head_dtype = [ - ('label', 'S4'), # 文件固定标识:MOC - ('Version', 'S4'), # 文件格式版本代码,如: 1.0,1.1,etc - ('FileBytes', 'i4'), # 包含头信息在内的文件字节数,不超过2M - ('MosaicID', 'i2'), # 拼图产品编号 - ('coordinate', 'i2'), # 坐标类型: 2=笛卡儿坐标,3=等经纬网格坐标 - ('varname', 'S8'), # 产品代码,如: ET,VIL,CR,CAP,OHP,OHPC - ('description', 'S64'), # 产品描述,如Composite Reflectivity mosaic - ('BlockPos', 'i4'), # 产品数据起始位置(字节顺序) - ('BlockLen', 'i4'), # 产品数据字节数 - - ('TimeZone', 'i4'), # 数据时钟,0=世界时,28800=北京时 - ('yr', 'i2'), # 观测时间中的年份 - ('mon', 'i2'), # 观测时间中的月份(1-12) - ('day', 'i2'), # 观测时间中的日期(1-31) - ('hr', 'i2'), # 观测时间中的小时(00-23) - ('min', 'i2'), # 观测时间中的分(00-59) - ('sec', 'i2'), # 观测时间中的秒(00-59) - ('ObsSeconds', 'i4'), # 观测时间的seconds - ('ObsDates', 'u2'), # 观测时间中的Julian dates - ('GenDates', 'u2'), # 产品处理时间的天数 - ('GenSeconds', 'i4'), # 产品处理时间的描述 - - ('edge_s', 'i4'), # 数据区的南边界,单位:1/1000度,放大1千倍 - ('edge_w', 'i4'), # 数据区的西边界,单位:1/1000度,放大1千倍 - ('edge_n', 'i4'), # 数据区的北边界,单位:1/1000度,放大1千倍 - ('edge_e', 'i4'), # 数据区的东边界,单位:1/1000度,放大1千倍 - ('cx', 'i4'), # 数据区中心坐标,单位:1/1000度,放大1千倍 - ('cy', 'i4'), # 数据区中心坐标,单位:1/1000度,放大1千倍 - ('nX', 'i4'), # 格点坐标为列数 - ('nY', 'i4'), # 格点坐标为行数 - ('dx', 'i4'), # 格点坐标为列分辨率,单位:1/10000度,放大1万倍 - ('dy', 'i4'), # 格点坐标为行分辨率,单位:1/10000度,放大1万倍 - ('height', 'i2'), # 雷达高度 - ('Compress', 'i2'), # 数据压缩标识, 0=无,1=bz2,2=zip,3=lzw - ('num_of_radars', 'i4'), # 有多少个雷达进行了拼图 - ('UnZipBytes', 'i4'), # 数据段压缩前的字节数 - ('scale', 'i2'), - ('unUsed', 'i2'), - ('RgnID', 'S8'), - ('units', 'S8'), - ('reserved', 'S60') - ] - - # read head information - 
head_info = np.frombuffer(byteArray[0:256], dtype=head_dtype) - ind = 256 - - # get data information - varname = head_info['varname'][0] - longname = head_info['description'][0] - units = head_info['units'][0] - - # define data variable - rows = head_info['nY'][0] - cols = head_info['nX'][0] - dlat = head_info['dx'][0]/10000. - dlon = head_info['dy'][0]/10000. - - # decompress byte Array - # 目前主要支持bz2压缩格式, zip, lzw格式还没有测试过 - if head_info['Compress'] == 0: # 无压缩 - data = np.frombuffer(byteArray[ind:], 'i2') - elif head_info['Compress'] == 1: # - data = np.frombuffer(bz2.decompress(byteArray[ind:]), 'i2') - elif head_info['Compress'] == 2: - data = np.frombuffer(zlib.decompress(byteArray[ind:]), 'i2') - elif head_info['Compress'] == 3: - data = np.frombuffer(_lzw_decompress(byteArray[ind:]), 'i2') - else: - print('Can not decompress data.') - return None - - # reshape data - data.shape = (rows, cols) - - # deal missing data and restore values - data = data.astype(np.float32) - data[data < 0] = np.nan - data /= head_info['scale'][0] - - # set longitude and latitude coordinates - lat = head_info['edge_n'][0]/1000. - np.arange(rows)*dlat - dlat/2.0 - lon = head_info['edge_w'][0]/1000. + np.arange(cols)*dlon - dlon/2.0 - - # reverse latitude axis - data = np.flip(data, 0) - lat = lat[::-1] - - # set time coordinates - # 直接使用时间有问题, 需要天数减去1, 秒数加上28800(8小时) - time = datetime(head_info['yr'][0], head_info['mon'][0], head_info['day'][0], - head_info['hr'][0], head_info['min'][0], head_info['sec'][0]) - time = np.array([time], dtype='datetime64[m]') - data = np.expand_dims(data, axis=0) - - else: - # the old LONLAT format. 
- # define head structure - head_dtype = [ - ('description', 'S128'), - # product name, QREF=基本反射率, CREF=组合反射率, - # VIL=液态水含量, OHP=一小时降水 - ('name', 'S32'), - ('organization', 'S16'), - ('grid_flag', 'u2'), # 经纬网格数据标识,固定值19532 - ('data_byte', 'i2'), # 数据单元字节数,固定值2 - ('slat', 'f4'), # 数据区的南纬(度) - ('wlon', 'f4'), # 数据区的西经(度) - ('nlat', 'f4'), # 数据区的北纬(度) - ('elon', 'f4'), # 数据区的东经(度) - ('clat', 'f4'), # 数据区中心纬度(度) - ('clon', 'f4'), # 数据区中心经度(度) - ('rows', 'i4'), # 数据区的行数 - ('cols', 'i4'), # 每行数据的列数 - ('dlat', 'f4'), # 纬向分辨率(度) - ('dlon', 'f4'), # 经向分辨率(度) - ('nodata', 'f4'), # 无数据区的编码值 - ('levelbybtes', 'i4'), # 单层数据字节数 - ('levelnum', 'i2'), # 数据层个数 - ('amp', 'i2'), # 数值放大系数 - ('compmode', 'i2'), # 数据压缩存储时为1,否则为0 - ('dates', 'u2'), # 数据观测时间,为1970年1月1日以来的天数 - ('seconds', 'i4'), # 数据观测时间的秒数 - ('min_value', 'i2'), # 放大后的数据最小取值 - ('max_value', 'i2'), # 放大后的数据最大取值 - ('reserved', 'i2', 6) # 保留字节 - ] - - # read head information - head_info = np.frombuffer(byteArray[0:256], dtype=head_dtype) - ind = 256 - - # get data information - varname = head_info['name'][0].decode("utf-8", 'ignore').rsplit('\x00')[0] - longname = {'CREF': 'Composite Reflectivity', 'QREF': 'Basic Reflectivity', - 'VIL': 'Vertically Integrated Liquid', 'OHP': 'One Hour Precipitation'} - longname = longname.get(varname, 'radar mosaic') - units = head_info['organization'][0].decode("utf-8", 'ignore').rsplit('\x00')[0] - amp = head_info['amp'][0] - - # define data variable - rows = head_info['rows'][0] - cols = head_info['cols'][0] - dlat = head_info['dlat'][0] - dlon = head_info['dlon'][0] - data = np.full(rows*cols, -9999, dtype=np.int32) - - # put data into array - while ind < len(byteArray): - irow = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - icol = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - if irow == -1 or icol == -1: - break - nrec = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - recd = np.frombuffer( - byteArray[ind:(ind + 
2*nrec)], dtype='i2', count=nrec) - ind += 2*nrec - position = (irow-1)*cols+icol-1 - data[position:(position+nrec)] = recd - - # reshape data - data.shape = (rows, cols) - - # deal missing data and restore values - data = data.astype(np.float32) - data[data < 0] = np.nan - data /= amp - - # set longitude and latitude coordinates - lat = head_info['nlat'][0] - np.arange(rows)*dlat - dlat/2.0 - lon = head_info['wlon'][0] + np.arange(cols)*dlon - dlon/2.0 - - # reverse latitude axis - data = np.flip(data, 0) - lat = lat[::-1] - - # set time coordinates - # 直接使用时间有问题, 需要天数减去1, 秒数加上28800(8小时) - time = datetime(1970, 1, 1) + timedelta( - days=head_info['dates'][0].astype(np.float64)-1, - seconds=head_info['seconds'][0].astype(np.float64)+28800) - time = np.array([time], dtype='datetime64[m]') - data = np.expand_dims(data, axis=0) - - # define coordinates - time_coord = ('time', time) - lon_coord = ('lon', lon, { - 'long_name':'longitude', 'units':'degrees_east', - '_CoordinateAxisType':'Lon', "axis": "X"}) - lat_coord = ('lat', lat, { - 'long_name':'latitude', 'units':'degrees_north', - '_CoordinateAxisType':'Lat', "axis": "Y"}) - - # create xarray - varattrs = {'long_name': longname, 'short_name': varname, 'units': units} - data = xr.Dataset({'data':(['time', 'lat', 'lon'], data, varattrs)}, - coords={'time':time_coord, 'lat':lat_coord, 'lon':lon_coord}) - - # add attributes - data.attrs['Conventions'] = "CF-1.6" - data.attrs['Origin'] = 'MICAPS Cassandra DB' + data = parse_radar_mosaic_bytearray( + byteArray, origin='MICAPS Cassandra DB' + ) + if data is None: + print('Can not decompress data.') + return None # cache data if cache: @@ -1292,21 +706,15 @@ def get_radar_mosaics(directory, filenames, allExists=True, pbar=False, **kargs) **kargs: key arguments passed to get_fy_awx function. 
""" - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_radar_mosaic(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') + return collect_xarray_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_data_func=get_radar_mosaic, + concat_dim='time', + **kargs + ) def get_tlogp(directory, filename=None, suffix="*.000", @@ -1366,66 +774,14 @@ def get_tlogp(directory, filename=None, suffix="*.000", print('There is no data ' + filename + ' in ' + directory) return None - # decode bytes to string - txt = byteArray.decode("utf-8") - txt = list(filter(None, re.split(' |\n', txt))) - - # observation date and time - if len(txt[3]) < 4: - year = int(txt[3]) + 2000 - else: - year = int(txt[3]) - month = int(txt[4]) - day = int(txt[5]) - hour = int(txt[6]) - time = datetime(year, month, day, hour) - - # the number of records - number = int(txt[7]) - if number < 1: + records = parse_tlogp_bytearray( + byteArray, + remove_duplicate=remove_duplicate, + remove_na=remove_na + ) + if records is None: return None - # cut the data - txt = txt[8:] - - # put the data into dictionary - index = 0 - records = [] - while index < len(txt): - # get the record information - ID = txt[index].strip() - lon = float(txt[index+1]) - lat = float(txt[index+2]) - alt = float(txt[index+3]) - number = int(int(txt[index+4])/6) - index += 5 - - # get the sounding records - for i in range(number): - record = { - 'ID': ID, 'lon': lon, 'lat': lat, 'alt': alt, - 'time': time, - 'p': float(txt[index]), 'h': float(txt[index+1]), - 't': float(txt[index+2]), 'td': float(txt[index+3]), - 'wd': float(txt[index+4]), - 'ws': float(txt[index+5])} - records.append(record) - index += 6 - - # transform to pandas data 
frame - records = pd.DataFrame(records) - records.set_index('ID') - - # dealing missing values - records = records.replace(9999.0, np.nan) - if remove_duplicate: - records = records.drop_duplicates() - if remove_na: - records = records.dropna(subset=['p', 'h', 't', 'td']) - - # the sounding height value convert to meters by multiple 10 - records['h'] = records['h'] * 10.0 - # cache data if cache: with open(cache_file, 'wb') as f: @@ -1451,21 +807,14 @@ def get_tlogps(directory, filenames, allExists=True, pbar=False, **kargs): **kargs: key arguments passed to get_fy_awx function. """ - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_tlogp(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return pd.concat(dataset) + return collect_station_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_station_data_func=get_tlogp, + **kargs + ) def get_swan_radar(directory, filename=None, suffix="*.000", scale=[0.1, 0], @@ -1529,106 +878,14 @@ def get_swan_radar(directory, filename=None, suffix="*.000", scale=[0.1, 0], return None if filename.endswith(('.BZ2','.bz2')): byteArray = bz2.decompress(byteArray) - # define head structure - head_dtype = [ - ('ZonName', 'S12'), - ('DataName', 'S38'), - ('Flag', 'S8'), - ('Version', 'S8'), - ('year', 'i2'), - ('month', 'i2'), - ('day', 'i2'), - ('hour', 'i2'), - ('minute', 'i2'), - ('interval', 'i2'), - ('XNumGrids', 'i2'), - ('YNumGrids', 'i2'), - ('ZNumGrids', 'i2'), - ('RadarCount', 'i4'), - ('StartLon', 'f4'), - ('StartLat', 'f4'), - ('CenterLon', 'f4'), - ('CenterLat', 'f4'), - ('XReso', 'f4'), - ('YReso', 'f4'), - ('ZhighGrids', 'f4', 40), - ('RadarStationName', 'S20', 16), - ('RadarLongitude', 'f4', 20), - ('RadarLatitude', 'f4', 20), - ('RadarAltitude', 
'f4', 20), - ('MosaicFlag', 'S1', 20), - ('m_iDataType', 'i2'), - ('m_iLevelDimension', 'i2'), - ('Reserved', 'S168')] - - # read head information - head_info = np.frombuffer(byteArray[0:1024], dtype=head_dtype) - ind = 1024 - - # get coordinates - nlon = head_info['XNumGrids'][0].astype(np.int64) - nlat = head_info['YNumGrids'][0].astype(np.int64) - nlev = head_info['ZNumGrids'][0].astype(np.int64) - dlon = head_info['XReso'][0].astype(np.float64) - dlat = head_info['YReso'][0].astype(np.float64) - lat = head_info['StartLat'][0] - np.arange(nlat)*dlat - dlat/2.0 - lon = head_info['StartLon'][0] + np.arange(nlon)*dlon - dlon/2.0 - level = head_info['ZhighGrids'][0][0:nlev] - - # retrieve data records - data_type = ['u1', 'u1', 'u2', 'i2'] - data_type = data_type[head_info['m_iDataType'][0]] - data_len = (nlon * nlat * nlev) - data = np.frombuffer( - byteArray[ind:(ind + data_len*int(data_type[1]))], - dtype=data_type, count=data_len) - - # convert data type - data.shape = (nlev, nlat, nlon) - data = data.astype(np.float32) - data = (data + scale[1]) * scale[0] - - # reverse latitude axis - data = np.flip(data, 1) - lat = lat[::-1] - - # set time coordinates - init_time = datetime( - head_info['year'][0], head_info['month'][0], - head_info['day'][0], head_info['hour'][0], head_info['minute'][0]) - if attach_forecast_period: - fhour = int(filename.split('.')[1])/60.0 - else: - fhour = 0 - fhour = np.array([fhour], dtype=np.float64) - time = init_time + timedelta(hours=fhour[0]) - init_time = np.array([init_time], dtype='datetime64[ms]') - time = np.array([time], dtype='datetime64[ms]') - - # define coordinates - time_coord = ('time', time) - lon_coord = ('lon', lon, { - 'long_name':'longitude', 'units':'degrees_east', - '_CoordinateAxisType':'Lon', "axis": "X"}) - lat_coord = ('lat', lat, { - 'long_name':'latitude', 'units':'degrees_north', - '_CoordinateAxisType':'Lat', "axis": "Y"}) - level_coord = ('level', level, { - 'long_name':'height', 'units':'m'}) - - # 
create xarray - data = np.expand_dims(data, axis=0) - data = xr.Dataset({'data':(['time', 'level', 'lat', 'lon'], data, varattrs)}, - coords={'time':time_coord, 'level':level_coord, 'lat':lat_coord, 'lon':lon_coord}) - - # add time coordinates - data.coords['forecast_reference_time'] = init_time[0] - data.coords['forecast_period'] = ('time', fhour, { - 'long_name':'forecast_period', 'units':'hour'}) - - # add attributes - data.attrs['Conventions'] = "CF-1.6" - data.attrs['Origin'] = 'MICAPS Cassandra DB' + data = parse_swan_radar_bytearray( + byteArray, + filename=filename, + scale=scale, + varattrs=varattrs, + attach_forecast_period=attach_forecast_period, + origin='MICAPS Cassandra DB' + ) # cache data if cache: @@ -1655,21 +912,15 @@ def get_swan_radars(directory, filenames, allExists=True, pbar=False, **kargs): **kargs: key arguments passed to get_fy_awx function. """ - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_swan_radar(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') + return collect_xarray_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_data_func=get_swan_radar, + concat_dim='time', + **kargs + ) def get_radar_standard(directory, filename=None, suffix="*.BZ2", cache=True, cache_clear=True): diff --git a/nmc_met_io/retrieve_cimiss_server.py b/nmc_met_io/retrieve_cimiss_server.py index b79af12..4930f4c 100644 --- a/nmc_met_io/retrieve_cimiss_server.py +++ b/nmc_met_io/retrieve_cimiss_server.py @@ -20,10 +20,11 @@ import numpy as np import pandas as pd import urllib3 -import xarray as xr -from tqdm import tqdm - -import nmc_met_io.config as CONFIG +import xarray as xr +from tqdm import tqdm + +import nmc_met_io.config as CONFIG +from 
nmc_met_io.retrieve_shared import extract_nafp_grid_metadata def get_http_result(interface_id, params, data_format='json'): @@ -1369,29 +1370,9 @@ def cimiss_model_grid(data_code, init_time_str, valid_time, fcst_ele, fcst_level if contents['returnCode'] != '0': return None - # get time information - init_time = datetime.strptime(init_time_str, '%Y%m%d%H') - fhour = np.array([valid_time], dtype=np.float64) - time = init_time + timedelta(hours=fhour[0]) - init_time = np.array([init_time], dtype='datetime64[ms]') - time = np.array([time], dtype='datetime64[ms]') - - # extract coordinates and data - start_lat = float(contents['startLat']) - start_lon = float(contents['startLon']) - nlon = int(contents['lonCount']) - nlat = int(contents['latCount']) - dlon = float(contents['lonStep']) - dlat = float(contents['latStep']) - lon = start_lon + np.arange(nlon)*dlon - lat = start_lat + np.arange(nlat)*dlat - name = contents['fieldNames'] - if units is None: - units = contents['fieldUnits'] - - # set missing fcst_level for fcst_leve='-' - if isinstance(fcst_level, str): - fcst_level = 0 + init_time, fhour, time, lon, lat, name, units, fcst_level = extract_nafp_grid_metadata( + contents, init_time_str, valid_time, fcst_level, units + ) # define coordinates and variables time_coord = ('time', time) diff --git a/nmc_met_io/retrieve_cmadaas.py b/nmc_met_io/retrieve_cmadaas.py index bd7f9ea..1e278eb 100644 --- a/nmc_met_io/retrieve_cmadaas.py +++ b/nmc_met_io/retrieve_cmadaas.py @@ -29,6 +29,7 @@ from tqdm import tqdm import nmc_met_io.config as CONFIG +from nmc_met_io.retrieve_shared import extract_nafp_grid_metadata def get_rest_result(interface_id, params, url_only=False, @@ -2750,29 +2751,9 @@ def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, l if contents is None: return None - # get time information - init_time = datetime.strptime(init_time_str, '%Y%m%d%H') - fhour = np.array([valid_time], dtype=np.float64) - time = init_time + 
timedelta(hours=fhour[0]) - init_time = np.array([init_time], dtype='datetime64[ms]') - time = np.array([time], dtype='datetime64[ms]') - - # extract coordinates and data - start_lat = float(contents['startLat']) - start_lon = float(contents['startLon']) - nlon = int(contents['lonCount']) - nlat = int(contents['latCount']) - dlon = float(contents['lonStep']) - dlat = float(contents['latStep']) - lon = start_lon + np.arange(nlon)*dlon - lat = start_lat + np.arange(nlat)*dlat - name = contents['fieldNames'] - if units is None: - units = contents['fieldUnits'] - - # set missing fcst_level for like fcst_leve='-' - if isinstance(fcst_level, str): - fcst_level = 0 + init_time, fhour, time, lon, lat, name, units, fcst_level = extract_nafp_grid_metadata( + contents, init_time_str, valid_time, fcst_level, units + ) # define coordinates and variables time_coord = ('time', time) @@ -3133,29 +3114,9 @@ def cmadaas_model_by_time(init_time, valid_time=0, limit=None, fcst_member=None, if contents is None: return None - # get time information - init_time = datetime.strptime(init_time_str, '%Y%m%d%H') - fhour = np.array([valid_time], dtype=np.float64) - time = init_time + timedelta(hours=fhour[0]) - init_time = np.array([init_time], dtype='datetime64[ms]') - time = np.array([time], dtype='datetime64[ms]') - - # extract coordinates and data - start_lat = float(contents['startLat']) - start_lon = float(contents['startLon']) - nlon = int(contents['lonCount']) - nlat = int(contents['latCount']) - dlon = float(contents['lonStep']) - dlat = float(contents['latStep']) - lon = start_lon + np.arange(nlon)*dlon - lat = start_lat + np.arange(nlat)*dlat - name = contents['fieldNames'] - if units is None: - units = contents['fieldUnits'] - - # set missing fcst_level for fcst_leve='-' - if isinstance(fcst_level, str): - fcst_level = 0 + init_time, fhour, time, lon, lat, name, units, fcst_level = extract_nafp_grid_metadata( + contents, init_time_str, valid_time, fcst_level, units + ) # define 
coordinates and variables time_coord = ('time', time) @@ -3503,4 +3464,3 @@ def cmadaas_get_model_file_with_filter( out_files.append(out_file) return out_files - diff --git a/nmc_met_io/retrieve_micaps_server.py b/nmc_met_io/retrieve_micaps_server.py index f0d683f..ed6938f 100644 --- a/nmc_met_io/retrieve_micaps_server.py +++ b/nmc_met_io/retrieve_micaps_server.py @@ -26,10 +26,23 @@ import xarray as xr from tqdm import tqdm -import nmc_met_io.config as CONFIG -from nmc_met_io import DataBlock_pb2 -from nmc_met_io.read_radar import StandardData -from nmc_met_io.read_satellite import resolve_awx_bytearray +import nmc_met_io.config as CONFIG +from nmc_met_io import DataBlock_pb2 +from nmc_met_io.read_radar import StandardData +from nmc_met_io.read_satellite import resolve_awx_bytearray +from nmc_met_io.retrieve_shared import ( + collect_model_3d_grid, + collect_model_3d_grids, + collect_model_grids, + collect_station_dataset, + collect_xarray_dataset, + lzw_decompress, + parse_model_grid_bytearray, + parse_radar_mosaic_bytearray, + parse_station_data_bytearray, + parse_swan_radar_bytearray, + parse_tlogp_bytearray, +) def get_http_result(host, port, url): @@ -244,217 +257,29 @@ def get_model_grid(directory, filename=None, suffix="*.024", return None ByteArrayResult = DataBlock_pb2.ByteArrayResult() - if status == 200: - ByteArrayResult.ParseFromString(response) - if ByteArrayResult.errorCode == 1: - return None - if ByteArrayResult is not None: - byteArray = ByteArrayResult.byteArray - if byteArray == '': - print('There is no data ' + filename + ' in ' + directory) - return None - - # define head information structure (278 bytes) - head_dtype = [('discriminator', 'S4'), ('type', 'i2'), - ('modelName', 'S20'), ('element', 'S50'), - ('description', 'S30'), ('level', 'f4'), - ('year', 'i4'), ('month', 'i4'), ('day', 'i4'), - ('hour', 'i4'), ('timezone', 'i4'), - ('period', 'i4'), ('startLongitude', 'f4'), - ('endLongitude', 'f4'), ('longitudeGridSpace', 'f4'), - 
('longitudeGridNumber', 'i4'), - ('startLatitude', 'f4'), ('endLatitude', 'f4'), - ('latitudeGridSpace', 'f4'), - ('latitudeGridNumber', 'i4'), - ('isolineStartValue', 'f4'), - ('isolineEndValue', 'f4'), - ('isolineSpace', 'f4'), - ('perturbationNumber', 'i2'), - ('ensembleTotalNumber', 'i2'), - ('minute', 'i2'), ('second', 'i2'), - ('Extent', 'S92')] - - # read head information - head_info = np.frombuffer(byteArray[0:278], dtype=head_dtype) - - # get required grid information - data_type = head_info['type'][0] - nlon = head_info['longitudeGridNumber'][0] - nlat = head_info['latitudeGridNumber'][0] - nmem = head_info['ensembleTotalNumber'][0] - - # define data structure - if data_type == 4: - data_dtype = [('data', 'f4', (nlat, nlon))] - data_len = nlat * nlon * 4 - elif data_type == 11: - data_dtype = [('data', 'f4', (2, nlat, nlon))] - data_len = 2 * nlat * nlon * 4 - else: - raise Exception("Data type is not supported") - - # read data - if nmem == 0: - data = np.frombuffer(byteArray[278:], dtype=data_dtype) - data = np.squeeze(data['data']) - else: - if data_type == 4: - data = np.full((nmem, nlat, nlon), np.nan) - else: - data = np.full((nmem, 2, nlat, nlon), np.nan) - ind = 0 - for _ in range(nmem): - head_info_mem = np.frombuffer( - byteArray[ind:(ind+278)], dtype=head_dtype) - ind += 278 - data_mem = np.frombuffer( - byteArray[ind:(ind+data_len)], dtype=data_dtype) - ind += data_len - number = head_info_mem['perturbationNumber'][0] - if data_type == 4: - data[number, :, :] = np.squeeze(data_mem['data']) - else: - data[number, :, :, :] = np.squeeze(data_mem['data']) - - # scale and offset the data, if necessary. 
- if scale_off is not None: - data = data * scale_off[0] + scale_off[1] - - # construct longitude and latitude coordinates - slon = head_info['startLongitude'][0] - dlon = head_info['longitudeGridSpace'][0] - slat = head_info['startLatitude'][0] - dlat = head_info['latitudeGridSpace'][0] - lon = np.arange(nlon) * dlon + slon - lat = np.arange(nlat) * dlat + slat - level = np.array([head_info['level'][0]]) - - # construct initial time and forecast hour - init_time = datetime(head_info['year'][0], head_info['month'][0], - head_info['day'][0], head_info['hour'][0]) - fhour = np.array([head_info['period'][0]], dtype=np.float64) - time = init_time + timedelta(hours=fhour[0]) - init_time = np.array([init_time], dtype='datetime64[ms]') - time = np.array([time], dtype='datetime64[ms]') - - # define coordinates - time_coord = ('time', time) - lon_coord = ('lon', lon, { - 'long_name':'longitude', 'units':'degrees_east', - '_CoordinateAxisType':'Lon', "axis": "X"}) - lat_coord = ('lat', lat, { - 'long_name':'latitude', 'units':'degrees_north', - '_CoordinateAxisType':'Lat', 'axis': "Y"}) - if level[0] != 0: - level_coord = ('level', level, levattrs) - if nmem != 0: - number = np.arange(nmem) - number_coord = ('number', number, {'_CoordinateAxisType':'Ensemble'}) - - # create to xarray - if data_type == 4: - if nmem == 0: - if level[0] == 0: - data = data[np.newaxis, ...] - data = xr.Dataset({ - varname:(['time', 'lat', 'lon'], data, varattrs)}, - coords={ - 'time':time_coord, 'lat':lat_coord, 'lon':lon_coord}) - else: - data = data[np.newaxis, np.newaxis, ...] - data = xr.Dataset({ - varname:(['time', 'level', 'lat', 'lon'], data, varattrs)}, - coords={ - 'time':time_coord, 'level':level_coord, - 'lat':lat_coord, 'lon':lon_coord}) - else: - if level[0] == 0: - data = data[:, np.newaxis, ...] 
- data = xr.Dataset({ - varname:(['number', 'time', 'lat', 'lon'], data, varattrs)}, - coords={ - 'number':number_coord, 'time':time_coord, - 'lat':lat_coord, 'lon':lon_coord}) - else: - data = data[:, np.newaxis, np.newaxis, ...] - data = xr.Dataset({ - varname:(['number', 'time', 'level', 'lat', 'lon'], data, varattrs)}, - coords={ - 'number':number_coord, 'time':time_coord, 'level':level_coord, - 'lat':lat_coord, 'lon':lon_coord}) - elif data_type == 11: - speedattrs = {'long_name':'wind speed', 'units':'m/s'} - angleattrs = {'long_name':'wind angle', 'units':'degree'} - if nmem == 0: - speed = np.squeeze(data[0, :, :]) - angle = np.squeeze(data[1, :, :]) - # 原始数据文件中存储为: 西风为0度,南风为90度,东风为180度,北风为270度 - # 修改为气象风向常规定义: 北方为0度, 东风为90度, 南风为180度, 西方为270度 - angle = 270. - angle - angle[angle<0] = angle[angle<0] + 360. - if level[0] == 0: - speed = speed[np.newaxis, ...] - angle = angle[np.newaxis, ...] - data = xr.Dataset({ - 'speed': (['time', 'lat', 'lon'], speed, speedattrs), - 'angle': (['time', 'lat', 'lon'], angle, angleattrs)}, - coords={'lon': lon_coord, 'lat': lat_coord, 'time': time_coord}) - else: - speed = speed[np.newaxis, np.newaxis, ...] - angle = angle[np.newaxis, np.newaxis, ...] - data = xr.Dataset({ - 'speed': (['time', 'level', 'lat', 'lon'], speed, speedattrs), - 'angle': (['time', 'level', 'lat', 'lon'], angle, angleattrs)}, - coords={'lon': lon_coord, 'lat': lat_coord, - 'level': level_coord, 'time': time_coord}) - else: - speed = np.squeeze(data[:, 0, :, :]) - angle = np.squeeze(data[:, 1, :, :]) - # 原始数据文件中存储为: 西风为0度,南风为90度,东风为180度,北风为270度 - # 修改为气象风向常规定义: 北方为0度, 东风为90度, 南风为180度, 西方为270度 - angle = 270. - angle - angle[angle<0] = angle[angle<0] + 360. - if level[0] == 0: - speed = speed[:, np.newaxis, ...] - angle = angle[:, np.newaxis, ...] 
- data = xr.Dataset({ - 'speed': ( - ['number', 'time', 'lat', 'lon'], speed, speedattrs), - 'angle': ( - ['number', 'time', 'lat', 'lon'], angle, angleattrs)}, - coords={ - 'lon': lon_coord, 'lat': lat_coord, - 'number': number_coord, 'time': time_coord}) - else: - speed = speed[:, np.newaxis, np.newaxis, ...] - angle = angle[:, np.newaxis, np.newaxis, ...] - data = xr.Dataset({ - 'speed': ( - ['number', 'time', 'level', 'lat', 'lon'], - speed, speedattrs), - 'angle': ( - ['number', 'time', 'level', 'lat', 'lon'], - angle, angleattrs)}, - coords={ - 'lon': lon_coord, 'lat': lat_coord, 'level': level_coord, - 'number': number_coord, 'time': time_coord}) - - # add time coordinates - data.coords['forecast_reference_time'] = init_time[0] - data.coords['forecast_period'] = ('time', fhour, { - 'long_name':'forecast_period', 'units':'hour'}) - - # add attributes - data.attrs['Conventions'] = "CF-1.6" - data.attrs['Origin'] = 'MICAPS Cassandra Server' - - # sort latitude coordinates - data = data.loc[{'lat':sorted(data.coords['lat'].values)}] - - # cache data - if cache: - with open(cache_file, 'wb') as f: - pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) + if status == 200: + ByteArrayResult.ParseFromString(response) + if ByteArrayResult.errorCode == 1: + return None + if ByteArrayResult is not None: + byteArray = ByteArrayResult.byteArray + if byteArray == '': + print('There is no data ' + filename + ' in ' + directory) + return None + + data = parse_model_grid_bytearray( + byteArray, + varname=varname, + varattrs=varattrs, + scale_off=scale_off, + levattrs=levattrs, + origin='MICAPS Cassandra Server', + ) + + # cache data + if cache: + with open(cache_file, 'wb') as f: + pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) # return data return data @@ -465,7 +290,7 @@ def get_model_grid(directory, filename=None, suffix="*.024", return None -def get_model_grids(directory, filenames, allExists=True, pbar=False, **kargs): +def get_model_grids(directory, 
filenames, allExists=True, pbar=False, **kargs): """ Retrieve multiple time grids from MICAPS cassandra service. @@ -477,34 +302,18 @@ def get_model_grids(directory, filenames, allExists=True, pbar=False, **kargs): **kargs: key arguments passed to get_model_grid function. """ - dataset = [] - - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - - # get the file list for check - file_list = get_file_list(directory) - for filename in tqdm_filenames: - # check the file exists - if filename in file_list: - data = get_model_grid(directory, filename=filename, check_file_first=False, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') + return collect_model_grids( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_file_list_func=get_file_list, + get_model_grid_func=get_model_grid, + **kargs + ) -def get_model_points(directory, filenames, points, **kargs): +def get_model_points(directory, filenames, points, **kargs): """ Retrieve point time series from MICAPS cassandra service. 
Return xarray, (time, points) @@ -523,14 +332,13 @@ def get_model_points(directory, filenames, points, **kargs): >>> data = get_model_points(dataDir, filenames, points) """ - data = get_model_grids(directory, filenames, **kargs) - if data: - return data.interp(lon=('points', points['lon']), lat=('points', points['lat'])) - else: - return None + data = get_model_grids(directory, filenames, **kargs) + if data is not None: + return data.interp(lon=('points', points['lon']), lat=('points', points['lat'])) + return None -def get_model_3D_grid(directory, filename, levels, allExists=True, pbar=False, **kargs): +def get_model_3D_grid(directory, filename, levels, allExists=True, pbar=False, **kargs): """ Retrieve 3D [level, lat, lon] grids from MICAPS cassandra service. @@ -549,28 +357,18 @@ def get_model_3D_grid(directory, filename, levels, allExists=True, pbar=False, * >>> data = get_model_3D_grid(directory, filename, levels) """ - dataset = [] - if pbar: - tqdm_levels = tqdm(levels, desc=directory+": ") - else: - tqdm_levels = levels - for level in tqdm_levels: - if directory[-1] == '/': - dataDir = directory + str(int(level)).strip() - else: - dataDir = directory + '/' + str(int(level)).strip() - data = get_model_grid(dataDir, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(dataDir+'/'+filename)) - return None - - return xr.concat(dataset, dim='level') + return collect_model_3d_grid( + directory, + filename, + levels, + all_exists=allExists, + pbar=pbar, + get_model_grid_func=get_model_grid, + **kargs + ) -def get_model_3D_grids(directory, filenames, levels, allExists=True, pbar=True, **kargs): +def get_model_3D_grids(directory, filenames, levels, allExists=True, pbar=True, **kargs): """ Retrieve 3D [time, level, lat, lon] grids from MICAPS cassandra service. 
@@ -590,31 +388,18 @@ def get_model_3D_grids(directory, filenames, levels, allExists=True, pbar=True, >>> data = get_model_3D_grids(directory, filenames, levels) """ - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory+": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - dataset_temp = [] - for level in levels: - if directory[-1] == '/': - dataDir = directory + str(int(level)).strip() - else: - dataDir = directory + '/' + str(int(level)).strip() - data = get_model_grid(dataDir, filename=filename, **kargs) - if data: - dataset_temp.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(dataDir+'/'+filename)) - return None - dataset.append(xr.concat(dataset_temp, dim='level')) - - return xr.concat(dataset, dim='time') + return collect_model_3d_grids( + directory, + filenames, + levels, + all_exists=allExists, + pbar=pbar, + get_model_grid_func=get_model_grid, + **kargs + ) -def get_model_profiles(directory, filenames, levels, points, **kargs): +def get_model_profiles(directory, filenames, levels, points, **kargs): """ Retrieve time series of vertical profile from 3D [time, level, lat, lon] grids from MICAPS cassandra service. 
@@ -633,11 +418,10 @@ def get_model_profiles(directory, filenames, levels, points, **kargs): data = get_model_profiles(directory, filenames, levels, points) """ - data = get_model_3D_grids(directory, filenames, levels, **kargs) - if data: - return data.interp(lon=('points', points['lon']), lat=('points', points['lat'])) - else: - return None + data = get_model_3D_grids(directory, filenames, levels, **kargs) + if data is not None: + return data.interp(lon=('points', points['lon']), lat=('points', points['lat'])) + return None def get_station_data(directory, filename=None, suffix="*.000", @@ -695,154 +479,16 @@ def get_station_data(directory, filename=None, suffix="*.000", print('Can not retrieve data' + filename + ' from ' + directory) return None ByteArrayResult = DataBlock_pb2.ByteArrayResult() - if status == 200: - ByteArrayResult.ParseFromString(response) - if ByteArrayResult is not None: - byteArray = ByteArrayResult.byteArray - - # define head structure - head_dtype = [('discriminator', 'S4'), ('type', 'i2'), - ('description', 'S100'), - ('level', 'f4'), ('levelDescription', 'S50'), - ('year', 'i4'), ('month', 'i4'), ('day', 'i4'), - ('hour', 'i4'), ('minute', 'i4'), ('second', 'i4'), - ('Timezone', 'i4'), ('id_type', 'i2'), ('extent', 'S98')] - - # read head information - head_info = np.frombuffer(byteArray[0:288], dtype=head_dtype) - id_type = head_info['id_type'][0] - ind = 288 - - # read the number of stations - station_number = np.frombuffer( - byteArray[ind:(ind+4)], dtype='i4')[0] - ind += 4 - - # read the number of elements - element_number = np.frombuffer( - byteArray[ind:(ind+2)], dtype='i2')[0] - ind += 2 - - # construct record structure - element_type_map = { - 1: 'b1', 2: 'i2', 3: 'i4', 4: 'i8', 5: 'f4', 6: 'f8', 7: 'S'} - element_map = {} - for i in range(element_number): - element_id = str( - np.frombuffer(byteArray[ind:(ind+2)], dtype='i2')[0]) - ind += 2 - element_type = np.frombuffer( - byteArray[ind:(ind+2)], dtype='i2')[0] - ind += 2 - 
element_map[element_id] = element_type_map[element_type] - - # loop every station to retrieve record - # id_type=0 保持不变 - if id_type==0: - record_head_dtype = [ - ('ID', 'i4'), ('lon', 'f4'), ('lat', 'f4'), ('numb', 'i2')] - records = [] - for i in range(station_number): - record_head = np.frombuffer( - byteArray[ind:(ind+14)], dtype=record_head_dtype) - ind += 14 - record = { - 'ID': record_head['ID'][0], 'lon': record_head['lon'][0], - 'lat': record_head['lat'][0]} - for j in range(record_head['numb'][0]): # the record element number is not same, missing value is not included. - element_id = str( - np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]) - ind += 2 - element_type = element_map[element_id] - if element_type == 'S': # if the element type is string, we need get the length of string - str_len = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - element_type = element_type + str(str_len) - element_len = int(element_type[1:]) - record[element_id] = np.frombuffer( - byteArray[ind:(ind + element_len)], - dtype=element_type)[0] - ind += element_len - records += [record] - else: # ==1 - record_head_dtype = [ - ('lon', 'f4'), ('lat', 'f4'), ('numb', 'i2')] - records = [] - for i in range(station_number): - # 原先为14个字节固定,现在改成前2字节定义接下来ID string长度 - ID_string_length = np.frombuffer(byteArray[ind:(ind + 2)], dtype="i2")[0] - record_ID = np.frombuffer(byteArray[ind + 2:(ind + 2 + ID_string_length)], - dtype="S" + str(ID_string_length))[0] - record_ID = record_ID.decode() - ind += (2 + ID_string_length) - record_head = np.frombuffer( - byteArray[ind:(ind+10)], dtype=record_head_dtype) - ind += 10 - record = { - 'ID': record_ID, 'lon': record_head['lon'][0], - 'lat': record_head['lat'][0]} - for j in range(record_head['numb'][0]): # the record element number is not same, missing value is not included. 
- element_id = str( - np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]) - ind += 2 - element_type = element_map[element_id] - if element_type == 'S': # if the element type is string, we need get the length of string - str_len = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - element_type = element_type + str(str_len) - element_len = int(element_type[1:]) - record[element_id] = np.frombuffer( - byteArray[ind:(ind + element_len)], - dtype=element_type)[0] - ind += element_len - records += [record] - - # convert to pandas data frame - records = pd.DataFrame(records) - records.set_index('ID') - - # get time - time = datetime( - head_info['year'][0], head_info['month'][0], - head_info['day'][0], head_info['hour'][0], - head_info['minute'][0], head_info['second'][0]) - records['time'] = time - - # change column name for common observation - records.rename(columns={'3': 'Alt', '4': 'Grade', '5': 'Type', '21': 'Name', - '201': 'Wind_angle', '203': 'Wind_speed', '205': 'Wind_angle_1m_avg', '207': 'Wind_speed_1m_avg', - '209': 'Wind_angle_2m_avg', '211': 'Wind_speed_2m_avg', '213': 'Wind_angle_10m_avg', '215': 'Wind_speed_10m_avg', - '217': 'Wind_angle_max', '219': 'Wind_speed_max', '221': 'Wind_angle_instant', '223': 'Wind_speed_instant', - '225': 'Gust_angle', '227': 'Gust_speed', '229': 'Gust_angle_6h', '231': 'Gust_speed_6h', - '233': 'Gust_angle_12h', '235': 'Gust_speed_12h', '237': 'Wind_power', - '401': 'Sea_level_pressure', '403': 'Pressure_3h_trend', '405': 'Pressure_24h_trend', - '407': 'Station_pressure', '409': 'Pressure_max', '411': 'Pressure_min', '413': 'Pressure', - '415': 'Pressure_day_avg', '417': 'SLP_day_avg', '419': 'Hight', '421': 'Geopotential_hight', - '601': 'Temp', '603': 'Temp_max', '605': 'Temp_min', '607': 'Temp_24h_trend', - '609': 'Temp_24h_max', '611':'Temp_24h_min', '613': 'Temp_dav_avg', - '801': 'Dewpoint', '803': 'Dewpoint_depression', '805': 'Relative_humidity', - '807': 'Relative_humidity_min', '809': 
'Relative_humidity_day_avg', - '811': 'Water_vapor_pressure', '813': 'Water_vapor_pressure_day_avg', - '1001': 'Rain', '1003': 'Rain_1h', '1005': 'Rain_3h', '1007': 'Rain_6h', - '1009': 'Rain_12h', '1011': 'Rain_24h', '1013': 'Rain_day', - '1015': 'Rain_20-08', '1017': 'Rain_08-20', '1019': 'Rain_20-20', '1021': 'Rain_08-08', - '1023': 'Evaporation', '1025': 'Evaporation_large', '1027': 'Precipitable_water', - '1201': 'Vis_1min', '1203': 'Vis_10min', '1205': 'Vis_min', '1207': 'Vis_manual', - '1401': 'Total_cloud_cover', '1403': 'Low_cloud_cover', '1405': 'Cloud_base_hight', - '1407': 'Low_cloud', '1409': 'Middle_cloud', '1411': 'High_cloud', - '1413': 'TCC_day_avg', '1415': 'LCC_day_avg', '1417': 'Cloud_cover', '1419': 'Cloud_type', - '1601': 'Weather_current', '1603': 'Weather_past_1', '1605': 'Weather_past_2', - '2001': 'Surface_temp', '2003': 'Surface_temp_max', '2005': 'Surface_temp_min'}, - inplace=True) - - # drop all NaN columns - if dropna: - records = records.dropna(axis=1, how='all') - - # cache records - if cache: - with open(cache_file, 'wb') as f: - pickle.dump(records, f, protocol=pickle.HIGHEST_PROTOCOL) + if status == 200: + ByteArrayResult.ParseFromString(response) + if ByteArrayResult is not None: + byteArray = ByteArrayResult.byteArray + records = parse_station_data_bytearray(byteArray, dropna=dropna) + + # cache records + if cache: + with open(cache_file, 'wb') as f: + pickle.dump(records, f, protocol=pickle.HIGHEST_PROTOCOL) # return return records @@ -852,7 +498,7 @@ def get_station_data(directory, filename=None, suffix="*.000", return None -def get_station_dataset(directory, filenames, allExists=True, pbar=False, **kargs): +def get_station_dataset(directory, filenames, allExists=True, pbar=False, **kargs): """ Retrieve multiple station observation from MICAPS cassandra service. @@ -864,21 +510,14 @@ def get_station_dataset(directory, filenames, allExists=True, pbar=False, **karg **kargs: key arguments passed to get_fy_awx function. 
""" - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_station_data(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return pd.concat(dataset) + return collect_station_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_station_data_func=get_station_data, + **kargs + ) def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, cache_clear=True): @@ -960,7 +599,7 @@ def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, c return None -def get_fy_awxs(directory, filenames, allExists=True, pbar=False, **kargs): +def get_fy_awxs(directory, filenames, allExists=True, pbar=False, **kargs): """ Retrieve multiple satellite images from MICAPS cassandra service. @@ -972,733 +611,325 @@ def get_fy_awxs(directory, filenames, allExists=True, pbar=False, **kargs): **kargs: key arguments passed to get_fy_awx function. """ - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_fy_awx(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') + return collect_xarray_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_data_func=get_fy_awx, + concat_dim='time', + **kargs + ) -def _lzw_decompress(compressed): +def _lzw_decompress(compressed): """Decompress a list of output ks to a string. refer to https://stackoverflow.com/questions/6834388/basic-lzw-compression-help-in-python. """ - # Build the dictionary. 
- dict_size = 256 - dictionary = {chr(i): chr(i) for i in range(dict_size)} - - w = result = compressed.pop(0) - for k in compressed: - if k in dictionary: - entry = dictionary[k] - elif k == dict_size: - entry = w + w[0] - else: - raise ValueError('Bad compressed k: %s' % k) - result += entry - - # Add w+entry[0] to the dictionary. - dictionary[dict_size] = w + entry[0] - dict_size += 1 - - w = entry - return result - - -def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache_clear=True): - """ - 该程序主要用于读取和处理中国气象局CRaMS系统的雷达回波全国拼图数据. - - :param directory: the data directory on the service - :param filename: the data filename, if none, will be the latest file. - :param suffix: the filename filter pattern which will be used to - find the specified file. - :param cache: cache retrieved data to local directory, default is True. - :return: xarray object. - - :Example: - >>> data = get_radar_mosaic("RADARMOSAIC/CREF/") - >>> dir_dir = "RADARMOSAIC/CREF/" - >>> filename = "ACHN_CREF_20210413_005000.BIN" - >>> CREF = get_radar_mosaic(dir_dir, filename=filename, cache=False) - >>> print(CREF['time'].values) - """ - - # get data file name - if filename is None: - try: - # connect to data service - service = GDSDataService() - status, response = service.getLatestDataName(directory, suffix) - except ValueError: - print('Can not retrieve data from ' + directory) - return None - StringResult = DataBlock_pb2.StringResult() - if status == 200: - StringResult.ParseFromString(response) - if StringResult is not None: - filename = StringResult.name - if filename == '': - return None - else: - return None - - # retrieve data from cached file - if cache: - cache_file = CONFIG.get_cache_file(directory, filename, name="MICAPS_DATA", cache_clear=cache_clear) - if cache_file.is_file(): - with open(cache_file, 'rb') as f: - data = pickle.load(f) - return data - - # get data contents - try: - service = GDSDataService() - status, response = service.getData(directory, 
filename) - except ValueError: - print('Can not retrieve data' + filename + ' from ' + directory) - return None - ByteArrayResult = DataBlock_pb2.ByteArrayResult() - if status == 200: - ByteArrayResult.ParseFromString(response) - if ByteArrayResult is not None: - byteArray = ByteArrayResult.byteArray - if byteArray == b'': - print('There is no data ' + filename + ' in ' + directory) - return None - - # check data format version - if byteArray[0:3] == b'MOC': - # the new V3 format - head_dtype = [ - ('label', 'S4'), # 文件固定标识:MOC - ('Version', 'S4'), # 文件格式版本代码,如: 1.0,1.1,etc - ('FileBytes', 'i4'), # 包含头信息在内的文件字节数,不超过2M - ('MosaicID', 'i2'), # 拼图产品编号 - ('coordinate', 'i2'), # 坐标类型: 2=笛卡儿坐标,3=等经纬网格坐标 - ('varname', 'S8'), # 产品代码,如: ET,VIL,CR,CAP,OHP,OHPC - ('description', 'S64'), # 产品描述,如Composite Reflectivity mosaic - ('BlockPos', 'i4'), # 产品数据起始位置(字节顺序) - ('BlockLen', 'i4'), # 产品数据字节数 - - ('TimeZone', 'i4'), # 数据时钟,0=世界时,28800=北京时 - ('yr', 'i2'), # 观测时间中的年份 - ('mon', 'i2'), # 观测时间中的月份(1-12) - ('day', 'i2'), # 观测时间中的日期(1-31) - ('hr', 'i2'), # 观测时间中的小时(00-23) - ('min', 'i2'), # 观测时间中的分(00-59) - ('sec', 'i2'), # 观测时间中的秒(00-59) - ('ObsSeconds', 'i4'), # 观测时间的seconds - ('ObsDates', 'u2'), # 观测时间中的Julian dates - ('GenDates', 'u2'), # 产品处理时间的天数 - ('GenSeconds', 'i4'), # 产品处理时间的描述 - - ('edge_s', 'i4'), # 数据区的南边界,单位:1/1000度,放大1千倍 - ('edge_w', 'i4'), # 数据区的西边界,单位:1/1000度,放大1千倍 - ('edge_n', 'i4'), # 数据区的北边界,单位:1/1000度,放大1千倍 - ('edge_e', 'i4'), # 数据区的东边界,单位:1/1000度,放大1千倍 - ('cx', 'i4'), # 数据区中心坐标,单位:1/1000度,放大1千倍 - ('cy', 'i4'), # 数据区中心坐标,单位:1/1000度,放大1千倍 - ('nX', 'i4'), # 格点坐标为列数 - ('nY', 'i4'), # 格点坐标为行数 - ('dx', 'i4'), # 格点坐标为列分辨率,单位:1/10000度,放大1万倍 - ('dy', 'i4'), # 格点坐标为行分辨率,单位:1/10000度,放大1万倍 - ('height', 'i2'), # 雷达高度 - ('Compress', 'i2'), # 数据压缩标识, 0=无,1=bz2,2=zip,3=lzw - ('num_of_radars', 'i4'), # 有多少个雷达进行了拼图 - ('UnZipBytes', 'i4'), # 数据段压缩前的字节数 - ('scale', 'i2'), - ('unUsed', 'i2'), - ('RgnID', 'S8'), - ('units', 'S8'), - ('reserved', 'S60') - ] - - # read head 
information - head_info = np.frombuffer(byteArray[0:256], dtype=head_dtype) - ind = 256 - - # get data information - varname = head_info['varname'][0].decode("utf-8", 'ignore').strip('\x00') - longname = head_info['description'][0].decode("utf-8", 'ignore').strip('\x00') - units = head_info['units'][0].decode("utf-8", 'ignore').strip('\x00') - - # define data variable - rows = head_info['nY'][0] - cols = head_info['nX'][0] - dlat = head_info['dx'][0]/10000. - dlon = head_info['dy'][0]/10000. - - # decompress byte Array - # 目前主要支持bz2压缩格式, zip, lzw格式还没有测试过 - if head_info['Compress'] == 0: # 无压缩 - data = np.frombuffer(byteArray[ind:], 'i2') - elif head_info['Compress'] == 1: # - data = np.frombuffer(bz2.decompress(byteArray[ind:]), 'i2') - elif head_info['Compress'] == 2: - data = np.frombuffer(zlib.decompress(byteArray[ind:]), 'i2') - elif head_info['Compress'] == 3: - data = np.frombuffer(_lzw_decompress(byteArray[ind:]), 'i2') - else: - print('Can not decompress data.') - return None - - # reshape data - data.shape = (rows, cols) - - # deal missing data and restore values - data = data.astype(np.float32) - data[data < 0] = np.nan - data /= head_info['scale'][0] - - # set longitude and latitude coordinates - lat = head_info['edge_n'][0]/1000. - np.arange(rows)*dlat - dlat/2.0 - lon = head_info['edge_w'][0]/1000. + np.arange(cols)*dlon - dlon/2.0 - - # reverse latitude axis - data = np.flip(data, 0) - lat = lat[::-1] - - # set time coordinates - # 直接使用时间有问题, 需要天数减去1, 秒数加上28800(8小时) - time = datetime(head_info['yr'][0], head_info['mon'][0], head_info['day'][0], - head_info['hr'][0], head_info['min'][0], head_info['sec'][0]) - time = np.array([time], dtype='datetime64[m]') - data = np.expand_dims(data, axis=0) - - else: - # the old LONLAT format. 
- # define head structure - head_dtype = [ - ('description', 'S128'), - # product name, QREF=基本反射率, CREF=组合反射率, - # VIL=液态水含量, OHP=一小时降水 - ('name', 'S32'), - ('organization', 'S16'), - ('grid_flag', 'u2'), # 经纬网格数据标识,固定值19532 - ('data_byte', 'i2'), # 数据单元字节数,固定值2 - ('slat', 'f4'), # 数据区的南纬(度) - ('wlon', 'f4'), # 数据区的西经(度) - ('nlat', 'f4'), # 数据区的北纬(度) - ('elon', 'f4'), # 数据区的东经(度) - ('clat', 'f4'), # 数据区中心纬度(度) - ('clon', 'f4'), # 数据区中心经度(度) - ('rows', 'i4'), # 数据区的行数 - ('cols', 'i4'), # 每行数据的列数 - ('dlat', 'f4'), # 纬向分辨率(度) - ('dlon', 'f4'), # 经向分辨率(度) - ('nodata', 'f4'), # 无数据区的编码值 - ('levelbybtes', 'i4'), # 单层数据字节数 - ('levelnum', 'i2'), # 数据层个数 - ('amp', 'i2'), # 数值放大系数 - ('compmode', 'i2'), # 数据压缩存储时为1,否则为0 - ('dates', 'u2'), # 数据观测时间,为1970年1月1日以来的天数 - ('seconds', 'i4'), # 数据观测时间的秒数 - ('min_value', 'i2'), # 放大后的数据最小取值 - ('max_value', 'i2'), # 放大后的数据最大取值 - ('reserved', 'i2', 6) # 保留字节 - ] - - # read head information - head_info = np.frombuffer(byteArray[0:256], dtype=head_dtype) - ind = 256 - - # get data information - varname = head_info['name'][0].decode("utf-8", 'ignore').rsplit('\x00')[0] - longname = {'CREF': 'Composite Reflectivity', 'QREF': 'Basic Reflectivity', - 'VIL': 'Vertically Integrated Liquid', 'OHP': 'One Hour Precipitation'} - longname = longname.get(varname, 'radar mosaic') - units = head_info['organization'][0].decode("utf-8", 'ignore').rsplit('\x00')[0] - amp = head_info['amp'][0] - - # define data variable - rows = head_info['rows'][0] - cols = head_info['cols'][0] - dlat = head_info['dlat'][0] - dlon = head_info['dlon'][0] - data = np.full(rows*cols, -9999, dtype=np.int32) - - # put data into array - while ind < len(byteArray): - irow = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - icol = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - if irow == -1 or icol == -1: - break - nrec = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0] - ind += 2 - recd = np.frombuffer( - byteArray[ind:(ind + 
2*nrec)], dtype='i2', count=nrec) - ind += 2*nrec - position = (irow-1)*cols+icol-1 - data[position:(position+nrec)] = recd - - # reshape data - data.shape = (rows, cols) - - # deal missing data and restore values - data = data.astype(np.float32) - data[data < 0] = np.nan - data /= amp - - # set longitude and latitude coordinates - lat = head_info['nlat'][0] - np.arange(rows)*dlat - dlat/2.0 - lon = head_info['wlon'][0] + np.arange(cols)*dlon - dlon/2.0 - - # reverse latitude axis - data = np.flip(data, 0) - lat = lat[::-1] - - # set time coordinates - # 直接使用时间有问题, 需要天数减去1, 秒数加上28800(8小时) - time = datetime(1970, 1, 1) + timedelta( - days=head_info['dates'][0].astype(np.float64)-1, - seconds=head_info['seconds'][0].astype(np.float64)+28800) - time = np.array([time], dtype='datetime64[m]') - data = np.expand_dims(data, axis=0) - - # define coordinates - time_coord = ('time', time) - lon_coord = ('lon', lon, { - 'long_name':'longitude', 'units':'degrees_east', - '_CoordinateAxisType':'Lon', "axis": "X"}) - lat_coord = ('lat', lat, { - 'long_name':'latitude', 'units':'degrees_north', - '_CoordinateAxisType':'Lat', "axis": "Y"}) - - # create xarray - varattrs = {'long_name': longname, 'short_name': varname, 'units': units} - data = xr.Dataset({'data':(['time', 'lat', 'lon'], data, varattrs)}, - coords={'time':time_coord, 'lat':lat_coord, 'lon':lon_coord}) - - # add attributes - data.attrs['Conventions'] = "CF-1.6" - data.attrs['Origin'] = 'MICAPS Cassandra Server' - - # cache data - if cache: - with open(cache_file, 'wb') as f: - pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) - - # return - return data - else: - return None - else: - return None - - -def get_radar_mosaics(directory, filenames, allExists=True, pbar=False, **kargs): - """ - Retrieve multiple radar mosaics from MICAPS cassandra service. - - Args: - directory (string): the data directory on the service. - filenames (list): the list of filenames. 
- allExists (boolean): all files should exist, or return None. - pbar (boolean): Show progress bar, default to False. - **kargs: key arguments passed to get_fy_awx function. - """ - - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_radar_mosaic(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') - - -def get_tlogp(directory, filename=None, suffix="*.000", - remove_duplicate=False, remove_na=False, - cache=False, cache_clear=True): - """ - 该程序用于读取micaps服务器上TLOGP数据信息, 文件格式与MICAPS第5类格式相同. - - :param directory: the data directory on the service - :param filename: the data filename, if none, will be the latest file. - :param suffix: the filename filter pattern which will be used to - find the specified file. - :param remove_duplicate: boolean, the duplicate records will be removed. - :param remove_na: boolean, the na records will be removed. - :param cache: cache retrieved data to local directory, default is True. - :return: pandas DataFrame object. 
- - >>> data = get_tlogp("UPPER_AIR/TLOGP/") - """ - - # get data file name - if filename is None: - try: - # connect to data service - service = GDSDataService() - status, response = service.getLatestDataName(directory, suffix) - except ValueError: - print('Can not retrieve data from ' + directory) - return None - StringResult = DataBlock_pb2.StringResult() - if status == 200: - StringResult.ParseFromString(response) - if StringResult is not None: - filename = StringResult.name - if filename == '': - return None - else: - return None - - # retrieve data from cached file - if cache: - cache_file = CONFIG.get_cache_file(directory, filename, name="MICAPS_DATA", cache_clear=cache_clear) - if cache_file.is_file(): - with open(cache_file, 'rb') as f: - records = pickle.load(f) - return records - - # get data contents - try: - service = GDSDataService() - status, response = service.getData(directory, filename) - except ValueError: - print('Can not retrieve data' + filename + ' from ' + directory) - return None - ByteArrayResult = DataBlock_pb2.ByteArrayResult() - if status == 200: - ByteArrayResult.ParseFromString(response) - if ByteArrayResult is not None: - byteArray = ByteArrayResult.byteArray - if byteArray == b'': - print('There is no data ' + filename + ' in ' + directory) - return None - - # decode bytes to string - txt = byteArray.decode("utf-8") - txt = list(filter(None, re.split(' |\n', txt))) - - # observation date and time - if len(txt[3]) < 4: - year = int(txt[3]) + 2000 - else: - year = int(txt[3]) - month = int(txt[4]) - day = int(txt[5]) - hour = int(txt[6]) - time = datetime(year, month, day, hour) - - # the number of records - number = int(txt[7]) - if number < 1: - return None - - # cut the data - txt = txt[8:] - - # put the data into dictionary - index = 0 - records = [] - while index < len(txt): - # get the record information - ID = txt[index].strip() - lon = float(txt[index+1]) - lat = float(txt[index+2]) - alt = float(txt[index+3]) - number = 
int(int(txt[index+4])/6) - index += 5 - - # get the sounding records - for i in range(number): - record = { - 'ID': ID, 'lon': lon, 'lat': lat, 'alt': alt, - 'time': time, - 'p': float(txt[index]), 'h': float(txt[index+1]), - 't': float(txt[index+2]), 'td': float(txt[index+3]), - 'wd': float(txt[index+4]), - 'ws': float(txt[index+5])} - records.append(record) - index += 6 - - # transform to pandas data frame - records = pd.DataFrame(records) - records.set_index('ID') - - # dealing missing values - records = records.replace(9999.0, np.nan) - if remove_duplicate: - records = records.drop_duplicates() - if remove_na: - records = records.dropna(subset=['p', 'h', 't', 'td']) - - # the sounding height value convert to meters by multiple 10 - records['h'] = records['h'] * 10.0 - - # cache data - if cache: - with open(cache_file, 'wb') as f: - pickle.dump(records, f, protocol=pickle.HIGHEST_PROTOCOL) - - # return - return records - else: - return None - else: - return None - - -def get_tlogps(directory, filenames, allExists=True, pbar=False, **kargs): - """ - Retrieve multiple tlog observation from MICAPS cassandra service. - - Args: - directory (string): the data directory on the service. - filenames (list): the list of filenames. - allExists (boolean): all files should exist, or return None. - pbar (boolean): Show progress bar, default to False. - **kargs: key arguments passed to get_fy_awx function. 
- """ - - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_tlogp(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return pd.concat(dataset) - - -def get_swan_radar(directory, filename=None, suffix="*.000", scale=[0.1, 0], - varattrs={'long_name': 'quantitative_precipitation_forecast', 'short_name': 'QPF', 'units': 'mm'}, - cache=True, cache_clear=True, attach_forecast_period=True): - """ - 该程序用于读取micaps服务器上SWAN的D131格点数据格式. - refer to https://www.taodocs.com/p-274692126.html - - :param directory: the data directory on the service - :param filename: the data filename, if none, will be the latest file. - :param suffix: the filename filter pattern which will be used to - find the specified file. - :param scale: data value will be scaled = (data + scale[1]) * scale[0], normally, - CREF, CAPPI: [0.5, -66] - radar echo height, VIL, OHP, ...: [0.1, 0] - :param varattrs: dictionary, variable attributes. - :param cache: cache retrieved data to local directory, default is True. - :return: pandas DataFrame object. 
- - >>> data = get_swan_radar("RADARMOSAIC/EXTRAPOLATION/QPF/") - """ - - # get data file name - if filename is None: - try: - # connect to data service - service = GDSDataService() - status, response = service.getLatestDataName(directory, suffix) - except ValueError: - print('Can not retrieve data from ' + directory) - return None - StringResult = DataBlock_pb2.StringResult() - if status == 200: - StringResult.ParseFromString(response) - if StringResult is not None: - filename = StringResult.name - if filename == '': - return None - else: - return None - - # retrieve data from cached file - if cache: - cache_file = CONFIG.get_cache_file(directory, filename, name="MICAPS_DATA", cache_clear=cache_clear) - if cache_file.is_file(): - with open(cache_file, 'rb') as f: - data = pickle.load(f) - return data - - # get data contents - try: - service = GDSDataService() - status, response = service.getData(directory, filename) - except ValueError: - print('Can not retrieve data' + filename + ' from ' + directory) - return None - ByteArrayResult = DataBlock_pb2.ByteArrayResult() - if status == 200: - ByteArrayResult.ParseFromString(response) - if ByteArrayResult is not None: - byteArray = ByteArrayResult.byteArray - if byteArray == b'': - print('There is no data ' + filename + ' in ' + directory) - return None - - # define head structure - head_dtype = [ - ('ZonName', 'S12'), - ('DataName', 'S38'), - ('Flag', 'S8'), - ('Version', 'S8'), - ('year', 'i2'), - ('month', 'i2'), - ('day', 'i2'), - ('hour', 'i2'), - ('minute', 'i2'), - ('interval', 'i2'), - ('XNumGrids', 'i2'), - ('YNumGrids', 'i2'), - ('ZNumGrids', 'i2'), - ('RadarCount', 'i4'), - ('StartLon', 'f4'), - ('StartLat', 'f4'), - ('CenterLon', 'f4'), - ('CenterLat', 'f4'), - ('XReso', 'f4'), - ('YReso', 'f4'), - ('ZhighGrids', 'f4', 40), - ('RadarStationName', 'S20', 16), - ('RadarLongitude', 'f4', 20), - ('RadarLatitude', 'f4', 20), - ('RadarAltitude', 'f4', 20), - ('MosaicFlag', 'S1', 20), - ('m_iDataType', 'i2'), - 
('m_iLevelDimension', 'i2'), - ('Reserved', 'S168')] - - # read head information - head_info = np.frombuffer(byteArray[0:1024], dtype=head_dtype) - ind = 1024 - - # get coordinates - nlon = head_info['XNumGrids'][0].astype(np.int64) - nlat = head_info['YNumGrids'][0].astype(np.int64) - nlev = head_info['ZNumGrids'][0].astype(np.int64) - dlon = head_info['XReso'][0].astype(np.float64) - dlat = head_info['YReso'][0].astype(np.float64) - lat = head_info['StartLat'][0] - np.arange(nlat)*dlat - dlat/2.0 - lon = head_info['StartLon'][0] + np.arange(nlon)*dlon - dlon/2.0 - level = head_info['ZhighGrids'][0][0:nlev] - - # retrieve data records - data_type = ['u1', 'u1', 'u2', 'i2'] - data_type = data_type[head_info['m_iDataType'][0]] - data_len = (nlon * nlat * nlev) - data = np.frombuffer( - byteArray[ind:(ind + data_len*int(data_type[1]))], - dtype=data_type, count=data_len) - - # convert data type - data.shape = (nlev, nlat, nlon) - data = data.astype(np.float32) - data = (data + scale[1]) * scale[0] - - # reverse latitude axis - data = np.flip(data, 1) - lat = lat[::-1] - - # set time coordinates - init_time = datetime( - head_info['year'][0], head_info['month'][0], - head_info['day'][0], head_info['hour'][0], head_info['minute'][0]) - if attach_forecast_period: - fhour = int(filename.split('.')[1])/60.0 - else: - fhour = 0 - fhour = np.array([fhour], dtype=np.float64) - time = init_time + timedelta(hours=fhour[0]) - init_time = np.array([init_time], dtype='datetime64[ms]') - time = np.array([time], dtype='datetime64[ms]') - - # define coordinates - time_coord = ('time', time) - lon_coord = ('lon', lon, { - 'long_name':'longitude', 'units':'degrees_east', - '_CoordinateAxisType':'Lon', "axis": "X"}) - lat_coord = ('lat', lat, { - 'long_name':'latitude', 'units':'degrees_north', - '_CoordinateAxisType':'Lat', "axis": "Y"}) - level_coord = ('level', level, { - 'long_name':'height', 'units':'m'}) - - # create xarray - data = np.expand_dims(data, axis=0) - data = 
xr.Dataset({'data':(['time', 'level', 'lat', 'lon'], data, varattrs)}, - coords={'time':time_coord, 'level':level_coord, 'lat':lat_coord, 'lon':lon_coord}) - - # add time coordinates - data.coords['forecast_reference_time'] = init_time[0] - data.coords['forecast_period'] = ('time', fhour, { - 'long_name':'forecast_period', 'units':'hour'}) - - # add attributes - data.attrs['Conventions'] = "CF-1.6" - data.attrs['Origin'] = 'MICAPS Cassandra Server' - - # cache data - if cache: - with open(cache_file, 'wb') as f: - pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) - - # return - return data - else: - return None - else: - return None - - -def get_swan_radars(directory, filenames, allExists=True, pbar=False, **kargs): - """ - Retrieve multiple swan 131 radar from MICAPS cassandra service. - - Args: - directory (string): the data directory on the service. - filenames (list): the list of filenames. - allExists (boolean): all files should exist, or return None. - pbar (boolean): Show progress bar, default to False. - **kargs: key arguments passed to get_fy_awx function. - """ - - dataset = [] - if pbar: - tqdm_filenames = tqdm(filenames, desc=directory + ": ") - else: - tqdm_filenames = filenames - for filename in tqdm_filenames: - data = get_swan_radar(directory, filename=filename, **kargs) - if data: - dataset.append(data) - else: - if allExists: - warnings.warn("{} doese not exists.".format(directory+'/'+filename)) - return None - - return xr.concat(dataset, dim='time') - - + return lzw_decompress(compressed) + + +def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache_clear=True): + """ + 该程序主要用于读取和处理中国气象局CRaMS系统的雷达回波全国拼图数据. + + :param directory: the data directory on the service + :param filename: the data filename, if none, will be the latest file. + :param suffix: the filename filter pattern which will be used to + find the specified file. + :param cache: cache retrieved data to local directory, default is True. 
+ :return: xarray object. + + :Example: + >>> data = get_radar_mosaic("RADARMOSAIC/CREF/") + >>> dir_dir = "RADARMOSAIC/CREF/" + >>> filename = "ACHN_CREF_20210413_005000.BIN" + >>> CREF = get_radar_mosaic(dir_dir, filename=filename, cache=False) + >>> print(CREF['time'].values) + """ + + # get data file name + if filename is None: + try: + service = GDSDataService() + status, response = service.getLatestDataName(directory, suffix) + except ValueError: + print('Can not retrieve data from ' + directory) + return None + StringResult = DataBlock_pb2.StringResult() + if status == 200: + StringResult.ParseFromString(response) + if StringResult is not None: + filename = StringResult.name + if filename == '': + return None + else: + return None + + # retrieve data from cached file + if cache: + cache_file = CONFIG.get_cache_file(directory, filename, name="MICAPS_DATA", cache_clear=cache_clear) + if cache_file.is_file(): + with open(cache_file, 'rb') as f: + data = pickle.load(f) + return data + + # get data contents + try: + service = GDSDataService() + status, response = service.getData(directory, filename) + except ValueError: + print('Can not retrieve data' + filename + ' from ' + directory) + return None + + ByteArrayResult = DataBlock_pb2.ByteArrayResult() + if status == 200: + ByteArrayResult.ParseFromString(response) + if ByteArrayResult is not None: + byteArray = ByteArrayResult.byteArray + if byteArray == b'': + print('There is no data ' + filename + ' in ' + directory) + return None + + data = parse_radar_mosaic_bytearray( + byteArray, origin='MICAPS Cassandra Server' + ) + if data is None: + print('Can not decompress data.') + return None + + if cache: + with open(cache_file, 'wb') as f: + pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) + return data + else: + return None + else: + return None + + +def get_radar_mosaics(directory, filenames, allExists=True, pbar=False, **kargs): + """ + Retrieve multiple radar mosaics from MICAPS cassandra service. 
+ + Args: + directory (string): the data directory on the service. + filenames (list): the list of filenames. + allExists (boolean): all files should exist, or return None. + pbar (boolean): Show progress bar, default to False. + **kargs: key arguments passed to get_fy_awx function. + """ + + return collect_xarray_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_data_func=get_radar_mosaic, + concat_dim='time', + **kargs + ) + +def get_tlogp(directory, filename=None, suffix="*.000", + remove_duplicate=False, remove_na=False, + cache=False, cache_clear=True): + """ + 该程序用于读取micaps服务器上TLOGP数据信息, 文件格式与MICAPS第5类格式相同. + + :param directory: the data directory on the service + :param filename: the data filename, if none, will be the latest file. + :param suffix: the filename filter pattern which will be used to + find the specified file. + :param remove_duplicate: boolean, the duplicate records will be removed. + :param remove_na: boolean, the na records will be removed. + :param cache: cache retrieved data to local directory, default is True. + :return: pandas DataFrame object. 
+ + >>> data = get_tlogp("UPPER_AIR/TLOGP/") + """ + + if filename is None: + try: + service = GDSDataService() + status, response = service.getLatestDataName(directory, suffix) + except ValueError: + print('Can not retrieve data from ' + directory) + return None + StringResult = DataBlock_pb2.StringResult() + if status == 200: + StringResult.ParseFromString(response) + if StringResult is not None: + filename = StringResult.name + if filename == '': + return None + else: + return None + + if cache: + cache_file = CONFIG.get_cache_file(directory, filename, name="MICAPS_DATA", cache_clear=cache_clear) + if cache_file.is_file(): + with open(cache_file, 'rb') as f: + records = pickle.load(f) + return records + + try: + service = GDSDataService() + status, response = service.getData(directory, filename) + except ValueError: + print('Can not retrieve data' + filename + ' from ' + directory) + return None + + ByteArrayResult = DataBlock_pb2.ByteArrayResult() + if status == 200: + ByteArrayResult.ParseFromString(response) + if ByteArrayResult is not None: + byteArray = ByteArrayResult.byteArray + if byteArray == b'': + print('There is no data ' + filename + ' in ' + directory) + return None + + records = parse_tlogp_bytearray( + byteArray, + remove_duplicate=remove_duplicate, + remove_na=remove_na + ) + if records is None: + return None + + if cache: + with open(cache_file, 'wb') as f: + pickle.dump(records, f, protocol=pickle.HIGHEST_PROTOCOL) + return records + else: + return None + else: + return None + + +def get_tlogps(directory, filenames, allExists=True, pbar=False, **kargs): + """ + Retrieve multiple tlog observation from MICAPS cassandra service. + + Args: + directory (string): the data directory on the service. + filenames (list): the list of filenames. + allExists (boolean): all files should exist, or return None. + pbar (boolean): Show progress bar, default to False. + **kargs: key arguments passed to get_fy_awx function. 
+ """ + + return collect_station_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_station_data_func=get_tlogp, + **kargs + ) + + +def get_swan_radar(directory, filename=None, suffix="*.000", scale=[0.1, 0], + varattrs={'long_name': 'quantitative_precipitation_forecast', 'short_name': 'QPF', 'units': 'mm'}, + cache=True, cache_clear=True, attach_forecast_period=True): + """ + 该程序用于读取micaps服务器上SWAN的D131格点数据格式. + refer to https://www.taodocs.com/p-274692126.html + + :param directory: the data directory on the service + :param filename: the data filename, if none, will be the latest file. + :param suffix: the filename filter pattern which will be used to + find the specified file. + :param scale: data value will be scaled = (data + scale[1]) * scale[0], normally, + CREF, CAPPI: [0.5, -66] + radar echo height, VIL, OHP, ...: [0.1, 0] + :param varattrs: dictionary, variable attributes. + :param cache: cache retrieved data to local directory, default is True. + :return: pandas DataFrame object. 
+ + >>> data = get_swan_radar("RADARMOSAIC/EXTRAPOLATION/QPF/") + """ + + if filename is None: + try: + service = GDSDataService() + status, response = service.getLatestDataName(directory, suffix) + except ValueError: + print('Can not retrieve data from ' + directory) + return None + StringResult = DataBlock_pb2.StringResult() + if status == 200: + StringResult.ParseFromString(response) + if StringResult is not None: + filename = StringResult.name + if filename == '': + return None + else: + return None + + if cache: + cache_file = CONFIG.get_cache_file(directory, filename, name="MICAPS_DATA", cache_clear=cache_clear) + if cache_file.is_file(): + with open(cache_file, 'rb') as f: + data = pickle.load(f) + return data + + try: + service = GDSDataService() + status, response = service.getData(directory, filename) + except ValueError: + print('Can not retrieve data' + filename + ' from ' + directory) + return None + + ByteArrayResult = DataBlock_pb2.ByteArrayResult() + if status == 200: + ByteArrayResult.ParseFromString(response) + if ByteArrayResult is not None: + byteArray = ByteArrayResult.byteArray + if byteArray == b'': + print('There is no data ' + filename + ' in ' + directory) + return None + + data = parse_swan_radar_bytearray( + byteArray, + filename=filename, + scale=scale, + varattrs=varattrs, + attach_forecast_period=attach_forecast_period, + origin='MICAPS Cassandra Server' + ) + + if cache: + with open(cache_file, 'wb') as f: + pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) + return data + else: + return None + else: + return None + + +def get_swan_radars(directory, filenames, allExists=True, pbar=False, **kargs): + """ + Retrieve multiple swan 131 radar from MICAPS cassandra service. + + Args: + directory (string): the data directory on the service. + filenames (list): the list of filenames. + allExists (boolean): all files should exist, or return None. + pbar (boolean): Show progress bar, default to False. 
+ **kargs: key arguments passed to get_fy_awx function. + """ + + return collect_xarray_dataset( + directory, + filenames, + all_exists=allExists, + pbar=pbar, + get_data_func=get_swan_radar, + concat_dim='time', + **kargs + ) def get_radar_standard(directory, filename=None, suffix="*.BZ2", cache=True, cache_clear=True): """ 该程序用于读取Micaps服务器上的单站雷达基数据, 该数据为 diff --git a/nmc_met_io/retrieve_shared.py b/nmc_met_io/retrieve_shared.py new file mode 100644 index 0000000..50dc5eb --- /dev/null +++ b/nmc_met_io/retrieve_shared.py @@ -0,0 +1,832 @@ +# -*- coding: utf-8 -*- + +"""Shared helpers for retrieval modules.""" + +from datetime import datetime, timedelta +import bz2 +import re +import warnings +import zlib + +import numpy as np +import pandas as pd +import xarray as xr +from tqdm import tqdm + + +_STATION_COLUMN_MAP = { + '3': 'Alt', '4': 'Grade', '5': 'Type', '21': 'Name', + '201': 'Wind_angle', '203': 'Wind_speed', '205': 'Wind_angle_1m_avg', + '207': 'Wind_speed_1m_avg', '209': 'Wind_angle_2m_avg', + '211': 'Wind_speed_2m_avg', '213': 'Wind_angle_10m_avg', + '215': 'Wind_speed_10m_avg', '217': 'Wind_angle_max', + '219': 'Wind_speed_max', '221': 'Wind_angle_instant', + '223': 'Wind_speed_instant', '225': 'Gust_angle', '227': 'Gust_speed', + '229': 'Gust_angle_6h', '231': 'Gust_speed_6h', '233': 'Gust_angle_12h', + '235': 'Gust_speed_12h', '237': 'Wind_power', + '401': 'Sea_level_pressure', '403': 'Pressure_3h_trend', + '405': 'Pressure_24h_trend', '407': 'Station_pressure', + '409': 'Pressure_max', '411': 'Pressure_min', '413': 'Pressure', + '415': 'Pressure_day_avg', '417': 'SLP_day_avg', '419': 'Hight', + '421': 'Geopotential_hight', '601': 'Temp', '603': 'Temp_max', + '605': 'Temp_min', '607': 'Temp_24h_trend', '609': 'Temp_24h_max', + '611': 'Temp_24h_min', '613': 'Temp_dav_avg', '801': 'Dewpoint', + '803': 'Dewpoint_depression', '805': 'Relative_humidity', + '807': 'Relative_humidity_min', '809': 'Relative_humidity_day_avg', + '811': 'Water_vapor_pressure', 
'813': 'Water_vapor_pressure_day_avg', + '1001': 'Rain', '1003': 'Rain_1h', '1005': 'Rain_3h', '1007': 'Rain_6h', + '1009': 'Rain_12h', '1011': 'Rain_24h', '1013': 'Rain_day', + '1015': 'Rain_20-08', '1017': 'Rain_08-20', '1019': 'Rain_20-20', + '1021': 'Rain_08-08', '1023': 'Evaporation', '1025': 'Evaporation_large', + '1027': 'Precipitable_water', '1201': 'Vis_1min', '1203': 'Vis_10min', + '1205': 'Vis_min', '1207': 'Vis_manual', '1401': 'Total_cloud_cover', + '1403': 'Low_cloud_cover', '1405': 'Cloud_base_hight', + '1407': 'Low_cloud', '1409': 'Middle_cloud', '1411': 'High_cloud', + '1413': 'TCC_day_avg', '1415': 'LCC_day_avg', '1417': 'Cloud_cover', + '1419': 'Cloud_type', '1601': 'Weather_current', + '1603': 'Weather_past_1', '1605': 'Weather_past_2', + '2001': 'Surface_temp', '2003': 'Surface_temp_max', + '2005': 'Surface_temp_min', +} + + +def parse_model_grid_bytearray( + byte_array, varname, varattrs, scale_off, levattrs, origin +): + """Parse MICAPS model grid bytes to xarray.Dataset.""" + + head_dtype = [ + ('discriminator', 'S4'), ('type', 'i2'), + ('modelName', 'S20'), ('element', 'S50'), + ('description', 'S30'), ('level', 'f4'), + ('year', 'i4'), ('month', 'i4'), ('day', 'i4'), + ('hour', 'i4'), ('timezone', 'i4'), + ('period', 'i4'), ('startLongitude', 'f4'), + ('endLongitude', 'f4'), ('longitudeGridSpace', 'f4'), + ('longitudeGridNumber', 'i4'), + ('startLatitude', 'f4'), ('endLatitude', 'f4'), + ('latitudeGridSpace', 'f4'), + ('latitudeGridNumber', 'i4'), + ('isolineStartValue', 'f4'), + ('isolineEndValue', 'f4'), + ('isolineSpace', 'f4'), + ('perturbationNumber', 'i2'), + ('ensembleTotalNumber', 'i2'), + ('minute', 'i2'), ('second', 'i2'), + ('Extent', 'S92'), + ] + + head_info = np.frombuffer(byte_array[0:278], dtype=head_dtype) + data_type = head_info['type'][0] + nlon = head_info['longitudeGridNumber'][0] + nlat = head_info['latitudeGridNumber'][0] + nmem = head_info['ensembleTotalNumber'][0] + + if data_type == 4: + data_dtype = [('data', 
'f4', (nlat, nlon))] + data_len = nlat * nlon * 4 + elif data_type == 11: + data_dtype = [('data', 'f4', (2, nlat, nlon))] + data_len = 2 * nlat * nlon * 4 + else: + raise Exception("Data type is not supported") + + if nmem == 0: + data = np.frombuffer(byte_array[278:], dtype=data_dtype) + data = np.squeeze(data['data']) + else: + if data_type == 4: + data = np.full((nmem, nlat, nlon), np.nan) + else: + data = np.full((nmem, 2, nlat, nlon), np.nan) + ind = 0 + for _ in range(nmem): + head_info_mem = np.frombuffer( + byte_array[ind:(ind + 278)], dtype=head_dtype + ) + ind += 278 + data_mem = np.frombuffer( + byte_array[ind:(ind + data_len)], dtype=data_dtype + ) + ind += data_len + number = head_info_mem['perturbationNumber'][0] + if data_type == 4: + data[number, :, :] = np.squeeze(data_mem['data']) + else: + data[number, :, :, :] = np.squeeze(data_mem['data']) + + if scale_off is not None: + data = data * scale_off[0] + scale_off[1] + + slon = head_info['startLongitude'][0] + dlon = head_info['longitudeGridSpace'][0] + slat = head_info['startLatitude'][0] + dlat = head_info['latitudeGridSpace'][0] + lon = np.arange(nlon) * dlon + slon + lat = np.arange(nlat) * dlat + slat + level = np.array([head_info['level'][0]]) + + init_time = datetime( + head_info['year'][0], head_info['month'][0], + head_info['day'][0], head_info['hour'][0] + ) + fhour = np.array([head_info['period'][0]], dtype=np.float64) + time = init_time + timedelta(hours=fhour[0]) + init_time = np.array([init_time], dtype='datetime64[ms]') + time = np.array([time], dtype='datetime64[ms]') + + time_coord = ('time', time) + lon_coord = ('lon', lon, { + 'long_name': 'longitude', 'units': 'degrees_east', + '_CoordinateAxisType': 'Lon', 'axis': 'X' + }) + lat_coord = ('lat', lat, { + 'long_name': 'latitude', 'units': 'degrees_north', + '_CoordinateAxisType': 'Lat', 'axis': 'Y' + }) + if level[0] != 0: + level_coord = ('level', level, levattrs) + if nmem != 0: + number = np.arange(nmem) + number_coord = 
('number', number, {'_CoordinateAxisType': 'Ensemble'}) + + if data_type == 4: + if nmem == 0: + if level[0] == 0: + data = data[np.newaxis, ...] + data = xr.Dataset({ + varname: (['time', 'lat', 'lon'], data, varattrs)}, + coords={'time': time_coord, 'lat': lat_coord, 'lon': lon_coord} + ) + else: + data = data[np.newaxis, np.newaxis, ...] + data = xr.Dataset({ + varname: (['time', 'level', 'lat', 'lon'], data, varattrs)}, + coords={ + 'time': time_coord, 'level': level_coord, + 'lat': lat_coord, 'lon': lon_coord + } + ) + else: + if level[0] == 0: + data = data[:, np.newaxis, ...] + data = xr.Dataset({ + varname: (['number', 'time', 'lat', 'lon'], data, varattrs)}, + coords={ + 'number': number_coord, 'time': time_coord, + 'lat': lat_coord, 'lon': lon_coord + } + ) + else: + data = data[:, np.newaxis, np.newaxis, ...] + data = xr.Dataset({ + varname: ( + ['number', 'time', 'level', 'lat', 'lon'], + data, varattrs + )}, + coords={ + 'number': number_coord, 'time': time_coord, + 'level': level_coord, 'lat': lat_coord, 'lon': lon_coord + } + ) + elif data_type == 11: + speedattrs = {'long_name': 'wind speed', 'units': 'm/s'} + angleattrs = {'long_name': 'wind angle', 'units': 'degree'} + if nmem == 0: + speed = np.squeeze(data[0, :, :]) + angle = np.squeeze(data[1, :, :]) + angle = 270. - angle + angle[angle < 0] = angle[angle < 0] + 360. + if level[0] == 0: + speed = speed[np.newaxis, ...] + angle = angle[np.newaxis, ...] + data = xr.Dataset({ + 'speed': (['time', 'lat', 'lon'], speed, speedattrs), + 'angle': (['time', 'lat', 'lon'], angle, angleattrs)}, + coords={'lon': lon_coord, 'lat': lat_coord, 'time': time_coord} + ) + else: + speed = speed[np.newaxis, np.newaxis, ...] + angle = angle[np.newaxis, np.newaxis, ...] 
def parse_station_data_bytearray(byte_array, dropna=True):
    """Parse MICAPS station bytes to pandas.DataFrame.

    The buffer starts with a 288-byte header (field sizes below sum to 288),
    followed by a station count (i4), an element count (i2), an element-id ->
    element-type table, and then one variable-length record per station.

    Args:
        byte_array: raw bytes of a MICAPS station file.
        dropna: if True, drop columns that are entirely NaN.

    Returns:
        pandas.DataFrame with one row per station; columns are 'ID', 'lon',
        'lat', 'time' plus the decoded element columns (renamed through the
        module-level _STATION_COLUMN_MAP).
    """

    # 288-byte header layout.
    head_dtype = [
        ('discriminator', 'S4'), ('type', 'i2'),
        ('description', 'S100'),
        ('level', 'f4'), ('levelDescription', 'S50'),
        ('year', 'i4'), ('month', 'i4'), ('day', 'i4'),
        ('hour', 'i4'), ('minute', 'i4'), ('second', 'i4'),
        ('Timezone', 'i4'), ('id_type', 'i2'), ('extent', 'S98')
    ]

    head_info = np.frombuffer(byte_array[0:288], dtype=head_dtype)
    # id_type selects the station-id encoding: 0 -> numeric i4 id,
    # otherwise a length-prefixed string id.
    id_type = head_info['id_type'][0]
    ind = 288
    station_number = np.frombuffer(byte_array[ind:(ind + 4)], dtype='i4')[0]
    ind += 4
    element_number = np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0]
    ind += 2

    # Map the file's element-type codes to numpy dtype strings;
    # code 7 ('S') is a string whose length is read per-record below.
    element_type_map = {
        1: 'b1', 2: 'i2', 3: 'i4', 4: 'i8', 5: 'f4', 6: 'f8', 7: 'S'
    }
    element_map = {}
    for _ in range(element_number):
        element_id = str(np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0])
        ind += 2
        element_type = np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0]
        ind += 2
        element_map[element_id] = element_type_map[element_type]

    if id_type == 0:
        # Numeric station ids: fixed 14-byte record head.
        record_head_dtype = [('ID', 'i4'), ('lon', 'f4'), ('lat', 'f4'), ('numb', 'i2')]
        records = []
        for _ in range(station_number):
            record_head = np.frombuffer(byte_array[ind:(ind + 14)], dtype=record_head_dtype)
            ind += 14
            record = {
                'ID': record_head['ID'][0], 'lon': record_head['lon'][0],
                'lat': record_head['lat'][0]
            }
            # 'numb' elements follow, each as (element_id:i2, value:typed).
            for _ in range(record_head['numb'][0]):
                element_id = str(np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0])
                ind += 2
                element_type = element_map[element_id]
                if element_type == 'S':
                    # String element: an i2 length precedes the bytes.
                    str_len = np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0]
                    ind += 2
                    element_type = element_type + str(str_len)
                # Byte width is encoded in the dtype string ('f4' -> 4, 'S12' -> 12).
                element_len = int(element_type[1:])
                record[element_id] = np.frombuffer(
                    byte_array[ind:(ind + element_len)], dtype=element_type
                )[0]
                ind += element_len
            records += [record]
    else:
        # String station ids: an i2 length-prefixed id, then a 10-byte head.
        record_head_dtype = [('lon', 'f4'), ('lat', 'f4'), ('numb', 'i2')]
        records = []
        for _ in range(station_number):
            id_string_length = np.frombuffer(byte_array[ind:(ind + 2)], dtype="i2")[0]
            record_id = np.frombuffer(
                byte_array[ind + 2:(ind + 2 + id_string_length)],
                dtype="S" + str(id_string_length)
            )[0].decode()
            ind += (2 + id_string_length)
            record_head = np.frombuffer(byte_array[ind:(ind + 10)], dtype=record_head_dtype)
            ind += 10
            record = {
                'ID': record_id, 'lon': record_head['lon'][0],
                'lat': record_head['lat'][0]
            }
            # Same per-element decoding as the numeric-id branch.
            for _ in range(record_head['numb'][0]):
                element_id = str(np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0])
                ind += 2
                element_type = element_map[element_id]
                if element_type == 'S':
                    str_len = np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0]
                    ind += 2
                    element_type = element_type + str(str_len)
                element_len = int(element_type[1:])
                record[element_id] = np.frombuffer(
                    byte_array[ind:(ind + element_len)], dtype=element_type
                )[0]
                ind += element_len
            records += [record]

    records = pd.DataFrame(records)
    # NOTE(review): set_index without inplace/assignment is a no-op here;
    # the returned frame keeps 'ID' as a regular column — confirm intent.
    records.set_index('ID')
    time = datetime(
        head_info['year'][0], head_info['month'][0], head_info['day'][0],
        head_info['hour'][0], head_info['minute'][0], head_info['second'][0]
    )
    records['time'] = time
    records.rename(columns=_STATION_COLUMN_MAP, inplace=True)
    if dropna:
        records = records.dropna(axis=1, how='all')
    return records
def collect_model_grids(
    directory, filenames, all_exists, pbar, get_file_list_func, get_model_grid_func, **kwargs
):
    """Collect multiple model grids and concatenate them along ``time``.

    Args:
        directory: data directory queried through ``get_file_list_func``.
        filenames: filenames to retrieve, in time order.
        all_exists: if True, give up (warn and return None) as soon as any
            requested file is missing or unreadable.
        pbar: if True, wrap the filename loop in a tqdm progress bar.
        get_file_list_func: callable(directory) -> list of available filenames.
        get_model_grid_func: callable(directory, filename=..., check_file_first=False,
            **kwargs) -> xarray.Dataset or None.
        **kwargs: forwarded to ``get_model_grid_func``.

    Returns:
        xarray.Dataset concatenated on ``time``, or None when nothing could
        be collected (or ``all_exists`` and a file is missing).
    """

    dataset = []
    iterator = tqdm(filenames, desc=directory + ": ") if pbar else filenames
    # Fetch the directory listing once, outside the loop.
    file_list = get_file_list_func(directory)
    for filename in iterator:
        if filename in file_list:
            data = get_model_grid_func(
                directory, filename=filename, check_file_first=False, **kwargs
            )
            if data is not None:
                dataset.append(data)
            elif all_exists:
                # Fixed message typo ("doese not exists.").
                warnings.warn("{} does not exist.".format(directory + '/' + filename))
                return None
        elif all_exists:
            warnings.warn("{} does not exist.".format(directory + '/' + filename))
            return None
    if not dataset:
        # xr.concat raises ValueError on an empty list; report "no data" as None.
        return None
    return xr.concat(dataset, dim='time')
def collect_model_3d_grid(
    directory, filename, levels, all_exists, pbar, get_model_grid_func, **kwargs
):
    """Collect one time's 3D model grid by stacking per-level subdirectories.

    Each level is read from ``<directory>/<int(level)>`` and the results are
    concatenated along ``level``. Returns None when nothing was collected, or
    when ``all_exists`` is True and any level is missing.
    """

    dataset = []
    iterator = tqdm(levels, desc=directory + ": ") if pbar else levels
    for level in iterator:
        data_dir = "{}/{}".format(directory.rstrip('/'), str(int(level)).strip())
        data = get_model_grid_func(data_dir, filename=filename, **kwargs)
        if data is not None:
            dataset.append(data)
        elif all_exists:
            # Fixed message typo ("doese not exists.").
            warnings.warn("{} does not exist.".format(data_dir + '/' + filename))
            return None
    if not dataset:
        # xr.concat raises on an empty list; report "no data" as None.
        return None
    return xr.concat(dataset, dim='level')


def collect_model_3d_grids(
    directory, filenames, levels, all_exists, pbar, get_model_grid_func, **kwargs
):
    """Collect multiple times' 3D model grids (concat levels, then times)."""

    dataset = []
    iterator = tqdm(filenames, desc=directory + ": ") if pbar else filenames
    for filename in iterator:
        dataset_temp = []
        for level in levels:
            data_dir = "{}/{}".format(directory.rstrip('/'), str(int(level)).strip())
            data = get_model_grid_func(data_dir, filename=filename, **kwargs)
            if data is not None:
                dataset_temp.append(data)
            elif all_exists:
                warnings.warn("{} does not exist.".format(data_dir + '/' + filename))
                return None
        # Skip times for which no level produced data (empty concat raises).
        if dataset_temp:
            dataset.append(xr.concat(dataset_temp, dim='level'))
    if not dataset:
        return None
    return xr.concat(dataset, dim='time')


def collect_station_dataset(
    directory, filenames, all_exists, pbar, get_station_data_func, **kwargs
):
    """Collect multiple station data frames and concatenate their rows."""

    dataset = []
    iterator = tqdm(filenames, desc=directory + ": ") if pbar else filenames
    for filename in iterator:
        data = get_station_data_func(directory, filename=filename, **kwargs)
        if data is not None:
            dataset.append(data)
        elif all_exists:
            warnings.warn("{} does not exist.".format(directory + '/' + filename))
            return None
    if not dataset:
        # pd.concat raises ValueError on an empty list; report "no data" as None.
        return None
    return pd.concat(dataset)


def collect_xarray_dataset(
    directory, filenames, all_exists, pbar, get_data_func, concat_dim='time', **kwargs
):
    """Collect multiple xarray datasets and concatenate along ``concat_dim``."""

    dataset = []
    iterator = tqdm(filenames, desc=directory + ": ") if pbar else filenames
    for filename in iterator:
        data = get_data_func(directory, filename=filename, **kwargs)
        if data is not None:
            dataset.append(data)
        elif all_exists:
            warnings.warn("{} does not exist.".format(directory + '/' + filename))
            return None
    if not dataset:
        return None
    return xr.concat(dataset, dim=concat_dim)
def lzw_decompress(compressed):
    """Decompress a sequence of LZW output codes to a string.

    Standard LZW decompression with a 256-entry initial table (code i maps to
    chr(i)). Accepts any sequence of integer codes, including ``bytes`` /
    ``bytearray`` slices (the form the radar-mosaic reader passes in).

    Fixes over the previous version: the dictionary is keyed by integer codes
    (the old table was keyed by characters, so every integer code raised
    "Bad compressed k"), and the caller's sequence is no longer mutated by
    ``pop(0)``.

    Args:
        compressed: sequence of integer LZW codes.

    Returns:
        The decompressed text as ``str`` ('' for empty input).

    Raises:
        ValueError: if a code is neither in the table nor the next table slot.
    """

    codes = list(compressed)  # copy so the caller's sequence is untouched
    if not codes:
        return ''

    dict_size = 256
    dictionary = {i: chr(i) for i in range(dict_size)}

    # The first code is always a literal single character.
    w = chr(codes.pop(0))
    pieces = [w]
    for k in codes:
        if k in dictionary:
            entry = dictionary[k]
        elif k == dict_size:
            # Code not yet in the table (cScSc case): entry = w + first char of w.
            entry = w + w[0]
        else:
            raise ValueError('Bad compressed k: %s' % k)
        pieces.append(entry)
        dictionary[dict_size] = w + entry[0]
        dict_size += 1
        w = entry
    # join once instead of quadratic str +=
    return ''.join(pieces)
def parse_radar_mosaic_bytearray(byte_array, origin):
    """Parse radar mosaic bytes to xarray.Dataset.

    Two on-disk layouts are supported, both with a 256-byte header:
    buffers starting with b'MOC' use the MOC header (with optional bz2/zlib/
    LZW payload compression); anything else is parsed as the sparse
    row/column run-length layout.

    Args:
        byte_array: raw bytes of the mosaic file.
        origin: string recorded in the output's ``Origin`` attribute.

    Returns:
        xarray.Dataset with a single 'data' variable on (time, lat, lon),
        or None for an unknown MOC compression code.
    """

    if byte_array[0:3] == b'MOC':
        # 256-byte MOC header layout.
        head_dtype = [
            ('label', 'S4'),
            ('Version', 'S4'),
            ('FileBytes', 'i4'),
            ('MosaicID', 'i2'),
            ('coordinate', 'i2'),
            ('varname', 'S8'),
            ('description', 'S64'),
            ('BlockPos', 'i4'),
            ('BlockLen', 'i4'),
            ('TimeZone', 'i4'),
            ('yr', 'i2'),
            ('mon', 'i2'),
            ('day', 'i2'),
            ('hr', 'i2'),
            ('min', 'i2'),
            ('sec', 'i2'),
            ('ObsSeconds', 'i4'),
            ('ObsDates', 'u2'),
            ('GenDates', 'u2'),
            ('GenSeconds', 'i4'),
            ('edge_s', 'i4'),
            ('edge_w', 'i4'),
            ('edge_n', 'i4'),
            ('edge_e', 'i4'),
            ('cx', 'i4'),
            ('cy', 'i4'),
            ('nX', 'i4'),
            ('nY', 'i4'),
            ('dx', 'i4'),
            ('dy', 'i4'),
            ('height', 'i2'),
            ('Compress', 'i2'),
            ('num_of_radars', 'i4'),
            ('UnZipBytes', 'i4'),
            ('scale', 'i2'),
            ('unUsed', 'i2'),
            ('RgnID', 'S8'),
            ('units', 'S8'),
            ('reserved', 'S60')
        ]

        head_info = np.frombuffer(byte_array[0:256], dtype=head_dtype)
        ind = 256

        varname = head_info['varname'][0]
        longname = head_info['description'][0]
        units = head_info['units'][0]
        rows = head_info['nY'][0]
        cols = head_info['nX'][0]
        # Grid steps are stored as 1e-4 degrees.
        # NOTE(review): dlat is taken from 'dx' and dlon from 'dy' — looks
        # swapped relative to the field names; confirm against the MOC spec.
        dlat = head_info['dx'][0] / 10000.
        dlon = head_info['dy'][0] / 10000.

        # Compress codes: 0 raw, 1 bz2, 2 zlib, 3 LZW; anything else -> None.
        if head_info['Compress'] == 0:
            data = np.frombuffer(byte_array[ind:], 'i2')
        elif head_info['Compress'] == 1:
            data = np.frombuffer(bz2.decompress(byte_array[ind:]), 'i2')
        elif head_info['Compress'] == 2:
            data = np.frombuffer(zlib.decompress(byte_array[ind:]), 'i2')
        elif head_info['Compress'] == 3:
            # NOTE(review): lzw_decompress returns str, which np.frombuffer
            # rejects on Python 3 — this path likely needs an encode step.
            data = np.frombuffer(lzw_decompress(byte_array[ind:]), 'i2')
        else:
            return None

        data.shape = (rows, cols)
        data = data.astype(np.float32)
        # Negative values are missing data.
        data[data < 0] = np.nan
        data /= head_info['scale'][0]
        # Edges are in 1e-3 degrees; coordinates are cell centers
        # (hence the half-step offsets), built north-to-south first.
        lat = head_info['edge_n'][0] / 1000. - np.arange(rows) * dlat - dlat / 2.0
        lon = head_info['edge_w'][0] / 1000. + np.arange(cols) * dlon - dlon / 2.0
        # Flip so latitude increases with index.
        data = np.flip(data, 0)
        lat = lat[::-1]

        time = datetime(
            head_info['yr'][0], head_info['mon'][0], head_info['day'][0],
            head_info['hr'][0], head_info['min'][0], head_info['sec'][0]
        )
        time = np.array([time], dtype='datetime64[m]')
        data = np.expand_dims(data, axis=0)
    else:
        # 256-byte header of the sparse (run-length) layout.
        head_dtype = [
            ('description', 'S128'),
            ('name', 'S32'),
            ('organization', 'S16'),
            ('grid_flag', 'u2'),
            ('data_byte', 'i2'),
            ('slat', 'f4'),
            ('wlon', 'f4'),
            ('nlat', 'f4'),
            ('elon', 'f4'),
            ('clat', 'f4'),
            ('clon', 'f4'),
            ('rows', 'i4'),
            ('cols', 'i4'),
            ('dlat', 'f4'),
            ('dlon', 'f4'),
            ('nodata', 'f4'),
            ('levelbybtes', 'i4'),
            ('levelnum', 'i2'),
            ('amp', 'i2'),
            ('compmode', 'i2'),
            ('dates', 'u2'),
            ('seconds', 'i4'),
            ('min_value', 'i2'),
            ('max_value', 'i2'),
            ('reserved', 'i2', 6)
        ]

        head_info = np.frombuffer(byte_array[0:256], dtype=head_dtype)
        ind = 256
        # Strings are NUL-padded; keep the part before the first '\x00'.
        varname = head_info['name'][0].decode("utf-8", 'ignore').rsplit('\x00')[0]
        longname = {
            'CREF': 'Composite Reflectivity',
            'QREF': 'Basic Reflectivity',
            'VIL': 'Vertically Integrated Liquid',
            'OHP': 'One Hour Precipitation'
        }.get(varname, 'radar mosaic')
        # NOTE(review): units are read from the 'organization' field — confirm
        # against the format spec.
        units = head_info['organization'][0].decode("utf-8", 'ignore').rsplit('\x00')[0]
        amp = head_info['amp'][0]
        rows = head_info['rows'][0]
        cols = head_info['cols'][0]
        dlat = head_info['dlat'][0]
        dlon = head_info['dlon'][0]
        # Pre-fill with the missing-value sentinel; runs below overwrite it.
        data = np.full(rows * cols, -9999, dtype=np.int32)

        # Payload is a list of runs: (row:i2, col:i2, count:i2, values:i2*count),
        # terminated by row == -1 or col == -1 (1-based row/col indices).
        while ind < len(byte_array):
            irow = np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0]
            ind += 2
            icol = np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0]
            ind += 2
            if irow == -1 or icol == -1:
                break
            nrec = np.frombuffer(byte_array[ind:(ind + 2)], dtype='i2')[0]
            ind += 2
            recd = np.frombuffer(byte_array[ind:(ind + 2 * nrec)], dtype='i2', count=nrec)
            ind += 2 * nrec
            position = (irow - 1) * cols + icol - 1
            data[position:(position + nrec)] = recd

        data.shape = (rows, cols)
        data = data.astype(np.float32)
        data[data < 0] = np.nan
        # Stored values are scaled by 'amp'.
        data /= amp
        lat = head_info['nlat'][0] - np.arange(rows) * dlat - dlat / 2.0
        lon = head_info['wlon'][0] + np.arange(cols) * dlon - dlon / 2.0
        data = np.flip(data, 0)
        lat = lat[::-1]
        # 'dates' is days since epoch (1-based); 28800 s is an 8-hour shift
        # (presumably UTC -> UTC+8, confirm).
        time = datetime(1970, 1, 1) + timedelta(
            days=head_info['dates'][0].astype(np.float64) - 1,
            seconds=head_info['seconds'][0].astype(np.float64) + 28800
        )
        time = np.array([time], dtype='datetime64[m]')
        data = np.expand_dims(data, axis=0)

    time_coord = ('time', time)
    lon_coord = ('lon', lon, {
        'long_name': 'longitude', 'units': 'degrees_east',
        '_CoordinateAxisType': 'Lon', 'axis': 'X'
    })
    lat_coord = ('lat', lat, {
        'long_name': 'latitude', 'units': 'degrees_north',
        '_CoordinateAxisType': 'Lat', 'axis': 'Y'
    })
    varattrs = {'long_name': longname, 'short_name': varname, 'units': units}
    data = xr.Dataset(
        {'data': (['time', 'lat', 'lon'], data, varattrs)},
        coords={'time': time_coord, 'lat': lat_coord, 'lon': lon_coord}
    )
    data.attrs['Conventions'] = "CF-1.6"
    data.attrs['Origin'] = origin
    return data
def parse_tlogp_bytearray(byte_array, remove_duplicate=False, remove_na=False):
    """Parse TLOGP (upper-air sounding) bytes to pandas.DataFrame.

    The payload is whitespace-separated text: a header whose tokens 3-6 are
    year/month/day/hour and token 7 is the station count, followed per
    station by id, lon, lat, alt, a value count, and then groups of six
    values (p, h, t, td, wd, ws) per level.

    Fixes over the previous version: removed the no-op
    ``records.set_index('ID')`` (its result was discarded) and guarded the
    empty-records case, which previously raised KeyError on ``records['h']``.

    Args:
        byte_array: raw bytes of the TLOGP file (UTF-8 text).
        remove_duplicate: drop duplicated rows if True.
        remove_na: drop rows with missing p/h/t/td if True.

    Returns:
        pandas.DataFrame with one row per sounding level, or None when the
        file reports no stations or yields no level records.
    """

    txt = byte_array.decode("utf-8")
    txt = list(filter(None, re.split(' |\n', txt)))

    # Two-digit years are 2000-based.
    if len(txt[3]) < 4:
        year = int(txt[3]) + 2000
    else:
        year = int(txt[3])
    month = int(txt[4])
    day = int(txt[5])
    hour = int(txt[6])
    time = datetime(year, month, day, hour)

    station_count = int(txt[7])
    if station_count < 1:
        return None

    txt = txt[8:]
    index = 0
    records = []
    while index < len(txt):
        station_id = txt[index].strip()
        lon = float(txt[index + 1])
        lat = float(txt[index + 2])
        alt = float(txt[index + 3])
        # The value count covers 6 numbers per level.
        level_count = int(int(txt[index + 4]) / 6)
        index += 5

        for _ in range(level_count):
            records.append({
                'ID': station_id, 'lon': lon, 'lat': lat, 'alt': alt,
                'time': time,
                'p': float(txt[index]), 'h': float(txt[index + 1]),
                't': float(txt[index + 2]), 'td': float(txt[index + 3]),
                'wd': float(txt[index + 4]), 'ws': float(txt[index + 5])
            })
            index += 6

    if not records:
        # No levels at all: the column operations below would raise KeyError.
        return None

    records = pd.DataFrame(records)
    # 9999.0 is the missing-value sentinel.
    records = records.replace(9999.0, np.nan)
    if remove_duplicate:
        records = records.drop_duplicates()
    if remove_na:
        records = records.dropna(subset=['p', 'h', 't', 'td'])
    # Stored heights are value/10 — rescale (presumably to geopotential
    # meters; confirm against the TLOGP spec).
    records['h'] = records['h'] * 10.0
    return records
def parse_swan_radar_bytearray(
    byte_array, filename, scale, varattrs, attach_forecast_period, origin
):
    """Parse SWAN radar bytes to xarray.Dataset.

    The buffer starts with a 1024-byte header followed by a dense
    (level, lat, lon) grid of packed integer values.

    Args:
        byte_array: raw bytes of the SWAN file.
        filename: used to derive the forecast period ('name.MMM.ext',
            minutes) when ``attach_forecast_period`` is True.
        scale: (factor, offset) pair; values are unpacked as
            (raw + offset) * factor.
        varattrs: attribute dict attached to the output 'data' variable.
        attach_forecast_period: see ``filename``.
        origin: string recorded in the output's ``Origin`` attribute.

    Returns:
        xarray.Dataset with a 'data' variable on (time, level, lat, lon).
    """

    # 1024-byte header layout.
    head_dtype = [
        ('ZonName', 'S12'),
        ('DataName', 'S38'),
        ('Flag', 'S8'),
        ('Version', 'S8'),
        ('year', 'i2'),
        ('month', 'i2'),
        ('day', 'i2'),
        ('hour', 'i2'),
        ('minute', 'i2'),
        ('interval', 'i2'),
        ('XNumGrids', 'i2'),
        ('YNumGrids', 'i2'),
        ('ZNumGrids', 'i2'),
        ('RadarCount', 'i4'),
        ('StartLon', 'f4'),
        ('StartLat', 'f4'),
        ('CenterLon', 'f4'),
        ('CenterLat', 'f4'),
        ('XReso', 'f4'),
        ('YReso', 'f4'),
        ('ZhighGrids', 'f4', 40),
        ('RadarStationName', 'S20', 16),
        ('RadarLongitude', 'f4', 20),
        ('RadarLatitude', 'f4', 20),
        ('RadarAltitude', 'f4', 20),
        ('MosaicFlag', 'S1', 20),
        ('m_iDataType', 'i2'),
        ('m_iLevelDimension', 'i2'),
        ('Reserved', 'S168')
    ]

    head_info = np.frombuffer(byte_array[0:1024], dtype=head_dtype)
    ind = 1024
    nlon = head_info['XNumGrids'][0].astype(np.int64)
    nlat = head_info['YNumGrids'][0].astype(np.int64)
    nlev = head_info['ZNumGrids'][0].astype(np.int64)
    dlon = head_info['XReso'][0].astype(np.float64)
    dlat = head_info['YReso'][0].astype(np.float64)
    # Cell-center coordinates (half-step offsets); latitudes run
    # north-to-south here and are flipped after the data is read.
    lat = head_info['StartLat'][0] - np.arange(nlat) * dlat - dlat / 2.0
    lon = head_info['StartLon'][0] + np.arange(nlon) * dlon - dlon / 2.0
    # Level heights: first nlev entries of the 40-slot table.
    level = head_info['ZhighGrids'][0][0:nlev]

    # m_iDataType indexes the packed dtype; the second character of the
    # dtype string ('1' or '2') is also the element size in bytes.
    data_type = ['u1', 'u1', 'u2', 'i2']
    data_type = data_type[head_info['m_iDataType'][0]]
    data_len = nlon * nlat * nlev
    data = np.frombuffer(
        byte_array[ind:(ind + data_len * int(data_type[1]))],
        dtype=data_type, count=data_len
    )

    data.shape = (nlev, nlat, nlon)
    data = data.astype(np.float32)
    # Unpack: (raw + offset) * factor.
    data = (data + scale[1]) * scale[0]
    # Flip the latitude axis so it increases with index.
    data = np.flip(data, 1)
    lat = lat[::-1]

    init_time = datetime(
        head_info['year'][0], head_info['month'][0],
        head_info['day'][0], head_info['hour'][0], head_info['minute'][0]
    )
    if attach_forecast_period:
        # Second dot-separated filename field holds the lead time in minutes.
        fhour = int(filename.split('.')[1]) / 60.0
    else:
        fhour = 0
    fhour = np.array([fhour], dtype=np.float64)
    time = init_time + timedelta(hours=fhour[0])
    init_time = np.array([init_time], dtype='datetime64[ms]')
    time = np.array([time], dtype='datetime64[ms]')

    time_coord = ('time', time)
    lon_coord = ('lon', lon, {
        'long_name': 'longitude', 'units': 'degrees_east',
        '_CoordinateAxisType': 'Lon', 'axis': 'X'
    })
    lat_coord = ('lat', lat, {
        'long_name': 'latitude', 'units': 'degrees_north',
        '_CoordinateAxisType': 'Lat', 'axis': 'Y'
    })
    level_coord = ('level', level, {'long_name': 'height', 'units': 'm'})

    data = np.expand_dims(data, axis=0)
    data = xr.Dataset(
        {'data': (['time', 'level', 'lat', 'lon'], data, varattrs)},
        coords={'time': time_coord, 'level': level_coord, 'lat': lat_coord, 'lon': lon_coord}
    )
    data.coords['forecast_reference_time'] = init_time[0]
    data.coords['forecast_period'] = ('time', fhour, {
        'long_name': 'forecast_period', 'units': 'hour'
    })
    data.attrs['Conventions'] = "CF-1.6"
    data.attrs['Origin'] = origin
    return data
def extract_nafp_grid_metadata(contents, init_time_str, valid_time, fcst_level, units=None):
    """Extract common NAFP grid metadata used by CIMISS/CMADAAS readers.

    Args:
        contents: response dict with 'startLat'/'startLon', 'latCount'/
            'lonCount', 'latStep'/'lonStep', 'fieldNames' and 'fieldUnits'.
        init_time_str: initial time as 'YYYYMMDDHH'.
        valid_time: forecast lead time in hours.
        fcst_level: forecast level; any string value is mapped to 0.
        units: optional override; defaults to ``contents['fieldUnits']``.

    Returns:
        Tuple (init_time, fhour, time, lon, lat, name, units, fcst_level)
        where the time arrays are datetime64[ms] and lon/lat are 1-D grids.
    """

    reference = datetime.strptime(init_time_str, '%Y%m%d%H')
    fhour = np.array([valid_time], dtype=np.float64)
    valid = reference + timedelta(hours=fhour[0])
    init_time = np.array([reference], dtype='datetime64[ms]')
    time = np.array([valid], dtype='datetime64[ms]')

    # Build the 1-D coordinate grids from start/step/count.
    nlon = int(contents['lonCount'])
    nlat = int(contents['latCount'])
    lon = float(contents['startLon']) + np.arange(nlon) * float(contents['lonStep'])
    lat = float(contents['startLat']) + np.arange(nlat) * float(contents['latStep'])

    name = contents['fieldNames']
    units = contents['fieldUnits'] if units is None else units

    # String levels (e.g. surface labels) are normalized to 0.
    fcst_level = 0 if isinstance(fcst_level, str) else fcst_level

    return init_time, fhour, time, lon, lat, name, units, fcst_level