zarrio package#

Submodules#

zarrio.api module#

Public API for zarrio.

class zarrio.api.ZarrConverter(config: ZarrConverterConfig | None = None, **kwargs)[source]#

Bases: object

Main class for converting data to Zarr format with retry logic.

__init__(config: ZarrConverterConfig | None = None, **kwargs)[source]#

Initialize the ZarrConverter.

Parameters:
  • config – Pydantic configuration object

  • **kwargs – Backward compatibility parameters

classmethod from_config_file(config_path: str | Path) ZarrConverter[source]#

Create ZarrConverter from configuration file.

Parameters:

config_path – Path to configuration file (YAML or JSON)

Returns:

ZarrConverter instance
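
Example (a minimal sketch of both construction paths; the file name is a hypothetical placeholder):

    from zarrio.api import ZarrConverter, ZarrConverterConfig

    # From a Pydantic config object (all fields have defaults):
    converter = ZarrConverter(config=ZarrConverterConfig())

    # Or from a YAML/JSON configuration file:
    converter = ZarrConverter.from_config_file("converter.yaml")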

property conn: None#

Datamesh connector.

property use_datamesh_zarr_client: bool#

Whether to use the datamesh zarr client.

create_template(template_dataset: Dataset, output_path: str | Path, global_start: Any | None = None, global_end: Any | None = None, freq: str | None = None, compute: bool = False, cycle: Any | None = None, intelligent_chunking: bool = False, access_pattern: str = 'balanced') None[source]#

Create a template Zarr archive for parallel writing.

Parameters:
  • template_dataset – Dataset to use as template for structure and metadata

  • output_path – Path to output Zarr store

  • global_start – Start time for the full archive

  • global_end – End time for the full archive

  • freq – Frequency for time coordinate (inferred from template if not provided)

  • compute – Whether to compute immediately (False for template only)

  • cycle – Cycle information for datamesh

  • intelligent_chunking – Whether to perform intelligent chunking based on full archive dimensions

  • access_pattern – Access pattern for chunking optimization (“temporal”, “spatial”, “balanced”)

write_region(input_path: str | Path, zarr_path: str | Path, region: Dict[str, slice] | None = None, variables: list | None = None, drop_variables: list | None = None, cycle: Any | None = None) None[source]#

Write data to a specific region of an existing Zarr store with retry logic.

Parameters:
  • input_path – Path to input file

  • zarr_path – Path to existing Zarr store

  • region – Dictionary specifying the region to write to

  • variables – List of variables to include (None for all)

  • drop_variables – List of variables to exclude

  • cycle – Cycle information for datamesh
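
Example (a hypothetical parallel-writing workflow combining create_template() and write_region(); file names, dates and the region slice are placeholders):

    import xarray as xr
    from zarrio.api import ZarrConverter

    converter = ZarrConverter()

    # Build a template spanning the full archive time range:
    template = xr.open_dataset("chunk_000.nc")
    converter.create_template(
        template_dataset=template,
        output_path="archive.zarr",
        global_start="2020-01-01",
        global_end="2020-12-31",
        freq="1h",
        intelligent_chunking=True,
        access_pattern="temporal",
    )

    # Each worker then fills its own slice of the template:
    converter.write_region(
        input_path="chunk_000.nc",
        zarr_path="archive.zarr",
        region={"time": slice(0, 24)},
    )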

convert(input_path: str | Path, output_path: str | Path | None = None, variables: list | None = None, drop_variables: list | None = None, attrs: Dict[str, Any] | None = None, cycle: Any | None = None) None[source]#

Convert input data to Zarr format with retry logic.

Parameters:
  • input_path – Path to input file

  • output_path – Path to output Zarr store (optional if using datamesh)

  • variables – List of variables to include (None for all)

  • drop_variables – List of variables to exclude

  • attrs – Additional global attributes to add

  • cycle – Cycle information for datamesh
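
Example (a minimal sketch; the paths, dropped variable and attribute are hypothetical):

    from zarrio.api import ZarrConverter

    converter = ZarrConverter()
    converter.convert(
        input_path="input.nc",
        output_path="output.zarr",
        drop_variables=["qc_flags"],
        attrs={"institution": "example"},
    )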

append(input_path: str | Path, zarr_path: str | Path, variables: list | None = None, drop_variables: list | None = None) None[source]#

Append data to an existing Zarr store with retry logic.

Parameters:
  • input_path – Path to input file

  • zarr_path – Path to existing Zarr store

  • variables – List of variables to include (None for all)

  • drop_variables – List of variables to exclude
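
Example (a minimal sketch appending a follow-on file to the store created above; file names are placeholders):

    converter.append(
        input_path="next_day.nc",
        zarr_path="output.zarr",
    )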

class zarrio.api.Packer(nbits: int = 16)[source]#

Bases: object

Handles data packing using fixed-scale offset encoding.

__init__(nbits: int = 16)[source]#

Initialize the Packer.

Parameters:

nbits – Number of bits for packing (8, 16, 32)

compute_scale_and_offset(vmin: float, vmax: float) tuple[source]#

Compute scale and offset for fixed-scale offset encoding.

Parameters:
  • vmin – Minimum value

  • vmax – Maximum value

Returns:

Tuple of (scale_factor, offset)
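
Example (a minimal sketch mapping the value range [0, 100] onto 16-bit integers):

    from zarrio.api import Packer

    packer = Packer(nbits=16)
    scale_factor, offset = packer.compute_scale_and_offset(vmin=0.0, vmax=100.0)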

setup_encoding(ds: Dataset, variables: List[str] | None = None, manual_ranges: Dict[str, Dict[str, float]] | None = None, auto_buffer_factor: float = 0.01, check_range_exceeded: bool = True, range_exceeded_action: str = 'warn') Dict[str, Any][source]#

Setup encoding for dataset variables with enhanced packing options.

Priority order for determining min/max values:
  1. Manual ranges (if provided)
  2. Variable attributes (valid_min/valid_max)
  3. Automatic calculation from data

Parameters:
  • ds – Dataset to setup encoding for

  • variables – List of variables to pack (None for all numeric variables)

  • manual_ranges – Dictionary specifying manual min/max values e.g., {“temperature”: {“min”: 0, “max”: 100}}

  • auto_buffer_factor – Buffer factor for automatically calculated ranges

  • check_range_exceeded – Whether to check if data exceeds specified ranges

  • range_exceeded_action – Action when data exceeds range (“warn”, “error”, “ignore”)

Returns:

Dictionary of encoding specifications
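
Example (a minimal sketch with a toy temperature dataset; the manual range and output path are hypothetical, and passing the result to to_zarr() is an assumption about typical usage):

    import numpy as np
    import xarray as xr
    from zarrio.api import Packer

    ds = xr.Dataset({"temperature": ("time", np.random.uniform(0.0, 40.0, 100))})

    packer = Packer(nbits=16)
    encoding = packer.setup_encoding(
        ds,
        manual_ranges={"temperature": {"min": 0, "max": 100}},
        range_exceeded_action="warn",
    )
    ds.to_zarr("packed.zarr", encoding=encoding)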

add_valid_range_attributes(ds: Dataset, buffer_factor: float = 0.01, variables: List[str] | None = None) Dataset[source]#

Add valid_min and valid_max attributes to variables based on their data range.

Parameters:
  • ds – Dataset to add attributes to

  • buffer_factor – Factor to extend range by (e.g., 0.01 = 1% buffer)

  • variables – List of variables to process (None for all numeric variables)

Returns:

Dataset with added attributes

class zarrio.api.TimeManager(time_dim: str = 'time')[source]#

Bases: object

Handles time series operations.

__init__(time_dim: str = 'time')[source]#

Initialize the TimeManager.

Parameters:

time_dim – Name of the time dimension

remove_duplicates(ds: Dataset) Dataset[source]#

Remove duplicate time values from dataset.

Parameters:

ds – Dataset to remove duplicates from

Returns:

Dataset with duplicates removed

get_time_bounds(ds: Dataset) Tuple[datetime64, datetime64][source]#

Get the start and end times from a dataset.

Parameters:

ds – Dataset to get time bounds from

Returns:

Tuple of (start_time, end_time)
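
Example (a minimal sketch with a toy dataset containing a duplicated time stamp):

    import pandas as pd
    import xarray as xr
    from zarrio.api import TimeManager

    times = pd.to_datetime(["2020-01-01", "2020-01-01", "2020-01-02"])
    ds = xr.Dataset({"x": ("time", [1.0, 1.0, 2.0])}, coords={"time": times})

    tm = TimeManager(time_dim="time")
    ds = tm.remove_duplicates(ds)        # drops the repeated 2020-01-01 entry
    start, end = tm.get_time_bounds(ds)  # (numpy.datetime64, numpy.datetime64)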

align_for_append(existing_ds: Dataset, new_ds: Dataset) Dataset[source]#

Align a new dataset with an existing dataset for appending.

Parameters:
  • existing_ds – Existing dataset

  • new_ds – New dataset to append

Returns:

Aligned new dataset

interpolate_irregular_times(ds: Dataset) Dataset[source]#

Interpolate irregular time steps to regular intervals.

Parameters:

ds – Dataset with potentially irregular time steps

Returns:

Dataset with regular time steps

add_time_attributes(ds: Dataset) Dataset[source]#

Add time-related attributes to dataset.

Parameters:

ds – Dataset to add attributes to

Returns:

Dataset with added attributes

class zarrio.api.Config(config_dict: Dict[str, Any] | None = None)[source]#

Bases: object

Configuration management class.

__init__(config_dict: Dict[str, Any] | None = None)[source]#

Initialize configuration.

Parameters:

config_dict – Dictionary of configuration values

classmethod from_file(file_path: str) Config[source]#

Load configuration from file.

Parameters:

file_path – Path to configuration file

Returns:

Config instance

get(key: str, default: Any | None = None) Any[source]#

Get configuration value.

Parameters:
  • key – Configuration key

  • default – Default value if key not found

Returns:

Configuration value

set(key: str, value: Any) None[source]#

Set configuration value.

Parameters:
  • key – Configuration key

  • value – Configuration value

to_dict() Dict[str, Any][source]#

Convert configuration to dictionary.

Returns:

Configuration dictionary

__getitem__(key: str) Any[source]#

Get configuration value using bracket notation.

__setitem__(key: str, value: Any) None[source]#

Set configuration value using bracket notation.

__contains__(key: str) bool[source]#

Check if key exists in configuration.
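
Example (a minimal sketch; keys and values are hypothetical):

    from zarrio.api import Config

    cfg = Config({"chunking": {"time": 100}})
    cfg.set("time_dim", "time")
    cfg["compression"] = "default"            # bracket notation via __setitem__
    assert "chunking" in cfg                   # __contains__
    value = cfg.get("missing_key", default=0)  # falls back to the default
    as_dict = cfg.to_dict()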

class zarrio.api.ZarrConverterConfig(*, chunking: ~zarrio.models.ChunkingConfig = <factory>, compression: ~zarrio.models.CompressionConfig | None = None, packing: ~zarrio.models.PackingConfig = <factory>, time: ~zarrio.models.TimeConfig = <factory>, variables: ~zarrio.models.VariableConfig = <factory>, missing_data: ~zarrio.models.MissingDataConfig = <factory>, datamesh: ~zarrio.models.DatameshConfig | None = None, attrs: ~typing.Dict[str, ~typing.Any] = <factory>, target_chunk_size_mb: int | None = None, access_pattern: str = 'balanced', retries_on_missing: ~typing.Annotated[int, ~annotated_types.Ge(ge=0)] = 0, missing_check_vars: str | ~typing.List[str] | None = 'all')[source]#

Bases: BaseModel

Main configuration for ZarrConverter.

chunking: ChunkingConfig#
compression: CompressionConfig | None#
packing: PackingConfig#
time: TimeConfig#
variables: VariableConfig#
missing_data: MissingDataConfig#
datamesh: DatameshConfig | None#
attrs: Dict[str, Any]#
target_chunk_size_mb: int | None#
access_pattern: str#
retries_on_missing: int#
missing_check_vars: str | List[str] | None#
classmethod validate_access_pattern(v: str) str[source]#
classmethod validate_retries_on_missing(v: int) int[source]#
classmethod validate_missing_check_vars(v: str | List[str] | None) str | List[str] | None[source]#
classmethod validate_datamesh(v: DatameshConfig | None) DatameshConfig | None[source]#
classmethod from_yaml_file(config_path: str | Path) ZarrConverterConfig[source]#

Load configuration from YAML file.

classmethod from_json_file(config_path: str | Path) ZarrConverterConfig[source]#

Load configuration from JSON file.

to_yaml_file(config_path: str | Path) None[source]#

Save configuration to YAML file.

to_json_file(config_path: str | Path) None[source]#

Save configuration to JSON file.
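
Example (a minimal sketch of a configuration round-trip; field values and the file name are hypothetical):

    from zarrio.api import PackingConfig, ZarrConverterConfig

    config = ZarrConverterConfig(
        packing=PackingConfig(enabled=True, bits=16),
        access_pattern="temporal",
        retries_on_missing=2,
    )
    config.to_yaml_file("converter.yaml")
    config = ZarrConverterConfig.from_yaml_file("converter.yaml")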

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.api.ChunkingConfig(*, time: int | None = None, lat: int | None = None, lon: int | None = None, depth: int | None = None, **extra_data: Any)[source]#

Bases: BaseModel

Configuration for data chunking.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

time: int | None#
lat: int | None#
lon: int | None#
depth: int | None#
class zarrio.api.PackingConfig(*, enabled: bool = False, bits: Annotated[int, Ge(ge=8), Le(le=32)] = 16, manual_ranges: Dict[str, Dict[str, float]] | None = None, auto_buffer_factor: Annotated[float, Ge(ge=0)] = 0.01, check_range_exceeded: bool = True, range_exceeded_action: str = 'warn')[source]#

Bases: BaseModel

Configuration for data packing.

enabled: bool#
bits: int#
manual_ranges: Dict[str, Dict[str, float]] | None#
auto_buffer_factor: float#
check_range_exceeded: bool#
range_exceeded_action: str#
classmethod validate_bits(v: int) int[source]#
classmethod validate_range_exceeded_action(v: str) str[source]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.api.CompressionConfig(*, method: str | None = None, cname: str = 'zstd', clevel: Annotated[int, Ge(ge=0), Le(le=9)] = 1, shuffle: str = 'shuffle')[source]#

Bases: BaseModel

Configuration for data compression.

method: str | None#
cname: str#
clevel: int#
shuffle: str#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.api.TimeConfig(*, dim: str = 'time', append_dim: str = 'time', global_start: str | datetime | None = None, global_end: str | datetime | None = None, freq: str | None = None)[source]#

Bases: BaseModel

Configuration for time handling.

dim: str#
append_dim: str#
global_start: str | datetime | None#
global_end: str | datetime | None#
freq: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.api.VariableConfig(*, include: List[str] | None = None, exclude: List[str] | None = None)[source]#

Bases: BaseModel

Configuration for variable handling.

include: List[str] | None#
exclude: List[str] | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.api.MissingDataConfig(*, check_vars: str | List[str] | None = 'all', retries_on_missing: Annotated[int, Ge(ge=0)] = 0, missing_check_vars: str | List[str] | None = 'all')[source]#

Bases: BaseModel

Configuration for missing data handling.

check_vars: str | List[str] | None#
retries_on_missing: int#
missing_check_vars: str | List[str] | None#
classmethod validate_check_vars(v: str | List[str] | None) str | List[str] | None[source]#
classmethod validate_missing_check_vars(v: str | List[str] | None) str | List[str] | None[source]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

zarrio.api.convert_to_zarr(input_path: str | Path, output_path: str | Path | None = None, chunking: Dict[str, int] | None = None, compression: str | None = None, packing: bool = False, packing_bits: int = 16, packing_manual_ranges: Dict[str, Dict[str, float]] | None = None, packing_auto_buffer_factor: float = 0.01, packing_check_range_exceeded: bool = True, packing_range_exceeded_action: str = 'warn', variables: list | None = None, drop_variables: list | None = None, attrs: Dict[str, Any] | None = None, time_dim: str = 'time', retries_on_missing: int = 0, missing_check_vars: str | List[str] | None = 'all', datamesh_datasource: Dict[str, Any] | None = None, datamesh_token: str | None = None, datamesh_service: str = 'https://datamesh-v1.oceanum.io') None[source]#

Convert data to Zarr format using default settings with retry logic.

Parameters:
  • input_path – Path to input file

  • output_path – Path to output Zarr store (optional if using datamesh)

  • chunking – Dictionary specifying chunk sizes for dimensions

  • compression – Compression specification

  • packing – Whether to enable data packing

  • packing_bits – Number of bits for packing

  • packing_manual_ranges – Manual min/max ranges for variables

  • packing_auto_buffer_factor – Buffer factor for automatically calculated ranges

  • packing_check_range_exceeded – Whether to check if data exceeds specified ranges

  • packing_range_exceeded_action – Action when data exceeds range (“warn”, “error”, “ignore”)

  • variables – List of variables to include

  • drop_variables – List of variables to exclude

  • attrs – Additional global attributes

  • time_dim – Name of the time dimension

  • retries_on_missing – Number of retries if missing values are encountered

  • missing_check_vars – Data variables to check for missing values

  • datamesh_datasource – Datamesh datasource configuration

  • datamesh_token – Datamesh token for authentication

  • datamesh_service – Datamesh service URL
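
Example (a minimal sketch; paths, chunk sizes and the packing range are hypothetical):

    from zarrio.api import convert_to_zarr

    convert_to_zarr(
        "input.nc",
        "output.zarr",
        chunking={"time": 100, "lat": 50, "lon": 50},
        packing=True,
        packing_bits=16,
        packing_manual_ranges={"temperature": {"min": 0, "max": 100}},
        retries_on_missing=2,
    )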

zarrio.api.append_to_zarr(input_path: str | Path, zarr_path: str | Path, chunking: Dict[str, int] | None = None, variables: list | None = None, drop_variables: list | None = None, append_dim: str = 'time', time_dim: str = 'time', retries_on_missing: int = 0, missing_check_vars: str | List[str] | None = 'all', datamesh_datasource: Dict[str, Any] | None = None, datamesh_token: str | None = None, datamesh_service: str = 'https://datamesh-v1.oceanum.io') None[source]#

Append data to an existing Zarr store with retry logic.

Parameters:
  • input_path – Path to input file

  • zarr_path – Path to existing Zarr store

  • chunking – Dictionary specifying chunk sizes for dimensions

  • variables – List of variables to include

  • drop_variables – List of variables to exclude

  • append_dim – Dimension to append along

  • time_dim – Name of the time dimension

  • retries_on_missing – Number of retries if missing values are encountered

  • missing_check_vars – Data variables to check for missing values

  • datamesh_datasource – Datamesh datasource configuration

  • datamesh_token – Datamesh token for authentication

  • datamesh_service – Datamesh service URL
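
Example (a minimal sketch; paths are hypothetical):

    from zarrio.api import append_to_zarr

    append_to_zarr(
        "next_cycle.nc",
        "output.zarr",
        append_dim="time",
        retries_on_missing=2,
    )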

exception zarrio.api.OnzarrError[source]#

Bases: Exception

Base exception for zarrio.

exception zarrio.api.ConversionError[source]#

Bases: OnzarrError

Raised when data conversion fails.

exception zarrio.api.PackingError[source]#

Bases: OnzarrError

Raised when data packing fails.

exception zarrio.api.TimeAlignmentError[source]#

Bases: OnzarrError

Raised when time alignment fails.

exception zarrio.api.ConfigurationError[source]#

Bases: OnzarrError

Raised when configuration is invalid.

zarrio.chunking module#

Chunking analysis and validation for zarrio.

This module provides intelligent chunking recommendations based on:
  • Dataset dimensions
  • Data type size
  • Access patterns (temporal, spatial, balanced)
  • Configurable target chunk sizes for different environments

The intelligent chunking system automatically recommends optimal chunk sizes to achieve the target chunk size (default 50 MB) while considering the dataset dimensions and access patterns.

Target chunk size can be configured in multiple ways:
  1. As a function argument: get_chunk_recommendation(…, target_chunk_size_mb=100)
  2. As an environment variable: ZARRIFY_TARGET_CHUNK_SIZE_MB=200
  3. In ZarrConverterConfig: ZarrConverterConfig(target_chunk_size_mb=50)

Environment-specific recommendations:
  • Local development: 10-25 MB
  • Production servers: 50-100 MB
  • Cloud environments: 100-200 MB
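
Example (a minimal sketch of the configuration options above; dimension sizes are hypothetical):

    import os
    from zarrio.chunking import get_chunk_recommendation

    # Environment variable, used when no explicit target is given:
    os.environ["ZARRIFY_TARGET_CHUNK_SIZE_MB"] = "200"

    # Function argument, which takes precedence for this call:
    rec = get_chunk_recommendation(
        dimensions={"time": 8760, "lat": 720, "lon": 1440},
        access_pattern="temporal",
        target_chunk_size_mb=100,
    )
    print(rec.chunks, rec.estimated_chunk_size_mb, rec.warnings)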

class zarrio.chunking.ChunkRecommendation(chunks: Dict[str, int], strategy: str, estimated_chunk_size_mb: float, warnings: list, notes: list)[source]#

Bases: object

Recommendation for chunking strategy.

chunks: Dict[str, int]#
strategy: str#
estimated_chunk_size_mb: float#
warnings: list#
notes: list#
__init__(chunks: Dict[str, int], strategy: str, estimated_chunk_size_mb: float, warnings: list, notes: list) None#
class zarrio.chunking.ChunkAnalyzer(target_chunk_size_mb: int | None = None)[source]#

Bases: object

Analyzes and recommends chunking strategies for climate data.

DEFAULT_MIN_CHUNK_SIZE_MB = 1#
DEFAULT_TARGET_CHUNK_SIZE_MB = 50#
DEFAULT_MAX_CHUNK_SIZE_MB = 100#
SMALL_CHUNK_WARNING_MB = 1#
LARGE_CHUNK_WARNING_MB = 100#
__init__(target_chunk_size_mb: int | None = None)[source]#

Initialize chunk analyzer.

Parameters:

target_chunk_size_mb – Target chunk size in MB. If None, uses DEFAULT_TARGET_CHUNK_SIZE_MB.

analyze_chunking(dimensions: Dict[str, int], dtype_size_bytes: int = 4, access_pattern: str = 'balanced') ChunkRecommendation[source]#

Analyze dimensions and recommend chunking strategy.

Parameters:
  • dimensions – Dictionary of dimension names and sizes

  • dtype_size_bytes – Size of data type in bytes (default: 4 for float32)

  • access_pattern – Expected access pattern (“temporal”, “spatial”, “balanced”)

Returns:

ChunkRecommendation with recommended strategy

validate_user_chunking(user_chunks: Dict[str, int], dimensions: Dict[str, int], dtype_size_bytes: int = 4) Dict[str, Any][source]#

Validate user-provided chunking and provide feedback.

Parameters:
  • user_chunks – User-provided chunk sizes

  • dimensions – Dataset dimensions

  • dtype_size_bytes – Size of data type in bytes

Returns:

Dictionary with validation results

zarrio.chunking.get_chunk_recommendation(dimensions: Dict[str, int], dtype_size_bytes: int = 4, access_pattern: str = 'balanced', target_chunk_size_mb: int | None = None) ChunkRecommendation[source]#

Get chunking recommendation for given dimensions.

Parameters:
  • dimensions – Dictionary of dimension names and sizes

  • dtype_size_bytes – Size of data type in bytes (default: 4 for float32)

  • access_pattern – Expected access pattern (“temporal”, “spatial”, “balanced”)

  • target_chunk_size_mb – Target chunk size in MB. If None, uses default or environment variable.

Returns:

ChunkRecommendation with recommended strategy

zarrio.chunking.validate_chunking(user_chunks: Dict[str, int], dimensions: Dict[str, int], dtype_size_bytes: int = 4, target_chunk_size_mb: int | None = None) Dict[str, Any][source]#

Validate user-provided chunking.

Parameters:
  • user_chunks – User-provided chunk sizes

  • dimensions – Dataset dimensions

  • dtype_size_bytes – Size of data type in bytes

  • target_chunk_size_mb – Target chunk size in MB. If None, uses default or environment variable.

Returns:

Dictionary with validation results
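
Example (a minimal sketch; chunk and dimension sizes are hypothetical):

    from zarrio.chunking import validate_chunking

    result = validate_chunking(
        user_chunks={"time": 24, "lat": 180, "lon": 360},
        dimensions={"time": 8760, "lat": 720, "lon": 1440},
        dtype_size_bytes=4,
    )
    print(result)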

zarrio.cli module#

Command-line interface for zarrio with Pydantic configuration support.

zarrio.cli.setup_logging(verbosity: int = 0) None[source]#

Setup logging based on verbosity level.

zarrio.cli.parse_chunking(chunking_str: str) Dict[str, int][source]#

Parse chunking string to dictionary.

zarrio.cli.convert_command(args: Namespace) None[source]#

Handle convert command.

zarrio.cli.append_command(args: Namespace) None[source]#

Handle append command.

zarrio.cli.create_template_command(args: Namespace) None[source]#

Handle create-template command.

zarrio.cli.write_region_command(args: Namespace) None[source]#

Handle write-region command.

zarrio.cli.analyze_command(args: Namespace) None[source]#

Handle analyze command.

zarrio.cli.main() None[source]#

Main entry point for CLI.

zarrio.config module#

Configuration management for zarrio.

class zarrio.config.Config(config_dict: Dict[str, Any] | None = None)[source]#

Bases: object

Configuration management class.

__init__(config_dict: Dict[str, Any] | None = None)[source]#

Initialize configuration.

Parameters:

config_dict – Dictionary of configuration values

classmethod from_file(file_path: str) Config[source]#

Load configuration from file.

Parameters:

file_path – Path to configuration file

Returns:

Config instance

get(key: str, default: Any | None = None) Any[source]#

Get configuration value.

Parameters:
  • key – Configuration key

  • default – Default value if key not found

Returns:

Configuration value

set(key: str, value: Any) None[source]#

Set configuration value.

Parameters:
  • key – Configuration key

  • value – Configuration value

to_dict() Dict[str, Any][source]#

Convert configuration to dictionary.

Returns:

Configuration dictionary

__getitem__(key: str) Any[source]#

Get configuration value using bracket notation.

__setitem__(key: str, value: Any) None[source]#

Set configuration value using bracket notation.

__contains__(key: str) bool[source]#

Check if key exists in configuration.

zarrio.core module#

Enhanced core functionality for zarrio with retry logic for missing data and datamesh support.

class zarrio.core.ZarrConverter(config: ZarrConverterConfig | None = None, **kwargs)[source]#

Bases: object

Main class for converting data to Zarr format with retry logic.

__init__(config: ZarrConverterConfig | None = None, **kwargs)[source]#

Initialize the ZarrConverter.

Parameters:
  • config – Pydantic configuration object

  • **kwargs – Backward compatibility parameters

classmethod from_config_file(config_path: str | Path) ZarrConverter[source]#

Create ZarrConverter from configuration file.

Parameters:

config_path – Path to configuration file (YAML or JSON)

Returns:

ZarrConverter instance

property conn: None#

Datamesh connector.

property use_datamesh_zarr_client: bool#

Whether to use the datamesh zarr client.

create_template(template_dataset: Dataset, output_path: str | Path, global_start: Any | None = None, global_end: Any | None = None, freq: str | None = None, compute: bool = False, cycle: Any | None = None, intelligent_chunking: bool = False, access_pattern: str = 'balanced') None[source]#

Create a template Zarr archive for parallel writing.

Parameters:
  • template_dataset – Dataset to use as template for structure and metadata

  • output_path – Path to output Zarr store

  • global_start – Start time for the full archive

  • global_end – End time for the full archive

  • freq – Frequency for time coordinate (inferred from template if not provided)

  • compute – Whether to compute immediately (False for template only)

  • cycle – Cycle information for datamesh

  • intelligent_chunking – Whether to perform intelligent chunking based on full archive dimensions

  • access_pattern – Access pattern for chunking optimization (“temporal”, “spatial”, “balanced”)

write_region(input_path: str | Path, zarr_path: str | Path, region: Dict[str, slice] | None = None, variables: list | None = None, drop_variables: list | None = None, cycle: Any | None = None) None[source]#

Write data to a specific region of an existing Zarr store with retry logic.

Parameters:
  • input_path – Path to input file

  • zarr_path – Path to existing Zarr store

  • region – Dictionary specifying the region to write to

  • variables – List of variables to include (None for all)

  • drop_variables – List of variables to exclude

  • cycle – Cycle information for datamesh

convert(input_path: str | Path, output_path: str | Path | None = None, variables: list | None = None, drop_variables: list | None = None, attrs: Dict[str, Any] | None = None, cycle: Any | None = None) None[source]#

Convert input data to Zarr format with retry logic.

Parameters:
  • input_path – Path to input file

  • output_path – Path to output Zarr store (optional if using datamesh)

  • variables – List of variables to include (None for all)

  • drop_variables – List of variables to exclude

  • attrs – Additional global attributes to add

  • cycle – Cycle information for datamesh

append(input_path: str | Path, zarr_path: str | Path, variables: list | None = None, drop_variables: list | None = None) None[source]#

Append data to an existing Zarr store with retry logic.

Parameters:
  • input_path – Path to input file

  • zarr_path – Path to existing Zarr store

  • variables – List of variables to include (None for all)

  • drop_variables – List of variables to exclude

zarrio.core.convert_to_zarr(input_path: str | Path, output_path: str | Path | None = None, chunking: Dict[str, int] | None = None, compression: str | None = None, packing: bool = False, packing_bits: int = 16, packing_manual_ranges: Dict[str, Dict[str, float]] | None = None, packing_auto_buffer_factor: float = 0.01, packing_check_range_exceeded: bool = True, packing_range_exceeded_action: str = 'warn', variables: list | None = None, drop_variables: list | None = None, attrs: Dict[str, Any] | None = None, time_dim: str = 'time', retries_on_missing: int = 0, missing_check_vars: str | List[str] | None = 'all', datamesh_datasource: Dict[str, Any] | None = None, datamesh_token: str | None = None, datamesh_service: str = 'https://datamesh-v1.oceanum.io') None[source]#

Convert data to Zarr format using default settings with retry logic.

Parameters:
  • input_path – Path to input file

  • output_path – Path to output Zarr store (optional if using datamesh)

  • chunking – Dictionary specifying chunk sizes for dimensions

  • compression – Compression specification

  • packing – Whether to enable data packing

  • packing_bits – Number of bits for packing

  • packing_manual_ranges – Manual min/max ranges for variables

  • packing_auto_buffer_factor – Buffer factor for automatically calculated ranges

  • packing_check_range_exceeded – Whether to check if data exceeds specified ranges

  • packing_range_exceeded_action – Action when data exceeds range (“warn”, “error”, “ignore”)

  • variables – List of variables to include

  • drop_variables – List of variables to exclude

  • attrs – Additional global attributes

  • time_dim – Name of the time dimension

  • retries_on_missing – Number of retries if missing values are encountered

  • missing_check_vars – Data variables to check for missing values

  • datamesh_datasource – Datamesh datasource configuration

  • datamesh_token – Datamesh token for authentication

  • datamesh_service – Datamesh service URL

zarrio.core.append_to_zarr(input_path: str | Path, zarr_path: str | Path, chunking: Dict[str, int] | None = None, variables: list | None = None, drop_variables: list | None = None, append_dim: str = 'time', time_dim: str = 'time', retries_on_missing: int = 0, missing_check_vars: str | List[str] | None = 'all', datamesh_datasource: Dict[str, Any] | None = None, datamesh_token: str | None = None, datamesh_service: str = 'https://datamesh-v1.oceanum.io') None[source]#

Append data to an existing Zarr store with retry logic.

Parameters:
  • input_path – Path to input file

  • zarr_path – Path to existing Zarr store

  • chunking – Dictionary specifying chunk sizes for dimensions

  • variables – List of variables to include

  • drop_variables – List of variables to exclude

  • append_dim – Dimension to append along

  • time_dim – Name of the time dimension

  • retries_on_missing – Number of retries if missing values are encountered

  • missing_check_vars – Data variables to check for missing values

  • datamesh_datasource – Datamesh datasource configuration

  • datamesh_token – Datamesh token for authentication

  • datamesh_service – Datamesh service URL

zarrio.exceptions module#

Custom exceptions for zarrio.

exception zarrio.exceptions.OnzarrError[source]#

Bases: Exception

Base exception for zarrio.

exception zarrio.exceptions.ConversionError[source]#

Bases: OnzarrError

Raised when data conversion fails.

exception zarrio.exceptions.PackingError[source]#

Bases: OnzarrError

Raised when data packing fails.

exception zarrio.exceptions.TimeAlignmentError[source]#

Bases: OnzarrError

Raised when time alignment fails.

exception zarrio.exceptions.ConfigurationError[source]#

Bases: OnzarrError

Raised when configuration is invalid.

exception zarrio.exceptions.RetryLimitExceededError[source]#

Bases: OnzarrError

Raised when retry limit is exceeded.

zarrio.missing module#

Missing data detection and retry logic for zarrio.

class zarrio.missing.MissingDataHandler(missing_check_vars: str | List[str] | None = 'all', retries_on_missing: int = 0, time_dim: str = 'time')[source]#

Bases: object

Handles missing data detection and retry logic.

__init__(missing_check_vars: str | List[str] | None = 'all', retries_on_missing: int = 0, time_dim: str = 'time')[source]#

Initialize the MissingDataHandler.

Parameters:
  • missing_check_vars – Variables to check for missing values (“all”, None, or list)

  • retries_on_missing – Number of retries if missing values are encountered

  • time_dim – Name of the time dimension

has_missing(zarr_path: str | Any, input_dataset: Dataset, region: Dict[str, slice] | None = None) bool[source]#

Check the data that was just written for missing values.

Parameters:
  • zarr_path – Path to Zarr store

  • input_dataset – Input dataset that was written

  • region – Region that was written to

Returns:

True if missing data is detected, False otherwise

handle_missing_data(zarr_path: str | Any, input_dataset: Dataset, region: Dict[str, slice] | None = None, write_func: Callable | None = None, **kwargs) bool[source]#

Handle missing data with retry logic.

Parameters:
  • zarr_path – Path to Zarr store

  • input_dataset – Input dataset that was written

  • region – Region that was written to

  • write_func – Function to call for retry

  • **kwargs – Additional arguments for write_func

Returns:

True if successful, False if retry limit exceeded

reset_retry_count() None[source]#

Reset the retry counter.

zarrio.missing.check_missing_data(zarr_path: str | Any, input_dataset: Dataset, missing_check_vars: str | List[str] | None = 'all', region: Dict[str, slice] | None = None, time_dim: str = 'time') bool[source]#

Check for missing data in a Zarr store.

Parameters:
  • zarr_path – Path to Zarr store

  • input_dataset – Input dataset that was written

  • missing_check_vars – Variables to check for missing values

  • region – Region that was written to

  • time_dim – Name of the time dimension

Returns:

True if missing data is detected, False otherwise

zarrio.missing.handle_missing_with_retry(zarr_path: str | Any, input_dataset: Dataset, write_func: Callable, missing_check_vars: str | List[str] | None = 'all', retries_on_missing: int = 0, region: Dict[str, slice] | None = None, time_dim: str = 'time', **kwargs) bool[source]#

Handle missing data with retry logic.

Parameters:
  • zarr_path – Path to Zarr store

  • input_dataset – Input dataset that was written

  • write_func – Function to call for retry

  • missing_check_vars – Variables to check for missing values

  • retries_on_missing – Number of retries if missing values are encountered

  • region – Region that was written to

  • time_dim – Name of the time dimension

  • **kwargs – Additional arguments for write_func

Returns:

True if successful, False if retry limit exceeded
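
Example (a minimal sketch; the store path, dataset and retry callback are hypothetical, and the callback signature is an assumption based on the **kwargs pass-through):

    import xarray as xr
    from zarrio.missing import handle_missing_with_retry

    ds = xr.open_dataset("chunk_000.nc")  # dataset that was just written

    def rewrite(**kwargs):
        # Hypothetical callback that re-fetches and rewrites the region.
        ...

    ok = handle_missing_with_retry(
        zarr_path="archive.zarr",
        input_dataset=ds,
        write_func=rewrite,
        retries_on_missing=3,
        region={"time": slice(0, 24)},
    )
    if not ok:
        raise RuntimeError("retry limit exceeded with missing data present")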

zarrio.models module#

Pydantic models for zarrio configuration and data validation.

class zarrio.models.ChunkingConfig(*, time: int | None = None, lat: int | None = None, lon: int | None = None, depth: int | None = None, **extra_data: Any)[source]#

Bases: BaseModel

Configuration for data chunking.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

time: int | None#
lat: int | None#
lon: int | None#
depth: int | None#
class zarrio.models.PackingConfig(*, enabled: bool = False, bits: Annotated[int, Ge(ge=8), Le(le=32)] = 16, manual_ranges: Dict[str, Dict[str, float]] | None = None, auto_buffer_factor: Annotated[float, Ge(ge=0)] = 0.01, check_range_exceeded: bool = True, range_exceeded_action: str = 'warn')[source]#

Bases: BaseModel

Configuration for data packing.

enabled: bool#
bits: int#
manual_ranges: Dict[str, Dict[str, float]] | None#
auto_buffer_factor: float#
check_range_exceeded: bool#
range_exceeded_action: str#
classmethod validate_bits(v: int) int[source]#
classmethod validate_range_exceeded_action(v: str) str[source]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.models.CompressionConfig(*, method: str | None = None, cname: str = 'zstd', clevel: Annotated[int, Ge(ge=0), Le(le=9)] = 1, shuffle: str = 'shuffle')[source]#

Bases: BaseModel

Configuration for data compression.

method: str | None#
cname: str#
clevel: int#
shuffle: str#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.models.TimeConfig(*, dim: str = 'time', append_dim: str = 'time', global_start: str | datetime | None = None, global_end: str | datetime | None = None, freq: str | None = None)[source]#

Bases: BaseModel

Configuration for time handling.

dim: str#
append_dim: str#
global_start: str | datetime | None#
global_end: str | datetime | None#
freq: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.models.VariableConfig(*, include: List[str] | None = None, exclude: List[str] | None = None)[source]#

Bases: BaseModel

Configuration for variable handling.

include: List[str] | None#
exclude: List[str] | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.models.MissingDataConfig(*, check_vars: str | List[str] | None = 'all', retries_on_missing: Annotated[int, Ge(ge=0)] = 0, missing_check_vars: str | List[str] | None = 'all')[source]#

Bases: BaseModel

Configuration for missing data handling.

check_vars: str | List[str] | None#
retries_on_missing: int#
missing_check_vars: str | List[str] | None#
classmethod validate_check_vars(v: str | List[str] | None) str | List[str] | None[source]#
classmethod validate_missing_check_vars(v: str | List[str] | None) str | List[str] | None[source]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.models.DatameshDatasource(*, id: str, name: str | None = None, description: str | None = None, coordinates: Dict[str, str] | None = None, details: str | None = None, tags: List[str] | None = None, driver: str = 'vzarr', dataschema: Dict[str, Any] | None = None, geometry: Dict[str, Any] | None = None, tstart: str | datetime | None = None, tend: str | datetime | None = None, **extra_data: Any)[source]#

Bases: BaseModel

Configuration for datamesh datasource.

Note:
  • When writing using the zarr client, the driver should remain “vzarr”.
  • When writing using the xarray API, the driver should be “zarr”.
  • All the Datasource fields that are set will be updated in datamesh, even if None.
  • The schema, geometry and the time range will be updated from the dataset if not set. In order to avoid this, set them explicitly.

id: str#
name: str | None#
description: str | None#
coordinates: Dict[str, str] | None#
details: str | None#
tags: List[str] | None#
driver: str#
dataschema: Dict[str, Any] | None#
geometry: Dict[str, Any] | None#
tstart: str | datetime | None#
tend: str | datetime | None#
model_config: ClassVar[ConfigDict] = {'extra': 'allow'}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class zarrio.models.DatameshConfig(*, datasource: DatameshDatasource | Dict[str, Any] | None = None, token: str | None = None, service: str = 'https://datamesh-v1.oceanum.io', use_zarr_client: bool = True)[source]#

Bases: BaseModel

Configuration for datamesh integration.

datasource: DatameshDatasource | Dict[str, Any] | None#
token: str | None#
service: str#
use_zarr_client: bool#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
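
Example (a minimal sketch wiring a datasource into the datamesh configuration; the id, name, tags and token are hypothetical placeholders):

    from zarrio.models import DatameshConfig, DatameshDatasource

    datasource = DatameshDatasource(
        id="my-hindcast",
        name="Example hindcast",
        driver="vzarr",  # keep "vzarr" when writing via the zarr client
        tags=["wave", "hindcast"],
    )
    datamesh = DatameshConfig(
        datasource=datasource,
        token="...",
        use_zarr_client=True,
    )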

class zarrio.models.ZarrConverterConfig(*, chunking: ~zarrio.models.ChunkingConfig = <factory>, compression: ~zarrio.models.CompressionConfig | None = None, packing: ~zarrio.models.PackingConfig = <factory>, time: ~zarrio.models.TimeConfig = <factory>, variables: ~zarrio.models.VariableConfig = <factory>, missing_data: ~zarrio.models.MissingDataConfig = <factory>, datamesh: ~zarrio.models.DatameshConfig | None = None, attrs: ~typing.Dict[str, ~typing.Any] = <factory>, target_chunk_size_mb: int | None = None, access_pattern: str = 'balanced', retries_on_missing: ~typing.Annotated[int, ~annotated_types.Ge(ge=0)] = 0, missing_check_vars: str | ~typing.List[str] | None = 'all')[source]#

Bases: BaseModel

Main configuration for ZarrConverter.

chunking: ChunkingConfig#
compression: CompressionConfig | None#
packing: PackingConfig#
time: TimeConfig#
variables: VariableConfig#
missing_data: MissingDataConfig#
datamesh: DatameshConfig | None#
attrs: Dict[str, Any]#
target_chunk_size_mb: int | None#
access_pattern: str#
retries_on_missing: int#
missing_check_vars: str | List[str] | None#
classmethod validate_access_pattern(v: str) str[source]#
classmethod validate_retries_on_missing(v: int) int[source]#
classmethod validate_missing_check_vars(v: str | List[str] | None) str | List[str] | None[source]#
classmethod validate_datamesh(v: DatameshConfig | None) DatameshConfig | None[source]#
classmethod from_yaml_file(config_path: str | Path) ZarrConverterConfig[source]#

Load configuration from YAML file.

classmethod from_json_file(config_path: str | Path) ZarrConverterConfig[source]#

Load configuration from JSON file.

to_yaml_file(config_path: str | Path) None[source]#

Save configuration to YAML file.

to_json_file(config_path: str | Path) None[source]#

Save configuration to JSON file.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

zarrio.models.load_config_from_file(config_path: str | Path) ZarrConverterConfig[source]#

Load configuration from YAML or JSON file.
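
Example (a minimal sketch; the file name is a hypothetical placeholder):

    from zarrio.models import load_config_from_file

    config = load_config_from_file("converter.yaml")  # or a .json file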

zarrio.packing module#

Data packing functionality for zarrio.

class zarrio.packing.Packer(nbits: int = 16)[source]#

Bases: object

Handles data packing using fixed-scale offset encoding.

__init__(nbits: int = 16)[source]#

Initialize the Packer.

Parameters:

nbits – Number of bits for packing (8, 16, 32)

compute_scale_and_offset(vmin: float, vmax: float) tuple[source]#

Compute scale and offset for fixed-scale offset encoding.

Parameters:
  • vmin – Minimum value

  • vmax – Maximum value

Returns:

Tuple of (scale_factor, offset)

setup_encoding(ds: Dataset, variables: List[str] | None = None, manual_ranges: Dict[str, Dict[str, float]] | None = None, auto_buffer_factor: float = 0.01, check_range_exceeded: bool = True, range_exceeded_action: str = 'warn') Dict[str, Any][source]#

Setup encoding for dataset variables with enhanced packing options.

Priority order for determining min/max values:
  1. Manual ranges (if provided)
  2. Variable attributes (valid_min/valid_max)
  3. Automatic calculation from data

Parameters:
  • ds – Dataset to setup encoding for

  • variables – List of variables to pack (None for all numeric variables)

  • manual_ranges – Dictionary specifying manual min/max values e.g., {“temperature”: {“min”: 0, “max”: 100}}

  • auto_buffer_factor – Buffer factor for automatically calculated ranges

  • check_range_exceeded – Whether to check if data exceeds specified ranges

  • range_exceeded_action – Action when data exceeds range (“warn”, “error”, “ignore”)

Returns:

Dictionary of encoding specifications

add_valid_range_attributes(ds: Dataset, buffer_factor: float = 0.01, variables: List[str] | None = None) Dataset[source]#

Add valid_min and valid_max attributes to variables based on their data range.

Parameters:
  • ds – Dataset to add attributes to

  • buffer_factor – Factor to extend range by (e.g., 0.01 = 1% buffer)

  • variables – List of variables to process (None for all numeric variables)

Returns:

Dataset with added attributes

zarrio.time module#

Time series handling for zarrio.

class zarrio.time.TimeManager(time_dim: str = 'time')[source]#

Bases: object

Handles time series operations.

__init__(time_dim: str = 'time')[source]#

Initialize the TimeManager.

Parameters:

time_dim – Name of the time dimension

remove_duplicates(ds: Dataset) Dataset[source]#

Remove duplicate time values from dataset.

Parameters:

ds – Dataset to remove duplicates from

Returns:

Dataset with duplicates removed

get_time_bounds(ds: Dataset) Tuple[datetime64, datetime64][source]#

Get the start and end times from a dataset.

Parameters:

ds – Dataset to get time bounds from

Returns:

Tuple of (start_time, end_time)

align_for_append(existing_ds: Dataset, new_ds: Dataset) Dataset[source]#

Align a new dataset with an existing dataset for appending.

Parameters:
  • existing_ds – Existing dataset

  • new_ds – New dataset to append

Returns:

Aligned new dataset

interpolate_irregular_times(ds: Dataset) Dataset[source]#

Interpolate irregular time steps to regular intervals.

Parameters:

ds – Dataset with potentially irregular time steps

Returns:

Dataset with regular time steps

add_time_attributes(ds: Dataset) Dataset[source]#

Add time-related attributes to dataset.

Parameters:

ds – Dataset to add attributes to

Returns:

Dataset with added attributes

Module contents#

zarrio - A modern library for converting scientific data to Zarr format.