climakitae.new_core.processors package#
Submodules#
climakitae.new_core.processors.abc_data_processor module#
Data processing module for climakitae.
This module defines the abstract base class for data processors, a registry system for processor classes, and example processor implementations. Processors are used to transform, filter, or otherwise process xarray data objects in a modular and extensible way.
Classes#
DataProcessor : Abstract base class for all data processors.
RenameVariables : Example processor for renaming variables.
ApplyBiasCorrection : Example processor for bias correction.
FilterData : Example processor for filtering data.
Functions#
register_processor : Decorator for registering processor classes.
- class climakitae.new_core.processors.abc_data_processor.DataProcessor#
Bases: ABC
Abstract base class for data processing.
All data processors should inherit from this class and implement the required methods.
Notes
Processors should only store parameters needed for processing, not the data itself.
Processors should not throw exceptions; instead, they should return the data and a warning message if needed.
All processors should update the context with information about how they modified the data.
- execute(result, context)#
Process the data and return the result.
- update_context(context)#
Update the context with additional parameters.
- set_data_accessor(catalog)#
Set the data accessor for the processor.
- abstract execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Process raw data into the required format.
- Parameters:
- Returns:
Dataset, DataArray, or iterable of these – Processed data.
- Raises:
ValueError – If the data cannot be processed.
- abstract set_data_accessor(catalog: DataCatalog)#
Set the data accessor for the processor.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing datasets.
- Returns:
- climakitae.new_core.processors.abc_data_processor.register_processor(key: str | object = <object object>, priority: int | object = <object object>) Callable#
Decorator to register a processor class.
- Parameters:
- Returns:
callable – The decorator function that registers the processor class.
Examples
@register_processor("my_processor")
class MyProcessor(DataProcessor):
    ...
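The registration pattern and the processor contract described above can be sketched without climakitae installed. This is a minimal illustration, not the library's implementation: the `_PROCESSOR_REGISTRY` dictionary, the default `priority=0`, the toy `Scale` processor, and the use of a `"new_attrs"` key in the context are all assumptions made for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Tuple

# Hypothetical registry: maps a key to (priority, processor class).
_PROCESSOR_REGISTRY: Dict[str, Tuple[int, type]] = {}


class DataProcessor(ABC):
    """Simplified stand-in for the abstract base class described above."""

    @abstractmethod
    def execute(self, result: Any, context: Dict[str, Any]) -> Any:
        """Process the data and return the result."""

    def update_context(self, context: Dict[str, Any]) -> None:
        """Record in the context how the data was modified (no-op by default)."""

    @abstractmethod
    def set_data_accessor(self, catalog: Any) -> None:
        """Store a reference to the data catalog."""


def register_processor(key: str, priority: int = 0) -> Callable[[type], type]:
    """Return a decorator that stores the decorated class under `key`."""

    def decorator(cls: type) -> type:
        _PROCESSOR_REGISTRY[key] = (priority, cls)
        return cls  # hand the class back unchanged so it stays usable

    return decorator


@register_processor("scale", priority=10)
class Scale(DataProcessor):
    """Toy processor that multiplies every value by a stored factor."""

    def __init__(self, value: float):
        self.value = value  # store only the parameter, never the data

    def execute(self, result, context):
        self.update_context(context)
        return [x * self.value for x in result]

    def update_context(self, context):
        # The context is modified in place; nothing is returned.
        context.setdefault("new_attrs", {})["scale"] = f"scaled by {self.value}"

    def set_data_accessor(self, catalog):
        pass  # this toy processor needs no catalog


context: Dict[str, Any] = {}
priority, processor_cls = _PROCESSOR_REGISTRY["scale"]
scaled = processor_cls(2.0).execute([1.0, 2.5], context)
```

Looking the class up by key, rather than importing it directly, is what lets a pipeline build its processor chain from configuration.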
climakitae.new_core.processors.concatenate module#
Concat DataProcessor
- class climakitae.new_core.processors.concatenate.Concat(value: str = 'time')#
Bases: DataProcessor
DataProcessor that concatenates multiple datasets along a new “sim” dimension.
This processor takes an iterable of xarray datasets or data arrays and concatenates them along a new “sim” dimension using their source_id values. This is useful for creating ensemble datasets from multiple climate models.
- Parameters:
value (Any) – Optional configuration for the concatenation process. Can specify a dimension name other than “sim”.
- execute(result, context)#
Concatenates the input datasets along a new “sim” dimension.
- update_context(context)#
Updates the context with information about the concatenation operation.
Notes
All input datasets should have the ‘source_id’ attribute.
- execute(result: Dataset | DataArray | Dict[str, Dataset | DataArray] | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray#
Concatenate multiple datasets along a specified dimension.
If the dimension is “time”, this method will first extend the time domain of SSP scenarios by prepending historical data, then concatenate along a “sim” dimension. Otherwise, it concatenates datasets along the specified dimension using their source_id values.
- Parameters:
- Returns:
Union[xr.Dataset, xr.DataArray] – A single dataset with concatenated data.
- set_data_accessor(catalog: DataCatalog)#
Set the data catalog for this processor.
- Parameters:
catalog (DataCatalog) – The data catalog to be used by this processor.
- update_context(context: Dict[str, Any], source_ids: List[str] | object = <object object>)#
Update the context with information about the concatenation transformation.
- Parameters:
context (dict[str, Any]) – Parameters for processing the data.
source_ids (List[str], optional) – List of source_ids that were concatenated.
Note
The context is updated in place. This method does not return anything.
climakitae.new_core.processors.filter_unadjusted_models module#
Filter Unadjusted Models Processor
- class climakitae.new_core.processors.filter_unadjusted_models.FilterUnAdjustedModels(value: str = 'yes')#
Bases: DataProcessor
Processor to filter out models that do not have a-priori bias adjustment.
- Parameters:
value (str) – Whether to filter out models without a-priori bias adjustment. The default, “yes”, removes them; “no” keeps them.
- execute(result, context)#
Run the processor on the given result and context.
- update_context(context)#
Update the context with information about the transformation.
- set_data_accessor(catalog)#
Set the data accessor for the processor.
- _remove_unadjusted_models(result) xr.Dataset | xr.DataArray | Iterable[xr.Dataset, xr.DataArray] | None#
Remove unadjusted models from the result.
Notes
This processor filters out models that do not have a-priori bias adjustment. It is added to the processor chain by default when using the ClimateData class. To include these models, manually add the processor to your query and set the value to “no”.
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Run the processor.
- Parameters:
result (xr.Dataset | xr.DataArray | Iterable[xr.Dataset | xr.DataArray]) – The data to be filtered.
context (dict) – The context for the processor. This is not used in this implementation but is included for consistency with the DataProcessor interface.
- Returns:
Union[xr.Dataset, DataArray, Iterable[xr.Dataset | xr.DataArray]] – The filtered data. This can be a single Dataset/DataArray or an iterable of them.
- Raises:
ValueError – If the value is not one of the valid values.
- set_data_accessor(catalog: DataCatalog)#
Set the data accessor for the processor.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing datasets.
- Returns:
climakitae.new_core.processors.processor_utils module#
Utility functions for processing data arrays in climakitae.
- climakitae.new_core.processors.processor_utils.extend_time_domain(result: Dict[str, Dataset | DataArray]) Dataset | DataArray#
Extend the time domain of the input data to cover 1980-2100.
This method ensures that all SSP scenarios have historical data included in the time series, allowing for proper warming level calculations. This is handled by concatenating historical data with SSP data and updating the attributes to that of the SSP data. Historical data is expected to be available in the input dictionary with keys formatted the same as SSP keys but with “historical” instead of r"ssp.{3}" (e.g., “ssp245” becomes “historical”).
- Parameters:
result (Dict[str, Union[xr.Dataset | xr.DataArray]]) – A dictionary containing time-series data with keys representing different scenarios.
- Returns:
Union[xr.Dataset, xr.DataArray] – The extended time-series data.
Notes
By construction, this function will drop reanalysis data.
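The key convention described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: lists of years stand in for xarray objects, the dotted key layout is invented, and SSP keys without a historical counterpart are simply skipped. The sketch also returns a dictionary keyed by SSP scenario, which is a simplification.

```python
import re
from typing import Dict, List

# Hypothetical stand-in data: each key maps to a list of years instead of an
# xarray object. The dotted key layout is an assumption for illustration.
result: Dict[str, List[int]] = {
    "WRF.CESM2.historical.day": list(range(1980, 2015)),
    "WRF.CESM2.ssp370.day": list(range(2015, 2101)),
    "WRF.ERA5.reanalysis.day": list(range(1980, 2023)),
}

SSP_PATTERN = re.compile(r"ssp.{3}")


def extend_keys(data: Dict[str, List[int]]) -> Dict[str, List[int]]:
    """Prepend the matching historical series to every SSP series."""
    extended = {}
    for key, ssp_years in data.items():
        if not SSP_PATTERN.search(key):
            continue  # historical and reanalysis keys are not kept on their own
        hist_key = SSP_PATTERN.sub("historical", key)
        if hist_key not in data:
            continue  # assumption: skip SSP series with no historical match
        extended[key] = data[hist_key] + ssp_years
    return extended


extended = extend_keys(result)
```

Because only keys matching the SSP pattern are kept, the reanalysis entry disappears from the output, which mirrors the note above.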
- climakitae.new_core.processors.processor_utils.find_station_match(station_identifier: str, stations_df)#
Find matching station(s) in the stations DataFrame.
This function centralizes the station matching logic used by both the Clip processor and the clip parameter validator. It tries multiple matching strategies in order of specificity:
1. Exact match on station ID column
2. Exact match on station name column
3. Partial match on station name column
- Parameters:
- Returns:
DataFrame – DataFrame containing matching station(s). May have 0, 1, or multiple rows:
- Empty (len=0): No matches found
- Single row (len=1): Exact match found
- Multiple rows (len>1): Multiple stations match the identifier
Notes
The caller is responsible for:
- Checking if stations_df is None or empty before calling
- Handling the different match scenarios (no match, single match, multiple matches)
- Providing appropriate error messages or warnings based on context
Examples
>>> # For validation (clip_param_validator.py)
>>> match = find_station_match("KSAC", stations_df)
>>> if len(match) == 0:
...     # Handle no match - provide suggestions
>>> elif len(match) > 1:
...     # Handle multiple matches - ask user to be more specific
>>> else:
...     # Valid single match
...     return True

>>> # For coordinate extraction (clip.py)
>>> match = find_station_match("KSAC", stations_df)
>>> if len(match) == 0:
...     # Raise ValueError with suggestions
>>> elif len(match) > 1:
...     # Raise ValueError asking for more specific identifier
>>> else:
...     # Extract coordinates and metadata
...     lat = float(match.iloc[0]["LAT_Y"])
...     lon = float(match.iloc[0]["LON_X"])
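The three matching strategies can be sketched in plain Python. This is a hedged illustration only: a list of dictionaries stands in for the stations DataFrame, the `"ID"` and `"station"` column names and the sample rows are hypothetical, and case-insensitive comparison is an assumption.

```python
from typing import Dict, List

# Hypothetical station table; the "ID" and "station" column names are
# assumptions, and a list of dicts stands in for the DataFrame.
STATIONS: List[Dict[str, str]] = [
    {"ID": "KSAC", "station": "Sacramento Executive Airport (KSAC)"},
    {"ID": "KSFO", "station": "San Francisco International Airport (KSFO)"},
    {"ID": "KSMF", "station": "Sacramento International Airport (KSMF)"},
]


def match_station(
    identifier: str, stations: List[Dict[str, str]]
) -> List[Dict[str, str]]:
    """Try the three strategies in order and return the first non-empty hit."""
    needle = identifier.strip().lower()
    strategies = (
        lambda row: row["ID"].lower() == needle,       # 1. exact ID
        lambda row: row["station"].lower() == needle,  # 2. exact name
        lambda row: needle in row["station"].lower(),  # 3. partial name
    )
    for matches_row in strategies:
        hits = [row for row in stations if matches_row(row)]
        if hits:
            return hits
    return []  # no strategy matched


exact = match_station("KSAC", STATIONS)
partial = match_station("Sacramento", STATIONS)
missing = match_station("Fresno", STATIONS)
```

Stopping at the first strategy that yields a hit keeps an exact ID match from being diluted by partial name matches on the same string.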
- climakitae.new_core.processors.processor_utils.is_station_identifier(value: str) bool#
Check if a string looks like a station identifier.
This function uses heuristics to determine if a string appears to be a weather station identifier based on common patterns.
- Parameters:
value (str) – String to check
- Returns:
bool – True if the value looks like a station code or station name
Notes
Recognizes two patterns:
1. 4-character codes starting with ‘K’ (common US airport weather stations). Examples: KSAC (Sacramento), KBFL (Bakersfield), KSFO (San Francisco)
2. Strings with parentheses containing a code with ‘K’. Examples: “Sacramento (KSAC)”, “San Francisco International (KSFO)”
Examples
>>> is_station_identifier("KSAC")
True
>>> is_station_identifier("Sacramento (KSAC)")
True
>>> is_station_identifier("CA")
False
>>> is_station_identifier("Kern County")
False
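One way to express the two patterns is with regular expressions. The exact rules used by climakitae are not given here, so the patterns and the name `looks_like_station` below are assumptions chosen to reproduce the documented examples.

```python
import re

# Assumed patterns: the exact expressions used by climakitae are not given
# above, so these are illustrative only.
AIRPORT_CODE = re.compile(r"^K[A-Z0-9]{3}$")      # e.g. "KSAC"
CODE_IN_PARENS = re.compile(r"\(K[A-Z0-9]{3}\)")  # e.g. "Sacramento (KSAC)"


def looks_like_station(value: str) -> bool:
    """Return True if the string matches either station pattern."""
    text = value.strip()
    return bool(AIRPORT_CODE.match(text) or CODE_IN_PARENS.search(text))


checks = {
    name: looks_like_station(name)
    for name in ("KSAC", "Sacramento (KSAC)", "CA", "Kern County")
}
```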
climakitae.new_core.processors.template module#
Template for a DataProcessor subclass in climakitae.
This module provides a template for implementing custom data processors that can be registered and used within the climakitae data processing pipeline. Processors are designed to transform, filter, or otherwise process xarray data objects in a modular and extensible way.
Classes#
Template : Example processor template for subsetting data.
- class climakitae.new_core.processors.template.Template(value: Iterable[Any])#
Bases: DataProcessor
Template for a DataProcessor.
This class serves as a template for creating new data processors. It demonstrates the required methods and docstring style for consistency within the climakitae framework.
- Parameters:
value (Iterable[Any]) – The value to subset the data by. Typically a tuple of two date-like values.
- value#
The value used for subsetting or transformation.
- Type:
Iterable[Any]
- execute(result, context)#
Run the processor on the provided data.
- update_context(context)#
Update the context with information about the transformation.
- set_data_accessor(catalog)#
Set the data accessor for the processor (optional, for advanced use).
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Run the processor on the provided data.
- set_data_accessor(catalog: DataCatalog)#
Set the data accessor for the processor.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing datasets.
- Returns:
climakitae.new_core.processors.update_attributes module#
UpdateAttributes Processor definition.
- class climakitae.new_core.processors.update_attributes.UpdateAttributes(value: Any = <object object>)#
Bases: DataProcessor
Update attributes of the data.
Adds new attributes to the data that describe the processing steps.
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Execute the UpdateAttributes processor.
This method updates the attributes of the data based on the provided value.
- set_data_accessor(catalog: DataCatalog)#
Set the data accessor for the processor.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing datasets.
- Returns:
Module contents#
Initialize the processors, ensuring they get registered.
- class climakitae.new_core.processors.Clip(value)#
Bases: DataProcessor
Clip data based on spatial boundaries.
This processor supports single and multiple boundary clipping operations. In Phase 1, it supports multiple boundaries of the same category using union operations to combine geometries.
- Parameters:
value (str | list | tuple) – The value(s) to clip the data by. Can be:
- str: Single boundary key, file path, or coordinate specification
- list: Multiple boundary keys of the same category (Phase 1) OR list of (lat, lon) tuples for multiple points
- tuple: Coordinate bounds ((lat_min, lat_max), (lon_min, lon_max)) or a single (lat, lon) point
Examples
Single boundary:
>>> clip = Clip("CA")  # Single state
>>> clip = Clip("Los Angeles County")  # Single county

Multiple boundaries (Phase 1):
>>> clip = Clip(["CA", "OR", "WA"])  # Multiple states
>>> clip = Clip(["Los Angeles County", "Orange County"])  # Multiple counties

Coordinate bounds:
>>> clip = Clip(((32.0, 42.0), (-125.0, -114.0)))  # lat/lon bounds

Single point (closest gridcell):
>>> clip = Clip((37.7749, -122.4194))  # Single lat, lon point

Multiple points (closest gridcells):
>>> clip = Clip([(37.7749, -122.4194), (34.0522, -118.2437)])  # Multiple lat, lon points
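The accepted argument forms can be told apart by type and shape alone. The sketch below shows one possible way to do that; the function names and the returned labels are hypothetical and do not describe how Clip itself dispatches.

```python
from typing import Any


def _is_point(value: Any) -> bool:
    """True for a 2-tuple of numbers such as (lat, lon)."""
    return (
        isinstance(value, tuple)
        and len(value) == 2
        and all(
            isinstance(v, (int, float)) and not isinstance(v, bool) for v in value
        )
    )


def classify_clip_value(value: Any) -> str:
    """Return a hypothetical label for the kind of clip a value describes."""
    if isinstance(value, str):
        return "string"  # boundary key, file path, or coordinate specification
    if isinstance(value, list) and value:
        if all(isinstance(v, str) for v in value):
            return "boundary list"
        if all(_is_point(v) for v in value):
            return "point list"
    if isinstance(value, tuple) and len(value) == 2:
        if all(_is_point(v) for v in value):
            return "bounds"  # ((lat_min, lat_max), (lon_min, lon_max))
        if _is_point(value):
            return "point"  # (lat, lon)
    raise ValueError(f"Unsupported clip value: {value!r}")


labels = [
    classify_clip_value("CA"),
    classify_clip_value(["CA", "OR", "WA"]),
    classify_clip_value(((32.0, 42.0), (-125.0, -114.0))),
    classify_clip_value((37.7749, -122.4194)),
    classify_clip_value([(37.7749, -122.4194), (34.0522, -118.2437)]),
]
```

Note that a pair of pairs is read as bounds while a list of pairs is read as points, so the container type carries meaning.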
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Process raw data into the required format.
- Parameters:
- Returns:
Dataset, DataArray, or iterable of these – Processed data.
- Raises:
ValueError – If the data cannot be processed.
- set_data_accessor(catalog: DataCatalog)#
Set the data catalog for accessing boundary data.
- update_context(context: Dict[str, Any])#
Update the context with information about the clipping operation, to be stored in the “new_attrs” attribute.
- Parameters:
context (dict[str, Any]) – Parameters for processing the data.
Note
The context is updated in place. This method does not return anything.
- validate_boundary_key(boundary_key: str) Dict[str, Any]#
Validate if a boundary key exists and return information about it.
- Parameters:
boundary_key (str) – The boundary key to validate
- Returns:
Dict[str, Any] – Dictionary containing validation results:
- ‘valid’: bool, whether the key is valid
- ‘category’: str, the category if found
- ‘suggestions’: list, similar keys if not found
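The shape of the returned dictionary can be illustrated with a small stand-in. Everything here is an assumption for the example: the `BOUNDARIES` catalog, the use of `difflib.get_close_matches` to generate suggestions, and `None` as the category of an unknown key.

```python
from difflib import get_close_matches
from typing import Any, Dict, List

# Hypothetical boundary catalog mapping category -> known keys.
BOUNDARIES: Dict[str, List[str]] = {
    "states": ["CA", "OR", "WA"],
    "counties": ["Los Angeles County", "Orange County"],
}


def validate_key(boundary_key: str) -> Dict[str, Any]:
    """Return validity, category, and suggestions for a boundary key."""
    for category, keys in BOUNDARIES.items():
        if boundary_key in keys:
            return {"valid": True, "category": category, "suggestions": []}
    # Unknown key: offer the closest known keys, best match first.
    all_keys = [key for keys in BOUNDARIES.values() for key in keys]
    suggestions = get_close_matches(boundary_key, all_keys, n=3, cutoff=0.6)
    return {"valid": False, "category": None, "suggestions": suggestions}


known = validate_key("CA")
typo = validate_key("Orange Count")
```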
- class climakitae.new_core.processors.Concat(value: str = 'time')#
Bases: DataProcessor
DataProcessor that concatenates multiple datasets along a new “sim” dimension.
This processor takes an iterable of xarray datasets or data arrays and concatenates them along a new “sim” dimension using their source_id values. This is useful for creating ensemble datasets from multiple climate models.
- Parameters:
value (Any) – Optional configuration for the concatenation process. Can specify a dimension name other than “sim”.
- execute(result, context)#
Concatenates the input datasets along a new “sim” dimension.
- update_context(context)#
Updates the context with information about the concatenation operation.
Notes
All input datasets should have the ‘source_id’ attribute.
- execute(result: Dataset | DataArray | Dict[str, Dataset | DataArray] | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray#
Concatenate multiple datasets along a specified dimension.
If the dimension is “time”, this method will first extend the time domain of SSP scenarios by prepending historical data, then concatenate along a “sim” dimension. Otherwise, it concatenates datasets along the specified dimension using their source_id values.
- Parameters:
- Returns:
Union[xr.Dataset, xr.DataArray] – A single dataset with concatenated data.
- set_data_accessor(catalog: DataCatalog)#
Set the data catalog for this processor.
- Parameters:
catalog (DataCatalog) – The data catalog to be used by this processor.
- update_context(context: Dict[str, Any], source_ids: List[str] | object = <object object>)#
Update the context with information about the concatenation transformation.
- Parameters:
context (dict[str, Any]) – Parameters for processing the data.
source_ids (List[str], optional) – List of source_ids that were concatenated.
Note
The context is updated in place. This method does not return anything.
- class climakitae.new_core.processors.FilterUnAdjustedModels(value: str = 'yes')#
Bases: DataProcessor
Processor to filter out models that do not have a-priori bias adjustment.
- Parameters:
value (str) – Whether to filter out models without a-priori bias adjustment. The default, “yes”, removes them; “no” keeps them.
- execute(result, context)#
Run the processor on the given result and context.
- update_context(context)#
Update the context with information about the transformation.
- set_data_accessor(catalog)#
Set the data accessor for the processor.
- _remove_unadjusted_models(result) xr.Dataset | xr.DataArray | Iterable[xr.Dataset, xr.DataArray] | None#
Remove unadjusted models from the result.
Notes
This processor filters out models that do not have a-priori bias adjustment. It is added to the processor chain by default when using the ClimateData class. To include these models, manually add the processor to your query and set the value to “no”.
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Run the processor.
- Parameters:
result (xr.Dataset | xr.DataArray | Iterable[xr.Dataset | xr.DataArray]) – The data to be filtered.
context (dict) – The context for the processor. This is not used in this implementation but is included for consistency with the DataProcessor interface.
- Returns:
Union[xr.Dataset, DataArray, Iterable[xr.Dataset | xr.DataArray]] – The filtered data. This can be a single Dataset/DataArray or an iterable of them.
- Raises:
ValueError – If the value is not one of the valid values.
- set_data_accessor(catalog: DataCatalog)#
Set the data accessor for the processor.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing datasets.
- Returns:
- class climakitae.new_core.processors.TimeSlice(value: Iterable[Any])#
Bases: DataProcessor
Slice data based on time.
- Parameters:
value (tuple(date-like, date-like)) – The value to subset the data by. This should be a tuple of two date-like values.
- _coerce_to_dates(value: tuple) tuple[pd.Timestamp, pd.Timestamp]#
Coerce the values to date-like objects.
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Run the time slicing operation on the data.
- Parameters:
result (xr.Dataset | xr.DataArray | Iterable[xr.Dataset | xr.DataArray]) – The data to be sliced.
context (dict) – The context for the processor. This is not used in this implementation but is included for consistency with the DataProcessor interface.
- Returns:
Union[xr.Dataset, DataArray, Iterable[xr.Dataset | xr.DataArray]] – The sliced data. This can be a single Dataset/DataArray or an iterable of them.
- set_data_accessor(catalog: DataCatalog)#
Set the data accessor for the processor.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing datasets.
- Returns:
- class climakitae.new_core.processors.UpdateAttributes(value: Any = <object object>)#
Bases: DataProcessor
Update attributes of the data.
Adds new attributes to the data that describe the processing steps.
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Execute the UpdateAttributes processor.
This method updates the attributes of the data based on the provided value.
- set_data_accessor(catalog: DataCatalog)#
Set the data accessor for the processor.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing datasets.
- Returns:
- class climakitae.new_core.processors.WarmingLevel(value: Dict[str, Any])#
Bases: DataProcessor
Transform time-series climate data into a warming-levels approach.
This processor takes data with time dimensions and transforms it to data organized by warming levels, following the established warming level methodology.
- Parameters:
value (Dict[str, Any]) – Configuration dictionary containing:
- warming_levels (list[float]) – List of global warming levels in degrees C (e.g., [1.5, 2.0])
- warming_level_months (list[int], optional) – List of months to include (1-12). Default: all months
- warming_level_window (int, optional) – Number of years before and after the central year. Default: 15
- execute : Transform data to warming level approach
- update_context : Update processing context with warming level information
- set_data_accessor : Set data catalog accessor
Notes
The input data must span from 1980-2100 and include historical climate data for proper warming level calculations. Data should have simulation and scenario dimensions or be properly configured for stacking.
- execute(result: Dataset | DataArray | Iterable[Dataset | DataArray], context: Dict[str, Any]) Dataset | DataArray | Iterable[Dataset | DataArray]#
Transform time-series data to warming level approach.
The transformation process involves the following steps for each simulation:
1. Find the first year (from precomputed values) when a given warming level is reached by a simulation (GCM, run, scenario).
2. Slice in a window of self.warming_level_window years around that year.
3. If the slice has a start year earlier than the simulation data, splice historical data onto the slice for the requested variable.
4. Return the data.
- Parameters:
result (xr.Dataset | xr.DataArray | Iterable[xr.Dataset | xr.DataArray]) – The time-series climate data to transform.
context (dict) – The context for the processor containing metadata and configuration.
- Returns:
Union[xr.Dataset, DataArray, Iterable[xr.Dataset | xr.DataArray]] – The data transformed to warming level approach with new dimensions:
- warming_level: The target warming levels
- simulation: Combined simulation identifiers
- time_delta: Time steps relative to warming level center year
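The windowing step and the resulting `time_delta` coordinate can be sketched with annual values in a plain dictionary. This is a simplified illustration, not the processor's code: the function name is hypothetical, and treating the window as the half-open range of `2 * window` years starting `window` years before the center year is an assumption.

```python
from typing import Dict, List, Tuple


def slice_window(
    series: Dict[int, float], center_year: int, window: int = 15
) -> List[Tuple[int, float]]:
    """Return (time_delta, value) pairs for the years around the center year.

    Assumption: the window is [center - window, center + window), that is
    2 * window years in total.
    """
    start, end = center_year - window, center_year + window
    return [
        (year - center_year, value)  # time_delta is relative to the center
        for year, value in sorted(series.items())
        if start <= year < end
    ]


# Stand-in annual series that already includes the historical period.
annual = {year: float(year) for year in range(1980, 2101)}
windowed = slice_window(annual, center_year=2040)
```

Re-indexing each slice by its offset from the center year is what lets simulations that reach a warming level in different years be compared on a common axis.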
- extend_time_domain(result: Dict[str, Dataset | DataArray]) Dict[str, Dataset | DataArray]#
Extend the time domain of the input data to cover 1980-2100.
This method ensures that all SSP scenarios have historical data included in the time series, allowing for proper warming level calculations. This is handled by concatenating historical data with SSP data and updating the attributes to that of the SSP data. Historical data is expected to be available in the input dictionary with keys formatted the same as SSP keys but with “historical” instead of r"ssp.{3}" (e.g., “ssp245” becomes “historical”).
- Parameters:
result (Dict[str, Union[xr.Dataset | xr.DataArray]]) – A dictionary containing time-series data with keys representing different scenarios.
- Returns:
Dict[str, Union[xr.Dataset, xr.DataArray]] – The extended time-series data.
- get_center_years(member_ids: Iterable[str], keys: Iterable[str]) Dict[str, list]#
Determine the year around which to center the warming level window for each simulation for each warming level.
- Parameters:
member_ids (Iterable[str]) – List of member IDs corresponding to the keys.
keys (Iterable[str]) – List of keys representing different simulations or scenarios.
- Returns:
Dict[str, list] – A dictionary mapping each key to a list of center years for each warming level.
Notes
The center year is determined by finding the first occurrence of each warming level in the precomputed warming level times table. If no warming level data is found for a key, a warning is issued. If the warming level table does not contain data for a key, a warning is issued. The method assumes that the warming level times table is indexed by time and contains columns formatted as “key.join(‘_’)”, where the values are the warming levels and the index is the time dimension.
Center year can be np.nan if no warming level data is found.
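The lookup described in these notes can be sketched as follows. The table contents, the key format, and the function name are hypothetical; a nested dictionary stands in for a table indexed by time with one column per simulation.

```python
import math
import warnings
from typing import Dict, List

# Hypothetical precomputed table: column name -> {year: warming in degrees C}.
WARMING_TABLE: Dict[str, Dict[int, float]] = {
    "CESM2_r11i1p1f1_ssp370": {2020: 1.2, 2030: 1.6, 2040: 2.1, 2050: 2.6},
    "EC-Earth3_r1i1p1f1_ssp245": {2020: 1.1, 2030: 1.4, 2040: 1.7, 2050: 1.9},
}


def center_years(keys: List[str], warming_levels: List[float]) -> Dict[str, list]:
    """Map each key to the first year each warming level is reached."""
    centers = {}
    for key in keys:
        series = WARMING_TABLE.get(key)
        if series is None:
            warnings.warn(f"No warming level data found for {key}")
            centers[key] = [math.nan] * len(warming_levels)
            continue
        years = []
        for level in warming_levels:
            reached = [
                year for year, warming in sorted(series.items()) if warming >= level
            ]
            # NaN marks a warming level the simulation never reaches.
            years.append(reached[0] if reached else math.nan)
        centers[key] = years
    return centers


centers = center_years(list(WARMING_TABLE), [1.5, 2.0])
```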
- reformat_member_ids(result: Dict[str, Dataset | DataArray]) Dict[str, Dataset | DataArray]#
Reformat member IDs in the input data.
- Parameters:
result (Dict[str, Union[xr.Dataset, xr.DataArray]]) – A dictionary containing time-series data with keys representing different scenarios.
- Returns:
Dict[str, Union[xr.Dataset, xr.DataArray]] – The reformatted time-series data.
- set_data_accessor(catalog: DataCatalog)#
Set data accessor for retrieving warming level information.
- Parameters:
catalog (DataCatalog) – Data catalog for accessing warming level lookup tables