Merge pull request #62 from ec-jrc/feature/new_ncextract
Feature/new ncextract
doc78 authored Apr 30, 2024
2 parents 8421474 + 0f1d0bf commit 3ce5cff
Showing 5 changed files with 183 additions and 51 deletions.
45 changes: 38 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ The package contains convenient classes for reading/writing:
* NetCDFMap
* NetCDFWriter

* __ncextract__ is a tool to extract values from netCDF4 file at specific coordinates.
* __[ncextract](#ncextract)__ is a tool to extract values from NetCDF4 (or GRIB) file(s) at specific coordinates.

### Installation

Expand Down Expand Up @@ -611,11 +611,20 @@ cddmap /meteo/pr --parallel --maxdistance 500

## ncextract

The ncextract tool extracts the time series of values from (multiple) netCDF file(s) at user defined coordinates.
The `ncextract` tool extracts time series from (multiple) NetCDF or GRIB file(s) at user defined coordinates.

### Usage:
The tool takes as input a CSV file containing point coordinates (structured in 3 columns: id, lat, lon) and a directory containing one or more netCDF files.
The output is a CSV file (or optionally a netCDF file) containing the values at the points corresponding to the provided coordinates, in chronological order.
### Usage

The tool takes as input a CSV file containing point coordinates and a directory containing one or more NetCDF or GRIB files. The CSV file must contain only three columns: the point identifier and its two coordinates. The names of the coordinate columns must match those in the NetCDF or GRIB files. For example:

```text
ID,lat,lon
0010,40.6083,-4.2250
0025,37.5250,-6.2750
0033,40.5257,-6.4753
```

The output is a file containing the time series at the pixels corresponding to the provided coordinates, in chronological order. The function supports two output formats: CSV and NetCDF.
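Under the hood, the extraction amounts to a nearest-neighbour selection with `xarray`: each point snaps to the closest grid pixel. A minimal self-contained sketch on a hypothetical grid (variable name `dis` and the grid values are illustrative, not taken from EFAS data):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical 2-day discharge stack on a small lat/lon grid
maps = xr.Dataset(
    {"dis": (("time", "lat", "lon"),
             np.arange(18, dtype=float).reshape(2, 3, 3))},
    coords={"time": pd.date_range("2024-01-01", periods=2),
            "lat": [40.0, 40.5, 41.0],
            "lon": [-5.0, -4.5, -4.0]},
)

# One point from a CSV like the example above
points = (pd.DataFrame({"ID": ["0010"], "lat": [40.6083], "lon": [-4.2250]})
          .set_index("ID")[["lat", "lon"]].to_xarray())

# Nearest-neighbour selection: the point snaps to the closest pixel
series = maps.sel(lat=points["lat"], lon=points["lon"], method="nearest")
print(series["dis"].values)  # one row per time step, one column per point
```

Here the point at (40.6083, -4.2250) snaps to the grid pixel at (40.5, -4.0), and the result keeps the point identifier as a dimension, so several points can be extracted at once.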

```text
usage: ncextract.py [-h] -i INPUT -d DIRECTORY -o OUTPUT [-nc]
Expand All @@ -631,9 +640,31 @@ options:
-d DIRECTORY, --directory DIRECTORY
Input directory with .nc files
-o OUTPUT, --output OUTPUT
Output file (default is CSV, use -nc for NetCDF)
-nc, --nc Output to NetCDF
Output file. Two extensions are supported: .csv or .nc
```

#### Use in the command prompt

The following command extracts the discharge time series from EFAS simulations (NetCDF files in the directory _EFAS5/discharge/maps_) at a set of points where gauging stations are located (file _gauging_stations.csv_), and saves the extraction as a CSV file.

```bash
ncextract -i ./gauging_stations.csv -d ./EFAS5/discharge/maps/ -o ./EFAS5/discharge/timeseries/results_ncextract.csv
```

#### Use programmatically

The function can be imported in a Python script. It takes as input two `xarray.Dataset` objects: one with the input maps and the other with the points of interest. The result of the extraction can be returned as another `xarray.Dataset`, or saved to a file in either CSV or NetCDF format.

```Python
from lisfloodutilities.ncextract import extract_timeseries

# load desired input maps and points
# maps: xarray.Dataset
# points: xarray.Dataset

# extract time series and save in a xarray.Dataset
ds = extract_timeseries(maps, points, outputfile=None)
```
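The `points` dataset can be built from the same kind of CSV used on the command line. A sketch, using an in-memory stand-in for _gauging_stations.csv_ (the column names are the ones from the example above):

```python
from io import StringIO

import pandas as pd

# In-memory stand-in for gauging_stations.csv
csv_text = """ID,lat,lon
0010,40.6083,-4.2250
0025,37.5250,-6.2750
"""
poi_df = pd.read_csv(StringIO(csv_text), dtype={"ID": str})

# Index by the point identifier, keep the two coordinate columns,
# then convert to the xarray.Dataset expected by extract_timeseries
points = poi_df.set_index("ID")[["lat", "lon"]].to_xarray()
print(points)
```

Passing a path ending in `.csv` or `.nc` as the `outputfile` argument writes the result to disk instead of returning the dataset.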

## Using lisfloodutilities programmatically

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
attrs>=19.3.0
cffi>=1.15.1
cfgrib>=0.9.11.0
cftime>=1.0.4.2
cloudpickle>=2.2.1
coverage>=6.0
Expand Down
2 changes: 1 addition & 1 deletion src/lisfloodutilities/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.12.22
0.12.23
167 changes: 133 additions & 34 deletions src/lisfloodutilities/ncextract/ncextract.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,82 +11,181 @@
"""

import argparse
import glob
import os
import pandas as pd
import os
import sys
import time
import xarray as xr
import cfgrib
from pathlib import Path
from typing import Union, Optional

def read_points(inputcsv: Union[str, Path]) -> xr.Dataset:
"""It reads a CSV file with coordinates of points: gauging stations, reservoirs...
Parameters:
-----------
inputcsv: string or pathlib.Path
a CSV indicating the points for which the time series will be extracted. It must contain three columns: the identification of the point, and its two coordinates
def extract(inputcsv, directory, outputfile, nc):
start_time = time.perf_counter()
Returns:
--------
poi_xr: xarray.Dataset
the input data converted into a Xarray object
"""

if not os.path.isfile(inputcsv):
print(f'ERROR: {inputcsv} is missing!')
sys.exit(0)
sys.exit(1)

try:
poi_df = pd.read_csv(inputcsv)
original_columns = poi_df.columns.copy()
poi_df.columns = original_columns.str.lower()
# find columns representing coordinates
coord_1 = [col for col in poi_df.columns if col.startswith('lon') or col.startswith('x')][0]
coord_2 = [col for col in poi_df.columns if col.startswith('lat') or col.startswith('y')][0]
# find the column representing the point ID
idx_col = poi_df.columns.difference([coord_1, coord_2])[0]
# convert to xarray.Dataset
poi_xr = poi_df.set_index(idx_col)[[coord_1, coord_2]].to_xarray()
rename_dim = {idx_col: col for col in original_columns if col.lower() == idx_col}
poi_xr = poi_xr.rename(rename_dim)
except Exception:
print('ERROR: Please check that the CSV file is formatted correctly!')
sys.exit(2)

return poi_xr



def read_inputmaps(directory: Union[str, Path]) -> xr.Dataset:
"""It extract from a series of input files (NetCDF or GRIB) the time series of a set of points
Parameters:
-----------
directory: string or pathlib.Path
the directory containing the input files, which can be either in NetCDF or GRIB format
Returns:
--------
maps: xarray.Dataset
containing the concatenation of all the input maps
"""

pattern_engine = {'*.nc': 'netcdf4',
'*.grib': 'cfgrib'}

if not os.path.isdir(directory):
print(f'ERROR: {directory} is missing or not a directory!')
sys.exit(0)
sys.exit(1)
else:
directory = Path(directory)

filepaths = []
for file in glob.glob(os.path.join(directory, '*.nc')):
print(file)
filepaths.append(file)
for pattern, engine in pattern_engine.items():
filepaths = list(directory.glob(pattern))
if len(filepaths) > 0:
break

if not filepaths:
print(f'ERROR: No NetCDF files found in {directory}')
sys.exit(0)
print(f'ERROR: No NetCDF/GRIB file found in {directory}')
sys.exit(2)

try:
poi = pd.read_csv(inputcsv)
poi_indexer = poi.set_index('id')[['lat', 'lon']].to_xarray()
except:
print('ERROR: Please check that CSV file is formatted correctly!')
sys.exit(0)
print(f'{len(filepaths)} input {engine} file(s) found in "{directory}"')

# chunks is set to auto for general purpose processing
# it could be optimized depending on input NetCDF
ds = xr.open_mfdataset(filepaths, chunks='auto', parallel=True)
maps = xr.open_mfdataset(filepaths, engine=engine, chunks='auto', parallel=True)

return maps



def extract_timeseries(maps: xr.Dataset,
poi: xr.Dataset,
outputfile: Optional[Union[str, Path]] = None
) -> Optional[xr.Dataset]:
"""It extract from a series of input files (NetCDF or GRIB) the time series of a set of points
Parameters:
-----------
maps: xarray.Dataset
the time stack of input maps from which the time series will be extracted
poi: xarray.Dataset
a Dataset with the coordinates of the points of interest. It must contain only two variables (the coordinates), and the names of these variables must match coordinates in "maps"
outputfile: optional, string or pathlib.Path
the file where the results will be saved. It can be either a CSV or a NetCDF file
Returns:
--------
By default, it returns an xarray.Dataset with the extracted time series. If "outputfile" is provided, the results are instead saved to a file (CSV or NetCDF)
"""

ds_poi = ds.sel(lat=poi_indexer.lat, lon=poi_indexer.lon, method='nearest')
print(ds_poi)
print("Processing...")
coord_1, coord_2 = list(poi)
if not all(coord in maps.coords for coord in [coord_1, coord_2]):
print('ERROR: The variables in "poi" (coordinates) are not coordinates in "maps"')
sys.exit(1)

# extract time series
maps_poi = maps.sel({coord_1: poi[coord_1], coord_2: poi[coord_2]}, method='nearest')

if outputfile is None:
return maps_poi.compute()

if nc:
ds_poi.to_netcdf(outputfile)
else:
df = ds_poi.to_dataframe()
df_reset = df.reset_index()
df_reset.to_csv(outputfile, index=False)
outputfile = Path(outputfile)
if outputfile.suffix == '.nc':
maps_poi.to_netcdf(outputfile)
elif outputfile.suffix == '.csv':
df = maps_poi.to_dataframe()
df_reset = df.reset_index()
df_reset.to_csv(outputfile, index=False)
else:
print('ERROR: the extension of the output file must be either ".nc" or ".csv"')
sys.exit(2)
print(f'Results exported as {outputfile}')


end_time = time.perf_counter()
elapsed_time = end_time - start_time
print(f"Time elapsed: {elapsed_time:0.2f} seconds")

def main(argv=sys.argv):
prog = os.path.basename(argv[0])
parser = argparse.ArgumentParser(
description="""
Utility to extract time series of values from
(multiple) NetCDF files at specific coordinates.
Coordinates of points of interest must be
Coordinates of points of interest must be
included in a CSV file with at least 3 columns
named id, lat, lon.
""",
prog=prog,
)
parser.add_argument("-i", "--input", required=True, help="Input CSV file (id, lat, lon)")
parser.add_argument("-d", "--directory", required=True, help="Input directory with .nc files")
parser.add_argument("-o", "--output", required=True, help="Output file (default is CSV, use -nc for NetCDF)")
parser.add_argument("-nc", "--nc", action='store_true', help='Output to NetCDF')
parser.add_argument("-o", "--output", required=True, help="Output file. Two extensions are supported: .csv or .nc")

args = parser.parse_args()

extract(args.input, args.directory, args.output, args.nc)
try:
start_time = time.perf_counter()

print('Reading input CSV...')
points = read_points(args.input)
print('Reading input maps...')
maps = read_inputmaps(args.directory)
print(maps)
print('Processing...')
extract_timeseries(maps, points, args.output)

elapsed_time = time.perf_counter() - start_time
print(f"Time elapsed: {elapsed_time:0.2f} seconds")

except Exception as e:
print(f'ERROR: {e}')
sys.exit(1)

def main_script():
sys.exit(main())

if __name__ == "__main__":
main_script()
main_script()
19 changes: 10 additions & 9 deletions tests/test_ncextract.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import unittest
from lisfloodutilities.compare.nc import NetCDFComparator

from lisfloodutilities.ncextract import extract
# from lisfloodutilities.compare.nc import NetCDFComparator
from lisfloodutilities.ncextract import read_points, read_inputmaps, extract_timeseries
import csv

class TestExtract(unittest.TestCase):
Expand All @@ -12,25 +11,27 @@ def compare_csv_files(self, file1, file2):
with open(file1, 'r') as f1, open(file2, 'r') as f2:
reader1 = csv.reader(f1)
reader2 = csv.reader(f2)

for row1, row2 in zip(reader1, reader2):
if row1 != row2:
return False

# Check if both files have the same number of rows
if len(list(reader1)) != len(list(reader2)):
return False

return True

def test_extract_csv(self):
inputcsv = 'tests/data/ncextract/stations.csv'
datasets = 'tests/data/ncextract/datasets'
outputfile = 'tests/data/output.csv'
expected = 'tests/data/ncextract/expected.csv'
extract(inputcsv, datasets, outputfile, nc=False)
poi = read_points(inputcsv)
maps = read_inputmaps(datasets)
extract_timeseries(maps, poi, outputfile)
assert self.compare_csv_files(outputfile, expected)

# def test_extract_nc(self):
# inputcsv = 'tests/data/ncextract/stations.csv'
# datasets = 'tests/data/ncextract/datasets'
Expand All @@ -39,4 +40,4 @@ def test_extract_csv(self):
# extract(inputcsv, datasets, outputfile, nc=True)
# comp = NetCDFComparator(None, for_testing=True)
# comp.compare_files(outputfile, expected)
# assert comp.errors == None
# assert comp.errors == None
