
I/O API Reference

EcoBase

pypath.io.ecobase

EcoBase database connector for PyPath.

This module provides functions to connect to the EcoBase database (http://ecobase.ecopath.org/) and download Ecopath model data.

EcoBase is a global repository of Ecopath models maintained by AGROCAMPUS OUEST (France).

Functions:

- list_ecobase_models(): Get list of all available public models
- get_ecobase_model(model_id): Download a specific model's data
- ecobase_to_rpath(model_data): Convert EcoBase data to RpathParams

Example:

>>> from pypath.io.ecobase import list_ecobase_models, get_ecobase_model
>>> models = list_ecobase_models()
>>> print(f"Found {len(models)} models")
>>> model_data = get_ecobase_model(403)  # Get specific model
>>> rpath_params = ecobase_to_rpath(model_data)

EcoBaseGroupData dataclass

Data for a single functional group from EcoBase.

Attributes:

- group_seq (int): Group sequence number (1-based)
- group_name (str): Name of the group
- trophic_level (float): Calculated trophic level
- biomass (float): Biomass (t/km²)
- biomass_hab (float): Biomass in habitat area
- prod_biom (float): Production/Biomass ratio (/year)
- cons_biom (float): Consumption/Biomass ratio (/year)
- ecotrophic_eff (float): Ecotrophic efficiency
- prod_cons (float): Production/Consumption ratio
- unassim_cons (float): Unassimilated consumption fraction
- habitat_area (float): Habitat area fraction

Source code in pypath/io/ecobase.py (lines 105–146)
@dataclass
class EcoBaseGroupData:
    """Data for a single functional group from EcoBase.

    Attributes
    ----------
    group_seq : int
        Group sequence number (1-based)
    group_name : str
        Name of the group
    trophic_level : float
        Calculated trophic level
    biomass : float
        Biomass (t/km²)
    biomass_hab : float
        Biomass in habitat area
    prod_biom : float
        Production/Biomass ratio (/year)
    cons_biom : float
        Consumption/Biomass ratio (/year)
    ecotrophic_eff : float
        Ecotrophic efficiency
    prod_cons : float
        Production/Consumption ratio
    unassim_cons : float
        Unassimilated consumption fraction
    habitat_area : float
        Habitat area fraction
    """

    group_seq: int
    group_name: str = ""
    trophic_level: float = 0.0
    biomass: float = 0.0
    biomass_hab: float = 0.0
    prod_biom: float = 0.0
    cons_biom: float = 0.0
    ecotrophic_eff: float = 0.0
    prod_cons: float = 0.0
    unassim_cons: float = 0.2
    habitat_area: float = 1.0
    group_type: int = 0  # 0=consumer, 1=producer, 2=detritus, 3=fleet

EcoBaseModel dataclass

Container for EcoBase model metadata.

Attributes:

- model_number (int): Unique model identifier in EcoBase
- model_name (str): Name of the model
- country (str): Country/region of the ecosystem
- ecosystem_type (str): Type of ecosystem (marine, freshwater, etc.)
- num_groups (int): Number of functional groups
- author (str): Model author(s)
- year (int): Year of model creation
- reference (str): Publication reference
- description (str): Model description
- dissemination_allow (bool): Whether public access is allowed

Source code in pypath/io/ecobase.py (lines 65–102)
@dataclass
class EcoBaseModel:
    """Container for EcoBase model metadata.

    Attributes
    ----------
    model_number : int
        Unique model identifier in EcoBase
    model_name : str
        Name of the model
    country : str
        Country/region of the ecosystem
    ecosystem_type : str
        Type of ecosystem (marine, freshwater, etc.)
    num_groups : int
        Number of functional groups
    author : str
        Model author(s)
    year : int
        Year of model creation
    reference : str
        Publication reference
    description : str
        Model description
    dissemination_allow : bool
        Whether public access is allowed
    """

    model_number: int
    model_name: str = ""
    country: str = ""
    ecosystem_type: str = ""
    num_groups: int = 0
    author: str = ""
    year: int = 0
    reference: str = ""
    description: str = ""
    dissemination_allow: bool = True
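
Only models whose dissemination_allow flag is set should be downloaded. A minimal sketch of filtering a model list on that flag, with plain dicts standing in for EcoBaseModel records (model names and numbers here are hypothetical):

```python
# Hypothetical metadata records mirroring EcoBaseModel fields
models = [
    {"model_number": 403, "model_name": "Baltic example", "dissemination_allow": True},
    {"model_number": 999, "model_name": "Private model", "dissemination_allow": False},
]

# Keep only models that permit public access
public = [m for m in models if m["dissemination_allow"]]
print([m["model_number"] for m in public])  # [403]
```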

download_ecobase_model_to_file

download_ecobase_model_to_file(model_id: int, output_path: str, format: str = 'csv') -> None

Download EcoBase model and save to file(s).

Parameters:

- model_id (int, required): Model ID from EcoBase
- output_path (str, required): Base path for output files (without extension)
- format (str, default 'csv'): Output format: 'csv', 'excel', or 'json'

Example:

>>> download_ecobase_model_to_file(403, "baltic_model", format="csv")
# Creates: baltic_model_groups.csv, baltic_model_diet.csv
Source code in pypath/io/ecobase.py (lines 872–911)
def download_ecobase_model_to_file(
    model_id: int, output_path: str, format: str = "csv"
) -> None:
    """Download EcoBase model and save to file(s).

    Parameters
    ----------
    model_id : int
        Model ID from EcoBase
    output_path : str
        Base path for output files (without extension)
    format : str
        Output format: 'csv', 'excel', 'json'

    Example
    -------
    >>> download_ecobase_model_to_file(403, "baltic_model", format="csv")
    # Creates: baltic_model_groups.csv, baltic_model_diet.csv
    """
    model_data = get_ecobase_model(model_id)
    params = ecobase_to_rpath(model_data)

    if format == "csv":
        params.model.to_csv(f"{output_path}_groups.csv", index=False)
        params.diet.to_csv(f"{output_path}_diet.csv")
    elif format == "excel":
        with pd.ExcelWriter(f"{output_path}.xlsx") as writer:
            params.model.to_excel(writer, sheet_name="Groups", index=False)
            params.diet.to_excel(writer, sheet_name="Diet")
    elif format == "json":
        import json

        result = {
            "model": params.model.to_dict(orient="records"),
            "diet": params.diet.to_dict(),
        }
        with open(f"{output_path}.json", "w") as f:
            json.dump(result, f, indent=2)
    else:
        raise ValueError(f"Unknown format: {format}")
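
The 'json' branch above serializes the model table as a list of records and the diet matrix as a nested dict. A minimal sketch of that payload shape with toy data (group names and values invented; no network access needed):

```python
import json

# Toy stand-ins for params.model.to_dict(orient="records") and params.diet.to_dict()
result = {
    "model": [
        {"Group": "Cod", "Biomass": 1.2, "PB": 0.9},
        {"Group": "Herring", "Biomass": 5.0, "PB": 1.4},
    ],
    "diet": {"Cod": {"Herring": 0.6, "Detritus": 0.0}},
}

# Serialize and read back, as a consumer of the .json file would
payload = json.dumps(result, indent=2)
roundtrip = json.loads(payload)
print(roundtrip["model"][0]["Group"])  # Cod
```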

ecobase_to_rpath

ecobase_to_rpath(model_data: Dict[str, Any], include_fleets: bool = True, use_input_values: bool = True) -> RpathParams

Convert EcoBase model data to RpathParams.

Parameters:

- model_data (dict, required): Model data from get_ecobase_model()
- include_fleets (bool, default True): Whether to include fishing fleets
- use_input_values (bool, default True): If True, prefer input values (before balancing) over output values. EcoBase stores both input (original) and output (balanced) parameters.

Returns:

- RpathParams: PyPath parameter structure ready for balancing

Example:

>>> model_data = get_ecobase_model(403)
>>> params = ecobase_to_rpath(model_data)
>>> from pypath.core.ecopath import rpath
>>> balanced = rpath(params)

Source code in pypath/io/ecobase.py (lines 662–815)
def ecobase_to_rpath(
    model_data: Dict[str, Any],
    include_fleets: bool = True,
    use_input_values: bool = True,
) -> RpathParams:
    """Convert EcoBase model data to RpathParams.

    Parameters
    ----------
    model_data : dict
        Model data from get_ecobase_model()
    include_fleets : bool
        Whether to include fishing fleets
    use_input_values : bool
        If True, prefer input values (before balancing) over output values.
        EcoBase stores both input (original) and output (balanced) parameters.

    Returns
    -------
    RpathParams
        PyPath parameter structure ready for balancing

    Example
    -------
    >>> model_data = get_ecobase_model(403)
    >>> params = ecobase_to_rpath(model_data)
    >>> from pypath.core.ecopath import rpath
    >>> balanced = rpath(params)
    """
    groups_data = model_data.get("groups", [])
    diet_data = model_data.get("diet", {})
    fleets_data = model_data.get("fleets", [])
    catches_data = model_data.get("catches", {})

    if not groups_data:
        raise ValueError("No group data found in model")

    # Classify groups
    group_names = []
    group_types = []  # 0=consumer, 1=producer, 2=detritus, 3=fleet

    for g in groups_data:
        name = g.get("group_name", g.get("name", f"Group_{len(group_names) + 1}"))
        group_names.append(name)

        # Determine type from various possible fields
        gtype = g.get("group_type", g.get("type", 0))
        if isinstance(gtype, str):
            gtype_lower = gtype.lower()
            if "producer" in gtype_lower or "primary" in gtype_lower:
                gtype = 1
            elif "detritus" in gtype_lower or "det" in gtype_lower:
                gtype = 2
            elif "fleet" in gtype_lower or "fish" in gtype_lower:
                gtype = 3
            else:
                gtype = 0

        # Also check if PB > 0 but QB = 0 for producers
        pb = g.get("prod_biom", g.get("pb", 0)) or 0
        qb = g.get("cons_biom", g.get("qb", 0)) or 0
        if pb > 0 and (qb == 0 or qb is None):
            gtype = 1

        group_types.append(int(gtype))

    # Add fleets if present and requested
    if include_fleets and fleets_data:
        for f in fleets_data:
            fleet_name = f.get(
                "fleet_name", f.get("name", f"Fleet_{len(group_names) + 1}")
            )
            group_names.append(fleet_name)
            group_types.append(3)

    # Create RpathParams
    params = create_rpath_params(groups=group_names, types=group_types)

    # Fill in group parameters
    # EcoBase field names:
    # - Numeric values are stored in: biomass, pb, qb, ee, gs, etc.
    # - Boolean flags (*_input) indicate if user entered the value or it was calculated
    # The actual values are ALWAYS in pb, qb, ee, biomass - the _input suffix is a boolean flag!
    for i, g in enumerate(groups_data):
        # Biomass - the numeric value is in 'biomass', not 'biomass_input'
        biomass = g.get("biomass", g.get("b", None))
        biomass_val = safe_float(biomass)
        if biomass_val is not None:
            params.model.loc[i, "Biomass"] = biomass_val

        # PB (P/B ratio) - the numeric value is in 'pb', not 'pb_input'
        pb = g.get("pb", g.get("prod_biom", None))
        pb_val = safe_float(pb)
        if pb_val is not None:
            params.model.loc[i, "PB"] = pb_val

        # QB (Q/B ratio) - the numeric value is in 'qb', not 'qb_input'
        qb = g.get("qb", g.get("cons_biom", None))
        qb_val = safe_float(qb)
        if qb_val is not None and group_types[i] != 1:  # Not for producers
            params.model.loc[i, "QB"] = qb_val

        # EE (Ecotrophic efficiency) - the numeric value is in 'ee', not 'ee_input'
        ee = g.get("ee", g.get("ecotrophic_eff", None))
        ee_val = safe_float(ee)
        if ee_val is not None:
            params.model.loc[i, "EE"] = ee_val

        # Unassimilated fraction (GS in EcoBase)
        unassim = g.get("gs", g.get("unassim_cons", 0.2))
        unassim_val = safe_float(unassim, default=0.2)
        if unassim_val is not None:
            params.model.loc[i, "Unassim"] = unassim_val

        # Biomass accumulation
        ba = g.get("biomass_accum", g.get("biomass_acc", g.get("ba", 0.0)))
        ba_val = safe_float(ba, default=0.0)
        if ba_val is not None:
            params.model.loc[i, "BioAcc"] = ba_val

    # Fill diet matrix
    # Note: params.diet has 'Group' as a column with prey names, not as index
    # We need to find the row by matching the Group column
    diet_groups = params.diet["Group"].tolist()

    for pred_name, prey_dict in diet_data.items():
        if pred_name in params.diet.columns:
            for prey_name, proportion in prey_dict.items():
                # Find the row index for this prey
                if prey_name in diet_groups:
                    row_idx = diet_groups.index(prey_name)
                    prop_val = safe_float(proportion, default=0.0)
                    if prop_val is not None and prop_val > 0:
                        params.diet.iloc[
                            row_idx, params.diet.columns.get_loc(pred_name)
                        ] = prop_val

    # Fill catch data
    if include_fleets and catches_data:
        for group_name, fleet_catches in catches_data.items():
            if group_name in params.model["Group"].values:
                group_idx = params.model[params.model["Group"] == group_name].index[0]
                for fleet_name, catch_data in fleet_catches.items():
                    if fleet_name in params.model.columns:
                        landings = safe_float(
                            catch_data.get("landings", 0), default=0.0
                        )
                        if landings is not None:
                            params.model.loc[group_idx, fleet_name] = landings

    # Store model name
    params.model_name = f"EcoBase Model {model_data.get('model_id', 'Unknown')}"

    return params
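
The string-to-code type detection in the function above can be exercised in isolation. Below is a standalone copy of that branch logic; the integer codes follow the module's convention (0=consumer, 1=producer, 2=detritus, 3=fleet):

```python
def classify_group(gtype, pb=0.0, qb=0.0):
    """Standalone mirror of the type-detection branch in ecobase_to_rpath."""
    if isinstance(gtype, str):
        g = gtype.lower()
        if "producer" in g or "primary" in g:
            gtype = 1
        elif "detritus" in g or "det" in g:
            gtype = 2
        elif "fleet" in g or "fish" in g:
            gtype = 3
        else:
            gtype = 0
    # Groups with production but no consumption are treated as producers
    if pb > 0 and not qb:
        gtype = 1
    return int(gtype)


print(classify_group("Primary producer"))         # 1
print(classify_group("Detritus"))                 # 2
print(classify_group(0, pb=1.5, qb=0.0))          # 1 (PB > 0 but QB == 0)
print(classify_group("Consumer", pb=0.9, qb=3.0)) # 0
```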

get_ecobase_model

get_ecobase_model(model_id: int, timeout: int = 60) -> Dict[str, Any]

Download a specific model from EcoBase.

Parameters:

- model_id (int, required): Model number (from list_ecobase_models())
- timeout (int, default 60): Request timeout in seconds

Returns:

- dict: Dictionary containing:
  - 'model_id': Requested model number
  - 'metadata': Model metadata
  - 'groups': List of group data dictionaries
  - 'diet': Diet matrix as nested dict
  - 'fleets': List of fleet data dictionaries
  - 'catches': Nested dict of catches by group and fleet
  - 'raw_xml': Raw XML string for debugging

Example:

>>> model_data = get_ecobase_model(403)
>>> print(f"Model has {len(model_data['groups'])} groups")

Source code in pypath/io/ecobase.py (lines 291–659)
def get_ecobase_model(model_id: int, timeout: int = 60) -> Dict[str, Any]:
    """Download a specific model from EcoBase.

    Parameters
    ----------
    model_id : int
        Model number (from list_ecobase_models())
    timeout : int
        Request timeout in seconds

    Returns
    -------
    dict
        Dictionary containing:
        - 'model_id': Requested model number
        - 'metadata': Model metadata
        - 'groups': List of group data dictionaries
        - 'diet': Diet matrix as nested dict
        - 'fleets': List of fleet data dictionaries
        - 'catches': Nested dict of catches by group and fleet
        - 'raw_xml': Raw XML string for debugging

    Example
    -------
    >>> model_data = get_ecobase_model(403)
    >>> print(f"Model has {len(model_data['groups'])} groups")
    """
    url = f"{ECOBASE_MODEL_URL}{model_id}"

    try:
        xml_content = fetch_url(url, timeout=timeout, parse_json=False)
    except Exception as e:
        raise ConnectionError(f"Failed to download model {model_id}: {e}")

    # Parse XML
    try:
        root = _parse_xml(xml_content)
    except ET.ParseError as e:
        raise ValueError(f"Failed to parse model data: {e}")

    result = {
        "model_id": model_id,
        "metadata": {},
        "groups": [],
        "diet": {},
        "fleets": [],
        "catches": {},
        "raw_xml": xml_content,
    }

    # First pass: Build group_seq to group_name mapping
    group_seq_to_name = {}
    for group_elem in root.iter("group"):
        group_name = None
        group_seq = None
        for child in group_elem:
            if child.tag == "group_name":
                group_name = child.text
            elif child.tag == "group_seq":
                try:
                    group_seq = int(child.text) if child.text else None
                except ValueError:
                    group_seq = None
        if group_name and group_seq is not None:
            group_seq_to_name[group_seq] = group_name

    # Extract groups and diet data
    for group_elem in root.iter("group"):
        group_data = {}
        pred_name = None

        for child in group_elem:
            tag = child.tag
            text = child.text

            # Store group name for diet processing
            if tag == "group_name":
                pred_name = text

            # Handle diet_descr specially - extract nested diet elements
            if tag == "diet_descr":
                # Process nested diet elements
                for diet_elem in child.iter("diet"):
                    prey_seq = None
                    proportion = 0.0

                    for diet_child in diet_elem:
                        if diet_child.tag == "prey_seq":
                            try:
                                prey_seq = (
                                    int(diet_child.text) if diet_child.text else None
                                )
                            except ValueError:
                                prey_seq = None
                        elif diet_child.tag == "proportion":
                            try:
                                proportion = (
                                    float(diet_child.text) if diet_child.text else 0.0
                                )
                            except ValueError:
                                proportion = 0.0

                    # Map prey_seq to prey_name and store diet
                    if prey_seq is not None and proportion > 0 and pred_name:
                        prey_name = group_seq_to_name.get(prey_seq, f"Group_{prey_seq}")
                        if pred_name not in result["diet"]:
                            result["diet"][pred_name] = {}
                        result["diet"][pred_name][prey_name] = proportion
                continue

            # Try to convert values appropriately
            if text:
                text_lower = text.lower().strip()
                # Handle boolean strings first
                if text_lower in ("true", "false", "yes", "no"):
                    group_data[tag] = text_lower in ("true", "yes")
                else:
                    # Try numeric conversion
                    try:
                        if "." in text or (
                            "e" in text_lower and text_lower not in ("true", "false")
                        ):
                            group_data[tag] = float(text)
                        else:
                            group_data[tag] = int(text)
                    except ValueError:
                        group_data[tag] = text
            else:
                group_data[tag] = None

        if group_data:
            result["groups"].append(group_data)

    # Build group_id to group_name mapping for diet matrix
    group_id_to_name = {}
    for g in result["groups"]:
        gid = g.get(
            "group_seq", g.get("group_id", g.get("sequence", g.get("no", None)))
        )
        gname = g.get("group_name", g.get("name", None))
        if gid is not None and gname is not None:
            group_id_to_name[int(gid)] = gname

    # Extract diet from dc (diet composition) fields in groups
    # Format: dc fields contain "prey_id proportion" pairs
    for g in result["groups"]:
        pred_name = g.get("group_name", g.get("name", None))
        if not pred_name:
            continue

        # Look for dc fields (dc1, dc2, ... or dc_1, dc_2, ...)
        for key, value in g.items():
            if key.lower().startswith("dc") and value is not None:
                # Try to parse as "prey_id proportion" or just get prey_id
                try:
                    if isinstance(value, str) and " " in value:
                        parts = value.strip().split()
                        if len(parts) >= 2:
                            prey_id = int(parts[0])
                            proportion = float(parts[1])
                        else:
                            prey_id = int(parts[0])
                            proportion = 1.0
                    elif isinstance(value, (int, float)):
                        # Could be just a proportion or an ID
                        continue
                    else:
                        continue

                    # Map prey_id to name
                    prey_name = group_id_to_name.get(prey_id, f"Group_{prey_id}")

                    if proportion > 0:
                        if pred_name not in result["diet"]:
                            result["diet"][pred_name] = {}
                        result["diet"][pred_name][prey_name] = proportion
                except (ValueError, TypeError):
                    continue

    # Also try DietComp fields (another common format)
    for g in result["groups"]:
        pred_name = g.get("group_name", g.get("name", None))
        if not pred_name:
            continue

        # Look for DietComp, dietcomp fields
        for key, value in g.items():
            key_lower = key.lower()
            if (
                "dietcomp" in key_lower or "diet_comp" in key_lower
            ) and value is not None:
                try:
                    if isinstance(value, str) and " " in value:
                        parts = value.strip().split()
                        if len(parts) >= 2:
                            prey_id = int(parts[0])
                            proportion = float(parts[1])
                            prey_name = group_id_to_name.get(
                                prey_id, f"Group_{prey_id}"
                            )

                            if proportion > 0:
                                if pred_name not in result["diet"]:
                                    result["diet"][pred_name] = {}
                                result["diet"][pred_name][prey_name] = proportion
                except (ValueError, TypeError):
                    continue

    # Extract diet matrix from dedicated diet elements (alternative format)
    for diet_elem in root.iter("diet"):
        prey_name = None
        pred_name = None
        value = 0.0

        for child in diet_elem:
            if child.tag in ["prey", "prey_name", "from"]:
                prey_name = child.text
            elif child.tag in ["predator", "pred_name", "to"]:
                pred_name = child.text
            elif child.tag in ["diet", "value", "proportion"]:
                try:
                    value = float(child.text) if child.text else 0.0
                except ValueError:
                    value = 0.0

        if prey_name and pred_name and value > 0:
            if pred_name not in result["diet"]:
                result["diet"][pred_name] = {}
            result["diet"][pred_name][prey_name] = value

    # Alternative diet structure (nested in groups)
    for group_elem in root.iter("group"):
        group_name = None
        for child in group_elem:
            if child.tag in ["group_name", "name"]:
                group_name = child.text
                break

        if group_name:
            for diet_elem in group_elem.iter("diet_item"):
                prey_name = None
                value = 0.0
                for child in diet_elem:
                    if child.tag in ["prey", "prey_name"]:
                        prey_name = child.text
                    elif child.tag in ["proportion", "value", "diet"]:
                        try:
                            value = float(child.text) if child.text else 0.0
                        except ValueError:
                            value = 0.0

                if prey_name and value > 0:
                    if group_name not in result["diet"]:
                        result["diet"][group_name] = {}
                    result["diet"][group_name][prey_name] = value

    # Extract fleet/fishery data with catches from catch_descr
    for fleet_elem in root.iter("fleet"):
        fleet_data = {}
        fleet_name = None

        for child in fleet_elem:
            if child.tag == "fleet_name":
                fleet_name = child.text
            elif child.tag == "catch_descr":
                # Parse catch entries within fleet
                for catch_elem in child.findall("catch"):
                    group_seq = None
                    catch_value = 0.0
                    catch_type = None

                    for catch_child in catch_elem:
                        if catch_child.tag == "group_seq":
                            try:
                                group_seq = (
                                    int(catch_child.text) if catch_child.text else None
                                )
                            except ValueError:
                                group_seq = None
                        elif catch_child.tag == "catch_value":
                            try:
                                catch_value = (
                                    float(catch_child.text) if catch_child.text else 0.0
                                )
                            except ValueError:
                                catch_value = 0.0
                        elif catch_child.tag == "catch_type":
                            catch_type = (
                                catch_child.text.strip() if catch_child.text else None
                            )

                    # Store catches by fleet and group
                    if fleet_name and group_seq is not None and catch_type:
                        group_name = group_seq_to_name.get(
                            group_seq, f"Group_{group_seq}"
                        )

                        if group_name not in result["catches"]:
                            result["catches"][group_name] = {}
                        if fleet_name not in result["catches"][group_name]:
                            result["catches"][group_name][fleet_name] = {
                                "landings": 0.0,
                                "discards": 0.0,
                                "discard_mort": 0.0,
                                "market": 0.0,
                                "prop_mort": 0.0,
                            }

                        # Map catch types to our structure
                        if catch_type == "total landings":
                            result["catches"][group_name][fleet_name]["landings"] = (
                                catch_value
                            )
                        elif catch_type == "discards":
                            result["catches"][group_name][fleet_name]["discards"] = (
                                catch_value
                            )
                        elif catch_type == "market":
                            result["catches"][group_name][fleet_name]["market"] = (
                                catch_value
                            )
                        elif catch_type == "prop mort":
                            result["catches"][group_name][fleet_name]["prop_mort"] = (
                                catch_value
                            )
            else:
                fleet_data[child.tag] = child.text

        if fleet_name:
            fleet_data["fleet_name"] = fleet_name
            result["fleets"].append(fleet_data)

    # Extract catch data
    for catch_elem in root.iter("catch"):
        group_name = None
        fleet_name = None
        landings = 0.0
        discards = 0.0

        for child in catch_elem:
            if child.tag in ["group", "group_name"]:
                group_name = child.text
            elif child.tag in ["fleet", "fleet_name"]:
                fleet_name = child.text
            elif child.tag == "landings":
                try:
                    landings = float(child.text) if child.text else 0.0
                except ValueError:
                    landings = 0.0
            elif child.tag == "discards":
                try:
                    discards = float(child.text) if child.text else 0.0
                except ValueError:
                    discards = 0.0

        if group_name and fleet_name:
            if group_name not in result["catches"]:
                result["catches"][group_name] = {}
            # Only add if not already present from fleet/catch_descr parsing
            if fleet_name not in result["catches"][group_name]:
                result["catches"][group_name][fleet_name] = {
                    "landings": landings,
                    "discards": discards,
                }
            else:
                # Update only if values are provided
                if landings > 0:
                    result["catches"][group_name][fleet_name]["landings"] = landings
                if discards > 0:
                    result["catches"][group_name][fleet_name]["discards"] = discards

    return result
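
The two-pass diet extraction above can be demonstrated on a tiny XML fragment shaped like the <group>/<diet_descr>/<diet> structure the parser expects (fragment and names invented for illustration; real EcoBase responses carry many more fields):

```python
import xml.etree.ElementTree as ET

xml = """
<model>
  <group><group_seq>1</group_seq><group_name>Cod</group_name>
    <diet_descr>
      <diet><prey_seq>2</prey_seq><proportion>0.7</proportion></diet>
    </diet_descr>
  </group>
  <group><group_seq>2</group_seq><group_name>Herring</group_name></group>
</model>
"""
root = ET.fromstring(xml)

# Pass 1: map group_seq -> group_name
seq_to_name = {
    int(g.findtext("group_seq")): g.findtext("group_name") for g in root.iter("group")
}

# Pass 2: resolve each prey_seq through the mapping to build the nested diet dict
diet = {}
for g in root.iter("group"):
    pred = g.findtext("group_name")
    for d in g.iter("diet"):
        prey = seq_to_name.get(int(d.findtext("prey_seq")))
        diet.setdefault(pred, {})[prey] = float(d.findtext("proportion"))

print(diet)  # {'Cod': {'Herring': 0.7}}
```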

list_ecobase_models

list_ecobase_models(filter_public: bool = True, timeout: int = 60) -> pd.DataFrame

Get list of available Ecopath models from EcoBase.

Connects to the EcoBase SOAP API and retrieves metadata for all available models.

Parameters:

- filter_public (bool, default True): If True, only return models with public access allowed
- timeout (int, default 60): Request timeout in seconds

Returns:

- pd.DataFrame: DataFrame with model metadata including:
  - model_number: Unique ID
  - model_name: Name
  - country: Location
  - ecosystem_type: Type
  - num_groups: Number of groups
  - author: Author(s)
  - year: Year
  - reference: Publication

Example:

>>> models = list_ecobase_models()
>>> print(f"Found {len(models)} public models")
>>> # Filter by ecosystem type
>>> marine = models[models['ecosystem_type'].str.contains('marine', case=False)]
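
The ecosystem-type filter shown above is ordinary pandas string matching; with a toy frame (rows invented for illustration) it behaves like this:

```python
import pandas as pd

# Hypothetical stand-in for the DataFrame returned by list_ecobase_models()
models = pd.DataFrame(
    {
        "model_number": [403, 77],
        "model_name": ["Baltic example", "Lake example"],
        "ecosystem_type": ["Marine", "Freshwater"],
    }
)

# case=False makes the match case-insensitive ("Marine" matches "marine")
marine = models[models["ecosystem_type"].str.contains("marine", case=False)]
print(marine["model_number"].tolist())  # [403]
```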

Source code in pypath/io/ecobase.py (lines 149–288)
def list_ecobase_models(filter_public: bool = True, timeout: int = 60) -> pd.DataFrame:
    """Get list of available Ecopath models from EcoBase.

    Connects to the EcoBase SOAP API and retrieves metadata for
    all available models.

    Parameters
    ----------
    filter_public : bool
        If True, only return models with public access allowed
    timeout : int
        Request timeout in seconds

    Returns
    -------
    pd.DataFrame
        DataFrame with model metadata including:
        - model_number: Unique ID
        - model_name: Name
        - country: Location
        - ecosystem_type: Type
        - num_groups: Number of groups
        - author: Author(s)
        - year: Year
        - reference: Publication

    Example
    -------
    >>> models = list_ecobase_models()
    >>> print(f"Found {len(models)} public models")
    >>> # Filter by ecosystem type
    >>> marine = models[models['ecosystem_type'].str.contains('marine', case=False)]
    """
    try:
        xml_content = fetch_url(ECOBASE_LIST_URL, timeout=timeout, parse_json=False)
    except Exception as e:
        raise ConnectionError(f"Failed to connect to EcoBase: {e}") from e

    # Parse XML response
    try:
        root = _parse_xml(xml_content)
    except ET.ParseError as e:
        raise ValueError(f"Failed to parse EcoBase response: {e}") from e

    # Extract model data
    models = []

    # Navigate through SOAP envelope to find model data
    # The structure varies, so we try multiple paths
    for model_elem in root.iter("model"):
        model_data = {}
        for child in model_elem:
            tag = child.tag.replace("{http://schemas.xmlsoap.org/soap/envelope/}", "")
            model_data[tag] = child.text

        if model_data:
            try:
                model = {
                    "model_number": int(
                        model_data.get("model_number", model_data.get("no_model", 0))
                    ),
                    "model_name": model_data.get(
                        "model_name", model_data.get("name", "")
                    ),
                    "country": model_data.get(
                        "country", model_data.get("location", "")
                    ),
                    "ecosystem_type": model_data.get(
                        "ecosystem_type", model_data.get("type", "")
                    ),
                    "num_groups": int(
                        model_data.get("number_group", model_data.get("nb_group", 0))
                        or 0
                    ),
                    "author": model_data.get("author", ""),
                    "year": int(model_data.get("year", 0) or 0),
                    "reference": model_data.get("reference", ""),
                    "dissemination_allow": model_data.get(
                        "dissemination_allow", "true"
                    ).lower()
                    == "true",
                }
                models.append(model)
            except (ValueError, TypeError):
                continue

    # Also try alternative XML structure
    if not models:
        for item in root.iter():
            if "model" in item.tag.lower() or item.tag == "item":
                model_data = {child.tag: child.text for child in item}
                if model_data and any(
                    k in model_data for k in ["model_number", "no_model", "model_name"]
                ):
                    try:
                        model = {
                            "model_number": int(
                                model_data.get(
                                    "model_number", model_data.get("no_model", 0)
                                )
                                or 0
                            ),
                            "model_name": str(
                                model_data.get("model_name", model_data.get("name", ""))
                            ),
                            "country": str(
                                model_data.get(
                                    "country", model_data.get("location", "")
                                )
                            ),
                            "ecosystem_type": str(
                                model_data.get(
                                    "ecosystem_type", model_data.get("type", "")
                                )
                            ),
                            "num_groups": int(
                                model_data.get(
                                    "number_group", model_data.get("nb_group", 0)
                                )
                                or 0
                            ),
                            "author": str(model_data.get("author", "")),
                            "year": int(model_data.get("year", 0) or 0),
                            "reference": str(model_data.get("reference", "")),
                            "dissemination_allow": str(
                                model_data.get("dissemination_allow", "true")
                            ).lower()
                            == "true",
                        }
                        if model["model_number"] > 0:
                            models.append(model)
                    except (ValueError, TypeError):
                        continue

    df = pd.DataFrame(models)

    if filter_public and "dissemination_allow" in df.columns:
        df = df[df["dissemination_allow"]].copy()

    return df
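Because each call to list_ecobase_models hits the EcoBase SOAP API, repeated lookups benefit from a local cache. A minimal sketch, assuming you are happy with a CSV copy on disk (`cached_model_list` is a hypothetical wrapper, not part of pypath.io.ecobase; `fetch` is injected so the logic works without network access):

```python
from pathlib import Path

import pandas as pd

def cached_model_list(cache_path="ecobase_models.csv", fetch=None):
    """Fetch the EcoBase model list once and reuse a local CSV copy."""
    if fetch is None:
        # Deferred import so the helper can be defined without pypath installed
        from pypath.io.ecobase import list_ecobase_models
        fetch = list_ecobase_models
    cache = Path(cache_path)
    if cache.exists():
        return pd.read_csv(cache)
    df = fetch()
    df.to_csv(cache, index=False)
    return df
```

Delete the CSV file whenever you want to force a fresh download.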

search_ecobase_models

search_ecobase_models(query: str, field: str = 'all', models_df: Optional[DataFrame] = None) -> pd.DataFrame

Search EcoBase models by keyword.

Parameters:

    query : str
        Search term (required)
    field : str, default 'all'
        Field to search: 'all', 'model_name', 'country', 'ecosystem_type', 'author'
    models_df : DataFrame, optional
        Pre-fetched models DataFrame. If None, will fetch from EcoBase.

Returns:

    DataFrame
        Matching models

Example:

    >>> results = search_ecobase_models("Baltic")
    >>> results = search_ecobase_models("coral", field="ecosystem_type")

Source code in pypath/io/ecobase.py
def search_ecobase_models(
    query: str, field: str = "all", models_df: Optional[pd.DataFrame] = None
) -> pd.DataFrame:
    """Search EcoBase models by keyword.

    Parameters
    ----------
    query : str
        Search term
    field : str
        Field to search: 'all', 'model_name', 'country', 'ecosystem_type', 'author'
    models_df : pd.DataFrame, optional
        Pre-fetched models DataFrame. If None, will fetch from EcoBase.

    Returns
    -------
    pd.DataFrame
        Matching models

    Example
    -------
    >>> results = search_ecobase_models("Baltic")
    >>> results = search_ecobase_models("coral", field="ecosystem_type")
    """
    if models_df is None:
        models_df = list_ecobase_models()

    query_lower = query.lower()

    # Reset index to avoid alignment issues
    models_df = models_df.reset_index(drop=True)

    if field == "all":
        # Search across all text fields
        mask = pd.Series([False] * len(models_df), index=models_df.index)
        for col in ["model_name", "country", "ecosystem_type", "author", "reference"]:
            if col in models_df.columns:
                col_mask = (
                    models_df[col]
                    .astype(str)
                    .str.lower()
                    .str.contains(query_lower, na=False)
                )
                mask = mask | col_mask
        return models_df[mask].copy().reset_index(drop=True)
    else:
        if field not in models_df.columns:
            raise ValueError(f"Unknown field: {field}")
        mask = (
            models_df[field].astype(str).str.lower().str.contains(query_lower, na=False)
        )
        return models_df[mask].copy().reset_index(drop=True)
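The 'all' branch above lower-cases both the query and each text column, then ORs the per-column masks together. The same pattern works on any DataFrame; here is a self-contained illustration on a synthetic frame (the data is made up, but the column names match the EcoBase metadata):

```python
import pandas as pd

# OR per-column substring masks together, case-insensitively,
# as search_ecobase_models does for field='all'.
models = pd.DataFrame({
    "model_name": ["Baltic Sea", "Coral Reef", "North Sea"],
    "country": ["Sweden", "Australia", "UK"],
})
query = "sea"
mask = pd.Series(False, index=models.index)
for col in ["model_name", "country"]:
    mask |= models[col].str.lower().str.contains(query, na=False)
result = models[mask].reset_index(drop=True)
```

`na=False` keeps missing values out of the match, which matters on real EcoBase rows where some fields are empty.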

EwE Database (.eweaccdb)

pypath.io.ewemdb

EwE Database (ewemdb) file reader for PyPath.

This module provides functions to read Ecopath with Ecosim database files (.ewemdb format), which are Microsoft Access database files.

The ewemdb format is the native file format for EwE 6.x software. These files contain all model parameters, diet matrices, time series, and simulation settings.

Requirements: - pyodbc (Windows with Access drivers) - pypyodbc (alternative) - or: mdbtools + pandas (Linux/Mac)

Functions: - read_ewemdb(filepath): Read an ewemdb file and return RpathParams - list_ewemdb_tables(filepath): List all tables in the database - read_ewemdb_table(filepath, table): Read a specific table as DataFrame

Example: >>> from pypath.io.ewemdb import read_ewemdb >>> params = read_ewemdb("my_model.ewemdb") >>> from pypath.core.ecopath import rpath >>> balanced = rpath(params)

EwEDatabaseError

Bases: Exception

Exception for EwE database errors.

Source code in pypath/io/ewemdb.py
class EwEDatabaseError(Exception):
    """Exception for EwE database errors."""

    pass

check_ewemdb_support

check_ewemdb_support() -> Dict[str, bool]

Check what database drivers are available.

Returns:

    dict
        Dictionary indicating available drivers:
        - pyodbc: True if pyodbc is installed
        - pypyodbc: True if pypyodbc is installed
        - mdb_tools: True if mdb-tools is available
        - any_available: True if any driver works

Source code in pypath/io/ewemdb.py
def check_ewemdb_support() -> Dict[str, bool]:
    """Check what database drivers are available.

    Returns
    -------
    dict
        Dictionary indicating available drivers:
        - pyodbc: True if pyodbc is installed
        - pypyodbc: True if pypyodbc is installed
        - mdb_tools: True if mdb-tools is available
        - any_available: True if any driver works
    """
    return {
        "pyodbc": HAS_PYODBC,
        "pypyodbc": HAS_PYPYODBC,
        "mdb_tools": HAS_MDB_TOOLS,
        "any_available": HAS_PYODBC or HAS_PYPYODBC or HAS_MDB_TOOLS,
    }
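A typical use of this dict is to fail fast, or to pick a backend, before attempting a read. A minimal sketch (`pick_backend` is an illustrative helper, not part of the module; the preference order mirrors list_ewemdb_tables, which tries mdb-tools before the ODBC drivers):

```python
def pick_backend(support):
    """Choose a read strategy from a support dict shaped like the
    return value of check_ewemdb_support."""
    if not support.get("any_available"):
        raise RuntimeError(
            "No database driver available. Install pyodbc or mdb-tools."
        )
    if support.get("mdb_tools"):
        return "mdb-tools"
    return "pyodbc"  # covers both pyodbc and pypyodbc
```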

ecosim_scenario_from_ewemdb

ecosim_scenario_from_ewemdb(filepath: str, scenario: Optional[Union[int, str]] = 1, balance: bool = True, years: Optional[range] = None) -> 'RsimScenario'

Convenience: create a full RsimScenario from an EwE database scenario.

Parameters:

    filepath : str
        Path to .ewemdb file (required)
    scenario : int or str, default 1
        Scenario ID (int) or name (str) to select
    balance : bool, default True
        Whether to run Ecopath balancing via pypath.core.ecopath.rpath to
        create a balanced Rpath model. If False, the input params must
        already be balanced (not recommended).
    years : range, optional
        Years to simulate. If None, derived from scenario metadata.

Returns:

    RsimScenario
        Ready-to-run scenario object (can be passed to rsim_run).

Example:

    >>> scen = ecosim_scenario_from_ewemdb('model.ewemdb', scenario=1)
    >>> out = rsim_run(scen, method='RK4', years=range(1, 11))

Source code in pypath/io/ewemdb.py
def ecosim_scenario_from_ewemdb(
    filepath: str,
    scenario: Optional[Union[int, str]] = 1,
    balance: bool = True,
    years: Optional[range] = None,
) -> "RsimScenario":
    """Convenience: create a full RsimScenario from an EwE database scenario.

    Parameters
    ----------
    filepath : str
        Path to .ewemdb file
    scenario : int or str
        Scenario ID (int) or name (str) to select
    balance : bool
        Whether to run Ecopath balancing via :func:`pypath.core.ecopath.rpath`
        to create a balanced Rpath model. If False, the input params must
        already be balanced (not recommended).
    years : range, optional
        Years to simulate. If None, derived from scenario metadata.

    Returns
    -------
    RsimScenario
        Ready-to-run scenario object (can be passed to :func:`rsim_run`).

    Example
    -------
    >>> scen = ecosim_scenario_from_ewemdb('model.ewemdb', scenario=1)
    >>> out = rsim_run(scen, method='RK4', years=range(1, 11))
    """
    # Local imports to avoid circular dependencies at module import time
    from pypath.core.ecopath import rpath
    from pypath.core.ecosim import rsim_scenario

    params = read_ewemdb(filepath, include_ecosim=True)

    if getattr(params, "ecosim", None) is None or not params.ecosim.get(
        "has_ecosim", False
    ):
        raise EwEDatabaseError("No Ecosim scenarios found in the database")

    # Select scenario by id or name
    selected = None
    for sc in params.ecosim["scenarios"]:
        if isinstance(scenario, int) and sc.get("id") == scenario:
            selected = sc
            break
        if isinstance(scenario, str) and sc.get("name", "").lower() == scenario.lower():
            selected = sc
            break
    if selected is None:
        raise EwEDatabaseError(f"Scenario {scenario} not found in EwE DB")

    # Use years if provided, else derive from scenario
    if years is None:
        start = (
            int(selected.get("start_year"))
            if selected.get("start_year") is not None
            else 1
        )
        num = (
            int(selected.get("num_years"))
            if selected.get("num_years") is not None
            else 1
        )
        # Ensure at least two years for RsimScenario compatibility
        if num < 2:
            logger.info(
                f"Raising number of years from {num} to 2 for scenario {selected.get('name')}"
            )
            num = 2
        years = range(start, start + num)

    # Balance via rpath — required to produce an Rpath object for rsim_scenario
    if not balance:
        logger.warning(
            "balance=False requested but rpath() is still needed to build the "
            "Rpath structure; the model will be balanced regardless."
        )
    try:
        balanced = rpath(params)
    except Exception as e:
        raise EwEDatabaseError(f"Failed to balance Ecopath model: {e}") from e

    # Create RsimScenario
    rsim = rsim_scenario(balanced, params, years=years)

    # Replace default forcing/fishing with ones parsed from the DB if available
    try:
        if "rsim_forcing" in selected:
            rsim.forcing = selected["rsim_forcing"]
        if "rsim_fishing" in selected:
            rsim.fishing = selected["rsim_fishing"]
    except (AttributeError, TypeError, ValueError):
        # Be defensive: leave defaults if replacement fails
        pass

    # Try to construct and attach EcospaceParams if ecospace tables exist
    try:
        ecospace_tables = selected.get("ecospace") or _map_ecospace_tables(filepath)
        # Use Rsim parameter species names (which include 'Outside' at index 0) to align indices
        try:
            rsim_group_names = rsim.params.spname
        except AttributeError:
            rsim_group_names = params.model["Group"].tolist()
        ecospace_params = _construct_ecospace_params(ecospace_tables, rsim_group_names)
        if ecospace_params is not None:
            rsim.ecospace = ecospace_params
    except Exception as e:
        logger.exception("Failed to construct EcospaceParams: %s", e)
        # Leave ecospace as None if construction fails
        rsim.ecospace = None

    # Attach metadata for convenience
    rsim._from_ewemdb = {"filepath": filepath, "scenario_meta": selected}

    return rsim
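The scenario-selection loop above accepts either an integer ID or a case-insensitive name. Factored out as a standalone helper, the logic looks like this (`select_scenario` is an illustrative name; the scenario dicts only need `id` and `name` keys, as in the code above):

```python
def select_scenario(scenarios, scenario):
    """Pick a scenario dict by integer id or case-insensitive name."""
    for sc in scenarios:
        if isinstance(scenario, int) and sc.get("id") == scenario:
            return sc
        if isinstance(scenario, str) and sc.get("name", "").lower() == scenario.lower():
            return sc
    raise KeyError(f"Scenario {scenario} not found")

scenarios = [{"id": 1, "name": "Base"}, {"id": 2, "name": "HighF"}]
```

Note that because the check is `isinstance(scenario, int)`, passing the string "1" matches a scenario *named* "1", not the scenario with ID 1.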

get_ewemdb_metadata

get_ewemdb_metadata(filepath: str) -> Dict[str, Any]

Get metadata from an EwE database file.

Parameters:

    filepath : str
        Path to the ewemdb file (required)

Returns:

    dict
        Dictionary with model metadata including:
        - name: Model name
        - description: Model description
        - author: Author name
        - date: Creation date
        - version: EwE version
        - num_groups: Number of groups
        - num_fleets: Number of fleets

Source code in pypath/io/ewemdb.py
def get_ewemdb_metadata(filepath: str) -> Dict[str, Any]:
    """Get metadata from an EwE database file.

    Parameters
    ----------
    filepath : str
        Path to the ewemdb file

    Returns
    -------
    dict
        Dictionary with model metadata including:
        - name: Model name
        - description: Model description
        - author: Author name
        - date: Creation date
        - version: EwE version
        - num_groups: Number of groups
        - num_fleets: Number of fleets
    """
    filepath = str(Path(filepath).resolve())

    metadata = {
        "name": Path(filepath).stem,
        "description": "",
        "author": "",
        "date": "",
        "version": "",
        "num_groups": 0,
        "num_fleets": 0,
        "num_scenarios": 0,
        "scenarios": [],
        "has_ecosim": False,
        "has_ecospace": False,
        "filepath": filepath,
    }

    try:
        # Try to read model info table
        info_tables = ["EcopathModel", "Model", "ModelInfo", "EwEModel"]
        info_df = None

        for table in info_tables:
            try:
                info_df = read_ewemdb_table(filepath, table)
                break
            except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
                continue

        if info_df is not None and len(info_df) > 0:
            row = info_df.iloc[0]

            name_cols = ["ModelName", "Name", "Title"]
            for col in name_cols:
                if col in row and row[col]:
                    metadata["name"] = str(row[col])
                    break

            desc_cols = ["Description", "Notes", "Comments"]
            for col in desc_cols:
                if col in row and row[col]:
                    metadata["description"] = str(row[col])
                    break

            author_cols = ["Author", "Creator", "Contact"]
            for col in author_cols:
                if col in row and row[col]:
                    metadata["author"] = str(row[col])
                    break

        # Count groups and fleets
        try:
            groups_df = read_ewemdb_table(filepath, "EcopathGroup")
            metadata["num_groups"] = len(groups_df)
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
            logger.debug(
                "Failed to read EcopathGroup table for metadata", exc_info=True
            )

        try:
            fleet_df = read_ewemdb_table(filepath, "EcopathFleet")
            metadata["num_fleets"] = len(fleet_df)
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
            logger.debug(
                "Failed to read EcopathFleet table for metadata", exc_info=True
            )

        # Check for Ecosim scenarios
        try:
            ecosim_df = read_ewemdb_table(filepath, "EcosimScenario")
            if len(ecosim_df) > 0:
                metadata["has_ecosim"] = True
                metadata["num_scenarios"] = len(ecosim_df)
                # Get scenario names
                name_col = next(
                    (c for c in ["ScenarioName", "Name"] if c in ecosim_df.columns),
                    None,
                )
                if name_col:
                    metadata["scenarios"] = ecosim_df[name_col].tolist()
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
            logger.debug(
                "Failed to read EcosimScenario table for metadata", exc_info=True
            )

        # Check for Ecospace
        try:
            ecospace_df = read_ewemdb_table(filepath, "EcospaceScenario")
            if len(ecospace_df) > 0:
                metadata["has_ecospace"] = True
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
            logger.debug(
                "Failed to read EcospaceScenario table for metadata", exc_info=True
            )

    except Exception as e:
        warnings.warn(f"Could not read all metadata: {e}")

    return metadata
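The name/description/author lookups above all follow the same pattern: scan a list of candidate column names and take the first non-empty value. That pattern generalizes to a one-liner helper (`first_value` is an illustrative name; it works on plain dicts and on pandas Series alike, since both support `in` and indexing by label):

```python
def first_value(row, candidates, default=""):
    """Return the first non-empty value among candidate column names,
    as used for the ModelName/Description/Author lookups above."""
    for col in candidates:
        if col in row and row[col]:
            return str(row[col])
    return default
```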

list_ewemdb_tables

list_ewemdb_tables(filepath: str) -> List[str]

List all tables in an EwE database file.

Parameters:

    filepath : str
        Path to the ewemdb file (required)

Returns:

    list
        List of table names

Example:

    >>> tables = list_ewemdb_tables("model.ewemdb")
    >>> print(tables)
    ['EcopathGroup', 'EcopathDietComp', 'EcopathFleet', ...]

Source code in pypath/io/ewemdb.py
def list_ewemdb_tables(filepath: str) -> List[str]:
    """List all tables in an EwE database file.

    Parameters
    ----------
    filepath : str
        Path to the ewemdb file

    Returns
    -------
    list
        List of table names

    Example
    -------
    >>> tables = list_ewemdb_tables("model.ewemdb")
    >>> print(tables)
    ['EcopathGroup', 'EcopathDietComp', 'EcopathFleet', ...]
    """
    filepath = str(Path(filepath).resolve())

    if not Path(filepath).exists():
        raise FileNotFoundError(f"File not found: {filepath}")

    # Try mdb-tools first (cross-platform)
    if HAS_MDB_TOOLS:
        return _list_mdb_tables(filepath)

    # Try pyodbc
    if HAS_PYODBC or HAS_PYPYODBC:
        conn_str = _get_connection_string(filepath)
        try:
            conn = pyodbc.connect(conn_str)
            try:
                cursor = conn.cursor()
                tables = [row.table_name for row in cursor.tables(tableType="TABLE")]
                return tables
            finally:
                conn.close()
        except EwEDatabaseError:
            raise
        except Exception as e:
            raise EwEDatabaseError(f"Failed to connect to database: {e}") from e

    raise EwEDatabaseError("No database driver available. Install pyodbc or mdb-tools.")
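EwE table names follow component prefixes (Ecopath*, Ecosim*, Ecospace*), so the returned list is easy to bucket when exploring an unfamiliar database. A small convenience sketch (`group_tables` is an illustrative helper, not part of the module):

```python
def group_tables(tables):
    """Bucket table names by EwE component prefix."""
    buckets = {"ecopath": [], "ecosim": [], "ecospace": [], "other": []}
    for t in tables:
        low = t.lower()
        if low.startswith("ecospace"):
            buckets["ecospace"].append(t)
        elif low.startswith("ecosim"):
            buckets["ecosim"].append(t)
        elif low.startswith("ecopath"):
            buckets["ecopath"].append(t)
        else:
            buckets["other"].append(t)
    return buckets
```

Checking the "ecospace" prefix before "ecosim"/"ecopath" is deliberate, so longer prefixes cannot be shadowed if the order is ever rearranged.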

read_ewemdb

read_ewemdb(filepath: str, scenario: int = 1, include_ecosim: bool = False) -> RpathParams

Read an EwE database file and convert to RpathParams.

Parameters:

    filepath : str
        Path to the ewemdb file (required)
    scenario : int, default 1
        Scenario number to load
    include_ecosim : bool, default False
        Whether to read Ecosim parameters (not yet implemented)

Returns:

    RpathParams
        PyPath parameter structure ready for balancing

Example:

    >>> params = read_ewemdb("my_model.ewemdb")
    >>> from pypath.core.ecopath import rpath
    >>> balanced = rpath(params)

Notes:

    The ewemdb format uses Microsoft Access database structure. Key tables include:
    - EcopathGroup: Group parameters (biomass, P/B, Q/B, etc.)
    - EcopathDietComp: Diet composition matrix
    - EcopathFleet: Fleet definitions
    - EcopathCatch: Catch data by fleet and group
    - Stanza: Multi-stanza group definitions
    - StanzaLifeStage: Life stage parameters
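Because table names vary between EwE versions, read_ewemdb tries several candidates for each logical table (EcopathGroup then Group, and so on). That fallback can be factored into one helper; `read_first_table` is an illustrative name, with `reader` standing in for a partial application of read_ewemdb_table:

```python
def read_first_table(reader, names):
    """Try candidate table names in order; return the first that reads.

    `reader` is any callable taking a table name and raising on failure,
    e.g. functools.partial(read_ewemdb_table, filepath).
    """
    last_err = None
    for name in names:
        try:
            return reader(name)
        except (KeyError, ValueError, FileNotFoundError) as e:
            last_err = e
    raise KeyError(f"None of {names} could be read: {last_err}")
```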

Source code in pypath/io/ewemdb.py
def read_ewemdb(
    filepath: str, scenario: int = 1, include_ecosim: bool = False
) -> RpathParams:
    """Read an EwE database file and convert to RpathParams.

    Parameters
    ----------
    filepath : str
        Path to the ewemdb file
    scenario : int
        Scenario number to load (default: 1)
    include_ecosim : bool
        Whether to read Ecosim parameters (not yet implemented)

    Returns
    -------
    RpathParams
        PyPath parameter structure ready for balancing

    Example
    -------
    >>> params = read_ewemdb("my_model.ewemdb")
    >>> from pypath.core.ecopath import rpath
    >>> balanced = rpath(params)

    Notes
    -----
    The ewemdb format uses Microsoft Access database structure.
    Key tables include:
    - EcopathGroup: Group parameters (biomass, P/B, Q/B, etc.)
    - EcopathDietComp: Diet composition matrix
    - EcopathFleet: Fleet definitions
    - EcopathCatch: Catch data by fleet and group
    - Stanza: Multi-stanza group definitions
    - StanzaLifeStage: Life stage parameters
    """
    filepath = str(Path(filepath).resolve())

    if not Path(filepath).exists():
        raise FileNotFoundError(f"File not found: {filepath}")

    # Check file extension
    suffix = Path(filepath).suffix.lower()
    if suffix not in [".ewemdb", ".eweaccdb", ".ewe", ".mdb", ".accdb"]:
        warnings.warn(f"Unexpected file extension: {suffix}")

    # Read main tables
    try:
        groups_df = read_ewemdb_table(filepath, "EcopathGroup")
    except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
        # Try alternative table names
        try:
            groups_df = read_ewemdb_table(filepath, "Group")
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError) as e:
            raise EwEDatabaseError(f"Could not find group data: {e}") from e

    try:
        diet_df = read_ewemdb_table(filepath, "EcopathDietComp")
    except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
        try:
            diet_df = read_ewemdb_table(filepath, "DietComp")
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError) as e:
            diet_df = None
            logger.warning("Could not read diet composition data: %s", e)

    try:
        fleet_df = read_ewemdb_table(filepath, "EcopathFleet")
    except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
        try:
            fleet_df = read_ewemdb_table(filepath, "Fleet")
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError) as e:
            fleet_df = None
            logger.debug("Could not read fleet data: %s", e)

    try:
        catch_df = read_ewemdb_table(filepath, "EcopathCatch")
    except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError):
        try:
            catch_df = read_ewemdb_table(filepath, "Catch")
        except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError) as e:
            catch_df = None
            logger.debug("Could not read catch data: %s", e)

    # Try to read the Auxillary table (spelling as in the EwE schema), which
    # holds cell-level remarks in EwE 6.6+
    auxillary_df = None
    try:
        auxillary_df = read_ewemdb_table(filepath, "Auxillary")
        # Filter to only rows with remarks
        auxillary_df = auxillary_df[
            auxillary_df["Remark"].notna() & (auxillary_df["Remark"] != "")
        ]
        logger.debug("Found Auxillary table with %d remarks", len(auxillary_df))
    except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError) as e:
        auxillary_df = None  # table absent or missing the Remark column
        logger.debug("Could not read Auxillary table: %s", e)

    # Filter by scenario if needed
    if "ScenarioID" in groups_df.columns:
        groups_df = groups_df[groups_df["ScenarioID"] == scenario].copy()

    # Extract group information
    # Column names vary between EwE versions, so we try multiple options
    name_cols = ["GroupName", "Name", "group_name", "name"]
    name_col = next((c for c in name_cols if c in groups_df.columns), None)

    if name_col is None:
        raise EwEDatabaseError("Could not find group name column")

    # Get group names and types
    group_names = groups_df[name_col].tolist()

    # Determine group types
    type_cols = ["Type", "GroupType", "type", "PP"]
    type_col = next((c for c in type_cols if c in groups_df.columns), None)

    if type_col:
        # EwE types: 0=consumer, 1=producer, 2=detritus, 3=fleet
        # Some versions use: 0=normal, 1=PP=1, 2=PP=2 (detritus)
        raw_types = groups_df[type_col].fillna(0).astype(int).tolist()

        # Convert PP values to our types if needed
        pp_col = "PP" if "PP" in groups_df.columns else None
        if pp_col and type_col != "PP":
            pp_values = groups_df[pp_col].fillna(0).tolist()
            group_types = []
            for i, (t, pp) in enumerate(zip(raw_types, pp_values)):
                if pp == 1:  # Primary producer
                    group_types.append(1)
                elif pp == 2:  # Detritus
                    group_types.append(2)
                elif t == 3:  # Fleet
                    group_types.append(3)
                else:
                    group_types.append(0)  # Consumer
        else:
            group_types = raw_types
    else:
        # Guess types based on Q/B values
        qb_col = next(
            (
                c
                for c in ["QB", "QoverB", "ConsumptionBiomass"]
                if c in groups_df.columns
            ),
            None,
        )
        if qb_col:
            qb_values = groups_df[qb_col].fillna(0)
            # Producer/detritus if QB is 0 or NaN, consumer otherwise
            group_types = [1 if qb == 0 else 0 for qb in qb_values]
        else:
            group_types = [0] * len(groups_df)  # Default to consumer
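    # Worked example of the type resolution above (hypothetical values): with
    # Type = [0, 0, 0, 3] and PP = [0, 1, 2, 0], group_types becomes
    # [0, 1, 2, 3] (consumer, producer, detritus, fleet); without a type
    # column, QB = [3.5, 0.0] would give [0, 1].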

    # Create RpathParams
    params = create_rpath_params(group_names, group_types)

    # Map columns to RpathParams
    column_mapping = {
        "Biomass": ["Biomass", "B", "biomass", "BiomassAreaInput"],
        "PB": ["PB", "PoverB", "ProductionBiomass", "ProdBiom"],
        "QB": ["QB", "QoverB", "ConsumptionBiomass", "ConsBiom"],
        "EE": ["EE", "EcotrophicEfficiency", "Ecotrophic", "EcotrophEff"],
        "ProdCons": ["GE", "ProdCons", "GrossEfficiency", "PoverQ"],
        "Unassim": ["GS", "Unassim", "UnassimilatedConsumption"],
        "BioAcc": ["BA", "BioAcc", "BiomassAccumulation", "BiomassAccum"],
        "DetInput": ["DetInput", "DetritalInput", "ImmigEmig"],
    }
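    # The first listed column found in groups_df wins: e.g. a table carrying
    # both "ProdBiom" and "PB" populates params.model["PB"] from the "PB"
    # column (hypothetical column layout).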

    # Legacy remarks columns: older EwE versions store remarks as extra columns
    # on the group table, with names varying by version. Kept for reference;
    # the primary extraction below uses the Auxillary table instead.
    _remarks_mapping = {
        "Biomass": [
            "BRemarks",
            "BiomassRemarks",
            "BRemark",
            "Remark",
            "Remarks",
            "Comment",
            "Comments",
            "Note",
            "Notes",
        ],
        "PB": ["PBRemarks", "PBRemark", "ProductionRemarks"],
        "QB": ["QBRemarks", "QBRemark", "ConsumptionRemarks"],
        "EE": ["EERemarks", "EERemark", "EcotrophicRemarks"],
        "ProdCons": ["GERemarks", "ProdConsRemarks"],
        "Unassim": ["GSRemarks", "UnassimRemarks"],
        "BioAcc": ["BARemarks", "BioAccRemarks"],
        "DetInput": ["DetInputRemarks"],
    }

    for param_name, possible_cols in column_mapping.items():
        for col in possible_cols:
            if col in groups_df.columns:
                # First matching column wins; missing values stay NaN
                params.model[param_name] = groups_df[col].tolist()
                break

    # Extract remarks if available and create remarks DataFrame
    remarks_data = {"Group": group_names}
    has_any_remarks = False
    found_remarks_cols = []

    # Create ID to group name mapping
    id_col = next(
        (
            c
            for c in ["GroupID", "ID", "Sequence", "GroupSeq"]
            if c in groups_df.columns
        ),
        None,
    )
    if id_col:
        id_to_name = dict(zip(groups_df[id_col].tolist(), group_names))
    else:
        id_to_name = {i + 1: name for i, name in enumerate(group_names)}

    # Map VarName to our parameter names
    varname_to_param = {
        "BiomassAreaInput": "Biomass",
        "Biomass": "Biomass",
        "B": "Biomass",
        "PBInput": "PB",
        "PB": "PB",
        "ProdBiom": "PB",
        "QBInput": "QB",
        "QB": "QB",
        "ConsBiom": "QB",
        "EEInput": "EE",
        "EE": "EE",
        "EcotrophEff": "EE",
        "GE": "ProdCons",
        "ProdCons": "ProdCons",
        "GEInput": "ProdCons",
        "GS": "Unassim",
        "Unassim": "Unassim",
        "GSInput": "Unassim",
        "BA": "BioAcc",
        "BioAcc": "BioAcc",
        "BAInput": "BioAcc",
        "BioAccRate": "BioAcc",
        "BiomassAccum": "BioAcc",
        "DetInput": "DetInput",
        "DetritalInput": "DetInput",
        "Area": "Area",
        "HabitatArea": "Area",
        "BiomassHabArea": "Area",
    }

    # Initialize remarks lists for each parameter
    for param in [
        "Biomass",
        "PB",
        "QB",
        "EE",
        "ProdCons",
        "Unassim",
        "BioAcc",
        "DetInput",
        "Area",
    ]:
        remarks_data[param] = [""] * len(group_names)

    # PRIMARY METHOD: Extract remarks from Auxillary table (EwE 6.6+)
    # ValueID format: "EcoPathGroupInput:<GroupID>:<VarName>"
    if auxillary_df is not None and len(auxillary_df) > 0:
        logger.debug("Processing %d remarks from Auxillary table", len(auxillary_df))

        import re

        # Pattern to match: EcoPathGroupInput:<GroupID>:<VarName>
        pattern = re.compile(r"EcoPathGroupInput:(\d+):(\w+)")
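        # e.g. a ValueID of "EcoPathGroupInput:3:PBInput" (hypothetical group
        # ID) parses to group_id=3 and var_name="PBInput", which
        # varname_to_param then maps to the "PB" parameter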

        for _, row in auxillary_df.iterrows():
            value_id = str(row.get("ValueID", ""))
            remark = str(row.get("Remark", "")).strip()

            if not remark:
                continue

            match = pattern.match(value_id)
            if match:
                group_id = int(match.group(1))
                var_name = match.group(2)

                # Find group name
                group_name = id_to_name.get(group_id)
                if group_name and group_name in group_names:
                    group_idx = group_names.index(group_name)

                    # Map variable name to parameter
                    param_name = varname_to_param.get(var_name, var_name)

                    if param_name in remarks_data:
                        remarks_data[param_name][group_idx] = remark
                        has_any_remarks = True
                        if param_name not in found_remarks_cols:
                            found_remarks_cols.append(param_name)

        if found_remarks_cols:
            logger.debug("Found remarks for parameters: %s", found_remarks_cols)

    if has_any_remarks:
        params.remarks = pd.DataFrame(remarks_data)
        logger.debug(
            "Created remarks DataFrame with %d parameter columns",
            len(found_remarks_cols),
        )
        # Count total non-empty remarks
        total_remarks = sum(
            1 for param in found_remarks_cols for r in remarks_data.get(param, []) if r
        )
        logger.debug("Total non-empty remarks: %d", total_remarks)
    else:
        logger.debug("No remarks found in EwE database file")

    # Read diet composition
    if diet_df is not None and len(diet_df) > 0:
        # Diet table structure varies:
        # Option 1: PreyID, PredID, Diet
        # Option 2: PreyName, PredName, Proportion
        # Option 3: Wide format with predators as columns
        # Option 4: GroupID, PreyID, Diet (EwE 6 format)
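        # e.g. a long-format row PreyID=2, PredID=5, Diet=0.25 (hypothetical
        # IDs) says group 2 makes up 25% of group 5's diet; in wide format
        # each predator is a column with prey proportions down the rows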

        prey_cols = [
            "PreyID",
            "PreyGroupID",
            "Prey",
            "PreyName",
            "prey_id",
            "GroupIDPrey",
        ]
        pred_cols = [
            "PredID",
            "PredGroupID",
            "Predator",
            "PredName",
            "pred_id",
            "GroupID",
            "GroupIDPred",
        ]
        value_cols = ["Diet", "Proportion", "DietComp", "Value", "DC", "DietValue"]

        prey_col = next((c for c in prey_cols if c in diet_df.columns), None)
        pred_col = next((c for c in pred_cols if c in diet_df.columns), None)
        value_col = next((c for c in value_cols if c in diet_df.columns), None)

        # Debug: show what columns were found
        logger.debug(
            "Diet columns: %s, Found prey=%s, pred=%s, value=%s",
            diet_df.columns.tolist(),
            prey_col,
            pred_col,
            value_col,
        )

        if prey_col and pred_col and value_col:
            # Long format - pivot to wide
            # Filter by scenario if needed
            if "ScenarioID" in diet_df.columns:
                diet_df = diet_df[diet_df["ScenarioID"] == scenario]

            # Create ID to name mapping
            id_col = next(
                (
                    c
                    for c in ["GroupID", "ID", "Sequence", "GroupSeq"]
                    if c in groups_df.columns
                ),
                None,
            )

            if id_col:
                id_to_name = dict(zip(groups_df[id_col], groups_df[name_col]))

                # Convert IDs to names if the columns contain numeric IDs
                if "ID" in prey_col:
                    diet_df = diet_df.copy()
                    diet_df["PreyName"] = diet_df[prey_col].map(id_to_name)
                    prey_col = "PreyName"

                if "ID" in pred_col:
                    diet_df = diet_df.copy()
                    diet_df["PredName"] = diet_df[pred_col].map(id_to_name)
                    pred_col = "PredName"

            # Build diet matrix
            # Note: params.diet has 'Group' as a column with prey names, not as index
            diet_groups = params.diet["Group"].tolist()

            for pred_name in group_names:
                pred_diet = diet_df[diet_df[pred_col] == pred_name]
                for _, row in pred_diet.iterrows():
                    prey_name = row[prey_col]
                    value = row[value_col]
                    if pd.notna(prey_name) and pd.notna(value) and float(value) > 0:
                        # Find the row index for this prey
                        if (
                            prey_name in diet_groups
                            and pred_name in params.diet.columns
                        ):
                            row_idx = diet_groups.index(prey_name)
                            params.diet.iloc[
                                row_idx, params.diet.columns.get_loc(pred_name)
                            ] = float(value)

        # Alternative: Try wide format where columns are predator names
        elif len(diet_df.columns) > 2:
            # Wide format: rows are prey, columns are predators
            # First column might be prey names
            diet_groups = params.diet["Group"].tolist()
            first_col = diet_df.columns[0]
            if first_col.lower() in ["group", "prey", "preyname", "groupname", "name"]:
                for col in diet_df.columns[1:]:
                    if col in params.diet.columns:
                        for idx, row in diet_df.iterrows():
                            prey_name = row[first_col]
                            value = row[col]
                            if pd.notna(prey_name) and pd.notna(value) and float(value) > 0:
                                if prey_name in diet_groups:
                                    row_idx = diet_groups.index(prey_name)
                                    params.diet.iloc[
                                        row_idx, params.diet.columns.get_loc(col)
                                    ] = float(value)

    # Read fleet/catch data
    if fleet_df is not None and catch_df is not None:
        # Add fleet columns to model
        fleet_name_col = next(
            (c for c in ["FleetName", "Name", "Fleet"] if c in fleet_df.columns), None
        )
        if fleet_name_col:
            fleet_names = fleet_df[fleet_name_col].tolist()

            # Add landing columns
            for fleet in fleet_names:
                if fleet not in params.model.columns:
                    params.model[fleet] = 0.0

            # Fill in catch data
            if catch_df is not None:
                group_col = next(
                    (
                        c
                        for c in ["GroupID", "GroupName", "Group"]
                        if c in catch_df.columns
                    ),
                    None,
                )
                fleet_col = next(
                    (
                        c
                        for c in ["FleetID", "FleetName", "Fleet"]
                        if c in catch_df.columns
                    ),
                    None,
                )
                land_col = next(
                    (
                        c
                        for c in ["Landing", "Landings", "Catch"]
                        if c in catch_df.columns
                    ),
                    None,
                )
                _disc_col = next(
                    (c for c in ["Discard", "Discards"] if c in catch_df.columns), None
                )

                if group_col and fleet_col and land_col:
                    for _, row in catch_df.iterrows():
                        group = row[group_col]
                        fleet = row[fleet_col]
                        landing = row.get(land_col, 0) or 0

                        # Map IDs to names if needed
                        if isinstance(group, (int, float)) and not pd.isna(group):
                            id_col = next(
                                (
                                    c
                                    for c in ["GroupID", "ID", "Sequence"]
                                    if c in groups_df.columns
                                ),
                                None,
                            )
                            if id_col:
                                id_to_name = dict(
                                    zip(groups_df[id_col], groups_df[name_col])
                                )
                                group = id_to_name.get(int(group), group)

                        if isinstance(fleet, (int, float)) and not pd.isna(fleet):
                            id_col = next(
                                (
                                    c
                                    for c in ["FleetID", "ID", "Sequence"]
                                    if c in fleet_df.columns
                                ),
                                None,
                            )
                            if id_col:
                                id_to_name = dict(
                                    zip(fleet_df[id_col], fleet_df[fleet_name_col])
                                )
                                fleet = id_to_name.get(int(fleet), fleet)

                        if (
                            group in params.model["Group"].values
                            and fleet in params.model.columns
                        ):
                            idx = params.model[params.model["Group"] == group].index[0]
                            params.model.loc[idx, fleet] = landing

    # Read multi-stanza data
    try:
        stanza_df = read_ewemdb_table(filepath, "Stanza")
        stanza_life_df = read_ewemdb_table(filepath, "StanzaLifeStage")

        if len(stanza_df) > 0 and len(stanza_life_df) > 0:
            logger.debug(
                "Found %d stanza groups, %d life stages",
                len(stanza_df),
                len(stanza_life_df),
            )

            # Get ID to name mapping
            id_col = next(
                (
                    c
                    for c in ["GroupID", "ID", "Sequence", "GroupSeq"]
                    if c in groups_df.columns
                ),
                None,
            )
            if id_col:
                id_to_name = dict(zip(groups_df[id_col].tolist(), group_names))
            else:
                id_to_name = {i + 1: name for i, name in enumerate(group_names)}

            # Build stgroups DataFrame (one row per stanza group)
            stgroups_data = []
            for _, row in stanza_df.iterrows():
                stanza_id = row.get("StanzaID", row.get("ID", 0))
                stanza_name = row.get(
                    "StanzaName", row.get("Name", f"Stanza{stanza_id}")
                )

                # Count life stages for this stanza
                life_stages = stanza_life_df[stanza_life_df["StanzaID"] == stanza_id]
                n_stanzas = len(life_stages)

                # Get VBGF K from life stages (usually same for all stages)
                vbk = None
                if "vbK" in life_stages.columns and len(life_stages) > 0:
                    vbk = life_stages["vbK"].iloc[0]

                stgroups_data.append(
                    {
                        "StGroupNum": stanza_id,
                        "StanzaGroup": stanza_name,
                        "nstanzas": n_stanzas,
                        "VBGF_Ksp": vbk,
                        "VBGF_d": row.get("VBGF_d", 2.0 / 3.0),  # assume EwE default d = 2/3
                        "Wmat": row.get("WmatWinf", np.nan),
                        "RecPower": row.get("RecPower", np.nan),
                    }
                )

            # Build stindiv DataFrame (one row per life stage)
            stindiv_data = []
            for _, row in stanza_life_df.iterrows():
                stanza_id = row.get("StanzaID", 0)
                group_id = row.get("GroupID", 0)
                group_name = id_to_name.get(group_id, f"Group{group_id}")

                # Find stanza name
                stanza_row = stanza_df[stanza_df["StanzaID"] == stanza_id]
                stanza_name = (
                    stanza_row["StanzaName"].iloc[0]
                    if len(stanza_row) > 0
                    else f"Stanza{stanza_id}"
                )

                stindiv_data.append(
                    {
                        "StGroupNum": stanza_id,
                        "StanzaGroup": stanza_name,
                        "StanzaNum": row.get("Sequence", 1),
                        "Group": group_name,
                        "First": row.get("AgeStart", 0),
                        "Last": np.nan,  # Will be calculated from next stage's First
                        "Z": row.get("Mortality", np.nan),
                        "Leading": (
                            row.get("Sequence", 1)
                            == stanza_row["LeadingLifeStage"].iloc[0]
                            if len(stanza_row) > 0
                            else False
                        ),
                    }
                )

            # Calculate Last values (First of next stage - 1, or max for last stage)
            stindiv_data_df = pd.DataFrame(stindiv_data)
            for stanza_id in stindiv_data_df["StGroupNum"].unique():
                mask = stindiv_data_df["StGroupNum"] == stanza_id
                stages = stindiv_data_df[mask].sort_values("StanzaNum")
                for i, (idx, stage) in enumerate(stages.iterrows()):
                    if i < len(stages) - 1:
                        next_first = stages.iloc[i + 1]["First"]
                        stindiv_data_df.loc[idx, "Last"] = next_first - 1
                    else:
                        stindiv_data_df.loc[idx, "Last"] = 999  # Max age for last stage
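            # e.g. stages with First = [0, 12, 36] months (hypothetical ages)
            # get Last = [11, 35, 999]; 999 caps the open-ended final stage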

            params.stanzas.n_stanza_groups = len(stanza_df)
            params.stanzas.stgroups = pd.DataFrame(stgroups_data)
            params.stanzas.stindiv = stindiv_data_df

            logger.debug(
                "Populated stanza params: %d groups",
                params.stanzas.n_stanza_groups,
            )
    except (
        EwEDatabaseError,
        FileNotFoundError,
        ValueError,
        KeyError,
        IndexError,
        TypeError,
    ) as e:
        logger.debug("Could not read stanza tables: %s", e)

    # OPTIONAL: Read Ecosim scenarios and associated time-series if requested
    if include_ecosim:
        ecosim_meta: Dict[str, Any] = {"has_ecosim": False, "scenarios": []}
        frate_df = None
        catch_yr_df = None
        # Try common table names
        ecosim_df = _try_read_table_variants(
            filepath,
            [
                "EcosimScenario",
                "EcosimScenarios",
                "EcosimScenarioTable",
                "Ecosim Scenario",
                "Ecosim_Scenario",
            ],
        )
        if ecosim_df is not None and len(ecosim_df) > 0:
            ecosim_meta["has_ecosim"] = True
            # Try to also load auxiliary tables once using a set of common variants
            forcing_df = _try_read_table_variants(
                filepath,
                [
                    "EcosimForcing",
                    "EcosimForcings",
                    "EcosimForcingTable",
                    "Ecosim Forcing",
                    "Ecosim_Forced",
                ],
            )
            fishing_df = _try_read_table_variants(
                filepath,
                [
                    "EcosimFishing",
                    "EcosimEffort",
                    "EcosimEfforts",
                    "EcosimFishingTable",
                    "EcosimEffortTable",
                ],
            )
            # Also try annual FRate / Catch tables
            frate_df = _try_read_table_variants(
                filepath,
                [
                    "EcosimFRate",
                    "EcosimFRateTable",
                    "Ecosim_FRate",
                    "EcosimAnnualFRate",
                ],
            )
            catch_yr_df = _try_read_table_variants(
                filepath,
                [
                    "EcosimCatch",
                    "EcosimAnnualCatch",
                    "EcosimCatchTable",
                    "Ecosim_Annual_Catch",
                ],
            )
            # Ecospace tables
            _try_read_table_variants(
                filepath,
                [
                    "EcospaceHabitat",
                    "EcospaceLayer",
                    "Ecospace_Habitat",
                    "Ecospace Habitat",
                ],
            )
            _try_read_table_variants(
                filepath,
                ["EcospaceGrid", "Ecospace_Grid", "EcospaceGridTable"],
            )
            _try_read_table_variants(
                filepath,
                [
                    "EcospaceDispersal",
                    "EcospaceDispersalTable",
                    "Ecospace_Dispersal",
                ],
            )

            for _, row in ecosim_df.iterrows():
                sid = row.get("ScenarioID", row.get("ID", None))
                name = row.get("ScenarioName", row.get("Name", f"Scenario{sid}"))
                start = row.get("StartYear", row.get("Start", None))
                end = row.get("EndYear", row.get("End", None))
                num_years = row.get("NumYears")
                if num_years is None and start is not None and end is not None:
                    try:
                        num_years = int(end) - int(start) + 1
                    except (ValueError, TypeError):
                        num_years = None

                scen: Dict[str, Any] = {
                    "id": sid,
                    "name": str(name) if name is not None else None,
                    "start_year": start,
                    "end_year": end,
                    "num_years": num_years,
                    # NaN is truthy, so `or`-chaining can leak NaN here; take
                    # the first non-null month explicitly
                    "start_month": next(
                        (int(v) for v in (row.get("StartMonth"),
                                          row.get("Start Month"),
                                          row.get("Start_Month")) if pd.notna(v)),
                        1,
                    ),
                    "description": row.get("Description", ""),
                }

                # Filter forcing/fishing dataframes by ScenarioID if present
                if forcing_df is not None:
                    if sid is not None and "ScenarioID" in forcing_df.columns:
                        fdf = forcing_df[forcing_df["ScenarioID"] == sid].copy()
                    else:
                        fdf = forcing_df.copy()
                    scen["forcing_df"] = fdf
                    # Parse into structured time series
                    try:
                        # Detect month-label columns like M1..M12 or Month1..Month12
                        def _is_month_col(col: object) -> bool:
                            s = str(col).lower()
                            if s.startswith("month"):
                                s = s[5:]
                            elif s.startswith("m"):
                                s = s[1:]
                            else:
                                return False
                            return s.isdigit() and 1 <= int(s) <= 12

                        month_label_relative = any(_is_month_col(c) for c in fdf.columns)
                        forcing_ts = _parse_ecosim_forcing(
                            fdf,
                            start_month=int(scen.get("start_month", 1)),
                            month_label_relative=month_label_relative,
                        )
                        scen["forcing_ts"] = forcing_ts
                        # If scenario contains start_year and num_years, resample to monthly
                        if (
                            scen.get("start_year") is not None
                            and scen.get("num_years") is not None
                        ):
                            try:
                                scen["forcing_monthly"] = _resample_to_monthly(
                                    forcing_ts,
                                    int(scen["start_year"]),
                                    int(scen["num_years"]),
                                    start_month=int(scen.get("start_month", 1)),
                                    use_actual_month_lengths=False,
                                )
                                # If forcing_monthly contains single-column parameter data and the model has
                                # a single group, rename that lone column to the group's name for convenience
                                if group_names is not None and len(group_names) == 1:
                                    gname = group_names[0]
                                    for k, v in list(scen["forcing_monthly"].items()):
                                        if str(k).startswith("_"):
                                            continue
                                        if (
                                            isinstance(v, pd.DataFrame)
                                            and v.shape[1] == 1
                                        ):
                                            v.columns = [gname]
                                            scen["forcing_monthly"][k] = v
                                # Build forcing matrices aligned to model groups (if available later)
                                try:
                                    scen["forcing_matrices"] = _build_forcing_matrices(
                                        {
                                            **scen["forcing_monthly"],
                                            "_times": forcing_ts["_times"],
                                            "_monthly_times": scen["forcing_monthly"][
                                                "_monthly_times"
                                            ],
                                        },
                                        group_names,
                                        int(scen["start_year"]),
                                        int(scen["num_years"]),
                                    )
                                    # Build Rsim dataclasses if possible
                                    try:
                                        from pypath.core.ecosim import (
                                            RsimFishing,
                                            RsimForcing,
                                        )

                                        rf = scen.get("forcing_matrices", None)
                                        ff = scen.get("fishing_monthly", None)
                                        if rf is not None:
                                            # Use matrices from rf
                                            ForcedPrey = rf.get("ForcedPrey")
                                            ForcedMort = rf.get("ForcedMort")
                                            ForcedRecs = rf.get("ForcedRecs")
                                            ForcedSearch = rf.get("ForcedSearch")
                                            ForcedActresp = rf.get("ForcedActresp")
                                            ForcedMigrate = rf.get("ForcedMigrate")
                                            ForcedBio = rf.get("ForcedBio")
                                        else:
                                            ForcedPrey = ForcedMort = None
                                            ForcedRecs = ForcedSearch = None
                                            ForcedActresp = ForcedMigrate = None
                                            ForcedBio = None

                                        ForcedEffort = None
                                        if ff is not None:
                                            # ff may include 'Effort' key as DataFrame
                                            Effort_df = ff.get("Effort")
                                            if isinstance(Effort_df, pd.DataFrame):
                                                # build numpy array months x (n_gears+1)
                                                months = Effort_df.shape[0]
                                                n_gears = len(Effort_df.columns)
                                                arr = np.ones(
                                                    (months, n_gears + 1), dtype=float
                                                )
                                                for i, col in enumerate(
                                                    Effort_df.columns, start=1
                                                ):
                                                    arr[:, i] = (
                                                        Effort_df[col]
                                                        .astype(float)
                                                        .values
                                                    )
                                                ForcedEffort = arr
                                            else:
                                                # scalar series
                                                try:
                                                    arr = np.asarray(ff.get("Effort"))
                                                    months = len(arr)
                                                    ForcedEffort = np.ones(
                                                        (months, 1), dtype=float
                                                    )
                                                    ForcedEffort[:, 0] = arr
                                                except (
                                                    ValueError,
                                                    TypeError,
                                                    IndexError,
                                                ):
                                                    ForcedEffort = None

                                        # create dataclasses
                                        try:
                                            rsim_forcing = RsimForcing(
                                                ForcedPrey=(
                                                    np.asarray(ForcedPrey)
                                                    if ForcedPrey is not None
                                                    else np.ones(
                                                        (
                                                            int(scen["num_years"]) * 12,
                                                            len(group_names) + 1,
                                                        )
                                                    )
                                                ),
                                                ForcedMort=(
                                                    np.asarray(ForcedMort)
                                                    if ForcedMort is not None
                                                    else np.ones(
                                                        (
                                                            int(scen["num_years"]) * 12,
                                                            len(group_names) + 1,
                                                        )
                                                    )
                                                ),
                                                ForcedRecs=(
                                                    np.asarray(ForcedRecs)
                                                    if ForcedRecs is not None
                                                    else np.ones(
                                                        (
                                                            int(scen["num_years"]) * 12,
                                                            len(group_names) + 1,
                                                        )
                                                    )
                                                ),
                                                ForcedSearch=(
                                                    np.asarray(ForcedSearch)
                                                    if ForcedSearch is not None
                                                    else np.ones(
                                                        (
                                                            int(scen["num_years"]) * 12,
                                                            len(group_names) + 1,
                                                        )
                                                    )
                                                ),
                                                ForcedActresp=(
                                                    np.asarray(ForcedActresp)
                                                    if ForcedActresp is not None
                                                    else np.ones(
                                                        (
                                                            int(scen["num_years"]) * 12,
                                                            len(group_names) + 1,
                                                        )
                                                    )
                                                ),
                                                ForcedMigrate=(
                                                    np.asarray(ForcedMigrate)
                                                    if ForcedMigrate is not None
                                                    else np.zeros(
                                                        (
                                                            int(scen["num_years"]) * 12,
                                                            len(group_names) + 1,
                                                        )
                                                    )
                                                ),
                                                ForcedBio=(
                                                    np.asarray(ForcedBio)
                                                    if ForcedBio is not None
                                                    else np.full(
                                                        (
                                                            int(scen["num_years"]) * 12,
                                                            len(group_names) + 1,
                                                        ),
                                                        -1.0,
                                                    )
                                                ),
                                                ForcedEffort=ForcedEffort,
                                            )
                                            scen["rsim_forcing"] = rsim_forcing
                                        except (
                                            ValueError,
                                            TypeError,
                                            KeyError,
                                            IndexError,
                                        ) as _e:
                                            logger.debug(
                                                f"Failed to construct RsimForcing: {_e}"
                                            )

                                        # Build RsimFishing (annual matrices if available)
                                        try:
                                            n_years = (
                                                int(scen["num_years"])
                                                if scen.get("num_years") is not None
                                                else 0
                                            )
                                            n_bio = len(group_names) + 1
                                            # Parse annual FRATE and CATCH if present
                                            # Use pre-read annual tables if available, else try common variants
                                            frate_tbl = frate_df
                                            catch_tbl = catch_yr_df
                                            if frate_tbl is None:
                                                frate_tbl = _try_read_table_variants(
                                                    filepath,
                                                    [
                                                        "EcosimFRate",
                                                        "EcosimFRateTable",
                                                        "Ecosim_FRate",
                                                        "EcosimAnnualFRate",
                                                    ],
                                                )
                                            if catch_tbl is None:
                                                catch_tbl = _try_read_table_variants(
                                                    filepath,
                                                    [
                                                        "EcosimCatch",
                                                        "EcosimAnnualCatch",
                                                        "EcosimCatchTable",
                                                        "Ecosim_Annual_Catch",
                                                    ],
                                                )

                                            annual = _parse_annual_fishing(
                                                frate_tbl,
                                                catch_tbl,
                                                group_names,
                                                scen.get("start_year"),
                                                scen.get("num_years"),
                                                scenario_id=sid,
                                            )

                                            frate = annual.get(
                                                "FRate", np.zeros((n_years, n_bio))
                                            )
                                            fcatch = annual.get(
                                                "Catch", np.zeros((n_years, n_bio))
                                            )

                                            rsim_fishing = RsimFishing(
                                                ForcedEffort=(
                                                    ForcedEffort
                                                    if ForcedEffort is not None
                                                    else np.ones(
                                                        (int(scen["num_years"]) * 12, 1)
                                                    )
                                                ),
                                                ForcedFRate=frate,
                                                ForcedCatch=fcatch,
                                            )
                                            scen["rsim_fishing"] = rsim_fishing
                                        except (
                                            ValueError,
                                            TypeError,
                                            KeyError,
                                            IndexError,
                                        ) as _e:
                                            logger.debug(
                                                f"Failed to construct RsimFishing: {_e}"
                                            )
                                    except (
                                        ImportError,
                                        ValueError,
                                        TypeError,
                                        KeyError,
                                    ) as _e:
                                        logger.debug(
                                            f"Failed to import Rsim dataclasses or construct them: {_e}"
                                        )
                                except (
                                    ValueError,
                                    TypeError,
                                    KeyError,
                                    IndexError,
                                ) as _e:
                                    logger.debug(
                                        f"Failed to build forcing matrices for scenario {sid}: {_e}"
                                    )
                            except (ValueError, TypeError, KeyError, IndexError) as _e:
                                logger.debug(
                                    f"Failed to resample forcing monthly for scenario {sid}: {_e}"
                                )
                    except (ValueError, TypeError, KeyError, IndexError) as _e:
                        logger.debug(
                            f"Failed to parse forcing for scenario {sid}: {_e}"
                        )
                if fishing_df is not None:
                    if sid is not None and "ScenarioID" in fishing_df.columns:
                        ff = fishing_df[fishing_df["ScenarioID"] == sid].copy()
                    else:
                        ff = fishing_df.copy()
                    scen["fishing_df"] = ff
                    try:
                        month_label_relative_f = any(
                            str(c).lower().startswith("m")
                            and str(c)[1:].isdigit()
                            and 1 <= int(str(c)[1:]) <= 12
                            for c in ff.columns
                        )
                        fishing_ts = _parse_ecosim_fishing(
                            ff,
                            start_month=int(scen.get("start_month", 1)),
                            month_label_relative=month_label_relative_f,
                        )
                        scen["fishing_ts"] = fishing_ts
                        if (
                            scen.get("start_year") is not None
                            and scen.get("num_years") is not None
                        ):
                            try:
                                scen["fishing_monthly"] = (
                                    _resample_fishing_pivot_to_monthly(
                                        fishing_ts,
                                        int(scen["start_year"]),
                                        int(scen["num_years"]),
                                        start_month=int(scen.get("start_month", 1)),
                                        use_actual_month_lengths=False,
                                    )
                                )
                            except (ValueError, TypeError, KeyError, IndexError) as _e:
                                logger.debug(
                                    f"Failed to resample fishing monthly for scenario {sid}: {_e}"
                                )
                    except (ValueError, TypeError, KeyError, IndexError) as _e:
                        logger.debug(
                            f"Failed to parse fishing for scenario {sid}: {_e}"
                        )

                # Try to attach ecospace tables if present
                try:
                    ecospace_tables = _map_ecospace_tables(filepath)
                    if ecospace_tables:
                        scen["ecospace"] = ecospace_tables
                except (EwEDatabaseError, FileNotFoundError, ValueError, KeyError) as e:
                    logger.debug("Could not read ecospace tables: %s", e)

                ecosim_meta["scenarios"].append(scen)
        params.ecosim = ecosim_meta

    return params
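
The monthly effort assembly above (a months × (n_gears + 1) array whose first column is left as ones, with gear columns following in DataFrame order) can be sketched as a standalone helper. `effort_df_to_matrix` and the sample gear names are illustrative, not part of the PyPath API:

```python
import numpy as np
import pandas as pd

def effort_df_to_matrix(effort_df: pd.DataFrame) -> np.ndarray:
    """Build a months x (n_gears + 1) effort array.

    Column 0 is left as a baseline of ones, mirroring the ForcedEffort
    construction in the reader; gear columns follow in DataFrame order.
    """
    months = effort_df.shape[0]
    n_gears = len(effort_df.columns)
    arr = np.ones((months, n_gears + 1), dtype=float)
    for i, col in enumerate(effort_df.columns, start=1):
        arr[:, i] = effort_df[col].astype(float).values
    return arr

# Two gears over three months (values illustrative)
df = pd.DataFrame({"Trawl": [1.0, 1.2, 0.8], "Gillnet": [0.5, 0.5, 0.6]})
mat = effort_df_to_matrix(df)
print(mat.shape)  # (3, 3)
```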

read_ewemdb_table

read_ewemdb_table(filepath: str, table: str, columns: Optional[List[str]] = None) -> pd.DataFrame

Read a specific table from an EwE database.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| filepath | str | Path to the ewemdb file | required |
| table | str | Name of the table to read | required |
| columns | list | Specific columns to read. If None, reads all columns. | None |

Returns:

| Type | Description |
|------|-------------|
| DataFrame | Table data as DataFrame |

Example

>>> groups = read_ewemdb_table("model.ewemdb", "EcopathGroup")
>>> print(groups.columns)

Source code in pypath/io/ewemdb.py
def read_ewemdb_table(
    filepath: str, table: str, columns: Optional[List[str]] = None
) -> pd.DataFrame:
    """Read a specific table from an EwE database.

    Parameters
    ----------
    filepath : str
        Path to the ewemdb file
    table : str
        Name of the table to read
    columns : list, optional
        Specific columns to read. If None, reads all columns.

    Returns
    -------
    pd.DataFrame
        Table data as DataFrame

    Example
    -------
    >>> groups = read_ewemdb_table("model.ewemdb", "EcopathGroup")
    >>> print(groups.columns)
    """
    filepath = str(Path(filepath).resolve())

    if not Path(filepath).exists():
        raise FileNotFoundError(f"File not found: {filepath}")

    # Validate identifiers before building SQL
    _validate_sql_identifier(table, "table")
    if columns:
        for col in columns:
            _validate_sql_identifier(col, "column")

    # Try mdb-tools first
    if HAS_MDB_TOOLS:
        df = _read_mdb_with_tools(filepath, table)
        if columns:
            df = df[[c for c in columns if c in df.columns]]
        return df

    # Try pyodbc
    if HAS_PYODBC or HAS_PYPYODBC:
        conn_str = _get_connection_string(filepath)
        try:
            conn = pyodbc.connect(conn_str)
            try:
                if columns:
                    col_str = ", ".join([f"[{c}]" for c in columns])
                    query = f"SELECT {col_str} FROM [{table}]"
                else:
                    query = f"SELECT * FROM [{table}]"

                df = pd.read_sql(query, conn)
                return df
            finally:
                conn.close()
        except EwEDatabaseError:
            raise
        except Exception as e:
            raise EwEDatabaseError(f"Failed to read table {table}: {e}")

    raise EwEDatabaseError("No database driver available. Install pyodbc or mdb-tools.")
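
The bracketed-identifier query construction above can be exercised on its own. `build_select` is a hypothetical helper, and the character whitelist here is an assumption — the real `_validate_sql_identifier` in pypath.io.ewemdb may accept a different set of characters:

```python
import re

def _validate_sql_identifier(name: str, kind: str) -> None:
    # Assumed whitelist: letters, digits, underscores, spaces.
    # The real validator in pypath.io.ewemdb may differ.
    if not re.fullmatch(r"[A-Za-z0-9_ ]+", name):
        raise ValueError(f"Invalid {kind} identifier: {name!r}")

def build_select(table: str, columns=None) -> str:
    """Build the bracketed Access SQL query used by read_ewemdb_table."""
    _validate_sql_identifier(table, "table")
    if columns:
        for c in columns:
            _validate_sql_identifier(c, "column")
        col_str = ", ".join(f"[{c}]" for c in columns)
        return f"SELECT {col_str} FROM [{table}]"
    return f"SELECT * FROM [{table}]"

print(build_select("EcopathGroup", ["GroupName", "Type"]))
# SELECT [GroupName], [Type] FROM [EcopathGroup]
```

Validating identifiers before interpolation matters because Access ODBC drivers do not support parameterized table names.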

Biological Data (WoRMS/OBIS/FishBase)

pypath.io.biodata

Biodiversity data integration for PyPath.

This module provides functions to retrieve species information from global biodiversity databases and convert it to Ecopath parameters.

Data sources:

- WoRMS (World Register of Marine Species): Taxonomy and nomenclature
- OBIS (Ocean Biodiversity Information System): Occurrence data
- FishBase: Trait data (diet, trophic level, growth parameters)

Requirements:

- pyworms (pip install pyworms)
- pyobis (pip install pyobis)
- requests (for FishBase API)

Main workflow: Common name → WoRMS → AphiaID → Scientific name → OBIS + FishBase → RpathParams

Functions:

- get_species_info(): Get comprehensive species data
- batch_get_species_info(): Process multiple species in parallel
- biodata_to_rpath(): Convert biodiversity data to RpathParams

Example:

>>> from pypath.io.biodata import get_species_info, biodata_to_rpath
>>> # Get data for a single species
>>> info = get_species_info("Atlantic cod")
>>> print(f"Scientific name: {info.scientific_name}")
Scientific name: Gadus morhua
>>> print(f"Trophic level: {info.trophic_level}")
Trophic level: 4.4
>>>
>>> # Batch process multiple species
>>> species = ["Atlantic cod", "Herring", "Sprat"]
>>> df = batch_get_species_info(species)
>>>
>>> # Convert to Rpath parameters
>>> biomass = {'Atlantic cod': 2.0, 'Herring': 5.0, 'Sprat': 8.0}
>>> params = biodata_to_rpath(df, biomass_estimates=biomass)
>>> from pypath.core.ecopath import rpath
>>> balanced = rpath(params)

APIConnectionError

Bases: BiodataError

Raised when API connection fails.

Source code in pypath/io/biodata.py
class APIConnectionError(BiodataError):
    """Raised when API connection fails."""

    pass

AmbiguousSpeciesError

Bases: BiodataError

Raised when multiple species match the query.

Source code in pypath/io/biodata.py
class AmbiguousSpeciesError(BiodataError):
    """Raised when multiple species match the query."""

    def __init__(self, matches: List[Dict], message: str):
        super().__init__(message)
        self.matches = matches
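
The `matches` payload lets callers present the candidates instead of failing silently. A minimal sketch, with the exception classes re-declared inline so the snippet is self-contained and a hypothetical `resolve` standing in for a WoRMS lookup (the species names are illustrative):

```python
class BiodataError(Exception):
    """Base exception (re-declared here so the snippet is self-contained)."""

class AmbiguousSpeciesError(BiodataError):
    """Raised when multiple species match the query."""
    def __init__(self, matches, message):
        super().__init__(message)
        self.matches = matches

def resolve(name):
    # Hypothetical lookup that finds more than one WoRMS match
    matches = [
        {"scientificname": "Gadus morhua"},
        {"scientificname": "Gadus macrocephalus"},
    ]
    raise AmbiguousSpeciesError(matches, f"{len(matches)} matches for {name!r}")

try:
    resolve("cod")
except AmbiguousSpeciesError as exc:
    # Offer the candidate names back to the caller
    candidates = [m["scientificname"] for m in exc.matches]
    print(candidates)
```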

BiodataError

Bases: Exception

Base exception for biodiversity data errors.

Source code in pypath/io/biodata.py
class BiodataError(Exception):
    """Base exception for biodiversity data errors."""

    pass

BiodiversityCache

In-memory LRU cache with TTL for API responses.

Implements caching with time-to-live for each entry to reduce API load. Stores results keyed by (source, identifier) tuples.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| maxsize | int | Maximum number of cached entries | 1000 |
| ttl_seconds | int | Time-to-live for cached entries in seconds | 3600 |

Examples:

>>> cache = BiodiversityCache(maxsize=1000, ttl_seconds=3600)
>>> cache.set('worms', 'Atlantic cod', {'AphiaID': 126436, ...})
>>> result = cache.get('worms', 'Atlantic cod')
>>> stats = cache.stats()
>>> print(f"Hit rate: {stats['hit_rate']:.2%}")
Source code in pypath/io/biodata.py
class BiodiversityCache:
    """In-memory LRU cache with TTL for API responses.

    Implements caching with time-to-live for each entry to reduce API load.
    Stores results keyed by (source, identifier) tuples.

    Parameters
    ----------
    maxsize : int
        Maximum number of cached entries
    ttl_seconds : int
        Time-to-live for cached entries in seconds

    Examples
    --------
    >>> cache = BiodiversityCache(maxsize=1000, ttl_seconds=3600)
    >>> cache.set('worms', 'Atlantic cod', {'AphiaID': 126436, ...})
    >>> result = cache.get('worms', 'Atlantic cod')
    >>> stats = cache.stats()
    >>> print(f"Hit rate: {stats['hit_rate']:.2%}")
    """

    def __init__(self, maxsize: int = 1000, ttl_seconds: int = 3600):
        """Initialize cache with size limit and TTL."""
        self._cache: Dict[Tuple[str, str], Tuple[Any, float]] = {}
        self._maxsize = maxsize
        self._ttl = ttl_seconds
        self._hits = 0
        self._misses = 0

    def get(self, source: str, identifier: str) -> Optional[Any]:
        """Get cached value if exists and not expired.

        Parameters
        ----------
        source : str
            Data source ('worms', 'obis', 'fishbase')
        identifier : str
            Unique identifier for the cached item

        Returns
        -------
        Any or None
            Cached value if found and valid, None otherwise
        """
        key = (source, identifier)
        if key in self._cache:
            value, timestamp = self._cache[key]
            if time.time() - timestamp < self._ttl:
                self._hits += 1
                return value
            else:
                # Expired - remove from cache
                del self._cache[key]
        self._misses += 1
        return None

    def set(self, source: str, identifier: str, value: Any):
        """Cache a value with current timestamp.

        Parameters
        ----------
        source : str
            Data source ('worms', 'obis', 'fishbase')
        identifier : str
            Unique identifier for the cached item
        value : Any
            Value to cache
        """
        if len(self._cache) >= self._maxsize:
            # Remove oldest entry (simple LRU)
            if self._cache:
                oldest_key = min(self._cache.items(), key=lambda x: x[1][1])[0]
                del self._cache[oldest_key]
        self._cache[(source, identifier)] = (value, time.time())

    def clear(self):
        """Clear all cached entries and reset statistics."""
        self._cache.clear()
        self._hits = 0
        self._misses = 0

    def stats(self) -> Dict[str, Union[int, float]]:
        """Get cache statistics.

        Returns
        -------
        dict
            Dictionary with 'size', 'hits', 'misses', 'hit_rate'
        """
        total = self._hits + self._misses
        return {
            "size": len(self._cache),
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self._hits / total if total > 0 else 0.0,
        }
__init__
__init__(maxsize: int = 1000, ttl_seconds: int = 3600)

Initialize cache with size limit and TTL.

Source code in pypath/io/biodata.py
def __init__(self, maxsize: int = 1000, ttl_seconds: int = 3600):
    """Initialize cache with size limit and TTL."""
    self._cache: Dict[Tuple[str, str], Tuple[Any, float]] = {}
    self._maxsize = maxsize
    self._ttl = ttl_seconds
    self._hits = 0
    self._misses = 0
clear
clear()

Clear all cached entries and reset statistics.

Source code in pypath/io/biodata.py
def clear(self):
    """Clear all cached entries and reset statistics."""
    self._cache.clear()
    self._hits = 0
    self._misses = 0
get
get(source: str, identifier: str) -> Optional[Any]

Get cached value if exists and not expired.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| source | str | Data source ('worms', 'obis', 'fishbase') | required |
| identifier | str | Unique identifier for the cached item | required |

Returns:

| Type | Description |
|------|-------------|
| Any or None | Cached value if found and valid, None otherwise |

Source code in pypath/io/biodata.py
def get(self, source: str, identifier: str) -> Optional[Any]:
    """Get cached value if exists and not expired.

    Parameters
    ----------
    source : str
        Data source ('worms', 'obis', 'fishbase')
    identifier : str
        Unique identifier for the cached item

    Returns
    -------
    Any or None
        Cached value if found and valid, None otherwise
    """
    key = (source, identifier)
    if key in self._cache:
        value, timestamp = self._cache[key]
        if time.time() - timestamp < self._ttl:
            self._hits += 1
            return value
        else:
            # Expired - remove from cache
            del self._cache[key]
    self._misses += 1
    return None
set
set(source: str, identifier: str, value: Any)

Cache a value with current timestamp.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| source | str | Data source ('worms', 'obis', 'fishbase') | required |
| identifier | str | Unique identifier for the cached item | required |
| value | Any | Value to cache | required |
Source code in pypath/io/biodata.py
def set(self, source: str, identifier: str, value: Any):
    """Cache a value with current timestamp.

    Parameters
    ----------
    source : str
        Data source ('worms', 'obis', 'fishbase')
    identifier : str
        Unique identifier for the cached item
    value : Any
        Value to cache
    """
    if len(self._cache) >= self._maxsize:
        # Remove oldest entry (simple LRU)
        if self._cache:
            oldest_key = min(self._cache.items(), key=lambda x: x[1][1])[0]
            del self._cache[oldest_key]
    self._cache[(source, identifier)] = (value, time.time())
stats
stats() -> Dict[str, Union[int, float]]

Get cache statistics.

Returns:

| Type | Description |
|------|-------------|
| dict | Dictionary with 'size', 'hits', 'misses', 'hit_rate' |

Source code in pypath/io/biodata.py
def stats(self) -> Dict[str, Union[int, float]]:
    """Get cache statistics.

    Returns
    -------
    dict
        Dictionary with 'size', 'hits', 'misses', 'hit_rate'
    """
    total = self._hits + self._misses
    return {
        "size": len(self._cache),
        "hits": self._hits,
        "misses": self._misses,
        "hit_rate": self._hits / total if total > 0 else 0.0,
    }
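
The eviction and hit/miss accounting can be exercised without the rest of the module. `TTLCache` is a trimmed stand-in that re-implements only the pieces shown above, and the AphiaID values are placeholders:

```python
import time

class TTLCache:
    """Trimmed stand-in for BiodiversityCache (same get/set contract)."""

    def __init__(self, maxsize=2, ttl_seconds=3600):
        self._cache = {}
        self._maxsize = maxsize
        self._ttl = ttl_seconds
        self._hits = 0
        self._misses = 0

    def get(self, source, identifier):
        key = (source, identifier)
        entry = self._cache.get(key)
        if entry is not None:
            value, ts = entry
            if time.time() - ts < self._ttl:
                self._hits += 1
                return value
            del self._cache[key]  # expired
        self._misses += 1
        return None

    def set(self, source, identifier, value):
        if len(self._cache) >= self._maxsize:
            # Evict the entry with the oldest timestamp
            oldest = min(self._cache.items(), key=lambda kv: kv[1][1])[0]
            del self._cache[oldest]
        self._cache[(source, identifier)] = (value, time.time())

cache = TTLCache(maxsize=2)
cache.set("worms", "Atlantic cod", {"AphiaID": 1})   # placeholder IDs
cache.set("worms", "Herring", {"AphiaID": 2})
cache.set("worms", "Sprat", {"AphiaID": 3})          # evicts the oldest entry
print(cache.get("worms", "Atlantic cod"))  # None (evicted)
print(cache.get("worms", "Sprat"))         # {'AphiaID': 3}
```

Note that eviction is by insertion timestamp, so a `get` does not refresh an entry's position — the policy is oldest-first rather than strict LRU.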

FishBaseTraits dataclass

FishBase ecological trait data.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| species_code | int | FishBase species code |
| trophic_level | float, optional | Trophic level from ecology table |
| diet_items | list of dict | List of prey items with {'prey': str, 'percentage': float} |
| growth_params | dict, optional | Von Bertalanffy growth parameters {'Loo': float, 'K': float, 'to': float} |
| max_length | float, optional | Maximum observed length in cm |
| habitat | str, optional | Preferred habitat type |

Source code in pypath/io/biodata.py
@dataclass
class FishBaseTraits:
    """FishBase ecological trait data.

    Attributes
    ----------
    species_code : int
        FishBase species code
    trophic_level : float, optional
        Trophic level from ecology table
    diet_items : list of dict
        List of prey items with {'prey': str, 'percentage': float}
    growth_params : dict, optional
        Von Bertalanffy growth parameters {'Loo': float, 'K': float, 'to': float}
    max_length : float, optional
        Maximum observed length in cm
    habitat : str, optional
        Preferred habitat type
    """

    species_code: int
    trophic_level: Optional[float] = None
    diet_items: List[Dict[str, Any]] = field(default_factory=list)
    growth_params: Optional[Dict[str, float]] = None
    max_length: Optional[float] = None
    habitat: Optional[str] = None
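
Because `diet_items` uses `field(default_factory=list)`, every instance gets its own independent list rather than sharing one mutable default. A minimal sketch with a trimmed stand-in dataclass (the field values are illustrative, not real FishBase records):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class Traits:
    """Trimmed stand-in for FishBaseTraits."""
    species_code: int
    trophic_level: Optional[float] = None
    diet_items: List[Dict[str, Any]] = field(default_factory=list)

cod = Traits(species_code=69, trophic_level=4.4)     # illustrative values
cod.diet_items.append({"prey": "Clupeidae", "percentage": 30.0})

other = Traits(species_code=24)
print(len(other.diet_items))  # 0 -- each instance gets a fresh list
```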

SpeciesInfo dataclass

Complete species information from all data sources.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| common_name | str | Original common/vernacular name queried |
| scientific_name | str | Accepted scientific name from WoRMS |
| aphia_id | int | WoRMS AphiaID |
| authority | str | Taxonomic authority |
| trophic_level | float, optional | Trophic level from FishBase |
| diet_items | list of dict, optional | Diet composition from FishBase |
| growth_params | dict, optional | VBGF parameters from FishBase |
| max_length | float, optional | Maximum length from FishBase |
| occurrence_count | int, optional | Number of OBIS occurrence records |
| depth_range | tuple, optional | (min_depth, max_depth) from OBIS in meters |
| geographic_extent | dict, optional | Bounding box from OBIS |
| habitat | str, optional | Habitat preference from FishBase |

Source code in pypath/io/biodata.py
@dataclass
class SpeciesInfo:
    """Complete species information from all data sources.

    Attributes
    ----------
    common_name : str
        Original common/vernacular name queried
    scientific_name : str
        Accepted scientific name from WoRMS
    aphia_id : int
        WoRMS AphiaID
    authority : str
        Taxonomic authority
    trophic_level : float, optional
        Trophic level from FishBase
    diet_items : list of dict, optional
        Diet composition from FishBase
    growth_params : dict, optional
        VBGF parameters from FishBase
    max_length : float, optional
        Maximum length from FishBase
    occurrence_count : int, optional
        Number of OBIS occurrence records
    depth_range : tuple, optional
        (min_depth, max_depth) from OBIS in meters
    geographic_extent : dict, optional
        Bounding box from OBIS
    habitat : str, optional
        Habitat preference from FishBase
    """

    common_name: str
    scientific_name: str
    aphia_id: int
    authority: str
    trophic_level: Optional[float] = None
    diet_items: Optional[List[Dict[str, Any]]] = None
    growth_params: Optional[Dict[str, float]] = None
    max_length: Optional[float] = None
    occurrence_count: Optional[int] = None
    depth_range: Optional[Tuple[float, float]] = None
    geographic_extent: Optional[Dict[str, Any]] = None
    habitat: Optional[str] = None
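Only the first four fields are required, so a partially populated record is valid. A minimal sketch of constructing one by hand (the dataclass is re-declared with a subset of fields so the snippet runs standalone; the values are illustrative, not real query results):

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

# Re-declaration of SpeciesInfo (subset of the fields above) so this
# snippet runs without pypath installed.
@dataclass
class SpeciesInfo:
    common_name: str
    scientific_name: str
    aphia_id: int
    authority: str
    trophic_level: Optional[float] = None
    growth_params: Optional[Dict[str, float]] = None
    depth_range: Optional[Tuple[float, float]] = None

# Illustrative values -- not actual WoRMS/FishBase query output.
cod = SpeciesInfo(
    common_name="Atlantic cod",
    scientific_name="Gadus morhua",
    aphia_id=126436,
    authority="Linnaeus, 1758",
    trophic_level=4.4,
    growth_params={"K": 0.2, "Loo": 120.0},
)
print(cod.scientific_name, cod.trophic_level)
```

Optional fields left unset (here `depth_range`) default to `None`, which is how downstream consumers detect missing data.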

SpeciesNotFoundError

Bases: BiodataError

Raised when species cannot be found in any database.

Source code in pypath/io/biodata.py
class SpeciesNotFoundError(BiodataError):
    """Raised when species cannot be found in any database."""

    pass

batch_get_species_info

batch_get_species_info(common_names: List[str], include_occurrences: bool = True, include_traits: bool = True, strict: bool = False, cache: bool = True, max_workers: int = 5, timeout: int = 30) -> pd.DataFrame

Get species information for multiple species in parallel.

Uses ThreadPoolExecutor to fetch data for multiple species concurrently.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| common_names | list of str | List of common/vernacular names | required |
| include_occurrences | bool | Whether to fetch OBIS occurrence data | True |
| include_traits | bool | Whether to fetch FishBase trait data | True |
| strict | bool | If True, raise on any failure. If False, continue with partial data. | False |
| cache | bool | Whether to use cached results | True |
| max_workers | int | Maximum number of concurrent API requests | 5 |
| timeout | int | API request timeout per species | 30 |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | DataFrame with one row per species, columns for all retrieved data |

Example:

>>> from pypath.io.biodata import batch_get_species_info
>>> species = ["Atlantic cod", "Herring", "Sprat"]
>>> df = batch_get_species_info(species)
>>> print(df[['common_name', 'scientific_name', 'trophic_level']])

Source code in pypath/io/biodata.py
def batch_get_species_info(
    common_names: List[str],
    include_occurrences: bool = True,
    include_traits: bool = True,
    strict: bool = False,
    cache: bool = True,
    max_workers: int = 5,
    timeout: int = 30,
) -> pd.DataFrame:
    """Get species information for multiple species in parallel.

    Uses ThreadPoolExecutor to fetch data for multiple species concurrently.

    Parameters
    ----------
    common_names : list of str
        List of common/vernacular names
    include_occurrences : bool
        Whether to fetch OBIS occurrence data
    include_traits : bool
        Whether to fetch FishBase trait data
    strict : bool
        If True, raise on any failure. If False, continue with partial data.
    cache : bool
        Whether to use cached results
    max_workers : int
        Maximum number of concurrent API requests
    timeout : int
        API request timeout per species

    Returns
    -------
    pd.DataFrame
        DataFrame with one row per species, columns for all retrieved data

    Example
    -------
    >>> from pypath.io.biodata import batch_get_species_info
    >>> species = ["Atlantic cod", "Herring", "Sprat"]
    >>> df = batch_get_species_info(species)
    >>> print(df[['common_name', 'scientific_name', 'trophic_level']])
    """
    results = []
    errors = []

    def fetch_single(name):
        try:
            return get_species_info(
                name,
                include_occurrences=include_occurrences,
                include_traits=include_traits,
                strict=strict,
                cache=cache,
                timeout=timeout,
            )
        except Exception as e:
            errors.append((name, str(e)))
            return None

    # Fetch in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_name = {
            executor.submit(fetch_single, name): name for name in common_names
        }

        for future in as_completed(future_to_name):
            result = future.result()
            if result is not None:
                results.append(result)

    # Report errors
    if errors and not results:
        error_msg = "\n".join([f"{name}: {err}" for name, err in errors])
        raise SpeciesNotFoundError(f"Failed to fetch any species:\n{error_msg}")
    elif errors:
        warnings.warn(
            f"Failed to fetch {len(errors)} species: "
            + ", ".join([name for name, _ in errors])
        )

    # Convert to DataFrame
    if not results:
        return pd.DataFrame()

    data = []
    for info in results:
        row = {
            "common_name": info.common_name,
            "scientific_name": info.scientific_name,
            "aphia_id": info.aphia_id,
            "authority": info.authority,
            "trophic_level": info.trophic_level,
            "max_length": info.max_length,
            "occurrence_count": info.occurrence_count,
            "habitat": info.habitat,
        }

        # Add growth params as separate columns
        if info.growth_params:
            row["k"] = info.growth_params.get("K")
            row["loo"] = info.growth_params.get("Loo")
            row["to"] = info.growth_params.get("to")
        else:
            row["k"] = None
            row["loo"] = None
            row["to"] = None

        # Add depth range as separate columns
        if info.depth_range:
            row["min_depth"] = info.depth_range[0]
            row["max_depth"] = info.depth_range[1]
        else:
            row["min_depth"] = None
            row["max_depth"] = None

        # Store diet items as string for now (can be parsed later)
        if info.diet_items:
            row["diet_items"] = str(info.diet_items)
        else:
            row["diet_items"] = None

        data.append(row)

    df = pd.DataFrame(data)
    return df
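The DataFrame-assembly step above flattens nested fields (growth parameters, depth range) into scalar columns. That logic can be exercised offline with a stub result object — `FakeInfo` below is a stand-in for `SpeciesInfo`, not part of the library:

```python
import pandas as pd

# Stand-in for a fetched SpeciesInfo; only the fields used by the flattener.
class FakeInfo:
    def __init__(self, name, growth_params=None, depth_range=None):
        self.scientific_name = name
        self.growth_params = growth_params
        self.depth_range = depth_range

def flatten(infos):
    """Mirror the row-building loop in batch_get_species_info."""
    data = []
    for info in infos:
        row = {"scientific_name": info.scientific_name}
        gp = info.growth_params or {}
        row["k"] = gp.get("K")          # VBGF growth coefficient
        row["loo"] = gp.get("Loo")      # asymptotic length
        dr = info.depth_range
        row["min_depth"] = dr[0] if dr else None
        row["max_depth"] = dr[1] if dr else None
        data.append(row)
    return pd.DataFrame(data)

df = flatten([
    FakeInfo("Gadus morhua", {"K": 0.2, "Loo": 120.0}, (10.0, 600.0)),
    FakeInfo("Sprattus sprattus"),  # missing traits -> NaN columns
])
print(df[["scientific_name", "k", "max_depth"]])
```

Species with missing traits still get every column, just with missing values, so the batch result always has a uniform schema.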

biodata_to_rpath

biodata_to_rpath(species_data: Union[SpeciesInfo, DataFrame], group_names: Optional[List[str]] = None, biomass_estimates: Optional[Dict[str, float]] = None, area_km2: float = 1000.0) -> RpathParams

Convert biodiversity data to RpathParams format.

Creates an Rpath parameter structure using trait data from biodiversity databases. Follows the ecobase_to_rpath() pattern.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| species_data | SpeciesInfo or DataFrame | Species information from get_species_info() or batch_get_species_info() | required |
| group_names | list of str, optional | Custom group names. If None, uses scientific names. | None |
| biomass_estimates | dict, optional | Manual biomass estimates {group_name: biomass}. If not provided, uses occurrence density as proxy. | None |
| area_km2 | float | Ecosystem area in km² for biomass normalization | 1000.0 |

Returns:

| Type | Description |
| --- | --- |
| RpathParams | Parameter structure ready for balancing |

Example:

>>> from pypath.io.biodata import batch_get_species_info, biodata_to_rpath
>>> df = batch_get_species_info(["Cod", "Herring", "Sprat"])
>>> params = biodata_to_rpath(
...     df,
...     biomass_estimates={'Cod': 2.0, 'Herring': 5.0, 'Sprat': 8.0}
... )
>>> from pypath.core.ecopath import rpath
>>> balanced = rpath(params)

Notes:

Mapping from FishBase/OBIS to Rpath parameters:

- PB: Estimated from growth parameter K (VBGF)
- QB: Estimated from trophic level and P/B (Palomares & Pauly)
- Biomass: From manual estimates or OBIS density
- Diet: From FishBase diet composition (simplified)
- TL: From FishBase ecology data

Source code in pypath/io/biodata.py
def biodata_to_rpath(
    species_data: Union[SpeciesInfo, pd.DataFrame],
    group_names: Optional[List[str]] = None,
    biomass_estimates: Optional[Dict[str, float]] = None,
    area_km2: float = 1000.0,
) -> RpathParams:
    """Convert biodiversity data to RpathParams format.

    Creates an Rpath parameter structure using trait data from
    biodiversity databases. Follows the ecobase_to_rpath() pattern.

    Parameters
    ----------
    species_data : SpeciesInfo or pd.DataFrame
        Species information from get_species_info() or batch_get_species_info()
    group_names : list of str, optional
        Custom group names. If None, uses scientific names.
    biomass_estimates : dict, optional
        Manual biomass estimates {group_name: biomass}.
        If not provided, uses occurrence density as proxy.
    area_km2 : float
        Ecosystem area in km² for biomass normalization

    Returns
    -------
    RpathParams
        Parameter structure ready for balancing

    Example
    -------
    >>> from pypath.io.biodata import batch_get_species_info, biodata_to_rpath
    >>> df = batch_get_species_info(["Cod", "Herring", "Sprat"])
    >>> params = biodata_to_rpath(
    ...     df,
    ...     biomass_estimates={'Cod': 2.0, 'Herring': 5.0, 'Sprat': 8.0}
    ... )
    >>> from pypath.core.ecopath import rpath
    >>> balanced = rpath(params)

    Notes
    -----
    Mapping from FishBase/OBIS to Rpath parameters:
    - PB: Estimated from growth parameter K (VBGF)
    - QB: Estimated from trophic level and P/B (Palomares & Pauly)
    - Biomass: From manual estimates or OBIS density
    - Diet: From FishBase diet composition (simplified)
    - TL: From FishBase ecology data
    """
    # Convert single SpeciesInfo to DataFrame
    if isinstance(species_data, SpeciesInfo):
        species_data = pd.DataFrame(
            [
                {
                    "common_name": species_data.common_name,
                    "scientific_name": species_data.scientific_name,
                    "trophic_level": species_data.trophic_level,
                    "k": (
                        species_data.growth_params.get("K")
                        if species_data.growth_params
                        else None
                    ),
                }
            ]
        )

    if species_data.empty:
        raise ValueError("No species data provided")

    # Use scientific names as default group names
    if group_names is None:
        group_names = species_data["scientific_name"].tolist()

    # All are consumers by default (type=0)
    group_types = [0] * len(group_names)

    # Create basic RpathParams structure
    params = create_rpath_params(groups=group_names, types=group_types)

    # Fill in parameters
    for i, (_, row) in enumerate(species_data.iterrows()):
        group_name = group_names[i] if i < len(group_names) else row["scientific_name"]

        # Biomass
        if biomass_estimates and group_name in biomass_estimates:
            params.model.loc[i, "Biomass"] = biomass_estimates[group_name]
        else:
            # Use occurrence count as proxy (normalized)
            if "occurrence_count" in row and pd.notna(row["occurrence_count"]):
                # Very rough proxy: occurrences per 1000 km²
                proxy_biomass = row["occurrence_count"] / (area_km2 / 1000.0) / 100.0
                params.model.loc[i, "Biomass"] = max(0.01, proxy_biomass)
                warnings.warn(
                    f"Using occurrence-based proxy for {group_name} biomass. "
                    "Provide biomass_estimates for better results."
                )
            else:
                params.model.loc[i, "Biomass"] = np.nan

        # P/B from growth parameter K
        if "k" in row and pd.notna(row["k"]):
            pb = estimate_pb_from_growth(row["k"])
            params.model.loc[i, "PB"] = pb
        else:
            params.model.loc[i, "PB"] = np.nan

        # Q/B from trophic level and P/B
        if "trophic_level" in row and pd.notna(row["trophic_level"]):
            tl = row["trophic_level"]
            pb = params.model.loc[i, "PB"]
            if pd.notna(pb):
                qb = estimate_qb_from_tl_pb(tl, pb)
                params.model.loc[i, "QB"] = qb
            else:
                params.model.loc[i, "QB"] = np.nan
        else:
            params.model.loc[i, "QB"] = np.nan

        # Default unassimilated consumption
        params.model.loc[i, "Unassim"] = 0.2

    # Add a detritus group
    detritus_name = "Detritus"
    det_params = create_rpath_params(
        groups=group_names + [detritus_name], types=group_types + [2]
    )

    # Copy existing data
    for col in params.model.columns:
        if col in det_params.model.columns:
            det_params.model.loc[: len(group_names) - 1, col] = params.model[col].values

    # Set detritus parameters
    det_params.model.loc[len(group_names), "DetInput"] = 1.0

    # Initialize diet matrix (simplified - set to detritus by default)
    # In practice, would use FishBase diet items
    diet_groups = det_params.diet["Group"].tolist()
    if detritus_name in diet_groups:
        det_idx = diet_groups.index(detritus_name)
        for predator in group_names:
            if predator in det_params.diet.columns:
                det_params.diet.loc[det_idx, predator] = 1.0

    warnings.warn(
        "Diet matrix initialized with simple detritus diet. "
        "Use FishBase diet_items data for more accurate diet composition."
    )

    params = det_params
    params.model_name = "Biodiversity Data Model"

    return params
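When no biomass estimate is supplied, the occurrence-density fallback above reduces to a one-line formula. A standalone sketch (the scaling constants mirror the source; the proxy itself is, as the warning says, very rough):

```python
def proxy_biomass(occurrence_count: int, area_km2: float = 1000.0) -> float:
    """Occurrence-based biomass proxy, as in biodata_to_rpath:
    occurrences per 1000 km², scaled down by 100, floored at 0.01."""
    value = occurrence_count / (area_km2 / 1000.0) / 100.0
    return max(0.01, value)

print(proxy_biomass(5000))                    # 5000 / 1 / 100 = 50.0
print(proxy_biomass(0))                       # floored at 0.01
print(proxy_biomass(2000, area_km2=4000.0))   # 2000 / 4 / 100 = 5.0
```

The 0.01 floor keeps the balancing step from seeing a zero biomass; real biomass estimates should always be preferred.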

clear_cache

clear_cache()

Clear the global biodiversity data cache.

Example:

>>> from pypath.io.biodata import clear_cache
>>> clear_cache()

Source code in pypath/io/biodata.py
def clear_cache():
    """Clear the global biodiversity data cache.

    Example
    -------
    >>> from pypath.io.biodata import clear_cache
    >>> clear_cache()
    """
    _biodata_cache.clear()

get_cache_stats

get_cache_stats() -> Dict[str, Union[int, float]]

Get statistics about the global cache.

Returns:

| Type | Description |
| --- | --- |
| dict | Cache statistics including size, hits, misses, hit_rate |

Example:

>>> from pypath.io.biodata import get_cache_stats
>>> stats = get_cache_stats()
>>> print(f"Cache hit rate: {stats['hit_rate']:.2%}")

Source code in pypath/io/biodata.py
def get_cache_stats() -> Dict[str, Union[int, float]]:
    """Get statistics about the global cache.

    Returns
    -------
    dict
        Cache statistics including size, hits, misses, hit_rate

    Example
    -------
    >>> from pypath.io.biodata import get_cache_stats
    >>> stats = get_cache_stats()
    >>> print(f"Cache hit rate: {stats['hit_rate']:.2%}")
    """
    return _biodata_cache.stats()
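The global `_biodata_cache` behind these two helpers is not shown in this section; a minimal dict-backed stand-in with the same `clear()`/`stats()` surface (the real implementation may differ) could look like:

```python
class BiodataCache:
    """Dict-backed cache tracking hits and misses -- a sketch of the
    clear()/stats() interface used by clear_cache() and get_cache_stats()."""

    def __init__(self):
        self._store = {}
        self._hits = 0
        self._misses = 0

    def get(self, key):
        if key in self._store:
            self._hits += 1
            return self._store[key]
        self._misses += 1
        return None

    def put(self, key, value):
        self._store[key] = value

    def clear(self):
        self._store.clear()
        self._hits = self._misses = 0

    def stats(self):
        total = self._hits + self._misses
        return {
            "size": len(self._store),
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self._hits / total if total else 0.0,
        }

cache = BiodataCache()
cache.get("worms:Atlantic cod")                   # miss
cache.put("worms:Atlantic cod", {"AphiaID": 126436})
cache.get("worms:Atlantic cod")                   # hit
print(cache.stats())
```

One miss followed by one hit yields a hit rate of 0.5, which is what `get_cache_stats()['hit_rate']` reports for the global cache.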

get_species_info

get_species_info(common_name: str, include_occurrences: bool = True, include_traits: bool = True, strict: bool = False, cache: bool = True, timeout: int = 30) -> SpeciesInfo

Get comprehensive species information from common name.

Implements the workflow:

1. Search WoRMS vernacular database for common name
2. Get AphiaID and accepted scientific name
3. Query OBIS for occurrence data (if include_occurrences=True)
4. Query FishBase for trait data (if include_traits=True)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| common_name | str | Common/vernacular name of species (e.g., "Atlantic cod") | required |
| include_occurrences | bool | Whether to fetch OBIS occurrence data | True |
| include_traits | bool | Whether to fetch FishBase trait data | True |
| strict | bool | If True, raise errors on any failure. If False, return partial data. | False |
| cache | bool | Whether to use cached results | True |
| timeout | int | API request timeout in seconds | 30 |

Returns:

| Type | Description |
| --- | --- |
| SpeciesInfo | Dataclass containing all retrieved information |

Raises:

| Type | Description |
| --- | --- |
| SpeciesNotFoundError | If species not found in WoRMS (only in strict mode) |
| AmbiguousSpeciesError | If multiple species match and auto-selection fails |
| APIConnectionError | If API connection fails (only in strict mode) |

Example:

>>> from pypath.io.biodata import get_species_info
>>> info = get_species_info("Atlantic cod")
>>> print(info.scientific_name)
'Gadus morhua'
>>> print(info.trophic_level)
4.4
>>> print(f"Found {info.occurrence_count} OBIS records")

Source code in pypath/io/biodata.py
def get_species_info(
    common_name: str,
    include_occurrences: bool = True,
    include_traits: bool = True,
    strict: bool = False,
    cache: bool = True,
    timeout: int = 30,
) -> SpeciesInfo:
    """Get comprehensive species information from common name.

    Implements the workflow:
    1. Search WoRMS vernacular database for common name
    2. Get AphiaID and accepted scientific name
    3. Query OBIS for occurrence data (if include_occurrences=True)
    4. Query FishBase for trait data (if include_traits=True)

    Parameters
    ----------
    common_name : str
        Common/vernacular name of species (e.g., "Atlantic cod")
    include_occurrences : bool
        Whether to fetch OBIS occurrence data
    include_traits : bool
        Whether to fetch FishBase trait data
    strict : bool
        If True, raise errors on any failure. If False, return partial data.
    cache : bool
        Whether to use cached results
    timeout : int
        API request timeout in seconds

    Returns
    -------
    SpeciesInfo
        Dataclass containing all retrieved information

    Raises
    ------
    SpeciesNotFoundError
        If species not found in WoRMS (only in strict mode)
    AmbiguousSpeciesError
        If multiple species match and auto-selection fails
    APIConnectionError
        If API connection fails (only in strict mode)

    Example
    -------
    >>> from pypath.io.biodata import get_species_info
    >>> info = get_species_info("Atlantic cod")
    >>> print(info.scientific_name)
    'Gadus morhua'
    >>> print(info.trophic_level)
    4.4
    >>> print(f"Found {info.occurrence_count} OBIS records")
    """
    # Step 1: Search WoRMS by common name
    try:
        matches = _fetch_worms_vernacular(common_name, cache=cache, timeout=timeout)

        # Handle multiple matches
        if len(matches) > 1:
            best_match = _select_best_match(matches, common_name)
        else:
            best_match = matches[0]

        aphia_id = best_match.get("AphiaID")

    except Exception as e:
        if strict:
            raise
        warnings.warn(f"Failed to find species in WoRMS: {e}")
        raise SpeciesNotFoundError(f"Could not find species: {common_name}")

    # Step 2: Get accepted name from AphiaID
    try:
        worms_data = _fetch_worms_accepted(aphia_id, cache=cache, timeout=timeout)
    except Exception as e:
        if strict:
            raise
        warnings.warn(f"Failed to get accepted name: {e}")
        raise APIConnectionError(f"Failed to get accepted name for AphiaID {aphia_id}")

    scientific_name = worms_data.get("scientificname", worms_data.get("valid_name", ""))

    # Step 3: Query OBIS (optional)
    obis_data = None
    if include_occurrences:
        try:
            obis_data = _fetch_obis_occurrences(
                scientific_name, cache=cache, timeout=timeout
            )
        except Exception as e:
            if strict:
                raise
            warnings.warn(f"Failed to fetch OBIS data: {e}")

    # Step 4: Query FishBase (optional)
    fishbase_data = None
    if include_traits:
        try:
            fishbase_data = _fetch_fishbase_traits(
                scientific_name, cache=cache, timeout=timeout
            )
        except Exception as e:
            if strict:
                raise
            warnings.warn(f"Failed to fetch FishBase data: {e}")

    # Step 5: Merge all data
    info = _merge_species_data(
        worms_data=worms_data,
        obis_data=obis_data,
        fishbase_data=fishbase_data,
        common_name=common_name,
    )

    return info
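`_select_best_match`, used in step 1 when WoRMS returns several vernacular matches, is internal and not shown here. One plausible strategy — prefer an exact case-insensitive vernacular match, else fall back to the first record — can be sketched as follows (a hypothetical stand-in, not the library's actual logic):

```python
def select_best_match(matches, common_name):
    """Pick one WoRMS record from several candidates.

    Hypothetical stand-in for the internal _select_best_match():
    exact (case-insensitive) vernacular match wins, else the first
    record returned by WoRMS is taken.
    """
    wanted = common_name.strip().lower()
    for m in matches:
        if m.get("vernacular", "").strip().lower() == wanted:
            return m
    return matches[0]

# Two candidate records for the query "atlantic cod".
matches = [
    {"AphiaID": 1, "vernacular": "Cod icefish"},
    {"AphiaID": 126436, "vernacular": "Atlantic cod"},
]
print(select_best_match(matches, "atlantic cod")["AphiaID"])
```

With no exact match (e.g. querying "haddock" against this list), the first record is returned, which is why `AmbiguousSpeciesError` exists for the cases where auto-selection is not trustworthy.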

Marine Environmental Data (EMODnet)

pypath.io.marine_data

Marine data clients for EMODnet habitats, bathymetry, and salinity.

Provides:

- MarineDataCache: Local file cache for downloaded marine data
- EMODnetHabitatsClient: WFS client for EUSeaMap seabed habitats
- EMODnetBathymetryClient: WCS client for bathymetry depth grids
- SalinityLoader: Load salinity from user-provided files
- HabitatPreferenceBuilder: Semi-automatic habitat preference assignment

EMODnetBathymetryClient

WCS client for EMODnet bathymetry depth data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cache | MarineDataCache | Cache instance for storing downloaded data. | required |
Source code in pypath/io/marine_data.py
class EMODnetBathymetryClient:
    """WCS client for EMODnet bathymetry depth data.

    Parameters
    ----------
    cache : MarineDataCache
        Cache instance for storing downloaded data.
    """

    def __init__(self, cache: MarineDataCache):
        self._cache = cache

    def fetch_depth(
        self, bbox: tuple[float, float, float, float], resolution: float = 0.002
    ):
        """Fetch depth raster for a bounding box.

        Parameters
        ----------
        bbox : tuple
            (min_lon, min_lat, max_lon, max_lat) in WGS84.
        resolution : float
            Grid resolution in degrees (default ~200m).

        Returns
        -------
        tuple of (np.ndarray, tuple)
            (raster [rows, cols], transform tuple).
        """
        import requests

        cache_key = self._cache.cache_key(
            bbox=bbox, layer="bathymetry", resolution=resolution
        )
        cached = self._cache.get(cache_key)
        if cached is not None:
            return self._read_geotiff_bytes(cached)

        bbox_str = f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}"
        params = {
            "service": "WCS",
            "version": "1.0.0",
            "request": "GetCoverage",
            "coverage": "emodnet:mean",
            "crs": "EPSG:4326",
            "BBOX": bbox_str,
            "format": "image/tiff",
            "interpolation": "nearest",
            "resx": str(resolution),
            "resy": str(resolution),
        }
        logger.info("Fetching EMODnet bathymetry for bbox %s", bbox)
        resp = requests.get(_EMODNET_BATHYMETRY_WCS, params=params, timeout=120)
        resp.raise_for_status()

        self._cache.put(cache_key, resp.content)
        return self._read_geotiff_bytes(resp.content)

    @staticmethod
    def _read_geotiff_bytes(data: bytes):
        """Read a GeoTIFF from bytes, return (array, transform)."""
        try:
            import rasterio

            with rasterio.open(_io.BytesIO(data)) as src:
                arr = src.read(1).astype(float)
                t = src.transform
                transform = (t.c, t.a, t.b, t.f, t.d, t.e)
                return arr, transform
        except ImportError:
            logger.warning("rasterio not installed; cannot read GeoTIFF")
            raise

    def sample_to_grid(
        self, raster: np.ndarray, transform: tuple, grid: "gpd.GeoDataFrame"
    ) -> np.ndarray:
        """Average raster values within each grid patch.

        Parameters
        ----------
        raster : np.ndarray
            Depth raster [rows, cols].
        transform : tuple
            (x_origin, pixel_width, x_skew, y_origin, y_skew, pixel_height).
        grid : EcospaceGrid
            Target spatial grid.

        Returns
        -------
        np.ndarray
            Mean depth per patch [n_patches].
        """
        x_origin, pixel_width, _, y_origin, _, pixel_height = transform
        rows, cols = raster.shape
        depth = np.zeros(grid.n_patches)

        for i in range(grid.n_patches):
            lon, lat = grid.patch_centroids[i]
            col = int((lon - x_origin) / pixel_width)
            row = int((lat - y_origin) / pixel_height)
            col = max(0, min(col, cols - 1))
            row = max(0, min(row, rows - 1))
            depth[i] = raster[row, col]

        return depth
fetch_depth
fetch_depth(bbox: tuple[float, float, float, float], resolution: float = 0.002)

Fetch depth raster for a bounding box.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| bbox | tuple | (min_lon, min_lat, max_lon, max_lat) in WGS84. | required |
| resolution | float | Grid resolution in degrees (default ~200m). | 0.002 |

Returns:

| Type | Description |
| --- | --- |
| tuple of (np.ndarray, tuple) | (raster [rows, cols], transform tuple). |

Source code in pypath/io/marine_data.py
def fetch_depth(
    self, bbox: tuple[float, float, float, float], resolution: float = 0.002
):
    """Fetch depth raster for a bounding box.

    Parameters
    ----------
    bbox : tuple
        (min_lon, min_lat, max_lon, max_lat) in WGS84.
    resolution : float
        Grid resolution in degrees (default ~200m).

    Returns
    -------
    tuple of (np.ndarray, tuple)
        (raster [rows, cols], transform tuple).
    """
    import requests

    cache_key = self._cache.cache_key(
        bbox=bbox, layer="bathymetry", resolution=resolution
    )
    cached = self._cache.get(cache_key)
    if cached is not None:
        return self._read_geotiff_bytes(cached)

    bbox_str = f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}"
    params = {
        "service": "WCS",
        "version": "1.0.0",
        "request": "GetCoverage",
        "coverage": "emodnet:mean",
        "crs": "EPSG:4326",
        "BBOX": bbox_str,
        "format": "image/tiff",
        "interpolation": "nearest",
        "resx": str(resolution),
        "resy": str(resolution),
    }
    logger.info("Fetching EMODnet bathymetry for bbox %s", bbox)
    resp = requests.get(_EMODNET_BATHYMETRY_WCS, params=params, timeout=120)
    resp.raise_for_status()

    self._cache.put(cache_key, resp.content)
    return self._read_geotiff_bytes(resp.content)
sample_to_grid
sample_to_grid(raster: ndarray, transform: tuple, grid: 'gpd.GeoDataFrame') -> np.ndarray

Average raster values within each grid patch.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| raster | np.ndarray | Depth raster [rows, cols]. | required |
| transform | tuple | (x_origin, pixel_width, x_skew, y_origin, y_skew, pixel_height). | required |
| grid | EcospaceGrid | Target spatial grid. | required |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | Mean depth per patch [n_patches]. |

Source code in pypath/io/marine_data.py
def sample_to_grid(
    self, raster: np.ndarray, transform: tuple, grid: "gpd.GeoDataFrame"
) -> np.ndarray:
    """Average raster values within each grid patch.

    Parameters
    ----------
    raster : np.ndarray
        Depth raster [rows, cols].
    transform : tuple
        (x_origin, pixel_width, x_skew, y_origin, y_skew, pixel_height).
    grid : EcospaceGrid
        Target spatial grid.

    Returns
    -------
    np.ndarray
        Mean depth per patch [n_patches].
    """
    x_origin, pixel_width, _, y_origin, _, pixel_height = transform
    rows, cols = raster.shape
    depth = np.zeros(grid.n_patches)

    for i in range(grid.n_patches):
        lon, lat = grid.patch_centroids[i]
        col = int((lon - x_origin) / pixel_width)
        row = int((lat - y_origin) / pixel_height)
        col = max(0, min(col, cols - 1))
        row = max(0, min(row, rows - 1))
        depth[i] = raster[row, col]

    return depth
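The per-patch lookup in `sample_to_grid` amounts to inverting the affine transform for the patch centroid and clamping to the raster bounds. In isolation, for a single point (no geopandas needed; the toy grid and coordinates are illustrative):

```python
import numpy as np

def sample_point(raster, transform, lon, lat):
    """Nearest-pixel lookup, mirroring sample_to_grid's inner loop.

    transform = (x_origin, pixel_width, x_skew, y_origin, y_skew,
    pixel_height), with pixel_height negative for north-up rasters.
    """
    x_origin, pixel_width, _, y_origin, _, pixel_height = transform
    rows, cols = raster.shape
    col = int((lon - x_origin) / pixel_width)
    row = int((lat - y_origin) / pixel_height)
    col = max(0, min(col, cols - 1))   # clamp out-of-bbox points to the edge
    row = max(0, min(row, rows - 1))
    return raster[row, col]

# 4x4 toy depth grid covering lon 0..4, lat 2..6 (top-left pixel at lat 6).
depth = np.arange(16, dtype=float).reshape(4, 4)
transform = (0.0, 1.0, 0.0, 6.0, 0.0, -1.0)
print(sample_point(depth, transform, 2.5, 4.5))  # row 1, col 2 -> 6.0
```

Because of the clamping, centroids that fall outside the downloaded bounding box silently take the nearest edge pixel rather than raising, so the bbox passed to `fetch_depth` should fully cover the grid.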

EMODnetHabitatsClient

WFS client for EMODnet EUSeaMap seabed habitats.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cache | MarineDataCache | Cache instance for storing downloaded data. | required |
Source code in pypath/io/marine_data.py
class EMODnetHabitatsClient:
    """WFS client for EMODnet EUSeaMap seabed habitats.

    Parameters
    ----------
    cache : MarineDataCache
        Cache instance for storing downloaded data.
    """

    def __init__(self, cache: MarineDataCache):
        self._cache = cache

    def fetch_euseamap(
        self, bbox: tuple[float, float, float, float], eunis_level: int = 3
    ):
        """Fetch EUSeaMap habitat polygons within a bounding box.

        Parameters
        ----------
        bbox : tuple
            (min_lon, min_lat, max_lon, max_lat) in WGS84.
        eunis_level : int
            EUNIS classification level (default 3).

        Returns
        -------
        geopandas.GeoDataFrame
            Habitat polygons with EUNIS classification columns.
        """
        import geopandas as gpd
        import requests

        cache_key = self._cache.cache_key(
            bbox=bbox, layer="euseamap", eunis_level=eunis_level
        )
        cached = self._cache.get(cache_key)
        if cached is not None:
            return gpd.read_file(_io.BytesIO(cached))

        bbox_str = f"{bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}"
        params = {
            "service": "WFS",
            "version": "2.0.0",
            "request": "GetFeature",
            "typeName": "emodnet_view:euseamap_2023",
            "outputFormat": "application/json",
            "bbox": bbox_str,
            "srsName": "EPSG:4326",
        }
        logger.info("Fetching EMODnet habitats for bbox %s", bbox)
        resp = requests.get(_EMODNET_HABITATS_WFS, params=params, timeout=120)
        resp.raise_for_status()

        self._cache.put(cache_key, resp.content)
        gdf = gpd.read_file(_io.BytesIO(resp.content))
        logger.info("Downloaded %d habitat features", len(gdf))
        return gdf

    def rasterize_habitats(
        self, gdf: "gpd.GeoDataFrame", grid: "gpd.GeoDataFrame"
    ) -> np.ndarray:
        """Assign majority EUNIS habitat class to each grid patch.

        Parameters
        ----------
        gdf : geopandas.GeoDataFrame
            Habitat polygons with 'EUNIScomb' column.
        grid : EcospaceGrid
            Target spatial grid.

        Returns
        -------
        np.ndarray
            EUNIS code per patch [n_patches], dtype=object.
        """
        from shapely.geometry import Point

        habitat_per_patch = np.empty(grid.n_patches, dtype=object)
        habitat_per_patch[:] = "unknown"

        if gdf.empty:
            return habitat_per_patch

        for i in range(grid.n_patches):
            centroid = Point(grid.patch_centroids[i, 0], grid.patch_centroids[i, 1])
            within = gdf[gdf.geometry.contains(centroid)]
            if not within.empty:
                habitat_per_patch[i] = within.iloc[0]["EUNIScomb"]
            else:
                nearest = gdf.geometry.distance(centroid)
                if len(nearest) > 0:
                    habitat_per_patch[i] = gdf.iloc[nearest.idxmin()]["EUNIScomb"]

        return habitat_per_patch

    @staticmethod
    def get_habitat_types(gdf: "gpd.GeoDataFrame", level: int = 3) -> list:
        """Extract unique EUNIS codes truncated to requested level.

        Parameters
        ----------
        gdf : geopandas.GeoDataFrame
            Habitat polygons with 'EUNIScomb' column.
        level : int
            EUNIS hierarchy level (e.g., 3 means 'A5.2').

        Returns
        -------
        list of str
            Sorted unique EUNIS codes at the requested level.
        """
        codes = gdf["EUNIScomb"].dropna().unique()
        truncated = set()
        for code in codes:
            parts = code.split(".")
            if level <= 1:
                truncated.add(parts[0][:1])
            elif level == 2:
                truncated.add(parts[0])
            else:
                # Level 3+: keep first part + "." + first (level-2) chars
                if len(parts) >= 2:
                    sub = parts[1]
                    keep = min(level - 2, len(sub))
                    truncated.add(f"{parts[0]}.{sub[:keep]}")
                else:
                    truncated.add(parts[0])
        return sorted(truncated)
fetch_euseamap
fetch_euseamap(bbox: tuple[float, float, float, float], eunis_level: int = 3)

Fetch EUSeaMap habitat polygons within a bounding box.

Parameters:

    bbox : tuple
        (min_lon, min_lat, max_lon, max_lat) in WGS84. Required.
    eunis_level : int
        EUNIS classification level. Default: 3.

Returns:

    geopandas.GeoDataFrame
        Habitat polygons with EUNIS classification columns.

Source code in pypath/io/marine_data.py
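One detail in fetch_euseamap worth calling out: the BBOX string puts latitude first, because WFS 2.0 with srsName=EPSG:4326 follows the EPSG axis order (lat, lon) rather than the (lon, lat) order of the input tuple. A minimal sketch of the conversion, with a hypothetical extent:

```python
# WFS 2.0 + EPSG:4326 expects "min_lat,min_lon,max_lat,max_lon",
# so the (min_lon, min_lat, max_lon, max_lat) tuple is reordered.
bbox = (-5.0, 48.0, 0.0, 52.0)  # hypothetical extent, lon/lat order
bbox_str = f"{bbox[1]},{bbox[0]},{bbox[3]},{bbox[2]}"
print(bbox_str)  # -> 48.0,-5.0,52.0,0.0
```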
get_habitat_types staticmethod
get_habitat_types(gdf: 'gpd.GeoDataFrame', level: int = 3) -> list

Extract unique EUNIS codes truncated to requested level.

Parameters:

    gdf : geopandas.GeoDataFrame
        Habitat polygons with 'EUNIScomb' column. Required.
    level : int
        EUNIS hierarchy level (e.g., 3 means 'A5.2'). Default: 3.

Returns:

    list of str
        Sorted unique EUNIS codes at the requested level.

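The level logic in get_habitat_types can be exercised on its own; the helper below mirrors it (a sketch, not the pypath function itself):

```python
def truncate_eunis(code: str, level: int) -> str:
    # Level 1 -> habitat group letter, level 2 -> first dotted part,
    # level 3+ -> first part plus the first (level - 2) digits of part two.
    parts = code.split(".")
    if level <= 1:
        return parts[0][:1]
    if level == 2:
        return parts[0]
    if len(parts) >= 2:
        sub = parts[1]
        return f"{parts[0]}.{sub[:min(level - 2, len(sub))]}"
    return parts[0]

print(truncate_eunis("A5.23", 3))  # -> A5.2
print(truncate_eunis("A5.23", 2))  # -> A5
```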
rasterize_habitats
rasterize_habitats(gdf: 'gpd.GeoDataFrame', grid) -> np.ndarray

Assign the EUNIS habitat class at each patch centroid.

Parameters:

    gdf : geopandas.GeoDataFrame
        Habitat polygons with 'EUNIScomb' column. Required.
    grid : EcospaceGrid
        Target spatial grid. Required.

Returns:

    np.ndarray
        EUNIS code per patch [n_patches], dtype=object.


HabitatPreferenceBuilder

Build habitat preference matrices for ecospace models.

Source code in pypath/io/marine_data.py
class HabitatPreferenceBuilder:
    """Build habitat preference matrices for ecospace models."""

    def apply_preset(
        self, n_groups: int, habitat_types: list, preset: str
    ) -> np.ndarray:
        """Apply a preset preference pattern.

        Parameters
        ----------
        n_groups : int
            Number of species groups.
        habitat_types : list of str
            Unique habitat type codes.
        preset : str
            One of 'pelagic', 'demersal', 'benthic'.

        Returns
        -------
        np.ndarray
            Preference matrix [n_groups, n_habitat_types], values 0-1.
        """
        n_types = len(habitat_types)
        if preset == "pelagic":
            return np.ones((n_groups, n_types))
        elif preset == "benthic":
            prefs = np.full((n_groups, n_types), 0.2)
            for g in range(n_groups):
                primary = g % n_types
                prefs[g, primary] = 1.0
            return prefs
        elif preset == "demersal":
            return np.full((n_groups, n_types), 0.6)
        else:
            raise ValueError(f"Unknown preset: {preset}")

    def suggest_preferences(
        self,
        group_names: list,
        habitat_types: list,
        depth_per_patch: Optional[np.ndarray] = None,
    ):
        """Auto-suggest preferences using biodata lookups.

        Parameters
        ----------
        group_names : list of str
            Species/group names from the Ecopath model.
        habitat_types : list of str
            Unique EUNIS habitat type codes.
        depth_per_patch : np.ndarray, optional
            Depth values per patch for depth-based suggestions.

        Returns
        -------
        np.ndarray
            Suggested preference matrix [n_groups, n_habitat_types].
        """
        n_groups = len(group_names)
        n_types = len(habitat_types)
        prefs = np.ones((n_groups, n_types)) * 0.5  # default moderate

        for g, name in enumerate(group_names):
            try:
                from pypath.io.biodata import get_species_info

                info = get_species_info(name)
                if info and hasattr(info, "traits") and info.traits:
                    if info.traits.depth_range_shallow is not None:
                        for t, htype in enumerate(habitat_types):
                            if htype.startswith("A5"):
                                prefs[g, t] = 0.8
                            elif htype.startswith("A6"):
                                if (
                                    info.traits.depth_range_deep
                                    and info.traits.depth_range_deep > 200
                                ):
                                    prefs[g, t] = 0.7
                                else:
                                    prefs[g, t] = 0.2
            except Exception as e:
                logger.debug("Biodata lookup failed for %s: %s", name, e)

        return prefs

    @staticmethod
    def build_preference_matrix(
        prefs_by_type: np.ndarray,
        habitat_types: list,
        habitat_map: np.ndarray,
        grid,
    ) -> np.ndarray:
        """Convert habitat-type preferences to per-patch preferences.

        Parameters
        ----------
        prefs_by_type : np.ndarray
            Preference per habitat type [n_groups, n_habitat_types].
        habitat_types : list of str
            Ordered habitat type codes matching prefs_by_type columns.
        habitat_map : np.ndarray
            EUNIS code per patch [n_patches], dtype=object.
        grid : EcospaceGrid
            Target spatial grid.

        Returns
        -------
        np.ndarray
            Preference matrix [n_groups, n_patches].
        """
        n_groups = prefs_by_type.shape[0]
        type_to_idx = {t: i for i, t in enumerate(habitat_types)}
        matrix = np.full((n_groups, grid.n_patches), 0.5)

        for p in range(grid.n_patches):
            htype = habitat_map[p]
            matched = False
            for t, code in enumerate(habitat_types):
                if htype.startswith(code) or code.startswith(htype):
                    matrix[:, p] = prefs_by_type[:, t]
                    matched = True
                    break
            if not matched and htype in type_to_idx:
                matrix[:, p] = prefs_by_type[:, type_to_idx[htype]]

        return matrix
apply_preset
apply_preset(n_groups: int, habitat_types: list, preset: str) -> np.ndarray

Apply a preset preference pattern.

Parameters:

    n_groups : int
        Number of species groups. Required.
    habitat_types : list of str
        Unique habitat type codes. Required.
    preset : str
        One of 'pelagic', 'demersal', 'benthic'. Required.

Returns:

    np.ndarray
        Preference matrix [n_groups, n_habitat_types], values 0-1.

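As an illustration of the 'benthic' preset: every group gets a 0.2 baseline affinity plus one full-preference habitat assigned round-robin. A self-contained numpy sketch of that pattern (reimplemented, not imported):

```python
import numpy as np

def benthic_preset(n_groups: int, n_types: int) -> np.ndarray:
    # Low baseline affinity everywhere; one primary habitat per group,
    # cycled across the available types.
    prefs = np.full((n_groups, n_types), 0.2)
    for g in range(n_groups):
        prefs[g, g % n_types] = 1.0
    return prefs

print(benthic_preset(3, 2))
```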
build_preference_matrix staticmethod
build_preference_matrix(prefs_by_type: ndarray, habitat_types: list, habitat_map: ndarray, grid) -> np.ndarray

Convert habitat-type preferences to per-patch preferences.

Parameters:

    prefs_by_type : np.ndarray
        Preference per habitat type [n_groups, n_habitat_types]. Required.
    habitat_types : list of str
        Ordered habitat type codes matching prefs_by_type columns. Required.
    habitat_map : np.ndarray
        EUNIS code per patch [n_patches], dtype=object. Required.
    grid : EcospaceGrid
        Target spatial grid. Required.

Returns:

    np.ndarray
        Preference matrix [n_groups, n_patches].

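The per-patch lookup matches EUNIS codes by prefix in either direction, so a patch labelled 'A5.25' inherits preferences defined for the coarser 'A5.2'. A minimal sketch of that matching rule, with hypothetical codes:

```python
from typing import Optional

def match_type(patch_code: str, habitat_types: list) -> Optional[int]:
    # Index of the first habitat type that is a prefix of the patch
    # code, or of which the patch code is itself a prefix.
    for t, code in enumerate(habitat_types):
        if patch_code.startswith(code) or code.startswith(patch_code):
            return t
    return None

types = ["A3", "A5.2", "A6"]
print(match_type("A5.25", types))  # -> 1
print(match_type("B1", types))     # -> None
```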
suggest_preferences
suggest_preferences(group_names: list, habitat_types: list, depth_per_patch: Optional[ndarray] = None)

Auto-suggest preferences using biodata lookups.

Parameters:

    group_names : list of str
        Species/group names from the Ecopath model. Required.
    habitat_types : list of str
        Unique EUNIS habitat type codes. Required.
    depth_per_patch : np.ndarray, optional
        Depth values per patch for depth-based suggestions. Default: None.

Returns:

    np.ndarray
        Suggested preference matrix [n_groups, n_habitat_types].


MarineDataCache

Local file cache for marine data downloads.

Parameters:

    cache_dir : str or Path, optional
        Directory for cached files. Created if it doesn't exist. Default: None.
Source code in pypath/io/marine_data.py
class MarineDataCache:
    """Local file cache for marine data downloads.

    Parameters
    ----------
    cache_dir : str or Path
        Directory for cached files. Created if it doesn't exist.
    """

    def __init__(self, cache_dir: Optional[str] = None):
        self._cache_dir = Path(cache_dir) if cache_dir else _DEFAULT_CACHE_DIR

    def get(self, key: str) -> Optional[bytes]:
        """Retrieve cached data by key. Returns None on cache miss."""
        path = self._cache_dir / key
        if path.exists():
            logger.debug("Cache hit: %s", key)
            return path.read_bytes()
        return None

    def put(self, key: str, data: bytes) -> None:
        """Store data in cache."""
        self._cache_dir.mkdir(parents=True, exist_ok=True)
        path = self._cache_dir / key
        path.write_bytes(data)
        logger.debug("Cached: %s (%d bytes)", key, len(data))

    @staticmethod
    def cache_key(bbox: tuple[float, float, float, float], layer: str, **kwargs) -> str:
        """Generate deterministic cache key from parameters."""
        parts = {"bbox": list(bbox), "layer": layer, **kwargs}
        raw = json.dumps(parts, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()
cache_key staticmethod
cache_key(bbox: tuple[float, float, float, float], layer: str, **kwargs) -> str

Generate deterministic cache key from parameters.

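Because cache_key hashes a sort_keys JSON dump, identical request parameters always map to the same filename regardless of keyword order, which is what makes repeated get/put calls hit the cache. A stdlib-only illustration:

```python
import hashlib
import json

def cache_key(bbox, layer, **kwargs) -> str:
    # sort_keys=True erases dict ordering, so the digest is deterministic.
    parts = {"bbox": list(bbox), "layer": layer, **kwargs}
    return hashlib.sha256(json.dumps(parts, sort_keys=True).encode()).hexdigest()

k1 = cache_key((-5.0, 48.0, 0.0, 52.0), "euseamap", eunis_level=3)
k2 = cache_key((-5.0, 48.0, 0.0, 52.0), layer="euseamap", eunis_level=3)
print(k1 == k2, len(k1))  # -> True 64
```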
get
get(key: str) -> Optional[bytes]

Retrieve cached data by key. Returns None on cache miss.

put
put(key: str, data: bytes) -> None

Store data in cache.


SalinityLoader

Load salinity data from user-provided files.

Source code in pypath/io/marine_data.py
class SalinityLoader:
    """Load salinity data from user-provided files."""

    @staticmethod
    def load_from_csv(filepath: str, grid) -> "EnvironmentalLayer":
        """Load salinity from CSV with lon, lat, salinity columns.

        Parameters
        ----------
        filepath : str
            Path to CSV file with columns: lon, lat, salinity.
        grid : EcospaceGrid
            Target spatial grid for nearest-neighbor sampling.

        Returns
        -------
        EnvironmentalLayer
            Salinity values sampled onto the grid patches.
        """
        import pandas as pd

        from pypath.spatial.environmental import EnvironmentalLayer

        resolved = Path(filepath).resolve()
        if not resolved.is_file():
            raise FileNotFoundError(f"Salinity CSV not found: {resolved}")
        df = pd.read_csv(resolved)
        required = {"lon", "lat", "salinity"}
        if not required.issubset(df.columns):
            raise ValueError(
                f"CSV must have columns: {required}, got: {set(df.columns)}"
            )

        values = np.zeros(grid.n_patches)
        for i in range(grid.n_patches):
            lon, lat = grid.patch_centroids[i]
            dists = (df["lon"] - lon) ** 2 + (df["lat"] - lat) ** 2
            values[i] = df.loc[dists.idxmin(), "salinity"]

        return EnvironmentalLayer(name="salinity", units="PSU", values=values)

    @staticmethod
    def load_from_netcdf(
        filepath: str, grid, variable: str = "so"
    ) -> "EnvironmentalLayer":
        """Load salinity from NetCDF.

        Parameters
        ----------
        filepath : str
            Path to NetCDF file.
        grid : EcospaceGrid
            Target spatial grid.
        variable : str
            NetCDF variable name for salinity (default: 'so').

        Returns
        -------
        EnvironmentalLayer
            Salinity values sampled onto the grid patches.
        """
        try:
            import xarray as xr
        except ImportError:
            raise ImportError(
                "xarray required for NetCDF support: pip install xarray netCDF4"
            )

        from pypath.spatial.environmental import EnvironmentalLayer

        ds = xr.open_dataset(filepath)
        sal = ds[variable]

        # Handle time dimension: take mean if present
        if "time" in sal.dims:
            sal = sal.mean(dim="time")
        # Handle depth dimension: take surface layer
        for dim in ["depth", "lev", "z"]:
            if dim in sal.dims:
                sal = sal.isel({dim: 0})

        values = np.zeros(grid.n_patches)
        lons = sal.coords[_find_coord(sal, "lon")].values
        lats = sal.coords[_find_coord(sal, "lat")].values

        for i in range(grid.n_patches):
            plon, plat = grid.patch_centroids[i]
            lon_idx = np.argmin(np.abs(lons - plon))
            lat_idx = np.argmin(np.abs(lats - plat))
            values[i] = float(sal.values[lat_idx, lon_idx])

        ds.close()
        return EnvironmentalLayer(name="salinity", units="PSU", values=values)
load_from_csv staticmethod
load_from_csv(filepath: str, grid) -> 'EnvironmentalLayer'

Load salinity from CSV with lon, lat, salinity columns.

Parameters:

    filepath : str
        Path to CSV file with columns: lon, lat, salinity. Required.
    grid : EcospaceGrid
        Target spatial grid for nearest-neighbor sampling. Required.

Returns:

    EnvironmentalLayer
        Salinity values sampled onto the grid patches.

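load_from_csv samples each patch centroid by nearest neighbor in squared degrees, a flat-earth shortcut that is reasonable for small regional grids. The lookup rule, sketched standalone with hypothetical sample points:

```python
import numpy as np

# Hypothetical samples: columns are lon, lat, salinity (PSU).
samples = np.array([
    [-4.0, 49.0, 34.8],
    [-2.0, 50.0, 35.1],
    [ 0.0, 51.0, 34.2],
])

def nearest_salinity(lon: float, lat: float) -> float:
    # Same rule as load_from_csv: squared degree distance, closest row wins.
    d2 = (samples[:, 0] - lon) ** 2 + (samples[:, 1] - lat) ** 2
    return float(samples[np.argmin(d2), 2])

print(nearest_salinity(-1.9, 50.1))  # -> 35.1
```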
load_from_netcdf staticmethod
load_from_netcdf(filepath: str, grid, variable: str = 'so') -> 'EnvironmentalLayer'

Load salinity from NetCDF.

Parameters:

    filepath : str
        Path to NetCDF file. Required.
    grid : EcospaceGrid
        Target spatial grid. Required.
    variable : str
        NetCDF variable name for salinity. Default: 'so'.

Returns:

    EnvironmentalLayer
        Salinity values sampled onto the grid patches.

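load_from_netcdf applies the same idea per axis on a regular coordinate grid: np.argmin over the absolute difference yields the index of the closest coordinate value. A sketch with a hypothetical longitude axis:

```python
import numpy as np

lons = np.arange(-5.0, 0.5, 0.5)  # hypothetical regular longitude axis
target = -2.2
lon_idx = int(np.argmin(np.abs(lons - target)))
print(lon_idx, lons[lon_idx])  # -> 6 -2.0
```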

Utilities

pypath.io.utils

Shared utilities for PyPath I/O modules.

This module provides common helper functions used across multiple I/O modules (biodata, ecobase, ewemdb) to avoid code duplication and ensure consistency.

Functions:

- safe_float
- fetch_url

estimate_pb_from_growth

estimate_pb_from_growth(k: float, max_age: Optional[float] = None) -> float

Estimate P/B ratio from von Bertalanffy growth parameter K.

Uses the empirical relationship that P/B is approximately proportional to the growth coefficient K from the von Bertalanffy growth function.

Parameters:

    k : float
        Von Bertalanffy growth coefficient K (1/year). Required.
    max_age : float, optional
        Maximum age in years. If provided, uses the Z/K ratio method.
        If None, uses the simple approximation P/B ≈ 2.5 * K. Default: None.

Returns:

    float
        Estimated P/B ratio (1/year)

Notes

Based on Brey (2001) and Pauly (1980) empirical relationships between growth parameters and production rates.

References
  • Brey, T. (2001). Population dynamics in benthic invertebrates. A virtual handbook. http://www.thomas-brey.de/science/virtualhandbook
  • Pauly, D. (1980). On the interrelationships between natural mortality, growth parameters, and mean environmental temperature in 175 fish stocks. ICES Journal of Marine Science, 39(2), 175-192.
Source code in pypath/io/utils.py
def estimate_pb_from_growth(k: float, max_age: Optional[float] = None) -> float:
    """Estimate P/B ratio from von Bertalanffy growth parameter K.

    Uses the empirical relationship that P/B is approximately proportional
    to the growth coefficient K from the von Bertalanffy growth function.

    Parameters
    ----------
    k : float
        Von Bertalanffy growth coefficient K (1/year)
    max_age : float, optional
        Maximum age in years. If provided, uses Z/K ratio method.
        If None, uses simple approximation P/B ≈ 2.5 * K.

    Returns
    -------
    float
        Estimated P/B ratio (1/year)

    Notes
    -----
    Based on Brey (2001) and Pauly (1980) empirical relationships between
    growth parameters and production rates.

    References
    ----------
    - Brey, T. (2001). Population dynamics in benthic invertebrates.
      A virtual handbook. http://www.thomas-brey.de/science/virtualhandbook
    - Pauly, D. (1980). On the interrelationships between natural mortality,
      growth parameters, and mean environmental temperature in 175 fish stocks.
      ICES Journal of Marine Science, 39(2), 175-192.
    """
    if max_age is not None:
        # Z/K method (Pauly 1980). Note: max_age only selects this
        # branch; Z itself is estimated empirically as 1.5 * K.
        z = 1.5 * k
        return z
    else:
        # Simple approximation
        return k * 2.5

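When no longevity estimate is available, the function reduces to P/B ≈ 2.5 K. A minimal self-contained sketch of that approximation (reimplemented here rather than imported, in case pypath is not installed):

```python
def pb_from_k(k: float) -> float:
    """Simple empirical approximation: P/B is roughly 2.5 * K (1/year)."""
    return 2.5 * k

# A fast-growing small pelagic (K ~ 0.8/yr) vs. a long-lived demersal (K ~ 0.15/yr)
print(pb_from_k(0.8))   # 2.0
print(pb_from_k(0.15))  # about 0.375
```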
estimate_qb_from_tl_pb

estimate_qb_from_tl_pb(trophic_level: float, pb: float) -> float

Estimate Q/B ratio from trophic level and P/B ratio.

Uses the empirical relationship from Palomares & Pauly (1998) relating consumption rates to trophic level and production rates.

Parameters:

trophic_level : float, required
    Trophic level (typically 2.0 to 5.0 for consumers)
pb : float, required
    Production/Biomass ratio (1/year)

Returns:

float
    Estimated Q/B ratio (1/year)

Notes

The relationship assumes:
  • Higher trophic levels have lower assimilation efficiency
  • Q/B scales with P/B but modified by trophic efficiency
  • Typical P/Q ratios: 0.1-0.3 for fish, 0.2-0.4 for invertebrates

References

Palomares, M.L.D. & Pauly, D. (1998). Predicting food consumption of fish populations as functions of mortality, food type, morphometrics, temperature and salinity. Marine and Freshwater Research, 49, 447-453.

Source code in pypath/io/utils.py
def estimate_qb_from_tl_pb(trophic_level: float, pb: float) -> float:
    """Estimate Q/B ratio from trophic level and P/B ratio.

    Uses the empirical relationship from Palomares & Pauly (1998) relating
    consumption rates to trophic level and production rates.

    Parameters
    ----------
    trophic_level : float
        Trophic level (typically 2.0 to 5.0 for consumers)
    pb : float
        Production/Biomass ratio (1/year)

    Returns
    -------
    float
        Estimated Q/B ratio (1/year)

    Notes
    -----
    The relationship assumes:
    - Higher trophic levels have lower assimilation efficiency
    - Q/B scales with P/B but modified by trophic efficiency
    - Typical P/Q ratios: 0.1-0.3 for fish, 0.2-0.4 for invertebrates

    References
    ----------
    Palomares, M.L.D. & Pauly, D. (1998). Predicting food consumption of
    fish populations as functions of mortality, food type, morphometrics,
    temperature and salinity. Marine and Freshwater Research, 49, 447-453.
    """
    # Pick an assumed P/Q (gross conversion efficiency) from the trophic
    # level band; the relationship is banded, not monotonic in TL.
    # Typical P/Q for fish: 0.15-0.25
    if trophic_level < 2.0:
        # Primary producers/detritus - not applicable
        return pb * 10.0
    elif trophic_level < 3.0:
        # Herbivores/detritivores - higher efficiency
        return pb * 5.0
    elif trophic_level < 4.0:
        # Low-level carnivores
        return pb * 7.0
    else:
        # Top predators - lower efficiency
        return pb * 10.0

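The multipliers in the source are equivalent to assuming a banded gross conversion efficiency (P/Q) per trophic-level range, with Q/B = P/B / (P/Q). A self-contained sketch of that reading (reimplemented, not imported):

```python
def qb_from_tl_pb(trophic_level: float, pb: float) -> float:
    """Banded Q/B estimate: pick an assumed P/Q from the trophic level."""
    if trophic_level < 2.0:
        pq = 0.10       # primary producers/detritus: Q/B not really meaningful
    elif trophic_level < 3.0:
        pq = 0.20       # herbivores/detritivores: higher efficiency
    elif trophic_level < 4.0:
        pq = 1.0 / 7.0  # low-level carnivores
    else:
        pq = 0.10       # top predators: lower efficiency
    return pb / pq      # Q/B = P/B / (P/Q)

# A mid-trophic fish with P/B = 0.6/yr and TL = 3.2
print(qb_from_tl_pb(3.2, 0.6))  # roughly 4.2 (P/Q = 1/7)
```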
fetch_url

fetch_url(url: str, params: Optional[Dict] = None, timeout: int = 30, parse_json: bool = True) -> Union[str, Dict]

Fetch content from URL with automatic fallback to urllib.

Attempts to use the requests library if available, falling back to urllib.request if not. Optionally parses JSON responses.

Parameters:

url : str, required
    URL to fetch
params : dict, optional (default: None)
    Query parameters to append to URL
timeout : int, default=30
    Request timeout in seconds
parse_json : bool, default=True
    If True, attempt to parse response as JSON. If parsing fails or
    parse_json is False, return raw text.

Returns:

str or dict
    Response content as a dictionary (if JSON parsing succeeds) or a
    string (if JSON parsing fails or is disabled)

Raises:

HTTPError
    If the request fails (non-200 status code)
URLError
    If the connection fails

Examples:

>>> data = fetch_url("https://api.example.com/data")
>>> text = fetch_url("https://example.com/page", parse_json=False)
>>> filtered = fetch_url("https://api.example.com/search",
...                      params={"q": "marine species"})
Notes
  • Prefers requests library for better error handling and features
  • Automatically falls back to urllib if requests is not installed
  • JSON parsing is attempted but never raises an error if it fails
Source code in pypath/io/utils.py
def fetch_url(
    url: str, params: Optional[Dict] = None, timeout: int = 30, parse_json: bool = True
) -> Union[str, Dict]:
    """Fetch content from URL with automatic fallback to urllib.

    Attempts to use the requests library if available, falling back to
    urllib.request if not. Optionally parses JSON responses.

    Parameters
    ----------
    url : str
        URL to fetch
    params : dict, optional
        Query parameters to append to URL
    timeout : int, default=30
        Request timeout in seconds
    parse_json : bool, default=True
        If True, attempt to parse response as JSON. If parsing fails or
        parse_json is False, return raw text.

    Returns
    -------
    str or dict
        Response content as dictionary (if JSON parsing succeeds) or
        string (if JSON parsing fails or is disabled)

    Raises
    ------
    urllib.error.HTTPError
        If request fails (non-200 status code)
    urllib.error.URLError
        If connection fails

    Examples
    --------
    >>> data = fetch_url("https://api.example.com/data")
    >>> text = fetch_url("https://example.com/page", parse_json=False)
    >>> filtered = fetch_url("https://api.example.com/search",
    ...                      params={"q": "marine species"})

    Notes
    -----
    - Prefers requests library for better error handling and features
    - Automatically falls back to urllib if requests is not installed
    - JSON parsing is attempted but never raises an error if it fails
    """
    if HAS_REQUESTS:
        # Use requests library (preferred)
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()

        if parse_json:
            try:
                return response.json()
            except ValueError:
                return response.text
        else:
            return response.text

    else:
        # Fallback to urllib
        if params:
            from urllib.parse import urlencode

            url = f"{url}?{urlencode(params)}"

        with urllib.request.urlopen(url, timeout=timeout) as response:
            content = response.read().decode("utf-8")

            if parse_json:
                try:
                    import json

                    return json.loads(content)
                except ValueError:
                    return content
            else:
                return content

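The optional-dependency pattern above (prefer requests, fall back to the standard library) can be isolated; the manual query-string step is only needed on the urllib path, since requests encodes params itself. A sketch of the URL-building half, with no network access:

```python
from typing import Optional
from urllib.parse import urlencode

try:
    import requests  # optional dependency
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False

def build_url(url: str, params: Optional[dict] = None) -> str:
    """Append query parameters the way the urllib fallback does."""
    if params:
        return f"{url}?{urlencode(params)}"
    return url

print(build_url("https://api.example.com/search", {"q": "marine species"}))
# https://api.example.com/search?q=marine+species
```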
safe_float

safe_float(value: Any, default: Optional[float] = None) -> Optional[float]

Safely convert a value to float, handling booleans and strings.

This function handles various input types and edge cases when converting to float, including boolean values, empty strings, and common text representations of missing data.

Parameters:

value : Any, required
    Value to convert to float
default : float or None, optional (default: None)
    Default value to return if conversion fails. If None (default),
    returns None on conversion failure.

Returns:

float or None
    Converted float value, or default/None if conversion fails

Examples:

>>> safe_float(42)
42.0
>>> safe_float("3.14")
3.14
>>> safe_float("NA") is None
True
>>> safe_float("invalid", default=0.0)
0.0
>>> safe_float(True)  # Booleans converted to numeric
1.0
>>> safe_float(False)
0.0
Notes
  • Boolean values (True/False) are converted to 1.0/0.0
  • Empty strings and common missing data indicators ('NA', 'nan', 'none', etc.) return None
  • Case-insensitive string matching for missing data indicators
Source code in pypath/io/utils.py
def safe_float(value: Any, default: Optional[float] = None) -> Optional[float]:
    """Safely convert a value to float, handling booleans and strings.

    This function handles various input types and edge cases when converting
    to float, including boolean values, empty strings, and common text
    representations of missing data.

    Parameters
    ----------
    value : Any
        Value to convert to float
    default : float or None, optional
        Default value to return if conversion fails. If None (default),
        returns None on conversion failure.

    Returns
    -------
    float or None
        Converted float value, or default/None if conversion fails

    Examples
    --------
    >>> safe_float(42)
    42.0
    >>> safe_float("3.14")
    3.14
    >>> safe_float("NA") is None
    True
    >>> safe_float("invalid", default=0.0)
    0.0
    >>> safe_float(True)  # Booleans converted to numeric
    1.0
    >>> safe_float(False)
    0.0

    Notes
    -----
    - Boolean values (True/False) are converted to 1.0/0.0
    - Empty strings and common missing data indicators ('NA', 'nan', 'none', etc.)
      return None
    - Case-insensitive string matching for missing data indicators
    """
    if value is None:
        return None

    # Convert booleans to float (True → 1.0, False → 0.0)
    if isinstance(value, bool):
        return 1.0 if value else 0.0

    # Already numeric
    if isinstance(value, (int, float)):
        return float(value)

    # String conversion with special cases
    if isinstance(value, str):
        value_lower = value.lower().strip()

        # Boolean-like strings → numeric
        if value_lower in ("true", "yes"):
            return 1.0
        if value_lower in ("false", "no"):
            return 0.0

        # Common missing data indicators
        if value_lower in (
            "none",
            "",
            "na",
            "nan",
            "n/a",
        ):
            return None

        try:
            return float(value)
        except ValueError:
            return default

    # Fallback for other types
    return default
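safe_float is typically applied field-by-field when cleaning raw records. A minimal self-contained sketch of the same conversion rules (booleans to 1.0/0.0, missing-data strings to None); the record fields below are illustrative, not real EcoBase names:

```python
from typing import Any, Optional

_MISSING = {"", "none", "na", "nan", "n/a"}

def to_float(value: Any, default: Optional[float] = None) -> Optional[float]:
    if value is None:
        return None
    if isinstance(value, bool):          # must come before the int/float check,
        return 1.0 if value else 0.0     # since bool is a subclass of int
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        text = value.strip().lower()
        if text in ("true", "yes"):
            return 1.0
        if text in ("false", "no"):
            return 0.0
        if text in _MISSING:
            return None
        try:
            return float(value)
        except ValueError:
            return default
    return default

# Cleaning a raw record (field names are illustrative)
raw = {"biomass": "3.2", "pb": "NA", "include": True}
clean = {key: to_float(val) for key, val in raw.items()}
print(clean)  # {'biomass': 3.2, 'pb': None, 'include': 1.0}
```

The bool check must precede the numeric check because `isinstance(True, int)` is True in Python.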