Fetching timeseries with Python using Web API
This tutorial shows how to fetch timeseries from multiple locations via the Nexus Web API and combine them into a single wide-format table, where each row is a timestamp and each column is a location. The approach works correctly even when locations have timeseries of different lengths or with different timestamps.
Info
This tutorial assumes that you have:
- Python installed on your computer, with the
pandasandrequestspackages - obtained a valid Nexus Web API token (with Data Consumer role; see Web API basics for more info)
"""Example: fetch groundwater_head_m_msl timeseries for groundwater_monitoring_tube locations using Nexus Web API.
Results in a Pandas DataFrame like:
timestamp B44C0214-002 GMW000000061643_1 B40H1475
--------------------------------------------------------------------
2026-01-04 00:00:00+00:00 0.539 13.765 NaN
2026-01-05 00:00:00+00:00 0.544 13.761 NaN
2026-01-06 00:00:00+00:00 NaN 13.752 12.95
Steps:
1. Validate settings (output directory, feature type, parameter).
2. Fetch all matching locations and their timeseries IDs via the WFS API.
3. Download timeseries events per location via the Events API and write them to CSV file(s).
4. Print a preview DataFrame of the first 5 locations as a sanity check.
Configure the constants at the top of this file before running.
"""
from datetime import datetime
from io import StringIO
from pathlib import Path
import os
import pandas as pd
import requests
# Required constants
WEB_API_TOKEN = ""
WORLD_ALIAS = ""
FEATURE_TYPE = "groundwater_monitoring_tube" # Run get_available_feature_types() to list options
PARAMETER = "groundwater_head_m_msl" # Run get_available_timeseries_parameters() to list options
TIME_RANGE = "-P3M/+P0M" # Last 3 months; see temporal filtering docs
MAX_LOCATIONS = 3 # Set to None to fetch all locations
LOCATIONS_PER_CSV_FILE = 100 # Each CSV column is one location
OUTPUT_DIR = "" # Leave empty to save next to this script
# Optional constants
TIMESERIES_MAINGROUP = "" # Optionally timeseries filter. Run get_available_maingroups() to list options
TIMESERIES_SUBGROUP = "" # Optionally timeseries filter. Run get_available_subgroups() to list options
LOCALE = "nl-NL" # For English, use 'en-IE'
# Endpoints
URL_WFS = "https://nexus.stellaspark.com/api/v1/wfs"
URL_EVENTS = "https://nexus.stellaspark.com/api/v1/events"
URL_FEATURE_TYPES = "https://nexus.stellaspark.com/api/v1/feature_types"
def get_available_feature_types(token: str, world: str = None) -> dict[str, dict]:
"""List all feature types available in a world.
See https://nexus.stellaspark.com/docs/concepts/vector-feature-types/
"""
url = f"{URL_FEATURE_TYPES}/?page_size=50&token={token}&locale={LOCALE}"
if world:
url += f"&world={world}"
feature_types = {}
while url:
response = requests.get(url)
response.raise_for_status()
data = response.json()
feature_types.update({x["alias"]: x for x in data["results"]})
url = data["next"]
return feature_types
def get_available_timeseries_parameters(token: str, world: str = None, feature_type: str = None) -> dict[str, dict]:
"""List all timeseries parameters available in a world, optionally filtered by feature type."""
params = {
"request": "getfeature",
"outputformat": "json",
"typenames": "timeseries",
"distinct_on": "parameter",
"token": token,
"world": world,
}
response = requests.get(URL_WFS, params=params)
response.raise_for_status()
features = response.json()["features"]
if feature_type:
features = [x for x in features if x["properties"]["feature_type"] == feature_type]
parameters = list(set([x["properties"]["parameter"] for x in features]))
# Enrich results with description and unit.
# See: https://nexus.stellaspark.com/docs/web-api/feature-types/#browsing-timeseries-and-raster-parameters
url = f"{URL_FEATURE_TYPES}/43/?token={token}&locale={LOCALE}"
if world:
url += f"&world={world}"
response = requests.get(url)
response.raise_for_status()
data = response.json()
descriptions_and_units = [x["choices"] for x in data["properties"] if x["alias"] == "parameter"][0]
return {x["alias"]: x for x in descriptions_and_units if x["alias"] in parameters}
def get_available_timeseries_maingroups(
token: str, world: str = None, feature_type: str = None, parameter: str = None
) -> list[str]:
"""List all distinct maingroups for a given feature type and parameter."""
params = {
"request": "getfeature",
"outputformat": "json",
"typenames": "timeseries",
"distinct_on": "maingroup",
"token": token,
"world": world,
}
response = requests.get(URL_WFS, params=params)
response.raise_for_status()
features = response.json()["features"]
if feature_type:
features = [x for x in features if x["properties"]["feature_type"] == feature_type]
if parameter:
features = [x for x in features if x["properties"]["parameter"] == parameter]
return sorted(set(x["properties"]["maingroup"] for x in features if x["properties"].get("maingroup")))
def get_available_timeseries_subgroups(
token: str, world: str = None, feature_type: str = None, parameter: str = None, maingroup: str = None
) -> list[str]:
"""List all distinct subgroups for a given feature type, parameter, and optionally maingroup."""
params = {
"request": "getfeature",
"outputformat": "json",
"typenames": "timeseries",
"distinct_on": "subgroup",
"token": token,
"world": world,
}
response = requests.get(URL_WFS, params=params)
response.raise_for_status()
features = response.json()["features"]
if feature_type:
features = [x for x in features if x["properties"]["feature_type"] == feature_type]
if parameter:
features = [x for x in features if x["properties"]["parameter"] == parameter]
if maingroup:
features = [x for x in features if x["properties"].get("maingroup") == maingroup]
return sorted(set(x["properties"]["subgroup"] for x in features if x["properties"].get("subgroup")))
def get_locations_and_timeseries_ids(full_data: bool = False) -> list[dict]:
"""Fetch locations and their timeseries IDs using the WFS feature_event view.
The cql_filter limits results to locations that have a timeseries for the chosen parameter and that
actually have events in the requested time range.
full_data:
- if False (default), each feature is trimmed to only the properties needed downstream (timeseries_id,
id_src, label), reducing memory usage.
- if True, it returns the full GeoJSON feature including geometry and all properties.
"""
startindex = 0
count = 5000
cql_filter = f"timeseries_parameter = '{PARAMETER}' AND event_timestamp during {TIME_RANGE}"
if TIMESERIES_MAINGROUP:
cql_filter += f" AND maingroup = '{TIMESERIES_MAINGROUP}'"
if TIMESERIES_SUBGROUP:
cql_filter += f" AND subgroup = '{TIMESERIES_SUBGROUP}'"
features = []
while True:
print(f"Downloading locations {startindex} to {startindex + count}.")
params = {
"world": WORLD_ALIAS,
"request": "getfeature",
"typenames": FEATURE_TYPE,
"view": "feature_event",
"cql_filter": cql_filter,
"count": count,
"startindex": startindex,
"outputformat": "json",
"token": WEB_API_TOKEN,
}
response = requests.get(URL_WFS, params=params)
response.raise_for_status()
data = response.json()["features"]
if not full_data:
# Skip fields like 'geometry' to keep this function fast and memory efficient
needed = {"timeseries_id", "id_src", "label", "maingroup", "subgroup"}
data = [{"properties": {k: v for k, v in x["properties"].items() if k in needed}} for x in data]
features.extend(data)
startindex += count
if not data:
break
print(f"Found {len(features)} locations.")
return features
def make_progress_printer(total_locations: int):
"""Return a function that prints progress.
>1000 locations: prints every location as a count (1/5000, 2/5000, ...).
<=1000 locations: prints every 10% (10%, 20%, ...).
"""
if total_locations > 1000:
def print_progress(i: int) -> None:
print(f" {i + 1}/{total_locations}")
else:
last_pct = [0]
def print_progress(i: int) -> None:
pct = (i + 1) * 100 // total_locations
if pct >= last_pct[0] + 10:
print(f" {pct}% ({i + 1}/{total_locations} locations)")
last_pct[0] = pct
return print_progress
def fetch_and_save_events(features: list, output_dir: Path) -> None:
"""Fetch timeseries events for each location and write them to CSV files.
Locations are grouped in batches of LOCATIONS_PER_CSV_FILE and written to nexus_locations_batch1.csv,
nexus_locations_batch2.csv, etc.
"""
# Guard against bulk downloads that exceed the Web API's recommended limit.
if len(features) > 1000:
msg = (
f"Found {len(features)} locations. For downloading timeseries consider using the Nexus Expert API "
f"instead the Web API as the Expert API is better suited for bulk downloads of this size."
)
if MAX_LOCATIONS and MAX_LOCATIONS <= 1000:
print(f"WARNING: {msg}")
else:
raise AssertionError(msg)
if MAX_LOCATIONS:
features = features[:MAX_LOCATIONS]
batch_dfs = []
batch_num = 1
print_progress = make_progress_printer(len(features))
for i, feature in enumerate(features):
print_progress(i)
properties = feature["properties"]
location = properties.get("label", properties["id_src"])
params = {
"world": WORLD_ALIAS,
"timeseries_id": properties["timeseries_id"],
"range": TIME_RANGE,
"token": WEB_API_TOKEN,
}
response = requests.get(URL_EVENTS, params=params)
response.raise_for_status()
df = pd.read_csv(StringIO(response.text), parse_dates=["timestamp"])
batch_dfs.append(df.set_index("timestamp")["value"].rename(location))
if len(batch_dfs) == LOCATIONS_PER_CSV_FILE:
# Batch is full: write it to a CSV file and start a new batch.
pd.concat(batch_dfs, axis=1).to_csv(output_dir / f"nexus_locations_{PARAMETER}_batch{batch_num}.csv")
batch_num += 1
batch_dfs = []
if batch_dfs:
# Write any remaining locations that didn't fill a full batch.
pd.concat(batch_dfs, axis=1).to_csv(output_dir / f"nexus_locations_{PARAMETER}_batch{batch_num}.csv")
def print_preview(output_dir: Path) -> None:
"""Print a preview of the first 5 locations from the first CSV file as a sanity check.
Each location becomes a column, indexed by timestamp. Missing values become NaN when timeseries have
different lengths or timestamps.
"""
first_csv = sorted(output_dir.glob("nexus_locations_*.csv"))[0]
df = pd.read_csv(first_csv, index_col="timestamp", parse_dates=True)
print(df.iloc[:, :5].head())
def create_output_dir(custom_dir: str | None = None) -> Path:
"""Create a timestamped output directory and return its path.
custom_dir: parent directory in which to create the timestamped folder.
Must exist. If not set, the directory of this script is used.
"""
parent = Path(custom_dir) if custom_dir else Path(__file__).parent
if custom_dir and not parent.is_dir():
raise AssertionError(f"custom_dir '{custom_dir}' does not exist.")
if not os.access(parent, os.W_OK):
raise AssertionError(f"No write permission in '{parent}'.")
output_dir = parent / f"nexus_timeseries_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
output_dir.mkdir()
print(f"Created output directory: {output_dir}")
return output_dir
# Step 1: Validate settings before making any long-running API calls.
print("Step 1: Validating settings.")
output_dir = create_output_dir(OUTPUT_DIR or None)
feature_types = get_available_feature_types(world=WORLD_ALIAS, token=WEB_API_TOKEN)
if FEATURE_TYPE not in feature_types:
msg = f"FEATURE_TYPE '{FEATURE_TYPE}' does not exist in world '{WORLD_ALIAS}'"
raise AssertionError(f"{msg}. Choose from: {', '.join(sorted(feature_types.keys()))}.")
parameters = get_available_timeseries_parameters(world=WORLD_ALIAS, token=WEB_API_TOKEN, feature_type=FEATURE_TYPE)
if PARAMETER not in parameters:
msg = f"PARAMETER '{PARAMETER}' does not exist in world '{WORLD_ALIAS}' for feature_type '{FEATURE_TYPE}'"
raise AssertionError(f"{msg}. Choose from: {', '.join(sorted(parameters.keys()))}.")
if TIMESERIES_MAINGROUP:
maingroups = get_available_timeseries_maingroups(
world=WORLD_ALIAS, token=WEB_API_TOKEN, feature_type=FEATURE_TYPE, parameter=PARAMETER
)
if TIMESERIES_MAINGROUP not in maingroups:
raise AssertionError(
f"TIMESERIES_MAINGROUP '{TIMESERIES_MAINGROUP}' does not exist in world '{WORLD_ALIAS}' for feature_type "
f"'{FEATURE_TYPE}' and parameter '{PARAMETER}'. Choose from: {', '.join(maingroups)}."
)
if TIMESERIES_SUBGROUP:
subgroups = get_available_timeseries_subgroups(
world=WORLD_ALIAS,
token=WEB_API_TOKEN,
feature_type=FEATURE_TYPE,
parameter=PARAMETER,
maingroup=TIMESERIES_MAINGROUP or None,
)
if TIMESERIES_SUBGROUP not in subgroups:
raise AssertionError(
f"TIMESERIES_SUBGROUP '{TIMESERIES_SUBGROUP}' does not exist in world '{WORLD_ALIAS}' for feature_type "
f"'{FEATURE_TYPE}' and parameter '{PARAMETER}'. Choose from: {', '.join(subgroups)}."
)
# Step 2: Find locations and their timeseries IDs.
print("Step 2: Fetching locations and timeseries IDs.")
features = get_locations_and_timeseries_ids(full_data=False)
# Step 3: Fetch events per location from the Events API and write each to a CSV file.
print("Step 3: Fetching timeseries events.")
fetch_and_save_events(features, output_dir)
# Step 4: Print a preview of the first 5 locations.
print("Step 4: Preview of first 5 locations.")
print_preview(output_dir)
csv_count = len(list(output_dir.glob("nexus_locations_*.csv")))
print(f"Done. {csv_count} CSV file(s) written to: {output_dir}")