Advanced Workflows
This guide covers advanced patterns and techniques for implementing complex wave modeling workflows with rompy-oceanum. Learn how to automate model execution and build batch processing, ensemble modeling, and operational forecasting systems.
Overview
Advanced workflows in rompy-oceanum enable sophisticated modeling patterns including:
Batch Processing: Execute multiple model configurations in parallel or sequence
Ensemble Modeling: Run multiple model variants with different parameters
Operational Forecasting: Automated model execution for real-time forecasting
Data Pipeline Integration: Seamless integration with data processing workflows
Conditional Execution: Dynamic workflow control based on results and conditions
Batch Processing
Execute multiple wave models efficiently using batch processing patterns.
Basic Batch Execution
Process multiple configurations in parallel:
import asyncio

from rompy_oceanum import BatchProcessor
from rompy_oceanum.backends import PraxConfig

async def run_batch_models():
    # Define multiple model configurations
    configs = [
        {"grid": "north_sea", "forcing": "era5"},
        {"grid": "atlantic", "forcing": "gfs"},
        {"grid": "pacific", "forcing": "ecmwf"},
    ]

    # Configure batch processor
    batch_processor = BatchProcessor(
        backend="prax",
        backend_config=PraxConfig(
            org="your-org",
            project="wave-forecasting",
            stage="prod",
        ),
        max_concurrent=3,
    )

    # Execute batch
    results = await batch_processor.run_batch(configs)

    # Process results
    for config, result in zip(configs, results):
        if result.is_successful():
            print(f"✓ {config['grid']} completed successfully")
        else:
            print(f"✗ {config['grid']} failed: {result.error_message}")

# Run the batch
asyncio.run(run_batch_models())
Sequential Processing
When models depend on each other or resources are limited:
from rompy_oceanum import SequentialProcessor

def run_sequential_workflow():
    # Define workflow stages
    stages = [
        {
            "name": "preprocessing",
            "config": preprocess_config,
            "dependencies": [],
        },
        {
            "name": "coarse_grid",
            "config": coarse_grid_config,
            "dependencies": ["preprocessing"],
        },
        {
            "name": "fine_grid",
            "config": fine_grid_config,
            "dependencies": ["coarse_grid"],
        },
    ]

    processor = SequentialProcessor(backend="prax")
    results = processor.run_workflow(stages)
    return results
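A workflow like this must run in dependency order. If you need to order stages yourself (for example, before handing them to a different scheduler), a standard topological sort does the job. The sketch below uses the standard library and is independent of SequentialProcessor:

from graphlib import TopologicalSorter  # stdlib, Python 3.9+

def order_stages(stages):
    """Return stage names in an order that respects 'dependencies'."""
    graph = {s["name"]: set(s["dependencies"]) for s in stages}
    return list(TopologicalSorter(graph).static_order())

# e.g. order_stages(stages) -> ["preprocessing", "coarse_grid", "fine_grid"]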
Conditional Batch Processing
Execute batches based on conditions:
from datetime import timedelta

from rompy_oceanum import ConditionalBatch

def run_conditional_forecast():
    # Check if new forcing data is available
    forcing_check = check_latest_forcing_data()

    if forcing_check.is_updated():
        # Define forecast ensemble
        ensemble_configs = generate_ensemble_configs(
            base_config=operational_config,
            perturbations=[
                {"wind_speed_factor": 1.1},
                {"wind_speed_factor": 0.9},
                {"wave_height_bias": 0.1},
                {"wave_height_bias": -0.1},
            ],
        )

        # Execute ensemble; the batch re-checks the condition and retries
        # up to four times, six hours apart
        batch = ConditionalBatch(
            condition=lambda: forcing_check.is_updated(),
            retry_interval=timedelta(hours=6),
            max_retries=4,
        )
        results = batch.execute(ensemble_configs)
        return results
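The generate_ensemble_configs helper above is not part of rompy-oceanum; it is whatever builds one config per perturbation. A minimal sketch, assuming configs are plain dicts:

import copy

def generate_ensemble_configs(base_config, perturbations):
    """Create one config per perturbation, applied to a deep copy."""
    configs = []
    for i, perturbation in enumerate(perturbations):
        config = copy.deepcopy(base_config)
        config.setdefault("perturbations", {}).update(perturbation)
        config["ensemble_member"] = f"member_{i:02d}"
        configs.append(config)
    return configs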
Ensemble Modeling
Implement ensemble modeling for uncertainty quantification and improved forecasts.
Parameter Ensemble
Vary model parameters systematically:
import copy
from itertools import product

from rompy_oceanum import EnsembleManager

def create_parameter_ensemble():
    # Define parameter ranges
    wind_factors = [0.8, 0.9, 1.0, 1.1, 1.2]
    friction_coeffs = [0.01, 0.015, 0.02, 0.025, 0.03]

    # Generate all combinations (5 x 5 = 25 members)
    ensemble_configs = []
    for wind_factor, friction in product(wind_factors, friction_coeffs):
        # Deep copy so nested sections are not shared between members
        config = copy.deepcopy(base_config)
        config.update({
            "physics": {
                "wind_drag_coefficient": wind_factor * base_wind_drag,
                "bottom_friction": friction,
            },
            "ensemble_member": f"wind_{wind_factor}_friction_{friction}",
        })
        ensemble_configs.append(config)

    # Execute ensemble
    ensemble = EnsembleManager(
        backend="prax",
        ensemble_name="parameter_sensitivity",
        max_concurrent=10,
    )
    results = ensemble.run(ensemble_configs)
    return ensemble.analyze_results(results)
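What analyze_results returns depends on the EnsembleManager implementation; a common summary is the ensemble mean and spread of a field of interest. A minimal numpy sketch, assuming each result exposes a significant-wave-height array as result.hs (an illustrative attribute name):

import numpy as np

def summarize_ensemble(results):
    """Stack member Hs fields and compute ensemble statistics."""
    hs = np.stack([r.hs for r in results])  # shape: (n_members, ...)
    return {
        "mean": hs.mean(axis=0),
        "spread": hs.std(axis=0),            # std across members
        "p90": np.percentile(hs, 90, axis=0),
    }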
Forcing Ensemble
Use multiple forcing datasets:
import copy

def create_forcing_ensemble():
    forcing_sources = [
        {"name": "era5", "source": "era5_winds.nc"},
        {"name": "gfs", "source": "gfs_winds.nc"},
        {"name": "ecmwf", "source": "ecmwf_winds.nc"},
    ]

    ensemble_configs = []
    for forcing in forcing_sources:
        # Deep copy: a shallow copy would share the nested "forcing" dict,
        # so every member would end up with the last wind file
        config = copy.deepcopy(base_config)
        config["forcing"]["wind_file"] = forcing["source"]
        config["ensemble_member"] = forcing["name"]
        ensemble_configs.append(config)

    return ensemble_configs
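These configs can then be dispatched just like the parameter ensemble above, for example:

ensemble = EnsembleManager(
    backend="prax",
    ensemble_name="forcing_ensemble",
    max_concurrent=3,
)
results = ensemble.run(create_forcing_ensemble())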
Multi-Model Ensemble
Combine different wave models:
def create_multi_model_ensemble():
    models = [
        {"type": "swan", "config": swan_config},
        {"type": "wavewatch3", "config": ww3_config},
        {"type": "mike21", "config": mike_config},
    ]

    ensemble_results = {}
    for model in models:
        result = run_model(
            config=model["config"],
            backend="prax",
            pipeline_name=f"ensemble_{model['type']}",
        )
        ensemble_results[model["type"]] = result

    # Combine results
    combined_forecast = combine_ensemble_results(ensemble_results)
    return combined_forecast
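combine_ensemble_results is left to the user; an equally weighted multi-model mean is a reasonable starting point. A sketch, again assuming each result exposes an Hs array as result.hs:

import numpy as np

def combine_ensemble_results(ensemble_results, weights=None):
    """Weighted mean across models; equal weights by default."""
    names = list(ensemble_results)
    fields = np.stack([ensemble_results[n].hs for n in names])
    return np.average(fields, axis=0, weights=weights)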
Operational Forecasting
Implement automated operational forecasting systems.
Scheduled Execution
Set up automated model runs:
from rompy_oceanum import ScheduledForecaster
from rompy_oceanum.backends import PraxConfig

class OperationalForecaster:
    def __init__(self):
        self.forecaster = ScheduledForecaster(
            schedule="0 */6 * * *",  # Every 6 hours
            backend="prax",
            backend_config=PraxConfig(
                org="operational",
                project="wave-forecast",
                stage="prod",
            ),
        )

    def setup_operational_forecast(self):
        # Configure forecast chain
        forecast_chain = [
            {
                "name": "data_ingestion",
                "task": self.ingest_latest_forcing,
                "timeout": 1800,  # 30 minutes
            },
            {
                "name": "quality_control",
                "task": self.validate_forcing_data,
                "dependencies": ["data_ingestion"],
            },
            {
                "name": "wave_forecast",
                "task": self.run_wave_model,
                "dependencies": ["quality_control"],
            },
            {
                "name": "post_processing",
                "task": self.process_outputs,
                "dependencies": ["wave_forecast"],
            },
            {
                "name": "dissemination",
                "task": self.publish_results,
                "dependencies": ["post_processing"],
            },
        ]
        self.forecaster.register_workflow(forecast_chain)

    def ingest_latest_forcing(self):
        """Download the latest forcing data."""
        # Implementation for data ingestion
        pass

    def validate_forcing_data(self):
        """Validate forcing data quality."""
        # Implementation for QC
        pass

    def run_wave_model(self):
        """Execute the wave model forecast."""
        config = self.generate_forecast_config()
        result = self.forecaster.submit_model(config)
        return result

    def process_outputs(self):
        """Post-process model outputs."""
        # Implementation for post-processing
        pass

    def publish_results(self):
        """Publish results to stakeholders."""
        # Implementation for dissemination
        pass
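With the workflow registered, a minimal driver script might look like the following; start() is assumed here as the entry point that hands control to the scheduler:

forecaster = OperationalForecaster()
forecaster.setup_operational_forecast()
forecaster.forecaster.start()  # assumed: blocks and triggers the chain on schedule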
Real-time Monitoring
Monitor operational forecasts in real-time:
from rompy_oceanum import ForecastMonitor

class OperationalMonitor:
    def __init__(self):
        self.monitor = ForecastMonitor(
            check_interval=300,  # 5 minutes
            alert_channels=["email", "slack"],
        )

    def setup_monitoring(self):
        # Define monitoring rules
        rules = [
            {
                "name": "forecast_delay",
                "condition": lambda run: run.age_hours > 12,
                "severity": "critical",
                "action": self.alert_forecast_delay,
            },
            {
                "name": "model_failure",
                "condition": lambda run: run.status == "failed",
                "severity": "high",
                "action": self.handle_model_failure,
            },
            {
                "name": "resource_usage",
                "condition": lambda run: run.cpu_usage > 0.9,
                "severity": "medium",
                "action": self.scale_resources,
            },
        ]
        self.monitor.add_rules(rules)

    def alert_forecast_delay(self, run):
        """Handle forecast delays."""
        message = f"Forecast {run.id} delayed by {run.age_hours} hours"
        self.monitor.send_alert(message, severity="critical")

    def handle_model_failure(self, run):
        """Handle model execution failures."""
        # Attempt automatic recovery
        if run.retry_count < 3:
            run.retry()
        else:
            self.monitor.send_alert(
                f"Model {run.id} failed after 3 retries",
                severity="high",
            )

    def scale_resources(self, run):
        """Scale resources for high-usage runs."""
        run.scale_resources(cpu="8000m", memory="16Gi")
Data Pipeline Integration
Integrate wave modeling with broader data processing pipelines.
Apache Airflow Integration
Use Airflow for complex workflow orchestration:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from rompy_oceanum.integrations.airflow import RompyOperator

# Define DAG
dag = DAG(
    'wave_forecast_pipeline',
    default_args={
        'owner': 'wave-team',
        'depends_on_past': False,
        'start_date': datetime(2025, 1, 1),
        'retries': 2,
        'retry_delay': timedelta(minutes=30),
    },
    schedule_interval='0 */6 * * *',  # Every 6 hours
    catchup=False,
)

# Data ingestion task
ingest_forcing = PythonOperator(
    task_id='ingest_forcing_data',
    python_callable=download_forcing_data,
    dag=dag,
)

# Wave model execution
run_wave_model = RompyOperator(
    task_id='run_wave_forecast',
    config_template='templates/operational_forecast.yml',
    backend='prax',
    pipeline_name='airflow_wave_forecast',
    dag=dag,
)

# Post-processing
process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_wave_outputs,
    dag=dag,
)

# Set dependencies
ingest_forcing >> run_wave_model >> process_results
Prefect Integration
Use Prefect for modern workflow orchestration:
from prefect import Flow, task
from rompy_oceanum.integrations.prefect import rompy_task

@task
def prepare_forcing_data():
    """Prepare forcing data for the wave model."""
    # Data preparation logic
    return forcing_config

@rompy_task
def run_wave_forecast(forcing_config):
    """Execute the wave forecast model."""
    config = generate_model_config(forcing_config)
    return config

@task
def validate_outputs(model_result):
    """Validate model outputs."""
    if not model_result.is_successful():
        raise ValueError("Model execution failed")
    # Validation logic
    return model_result

# Create flow (Prefect 1.x API)
with Flow("Wave Forecast Pipeline") as flow:
    forcing = prepare_forcing_data()
    forecast = run_wave_forecast(forcing)
    validated = validate_outputs(forecast)

# Register and run
flow.register(project_name="wave-forecasting")
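The example above uses the Prefect 1.x Flow API. In Prefect 2 and later, the same pipeline is written as decorated functions called directly. A minimal, self-contained sketch with a plain @task standing in for the rompy_task integration (the return values here are placeholders):

from prefect import flow, task

@task
def prepare_forcing_data():
    # Stand-in for real data preparation
    return {"wind_file": "era5_winds.nc"}

@task
def run_wave_forecast(forcing_config):
    # Stand-in for building and submitting the model config
    return {"status": "submitted", **forcing_config}

@flow(name="wave-forecast-pipeline")
def wave_forecast_pipeline():
    forcing = prepare_forcing_data()
    return run_wave_forecast(forcing)

wave_forecast_pipeline()  # Prefect 2+ flows are invoked directly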
Error Handling and Recovery
Implement robust error handling for production workflows.
Retry Strategies
Configure intelligent retry mechanisms:
import logging

from rompy_oceanum import RetryManager
from tenacity import retry, stop_after_attempt, wait_exponential

class RobustWorkflow:
    def __init__(self):
        self.retry_manager = RetryManager(
            max_attempts=3,
            backoff_strategy="exponential",
            base_delay=60,  # 1 minute
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
    )
    def execute_with_retry(self, config):
        """Execute the model with automatic retry."""
        try:
            result = self.run_model(config)
            if not result.is_successful():
                raise ModelExecutionError(f"Model failed: {result.error}")
            return result
        except Exception as e:
            logging.error(f"Model execution failed: {e}")
            raise
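The ModelExecutionError used here and in the circuit breaker example below is not defined by these snippets; if rompy-oceanum does not provide one, a minimal definition suffices:

class ModelExecutionError(RuntimeError):
    """Raised when a model run finishes unsuccessfully."""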
Circuit Breaker Pattern
Prevent cascade failures:
from rompy_oceanum import CircuitBreaker

class ProtectedWorkflow:
    def __init__(self):
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=300,  # 5 minutes
            expected_exception=ModelExecutionError,
        )

    def execute_protected(self, config):
        """Execute with circuit breaker protection."""
        with self.circuit_breaker:
            return self.run_model(config)
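To make the pattern concrete, here is a minimal, self-contained breaker illustrating the same idea. It is a sketch of the pattern, not the rompy_oceanum implementation:

import time

class SimpleCircuitBreaker:
    """Opens after `failure_threshold` consecutive failures and rejects
    calls until `recovery_timeout` seconds have passed."""

    def __init__(self, failure_threshold=5, recovery_timeout=300):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("Circuit open: refusing call")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # success closes the circuit
        return result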
Performance Optimization
Optimize workflow performance for production use.
Resource Management
Optimize resource allocation:
from rompy_oceanum import ResourceOptimizer

class OptimizedWorkflow:
    def __init__(self):
        self.optimizer = ResourceOptimizer()

    def optimize_batch_resources(self, configs):
        """Optimize resource allocation for batch processing."""
        # Analyze configurations
        resource_requirements = []
        for config in configs:
            req = self.optimizer.estimate_resources(config)
            resource_requirements.append(req)

        # Optimize scheduling
        schedule = self.optimizer.optimize_schedule(
            configs, resource_requirements
        )
        return schedule
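If you need a rough estimate before an optimizer is available, memory and runtime for a spectral wave model scale primarily with grid size and spectral resolution. A hypothetical back-of-the-envelope helper (all names and constants here are illustrative, not from rompy-oceanum):

def rough_resource_estimate(nx, ny, nfreq=36, ndir=36, bytes_per_value=8):
    """Crude memory estimate for a spectral wave model state."""
    # One 2D spectrum per grid point, plus ~50% overhead for work arrays
    state_bytes = nx * ny * nfreq * ndir * bytes_per_value
    return {"memory_gb": 1.5 * state_bytes / 1e9}

# e.g. rough_resource_estimate(500, 500) -> {"memory_gb": ~3.9}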
Caching Strategies
Implement intelligent caching:
import hashlib
import json

from rompy_oceanum import ResultCache

class CachedWorkflow:
    def __init__(self):
        self.cache = ResultCache(
            backend="redis",  # or "memory", "file"
            ttl=3600,  # 1 hour
        )

    def execute_with_cache(self, config):
        """Execute with result caching."""
        # Generate a deterministic cache key; json.dumps with sort_keys
        # is stable for nested configs, unlike str(sorted(config.items()))
        config_hash = hashlib.md5(
            json.dumps(config, sort_keys=True).encode()
        ).hexdigest()
        cache_key = f"model_result_{config_hash}"

        # Check cache (explicit None check so falsy results are not re-run)
        cached_result = self.cache.get(cache_key)
        if cached_result is not None:
            return cached_result

        # Execute and cache
        result = self.run_model(config)
        self.cache.set(cache_key, result)
        return result
Best Practices
Guidelines for implementing robust advanced workflows:
Workflow Design
Modularity: Design workflows as composable components
Idempotency: Ensure workflows can be safely re-executed
Observability: Include comprehensive logging and monitoring
Error Boundaries: Isolate failures to prevent cascade effects
Resource Management
Right-sizing: Monitor and optimize resource allocation
Scaling: Implement auto-scaling for variable workloads
Cost Control: Monitor and control computational costs
Efficiency: Optimize workflow scheduling and execution
Monitoring and Alerting
Health Checks: Implement comprehensive health monitoring
Performance Metrics: Track key performance indicators
Alert Fatigue: Configure meaningful, actionable alerts
Documentation: Maintain runbooks for incident response
See Also
Basic Usage - Getting started with rompy-oceanum
Pipeline Backends - Understanding execution backends
Troubleshooting - Debugging workflow issues
API Reference - Full API documentation for the classes used in this guide