
Spatial API Reference (Ecospace)

Ecospace spatial modeling — grid creation, dispersal, connectivity, habitat, and simulation integration.

Core Data Structures

pypath.spatial.ecospace_params

ECOSPACE spatial parameter data structures.

This module defines the core data structures for spatial-temporal ecosystem modeling:

- EcospaceGrid: Spatial grid configuration (irregular polygons)
- EcospaceParams: Spatial parameters (habitat, dispersal, external flux)
- SpatialState: Extended state for spatial simulation
- ExternalFluxTimeseries: External flux data from ocean models

EcospaceGrid dataclass

Spatial grid configuration using irregular polygons.

Attributes:

n_patches : int
    Number of spatial patches/cells
patch_ids : ndarray
    Unique identifiers for each patch [n_patches]
patch_areas : ndarray
    Area of each patch in km² [n_patches]
patch_centroids : ndarray
    (lon, lat) coordinates of patch centroids [n_patches, 2]
adjacency_matrix : csr_matrix
    Sparse adjacency matrix [n_patches, n_patches];
    1 if patches share a border, 0 otherwise
edge_lengths : Dict[Tuple[int, int], float]
    Border lengths (km) for adjacent patch pairs
crs : str
    Coordinate reference system (default: "EPSG:4326")
geometry : Optional[GeoDataFrame]
    GeoDataFrame with polygon geometries (if available)

Source code in pypath/spatial/ecospace_params.py
@dataclass
class EcospaceGrid:
    """Spatial grid configuration using irregular polygons.

    Attributes
    ----------
    n_patches : int
        Number of spatial patches/cells
    patch_ids : np.ndarray
        Unique identifiers for each patch [n_patches]
    patch_areas : np.ndarray
        Area of each patch in km² [n_patches]
    patch_centroids : np.ndarray
        (lon, lat) coordinates of patch centroids [n_patches, 2]
    adjacency_matrix : scipy.sparse.csr_matrix
        Sparse adjacency matrix [n_patches, n_patches]
        1 if patches share border, 0 otherwise
    edge_lengths : Dict[Tuple[int, int], float]
        Border lengths (km) for adjacent patch pairs
    crs : str
        Coordinate reference system (default: "EPSG:4326")
    geometry : Optional[gpd.GeoDataFrame]
        GeoDataFrame with polygon geometries (if available)
    """

    n_patches: int
    patch_ids: np.ndarray
    patch_areas: np.ndarray
    patch_centroids: np.ndarray
    adjacency_matrix: scipy.sparse.csr_matrix
    edge_lengths: Dict[Tuple[int, int], float]
    crs: str = "EPSG:4326"
    geometry: Optional[object] = None  # gpd.GeoDataFrame when available

    def __post_init__(self):
        """Validate grid data."""
        # Check dimensions
        if len(self.patch_ids) != self.n_patches:
            raise ValueError(
                f"patch_ids length ({len(self.patch_ids)}) != n_patches ({self.n_patches})"
            )
        if len(self.patch_areas) != self.n_patches:
            raise ValueError(
                f"patch_areas length ({len(self.patch_areas)}) != n_patches ({self.n_patches})"
            )
        if self.patch_centroids.shape != (self.n_patches, 2):
            raise ValueError(
                f"patch_centroids shape {self.patch_centroids.shape} != ({self.n_patches}, 2)"
            )
        if self.adjacency_matrix.shape != (self.n_patches, self.n_patches):
            raise ValueError(
                f"adjacency_matrix shape {self.adjacency_matrix.shape} != ({self.n_patches}, {self.n_patches})"
            )

        # Check that all areas are positive
        if np.any(self.patch_areas <= 0):
            raise ValueError("All patch areas must be positive")

        # Check that adjacency matrix is symmetric (sparse-safe)
        # Avoid densifying large adjacency matrices which can OOM; instead
        # check that (A - A.T) has no non-zero entries.
        diff = self.adjacency_matrix - self.adjacency_matrix.T
        if diff.nnz != 0:
            raise ValueError("Adjacency matrix must be symmetric")

    @classmethod
    def from_shapefile(
        cls,
        filepath: str,
        id_field: str = "id",
        area_field: Optional[str] = None,
        crs: Optional[str] = None,
    ) -> EcospaceGrid:
        """Create grid from shapefile or GeoJSON.

        Parameters
        ----------
        filepath : str
            Path to .shp, .geojson, or .gpkg file
        id_field : str
            Field containing unique patch IDs (default: "id")
        area_field : str, optional
            Field with pre-computed areas in km²
            If None, calculates from geometry
        crs : str, optional
            Force coordinate reference system (e.g., "EPSG:4326")

        Returns
        -------
        EcospaceGrid

        Raises
        ------
        ImportError
            If geopandas is not installed
        """
        if not _GIS_AVAILABLE:
            raise ImportError(
                "geopandas is required for shapefile support. "
                "Install with: pip install geopandas shapely rtree"
            )

        # Import here to avoid requiring geopandas if not using shapefiles
        from pypath.spatial.gis_utils import load_spatial_grid

        return load_spatial_grid(filepath, id_field, area_field, crs)

    @classmethod
    def from_regular_grid(
        cls, bounds: Tuple[float, float, float, float], nx: int, ny: int
    ) -> EcospaceGrid:
        """Create regular rectangular grid (for testing).

        Parameters
        ----------
        bounds : Tuple[float, float, float, float]
            (min_lon, min_lat, max_lon, max_lat)
        nx : int
            Number of grid cells in x direction
        ny : int
            Number of grid cells in y direction

        Returns
        -------
        EcospaceGrid
        """
        from pypath.spatial.gis_utils import create_regular_grid

        return create_regular_grid(bounds, nx, ny)

    def get_neighbors(self, patch_idx: int) -> np.ndarray:
        """Get indices of neighboring patches.

        Parameters
        ----------
        patch_idx : int
            Index of patch

        Returns
        -------
        np.ndarray
            Indices of neighboring patches
        """
        return self.adjacency_matrix[patch_idx].nonzero()[1]

    def get_edge_length(self, patch_i: int, patch_j: int) -> float:
        """Get border length between two patches.

        Parameters
        ----------
        patch_i, patch_j : int
            Patch indices

        Returns
        -------
        float
            Border length in km (0 if not adjacent)
        """
        key = (min(patch_i, patch_j), max(patch_i, patch_j))
        return self.edge_lengths.get(key, 0.0)
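
Example (illustrative values; a tiny two-patch domain built directly from the fields documented above):

import numpy as np
import scipy.sparse
from pypath.spatial.ecospace_params import EcospaceGrid

# Two adjacent 1 km² patches that share a 1 km border
adjacency = scipy.sparse.csr_matrix(np.array([[0.0, 1.0], [1.0, 0.0]]))
grid = EcospaceGrid(
    n_patches=2,
    patch_ids=np.array([0, 1]),
    patch_areas=np.array([1.0, 1.0]),
    patch_centroids=np.array([[0.00, 0.0], [0.01, 0.0]]),  # (lon, lat)
    adjacency_matrix=adjacency,
    edge_lengths={(0, 1): 1.0},
)

grid.get_neighbors(0)       # array([1])
grid.get_edge_length(0, 1)  # 1.0
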
__post_init__
__post_init__()

Validate grid data.

Source code in pypath/spatial/ecospace_params.py
def __post_init__(self):
    """Validate grid data."""
    # Check dimensions
    if len(self.patch_ids) != self.n_patches:
        raise ValueError(
            f"patch_ids length ({len(self.patch_ids)}) != n_patches ({self.n_patches})"
        )
    if len(self.patch_areas) != self.n_patches:
        raise ValueError(
            f"patch_areas length ({len(self.patch_areas)}) != n_patches ({self.n_patches})"
        )
    if self.patch_centroids.shape != (self.n_patches, 2):
        raise ValueError(
            f"patch_centroids shape {self.patch_centroids.shape} != ({self.n_patches}, 2)"
        )
    if self.adjacency_matrix.shape != (self.n_patches, self.n_patches):
        raise ValueError(
            f"adjacency_matrix shape {self.adjacency_matrix.shape} != ({self.n_patches}, {self.n_patches})"
        )

    # Check that all areas are positive
    if np.any(self.patch_areas <= 0):
        raise ValueError("All patch areas must be positive")

    # Check that adjacency matrix is symmetric (sparse-safe)
    # Avoid densifying large adjacency matrices which can OOM; instead
    # check that (A - A.T) has no non-zero entries.
    diff = self.adjacency_matrix - self.adjacency_matrix.T
    if diff.nnz != 0:
        raise ValueError("Adjacency matrix must be symmetric")
from_regular_grid classmethod
from_regular_grid(bounds: Tuple[float, float, float, float], nx: int, ny: int) -> EcospaceGrid

Create regular rectangular grid (for testing).

Parameters:

Name Type Description Default
bounds Tuple[float, float, float, float]

(min_lon, min_lat, max_lon, max_lat)

required
nx int

Number of grid cells in x direction

required
ny int

Number of grid cells in y direction

required

Returns:

Type Description
EcospaceGrid
Source code in pypath/spatial/ecospace_params.py
@classmethod
def from_regular_grid(
    cls, bounds: Tuple[float, float, float, float], nx: int, ny: int
) -> EcospaceGrid:
    """Create regular rectangular grid (for testing).

    Parameters
    ----------
    bounds : Tuple[float, float, float, float]
        (min_lon, min_lat, max_lon, max_lat)
    nx : int
        Number of grid cells in x direction
    ny : int
        Number of grid cells in y direction

    Returns
    -------
    EcospaceGrid
    """
    from pypath.spatial.gis_utils import create_regular_grid

    return create_regular_grid(bounds, nx, ny)
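
Example (synthetic test grid; patch indices are row-major, iy * nx + ix):

from pypath.spatial.ecospace_params import EcospaceGrid

# 3 x 3 test grid over a 1° x 1° box
grid = EcospaceGrid.from_regular_grid(bounds=(0.0, 0.0, 1.0, 1.0), nx=3, ny=3)
grid.n_patches         # 9
grid.get_neighbors(4)  # array([1, 3, 5, 7]) - the centre cell's rook neighbours
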
from_shapefile classmethod
from_shapefile(filepath: str, id_field: str = 'id', area_field: Optional[str] = None, crs: Optional[str] = None) -> EcospaceGrid

Create grid from shapefile or GeoJSON.

Parameters:

Name Type Description Default
filepath str

Path to .shp, .geojson, or .gpkg file

required
id_field str

Field containing unique patch IDs (default: "id")

'id'
area_field str

Field with pre-computed areas in km² If None, calculates from geometry

None
crs str

Force coordinate reference system (e.g., "EPSG:4326")

None

Returns:

Type Description
EcospaceGrid

Raises:

Type Description
ImportError

If geopandas is not installed

Source code in pypath/spatial/ecospace_params.py
@classmethod
def from_shapefile(
    cls,
    filepath: str,
    id_field: str = "id",
    area_field: Optional[str] = None,
    crs: Optional[str] = None,
) -> EcospaceGrid:
    """Create grid from shapefile or GeoJSON.

    Parameters
    ----------
    filepath : str
        Path to .shp, .geojson, or .gpkg file
    id_field : str
        Field containing unique patch IDs (default: "id")
    area_field : str, optional
        Field with pre-computed areas in km²
        If None, calculates from geometry
    crs : str, optional
        Force coordinate reference system (e.g., "EPSG:4326")

    Returns
    -------
    EcospaceGrid

    Raises
    ------
    ImportError
        If geopandas is not installed
    """
    if not _GIS_AVAILABLE:
        raise ImportError(
            "geopandas is required for shapefile support. "
            "Install with: pip install geopandas shapely rtree"
        )

    # Import here to avoid requiring geopandas if not using shapefiles
    from pypath.spatial.gis_utils import load_spatial_grid

    return load_spatial_grid(filepath, id_field, area_field, crs)
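
Example (the file path and id_field value are hypothetical; requires the optional GIS stack geopandas/shapely/rtree):

from pypath.spatial.ecospace_params import EcospaceGrid

grid = EcospaceGrid.from_shapefile(
    "data/model_domain.gpkg",  # hypothetical polygon layer
    id_field="patch_id",       # hypothetical unique-ID field
    crs="EPSG:4326",
)
print(grid.n_patches, grid.crs)
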
get_edge_length
get_edge_length(patch_i: int, patch_j: int) -> float

Get border length between two patches.

Parameters:

patch_i, patch_j : int
    Patch indices (required)

Returns:

Type Description
float

Border length in km (0 if not adjacent)

Source code in pypath/spatial/ecospace_params.py
def get_edge_length(self, patch_i: int, patch_j: int) -> float:
    """Get border length between two patches.

    Parameters
    ----------
    patch_i, patch_j : int
        Patch indices

    Returns
    -------
    float
        Border length in km (0 if not adjacent)
    """
    key = (min(patch_i, patch_j), max(patch_i, patch_j))
    return self.edge_lengths.get(key, 0.0)
get_neighbors
get_neighbors(patch_idx: int) -> np.ndarray

Get indices of neighboring patches.

Parameters:

Name Type Description Default
patch_idx int

Index of patch

required

Returns:

Type Description
ndarray

Indices of neighboring patches

Source code in pypath/spatial/ecospace_params.py
def get_neighbors(self, patch_idx: int) -> np.ndarray:
    """Get indices of neighboring patches.

    Parameters
    ----------
    patch_idx : int
        Index of patch

    Returns
    -------
    np.ndarray
        Indices of neighboring patches
    """
    return self.adjacency_matrix[patch_idx].nonzero()[1]

EcospaceParams dataclass

Spatial parameters for ECOSPACE simulation.

This extends the standard Ecosim parameters with spatial components. If ecospace=None in RsimScenario, the simulation runs as non-spatial.

Attributes:

grid : EcospaceGrid
    Spatial grid configuration
habitat_preference : ndarray
    Habitat preference/suitability [n_groups, n_patches];
    values 0-1 where 1 = optimal habitat
habitat_capacity : ndarray
    Habitat capacity multiplier [n_groups, n_patches];
    affects local carrying capacity (1.0 = no effect)
dispersal_rate : ndarray
    Diffusion coefficient (km²/month) [n_groups]; 0 = no dispersal
advection_enabled : ndarray
    Enable habitat-directed movement [n_groups], boolean
gravity_strength : ndarray
    Strength of biomass-weighted movement [n_groups]; 0 = no gravity effect
external_flux : Optional[ExternalFluxTimeseries]
    Externally provided flux (e.g., from ocean models);
    if provided for a group, overrides model-calculated dispersal
environmental_drivers : Optional[object]
    Time-varying environmental drivers (EnvironmentalDrivers instance)

Source code in pypath/spatial/ecospace_params.py
@dataclass
class EcospaceParams:
    """Spatial parameters for ECOSPACE simulation.

    This extends the standard Ecosim parameters with spatial components.
    If ecospace=None in RsimScenario, simulation runs as non-spatial.

    Attributes
    ----------
    grid : EcospaceGrid
        Spatial grid configuration
    habitat_preference : np.ndarray
        Habitat preference/suitability [n_groups, n_patches]
        Values 0-1 where 1 = optimal habitat
    habitat_capacity : np.ndarray
        Habitat capacity multiplier [n_groups, n_patches]
        Affects local carrying capacity (1.0 = no effect)
    dispersal_rate : np.ndarray
        Diffusion coefficient (km²/month) [n_groups]
        0 = no dispersal
    advection_enabled : np.ndarray
        Enable habitat-directed movement [n_groups], boolean
    gravity_strength : np.ndarray
        Strength of biomass-weighted movement [n_groups]
        0 = no gravity effect
    external_flux : Optional[ExternalFluxTimeseries]
        Externally provided flux (e.g., from ocean models)
        If provided for a group, overrides model-calculated dispersal
    environmental_drivers : Optional[object]
        Time-varying environmental drivers (EnvironmentalDrivers instance)
    """

    grid: EcospaceGrid
    habitat_preference: np.ndarray
    habitat_capacity: np.ndarray
    dispersal_rate: np.ndarray
    advection_enabled: np.ndarray
    gravity_strength: np.ndarray
    external_flux: Optional[ExternalFluxTimeseries] = None
    environmental_drivers: Optional[object] = (
        None  # EnvironmentalDrivers when available
    )

    def __post_init__(self):
        """Validate spatial parameters."""
        n_patches = self.grid.n_patches

        # Infer n_groups from habitat_preference
        if self.habitat_preference.ndim != 2:
            raise ValueError(
                f"habitat_preference must be 2D [n_groups, n_patches], got {self.habitat_preference.ndim}D"
            )

        n_groups = self.habitat_preference.shape[0]

        # Check habitat_preference dimensions
        if self.habitat_preference.shape != (n_groups, n_patches):
            raise ValueError(
                f"habitat_preference shape {self.habitat_preference.shape} != "
                f"({n_groups}, {n_patches})"
            )

        # Check habitat_capacity dimensions
        if self.habitat_capacity.shape != (n_groups, n_patches):
            raise ValueError(
                f"habitat_capacity shape {self.habitat_capacity.shape} != "
                f"({n_groups}, {n_patches})"
            )

        # Check dispersal_rate dimensions
        if len(self.dispersal_rate) != n_groups:
            raise ValueError(
                f"dispersal_rate length ({len(self.dispersal_rate)}) != n_groups ({n_groups})"
            )

        # Check advection_enabled dimensions
        if len(self.advection_enabled) != n_groups:
            raise ValueError(
                f"advection_enabled length ({len(self.advection_enabled)}) != n_groups ({n_groups})"
            )

        # Check gravity_strength dimensions
        if len(self.gravity_strength) != n_groups:
            raise ValueError(
                f"gravity_strength length ({len(self.gravity_strength)}) != n_groups ({n_groups})"
            )

        # Check value ranges
        if np.any(self.habitat_preference < 0) or np.any(self.habitat_preference > 1):
            raise ValueError("habitat_preference values must be in [0, 1]")

        if np.any(self.habitat_capacity < 0):
            raise ValueError("habitat_capacity values must be non-negative")

        if np.any(self.dispersal_rate < 0):
            raise ValueError("dispersal_rate values must be non-negative")

        if np.any(self.gravity_strength < 0):
            raise ValueError("gravity_strength values must be non-negative")
__post_init__
__post_init__()

Validate spatial parameters.

Source code in pypath/spatial/ecospace_params.py
def __post_init__(self):
    """Validate spatial parameters."""
    n_patches = self.grid.n_patches

    # Infer n_groups from habitat_preference
    if self.habitat_preference.ndim != 2:
        raise ValueError(
            f"habitat_preference must be 2D [n_groups, n_patches], got {self.habitat_preference.ndim}D"
        )

    n_groups = self.habitat_preference.shape[0]

    # Check habitat_preference dimensions
    if self.habitat_preference.shape != (n_groups, n_patches):
        raise ValueError(
            f"habitat_preference shape {self.habitat_preference.shape} != "
            f"({n_groups}, {n_patches})"
        )

    # Check habitat_capacity dimensions
    if self.habitat_capacity.shape != (n_groups, n_patches):
        raise ValueError(
            f"habitat_capacity shape {self.habitat_capacity.shape} != "
            f"({n_groups}, {n_patches})"
        )

    # Check dispersal_rate dimensions
    if len(self.dispersal_rate) != n_groups:
        raise ValueError(
            f"dispersal_rate length ({len(self.dispersal_rate)}) != n_groups ({n_groups})"
        )

    # Check advection_enabled dimensions
    if len(self.advection_enabled) != n_groups:
        raise ValueError(
            f"advection_enabled length ({len(self.advection_enabled)}) != n_groups ({n_groups})"
        )

    # Check gravity_strength dimensions
    if len(self.gravity_strength) != n_groups:
        raise ValueError(
            f"gravity_strength length ({len(self.gravity_strength)}) != n_groups ({n_groups})"
        )

    # Check value ranges
    if np.any(self.habitat_preference < 0) or np.any(self.habitat_preference > 1):
        raise ValueError("habitat_preference values must be in [0, 1]")

    if np.any(self.habitat_capacity < 0):
        raise ValueError("habitat_capacity values must be non-negative")

    if np.any(self.dispersal_rate < 0):
        raise ValueError("dispersal_rate values must be non-negative")

    if np.any(self.gravity_strength < 0):
        raise ValueError("gravity_strength values must be non-negative")

ExternalFluxTimeseries dataclass

Externally generated flux timeseries from ocean models.

Allows users to provide pre-computed transport between patches from:

- Ocean circulation models (ROMS, MITgcm, HYCOM)
- Particle tracking systems (Ichthyop, OpenDrift, Parcels)
- Connectivity matrices (genetic data, telemetry, mark-recapture)

Attributes:

flux_data : Union[ndarray, csr_matrix]
    Flux timeseries with shape [n_timesteps, n_groups, n_patches, n_patches]
    (full format); sparse format supported for memory efficiency.
    flux_data[t, g, p, q] = flux from patch p to patch q for group g at time t
times : ndarray
    Time points (in years) corresponding to flux_data [n_timesteps]
group_indices : ndarray
    Which groups have external flux [n_groups_with_flux];
    groups not in this list will use model-calculated dispersal
interpolate : bool
    Whether to use temporal interpolation (default: True)
format : str
    Data format: "flux_matrix" (direct flux values, biomass/time) or
    "connectivity_matrix" (proportions 0-1 scaled by biomass)
validated : bool
    Whether flux conservation has been validated

Source code in pypath/spatial/ecospace_params.py
@dataclass
class ExternalFluxTimeseries:
    """Externally generated flux timeseries from ocean models.

    Allows users to provide pre-computed transport between patches from:
    - Ocean circulation models (ROMS, MITgcm, HYCOM)
    - Particle tracking systems (Ichthyop, OpenDrift, Parcels)
    - Connectivity matrices (genetic data, telemetry, mark-recapture)

    Attributes
    ----------
    flux_data : Union[np.ndarray, scipy.sparse.csr_matrix]
        Flux timeseries with shape:
        - [n_timesteps, n_groups, n_patches, n_patches] for full format
        - Sparse format supported for memory efficiency
        flux_data[t, g, p, q] = flux from patch p to patch q for group g at time t
    times : np.ndarray
        Time points (in years) corresponding to flux_data [n_timesteps]
    group_indices : np.ndarray
        Which groups have external flux [n_groups_with_flux]
        Groups not in this list will use model-calculated dispersal
    interpolate : bool
        Whether to use temporal interpolation (default: True)
    format : str
        Data format: "flux_matrix" or "connectivity_matrix"
        - "flux_matrix": Direct flux values (biomass/time)
        - "connectivity_matrix": Proportions (0-1) scaled by biomass
    validated : bool
        Whether flux conservation has been validated
    """

    flux_data: Union[np.ndarray, scipy.sparse.csr_matrix]
    times: np.ndarray
    group_indices: np.ndarray
    interpolate: bool = True
    format: str = "flux_matrix"
    validated: bool = False

    def __post_init__(self):
        """Validate external flux data."""
        # Check that times are sorted
        if not np.all(np.diff(self.times) > 0):
            raise ValueError("times must be strictly increasing")

        # Check format
        if self.format not in ["flux_matrix", "connectivity_matrix"]:
            raise ValueError(
                f"format must be 'flux_matrix' or 'connectivity_matrix', got '{self.format}'"
            )

        # Validate dimensions
        if isinstance(self.flux_data, np.ndarray):
            if self.flux_data.ndim != 4:
                raise ValueError(
                    f"flux_data must be 4D [time, group, patch, patch], got {self.flux_data.ndim}D"
                )
            if self.flux_data.shape[0] != len(self.times):
                raise ValueError(
                    f"flux_data time dimension ({self.flux_data.shape[0]}) != len(times) ({len(self.times)})"
                )

    def get_flux_at_time(self, t: float, group_idx: int) -> np.ndarray:
        """Get flux matrix at given time for group.

        Parameters
        ----------
        t : float
            Simulation time (years)
        group_idx : int
            Group index

        Returns
        -------
        np.ndarray
            Flux matrix [n_patches, n_patches]
            flux[p, q] = flux from patch p to patch q
        """
        # Find group in external flux data
        group_position = np.where(self.group_indices == group_idx)[0]
        if len(group_position) == 0:
            raise ValueError(f"Group {group_idx} not found in external flux")
        group_pos = group_position[0]

        # Get flux at time t (with interpolation if enabled)
        if self.interpolate:
            # Linear interpolation between timesteps
            if t <= self.times[0]:
                time_idx = 0
                flux_matrix = self.flux_data[0, group_pos]
            elif t >= self.times[-1]:
                time_idx = len(self.times) - 1
                flux_matrix = self.flux_data[-1, group_pos]
            else:
                # Find bracketing times
                idx_after = np.searchsorted(self.times, t)
                idx_before = idx_after - 1

                # Interpolation weight
                t_before = self.times[idx_before]
                t_after = self.times[idx_after]
                weight = (t - t_before) / (t_after - t_before)

                # Linear interpolation
                flux_before = self.flux_data[idx_before, group_pos]
                flux_after = self.flux_data[idx_after, group_pos]
                flux_matrix = (1 - weight) * flux_before + weight * flux_after
        else:
            # Nearest neighbor (no interpolation)
            time_idx = np.argmin(np.abs(self.times - t))
            flux_matrix = self.flux_data[time_idx, group_pos]

        # Convert to dense if sparse
        if scipy.sparse.issparse(flux_matrix):
            flux_matrix = flux_matrix.toarray()

        return flux_matrix

    @classmethod
    def from_netcdf(
        cls,
        filepath: str,
        time_var: str = "time",
        flux_var: str = "flux",
        group_mapping: Optional[Dict[str, int]] = None,
    ) -> ExternalFluxTimeseries:
        """Load external flux from NetCDF file.

        Parameters
        ----------
        filepath : str
            Path to NetCDF file
        time_var : str
            Name of time variable
        flux_var : str
            Name of flux variable
        group_mapping : dict, optional
            Map species names to group indices

        Returns
        -------
        ExternalFluxTimeseries
        """
        from pypath.spatial.external_flux import load_external_flux_from_netcdf

        return load_external_flux_from_netcdf(
            filepath, time_var, flux_var, group_mapping
        )
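
Example (synthetic values; one flux layer per entry in group_indices):

import numpy as np
from pypath.spatial.ecospace_params import ExternalFluxTimeseries

n_times, n_patches = 4, 3
flux = np.zeros((n_times, 1, n_patches, n_patches))
flux[:, 0, 0, 1] = 0.1  # constant transport from patch 0 to patch 1

ext_flux = ExternalFluxTimeseries(
    flux_data=flux,
    times=np.array([0.0, 1.0, 2.0, 3.0]),  # years
    group_indices=np.array([2]),            # only model group 2 has external flux
)

# Interpolated [n_patches, n_patches] matrix for group 2 at t = 1.5 years
flux_matrix = ext_flux.get_flux_at_time(1.5, group_idx=2)
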
__post_init__
__post_init__()

Validate external flux data.

Source code in pypath/spatial/ecospace_params.py
def __post_init__(self):
    """Validate external flux data."""
    # Check that times are sorted
    if not np.all(np.diff(self.times) > 0):
        raise ValueError("times must be strictly increasing")

    # Check format
    if self.format not in ["flux_matrix", "connectivity_matrix"]:
        raise ValueError(
            f"format must be 'flux_matrix' or 'connectivity_matrix', got '{self.format}'"
        )

    # Validate dimensions
    if isinstance(self.flux_data, np.ndarray):
        if self.flux_data.ndim != 4:
            raise ValueError(
                f"flux_data must be 4D [time, group, patch, patch], got {self.flux_data.ndim}D"
            )
        if self.flux_data.shape[0] != len(self.times):
            raise ValueError(
                f"flux_data time dimension ({self.flux_data.shape[0]}) != len(times) ({len(self.times)})"
            )
from_netcdf classmethod
from_netcdf(filepath: str, time_var: str = 'time', flux_var: str = 'flux', group_mapping: Optional[Dict[str, int]] = None) -> ExternalFluxTimeseries

Load external flux from NetCDF file.

Parameters:

Name Type Description Default
filepath str

Path to NetCDF file

required
time_var str

Name of time variable

'time'
flux_var str

Name of flux variable

'flux'
group_mapping dict

Map species names to group indices

None

Returns:

Type Description
ExternalFluxTimeseries
Source code in pypath/spatial/ecospace_params.py
@classmethod
def from_netcdf(
    cls,
    filepath: str,
    time_var: str = "time",
    flux_var: str = "flux",
    group_mapping: Optional[Dict[str, int]] = None,
) -> ExternalFluxTimeseries:
    """Load external flux from NetCDF file.

    Parameters
    ----------
    filepath : str
        Path to NetCDF file
    time_var : str
        Name of time variable
    flux_var : str
        Name of flux variable
    group_mapping : dict, optional
        Map species names to group indices

    Returns
    -------
    ExternalFluxTimeseries
    """
    from pypath.spatial.external_flux import load_external_flux_from_netcdf

    return load_external_flux_from_netcdf(
        filepath, time_var, flux_var, group_mapping
    )
get_flux_at_time
get_flux_at_time(t: float, group_idx: int) -> np.ndarray

Get flux matrix at given time for group.

Parameters:

Name Type Description Default
t float

Simulation time (years)

required
group_idx int

Group index

required

Returns:

Type Description
ndarray

Flux matrix [n_patches, n_patches];
flux[p, q] = flux from patch p to patch q

Source code in pypath/spatial/ecospace_params.py
def get_flux_at_time(self, t: float, group_idx: int) -> np.ndarray:
    """Get flux matrix at given time for group.

    Parameters
    ----------
    t : float
        Simulation time (years)
    group_idx : int
        Group index

    Returns
    -------
    np.ndarray
        Flux matrix [n_patches, n_patches]
        flux[p, q] = flux from patch p to patch q
    """
    # Find group in external flux data
    group_position = np.where(self.group_indices == group_idx)[0]
    if len(group_position) == 0:
        raise ValueError(f"Group {group_idx} not found in external flux")
    group_pos = group_position[0]

    # Get flux at time t (with interpolation if enabled)
    if self.interpolate:
        # Linear interpolation between timesteps
        if t <= self.times[0]:
            time_idx = 0
            flux_matrix = self.flux_data[0, group_pos]
        elif t >= self.times[-1]:
            time_idx = len(self.times) - 1
            flux_matrix = self.flux_data[-1, group_pos]
        else:
            # Find bracketing times
            idx_after = np.searchsorted(self.times, t)
            idx_before = idx_after - 1

            # Interpolation weight
            t_before = self.times[idx_before]
            t_after = self.times[idx_after]
            weight = (t - t_before) / (t_after - t_before)

            # Linear interpolation
            flux_before = self.flux_data[idx_before, group_pos]
            flux_after = self.flux_data[idx_after, group_pos]
            flux_matrix = (1 - weight) * flux_before + weight * flux_after
    else:
        # Nearest neighbor (no interpolation)
        time_idx = np.argmin(np.abs(self.times - t))
        flux_matrix = self.flux_data[time_idx, group_pos]

    # Convert to dense if sparse
    if scipy.sparse.issparse(flux_matrix):
        flux_matrix = flux_matrix.toarray()

    return flux_matrix

SpatialState dataclass

Extended state for spatial simulation.

Attributes:

Biomass : ndarray
    Biomass state [n_groups+1, n_patches];
    index 0 = "Outside" (detritus, patch-invariant)
N : Optional[ndarray]
    Numbers (for multi-stanza groups) [n_groups+1, n_patches]
Ftime : Optional[ndarray]
    Foraging time [n_groups+1, n_patches]

Source code in pypath/spatial/ecospace_params.py
@dataclass
class SpatialState:
    """Extended state for spatial simulation.

    Attributes
    ----------
    Biomass : np.ndarray
        Biomass state [n_groups+1, n_patches]
        Index 0 = "Outside" (detritus, patch-invariant)
    N : Optional[np.ndarray]
        Numbers (for multi-stanza groups) [n_groups+1, n_patches]
    Ftime : Optional[np.ndarray]
        Foraging time [n_groups+1, n_patches]
    """

    Biomass: np.ndarray
    N: Optional[np.ndarray] = None
    Ftime: Optional[np.ndarray] = None

    def collapse_to_total(self) -> np.ndarray:
        """Sum biomass across patches for compatibility.

        Returns
        -------
        np.ndarray
            Total biomass [n_groups+1]
        """
        return np.sum(self.Biomass, axis=1)

    def get_patch_biomass(self, patch_idx: int) -> np.ndarray:
        """Get biomass in a specific patch.

        Parameters
        ----------
        patch_idx : int
            Patch index

        Returns
        -------
        np.ndarray
            Biomass in patch [n_groups+1]
        """
        return self.Biomass[:, patch_idx]
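
Example (synthetic biomass array for three groups plus the "Outside" slot, on three patches):

import numpy as np
from pypath.spatial.ecospace_params import SpatialState

biomass = np.random.default_rng(0).uniform(0.1, 1.0, size=(4, 3))  # [n_groups+1, n_patches]
state = SpatialState(Biomass=biomass)

state.collapse_to_total()   # total biomass per group, shape [n_groups+1]
state.get_patch_biomass(1)  # all groups' biomass in patch 1
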
collapse_to_total
collapse_to_total() -> np.ndarray

Sum biomass across patches for compatibility.

Returns:

Type Description
ndarray

Total biomass [n_groups+1]

Source code in pypath/spatial/ecospace_params.py
def collapse_to_total(self) -> np.ndarray:
    """Sum biomass across patches for compatibility.

    Returns
    -------
    np.ndarray
        Total biomass [n_groups+1]
    """
    return np.sum(self.Biomass, axis=1)
get_patch_biomass
get_patch_biomass(patch_idx: int) -> np.ndarray

Get biomass in a specific patch.

Parameters:

Name Type Description Default
patch_idx int

Patch index

required

Returns:

Type Description
ndarray

Biomass in patch [n_groups+1]

Source code in pypath/spatial/ecospace_params.py
def get_patch_biomass(self, patch_idx: int) -> np.ndarray:
    """Get biomass in a specific patch.

    Parameters
    ----------
    patch_idx : int
        Patch index

    Returns
    -------
    np.ndarray
        Biomass in patch [n_groups+1]
    """
    return self.Biomass[:, patch_idx]

Grid Creation & GIS Utilities

pypath.spatial.gis_utils

GIS utilities for ECOSPACE spatial grids.

Functions for loading spatial grids from shapefiles/GeoJSON and creating regular grids for testing.

create_1d_grid

create_1d_grid(n_patches: int, spacing: float = 1.0) -> 'EcospaceGrid'

Create 1D chain of patches for testing.

Parameters:

Name Type Description Default
n_patches int

Number of patches

required
spacing float

Distance between patch centers (km)

1.0

Returns:

Type Description
EcospaceGrid
Source code in pypath/spatial/gis_utils.py
def create_1d_grid(n_patches: int, spacing: float = 1.0) -> "EcospaceGrid":
    """Create 1D chain of patches for testing.

    Parameters
    ----------
    n_patches : int
        Number of patches
    spacing : float
        Distance between patch centers (km)

    Returns
    -------
    EcospaceGrid
    """
    # Import here to avoid circular imports
    from pypath.spatial.ecospace_params import EcospaceGrid

    patch_ids = np.arange(n_patches)
    patch_areas = np.ones(n_patches)  # 1 km² each
    patch_centroids = np.column_stack(
        [
            np.arange(n_patches) * spacing,  # x coordinates
            np.zeros(n_patches),  # y coordinates (all at y=0)
        ]
    )

    # Build adjacency for 1D chain
    rows = []
    cols = []
    edge_lengths = {}

    for i in range(n_patches - 1):
        rows.extend([i, i + 1])
        cols.extend([i + 1, i])
        edge_lengths[(i, i + 1)] = 1.0  # Unit border length

    # Create sparse adjacency matrix
    data = np.ones(len(rows))
    adjacency_matrix = scipy.sparse.csr_matrix(
        (data, (rows, cols)), shape=(n_patches, n_patches)
    )

    return EcospaceGrid(
        n_patches=n_patches,
        patch_ids=patch_ids,
        patch_areas=patch_areas,
        patch_centroids=patch_centroids,
        adjacency_matrix=adjacency_matrix,
        edge_lengths=edge_lengths,
        crs="EPSG:4326",
        geometry=None,
    )
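
Example:

from pypath.spatial.gis_utils import create_1d_grid

grid = create_1d_grid(n_patches=5, spacing=10.0)  # chain with 10 km between centres
grid.get_neighbors(0)  # array([1])    - end of the chain
grid.get_neighbors(2)  # array([1, 3]) - interior patch
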

create_regular_grid

create_regular_grid(bounds: Tuple[float, float, float, float], nx: int, ny: int) -> 'EcospaceGrid'

Create regular rectangular grid for testing.

Parameters:

Name Type Description Default
bounds Tuple[float, float, float, float]

(min_lon, min_lat, max_lon, max_lat)

required
nx int

Number of grid cells in x direction

required
ny int

Number of grid cells in y direction

required

Returns:

Type Description
EcospaceGrid
Source code in pypath/spatial/gis_utils.py
def create_regular_grid(
    bounds: Tuple[float, float, float, float], nx: int, ny: int
) -> "EcospaceGrid":
    """Create regular rectangular grid for testing.

    Parameters
    ----------
    bounds : Tuple[float, float, float, float]
        (min_lon, min_lat, max_lon, max_lat)
    nx : int
        Number of grid cells in x direction
    ny : int
        Number of grid cells in y direction

    Returns
    -------
    EcospaceGrid
    """
    # Import here to avoid circular imports
    from pypath.spatial.ecospace_params import EcospaceGrid

    min_lon, min_lat, max_lon, max_lat = bounds

    # Grid spacing
    dx = (max_lon - min_lon) / nx
    dy = (max_lat - min_lat) / ny

    # Create grid cells
    n_patches = nx * ny
    patch_ids = np.arange(n_patches)
    patch_areas = np.full(n_patches, dx * dy * 111 * 111)  # Rough km² conversion
    patch_centroids = np.zeros((n_patches, 2))

    # Build adjacency matrix for regular grid
    # Patches are indexed row-major: patch_idx = iy * nx + ix
    rows = []
    cols = []
    edge_lengths = {}

    for iy in range(ny):
        for ix in range(nx):
            patch_idx = iy * nx + ix

            # Calculate centroid
            lon = min_lon + (ix + 0.5) * dx
            lat = min_lat + (iy + 0.5) * dy
            patch_centroids[patch_idx] = [lon, lat]

            # Add neighbors (rook adjacency: 4-connected)
            # Right neighbor
            if ix < nx - 1:
                neighbor_idx = iy * nx + (ix + 1)
                rows.extend([patch_idx, neighbor_idx])
                cols.extend([neighbor_idx, patch_idx])
                edge_key = (min(patch_idx, neighbor_idx), max(patch_idx, neighbor_idx))
                edge_lengths[edge_key] = dy * 111  # Rough km conversion

            # Top neighbor
            if iy < ny - 1:
                neighbor_idx = (iy + 1) * nx + ix
                rows.extend([patch_idx, neighbor_idx])
                cols.extend([neighbor_idx, patch_idx])
                edge_key = (min(patch_idx, neighbor_idx), max(patch_idx, neighbor_idx))
                edge_lengths[edge_key] = dx * 111  # Rough km conversion

    # Create sparse adjacency matrix
    data = np.ones(len(rows))
    adjacency_matrix = scipy.sparse.csr_matrix(
        (data, (rows, cols)), shape=(n_patches, n_patches)
    )

    return EcospaceGrid(
        n_patches=n_patches,
        patch_ids=patch_ids,
        patch_areas=patch_areas,
        patch_centroids=patch_centroids,
        adjacency_matrix=adjacency_matrix,
        edge_lengths=edge_lengths,
        crs="EPSG:4326",
        geometry=None,
    )
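
Example illustrating the row-major patch ordering:

from pypath.spatial.gis_utils import create_regular_grid

grid = create_regular_grid(bounds=(-5.0, 50.0, -4.0, 51.0), nx=4, ny=3)
grid.n_patches           # 12
grid.patch_centroids[0]  # lower-left cell centroid (lon, lat)
grid.get_neighbors(0)    # array([1, 4]) - right and top neighbours of cell 0
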

load_spatial_grid

load_spatial_grid(filepath: str, id_field: str = 'id', area_field: Optional[str] = None, crs: Optional[str] = None) -> 'EcospaceGrid'

Load spatial grid from shapefile or GeoJSON.

Parameters:

Name Type Description Default
filepath str

Path to .shp, .geojson, or .gpkg file

required
id_field str

Field containing unique patch IDs

'id'
area_field str

Field with pre-computed areas in km² If None, calculates from geometry

None
crs str

Force coordinate reference system (e.g., "EPSG:4326")

None

Returns:

Type Description
EcospaceGrid

Raises:

Type Description
ImportError

If geopandas is not installed

FileNotFoundError

If filepath does not exist

ValueError

If required fields are missing

Source code in pypath/spatial/gis_utils.py
def load_spatial_grid(
    filepath: str,
    id_field: str = "id",
    area_field: Optional[str] = None,
    crs: Optional[str] = None,
) -> "EcospaceGrid":
    """Load spatial grid from shapefile or GeoJSON.

    Parameters
    ----------
    filepath : str
        Path to .shp, .geojson, or .gpkg file
    id_field : str
        Field containing unique patch IDs
    area_field : str, optional
        Field with pre-computed areas in km²
        If None, calculates from geometry
    crs : str, optional
        Force coordinate reference system (e.g., "EPSG:4326")

    Returns
    -------
    EcospaceGrid

    Raises
    ------
    ImportError
        If geopandas is not installed
    FileNotFoundError
        If filepath does not exist
    ValueError
        If required fields are missing
    """
    if not _GIS_AVAILABLE:
        raise ImportError(
            "geopandas is required for shapefile support. "
            "Install with: pip install geopandas shapely rtree"
        )

    # Import here to avoid circular imports
    from pypath.spatial.connectivity import build_adjacency_from_gdf
    from pypath.spatial.ecospace_params import EcospaceGrid

    # Load GeoDataFrame
    gdf = gpd.read_file(filepath)

    # Force CRS if specified
    if crs is not None:
        gdf = gdf.to_crs(crs)

    # Check for required fields
    if id_field not in gdf.columns:
        raise ValueError(
            f"Field '{id_field}' not found in shapefile. Available: {list(gdf.columns)}"
        )

    n_patches = len(gdf)
    patch_ids = gdf[id_field].values

    # Calculate areas if not provided
    if area_field is None:
        # Project to equal-area CRS for accurate area calculation
        # EPSG:3857 (Web Mercator) is reasonable for most applications
        # For global datasets, use appropriate equal-area projection
        gdf_area = gdf.to_crs("EPSG:3857")  # Units: meters
        areas_m2 = gdf_area.geometry.area
        patch_areas = areas_m2 / 1e6  # Convert to km²
    else:
        if area_field not in gdf.columns:
            raise ValueError(
                f"Area field '{area_field}' not found. Available: {list(gdf.columns)}"
            )
        patch_areas = gdf[area_field].values

    # Calculate centroids
    centroids = gdf.geometry.centroid
    patch_centroids = np.array([[c.x, c.y] for c in centroids])

    # Build adjacency matrix
    adjacency, edge_metadata = build_adjacency_from_gdf(gdf)

    return EcospaceGrid(
        n_patches=n_patches,
        patch_ids=patch_ids,
        patch_areas=patch_areas,
        patch_centroids=patch_centroids,
        adjacency_matrix=adjacency,
        edge_lengths=edge_metadata["border_lengths"],
        crs=gdf.crs.to_string() if gdf.crs else "EPSG:4326",
        geometry=gdf,
    )
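
Example (file path and field names are hypothetical; this is the function that EcospaceGrid.from_shapefile delegates to):

from pypath.spatial.gis_utils import load_spatial_grid

grid = load_spatial_grid(
    "data/patches.geojson",  # hypothetical polygon file
    id_field="patch_id",
    area_field="area_km2",   # use pre-computed areas instead of reprojecting
)
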

Connectivity

pypath.spatial.connectivity

Connectivity and adjacency calculations for spatial grids.

Functions for building adjacency matrices from polygon geometries, calculating edge properties, and spatial indexing.

build_adjacency_from_gdf

build_adjacency_from_gdf(gdf: 'gpd.GeoDataFrame', method: str = 'rook') -> Tuple[scipy.sparse.csr_matrix, Dict]

Build adjacency matrix from GeoDataFrame.

Parameters:

Name Type Description Default
gdf GeoDataFrame

GeoDataFrame with polygon geometries

required
method str

Adjacency method:
- "rook": Shared border (edge) required
- "queen": Shared point or border

'rook'

Returns:

Name Type Description
adjacency csr_matrix

Sparse adjacency matrix [n_patches, n_patches];
adjacency[i, j] = 1 if patches i and j are adjacent

metadata dict

Dictionary with:
- 'border_lengths': Dict[(i, j)] = border length in km
- 'method': adjacency method used

Raises:

Type Description
ImportError

If geopandas is not installed

Source code in pypath/spatial/connectivity.py
def build_adjacency_from_gdf(
    gdf: "gpd.GeoDataFrame", method: str = "rook"
) -> Tuple[scipy.sparse.csr_matrix, Dict]:
    """Build adjacency matrix from GeoDataFrame.

    Parameters
    ----------
    gdf : gpd.GeoDataFrame
        GeoDataFrame with polygon geometries
    method : str
        Adjacency method:
        - "rook": Shared border (edge) required
        - "queen": Shared point or border

    Returns
    -------
    adjacency : scipy.sparse.csr_matrix
        Sparse adjacency matrix [n_patches, n_patches]
        adjacency[i, j] = 1 if patches i and j are adjacent
    metadata : dict
        Dictionary with:
        - 'border_lengths': Dict[(i, j)] = border length in km
        - 'method': adjacency method used

    Raises
    ------
    ImportError
        If geopandas is not installed
    """
    if not _GIS_AVAILABLE:
        raise ImportError("geopandas is required for adjacency calculations")

    n_patches = len(gdf)

    # Use spatial index for efficient neighbor queries
    sindex = gdf.sindex

    rows, cols = [], []
    border_lengths = {}

    for i, geom_i in enumerate(gdf.geometry):
        # Query spatial index for potential neighbors
        possible_neighbors = list(sindex.intersection(geom_i.bounds))

        for j in possible_neighbors:
            if i >= j:  # Skip self and avoid duplicates
                continue

            geom_j = gdf.geometry.iloc[j]

            # Check intersection based on method
            if method == "rook":
                # Must share a line (not just a point)
                intersection = geom_i.intersection(geom_j)
                if intersection.length > 0:
                    rows.extend([i, j])
                    cols.extend([j, i])
                    # Store border length (convert to km if in degrees)
                    border_length_deg = intersection.length
                    # Rough conversion: 1 degree ≈ 111 km at equator
                    border_length_km = border_length_deg * 111.0
                    border_lengths[(i, j)] = border_length_km

            elif method == "queen":
                # Shares point or border
                if geom_i.touches(geom_j) or geom_i.intersects(geom_j):
                    rows.extend([i, j])
                    cols.extend([j, i])
                    intersection = geom_i.intersection(geom_j)
                    border_length_deg = (
                        intersection.length if hasattr(intersection, "length") else 0
                    )
                    border_length_km = border_length_deg * 111.0
                    border_lengths[(i, j)] = border_length_km

            else:
                raise ValueError(f"Unknown adjacency method: {method}")

    # Create sparse adjacency matrix
    data = np.ones(len(rows))
    adjacency = scipy.sparse.csr_matrix(
        (data, (rows, cols)), shape=(n_patches, n_patches)
    )

    metadata = {"border_lengths": border_lengths, "method": method}

    return adjacency, metadata
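
Example (synthetic geometries: two unit squares sharing an edge and one detached square):

import geopandas as gpd
from shapely.geometry import box
from pypath.spatial.connectivity import build_adjacency_from_gdf

gdf = gpd.GeoDataFrame(
    {"id": [0, 1, 2]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1), box(5, 5, 6, 6)],
    crs="EPSG:4326",
)

adjacency, meta = build_adjacency_from_gdf(gdf, method="rook")
adjacency.toarray()             # only patches 0 and 1 are adjacent
meta["border_lengths"][(0, 1)]  # ≈ 111.0 km (1° shared border, rough conversion)
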

build_distance_matrix

build_distance_matrix(grid: 'EcospaceGrid', method: str = 'haversine') -> np.ndarray

Build distance matrix between all patch pairs.

Parameters:

Name Type Description Default
grid EcospaceGrid

Spatial grid

required
method str

Distance calculation method:
- "haversine": Great circle distance (accurate for lat/lon)
- "euclidean": Euclidean distance (fast, approximate)

'haversine'

Returns:

Type Description
ndarray

Distance matrix [n_patches, n_patches] in km

Source code in pypath/spatial/connectivity.py
def build_distance_matrix(
    grid: "EcospaceGrid", method: str = "haversine"
) -> np.ndarray:
    """Build distance matrix between all patch pairs.

    Parameters
    ----------
    grid : EcospaceGrid
        Spatial grid
    method : str
        Distance calculation method:
        - "haversine": Great circle distance (accurate for lat/lon)
        - "euclidean": Euclidean distance (fast, approximate)

    Returns
    -------
    np.ndarray
        Distance matrix [n_patches, n_patches] in km
    """
    _n_patches = grid.n_patches
    centroids = grid.patch_centroids

    if method == "haversine":
        # Pairwise haversine distances
        distances = np.zeros((_n_patches, _n_patches))
        for i in range(_n_patches):
            distances[i, :] = haversine_distance(
                centroids[i, 0], centroids[i, 1], centroids[:, 0], centroids[:, 1]
            )
        return distances

    elif method == "euclidean":
        # Simple Euclidean (degrees to km)
        return calculate_patch_distances(grid)

    else:
        raise ValueError(f"Unknown distance method: {method}")

calculate_patch_distances

calculate_patch_distances(grid: 'EcospaceGrid') -> np.ndarray

Calculate pairwise distances between patch centroids.

Parameters:

Name Type Description Default
grid EcospaceGrid

Spatial grid

required

Returns:

Type Description
ndarray

Distance matrix [n_patches, n_patches] in km

Source code in pypath/spatial/connectivity.py
def calculate_patch_distances(grid: "EcospaceGrid") -> np.ndarray:
    """Calculate pairwise distances between patch centroids.

    Parameters
    ----------
    grid : EcospaceGrid
        Spatial grid

    Returns
    -------
    np.ndarray
        Distance matrix [n_patches, n_patches] in km
    """
    _n_patches = grid.n_patches
    centroids = grid.patch_centroids

    # Calculate Euclidean distances
    # For geographic coordinates, this is approximate
    # For more accurate distances, use haversine formula
    from scipy.spatial.distance import cdist

    # Vectorized distance calculation (much faster than nested loops)
    # Calculate all pairwise distances at once
    distances_deg = cdist(centroids, centroids, metric="euclidean")
    distances = distances_deg * 111.0  # Rough conversion from degrees to km

    return distances

find_k_nearest_neighbors

find_k_nearest_neighbors(grid: 'EcospaceGrid', k: int, method: str = 'haversine') -> np.ndarray

Find k nearest neighbors for each patch.

Parameters:

Name Type Description Default
grid EcospaceGrid

Spatial grid

required
k int

Number of nearest neighbors to find

required
method str

Distance calculation method

'haversine'

Returns:

Type Description
ndarray

Neighbor indices [n_patches, k];
neighbors[i, :] = indices of k nearest patches to patch i

Source code in pypath/spatial/connectivity.py
def find_k_nearest_neighbors(
    grid: "EcospaceGrid", k: int, method: str = "haversine"
) -> np.ndarray:
    """Find k nearest neighbors for each patch.

    Parameters
    ----------
    grid : EcospaceGrid
        Spatial grid
    k : int
        Number of nearest neighbors to find
    method : str
        Distance calculation method

    Returns
    -------
    np.ndarray
        Neighbor indices [n_patches, k]
        neighbors[i, :] = indices of k nearest patches to patch i
    """
    distances = build_distance_matrix(grid, method=method)

    # For each patch, find k+1 nearest (including self)
    # Then exclude self
    n_patches = grid.n_patches
    neighbors = np.zeros((n_patches, k), dtype=int)

    for i in range(n_patches):
        # argsort gives indices from nearest to farthest
        sorted_indices = np.argsort(distances[i, :])
        # Exclude self (distance 0) and take next k
        neighbors[i, :] = sorted_indices[1 : k + 1]

    return neighbors
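
Example on a synthetic grid:

from pypath.spatial.gis_utils import create_regular_grid
from pypath.spatial.connectivity import find_k_nearest_neighbors

grid = create_regular_grid(bounds=(0.0, 0.0, 1.0, 1.0), nx=4, ny=4)
neighbors = find_k_nearest_neighbors(grid, k=3)  # shape [16, 3], self excluded
neighbors[0]  # the three patches closest to the corner patch
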

get_connectivity_graph_stats

get_connectivity_graph_stats(adjacency: csr_matrix) -> Dict

Calculate graph statistics from adjacency matrix.

Parameters:

Name Type Description Default
adjacency csr_matrix

Adjacency matrix

required

Returns:

Type Description
dict

Dictionary with:
- 'n_nodes': Number of patches
- 'n_edges': Number of edges (undirected)
- 'mean_degree': Average number of neighbors
- 'max_degree': Maximum number of neighbors
- 'min_degree': Minimum number of neighbors
- 'isolated_patches': Patches with no neighbors

Source code in pypath/spatial/connectivity.py
def get_connectivity_graph_stats(adjacency: scipy.sparse.csr_matrix) -> Dict:
    """Calculate graph statistics from adjacency matrix.

    Parameters
    ----------
    adjacency : scipy.sparse.csr_matrix
        Adjacency matrix

    Returns
    -------
    dict
        Dictionary with:
        - 'n_nodes': Number of patches
        - 'n_edges': Number of edges (undirected)
        - 'mean_degree': Average number of neighbors
        - 'max_degree': Maximum number of neighbors
        - 'min_degree': Minimum number of neighbors
        - 'isolated_patches': Patches with no neighbors
    """
    n_patches = adjacency.shape[0]

    # Degree of each node
    degrees = np.array(adjacency.sum(axis=1)).flatten()

    # Number of edges (each edge counted twice in adjacency, so divide by 2)
    n_edges = int(adjacency.nnz / 2)

    stats = {
        "n_nodes": n_patches,
        "n_edges": n_edges,
        "mean_degree": np.mean(degrees),
        "max_degree": int(np.max(degrees)),
        "min_degree": int(np.min(degrees)),
        "isolated_patches": np.where(degrees == 0)[0].tolist(),
    }

    return stats
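
Example on a 1D test grid:

from pypath.spatial.gis_utils import create_1d_grid
from pypath.spatial.connectivity import get_connectivity_graph_stats

grid = create_1d_grid(n_patches=5)
get_connectivity_graph_stats(grid.adjacency_matrix)
# {'n_nodes': 5, 'n_edges': 4, 'mean_degree': 1.6,
#  'max_degree': 2, 'min_degree': 1, 'isolated_patches': []}
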

haversine_distance

haversine_distance(lon1: ndarray, lat1: ndarray, lon2: ndarray, lat2: ndarray) -> np.ndarray

Calculate great circle distance between points.

Uses the haversine formula for accurate distances on a sphere.

Parameters:

Name Type Description Default
lon1 ndarray

Longitude and latitude of first point(s) in degrees

required
lat1 ndarray

Latitude of first point(s) in degrees

required
lon2 ndarray

Longitude of second point(s) in degrees

required
lat2 ndarray

Latitude of second point(s) in degrees

required

Returns:

Type Description
ndarray

Distance in kilometers

Source code in pypath/spatial/connectivity.py
def haversine_distance(
    lon1: np.ndarray, lat1: np.ndarray, lon2: np.ndarray, lat2: np.ndarray
) -> np.ndarray:
    """Calculate great circle distance between points.

    Uses the haversine formula for accurate distances on a sphere.

    Parameters
    ----------
    lon1, lat1 : np.ndarray
        Longitude and latitude of first point(s) in degrees
    lon2, lat2 : np.ndarray
        Longitude and latitude of second point(s) in degrees

    Returns
    -------
    np.ndarray
        Distance in kilometers
    """
    # Convert to radians
    lon1_rad = np.radians(lon1)
    lat1_rad = np.radians(lat1)
    lon2_rad = np.radians(lon2)
    lat2_rad = np.radians(lat2)

    # Haversine formula
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad

    a = (
        np.sin(dlat / 2) ** 2
        + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2
    )
    c = 2 * np.arcsin(np.sqrt(a))

    # Earth radius in km
    R = 6371.0

    return R * c
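
A sanity check with single-element arrays (values approximate): one degree of latitude along the prime meridian is roughly 111 km.

import numpy as np
from pypath.spatial.connectivity import haversine_distance

d = haversine_distance(np.array([0.0]), np.array([0.0]),
                       np.array([0.0]), np.array([1.0]))
# d ≈ array([111.19])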

validate_adjacency_symmetry

validate_adjacency_symmetry(adjacency: csr_matrix) -> bool

Check if adjacency matrix is symmetric.

Parameters:

Name Type Description Default
adjacency csr_matrix

Adjacency matrix

required

Returns:

Type Description
bool

True if symmetric (within tolerance)

Source code in pypath/spatial/connectivity.py
def validate_adjacency_symmetry(adjacency: scipy.sparse.csr_matrix) -> bool:
    """Check if adjacency matrix is symmetric.

    Parameters
    ----------
    adjacency : scipy.sparse.csr_matrix
        Adjacency matrix

    Returns
    -------
    bool
        True if symmetric (within tolerance)
    """
    diff = adjacency - adjacency.T
    return diff.nnz == 0 or np.allclose(diff.data, 0)
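
For example (toy matrices), an undirected adjacency stored in both directions passes, while an edge stored in only one direction does not:

import numpy as np
import scipy.sparse
from pypath.spatial.connectivity import validate_adjacency_symmetry

sym = scipy.sparse.csr_matrix(np.array([[0, 1], [1, 0]]))
one_way = scipy.sparse.csr_matrix(np.array([[0, 1], [0, 0]]))
validate_adjacency_symmetry(sym)      # True
validate_adjacency_symmetry(one_way)  # False (edge stored in one direction only)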

Dispersal

pypath.spatial.dispersal

Dispersal and movement mechanics for ECOSPACE.

Implements spatial flux calculations:
- Diffusion (Fick's law)
- Habitat-directed advection
- Gravity models (biomass-weighted movement)
- Hybrid flux (external + model-calculated)

apply_external_flux

apply_external_flux(biomass_vector: ndarray, external_flux: ExternalFluxTimeseries, group_idx: int, t: float) -> np.ndarray

Apply externally provided flux matrix to biomass.

External flux can come from:
- Ocean circulation models (ROMS, MITgcm, HYCOM)
- Particle tracking (Ichthyop, OpenDrift, Parcels)
- Connectivity matrices (genetic data, telemetry)

Parameters:

Name Type Description Default
biomass_vector ndarray

Biomass in each patch [n_patches]

required
external_flux ExternalFluxTimeseries

External flux data

required
group_idx int

Group index

required
t float

Simulation time (years)

required

Returns:

Type Description
ndarray

Net flux for each patch [n_patches]

Notes

flux_matrix[p, q] = flux from patch p to patch q
net_flux[p] = Σ_q flux_matrix[q, p] - Σ_q flux_matrix[p, q] = inflow - outflow

Source code in pypath/spatial/dispersal.py
def apply_external_flux(
    biomass_vector: np.ndarray,
    external_flux: ExternalFluxTimeseries,
    group_idx: int,
    t: float,
) -> np.ndarray:
    """Apply externally provided flux matrix to biomass.

    External flux can come from:
    - Ocean circulation models (ROMS, MITgcm, HYCOM)
    - Particle tracking (Ichthyop, OpenDrift, Parcels)
    - Connectivity matrices (genetic data, telemetry)

    Parameters
    ----------
    biomass_vector : np.ndarray
        Biomass in each patch [n_patches]
    external_flux : ExternalFluxTimeseries
        External flux data
    group_idx : int
        Group index
    t : float
        Simulation time (years)

    Returns
    -------
    np.ndarray
        Net flux for each patch [n_patches]

    Notes
    -----
    flux_matrix[p, q] = flux from patch p to patch q
    net_flux[p] = Σ_q flux_matrix[q, p] - Σ_q flux_matrix[p, q]
                = inflow - outflow
    """
    # Get flux matrix at time t
    flux_matrix = external_flux.get_flux_at_time(t, group_idx)

    # Calculate net flux for each patch
    # Inflow: sum over columns (from all q to p)
    # Outflow: sum over rows (from p to all q)
    if scipy.sparse.issparse(flux_matrix):
        inflow = np.array(flux_matrix.sum(axis=0)).flatten()
        outflow = np.array(flux_matrix.sum(axis=1)).flatten()
    else:
        inflow = flux_matrix.sum(axis=0)
        outflow = flux_matrix.sum(axis=1)

    net_flux = inflow - outflow

    return net_flux
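
A numerical illustration of the Notes convention, using plain NumPy and made-up numbers rather than an ExternalFluxTimeseries object:

import numpy as np

# flux_matrix[p, q]: biomass moved from patch p to patch q over the step
flux_matrix = np.array([
    [0.0, 2.0, 0.0],
    [0.0, 0.0, 1.0],
    [0.5, 0.0, 0.0],
])
inflow = flux_matrix.sum(axis=0)   # received by each patch
outflow = flux_matrix.sum(axis=1)  # leaving each patch
net_flux = inflow - outflow        # array([-1.5,  1. ,  0.5]); sums to zero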

apply_flux_limiter

apply_flux_limiter(flux: ndarray, biomass: ndarray, dt: float = 1.0) -> np.ndarray

Apply flux limiter to prevent negative biomass.

Limits outflow so that biomass cannot go negative during the timestep.

Parameters:

Name Type Description Default
flux ndarray

Net flux [n_patches]

required
biomass ndarray

Current biomass [n_patches]

required
dt float

Timestep size (fraction of month, default: 1.0)

1.0

Returns:

Type Description
ndarray

Limited flux [n_patches]

Source code in pypath/spatial/dispersal.py
def apply_flux_limiter(
    flux: np.ndarray, biomass: np.ndarray, dt: float = 1.0
) -> np.ndarray:
    """Apply flux limiter to prevent negative biomass.

    Limits outflow so that biomass cannot go negative
    during the timestep.

    Parameters
    ----------
    flux : np.ndarray
        Net flux [n_patches]
    biomass : np.ndarray
        Current biomass [n_patches]
    dt : float
        Timestep size (fraction of month, default: 1.0)

    Returns
    -------
    np.ndarray
        Limited flux [n_patches]
    """
    limited_flux = flux.copy()

    # For patches with outflow, limit to available biomass
    outflow_mask = flux < 0
    max_outflow = biomass[outflow_mask] / dt

    # Limit outflow
    limited_flux[outflow_mask] = np.maximum(flux[outflow_mask], -max_outflow)

    return limited_flux
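
For instance (toy numbers), a patch holding 3 units cannot export 5 in one step; the limiter caps its outflow at the available biomass:

import numpy as np
from pypath.spatial.dispersal import apply_flux_limiter

flux = np.array([-5.0, 2.0, 3.0])      # patch 0 is asked to export 5 units
biomass = np.array([3.0, 10.0, 10.0])  # but only holds 3
apply_flux_limiter(flux, biomass)      # array([-3., 2., 3.])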

calculate_spatial_flux

calculate_spatial_flux(state: ndarray, ecospace: EcospaceParams, params: dict, t: float) -> np.ndarray

Calculate total spatial flux (diffusion + advection + external).

Priority order for each group:
1. If external_flux provided for group -> use external
2. Else if dispersal_rate > 0 -> calculate model flux
3. Else -> no movement for this group

Parameters:

Name Type Description Default
state ndarray

Spatial state [n_groups+1, n_patches]

required
ecospace EcospaceParams

Spatial parameters

required
params dict

Ecosim parameters

required
t float

Simulation time (years)

required

Returns:

Type Description
ndarray

Spatial flux [n_groups+1, n_patches]
flux[g, p] = net flux for group g in patch p

Source code in pypath/spatial/dispersal.py
def calculate_spatial_flux(
    state: np.ndarray, ecospace: EcospaceParams, params: dict, t: float
) -> np.ndarray:
    """Calculate total spatial flux (diffusion + advection + external).

    Priority order for each group:
    1. If external_flux provided for group -> use external
    2. Else if dispersal_rate > 0 -> calculate model flux
    3. Else -> no movement for this group

    Parameters
    ----------
    state : np.ndarray
        Spatial state [n_groups+1, n_patches]
    ecospace : EcospaceParams
        Spatial parameters
    params : dict
        Ecosim parameters
    t : float
        Simulation time (years)

    Returns
    -------
    np.ndarray
        Spatial flux [n_groups+1, n_patches]
        flux[g, p] = net flux for group g in patch p
    """
    n_groups = state.shape[0]
    _n_patches = state.shape[1]
    flux = np.zeros_like(state, dtype=float)

    grid = ecospace.grid
    adj = ecospace.grid.adjacency_matrix

    # IBM groups manage their own spatial movement; skip standard dispersal
    ibm_groups = params.get("ibm_groups", {})

    # Calculate flux for each group
    for group_idx in range(1, n_groups):  # Skip index 0 (Outside/Detritus)
        # Skip IBM groups — they handle movement in compute_step Phase 5
        if group_idx in ibm_groups:
            continue

        # Ecospace parameters are indexed from 0, but group_idx starts at 1
        # So we need to subtract 1 when accessing ecospace arrays
        eco_idx = group_idx - 1

        # Check for external flux first
        if (
            ecospace.external_flux is not None
            and eco_idx in ecospace.external_flux.group_indices
        ):
            # Use external flux (from ocean models, particle tracking, etc.)
            flux[group_idx] = apply_external_flux(
                state[group_idx], ecospace.external_flux, eco_idx, t
            )

        # Otherwise use model-calculated dispersal
        elif ecospace.dispersal_rate[eco_idx] > 0:
            # Passive diffusion (Fick's law)
            flux[group_idx] = diffusion_flux(
                state[group_idx], ecospace.dispersal_rate[eco_idx], grid, adj
            )

            # Add habitat-directed movement if enabled
            if (
                ecospace.advection_enabled[eco_idx]
                and ecospace.gravity_strength[eco_idx] > 0
            ):
                flux[group_idx] += habitat_advection(
                    state[group_idx],
                    ecospace.habitat_preference[eco_idx],
                    ecospace.gravity_strength[eco_idx],
                    grid,
                    adj,
                )

    return flux

diffusion_flux

diffusion_flux(biomass_vector: ndarray, dispersal_rate: float, grid: EcospaceGrid, adjacency: csr_matrix) -> np.ndarray

Calculate diffusion flux using Fick's law.

Flux between adjacent patches follows: flux_pq = -D * (B_p - B_q) * (border_length / distance)

where D is the dispersal rate (diffusion coefficient).

Parameters:

Name Type Description Default
biomass_vector ndarray

Biomass in each patch [n_patches]

required
dispersal_rate float

Diffusion coefficient (km²/month)
Typical values: 1-100 km²/month for fish

required
grid EcospaceGrid

Spatial grid configuration

required
adjacency csr_matrix

Adjacency matrix [n_patches, n_patches]

required

Returns:

Type Description
ndarray

Net flux for each patch [n_patches]
- Positive = inflow (biomass increases)
- Negative = outflow (biomass decreases)
- Sum over all patches = 0 (conservation)

Source code in pypath/spatial/dispersal.py
def diffusion_flux(
    biomass_vector: np.ndarray,
    dispersal_rate: float,
    grid: EcospaceGrid,
    adjacency: scipy.sparse.csr_matrix,
) -> np.ndarray:
    """Calculate diffusion flux using Fick's law.

    Flux between adjacent patches follows:
        flux_pq = -D * (B_p - B_q) * (border_length / distance)

    where D is the dispersal rate (diffusion coefficient).

    Parameters
    ----------
    biomass_vector : np.ndarray
        Biomass in each patch [n_patches]
    dispersal_rate : float
        Diffusion coefficient (km²/month)
        Typical values: 1-100 km²/month for fish
    grid : EcospaceGrid
        Spatial grid configuration
    adjacency : scipy.sparse.csr_matrix
        Adjacency matrix [n_patches, n_patches]

    Returns
    -------
    np.ndarray
        Net flux for each patch [n_patches]
        - Positive = inflow (biomass increases)
        - Negative = outflow (biomass decreases)
        - Sum over all patches = 0 (conservation)
    """
    n_patches = len(biomass_vector)
    net_flux = np.zeros(n_patches)

    rows, cols, border_lengths, distances = _get_diffusion_edge_data(grid, adjacency)

    if len(rows) == 0:
        return net_flux

    # Use Numba-accelerated implementation when available
    if HAS_NUMBA:
        return _diffusion_flux_numba(
            biomass_vector,
            dispersal_rate,
            rows,
            cols,
            border_lengths,
            distances,
            n_patches,
        )

    # Vectorized gradient calculation
    gradients = biomass_vector[rows] - biomass_vector[cols]

    # Vectorized flux calculation
    flux_rates = dispersal_rate * border_lengths / distances
    flux_values = flux_rates * gradients

    # Accumulate fluxes using np.add.at (vectorized accumulation)
    np.add.at(net_flux, rows, -flux_values)  # Outflow from rows
    np.add.at(net_flux, cols, flux_values)  # Inflow to cols

    return net_flux
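
A single-edge sketch of the formula above, with assumed toy values, matching the vectorised branch of the function:

import numpy as np

D = 10.0                   # dispersal rate (km²/month)
B_p, B_q = 5.0, 1.0        # biomass in patches p and q
border, dist = 2.0, 20.0   # shared border length (km) and centroid distance (km)

edge_flux = D * border / dist * (B_p - B_q)   # 4.0 units flow down-gradient, p -> q
net_flux = np.array([-edge_flux, edge_flux])  # outflow from p, inflow to q; sums to 0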

gravity_model_flux

gravity_model_flux(biomass_vector: ndarray, attractiveness: ndarray, gravity_strength: float, grid: EcospaceGrid, adjacency: csr_matrix, distance_decay: float = 1.0) -> np.ndarray

Calculate gravity model flux (biomass-weighted attraction).

Movement rate from patch i to patch j: flux_ij ∝ (biomass_i) * (attractiveness_j) / (distance_ij ^ decay)

Parameters:

Name Type Description Default
biomass_vector ndarray

Biomass in each patch [n_patches]

required
attractiveness ndarray

Attractiveness of each patch [n_patches]
Could be: biomass (aggregation), habitat quality, resources

required
gravity_strength float

Overall movement rate

required
grid EcospaceGrid

Spatial grid

required
adjacency csr_matrix

Adjacency matrix

required
distance_decay float

Distance decay exponent (default: 1.0)
Higher values = stronger distance penalty

1.0

Returns:

Type Description
ndarray

Net flux [n_patches]

Source code in pypath/spatial/dispersal.py
def gravity_model_flux(
    biomass_vector: np.ndarray,
    attractiveness: np.ndarray,
    gravity_strength: float,
    grid: EcospaceGrid,
    adjacency: scipy.sparse.csr_matrix,
    distance_decay: float = 1.0,
) -> np.ndarray:
    """Calculate gravity model flux (biomass-weighted attraction).

    Movement rate from patch i to patch j:
        flux_ij ∝ (biomass_i) * (attractiveness_j) / (distance_ij ^ decay)

    Parameters
    ----------
    biomass_vector : np.ndarray
        Biomass in each patch [n_patches]
    attractiveness : np.ndarray
        Attractiveness of each patch [n_patches]
        Could be: biomass (aggregation), habitat quality, resources
    gravity_strength : float
        Overall movement rate
    grid : EcospaceGrid
        Spatial grid
    adjacency : scipy.sparse.csr_matrix
        Adjacency matrix
    distance_decay : float
        Distance decay exponent (default: 1.0)
        Higher values = stronger distance penalty

    Returns
    -------
    np.ndarray
        Net flux [n_patches]
    """
    n_patches = len(biomass_vector)
    net_flux = np.zeros(n_patches)

    rows, cols = _get_upper_triangle_edges(adjacency)

    if len(rows) == 0:
        return net_flux

    # Use cached distance matrix
    from scipy.spatial.distance import cdist

    if not hasattr(grid, "_distance_matrix"):
        grid._distance_matrix = (
            cdist(grid.patch_centroids, grid.patch_centroids, metric="euclidean")
            * 111.0
        )

    distances = grid._distance_matrix[rows, cols]

    # Filter out zero distances
    valid_dist = distances > 0
    if not np.any(valid_dist):
        return net_flux

    rows = rows[valid_dist]
    cols = cols[valid_dist]
    distances = distances[valid_dist]

    # Use Numba-accelerated implementation when available
    if HAS_NUMBA:
        return _gravity_model_flux_numba(
            biomass_vector,
            attractiveness,
            gravity_strength,
            rows,
            cols,
            distances,
            distance_decay,
            n_patches,
        )

    # Vectorized gravity model calculations
    attractiveness_rows = attractiveness[rows]
    attractiveness_cols = attractiveness[cols]

    # Distance decay factor
    distance_factor = distances**distance_decay

    # Flux from rows to cols
    flux_ij = (
        gravity_strength * biomass_vector[rows] * attractiveness_cols / distance_factor
    )

    # Flux from cols to rows
    flux_ji = (
        gravity_strength * biomass_vector[cols] * attractiveness_rows / distance_factor
    )

    # Net flux (vectorized)
    net_fluxes = flux_ij - flux_ji

    # Accumulate using np.add.at
    np.add.at(net_flux, rows, -net_fluxes)
    np.add.at(net_flux, cols, net_fluxes)

    return net_flux
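
A one-edge sketch of the gravity formula, with illustrative values only:

g = 0.1                 # gravity_strength
B_i, B_j = 4.0, 1.0     # biomass in patches i and j
A_i, A_j = 0.2, 0.8     # attractiveness of each patch
d, decay = 50.0, 1.0    # centroid distance (km) and distance decay exponent

flux_ij = g * B_i * A_j / d**decay  # 0.0064 moving i -> j
flux_ji = g * B_j * A_i / d**decay  # 0.0004 moving j -> i
net = flux_ij - flux_ji             # 0.006 net transfer toward the more attractive patch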

habitat_advection

habitat_advection(biomass_vector: ndarray, habitat_preference: ndarray, gravity_strength: float, grid: EcospaceGrid, adjacency: csr_matrix) -> np.ndarray

Calculate habitat-directed movement (advection).

Organisms move toward patches with higher habitat quality, proportional to biomass and habitat gradient.

Parameters:

Name Type Description Default
biomass_vector ndarray

Biomass in each patch [n_patches]

required
habitat_preference ndarray

Habitat quality [n_patches], values 0-1

required
gravity_strength float

Movement strength (0-1)
0 = no movement, 1 = strong habitat-seeking

required
grid EcospaceGrid

Spatial grid configuration

required
adjacency csr_matrix

Adjacency matrix

required

Returns:

Type Description
ndarray

Net flux for each patch [n_patches]

Source code in pypath/spatial/dispersal.py
def habitat_advection(
    biomass_vector: np.ndarray,
    habitat_preference: np.ndarray,
    gravity_strength: float,
    grid: EcospaceGrid,
    adjacency: scipy.sparse.csr_matrix,
) -> np.ndarray:
    """Calculate habitat-directed movement (advection).

    Organisms move toward patches with higher habitat quality,
    proportional to biomass and habitat gradient.

    Parameters
    ----------
    biomass_vector : np.ndarray
        Biomass in each patch [n_patches]
    habitat_preference : np.ndarray
        Habitat quality [n_patches], values 0-1
    gravity_strength : float
        Movement strength (0-1)
        0 = no movement, 1 = strong habitat-seeking
    grid : EcospaceGrid
        Spatial grid configuration
    adjacency : scipy.sparse.csr_matrix
        Adjacency matrix

    Returns
    -------
    np.ndarray
        Net flux for each patch [n_patches]
    """
    if gravity_strength <= 0:
        return np.zeros(len(biomass_vector))

    n_patches = len(biomass_vector)
    net_flux = np.zeros(n_patches)

    rows, cols = _get_upper_triangle_edges(adjacency)

    if len(rows) == 0:
        return net_flux

    # Use Numba-accelerated implementation when available
    if HAS_NUMBA:
        return _habitat_advection_numba(
            biomass_vector, habitat_preference, gravity_strength, rows, cols, n_patches
        )

    # Vectorized habitat gradient calculation
    habitat_gradients = habitat_preference[cols] - habitat_preference[rows]

    # Filter out negligible gradients
    significant = np.abs(habitat_gradients) >= 1e-10
    if not np.any(significant):
        return net_flux

    rows = rows[significant]
    cols = cols[significant]
    habitat_gradients = habitat_gradients[significant]

    # Vectorized movement calculation
    # Positive gradient: move from rows to cols
    # Negative gradient: move from cols to rows
    positive_grad = habitat_gradients > 0
    negative_grad = ~positive_grad

    # For positive gradients: move from p (rows) to q (cols)
    if np.any(positive_grad):
        movement_rates_pos = (
            gravity_strength
            * biomass_vector[rows[positive_grad]]
            * habitat_gradients[positive_grad]
        )
        np.add.at(net_flux, rows[positive_grad], -movement_rates_pos)
        np.add.at(net_flux, cols[positive_grad], movement_rates_pos)

    # For negative gradients: move from q (cols) to p (rows)
    if np.any(negative_grad):
        movement_rates_neg = (
            gravity_strength
            * biomass_vector[cols[negative_grad]]
            * np.abs(habitat_gradients[negative_grad])
        )
        np.add.at(net_flux, cols[negative_grad], -movement_rates_neg)
        np.add.at(net_flux, rows[negative_grad], movement_rates_neg)

    return net_flux
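
A single-edge sketch of the positive-gradient branch, with toy numbers:

import numpy as np

strength = 0.5
B = np.array([10.0, 2.0])  # biomass in patches p and q
H = np.array([0.3, 0.9])   # habitat quality in patches p and q

gradient = H[1] - H[0]               # +0.6, so q is the better habitat
moved = strength * B[0] * gradient   # 3.0 units of biomass leave p for q
net_flux = np.array([-moved, moved])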

validate_flux_conservation

validate_flux_conservation(flux: ndarray, tolerance: float = 1e-08) -> bool

Validate that spatial flux conserves mass.

The sum of flux over all patches should be zero (no net creation or destruction of biomass).

Parameters:

Name Type Description Default
flux ndarray

Flux array [n_groups, n_patches] or [n_patches]

required
tolerance float

Numerical tolerance for zero (default: 1e-8)

1e-08

Returns:

Type Description
bool

True if flux is conserved (within tolerance)

Source code in pypath/spatial/dispersal.py
def validate_flux_conservation(flux: np.ndarray, tolerance: float = 1e-8) -> bool:
    """Validate that spatial flux conserves mass.

    The sum of flux over all patches should be zero
    (no net creation or destruction of biomass).

    Parameters
    ----------
    flux : np.ndarray
        Flux array [n_groups, n_patches] or [n_patches]
    tolerance : float
        Numerical tolerance for zero (default: 1e-8)

    Returns
    -------
    bool
        True if flux is conserved (within tolerance)
    """
    if flux.ndim == 1:
        # Single group
        total_flux = np.sum(flux)
        return abs(total_flux) < tolerance
    else:
        # Multiple groups
        total_flux = np.sum(flux, axis=1)
        return np.all(np.abs(total_flux) < tolerance)
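
For example (illustrative values):

import numpy as np
from pypath.spatial.dispersal import validate_flux_conservation

validate_flux_conservation(np.array([2.0, -1.5, -0.5]))  # True  (sums to zero)
validate_flux_conservation(np.array([2.0, -1.0]))        # False (net gain of 1.0)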

Habitat Suitability

pypath.spatial.habitat

Habitat capacity and response functions for ECOSPACE.

Implements functions that map environmental conditions to habitat suitability:
- Gaussian response (optimal value ± tolerance)
- Threshold response (trapezoidal)
- Linear response
- Custom response functions
- Multi-driver habitat capacity calculations

apply_habitat_preference_and_suitability

apply_habitat_preference_and_suitability(base_preference: ndarray, environmental_suitability: ndarray, combine_method: str = 'multiplicative') -> np.ndarray

Combine base habitat preference with environmental suitability.

Base preference represents intrinsic patch quality (structure, substrate), while environmental suitability represents dynamic factors (temperature).

Parameters:

Name Type Description Default
base_preference ndarray

Base habitat preference [n_patches], values in [0, 1]

required
environmental_suitability ndarray

Environmental suitability [n_patches], values in [0, 1]

required
combine_method str

How to combine: - "multiplicative": preference * suitability (default) - "minimum": min(preference, suitability) - "average": (preference + suitability) / 2

'multiplicative'

Returns:

Type Description
ndarray

Combined habitat quality [n_patches], values in [0, 1]

Examples:

>>> base_pref = np.array([1.0, 0.5, 0.8])  # Intrinsic quality
>>> env_suit = np.array([0.8, 1.0, 0.6])   # Environmental suitability
>>>
>>> # Multiplicative (strict)
>>> apply_habitat_preference_and_suitability(base_pref, env_suit, "multiplicative")
array([0.8 , 0.5 , 0.48])
>>>
>>> # Minimum (limiting factor)
>>> apply_habitat_preference_and_suitability(base_pref, env_suit, "minimum")
array([0.8, 0.5, 0.6])
Source code in pypath/spatial/habitat.py
def apply_habitat_preference_and_suitability(
    base_preference: np.ndarray,
    environmental_suitability: np.ndarray,
    combine_method: str = "multiplicative",
) -> np.ndarray:
    """Combine base habitat preference with environmental suitability.

    Base preference represents intrinsic patch quality (structure, substrate),
    while environmental suitability represents dynamic factors (temperature).

    Parameters
    ----------
    base_preference : np.ndarray
        Base habitat preference [n_patches], values in [0, 1]
    environmental_suitability : np.ndarray
        Environmental suitability [n_patches], values in [0, 1]
    combine_method : str
        How to combine:
        - "multiplicative": preference * suitability (default)
        - "minimum": min(preference, suitability)
        - "average": (preference + suitability) / 2

    Returns
    -------
    np.ndarray
        Combined habitat quality [n_patches], values in [0, 1]

    Examples
    --------
    >>> base_pref = np.array([1.0, 0.5, 0.8])  # Intrinsic quality
    >>> env_suit = np.array([0.8, 1.0, 0.6])   # Environmental suitability
    >>>
    >>> # Multiplicative (strict)
    >>> apply_habitat_preference_and_suitability(base_pref, env_suit, "multiplicative")
    array([0.8 , 0.5 , 0.48])
    >>>
    >>> # Minimum (limiting factor)
    >>> apply_habitat_preference_and_suitability(base_pref, env_suit, "minimum")
    array([0.8, 0.5, 0.6])
    """
    base_preference = np.asarray(base_preference, dtype=float)
    environmental_suitability = np.asarray(environmental_suitability, dtype=float)

    if base_preference.shape != environmental_suitability.shape:
        raise ValueError(
            f"Shape mismatch: base_preference {base_preference.shape} != "
            f"environmental_suitability {environmental_suitability.shape}"
        )

    if combine_method == "multiplicative":
        return base_preference * environmental_suitability

    elif combine_method == "minimum":
        return np.minimum(base_preference, environmental_suitability)

    elif combine_method == "average":
        return (base_preference + environmental_suitability) / 2

    else:
        raise ValueError(
            f"Unknown combine_method '{combine_method}'. "
            f"Must be one of: multiplicative, minimum, average"
        )

calculate_habitat_suitability

calculate_habitat_suitability(environmental_values: ndarray, response_functions: List[Callable], combine_method: str = 'multiplicative') -> np.ndarray

Calculate habitat suitability from multiple environmental drivers.

Combines multiple environmental responses into overall habitat suitability.

Parameters:

Name Type Description Default
environmental_values ndarray

Environmental values [n_patches, n_drivers]

required
response_functions list of callable

Response function for each driver
Must match number of drivers

required
combine_method str

How to combine responses:
- "multiplicative": product of all responses (default)
- "minimum": minimum of all responses
- "geometric_mean": geometric mean
- "average": arithmetic mean

'multiplicative'

Returns:

Type Description
ndarray

Habitat suitability [n_patches], values in [0, 1]

Examples:

>>> # Temperature and depth responses
>>> env = np.array([
...     [10, 50],   # Patch 0: 10°C, 50m
...     [15, 100],  # Patch 1: 15°C, 100m
...     [8, 30]     # Patch 2: 8°C, 30m
... ])
>>>
>>> temp_response = create_gaussian_response(optimal_value=12, tolerance=4)
>>> depth_response = create_linear_response(min_value=0, max_value=200)
>>>
>>> suitability = calculate_habitat_suitability(
...     env,
...     [temp_response, depth_response],
...     combine_method="multiplicative"
... )
Source code in pypath/spatial/habitat.py
def calculate_habitat_suitability(
    environmental_values: np.ndarray,
    response_functions: List[Callable],
    combine_method: str = "multiplicative",
) -> np.ndarray:
    """Calculate habitat suitability from multiple environmental drivers.

    Combines multiple environmental responses into overall habitat suitability.

    Parameters
    ----------
    environmental_values : np.ndarray
        Environmental values [n_patches, n_drivers]
    response_functions : list of callable
        Response function for each driver
        Must match number of drivers
    combine_method : str
        How to combine responses:
        - "multiplicative": product of all responses (default)
        - "minimum": minimum of all responses
        - "geometric_mean": geometric mean
        - "average": arithmetic mean

    Returns
    -------
    np.ndarray
        Habitat suitability [n_patches], values in [0, 1]

    Examples
    --------
    >>> # Temperature and depth responses
    >>> env = np.array([
    ...     [10, 50],   # Patch 0: 10°C, 50m
    ...     [15, 100],  # Patch 1: 15°C, 100m
    ...     [8, 30]     # Patch 2: 8°C, 30m
    ... ])
    >>>
    >>> temp_response = create_gaussian_response(optimal_value=12, tolerance=4)
    >>> depth_response = create_linear_response(min_value=0, max_value=200)
    >>>
    >>> suitability = calculate_habitat_suitability(
    ...     env,
    ...     [temp_response, depth_response],
    ...     combine_method="multiplicative"
    ... )
    """
    environmental_values = np.asarray(environmental_values, dtype=float)

    # Handle 1D case (single patch)
    if environmental_values.ndim == 1:
        environmental_values = environmental_values.reshape(1, -1)

    n_patches, n_drivers = environmental_values.shape

    if len(response_functions) != n_drivers:
        raise ValueError(
            f"Number of response functions ({len(response_functions)}) "
            f"must match number of drivers ({n_drivers})"
        )

    # Calculate response for each driver
    responses = np.zeros((n_patches, n_drivers), dtype=float)

    for i, response_func in enumerate(response_functions):
        responses[:, i] = response_func(environmental_values[:, i])

    # Combine responses
    if combine_method == "multiplicative":
        # Product of all responses
        suitability = np.prod(responses, axis=1)

    elif combine_method == "minimum":
        # Limiting factor (minimum response)
        suitability = np.min(responses, axis=1)

    elif combine_method == "geometric_mean":
        # Geometric mean
        suitability = np.exp(np.mean(np.log(responses + 1e-10), axis=1))

    elif combine_method == "average":
        # Arithmetic mean
        suitability = np.mean(responses, axis=1)

    else:
        raise ValueError(
            f"Unknown combine_method '{combine_method}'. "
            f"Must be one of: multiplicative, minimum, geometric_mean, average"
        )

    return suitability

create_gaussian_response

create_gaussian_response(optimal_value: float, tolerance: float, min_value: Optional[float] = None, max_value: Optional[float] = None) -> Callable[[np.ndarray], np.ndarray]

Create Gaussian (normal) response function.

Habitat suitability peaks at optimal value and decreases with distance from optimum:

response(x) = exp(-((x - optimal) / tolerance)²)

Parameters:

Name Type Description Default
optimal_value float

Optimal environmental value (maximum suitability)

required
tolerance float

Tolerance range (width parameter)
Suitability = exp(-1) ≈ 0.368 at optimal ± tolerance

required
min_value float

Minimum valid environmental value (hard cutoff)
Values below this return 0 suitability

None
max_value float

Maximum valid environmental value (hard cutoff)

None

Returns:

Type Description
callable

Response function: env_value -> suitability [0, 1]

Examples:

>>> # Cod prefer 8°C ± 4°C
>>> response = create_gaussian_response(optimal_value=8.0, tolerance=4.0)
>>> response(np.array([4, 8, 12, 16]))
array([0.36787944, 1.        , 0.36787944, 0.01831564])
>>> # With hard cutoffs
>>> response = create_gaussian_response(
...     optimal_value=15.0,
...     tolerance=5.0,
...     min_value=5.0,
...     max_value=25.0
... )
>>> response(np.array([0, 10, 15, 20, 30]))
array([0.        , 0.36787944, 1.        , 0.36787944, 0.        ])
Source code in pypath/spatial/habitat.py
def create_gaussian_response(
    optimal_value: float,
    tolerance: float,
    min_value: Optional[float] = None,
    max_value: Optional[float] = None,
) -> Callable[[np.ndarray], np.ndarray]:
    """Create Gaussian (normal) response function.

    Habitat suitability peaks at optimal value and decreases
    with distance from optimum:

        response(x) = exp(-((x - optimal) / tolerance)²)

    Parameters
    ----------
    optimal_value : float
        Optimal environmental value (maximum suitability)
    tolerance : float
        Tolerance range (width parameter)
        Suitability = exp(-1) ≈ 0.368 at optimal ± tolerance
    min_value : float, optional
        Minimum valid environmental value (hard cutoff)
        Values below this return 0 suitability
    max_value : float, optional
        Maximum valid environmental value (hard cutoff)

    Returns
    -------
    callable
        Response function: env_value -> suitability [0, 1]

    Examples
    --------
    >>> # Cod prefer 8°C ± 4°C
    >>> response = create_gaussian_response(optimal_value=8.0, tolerance=4.0)
    >>> response(np.array([4, 8, 12, 16]))
    array([0.36787944, 1.        , 0.36787944, 0.01831564])

    >>> # With hard cutoffs
    >>> response = create_gaussian_response(
    ...     optimal_value=15.0,
    ...     tolerance=5.0,
    ...     min_value=5.0,
    ...     max_value=25.0
    ... )
    >>> response(np.array([0, 10, 15, 20, 30]))
    array([0.        , 0.36787944, 1.        , 0.36787944, 0.        ])
    """

    def response_function(env_values: np.ndarray) -> np.ndarray:
        env_values = np.asarray(env_values, dtype=float)

        # Gaussian response
        suitability = np.exp(-(((env_values - optimal_value) / tolerance) ** 2))

        # Apply hard cutoffs if specified
        if min_value is not None:
            suitability[env_values < min_value] = 0.0

        if max_value is not None:
            suitability[env_values > max_value] = 0.0

        return suitability

    return response_function

create_linear_response

create_linear_response(min_value: float, max_value: float, increasing: bool = True) -> Callable[[np.ndarray], np.ndarray]

Create linear response function.

Suitability increases or decreases linearly with environmental value.

Parameters:

Name Type Description Default
min_value float

Environmental value where suitability = 0 (or 1 if decreasing)

required
max_value float

Environmental value where suitability = 1 (or 0 if decreasing)

required
increasing bool

If True, suitability increases with env value (default)
If False, suitability decreases

True

Returns:

Type Description
callable

Response function: env_value -> suitability [0, 1]

Examples:

>>> # Deeper is better (increasing)
>>> response = create_linear_response(min_value=0, max_value=100, increasing=True)
>>> response(np.array([0, 50, 100]))
array([0. , 0.5, 1. ])
>>> # Shallower is better (decreasing)
>>> response = create_linear_response(min_value=0, max_value=100, increasing=False)
>>> response(np.array([0, 50, 100]))
array([1. , 0.5, 0. ])
Source code in pypath/spatial/habitat.py
def create_linear_response(
    min_value: float, max_value: float, increasing: bool = True
) -> Callable[[np.ndarray], np.ndarray]:
    """Create linear response function.

    Suitability increases or decreases linearly with environmental value.

    Parameters
    ----------
    min_value : float
        Environmental value where suitability = 0 (or 1 if decreasing)
    max_value : float
        Environmental value where suitability = 1 (or 0 if decreasing)
    increasing : bool
        If True, suitability increases with env value (default)
        If False, suitability decreases

    Returns
    -------
    callable
        Response function: env_value -> suitability [0, 1]

    Examples
    --------
    >>> # Deeper is better (increasing)
    >>> response = create_linear_response(min_value=0, max_value=100, increasing=True)
    >>> response(np.array([0, 50, 100]))
    array([0. , 0.5, 1. ])

    >>> # Shallower is better (decreasing)
    >>> response = create_linear_response(min_value=0, max_value=100, increasing=False)
    >>> response(np.array([0, 50, 100]))
    array([1. , 0.5, 0. ])
    """
    if min_value >= max_value:
        raise ValueError(f"min_value ({min_value}) must be < max_value ({max_value})")

    def response_function(env_values: np.ndarray) -> np.ndarray:
        env_values = np.asarray(env_values, dtype=float)

        # Normalize to [0, 1]
        suitability = (env_values - min_value) / (max_value - min_value)

        # Clip to [0, 1]
        suitability = np.clip(suitability, 0.0, 1.0)

        # Invert if decreasing
        if not increasing:
            suitability = 1.0 - suitability

        return suitability

    return response_function

create_step_response

create_step_response(threshold: float, above_threshold: float = 1.0, below_threshold: float = 0.0) -> Callable[[np.ndarray], np.ndarray]

Create step (binary) response function.

Suitability is constant above/below threshold.

Parameters:

Name Type Description Default
threshold float

Threshold value

required
above_threshold float

Suitability when env >= threshold (default: 1.0)

1.0
below_threshold float

Suitability when env < threshold (default: 0.0)

0.0

Returns:

Type Description
callable

Response function: env_value -> suitability

Examples:

>>> # Requires minimum depth of 50m
>>> response = create_step_response(threshold=50, above_threshold=1.0, below_threshold=0.0)
>>> response(np.array([30, 50, 100]))
array([0., 1., 1.])
Source code in pypath/spatial/habitat.py
def create_step_response(
    threshold: float, above_threshold: float = 1.0, below_threshold: float = 0.0
) -> Callable[[np.ndarray], np.ndarray]:
    """Create step (binary) response function.

    Suitability is constant above/below threshold.

    Parameters
    ----------
    threshold : float
        Threshold value
    above_threshold : float
        Suitability when env >= threshold (default: 1.0)
    below_threshold : float
        Suitability when env < threshold (default: 0.0)

    Returns
    -------
    callable
        Response function: env_value -> suitability

    Examples
    --------
    >>> # Requires minimum depth of 50m
    >>> response = create_step_response(threshold=50, above_threshold=1.0, below_threshold=0.0)
    >>> response(np.array([30, 50, 100]))
    array([0., 1., 1.])
    """

    def response_function(env_values: np.ndarray) -> np.ndarray:
        env_values = np.asarray(env_values, dtype=float)
        suitability = np.where(
            env_values >= threshold, above_threshold, below_threshold
        )
        return suitability

    return response_function

create_threshold_response

create_threshold_response(min_value: float, max_value: float, optimal_min: Optional[float] = None, optimal_max: Optional[float] = None) -> Callable[[np.ndarray], np.ndarray]

Create threshold (trapezoidal) response function.

Response forms a trapezoid:
- 0 below min_value
- Linear increase from min_value to optimal_min
- 1 (optimal) from optimal_min to optimal_max
- Linear decrease from optimal_max to max_value
- 0 above max_value

If optimal_min/max not specified, response is triangular with peak at midpoint.

Parameters:

Name Type Description Default
min_value float

Minimum tolerable value (suitability = 0)

required
max_value float

Maximum tolerable value (suitability = 0)

required
optimal_min float

Start of optimal range (suitability = 1)
Default: midpoint between min and max

None
optimal_max float

End of optimal range (suitability = 1)
Default: same as optimal_min (triangular response)

None

Returns:

Type Description
callable

Response function: env_value -> suitability [0, 1]

Examples:

>>> # Herring tolerate 0-20°C, optimal 8-12°C
>>> response = create_threshold_response(
...     min_value=0.0,
...     max_value=20.0,
...     optimal_min=8.0,
...     optimal_max=12.0
... )
>>> response(np.array([-5, 0, 4, 10, 16, 20, 25]))
array([0. , 0. , 0.5, 1. , 0.5, 0. , 0. ])
>>> # Triangular response (no optimal plateau)
>>> response = create_threshold_response(min_value=5, max_value=25)
>>> response(np.array([5, 15, 25]))
array([0., 1., 0.])
Source code in pypath/spatial/habitat.py
def create_threshold_response(
    min_value: float,
    max_value: float,
    optimal_min: Optional[float] = None,
    optimal_max: Optional[float] = None,
) -> Callable[[np.ndarray], np.ndarray]:
    """Create threshold (trapezoidal) response function.

    Response forms a trapezoid:
    - 0 below min_value
    - Linear increase from min_value to optimal_min
    - 1 (optimal) from optimal_min to optimal_max
    - Linear decrease from optimal_max to max_value
    - 0 above max_value

    If optimal_min/max not specified, response is triangular
    with peak at midpoint.

    Parameters
    ----------
    min_value : float
        Minimum tolerable value (suitability = 0)
    max_value : float
        Maximum tolerable value (suitability = 0)
    optimal_min : float, optional
        Start of optimal range (suitability = 1)
        Default: midpoint between min and max
    optimal_max : float, optional
        End of optimal range (suitability = 1)
        Default: same as optimal_min (triangular response)

    Returns
    -------
    callable
        Response function: env_value -> suitability [0, 1]

    Examples
    --------
    >>> # Herring tolerate 0-20°C, optimal 8-12°C
    >>> response = create_threshold_response(
    ...     min_value=0.0,
    ...     max_value=20.0,
    ...     optimal_min=8.0,
    ...     optimal_max=12.0
    ... )
    >>> response(np.array([-5, 0, 4, 10, 16, 20, 25]))
    array([0. , 0. , 0.5, 1. , 0.5, 0. , 0. ])

    >>> # Triangular response (no optimal plateau)
    >>> response = create_threshold_response(min_value=5, max_value=25)
    >>> response(np.array([5, 15, 25]))
    array([0., 1., 0.])
    """
    # Set defaults for optimal range
    if optimal_min is None and optimal_max is None:
        # Triangular - peak at midpoint
        midpoint = (min_value + max_value) / 2
        optimal_min = midpoint
        optimal_max = midpoint
    elif optimal_min is None:
        optimal_min = optimal_max
    elif optimal_max is None:
        optimal_max = optimal_min

    # Validate
    if not (min_value <= optimal_min <= optimal_max <= max_value):
        raise ValueError(
            f"Must satisfy: min_value ({min_value}) <= "
            f"optimal_min ({optimal_min}) <= "
            f"optimal_max ({optimal_max}) <= "
            f"max_value ({max_value})"
        )

    def response_function(env_values: np.ndarray) -> np.ndarray:
        env_values = np.asarray(env_values, dtype=float)
        suitability = np.zeros_like(env_values, dtype=float)

        # Below minimum: 0
        # (already initialized to 0)

        # Rising edge: min_value to optimal_min
        if optimal_min > min_value:
            mask = (env_values >= min_value) & (env_values < optimal_min)
            suitability[mask] = (env_values[mask] - min_value) / (
                optimal_min - min_value
            )

        # Optimal plateau: optimal_min to optimal_max
        mask = (env_values >= optimal_min) & (env_values <= optimal_max)
        suitability[mask] = 1.0

        # Falling edge: optimal_max to max_value
        if max_value > optimal_max:
            mask = (env_values > optimal_max) & (env_values <= max_value)
            suitability[mask] = (max_value - env_values[mask]) / (
                max_value - optimal_max
            )

        # Above maximum: 0
        # (already initialized to 0)

        return suitability

    return response_function

Environmental Drivers

pypath.spatial.environmental

Environmental drivers for ECOSPACE.

Implements time-varying spatial environmental fields:
- Temperature, salinity, depth, currents
- Multiple environmental layers
- Temporal interpolation
- Integration with habitat capacity models

EnvironmentalDrivers

Manager for multiple environmental layers.

Coordinates multiple environmental variables and provides combined environmental state for habitat calculations.

Parameters:

Name Type Description Default
layers dict

Dictionary of EnvironmentalLayer objects {name: layer}

None

Examples:

>>> drivers = EnvironmentalDrivers()
>>> drivers.add_layer(temp_layer)
>>> drivers.add_layer(depth_layer)
>>>
>>> # Get all drivers at specific time
>>> env = drivers.get_drivers_at_time(t=0.5)  # [n_patches, n_layers]
>>>
>>> # Get specific layer
>>> temp = drivers.get_layer_at_time('temperature', t=0.5)  # [n_patches]
Source code in pypath/spatial/environmental.py
class EnvironmentalDrivers:
    """Manager for multiple environmental layers.

    Coordinates multiple environmental variables and provides
    combined environmental state for habitat calculations.

    Parameters
    ----------
    layers : dict, optional
        Dictionary of EnvironmentalLayer objects {name: layer}

    Examples
    --------
    >>> drivers = EnvironmentalDrivers()
    >>> drivers.add_layer(temp_layer)
    >>> drivers.add_layer(depth_layer)
    >>>
    >>> # Get all drivers at specific time
    >>> env = drivers.get_drivers_at_time(t=0.5)  # [n_patches, n_layers]
    >>>
    >>> # Get specific layer
    >>> temp = drivers.get_layer_at_time('temperature', t=0.5)  # [n_patches]
    """

    def __init__(self, layers: Optional[Dict[str, EnvironmentalLayer]] = None):
        """Initialize environmental drivers."""
        self.layers = layers if layers is not None else {}
        self._validate_layers()

    def _validate_layers(self):
        """Validate that all layers have same number of patches."""
        if not self.layers:
            return

        n_patches_list = [layer.n_patches for layer in self.layers.values()]

        if len(set(n_patches_list)) > 1:
            raise ValueError(
                f"All layers must have same n_patches. Got: "
                f"{dict(zip(self.layers.keys(), n_patches_list))}"
            )

    @property
    def n_patches(self) -> int:
        """Number of spatial patches."""
        if not self.layers:
            return 0
        return next(iter(self.layers.values())).n_patches

    @property
    def n_layers(self) -> int:
        """Number of environmental layers."""
        return len(self.layers)

    @property
    def layer_names(self) -> list:
        """Names of all environmental layers."""
        return list(self.layers.keys())

    def add_layer(self, layer: EnvironmentalLayer):
        """Add environmental layer.

        Parameters
        ----------
        layer : EnvironmentalLayer
            Environmental layer to add

        Raises
        ------
        ValueError
            If layer name already exists or n_patches doesn't match
        """
        if layer.name in self.layers:
            raise ValueError(f"Layer '{layer.name}' already exists")

        # Check n_patches compatibility
        if self.layers and layer.n_patches != self.n_patches:
            raise ValueError(
                f"Layer '{layer.name}' has {layer.n_patches} patches, "
                f"but existing layers have {self.n_patches} patches"
            )

        self.layers[layer.name] = layer

    def remove_layer(self, name: str):
        """Remove environmental layer.

        Parameters
        ----------
        name : str
            Name of layer to remove

        Raises
        ------
        KeyError
            If layer doesn't exist
        """
        if name not in self.layers:
            raise KeyError(f"Layer '{name}' not found")

        del self.layers[name]

    def get_layer_at_time(self, name: str, t: float) -> np.ndarray:
        """Get specific layer values at time t.

        Parameters
        ----------
        name : str
            Layer name
        t : float
            Time (years)

        Returns
        -------
        np.ndarray
            Environmental values [n_patches]
        """
        if name not in self.layers:
            raise KeyError(f"Layer '{name}' not found")

        return self.layers[name].get_value_at_time(t)

    def get_drivers_at_time(
        self, t: float, layer_names: Optional[list] = None
    ) -> np.ndarray:
        """Get all environmental drivers at time t.

        Parameters
        ----------
        t : float
            Time (years)
        layer_names : list, optional
            Specific layers to include (default: all layers)
            Order matters - returned array will match this order

        Returns
        -------
        np.ndarray
            Environmental drivers [n_patches, n_layers]
        """
        if not self.layers:
            return np.array([]).reshape(0, 0)

        # Use all layers if not specified
        if layer_names is None:
            layer_names = self.layer_names

        # Validate layer names
        for name in layer_names:
            if name not in self.layers:
                raise KeyError(f"Layer '{name}' not found")

        # Stack all layers
        drivers = np.column_stack(
            [self.layers[name].get_value_at_time(t) for name in layer_names]
        )

        return drivers

    def get_statistics(self) -> Dict[str, Dict]:
        """Get statistics for all layers.

        Returns
        -------
        dict
            {layer_name: statistics_dict}
        """
        return {name: layer.get_statistics() for name, layer in self.layers.items()}

    def get_time_range(self) -> Tuple[float, float]:
        """Get overall time range across all layers.

        Returns
        -------
        tuple
            (min_time, max_time)
        """
        if not self.layers:
            return (0.0, 0.0)

        min_times = []
        max_times = []

        for layer in self.layers.values():
            if layer.is_time_varying:
                min_times.append(layer.times[0])
                max_times.append(layer.times[-1])

        if not min_times:
            # No time-varying layers
            return (0.0, 0.0)

        return (min(min_times), max(max_times))
layer_names property
layer_names: list

Names of all environmental layers.

n_layers property
n_layers: int

Number of environmental layers.

n_patches property
n_patches: int

Number of spatial patches.

__init__
__init__(layers: Optional[Dict[str, EnvironmentalLayer]] = None)

Initialize environmental drivers.

Source code in pypath/spatial/environmental.py
def __init__(self, layers: Optional[Dict[str, EnvironmentalLayer]] = None):
    """Initialize environmental drivers."""
    self.layers = layers if layers is not None else {}
    self._validate_layers()
add_layer
add_layer(layer: EnvironmentalLayer)

Add environmental layer.

Parameters:

Name Type Description Default
layer EnvironmentalLayer

Environmental layer to add

required

Raises:

Type Description
ValueError

If layer name already exists or n_patches doesn't match

Source code in pypath/spatial/environmental.py
def add_layer(self, layer: EnvironmentalLayer):
    """Add environmental layer.

    Parameters
    ----------
    layer : EnvironmentalLayer
        Environmental layer to add

    Raises
    ------
    ValueError
        If layer name already exists or n_patches doesn't match
    """
    if layer.name in self.layers:
        raise ValueError(f"Layer '{layer.name}' already exists")

    # Check n_patches compatibility
    if self.layers and layer.n_patches != self.n_patches:
        raise ValueError(
            f"Layer '{layer.name}' has {layer.n_patches} patches, "
            f"but existing layers have {self.n_patches} patches"
        )

    self.layers[layer.name] = layer
get_drivers_at_time
get_drivers_at_time(t: float, layer_names: Optional[list] = None) -> np.ndarray

Get all environmental drivers at time t.

Parameters:

Name Type Description Default
t float

Time (years)

required
layer_names list

Specific layers to include (default: all layers)
Order matters - returned array will match this order

None

Returns:

Type Description
ndarray

Environmental drivers [n_patches, n_layers]

Source code in pypath/spatial/environmental.py
def get_drivers_at_time(
    self, t: float, layer_names: Optional[list] = None
) -> np.ndarray:
    """Get all environmental drivers at time t.

    Parameters
    ----------
    t : float
        Time (years)
    layer_names : list, optional
        Specific layers to include (default: all layers)
        Order matters - returned array will match this order

    Returns
    -------
    np.ndarray
        Environmental drivers [n_patches, n_layers]
    """
    if not self.layers:
        return np.array([]).reshape(0, 0)

    # Use all layers if not specified
    if layer_names is None:
        layer_names = self.layer_names

    # Validate layer names
    for name in layer_names:
        if name not in self.layers:
            raise KeyError(f"Layer '{name}' not found")

    # Stack all layers
    drivers = np.column_stack(
        [self.layers[name].get_value_at_time(t) for name in layer_names]
    )

    return drivers
get_layer_at_time
get_layer_at_time(name: str, t: float) -> np.ndarray

Get specific layer values at time t.

Parameters:

Name Type Description Default
name str

Layer name

required
t float

Time (years)

required

Returns:

Type Description
ndarray

Environmental values [n_patches]

Source code in pypath/spatial/environmental.py
def get_layer_at_time(self, name: str, t: float) -> np.ndarray:
    """Get specific layer values at time t.

    Parameters
    ----------
    name : str
        Layer name
    t : float
        Time (years)

    Returns
    -------
    np.ndarray
        Environmental values [n_patches]
    """
    if name not in self.layers:
        raise KeyError(f"Layer '{name}' not found")

    return self.layers[name].get_value_at_time(t)
get_statistics
get_statistics() -> Dict[str, Dict]

Get statistics for all layers.

Returns:

Type Description
dict

{layer_name: statistics_dict}

Source code in pypath/spatial/environmental.py
def get_statistics(self) -> Dict[str, Dict]:
    """Get statistics for all layers.

    Returns
    -------
    dict
        {layer_name: statistics_dict}
    """
    return {name: layer.get_statistics() for name, layer in self.layers.items()}
get_time_range
get_time_range() -> Tuple[float, float]

Get overall time range across all layers.

Returns:

Type Description
tuple

(min_time, max_time)

Source code in pypath/spatial/environmental.py
def get_time_range(self) -> Tuple[float, float]:
    """Get overall time range across all layers.

    Returns
    -------
    tuple
        (min_time, max_time)
    """
    if not self.layers:
        return (0.0, 0.0)

    min_times = []
    max_times = []

    for layer in self.layers.values():
        if layer.is_time_varying:
            min_times.append(layer.times[0])
            max_times.append(layer.times[-1])

    if not min_times:
        # No time-varying layers
        return (0.0, 0.0)

    return (min(min_times), max(max_times))
remove_layer
remove_layer(name: str)

Remove environmental layer.

Parameters:

Name Type Description Default
name str

Name of layer to remove

required

Raises:

Type Description
KeyError

If layer doesn't exist

Source code in pypath/spatial/environmental.py
def remove_layer(self, name: str):
    """Remove environmental layer.

    Parameters
    ----------
    name : str
        Name of layer to remove

    Raises
    ------
    KeyError
        If layer doesn't exist
    """
    if name not in self.layers:
        raise KeyError(f"Layer '{name}' not found")

    del self.layers[name]

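Putting the methods above together, a minimal usage sketch: build two layers, register them, and query the stacked driver array. The container class is assumed here to be exported as EnvironmentalDrivers from pypath.spatial.environmental (the name is inferred from the constructor docstring; verify it against your install).

import numpy as np

from pypath.spatial.environmental import EnvironmentalDrivers, EnvironmentalLayer

# One static layer and one time-varying layer over 3 patches
depth = EnvironmentalLayer(name="depth", units="meters",
                           values=np.array([10.0, 20.0, 30.0]))
temp = EnvironmentalLayer(
    name="temperature",
    units="celsius",
    values=np.array([[5.0, 6.0, 7.0], [10.0, 12.0, 14.0]]),
    times=np.array([0.0, 1.0]),
)

drivers = EnvironmentalDrivers()
drivers.add_layer(depth)
drivers.add_layer(temp)

# [n_patches, n_layers] array at t = 0.5; columns follow the requested order
print(drivers.get_drivers_at_time(0.5, layer_names=["depth", "temperature"]))
print(drivers.get_time_range())  # (0.0, 1.0), from the time-varying layer only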
EnvironmentalLayer dataclass

Time-varying spatial environmental field.

Represents a single environmental variable (temperature, depth, etc.) that varies across patches and potentially over time.

Parameters:

Name Type Description Default
name str

Variable name (e.g., "temperature", "depth", "salinity")

required
units str

Units of measurement (e.g., "celsius", "meters", "psu")

required
values ndarray

Environmental values [n_timesteps, n_patches] or [n_patches] If 1D, assumed constant over time

required
times ndarray

Time points corresponding to values (years) Required if values is 2D

None
interpolate bool

Whether to interpolate between timesteps (default: True)

True

Examples:

>>> # Constant depth layer
>>> depth = EnvironmentalLayer(
...     name='depth',
...     units='meters',
...     values=np.array([10, 20, 30, 40, 50])
... )
>>> # Time-varying temperature
>>> temp = EnvironmentalLayer(
...     name='temperature',
...     units='celsius',
...     values=np.array([[5, 6, 7], [10, 12, 14], [8, 9, 10]]),
...     times=np.array([0.0, 0.5, 1.0])
... )
>>> temp.get_value_at_time(0.25)  # Interpolates between t=0 and t=0.5
array([7.5, 9., 10.5])
Source code in pypath/spatial/environmental.py
@dataclass
class EnvironmentalLayer:
    """Time-varying spatial environmental field.

    Represents a single environmental variable (temperature, depth, etc.)
    that varies across patches and potentially over time.

    Parameters
    ----------
    name : str
        Variable name (e.g., "temperature", "depth", "salinity")
    units : str
        Units of measurement (e.g., "celsius", "meters", "psu")
    values : np.ndarray
        Environmental values [n_timesteps, n_patches] or [n_patches]
        If 1D, assumed constant over time
    times : np.ndarray, optional
        Time points corresponding to values (years)
        Required if values is 2D
    interpolate : bool
        Whether to interpolate between timesteps (default: True)

    Examples
    --------
    >>> # Constant depth layer
    >>> depth = EnvironmentalLayer(
    ...     name='depth',
    ...     units='meters',
    ...     values=np.array([10, 20, 30, 40, 50])
    ... )

    >>> # Time-varying temperature
    >>> temp = EnvironmentalLayer(
    ...     name='temperature',
    ...     units='celsius',
    ...     values=np.array([[5, 6, 7], [10, 12, 14], [8, 9, 10]]),
    ...     times=np.array([0.0, 0.5, 1.0])
    ... )
    >>> temp.get_value_at_time(0.25)  # Interpolates between t=0 and t=0.5
    array([7.5, 9., 10.5])
    """

    name: str
    units: str
    values: np.ndarray
    times: Optional[np.ndarray] = None
    interpolate: bool = True

    def __post_init__(self):
        """Validate layer after initialization."""
        self.values = np.asarray(self.values, dtype=float)

        # Handle 1D vs 2D values
        if self.values.ndim == 1:
            # Constant over time
            self.n_patches = len(self.values)
            self.n_timesteps = 1
            self.is_time_varying = False

        elif self.values.ndim == 2:
            # Time-varying
            self.n_timesteps, self.n_patches = self.values.shape
            self.is_time_varying = True

            # Require times for time-varying data
            if self.times is None:
                raise ValueError(
                    f"Layer '{self.name}': times required for time-varying values"
                )

            self.times = np.asarray(self.times, dtype=float)

            if len(self.times) != self.n_timesteps:
                raise ValueError(
                    f"Layer '{self.name}': times length ({len(self.times)}) != "
                    f"n_timesteps ({self.n_timesteps})"
                )
        else:
            raise ValueError(
                f"Layer '{self.name}': values must be 1D [n_patches] or 2D [n_timesteps, n_patches], "
                f"got {self.values.ndim}D"
            )

    def get_value_at_time(self, t: float) -> np.ndarray:
        """Get environmental values at time t.

        Parameters
        ----------
        t : float
            Time (years)

        Returns
        -------
        np.ndarray
            Environmental values [n_patches]
        """
        if not self.is_time_varying:
            # Constant over time
            return self.values.copy()

        # Time-varying - interpolate if requested
        if not self.interpolate:
            # Find nearest timestep
            idx = np.argmin(np.abs(self.times - t))
            return self.values[idx].copy()

        # Linear interpolation
        if t <= self.times[0]:
            return self.values[0].copy()

        if t >= self.times[-1]:
            return self.values[-1].copy()

        # Find bracketing timesteps
        idx_after = np.searchsorted(self.times, t)
        idx_before = idx_after - 1

        t_before = self.times[idx_before]
        t_after = self.times[idx_after]

        # Linear interpolation weight
        weight = (t - t_before) / (t_after - t_before)

        return (1 - weight) * self.values[idx_before] + weight * self.values[idx_after]

    def get_statistics(self) -> Dict[str, float]:
        """Get summary statistics for this layer.

        Returns
        -------
        dict
            Statistics: min, max, mean, std
        """
        return {
            "name": self.name,
            "units": self.units,
            "min": float(np.min(self.values)),
            "max": float(np.max(self.values)),
            "mean": float(np.mean(self.values)),
            "std": float(np.std(self.values)),
            "n_patches": self.n_patches,
            "n_timesteps": self.n_timesteps,
            "is_time_varying": self.is_time_varying,
        }
__post_init__
__post_init__()

Validate layer after initialization.

Source code in pypath/spatial/environmental.py
def __post_init__(self):
    """Validate layer after initialization."""
    self.values = np.asarray(self.values, dtype=float)

    # Handle 1D vs 2D values
    if self.values.ndim == 1:
        # Constant over time
        self.n_patches = len(self.values)
        self.n_timesteps = 1
        self.is_time_varying = False

    elif self.values.ndim == 2:
        # Time-varying
        self.n_timesteps, self.n_patches = self.values.shape
        self.is_time_varying = True

        # Require times for time-varying data
        if self.times is None:
            raise ValueError(
                f"Layer '{self.name}': times required for time-varying values"
            )

        self.times = np.asarray(self.times, dtype=float)

        if len(self.times) != self.n_timesteps:
            raise ValueError(
                f"Layer '{self.name}': times length ({len(self.times)}) != "
                f"n_timesteps ({self.n_timesteps})"
            )
    else:
        raise ValueError(
            f"Layer '{self.name}': values must be 1D [n_patches] or 2D [n_timesteps, n_patches], "
            f"got {self.values.ndim}D"
        )
get_statistics
get_statistics() -> Dict[str, float]

Get summary statistics for this layer.

Returns:

Type Description
dict

Statistics: min, max, mean, std

Source code in pypath/spatial/environmental.py
def get_statistics(self) -> Dict[str, float]:
    """Get summary statistics for this layer.

    Returns
    -------
    dict
        Statistics: min, max, mean, std
    """
    return {
        "name": self.name,
        "units": self.units,
        "min": float(np.min(self.values)),
        "max": float(np.max(self.values)),
        "mean": float(np.mean(self.values)),
        "std": float(np.std(self.values)),
        "n_patches": self.n_patches,
        "n_timesteps": self.n_timesteps,
        "is_time_varying": self.is_time_varying,
    }
get_value_at_time
get_value_at_time(t: float) -> np.ndarray

Get environmental values at time t.

Parameters:

Name Type Description Default
t float

Time (years)

required

Returns:

Type Description
ndarray

Environmental values [n_patches]

Source code in pypath/spatial/environmental.py
def get_value_at_time(self, t: float) -> np.ndarray:
    """Get environmental values at time t.

    Parameters
    ----------
    t : float
        Time (years)

    Returns
    -------
    np.ndarray
        Environmental values [n_patches]
    """
    if not self.is_time_varying:
        # Constant over time
        return self.values.copy()

    # Time-varying - interpolate if requested
    if not self.interpolate:
        # Find nearest timestep
        idx = np.argmin(np.abs(self.times - t))
        return self.values[idx].copy()

    # Linear interpolation
    if t <= self.times[0]:
        return self.values[0].copy()

    if t >= self.times[-1]:
        return self.values[-1].copy()

    # Find bracketing timesteps
    idx_after = np.searchsorted(self.times, t)
    idx_before = idx_after - 1

    t_before = self.times[idx_before]
    t_after = self.times[idx_after]

    # Linear interpolation weight
    weight = (t - t_before) / (t_after - t_before)

    return (1 - weight) * self.values[idx_before] + weight * self.values[idx_after]

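The interpolate flag controls how get_value_at_time resolves times that fall between stored timesteps. A small sketch contrasting the two modes:

import numpy as np

from pypath.spatial.environmental import EnvironmentalLayer

values = np.array([[0.0, 0.0], [10.0, 20.0]])  # 2 timesteps, 2 patches
times = np.array([0.0, 1.0])

linear = EnvironmentalLayer(name="x", units="", values=values, times=times,
                            interpolate=True)
nearest = EnvironmentalLayer(name="x", units="", values=values, times=times,
                             interpolate=False)

print(linear.get_value_at_time(0.25))   # [2.5 5. ]  (linear weight 0.25)
print(nearest.get_value_at_time(0.25))  # [0. 0.]    (snaps to closest timestep t=0)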
create_constant_layer

create_constant_layer(name: str, values: ndarray, units: str = '') -> EnvironmentalLayer

Create constant (time-invariant) environmental layer.

Parameters:

Name Type Description Default
name str

Layer name (e.g., "depth", "slope")

required
values ndarray

Environmental values [n_patches]

required
units str

Units of measurement

''

Returns:

Type Description
EnvironmentalLayer
Source code in pypath/spatial/environmental.py
def create_constant_layer(
    name: str, values: np.ndarray, units: str = ""
) -> EnvironmentalLayer:
    """Create constant (time-invariant) environmental layer.

    Parameters
    ----------
    name : str
        Layer name (e.g., "depth", "slope")
    values : np.ndarray
        Environmental values [n_patches]
    units : str
        Units of measurement

    Returns
    -------
    EnvironmentalLayer
    """
    return EnvironmentalLayer(
        name=name,
        units=units,
        values=np.asarray(values, dtype=float),
        times=None,
        interpolate=False,
    )

create_seasonal_temperature

create_seasonal_temperature(baseline_temp: ndarray, amplitude: float = 10.0, n_months: int = 12) -> EnvironmentalLayer

Create seasonal temperature variation.

Temperature follows a sinusoidal pattern: T(t) = baseline + amplitude * sin(2π * (t - 0.25)), with t in years, so the trough falls at t = 0 (winter) and the peak at t = 0.5 (summer).

Parameters:

Name Type Description Default
baseline_temp ndarray

Baseline temperature for each patch [n_patches]

required
amplitude float

Seasonal amplitude (default: 10°C)

10.0
n_months int

Number of monthly timesteps (default: 12)

12

Returns:

Type Description
EnvironmentalLayer

Time-varying temperature layer

Examples:

>>> baseline = np.array([15, 18, 20])  # Baseline temps
>>> temp = create_seasonal_temperature(baseline, amplitude=8.0)
>>> # Winter (t=0): ~7-12°C
>>> # Summer (t=0.5): ~23-28°C
Source code in pypath/spatial/environmental.py
def create_seasonal_temperature(
    baseline_temp: np.ndarray, amplitude: float = 10.0, n_months: int = 12
) -> EnvironmentalLayer:
    """Create seasonal temperature variation.

    Temperature follows a sinusoidal pattern (t in years):
        T(t) = baseline + amplitude * sin(2π * (t - 0.25))

    Parameters
    ----------
    baseline_temp : np.ndarray
        Baseline temperature for each patch [n_patches]
    amplitude : float
        Seasonal amplitude (default: 10°C)
    n_months : int
        Number of monthly timesteps (default: 12)

    Returns
    -------
    EnvironmentalLayer
        Time-varying temperature layer

    Examples
    --------
    >>> baseline = np.array([15, 18, 20])  # Baseline temps
    >>> temp = create_seasonal_temperature(baseline, amplitude=8.0)
    >>> # Winter (t=0): ~7-12°C
    >>> # Summer (t=0.5): ~23-28°C
    """
    baseline_temp = np.asarray(baseline_temp, dtype=float)
    _n_patches = len(baseline_temp)

    times = np.arange(n_months) / 12.0  # Monthly timesteps in years

    # Seasonal pattern (peak in summer, month 6)
    seasonal = amplitude * np.sin(2 * np.pi * (times - 0.25))

    # Apply to each patch
    values = baseline_temp[np.newaxis, :] + seasonal[:, np.newaxis]

    return EnvironmentalLayer(
        name="temperature",
        units="celsius",
        values=values,
        times=times,
        interpolate=True,
    )

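A quick numerical check of the seasonal pattern, reusing the baselines from the docstring example:

import numpy as np

from pypath.spatial.environmental import create_seasonal_temperature

baseline = np.array([15.0, 18.0, 20.0])
temp = create_seasonal_temperature(baseline, amplitude=8.0)

print(temp.get_value_at_time(0.0))  # [ 7. 10. 12.]  winter trough (baseline - 8)
print(temp.get_value_at_time(0.5))  # [23. 26. 28.]  summer peak   (baseline + 8)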
External Flux

pypath.spatial.external_flux

External flux loading and validation.

Functions for loading flux timeseries from:

- NetCDF files (ocean models: ROMS, MITgcm, HYCOM)
- CSV files (connectivity matrices, telemetry data)
- Numpy arrays (pre-computed flux)

convert_connectivity_to_flux

convert_connectivity_to_flux(connectivity_matrix: ndarray, biomass: ndarray) -> np.ndarray

Convert connectivity matrix to flux matrix.

Connectivity represents proportions (0-1), while flux represents actual biomass movement.

Parameters:

Name Type Description Default
connectivity_matrix ndarray

Connectivity proportions [n_patches, n_patches] connectivity[i, j] = fraction of biomass in i that moves to j

required
biomass ndarray

Biomass in each patch [n_patches]

required

Returns:

Type Description
ndarray

Flux matrix [n_patches, n_patches] flux[i, j] = biomass moving from i to j

Source code in pypath/spatial/external_flux.py
def convert_connectivity_to_flux(
    connectivity_matrix: np.ndarray, biomass: np.ndarray
) -> np.ndarray:
    """Convert connectivity matrix to flux matrix.

    Connectivity represents proportions (0-1), while flux represents
    actual biomass movement.

    Parameters
    ----------
    connectivity_matrix : np.ndarray
        Connectivity proportions [n_patches, n_patches]
        connectivity[i, j] = fraction of biomass in i that moves to j
    biomass : np.ndarray
        Biomass in each patch [n_patches]

    Returns
    -------
    np.ndarray
        Flux matrix [n_patches, n_patches]
        flux[i, j] = biomass moving from i to j
    """
    flux_matrix = connectivity_matrix * biomass[:, np.newaxis]
    np.fill_diagonal(flux_matrix, 0.0)
    return flux_matrix

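A minimal sketch of convert_connectivity_to_flux: proportions times source-patch biomass gives absolute movement, with the diagonal (self-flux) zeroed:

import numpy as np

from pypath.spatial.external_flux import convert_connectivity_to_flux

connectivity = np.array([
    [0.0, 0.2, 0.0],   # 20% of patch 0 biomass moves to patch 1
    [0.1, 0.0, 0.1],
    [0.0, 0.0, 0.0],
])
biomass = np.array([100.0, 50.0, 10.0])

print(convert_connectivity_to_flux(connectivity, biomass))
# [[ 0. 20.  0.]
#  [ 5.  0.  5.]
#  [ 0.  0.  0.]]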
create_flux_from_connectivity_matrix

create_flux_from_connectivity_matrix(connectivity_matrix: ndarray, times: Optional[ndarray] = None, seasonal_pattern: Optional[ndarray] = None) -> 'ExternalFluxTimeseries'

Create flux timeseries from connectivity matrix.

Connectivity matrix represents proportion of individuals/biomass moving from patch i to patch j per timestep.

Parameters:

Name Type Description Default
connectivity_matrix ndarray

Connectivity matrix [n_patches, n_patches] connectivity[i, j] = proportion moving from i to j

required
times ndarray

Time points (years). If None, uses monthly for 1 year

None
seasonal_pattern ndarray

Seasonal variation in connectivity strength [n_timesteps] If None, assumes constant connectivity

None

Returns:

Type Description
ExternalFluxTimeseries
Source code in pypath/spatial/external_flux.py
def create_flux_from_connectivity_matrix(
    connectivity_matrix: np.ndarray,
    times: Optional[np.ndarray] = None,
    seasonal_pattern: Optional[np.ndarray] = None,
) -> "ExternalFluxTimeseries":
    """Create flux timeseries from connectivity matrix.

    Connectivity matrix represents proportion of individuals/biomass
    moving from patch i to patch j per timestep.

    Parameters
    ----------
    connectivity_matrix : np.ndarray
        Connectivity matrix [n_patches, n_patches]
        connectivity[i, j] = proportion moving from i to j
    times : np.ndarray, optional
        Time points (years). If None, uses monthly for 1 year
    seasonal_pattern : np.ndarray, optional
        Seasonal variation in connectivity strength [n_timesteps]
        If None, assumes constant connectivity

    Returns
    -------
    ExternalFluxTimeseries
    """
    from pypath.spatial.ecospace_params import ExternalFluxTimeseries

    n_patches = connectivity_matrix.shape[0]

    # Validate connectivity matrix
    if connectivity_matrix.shape != (n_patches, n_patches):
        raise ValueError(
            f"Connectivity matrix must be square, got {connectivity_matrix.shape}"
        )

    # Default times: monthly for 1 year
    if times is None:
        n_timesteps = 12
        times = np.arange(n_timesteps) / 12.0
    else:
        n_timesteps = len(times)

    # Default seasonal pattern: constant
    if seasonal_pattern is None:
        seasonal_pattern = np.ones(n_timesteps)
    elif len(seasonal_pattern) != n_timesteps:
        raise ValueError(
            f"seasonal_pattern length ({len(seasonal_pattern)}) != n_timesteps ({n_timesteps})"
        )

    # Create flux timeseries
    flux_data = np.zeros((n_timesteps, 1, n_patches, n_patches))

    for t_idx in range(n_timesteps):
        flux_data[t_idx, 0] = connectivity_matrix * seasonal_pattern[t_idx]

    return ExternalFluxTimeseries(
        flux_data=flux_data,
        times=times,
        group_indices=np.array([0]),
        interpolate=True,
        format="connectivity_matrix",
    )

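A short sketch wrapping a static two-patch connectivity matrix into a monthly timeseries, with a purely illustrative seasonal scaling:

import numpy as np

from pypath.spatial.external_flux import create_flux_from_connectivity_matrix

connectivity = np.array([
    [0.0, 0.10],
    [0.05, 0.0],
])
# Illustrative pattern: exchange peaks mid-year, weakens in winter
seasonal = 1.0 + 0.5 * np.sin(2 * np.pi * (np.arange(12) / 12.0 - 0.25))

flux_ts = create_flux_from_connectivity_matrix(connectivity, seasonal_pattern=seasonal)
print(flux_ts.flux_data.shape)  # (12, 1, 2, 2): monthly, single group
print(flux_ts.times[:3])        # [0.         0.08333333 0.16666667]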
load_external_flux_from_csv

load_external_flux_from_csv(filepath: str, n_patches: int, time_column: str = 'time', patch_from_column: str = 'from', patch_to_column: str = 'to', flux_column: str = 'flux') -> 'ExternalFluxTimeseries'

Load external flux from CSV file.

CSV format (edge list):

    time, from, to, flux
    0.0, 0, 1, 0.5
    0.0, 1, 2, 0.3
    ...

Parameters:

Name Type Description Default
filepath str

Path to CSV file

required
n_patches int

Number of patches in grid

required
time_column str

Name of time column

'time'
patch_from_column str

Name of source patch column

'from'
patch_to_column str

Name of destination patch column

'to'
flux_column str

Name of flux value column

'flux'

Returns:

Type Description
ExternalFluxTimeseries
Source code in pypath/spatial/external_flux.py
def load_external_flux_from_csv(
    filepath: str,
    n_patches: int,
    time_column: str = "time",
    patch_from_column: str = "from",
    patch_to_column: str = "to",
    flux_column: str = "flux",
) -> "ExternalFluxTimeseries":
    """Load external flux from CSV file.

    CSV format (edge list):
        time, from, to, flux
        0.0, 0, 1, 0.5
        0.0, 1, 2, 0.3
        ...

    Parameters
    ----------
    filepath : str
        Path to CSV file
    n_patches : int
        Number of patches in grid
    time_column : str
        Name of time column
    patch_from_column : str
        Name of source patch column
    patch_to_column : str
        Name of destination patch column
    flux_column : str
        Name of flux value column

    Returns
    -------
    ExternalFluxTimeseries
    """
    import pandas as pd

    from pypath.spatial.ecospace_params import ExternalFluxTimeseries

    # Load CSV
    df = pd.read_csv(filepath)

    # Check for required columns
    required = [time_column, patch_from_column, patch_to_column, flux_column]
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}. Available: {list(df.columns)}")

    # Get unique times
    times = np.sort(df[time_column].unique())
    n_timesteps = len(times)

    # Initialize flux array (assume single group for CSV)
    flux_data = np.zeros((n_timesteps, 1, n_patches, n_patches))

    # Fill flux matrix for each timestep
    for t_idx, t in enumerate(times):
        df_t = df[df[time_column] == t]

        for _, row in df_t.iterrows():
            i = int(row[patch_from_column])
            j = int(row[patch_to_column])
            flux_val = float(row[flux_column])

            flux_data[t_idx, 0, i, j] = flux_val

    return ExternalFluxTimeseries(
        flux_data=flux_data,
        times=times,
        group_indices=np.array([0]),  # Single group
        interpolate=True,
        format="flux_matrix",
    )

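A minimal round-trip sketch of the edge-list format (the CSV file name is illustrative):

import pandas as pd

from pypath.spatial.external_flux import load_external_flux_from_csv

edges = pd.DataFrame({
    "time": [0.0, 0.0, 0.5, 0.5],
    "from": [0, 1, 0, 1],
    "to":   [1, 2, 1, 2],
    "flux": [0.5, 0.3, 0.4, 0.2],
})
edges.to_csv("flux_edges.csv", index=False)

flux_ts = load_external_flux_from_csv("flux_edges.csv", n_patches=3)
print(flux_ts.flux_data.shape)        # (2, 1, 3, 3): two timesteps, single group
print(flux_ts.flux_data[0, 0, 0, 1])  # 0.5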
load_external_flux_from_netcdf

load_external_flux_from_netcdf(filepath: str, time_var: str = 'time', flux_var: str = 'flux', group_mapping: Optional[Dict[str, int]] = None) -> 'ExternalFluxTimeseries'

Load external flux from NetCDF file.

Typical NetCDF structure from ocean models:

    dimensions:
        time = n_timesteps
        group = n_groups (or species)
        patch_from = n_patches
        patch_to = n_patches

variables:
    float time(time)  # Time in years or days
    float flux(time, group, patch_from, patch_to)

Parameters:

Name Type Description Default
filepath str

Path to NetCDF file

required
time_var str

Name of time dimension/variable (default: "time")

'time'
flux_var str

Name of flux variable (default: "flux") Expected shape: [time, group, patch_from, patch_to]

'flux'
group_mapping dict

Map from species names to group indices, e.g. {'cod': 3, 'herring': 5}. If None, uses sequential indices.

None

Returns:

Type Description
ExternalFluxTimeseries

Raises:

Type Description
ImportError

If netCDF4/xarray not installed

FileNotFoundError

If filepath does not exist

ValueError

If required variables not found

Source code in pypath/spatial/external_flux.py
def load_external_flux_from_netcdf(
    filepath: str,
    time_var: str = "time",
    flux_var: str = "flux",
    group_mapping: Optional[Dict[str, int]] = None,
) -> "ExternalFluxTimeseries":
    """Load external flux from NetCDF file.

    Typical NetCDF structure from ocean models:
        dimensions:
            time = n_timesteps
            group = n_groups (or species)
            patch_from = n_patches
            patch_to = n_patches

        variables:
            float time(time)  # Time in years or days
            float flux(time, group, patch_from, patch_to)

    Parameters
    ----------
    filepath : str
        Path to NetCDF file
    time_var : str
        Name of time dimension/variable (default: "time")
    flux_var : str
        Name of flux variable (default: "flux")
        Expected shape: [time, group, patch_from, patch_to]
    group_mapping : dict, optional
        Map from species names to group indices
        Example: {'cod': 3, 'herring': 5}
        If None, uses sequential indices

    Returns
    -------
    ExternalFluxTimeseries

    Raises
    ------
    ImportError
        If netCDF4/xarray not installed
    FileNotFoundError
        If filepath does not exist
    ValueError
        If required variables not found
    """
    if not _NETCDF_AVAILABLE:
        raise ImportError(
            "netCDF4 and xarray required for NetCDF support. "
            "Install with: pip install netCDF4 xarray"
        )

    from pypath.spatial.ecospace_params import ExternalFluxTimeseries

    # Load NetCDF using xarray
    ds = xr.open_dataset(filepath)

    # Check for required variables
    if time_var not in ds:
        raise ValueError(
            f"Time variable '{time_var}' not found in NetCDF. Available: {list(ds.variables)}"
        )

    if flux_var not in ds:
        raise ValueError(
            f"Flux variable '{flux_var}' not found in NetCDF. Available: {list(ds.variables)}"
        )

    # Load time
    times = ds[time_var].values

    # Convert time to years if needed
    if hasattr(ds[time_var], "units"):
        units = ds[time_var].units
        if "days" in units.lower():
            times = times / 365.25
        elif "months" in units.lower():
            times = times / 12.0

    # Load flux data
    flux_data = ds[flux_var].values

    # Validate dimensions
    if flux_data.ndim != 4:
        raise ValueError(
            f"Flux variable must be 4D [time, group, patch_from, patch_to], "
            f"got {flux_data.ndim}D with shape {flux_data.shape}"
        )

    n_timesteps, n_groups, n_patches_from, n_patches_to = flux_data.shape

    if n_patches_from != n_patches_to:
        raise ValueError(
            f"Flux matrix must be square [patch_from, patch_to], "
            f"got {n_patches_from} x {n_patches_to}"
        )

    # Determine group indices
    if group_mapping is not None:
        # Use provided mapping
        group_indices = np.array(list(group_mapping.values()))
    else:
        # Sequential indices
        group_indices = np.arange(n_groups)

    # Close dataset
    ds.close()

    return ExternalFluxTimeseries(
        flux_data=flux_data,
        times=times,
        group_indices=group_indices,
        interpolate=True,
        format="flux_matrix",
    )

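A hedged usage sketch for the NetCDF loader (the file path and species names below are hypothetical, and netCDF4/xarray must be installed):

from pypath.spatial.external_flux import load_external_flux_from_netcdf

flux_ts = load_external_flux_from_netcdf(
    "roms_exchange.nc",                      # hypothetical ocean-model output file
    time_var="time",
    flux_var="flux",
    group_mapping={"cod": 3, "herring": 5},  # species name -> model group index
)
print(len(flux_ts.times), flux_ts.group_indices)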
rescale_flux_for_conservation

rescale_flux_for_conservation(flux_matrix: ndarray) -> np.ndarray

Rescale flux matrix to enforce mass conservation.

Normalizes each row so that total outflow fraction does not exceed 1.0 (i.e., a patch cannot export more than 100% of its biomass per step). Rows whose outflow sums to <= 1.0 are left unchanged.

Parameters:

Name Type Description Default
flux_matrix ndarray

Flux matrix [n_patches, n_patches] flux_matrix[i, j] = fraction of biomass in patch i moving to patch j.

required

Returns:

Type Description
ndarray

Rescaled flux matrix where no row sums to more than 1.0.

Source code in pypath/spatial/external_flux.py
def rescale_flux_for_conservation(flux_matrix: np.ndarray) -> np.ndarray:
    """Rescale flux matrix to enforce mass conservation.

    Normalizes each row so that total outflow fraction does not exceed 1.0
    (i.e., a patch cannot export more than 100% of its biomass per step).
    Rows whose outflow sums to <= 1.0 are left unchanged.

    Parameters
    ----------
    flux_matrix : np.ndarray
        Flux matrix [n_patches, n_patches]
        flux_matrix[i, j] = fraction of biomass in patch i moving to patch j.

    Returns
    -------
    np.ndarray
        Rescaled flux matrix where no row sums to more than 1.0.
    """
    result = flux_matrix.copy()
    for i in range(result.shape[0]):
        row_sum = result[i, :].sum()
        if row_sum > 1.0:
            logger.warning("Row %d outflow fraction %.4f > 1.0, rescaling", i, row_sum)
            result[i, :] /= row_sum
    return result

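A quick check of the rescaling rule: rows whose outflow exceeds 1.0 are normalized, the rest are left untouched:

import numpy as np

from pypath.spatial.external_flux import rescale_flux_for_conservation

flux = np.array([
    [0.0, 0.8, 0.6],   # outflow 1.4 -> rescaled so the row sums to 1.0
    [0.1, 0.0, 0.2],   # outflow 0.3 -> unchanged
    [0.0, 0.0, 0.0],
])
print(rescale_flux_for_conservation(flux).sum(axis=1))  # [1.  0.3 0. ] up to float rounding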
summarize_external_flux

summarize_external_flux(external_flux: 'ExternalFluxTimeseries') -> Dict

Summarize external flux timeseries.

Parameters:

Name Type Description Default
external_flux ExternalFluxTimeseries

External flux data

required

Returns:

Type Description
dict

Summary statistics:

- 'n_timesteps': Number of time points
- 'time_range': (min_time, max_time)
- 'n_groups': Number of groups with external flux
- 'n_patches': Number of patches
- 'mean_flux': Mean flux value
- 'max_flux': Maximum flux value
- 'is_conserved': Whether flux conserves mass

Source code in pypath/spatial/external_flux.py
def summarize_external_flux(external_flux: "ExternalFluxTimeseries") -> Dict:
    """Summarize external flux timeseries.

    Parameters
    ----------
    external_flux : ExternalFluxTimeseries
        External flux data

    Returns
    -------
    dict
        Summary statistics:
        - 'n_timesteps': Number of time points
        - 'time_range': (min_time, max_time)
        - 'n_groups': Number of groups with external flux
        - 'n_patches': Number of patches
        - 'mean_flux': Mean flux value
        - 'max_flux': Maximum flux value
        - 'is_conserved': Whether flux conserves mass
    """
    flux_data = external_flux.flux_data

    # Check conservation for first timestep
    is_conserved = validate_external_flux_conservation(flux_data[0, 0])

    summary = {
        "n_timesteps": len(external_flux.times),
        "time_range": (external_flux.times[0], external_flux.times[-1]),
        "n_groups": len(external_flux.group_indices),
        "n_patches": flux_data.shape[2],
        "mean_flux": float(np.mean(np.abs(flux_data))),
        "max_flux": float(np.max(np.abs(flux_data))),
        "is_conserved": is_conserved,
        "interpolate": external_flux.interpolate,
        "format": external_flux.format,
    }

    return summary

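Combining the builders above with the summary helper, a short sketch:

import numpy as np

from pypath.spatial.external_flux import (
    create_flux_from_connectivity_matrix,
    summarize_external_flux,
)

connectivity = np.array([[0.0, 0.1], [0.1, 0.0]])  # symmetric two-patch exchange
flux_ts = create_flux_from_connectivity_matrix(connectivity)

summary = summarize_external_flux(flux_ts)
print(summary["n_timesteps"], summary["n_patches"])  # 12 2
print(summary["is_conserved"])                       # True: inflow equals outflow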
validate_external_flux_conservation

validate_external_flux_conservation(flux_matrix: ndarray, tolerance: float = 1e-10) -> bool

Validate that external flux conserves mass.

For every patch, inflow minus outflow should be zero; the check sums the absolute per-patch imbalances and compares the total against the numerical tolerance.

Parameters:

Name Type Description Default
flux_matrix ndarray

Flux matrix [n_patches, n_patches] flux_matrix[i, j] = flux from patch i to patch j

required
tolerance float

Numerical tolerance for zero

1e-10

Returns:

Type Description
bool

True if flux is conserved

Notes

For mass conservation, each patch p must balance its inflow and outflow:

    Σ_i flux[i, p] = Σ_j flux[p, j]  for every patch p

The function returns True when Σ_p |inflow_p - outflow_p| is below the tolerance.

Source code in pypath/spatial/external_flux.py
def validate_external_flux_conservation(
    flux_matrix: np.ndarray, tolerance: float = 1e-10
) -> bool:
    """Validate that external flux conserves mass.

    Sum over all patches: inflow - outflow should be 0
    (within numerical tolerance).

    Parameters
    ----------
    flux_matrix : np.ndarray
        Flux matrix [n_patches, n_patches]
        flux_matrix[i, j] = flux from patch i to patch j
    tolerance : float
        Numerical tolerance for zero

    Returns
    -------
    bool
        True if flux is conserved

    Notes
    -----
    For mass conservation:
        Σ_i Σ_j flux[i, j] = Σ_j Σ_i flux[i, j]
        (total outflow = total inflow)

    Equivalently:
        Σ_i (Σ_j flux[i, j] - Σ_j flux[j, i]) = 0
    """
    # Calculate net flux for each patch
    if scipy.sparse.issparse(flux_matrix):
        inflow = np.array(flux_matrix.sum(axis=0)).flatten()
        outflow = np.array(flux_matrix.sum(axis=1)).flatten()
    else:
        inflow = flux_matrix.sum(axis=0)
        outflow = flux_matrix.sum(axis=1)

    net_flux = inflow - outflow

    # Total imbalance
    total_imbalance = np.abs(net_flux).sum()

    return total_imbalance < tolerance

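A minimal sketch contrasting a balanced exchange with a one-way transfer that fails the check:

import numpy as np

from pypath.spatial.external_flux import validate_external_flux_conservation

symmetric = np.array([[0.0, 2.0],
                      [2.0, 0.0]])   # each patch imports exactly what it exports
one_way = np.array([[0.0, 2.0],
                    [0.0, 0.0]])     # patch 0 only loses, patch 1 only gains

print(validate_external_flux_conservation(symmetric))  # True
print(validate_external_flux_conservation(one_way))    # False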
Spatial Fishing

pypath.spatial.fishing

Spatial fishing effort allocation for ECOSPACE.

Implements spatially-explicit fishing with multiple allocation strategies:

- Uniform: Equal effort across all patches
- Gravity: Biomass-weighted effort (fish where fish are)
- Port-based: Distance from fishing ports
- Prescribed: User-defined spatial patterns
- Habitat-based: Target preferred habitats

SpatialFishing dataclass

Spatial fishing effort allocation.

Represents how fishing effort is distributed across spatial patches.

Parameters:

Name Type Description Default
allocation_type str

Method for allocating effort:

- "uniform": Equal across patches
- "gravity": Biomass-weighted (alpha, beta parameters)
- "port": Distance from ports (beta parameter)
- "prescribed": User-defined pattern
- "habitat": Target specific habitat types

'uniform'
effort_allocation ndarray

Pre-computed effort distribution [n_months, n_gears+1, n_patches] (column 0 is the "Outside" fleet), normalized so the sum over patches equals ForcedEffort[month, gear].

None
gravity_alpha float

Biomass attraction exponent (default: 1.0) Higher = stronger attraction to high biomass

1.0
gravity_beta float

Distance penalty exponent (default: 0.5) Higher = stronger distance penalty from ports

0.5
port_patches ndarray

Indices of patches containing ports

None
target_groups List[int]

Group indices to target for gravity allocation

None
custom_allocation_function Callable

Custom function(biomass, t, params) -> allocation [n_patches]

None

Examples:

>>> # Uniform allocation
>>> fishing = SpatialFishing(allocation_type="uniform")
>>> # Gravity model (fish where fish are)
>>> fishing = SpatialFishing(
...     allocation_type="gravity",
...     gravity_alpha=1.5,  # Strong biomass attraction
...     target_groups=[3, 5, 7]  # Target specific species
... )
>>> # Port-based (effort decreases with distance)
>>> fishing = SpatialFishing(
...     allocation_type="port",
...     port_patches=np.array([0, 5, 10]),  # Three ports
...     gravity_beta=1.0  # Distance penalty
... )
Source code in pypath/spatial/fishing.py
@dataclass
class SpatialFishing:
    """Spatial fishing effort allocation.

    Represents how fishing effort is distributed across spatial patches.

    Parameters
    ----------
    allocation_type : str
        Method for allocating effort:
        - "uniform": Equal across patches
        - "gravity": Biomass-weighted (alpha, beta parameters)
        - "port": Distance from ports (beta parameter)
        - "prescribed": User-defined pattern
        - "habitat": Target specific habitat types
    effort_allocation : np.ndarray, optional
        Pre-computed effort distribution [n_months, n_gears, n_patches]
        Normalized so sum over patches = ForcedEffort[month, gear]
    gravity_alpha : float
        Biomass attraction exponent (default: 1.0)
        Higher = stronger attraction to high biomass
    gravity_beta : float
        Distance penalty exponent (default: 0.5)
        Higher = stronger distance penalty from ports
    port_patches : np.ndarray, optional
        Indices of patches containing ports
    target_groups : List[int], optional
        Group indices to target for gravity allocation
    custom_allocation_function : Callable, optional
        Custom function(biomass, t, params) -> allocation [n_patches]

    Examples
    --------
    >>> # Uniform allocation
    >>> fishing = SpatialFishing(allocation_type="uniform")

    >>> # Gravity model (fish where fish are)
    >>> fishing = SpatialFishing(
    ...     allocation_type="gravity",
    ...     gravity_alpha=1.5,  # Strong biomass attraction
    ...     target_groups=[3, 5, 7]  # Target specific species
    ... )

    >>> # Port-based (effort decreases with distance)
    >>> fishing = SpatialFishing(
    ...     allocation_type="port",
    ...     port_patches=np.array([0, 5, 10]),  # Three ports
    ...     gravity_beta=1.0  # Distance penalty
    ... )
    """

    allocation_type: str = "uniform"
    effort_allocation: Optional[np.ndarray] = None
    gravity_alpha: float = 1.0
    gravity_beta: float = 0.5
    port_patches: Optional[np.ndarray] = None
    target_groups: Optional[List[int]] = None
    custom_allocation_function: Optional[Callable] = None

    def __post_init__(self):
        """Validate parameters after initialization."""
        valid_types = ["uniform", "gravity", "port", "prescribed", "habitat", "custom"]
        if self.allocation_type not in valid_types:
            raise ValueError(
                f"allocation_type must be one of {valid_types}, got '{self.allocation_type}'"
            )

        if self.allocation_type == "prescribed" and self.effort_allocation is None:
            raise ValueError(
                "allocation_type='prescribed' requires effort_allocation array"
            )

        if self.allocation_type == "custom" and self.custom_allocation_function is None:
            raise ValueError(
                "allocation_type='custom' requires custom_allocation_function"
            )

        if self.port_patches is not None:
            self.port_patches = np.asarray(self.port_patches, dtype=int)
__post_init__
__post_init__()

Validate parameters after initialization.

Source code in pypath/spatial/fishing.py
def __post_init__(self):
    """Validate parameters after initialization."""
    valid_types = ["uniform", "gravity", "port", "prescribed", "habitat", "custom"]
    if self.allocation_type not in valid_types:
        raise ValueError(
            f"allocation_type must be one of {valid_types}, got '{self.allocation_type}'"
        )

    if self.allocation_type == "prescribed" and self.effort_allocation is None:
        raise ValueError(
            "allocation_type='prescribed' requires effort_allocation array"
        )

    if self.allocation_type == "custom" and self.custom_allocation_function is None:
        raise ValueError(
            "allocation_type='custom' requires custom_allocation_function"
        )

    if self.port_patches is not None:
        self.port_patches = np.asarray(self.port_patches, dtype=int)

allocate_gravity

allocate_gravity(biomass: ndarray, target_groups: Optional[List[int]], total_effort: float, alpha: float = 1.0, beta: float = 0.0, port_patches: Optional[ndarray] = None, grid: Optional['EcospaceGrid'] = None) -> np.ndarray

Allocate effort using gravity model (biomass attraction + distance penalty).

Effort follows: effort[p] ∝ (Σ_g biomass[g, p]^alpha) / distance[port, p]^beta

where g ∈ target_groups, and distance is from nearest port.

Parameters:

Name Type Description Default
biomass ndarray

Biomass by group and patch [n_groups+1, n_patches]

required
target_groups list of int

Group indices to target (if None, use all groups)

required
total_effort float

Total effort to allocate

required
alpha float

Biomass attraction exponent (default: 1.0):

- 0 = ignore biomass (random)
- 1 = proportional to biomass
- >1 = concentrate on high biomass

1.0
beta float

Distance penalty exponent (default: 0.0):

- 0 = ignore distance
- >0 = avoid distant patches

0.0
port_patches ndarray

Indices of patches with ports If None, assumes uniform accessibility

None
grid EcospaceGrid

Spatial grid (required if beta > 0)

None

Returns:

Type Description
ndarray

Effort per patch [n_patches], sums to total_effort

Examples:

>>> biomass = np.array([[0, 0, 0], [10, 20, 5]])  # 1 group, 3 patches
>>> allocate_gravity(biomass, target_groups=[1], total_effort=100, alpha=1.0)
array([28.57142857, 57.14285714, 14.28571429])  # Proportional to biomass
Source code in pypath/spatial/fishing.py
def allocate_gravity(
    biomass: np.ndarray,
    target_groups: Optional[List[int]],
    total_effort: float,
    alpha: float = 1.0,
    beta: float = 0.0,
    port_patches: Optional[np.ndarray] = None,
    grid: Optional["EcospaceGrid"] = None,
) -> np.ndarray:
    """Allocate effort using gravity model (biomass attraction + distance penalty).

    Effort follows:
        effort[p] ∝ (Σ_g biomass[g, p]^alpha) / distance[port, p]^beta

    where g ∈ target_groups, and distance is from nearest port.

    Parameters
    ----------
    biomass : np.ndarray
        Biomass by group and patch [n_groups+1, n_patches]
    target_groups : list of int, optional
        Group indices to target (if None, use all groups)
    total_effort : float
        Total effort to allocate
    alpha : float
        Biomass attraction exponent (default: 1.0)
        - 0 = ignore biomass (random)
        - 1 = proportional to biomass
        - >1 = concentrate on high biomass
    beta : float
        Distance penalty exponent (default: 0.0)
        - 0 = ignore distance
        - >0 = avoid distant patches
    port_patches : np.ndarray, optional
        Indices of patches with ports
        If None, assumes uniform accessibility
    grid : EcospaceGrid, optional
        Spatial grid (required if beta > 0)

    Returns
    -------
    np.ndarray
        Effort per patch [n_patches], sums to total_effort

    Examples
    --------
    >>> biomass = np.array([[0, 0, 0], [10, 20, 5]])  # 1 group, 3 patches
    >>> allocate_gravity(biomass, target_groups=[1], total_effort=100, alpha=1.0)
    array([28.57142857, 57.14285714, 14.28571429])  # Proportional to biomass
    """
    n_groups, n_patches = biomass.shape

    # Determine target groups
    if target_groups is None:
        target_groups = list(range(1, n_groups))  # Skip index 0 (Outside)

    # Calculate attractiveness (biomass-based)
    attractiveness = np.zeros(n_patches)

    for g in target_groups:
        if g < n_groups:
            attractiveness += biomass[g] ** alpha

    # Apply distance penalty if ports specified
    if beta > 0 and port_patches is not None and grid is not None:
        distance_penalty = calculate_distance_penalty(grid, port_patches, beta)
        attractiveness = attractiveness / (distance_penalty + 1e-10)

    # Normalize to total effort
    total_attractiveness = attractiveness.sum()

    if total_attractiveness < 1e-10:
        # No biomass - fall back to uniform
        return allocate_uniform(n_patches, total_effort)

    effort = attractiveness * (total_effort / total_attractiveness)

    return effort

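A short illustration of the alpha exponent: raising it above 1 concentrates effort on the highest-biomass patch:

import numpy as np

from pypath.spatial.fishing import allocate_gravity

biomass = np.array([[0.0, 0.0, 0.0],     # row 0 = "Outside", skipped
                    [10.0, 20.0, 5.0]])  # one target group, three patches

print(allocate_gravity(biomass, target_groups=[1], total_effort=100, alpha=1.0))
# roughly [28.6, 57.1, 14.3]  (proportional to biomass)
print(allocate_gravity(biomass, target_groups=[1], total_effort=100, alpha=2.0))
# roughly [19.0, 76.2,  4.8]  (concentrated on the high-biomass patch)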
allocate_habitat_based

allocate_habitat_based(habitat_preference: ndarray, total_effort: float, threshold: float = 0.5) -> np.ndarray

Allocate effort based on habitat preference.

Targets patches with high habitat quality for target species.

Parameters:

Name Type Description Default
habitat_preference ndarray

Habitat preference values [n_patches] Values in [0, 1]

required
total_effort float

Total effort to allocate

required
threshold float

Minimum habitat preference to fish (default: 0.5) Patches below this get zero effort

0.5

Returns:

Type Description
ndarray

Effort per patch [n_patches], sums to total_effort

Examples:

>>> habitat = np.array([0.2, 0.6, 0.8, 0.4, 0.9])
>>> allocate_habitat_based(habitat, total_effort=100, threshold=0.5)
array([ 0., 26.08695652, 34.78260870,  0., 39.13043478])
# Only patches with preference > 0.5 get effort
Source code in pypath/spatial/fishing.py
def allocate_habitat_based(
    habitat_preference: np.ndarray, total_effort: float, threshold: float = 0.5
) -> np.ndarray:
    """Allocate effort based on habitat preference.

    Targets patches with high habitat quality for target species.

    Parameters
    ----------
    habitat_preference : np.ndarray
        Habitat preference values [n_patches]
        Values in [0, 1]
    total_effort : float
        Total effort to allocate
    threshold : float
        Minimum habitat preference to fish (default: 0.5)
        Patches below this get zero effort

    Returns
    -------
    np.ndarray
        Effort per patch [n_patches], sums to total_effort

    Examples
    --------
    >>> habitat = np.array([0.2, 0.6, 0.8, 0.4, 0.9])
    >>> allocate_habitat_based(habitat, total_effort=100, threshold=0.5)
    array([ 0., 26.08695652, 34.78260870,  0., 39.13043478])
    # Only patches with preference > 0.5 get effort
    """
    habitat_preference = np.asarray(habitat_preference, dtype=float)

    # Apply threshold
    effort = habitat_preference.copy()
    effort[effort < threshold] = 0.0

    # Normalize
    total_pref = effort.sum()

    if total_pref < 1e-10:
        # No suitable habitat - fall back to uniform
        return allocate_uniform(len(habitat_preference), total_effort)

    effort = effort * (total_effort / total_pref)

    return effort

allocate_port_based

allocate_port_based(grid: 'EcospaceGrid', port_patches: ndarray, total_effort: float, beta: float = 1.0, max_distance: Optional[float] = None) -> np.ndarray

Allocate effort based on distance from fishing ports.

Effort decreases with distance from nearest port: effort[p] ∝ 1 / distance[p]^beta

Parameters:

Name Type Description Default
grid EcospaceGrid

Spatial grid

required
port_patches ndarray

Indices of patches containing ports

required
total_effort float

Total effort to allocate

required
beta float

Distance decay exponent (default: 1.0) Higher = faster decay with distance

1.0
max_distance float

Maximum fishing distance from ports (km) Patches beyond this get zero effort

None

Returns:

Type Description
ndarray

Effort per patch [n_patches], sums to total_effort

Examples:

>>> grid = create_1d_grid(n_patches=5)
>>> allocate_port_based(grid, port_patches=np.array([0]), total_effort=100, beta=1.0)
# Returns effort decreasing with distance from patch 0
Source code in pypath/spatial/fishing.py
def allocate_port_based(
    grid: "EcospaceGrid",
    port_patches: np.ndarray,
    total_effort: float,
    beta: float = 1.0,
    max_distance: Optional[float] = None,
) -> np.ndarray:
    """Allocate effort based on distance from fishing ports.

    Effort decreases with distance from nearest port:
        effort[p] ∝ 1 / distance[p]^beta

    Parameters
    ----------
    grid : EcospaceGrid
        Spatial grid
    port_patches : np.ndarray
        Indices of patches containing ports
    total_effort : float
        Total effort to allocate
    beta : float
        Distance decay exponent (default: 1.0)
        Higher = faster decay with distance
    max_distance : float, optional
        Maximum fishing distance from ports (km)
        Patches beyond this get zero effort

    Returns
    -------
    np.ndarray
        Effort per patch [n_patches], sums to total_effort

    Examples
    --------
    >>> grid = create_1d_grid(n_patches=5)
    >>> allocate_port_based(grid, port_patches=np.array([0]), total_effort=100, beta=1.0)
    # Returns effort decreasing with distance from patch 0
    """
    from scipy.spatial.distance import cdist

    n_patches = grid.n_patches

    # Vectorized distance from each patch to nearest port
    all_centroids = grid.patch_centroids  # [n_patches, 2]
    port_centroids = all_centroids[port_patches]  # [n_ports, 2]
    dist_matrix = cdist(all_centroids, port_centroids, metric="euclidean") * 111.0
    distance_to_port = np.maximum(dist_matrix.min(axis=1), 0.1)

    # Calculate effort based on inverse distance
    effort = 1.0 / (distance_to_port**beta)

    # Apply maximum distance cutoff if specified
    if max_distance is not None:
        effort[distance_to_port > max_distance] = 0.0

    # Normalize to total effort
    total_attractiveness = effort.sum()

    if total_attractiveness < 1e-10:
        # All patches too far - fall back to uniform at ports
        effort = np.zeros(n_patches)
        effort[port_patches] = 1.0
        total_attractiveness = len(port_patches)

    effort = effort * (total_effort / total_attractiveness)

    return effort

allocate_uniform

allocate_uniform(n_patches: int, total_effort: float = 1.0) -> np.ndarray

Allocate effort uniformly across all patches.

Parameters:

Name Type Description Default
n_patches int

Number of spatial patches

required
total_effort float

Total effort to allocate (default: 1.0)

1.0

Returns:

Type Description
ndarray

Effort per patch [n_patches], sums to total_effort

Examples:

>>> allocate_uniform(5, total_effort=100)
array([20., 20., 20., 20., 20.])
Source code in pypath/spatial/fishing.py
def allocate_uniform(n_patches: int, total_effort: float = 1.0) -> np.ndarray:
    """Allocate effort uniformly across all patches.

    Parameters
    ----------
    n_patches : int
        Number of spatial patches
    total_effort : float
        Total effort to allocate (default: 1.0)

    Returns
    -------
    np.ndarray
        Effort per patch [n_patches], sums to total_effort

    Examples
    --------
    >>> allocate_uniform(5, total_effort=100)
    array([20., 20., 20., 20., 20.])
    """
    return np.ones(n_patches) * (total_effort / n_patches)

calculate_distance_penalty

calculate_distance_penalty(grid: 'EcospaceGrid', port_patches: ndarray, beta: float) -> np.ndarray

Calculate distance penalty from nearest port.

Parameters:

Name Type Description Default
grid EcospaceGrid

Spatial grid

required
port_patches ndarray

Indices of port patches

required
beta float

Distance decay exponent

required

Returns:

Type Description
ndarray

Distance penalty [n_patches] penalty[p] = distance_to_nearest_port[p]^beta

Source code in pypath/spatial/fishing.py
def calculate_distance_penalty(
    grid: "EcospaceGrid", port_patches: np.ndarray, beta: float
) -> np.ndarray:
    """Calculate distance penalty from nearest port.

    Parameters
    ----------
    grid : EcospaceGrid
        Spatial grid
    port_patches : np.ndarray
        Indices of port patches
    beta : float
        Distance decay exponent

    Returns
    -------
    np.ndarray
        Distance penalty [n_patches]
        penalty[p] = distance_to_nearest_port[p]^beta
    """
    from scipy.spatial.distance import cdist

    all_centroids = grid.patch_centroids
    port_centroids = all_centroids[port_patches]
    dist_matrix = cdist(all_centroids, port_centroids, metric="euclidean") * 111.0
    min_dist = np.maximum(dist_matrix.min(axis=1), 0.1)
    return min_dist**beta

create_spatial_fishing

create_spatial_fishing(n_months: int, n_gears: int, n_patches: int, forced_effort: ndarray, allocation_type: str = 'uniform', **kwargs) -> SpatialFishing

Create spatial fishing with pre-computed effort allocation.

Parameters:

Name Type Description Default
n_months int

Number of monthly timesteps

required
n_gears int

Number of fishing gears/fleets

required
n_patches int

Number of spatial patches

required
forced_effort ndarray

Total effort by month and gear [n_months, n_gears+1] Column 0 is "Outside" (ignored)

required
allocation_type str

Allocation method (see SpatialFishing)

'uniform'
**kwargs

Additional parameters for allocation method (e.g., gravity_alpha, port_patches, etc.)

{}

Returns:

Type Description
SpatialFishing

Spatial fishing object with effort_allocation computed

Examples:

>>> forced_effort = np.ones((12, 3))  # 12 months, 2 gears + Outside
>>> fishing = create_spatial_fishing(
...     n_months=12,
...     n_gears=2,
...     n_patches=10,
...     forced_effort=forced_effort,
...     allocation_type="uniform"
... )
Source code in pypath/spatial/fishing.py
def create_spatial_fishing(
    n_months: int,
    n_gears: int,
    n_patches: int,
    forced_effort: np.ndarray,
    allocation_type: str = "uniform",
    **kwargs,
) -> SpatialFishing:
    """Create spatial fishing with pre-computed effort allocation.

    Parameters
    ----------
    n_months : int
        Number of monthly timesteps
    n_gears : int
        Number of fishing gears/fleets
    n_patches : int
        Number of spatial patches
    forced_effort : np.ndarray
        Total effort by month and gear [n_months, n_gears+1]
        Column 0 is "Outside" (ignored)
    allocation_type : str
        Allocation method (see SpatialFishing)
    **kwargs
        Additional parameters for allocation method
        (e.g., gravity_alpha, port_patches, etc.)

    Returns
    -------
    SpatialFishing
        Spatial fishing object with effort_allocation computed

    Examples
    --------
    >>> forced_effort = np.ones((12, 3))  # 12 months, 2 gears + Outside
    >>> fishing = create_spatial_fishing(
    ...     n_months=12,
    ...     n_gears=2,
    ...     n_patches=10,
    ...     forced_effort=forced_effort,
    ...     allocation_type="uniform"
    ... )
    """
    # Initialize effort allocation array
    effort_allocation = np.zeros((n_months, n_gears + 1, n_patches))

    # Column 0 (Outside) always zero
    effort_allocation[:, 0, :] = 0.0

    # Allocate each gear for each month
    for month in range(n_months):
        for gear in range(1, n_gears + 1):
            total_effort = forced_effort[month, gear]

            if allocation_type == "uniform":
                allocation = allocate_uniform(n_patches, total_effort)

            elif allocation_type == "gravity":
                # Requires biomass - will be computed dynamically
                # For now, use uniform as placeholder
                allocation = allocate_uniform(n_patches, total_effort)

            elif allocation_type == "port":
                # Requires grid and port_patches
                grid = kwargs.get("grid")
                port_patches = kwargs.get("port_patches")

                if grid is None or port_patches is None:
                    raise ValueError(
                        "allocation_type='port' requires 'grid' and 'port_patches'"
                    )

                beta = kwargs.get("gravity_beta", 1.0)
                allocation = allocate_port_based(grid, port_patches, total_effort, beta)

            else:
                # Fall back to uniform
                allocation = allocate_uniform(n_patches, total_effort)

            effort_allocation[month, gear, :] = allocation

    # Filter kwargs to only include SpatialFishing parameters
    # Exclude allocation-specific parameters that were only used for calculation
    spatial_fishing_kwargs = {}
    valid_params = [
        "gravity_alpha",
        "gravity_beta",
        "port_patches",
        "target_groups",
        "custom_allocation_function",
    ]

    for key in valid_params:
        if key in kwargs:
            spatial_fishing_kwargs[key] = kwargs[key]

    # Create SpatialFishing object
    spatial_fishing = SpatialFishing(
        allocation_type=allocation_type,
        effort_allocation=effort_allocation,
        **spatial_fishing_kwargs,
    )

    return spatial_fishing

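Continuing the uniform-allocation example from the docstring above: the returned effort_allocation array has shape [n_months, n_gears+1, n_patches] and the "Outside" column stays at zero. The snippet below is a sketch, not part of the documented examples; the import path is taken from the source listing, and the final check assumes the uniform allocator conserves each monthly total (the property validate_effort_allocation below is designed to verify):

import numpy as np
from pypath.spatial.fishing import create_spatial_fishing

forced_effort = np.ones((12, 3))  # 12 months, 2 gears + "Outside" column 0
fishing = create_spatial_fishing(
    n_months=12, n_gears=2, n_patches=10,
    forced_effort=forced_effort, allocation_type="uniform",
)

assert fishing.effort_allocation.shape == (12, 3, 10)
assert np.allclose(fishing.effort_allocation[:, 0, :], 0.0)  # "Outside" never allocated
# Expected when the uniform allocator spreads each total evenly over patches:
assert np.allclose(fishing.effort_allocation[:, 1:, :].sum(axis=2), forced_effort[:, 1:])
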
validate_effort_allocation

validate_effort_allocation(effort_allocation: ndarray, forced_effort: ndarray, tolerance: float = 1e-08) -> bool

Validate that spatial effort allocation sums correctly.

For each month and gear: Σ_patches effort_allocation[m, g, p] = forced_effort[m, g]

Parameters:

- effort_allocation (ndarray): Spatial effort [n_months, n_gears+1, n_patches]. Required.
- forced_effort (ndarray): Total effort [n_months, n_gears+1]. Required.
- tolerance (float): Numerical tolerance. Default: 1e-08.

Returns:

- bool: True if the allocation is valid

Source code in pypath/spatial/fishing.py
def validate_effort_allocation(
    effort_allocation: np.ndarray, forced_effort: np.ndarray, tolerance: float = 1e-8
) -> bool:
    """Validate that spatial effort allocation sums correctly.

    For each month and gear:
        Σ_patches effort_allocation[m, g, p] = forced_effort[m, g]

    Parameters
    ----------
    effort_allocation : np.ndarray
        Spatial effort [n_months, n_gears+1, n_patches]
    forced_effort : np.ndarray
        Total effort [n_months, n_gears+1]
    tolerance : float
        Numerical tolerance (default: 1e-8)

    Returns
    -------
    bool
        True if allocation is valid
    """
    # Sum over patches
    spatial_total = effort_allocation.sum(axis=2)

    # Compare to forced effort
    difference = np.abs(spatial_total - forced_effort)

    return np.all(difference < tolerance)

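The check simply sums over the patch axis and compares elementwise against the forced totals. A small self-contained sketch (import path assumed from the source listing above):

import numpy as np
from pypath.spatial.fishing import validate_effort_allocation

n_months, n_gears, n_patches = 12, 2, 10
forced_effort = np.ones((n_months, n_gears + 1))  # column 0 = "Outside"

# Spread each month/gear total evenly across patches, so patch sums reproduce the totals.
effort_allocation = np.repeat(forced_effort[:, :, None], n_patches, axis=2) / n_patches
assert validate_effort_allocation(effort_allocation, forced_effort)

# Perturbing a single patch breaks one per-gear total and the check fails.
effort_allocation[0, 1, 0] += 0.5
assert not validate_effort_allocation(effort_allocation, forced_effort)
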
Spatial Integration

pypath.spatial.integration

Spatial-temporal integration for ECOSPACE.

Integrates ECOSPACE spatial dynamics with Ecosim temporal dynamics:

- Spatial derivative calculation (local dynamics + movement)
- RK4 integration extended for spatial state
- Wrapper functions for spatial simulations
- Backward compatibility with non-spatial Ecosim

deriv_vector_spatial

deriv_vector_spatial(state_spatial: ndarray, params: Dict, forcing: Dict, fishing: Dict, ecospace: EcospaceParams, environmental_drivers: Optional[EnvironmentalDrivers], t: float = 0.0, dt: float = 1.0 / 12.0) -> np.ndarray

Calculate spatial derivative (local dynamics + movement).

For each patch p:

1. Calculate local Ecosim dynamics (production, predation, fishing, M0)
2. Apply habitat capacity to carrying capacity (if environmental drivers present)
3. Add spatial fluxes (migration/dispersal)

Parameters:

- state_spatial (ndarray): Spatial state [n_groups+1, n_patches]. Index 0 = "Outside" (no dynamics); index 1+ = living and detritus groups. Required.
- params (dict): Ecosim parameters (from RsimParams). Required.
- forcing (dict): Environmental forcing (from RsimForcing). Required.
- fishing (dict): Fishing forcing (from RsimFishing). Required.
- ecospace (EcospaceParams): Spatial parameters. Required.
- environmental_drivers (EnvironmentalDrivers): Time-varying environmental layers for habitat capacity; may be None. Required.
- t (float): Simulation time (years). Default: 0.0.
- dt (float): Timestep size. Default: 1.0 / 12.0 (1 month).

Returns:

- ndarray: Spatial derivative [n_groups+1, n_patches], where deriv[g, p] = rate of change for group g in patch p

Notes

This function extends the standard Ecosim derivative to spatial grids. For each patch, the local Ecosim dynamics are calculated independently, then spatial fluxes (movement) are added to account for dispersal.

Habitat capacity can be calculated from environmental drivers: capacity = f(temperature, depth, salinity, ...)

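The habitat-capacity step amounts to scaling the baseline biomass reference patch by patch before the local derivative is evaluated, leaving the "Outside" slot (index 0) untouched. A minimal numpy sketch of that precomputation, with illustrative shapes and multipliers (the source listing that follows does the same thing with cached, per-patch arrays):

import numpy as np

n_groups, n_patches = 3, 5
B_BaseRef = np.array([0.0, 10.0, 4.0, 2.0])  # [n_groups+1], index 0 = "Outside"
habitat_capacity = np.random.uniform(0.2, 1.0, (n_groups, n_patches))  # illustrative multipliers

# One baseline column per patch, then scale groups 1.. by their patch-specific capacity.
B_BaseRef_patches = np.tile(B_BaseRef[:, None], (1, n_patches))
B_BaseRef_patches[1:, :] *= habitat_capacity
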
Source code in pypath/spatial/integration.py
def deriv_vector_spatial(
    state_spatial: np.ndarray,
    params: Dict,
    forcing: Dict,
    fishing: Dict,
    ecospace: EcospaceParams,
    environmental_drivers: Optional[EnvironmentalDrivers],
    t: float = 0.0,
    dt: float = 1.0 / 12.0,
) -> np.ndarray:
    """Calculate spatial derivative (local dynamics + movement).

    For each patch p:
        1. Calculate local Ecosim dynamics (production, predation, fishing, M0)
        2. Apply habitat capacity to carrying capacity (if environmental drivers present)
        3. Add spatial fluxes (migration/dispersal)

    Parameters
    ----------
    state_spatial : np.ndarray
        Spatial state [n_groups+1, n_patches]
        Index 0 = "Outside" (no dynamics)
        Index 1+ = Living and detritus groups
    params : dict
        Ecosim parameters (from RsimParams)
    forcing : dict
        Environmental forcing (from RsimForcing)
    fishing : dict
        Fishing forcing (from RsimFishing)
    ecospace : EcospaceParams
        Spatial parameters
    environmental_drivers : EnvironmentalDrivers, optional
        Time-varying environmental layers for habitat capacity
    t : float
        Simulation time (years)
    dt : float
        Timestep size (default: 1/12 year = 1 month)

    Returns
    -------
    np.ndarray
        Spatial derivative [n_groups+1, n_patches]
        deriv[g, p] = rate of change for group g in patch p

    Notes
    -----
    This function extends the standard Ecosim derivative to spatial grids.
    For each patch, the local Ecosim dynamics are calculated independently,
    then spatial fluxes (movement) are added to account for dispersal.

    Habitat capacity can be calculated from environmental drivers:
        capacity = f(temperature, depth, salinity, ...)
    """
    from pypath.ibm.base import SpatialContext
    from pypath.spatial.dispersal import calculate_spatial_flux

    _n_groups = state_spatial.shape[0]
    n_patches = state_spatial.shape[1]

    # Initialize derivative
    deriv_spatial = np.zeros_like(state_spatial, dtype=float)

    # Step 1: Calculate local dynamics for each patch
    # Pre-compute habitat capacity modifications if needed
    params_need_modification = (
        environmental_drivers is not None
        and hasattr(ecospace, "habitat_capacity")
        and "B_BaseRef" in params
    )

    if params_need_modification:
        # Pre-compute all modified B_BaseRef arrays for all patches
        # This is more efficient than copying params for each patch
        b_base_ref_original = params["B_BaseRef"]
        capacity_multipliers = ecospace.habitat_capacity  # [n_groups, n_patches]
        n_ecospace_groups = capacity_multipliers.shape[0]

        # Create modified B_BaseRef for each patch (vectorized)
        # Only need to modify if we actually have habitat capacity
        b_base_ref_patches = np.tile(b_base_ref_original[:, np.newaxis], (1, n_patches))

        # Apply capacity multipliers to living groups only (skip index 0)
        for g_idx in range(n_ecospace_groups):
            state_idx = g_idx + 1  # Skip index 0 (Outside)
            if state_idx < len(b_base_ref_original):
                b_base_ref_patches[state_idx, :] *= capacity_multipliers[g_idx, :]

    # Build SpatialContext for each IBM group
    ibm_groups = params.get("ibm_groups", {})
    ibm_spatial_contexts = {}
    if ibm_groups:
        ActiveLink = params.get("ActiveLink", None)
        n_state = state_spatial.shape[0]
        adjacency_matrix = ecospace.grid.adjacency_matrix

        for g_idx, _ibm in ibm_groups.items():
            # Use habitat preference for this group, or uniform if unavailable
            hab_idx = g_idx - 1  # Convert 1-based Ecosim index to 0-based
            if hab_idx < ecospace.habitat_preference.shape[0]:
                habitat_qual = ecospace.habitat_preference[hab_idx]
            else:
                habitat_qual = np.ones(n_patches)

            # Compute prey and predator densities using the diet matrix.
            # ActiveLink[prey, pred] is True when pred eats prey.
            if ActiveLink is not None:
                # Use cached masks if available (ActiveLink is static)
                cache_key = "_ibm_masks_%d" % g_idx
                cached = params.get(cache_key)
                if cached is not None:
                    prey_mask, pred_mask = cached
                else:
                    prey_mask = np.zeros(n_state, dtype=bool)
                    for prey in range(1, min(n_state, ActiveLink.shape[0])):
                        if g_idx < ActiveLink.shape[1] and ActiveLink[prey, g_idx]:
                            prey_mask[prey] = True
                    pred_mask = np.zeros(n_state, dtype=bool)
                    for pred in range(1, min(n_state, ActiveLink.shape[1])):
                        if g_idx < ActiveLink.shape[0] and ActiveLink[g_idx, pred]:
                            pred_mask[pred] = True
                    params[cache_key] = (prey_mask, pred_mask)

                food = (
                    state_spatial[prey_mask, :].sum(axis=0)
                    if prey_mask.any()
                    else np.zeros(n_patches)
                )
                pred = (
                    state_spatial[pred_mask, :].sum(axis=0)
                    if pred_mask.any()
                    else np.zeros(n_patches)
                )
            else:
                # Fallback: total living biomass (less informative but safe)
                food = state_spatial[1:, :].sum(axis=0)
                pred = state_spatial[1:, :].sum(axis=0)

            ibm_spatial_contexts[g_idx] = SpatialContext(
                adjacency=adjacency_matrix,
                habitat_quality=habitat_qual,
                food_density=food,
                predator_density=pred,
                n_patches=n_patches,
            )

    # Inject IBM spatial contexts into params for deriv_vector
    for g_idx, ctx in ibm_spatial_contexts.items():
        params["_ibm_spatial_context_%d" % g_idx] = ctx

    def _compute_patch(patch_idx, patch_params):
        """Compute local Ecosim derivative for a single patch.

        Each call receives its own *patch_params* dict so that concurrent
        threads never share mutable state.
        """
        state_patch = state_spatial[:, patch_idx]
        if params_need_modification:
            patch_params["B_BaseRef"] = b_base_ref_patches[:, patch_idx]
        deriv_spatial[:, patch_idx] = deriv_vector(
            state_patch, patch_params, forcing, fishing, t=t
        )

    try:
        # Parallelize across patches when numba is active (GIL-free kernels)
        # and there are enough patches to amortize thread-pool overhead.
        _use_parallel = n_patches > 4 and HAS_NUMBA

        if _use_parallel:
            # Each thread gets a shallow copy of the params dict so that
            # per-patch B_BaseRef swaps are thread-safe.  The heavy arrays
            # (ActiveLink, VV, DD, QQbase, _link_prey, _link_pred, ...) are
            # shared read-only across threads.
            patch_params_list = [params.copy() for _ in range(n_patches)]
            with ThreadPoolExecutor(max_workers=_N_WORKERS) as pool:
                futures = [
                    pool.submit(_compute_patch, pidx, patch_params_list[pidx])
                    for pidx in range(n_patches)
                ]
                # Raise any exception that occurred in a worker thread.
                for f in futures:
                    f.result()
        else:
            # Sequential fallback — reuse the same params dict (no copy needed
            # when running single-threaded because B_BaseRef is restored after
            # each iteration).
            for patch_idx in range(n_patches):
                state_patch = state_spatial[:, patch_idx]

                if params_need_modification:
                    b_base_ref_backup = params["B_BaseRef"]
                    params["B_BaseRef"] = b_base_ref_patches[:, patch_idx]
                    try:
                        deriv_local = deriv_vector(
                            state_patch, params, forcing, fishing, t=t
                        )
                    finally:
                        params["B_BaseRef"] = b_base_ref_backup
                else:
                    deriv_local = deriv_vector(
                        state_patch, params, forcing, fishing, t=t
                    )

                deriv_spatial[:, patch_idx] = deriv_local
    finally:
        # Clean up injected spatial context keys
        for g_idx in ibm_spatial_contexts:
            params.pop("_ibm_spatial_context_%d" % g_idx, None)

    # Step 2: Add spatial fluxes (movement/dispersal)
    spatial_flux = calculate_spatial_flux(state_spatial, ecospace, params, t)

    # Add spatial fluxes to local dynamics
    deriv_spatial += spatial_flux

    return deriv_spatial

rsim_run_spatial

rsim_run_spatial(scenario: RsimScenario, method: str = 'RK4', years: Optional[range] = None, ecospace: Optional[EcospaceParams] = None, environmental_drivers: Optional[EnvironmentalDrivers] = None) -> RsimOutput

Run spatial Ecosim simulation.

Wrapper for Ecosim that extends to spatial grids. If ecospace is None, falls back to standard non-spatial Ecosim.

Parameters:

- scenario (RsimScenario): Simulation scenario (params, forcing, fishing, start state). Required.
- method (str): Integration method; currently only RK4 is implemented. Default: 'RK4'.
- years (range): Years to simulate, e.g. range(1, 101) for 100 years. Default: None (use scenario years).
- ecospace (EcospaceParams): Spatial parameters; if None, runs standard non-spatial Ecosim. Default: None.
- environmental_drivers (EnvironmentalDrivers): Time-varying environmental layers for habitat capacity. Default: None.

Returns:

- RsimOutput: Simulation results
  - out_Biomass: Total biomass (summed over patches) for compatibility
  - out_Biomass_spatial: Spatial biomass [n_months, n_groups+1, n_patches] (if spatial)
  - Other outputs as per standard Ecosim

Examples:

>>> # Non-spatial (standard Ecosim)
>>> result = rsim_run_spatial(scenario)
>>> # Spatial ECOSPACE
>>> from pypath.spatial import EcospaceGrid, EcospaceParams
>>> grid = EcospaceGrid.from_shapefile('grid.shp')
>>> ecospace = EcospaceParams(grid, ...)
>>> result = rsim_run_spatial(scenario, ecospace=ecospace)
>>> spatial_biomass = result.out_Biomass_spatial  # [n_months, n_groups, n_patches]
>>> total_biomass = result.out_Biomass  # [n_months, n_groups] (summed over patches)
Source code in pypath/spatial/integration.py
def rsim_run_spatial(
    scenario: RsimScenario,
    method: str = "RK4",
    years: Optional[range] = None,
    ecospace: Optional[EcospaceParams] = None,
    environmental_drivers: Optional[EnvironmentalDrivers] = None,
) -> RsimOutput:
    """Run spatial Ecosim simulation.

    Wrapper for Ecosim that extends to spatial grids. If ecospace is None,
    falls back to standard non-spatial Ecosim.

    Parameters
    ----------
    scenario : RsimScenario
        Simulation scenario (params, forcing, fishing, start state)
    method : str
        Integration method (default: 'RK4')
        Currently only RK4 is implemented
    years : range, optional
        Years to simulate (default: use scenario years)
        Example: range(1, 101) for 100 years
    ecospace : EcospaceParams, optional
        Spatial parameters
        If None, runs standard non-spatial Ecosim
    environmental_drivers : EnvironmentalDrivers, optional
        Time-varying environmental layers for habitat capacity

    Returns
    -------
    RsimOutput
        Simulation results
        - out_Biomass: Total biomass (summed over patches) for compatibility
        - out_Biomass_spatial: Spatial biomass [n_months, n_groups+1, n_patches] (if spatial)
        - Other outputs as per standard Ecosim

    Examples
    --------
    >>> # Non-spatial (standard Ecosim)
    >>> result = rsim_run_spatial(scenario)

    >>> # Spatial ECOSPACE
    >>> from pypath.spatial import EcospaceGrid, EcospaceParams
    >>> grid = EcospaceGrid.from_shapefile('grid.shp')
    >>> ecospace = EcospaceParams(grid, ...)
    >>> result = rsim_run_spatial(scenario, ecospace=ecospace)
    >>> spatial_biomass = result.out_Biomass_spatial  # [n_months, n_groups, n_patches]
    >>> total_biomass = result.out_Biomass  # [n_months, n_groups] (summed over patches)
    """
    # Backward compatibility: if no ecospace, use standard Ecosim
    if ecospace is None:
        from pypath.core.ecosim import rsim_run

        return rsim_run(scenario, method=method, years=years)

    # Import necessary functions
    from pypath.core.ecosim import DELTA_T, STEPS_PER_YEAR, rsim_run
    from pypath.spatial.ecospace_params import SpatialState

    # Validate method
    if method != "RK4":
        raise ValueError(f"Only RK4 method implemented for spatial, got '{method}'")

    # Setup years range
    if years is None:
        # Default: simulate all years in forcing
        n_months = scenario.forcing.ForcedPrey.shape[0]
        n_years = n_months // STEPS_PER_YEAR
        years = range(scenario.start_year, scenario.start_year + n_years)
    else:
        n_years = len(years)

    n_months = n_years * STEPS_PER_YEAR

    # Setup spatial dimensions
    n_patches = ecospace.grid.n_patches
    n_groups = scenario.params.NUM_GROUPS

    # Initialize spatial state
    # Expand initial state to spatial
    initial_biomass = scenario.start_state.Biomass  # [n_groups+1]

    # Create spatial initial state
    # Start with uniform distribution across patches
    state_spatial = SpatialState(
        Biomass=np.tile(initial_biomass[:, np.newaxis], (1, n_patches)) / n_patches
    )

    # Convert scenario to dictionary format for deriv function
    # Must match the key names that deriv_vector expects (same as rsim_run in ecosim.py)
    from pypath.core.ecosim import _build_active_link_matrix, _build_link_matrix

    params = scenario.params
    params_dict = {
        "NUM_GROUPS": params.NUM_GROUPS,
        "NUM_LIVING": params.NUM_LIVING,
        "NUM_DEAD": params.NUM_DEAD,
        "NUM_GEARS": params.NUM_GEARS,
        "PB": params.PBopt,
        "QB": params.FtimeQBOpt,
        "M0": params.MzeroMort,
        "Unassim": params.UnassimRespFrac,
        "ActiveLink": _build_active_link_matrix(params),
        "VV": _build_link_matrix(params, params.VV),
        "DD": _build_link_matrix(params, params.DD),
        "QQbase": _build_link_matrix(params, params.QQ),
        "Bbase": params.B_BaseRef,
        "B_BaseRef": params.B_BaseRef,
        "PP_type": params.PP_type,
        "NoIntegrate": params.NoIntegrate,
        "FishFrom": getattr(params, "FishFrom", np.array([])),
        "FishTo": getattr(params, "FishTo", np.array([])),
        "FishQ": getattr(params, "FishQ", np.array([])),
        "DetFrac": params.DetFrac,
        "DetFrom": params.DetFrom,
        "DetTo": params.DetTo,
    }

    # Pre-compute sparse link arrays for the consumption kernel
    from pypath.core.link_array import ActiveLinkArray

    _links = ActiveLinkArray.from_bool_matrix(params_dict["ActiveLink"])
    params_dict["_link_prey"] = _links.prey
    params_dict["_link_pred"] = _links.pred

    # Include IBM groups in params dict for deriv_vector
    if hasattr(params, "ibm_groups"):
        params_dict["ibm_groups"] = params.ibm_groups

    # Build fishing dict (constant across timesteps, same as rsim_run)
    fishing_dict = {
        "FishFrom": params.FishFrom,
        "FishThrough": params.FishThrough,
        "FishQ": params.FishQ,
        "FishingMort": np.zeros(n_groups + 1),
    }
    for i in range(1, len(params.FishFrom)):
        grp = int(params.FishFrom[i])
        if grp < n_groups + 1:
            fishing_dict["FishingMort"][grp] += params.FishQ[i]

    forcing = scenario.forcing
    fishing_obj = scenario.fishing

    # Storage for output (n_months + 1 rows: initial state + n_months of simulation)
    n_rows = n_months + 1
    out_Biomass_spatial = np.zeros((n_rows, n_groups + 1, n_patches), dtype=float)
    out_Biomass = np.zeros((n_rows, n_groups + 1), dtype=float)

    # Initial conditions
    out_Biomass_spatial[0] = state_spatial.Biomass
    out_Biomass[0] = state_spatial.collapse_to_total()

    # Time integration (RK4)
    current_biomass = state_spatial.Biomass.copy()

    # Ftime is static — snapshot once before the loop
    _ftime_snapshot = scenario.start_state.Ftime.copy()

    for month_idx in range(1, n_rows):
        t = month_idx * DELTA_T

        # Build per-timestep forcing dict (same structure as rsim_run)
        mi = month_idx - 1  # 0-based forcing index
        forcing_dict = {
            "Ftime": _ftime_snapshot,
            "ForcedBio": np.where(forcing.ForcedBio[mi] > 0, forcing.ForcedBio[mi], 0),
            "ForcedMigrate": forcing.ForcedMigrate[mi],
            "ForcedEffort": (
                fishing_obj.ForcedEffort[mi]
                if mi < len(fishing_obj.ForcedEffort)
                else np.ones(params.NUM_GEARS + 1)
            ),
        }

        # RK4 integration
        # k1 = f(t, y)
        k1 = deriv_vector_spatial(
            current_biomass,
            params_dict,
            forcing_dict,
            fishing_dict,
            ecospace,
            environmental_drivers,
            t=t,
            dt=DELTA_T,
        )

        # k2 = f(t + dt/2, y + k1*dt/2)
        k2 = deriv_vector_spatial(
            current_biomass + k1 * DELTA_T / 2,
            params_dict,
            forcing_dict,
            fishing_dict,
            ecospace,
            environmental_drivers,
            t=t + DELTA_T / 2,
            dt=DELTA_T,
        )

        # k3 = f(t + dt/2, y + k2*dt/2)
        k3 = deriv_vector_spatial(
            current_biomass + k2 * DELTA_T / 2,
            params_dict,
            forcing_dict,
            fishing_dict,
            ecospace,
            environmental_drivers,
            t=t + DELTA_T / 2,
            dt=DELTA_T,
        )

        # k4 = f(t + dt, y + k3*dt)
        k4 = deriv_vector_spatial(
            current_biomass + k3 * DELTA_T,
            params_dict,
            forcing_dict,
            fishing_dict,
            ecospace,
            environmental_drivers,
            t=t + DELTA_T,
            dt=DELTA_T,
        )

        # Update: y(t+dt) = y(t) + dt/6 * (k1 + 2*k2 + 2*k3 + k4)
        current_biomass = current_biomass + DELTA_T / 6 * (k1 + 2 * k2 + 2 * k3 + k4)

        # Prevent negative biomass
        current_biomass = np.maximum(current_biomass, 0.0)

        # Store results
        out_Biomass_spatial[month_idx] = current_biomass
        out_Biomass[month_idx] = current_biomass.sum(axis=1)  # Sum over patches

    # Create output (simplified for now - full output would include catch, etc.)
    from pypath.core.ecosim import RsimOutput, RsimState

    # Create end state
    end_state = RsimState(
        Biomass=out_Biomass[-1],
        N=scenario.start_state.N,  # Placeholder
        Ftime=scenario.start_state.Ftime,  # Placeholder
    )

    # Create output object
    n_fish_links = getattr(params, "NumFishingLinks", 0)
    n_pred_prey = getattr(params, "NumPredPreyLinks", 0)
    output = RsimOutput(
        out_Biomass=out_Biomass,
        out_Catch=np.zeros_like(out_Biomass),
        out_Gear_Catch=np.zeros((n_rows, n_fish_links)),
        annual_Biomass=np.zeros((n_years, n_groups + 1)),
        annual_Catch=np.zeros((n_years, n_groups + 1)),
        annual_QB=np.zeros((n_years, n_groups + 1)),
        annual_Qlink=np.zeros((n_years, n_pred_prey)),
        stanza_biomass=None,
        end_state=end_state,
        crash_year=-1,
        crashed_groups=set(),
        pred=np.array([]),
        prey=np.array([]),
        Gear_Catch_sp=np.array([]),
        Gear_Catch_gear=np.array([]),
        Gear_Catch_disp=np.array([]),
        start_state=scenario.start_state,
        params=params_dict,
    )

    # Add spatial output as new attribute
    output.out_Biomass_spatial = out_Biomass_spatial

    return output
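
A hypothetical post-processing sketch, assuming scenario and ecospace have already been built as in the examples above: the integration loop fills out_Biomass with the patch sums of out_Biomass_spatial, so the two outputs can be cross-checked directly.

import numpy as np

result = rsim_run_spatial(scenario, ecospace=ecospace)

spatial = result.out_Biomass_spatial   # [n_months+1, n_groups+1, n_patches]
group_idx = 1                          # first living group (index 0 = "Outside")

per_patch = spatial[:, group_idx, :]   # trajectory of group 1 in every patch
total = per_patch.sum(axis=1)          # collapse the spatial dimension

# Rows 1.. are written by the loop as patch sums; row 0 holds the initial state.
assert np.allclose(total[1:], result.out_Biomass[1:, group_idx])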