import numpy as np
import pandas as pd
# The purpose of this import block is to mask the dependency on internal
# IHME data and allow CI and automated testing to work.
try:
from vivarium_gbd_access import gbd
except ModuleNotFoundError:
[docs]
class GbdDummy:
"""Mock class to wrap internal dependency."""
def __getattr__(self, item):
raise ModuleNotFoundError("Required package vivarium_gbd_access not found.")
gbd = GbdDummy()
from .globals import CovariateData, CovariateDataSeq
from .util import clean_entity_list
CAUSE_SET_ID = 3
COMPUTATION_CAUSE_SET_ID = 2
RISK_SET_ID = 2
ETIOLOGY_SET_ID = 3
SURVEY_LOCATION_ID = 180
###############################################
# Canonical mappings between entities and ids #
###############################################
[docs]
def get_sequelae():
sequelae = gbd.get_sequela_id_mapping()
sequelae = pd.DataFrame(
{
"sequela_name": clean_entity_list(sequelae.sequela_name),
"sequela_id": sequelae.sequela_id,
}
)
return sequelae.sort_values("sequela_id")
[docs]
def get_etiologies():
etiologies = gbd.get_rei_metadata(rei_set_id=ETIOLOGY_SET_ID)
etiologies = etiologies[etiologies["most_detailed"] == 1]
etiologies = pd.DataFrame(
{"rei_name": clean_entity_list(etiologies.rei_name), "rei_id": etiologies.rei_id}
)
return etiologies.sort_values("rei_id")
[docs]
def get_causes(level=None):
causes = gbd.get_cause_metadata(cause_set_id=CAUSE_SET_ID)
# add causes we use for computation but not reporting
computational_causes = gbd.get_cause_metadata(cause_set_id=COMPUTATION_CAUSE_SET_ID)
missing_cause_ids = [
cause_id
for cause_id in computational_causes["cause_id"].values
if cause_id not in causes["cause_id"].values
]
missing_causes = computational_causes[
computational_causes["cause_id"].isin(missing_cause_ids)
]
causes = pd.concat([causes, missing_causes])
if level is not None:
causes = causes[causes.level == level]
causes = pd.DataFrame(
{"cause_name": clean_entity_list(causes.cause_name), "cause_id": causes.cause_id}
)
return causes.sort_values("cause_id")
[docs]
def get_risks():
risks = gbd.get_rei_metadata(rei_set_id=RISK_SET_ID)
risks = pd.DataFrame(
{"rei_name": clean_entity_list(risks.rei_name), "rei_id": risks.rei_id}
)
return risks.sort_values("rei_id")
[docs]
def get_covariates():
covariates = get_covariate_data()
covariates = {c[0]: c[1] for c in covariates}
covariates = pd.DataFrame.from_dict(covariates, orient="index").reset_index()
return covariates.rename(
columns={"index": "covariate_name", 0: "covariate_id"}
).sort_values("covariate_id")
#####################################
# Lists of entity names in id order #
#####################################
[docs]
def get_sequela_list():
return get_sequelae().sequela_name.tolist()
[docs]
def get_etiology_list():
return get_etiologies().rei_name.tolist()
[docs]
def get_cause_list():
return get_causes().cause_name.tolist()
[docs]
def get_risk_list():
return get_risks().rei_name.tolist()
[docs]
def get_covariate_list(with_survey=False):
return get_covariates().covariate_name.tolist()
#####################################################
# Functions to organize data for mapping production #
#####################################################
[docs]
def get_sequela_data() -> list:
sequelae = gbd.get_sequela_id_mapping()
return list(
zip(
clean_entity_list(sequelae.sequela_name),
sequelae.sequela_id,
sequelae.modelable_entity_id,
clean_entity_list(sequelae.healthstate_name),
sequelae.healthstate_id,
)
)
[docs]
def get_etiology_data() -> list:
etiologies = gbd.get_rei_metadata(rei_set_id=ETIOLOGY_SET_ID)
etiologies = etiologies[etiologies["most_detailed"] == 1]
return list(zip(clean_entity_list(etiologies.rei_name), etiologies.rei_id))
[docs]
def get_cause_data():
sequelae = gbd.get_sequela_id_mapping().sort_values("sequela_id")
etiologies = gbd.get_rei_metadata(rei_set_id=ETIOLOGY_SET_ID)
etiologies = etiologies[etiologies["most_detailed"] == 1].sort_values("rei_id")
cause_etiology_map = gbd.get_cause_etiology_mapping()
cause_me_map = gbd.get_cause_me_id_mapping()
cause_me_map["cause_name"] = clean_entity_list(cause_me_map["modelable_entity_name"])
cause_me_map = cause_me_map[["modelable_entity_id", "cause_name"]].set_index("cause_name")
causes = gbd.get_cause_metadata(cause_set_id=CAUSE_SET_ID)
# add causes we use for computation but not reporting
computational_causes = gbd.get_cause_metadata(cause_set_id=COMPUTATION_CAUSE_SET_ID)
missing_cause_ids = [
cause_id
for cause_id in computational_causes["cause_id"].values
if cause_id not in causes["cause_id"].values
]
missing_causes = computational_causes[
computational_causes["cause_id"].isin(missing_cause_ids)
]
causes = pd.concat([causes, missing_causes])
causes = pd.DataFrame(
{
"cause_name": clean_entity_list(causes.cause_name),
"cause_id": causes.cause_id,
"parent_id": causes.parent_id,
"most_detailed": causes.most_detailed,
"level": causes.level,
"male": causes.male.replace({np.NaN: False, 1: True}),
"female": causes.female.replace({np.NaN: False, 1: True}),
"yll_only": causes.yll_only.replace({np.NaN: False, 1: True}),
"yld_only": causes.yld_only.replace({np.NaN: False, 1: True}),
"yll_age_start": causes.yll_age_start.replace({np.NaN: 0}),
"yll_age_end": causes.yll_age_end.replace({np.NaN: 95}),
"yld_age_start": causes.yld_age_start.replace({np.NaN: 0}),
"yld_age_end": causes.yld_age_end.replace({np.NaN: 95}),
}
)
causes = (
causes.set_index("cause_name")
.join(cause_me_map)
.sort_values("cause_id")
.reset_index()
)
cause_data = []
for _, cause in causes.iterrows():
name = cause["cause_name"]
cid = cause["cause_id"]
parent = causes.set_index("cause_id").at[cause["parent_id"], "cause_name"]
dismod_id = cause["modelable_entity_id"]
most_detailed = cause["most_detailed"]
level = cause["level"]
restrictions = make_cause_restrictions(cause)
eti_ids = cause_etiology_map[cause_etiology_map.cause_id == cid].rei_id.tolist()
associated_etiologies = clean_entity_list(
etiologies[etiologies.rei_id.isin(eti_ids)].rei_name
)
associated_sequelae = clean_entity_list(
sequelae[sequelae.cause_id == cid].sequela_name
)
sub_causes = causes[causes.parent_id == cid].cause_name.tolist()
cause_data.append(
(
name,
cid,
dismod_id,
most_detailed,
level,
parent,
restrictions,
associated_sequelae,
associated_etiologies,
sub_causes,
)
)
return cause_data
[docs]
def get_age_restriction_edge(age_restriction, end=False):
id_map = {
0.0: [2, None],
0.01917808: [3, 2],
0.07671233: [388, 3],
0.10: [388, 3],
0.5: [389, 388],
1.0: [238, 389],
2.0: [34, 238],
5.0: [6, 34],
10.0: [7, 6],
15.0: [8, 7],
20.0: [9, 8],
30.0: [11, 10],
40.0: [13, 12],
45.0: [14, 13],
50.0: [15, 14],
55.0: [16, 15],
60.0: [17, 16],
65.0: [18, 17],
90.0: [32, 31],
95.0: [235, 32],
}
if not end:
edge = id_map[age_restriction][0]
else: # end
_edge = id_map[age_restriction][1]
edge = 235 if _edge == 32 else _edge
return edge
[docs]
def make_cause_restrictions(cause):
restrictions = (
("male_only", not cause["female"]),
("female_only", not cause["male"]),
("yll_only", cause["yll_only"]),
("yld_only", cause["yld_only"]),
(
"yll_age_group_id_start",
(
get_age_restriction_edge(cause["yll_age_start"])
if not cause["yld_only"]
else None
),
),
(
"yll_age_group_id_end",
(
get_age_restriction_edge(cause["yll_age_end"], end=True)
if not cause["yld_only"]
else None
),
),
(
"yld_age_group_id_start",
(
get_age_restriction_edge(cause["yld_age_start"])
if not cause["yll_only"]
else None
),
),
(
"yld_age_group_id_end",
(
get_age_restriction_edge(cause["yld_age_end"], end=True)
if not cause["yll_only"]
else None
),
),
)
if cause.cause_id in [367, 368, 369, 370, 374, 375, 376, 379, 741, 995, 1160]:
# These causes have some estimates for age group 15, but have cause restrictions
# listed as age group 14, since GBD reports based on a "standard" set of age groups
# for certain maternal causes, but does the computation for a somewhat wider interval
# for consistency.
restrictions = list(restrictions)
restrictions[5] = ("yll_age_group_id_end", 15)
restrictions[7] = ("yld_age_group_id_end", 15)
return tuple(restrictions)
[docs]
def get_risk_data() -> list:
risks = get_all_risk_metadata()
causes = get_causes().set_index("cause_id")
out = []
# Some polytomous risks have an explicit tmrel category, some do not.
contain_tmrel = [128, 339]
for rei_id, risk in risks.iterrows():
name = risk["rei_name"]
most_detailed = risk["most_detailed"]
level = risk["level"]
parent = risks.at[risk["parent_id"], "rei_name"]
paf_calculation_type = risk["rei_calculation_type"]
distribution = (
risk["exposure_type"].replace(" ", "_")
if not pd.isnull(risk["exposure_type"])
else None
)
if distribution in ["normal", "lognormal", "ensemble"]:
levels = None
scalar = risk["rr_scalar"] if not pd.isnull(risk["rr_scalar"]) else None
if pd.isnull(risk["tmred_dist"]):
tmred = (
("distribution", "draws"),
("min", None),
("max", None),
(
"inverted",
bool(int(risk["inv_exp"]))
if not pd.isnull(risk["inv_exp"])
else False,
),
)
else:
tmred = (
("distribution", risk["tmred_dist"]),
("min", risk["tmrel_lower"]),
("max", risk["tmrel_upper"]),
("inverted", bool(int(risk["inv_exp"]))),
)
elif distribution == "dichotomous":
levels = (("cat1", "Exposed"), ("cat2", "Unexposed"))
scalar = None
tmred = None
elif distribution in ["ordered_polytomous", "unordered_polytomous"]:
try:
levels = sorted(
[(cat, name) for cat, name in risk["category_map"].items()],
key=lambda x: int(x[0][3:]),
)
max_cat = int(levels[-1][0][3:]) + 1
if rei_id not in contain_tmrel:
levels.append((f"cat{max_cat}", "Unexposed"))
levels = tuple(levels)
except AttributeError: # sometimes the category map is nan
print(f'No levels found for {risk["rei_name"]}.')
levels = tuple()
scalar = None
tmred = None
else: # It's either a custom risk or an aggregate, so we have to do a bunch of checking.
if not pd.isnull(risk["category_map"]): # It's some strange categorical risk.
levels = sorted(
[(cat, name) for cat, name in risk["category_map"].items()],
key=lambda x: int(x[0][3:]),
)
levels = tuple(levels)
else:
levels = None
scalar = risk["rr_scalar"] if not pd.isnull(risk["rr_scalar"]) else None
if pd.isnull(risk["tmred_dist"]):
tmred = None
else:
tmred = (
("distribution", risk["tmred_dist"]),
("min", risk["tmrel_lower"]),
("max", risk["tmrel_upper"]),
("inverted", bool(risk["inv_exp"])),
)
if risk["affected_cause_ids"] is not np.nan:
affected_causes = tuple(
causes.at[cid, "cause_name"]
for cid in risk["affected_cause_ids"]
if cid in causes.index
)
else:
affected_causes = []
if risk["affected_rei_ids"] is not np.nan:
affected_risks = tuple(
risks.at[rei_id, "rei_name"] for rei_id in risk["affected_rei_ids"]
)
else:
affected_risks = []
if risk["paf_of_one_cause_ids"] is not np.nan:
paf_of_one_causes = tuple(
causes.at[cid, "cause_name"] for cid in risk["paf_of_one_cause_ids"]
)
else:
paf_of_one_causes = []
sub_risks = risks[risks.parent_id == rei_id].rei_name.tolist()
restrictions = (
("male_only", pd.isnull(risk["female"])),
("female_only", pd.isnull(risk["male"])),
("yll_only", pd.isnull(risk["yld"])),
("yld_only", pd.isnull(risk["yll"])),
(
"yll_age_group_id_start",
risk["yll_age_group_id_start"] if not pd.isnull(risk["yll"]) else None,
),
(
"yll_age_group_id_end",
risk["yll_age_group_id_end"] if not pd.isnull(risk["yll"]) else None,
),
(
"yld_age_group_id_start",
risk["yld_age_group_id_start"] if not pd.isnull(risk["yld"]) else None,
),
(
"yld_age_group_id_end",
risk["yld_age_group_id_end"] if not pd.isnull(risk["yld"]) else None,
),
)
if rei_id == 95: # iron deficiency fix
restrictions = list(restrictions)
restrictions[1] = ("female_only", False)
restrictions = tuple(restrictions)
if rei_id == 136: # non-exclusive breastfeeding fix
restrictions = list(restrictions)
restrictions[5] = ("yll_age_group_id_end", 388.0)
restrictions[7] = ("yld_age_group_id_end", 388.0)
restrictions = tuple(restrictions)
if rei_id == 137: # discounted breastfeeding fix
restrictions = list(restrictions)
restrictions[4] = ("yll_age_group_id_start", 238.0)
restrictions[5] = ("yll_age_group_id_end", 389.0)
restrictions[6] = ("yld_age_group_id_start", 238.0)
restrictions[7] = ("yld_age_group_id_end", 389.0)
restrictions = tuple(restrictions)
out.append(
(
name,
rei_id,
most_detailed,
level,
paf_calculation_type,
affected_causes,
paf_of_one_causes,
distribution,
levels,
tmred,
scalar,
restrictions,
parent,
sub_risks,
affected_risks,
)
)
return out
[docs]
def get_duplicate_indices(names: list[str]) -> list[int]:
dup_indices = []
check = set()
for i, v in enumerate(names):
if v not in check:
check.add(v)
else:
dup_indices.append(i)
return dup_indices
[docs]
def get_covariate_data() -> CovariateDataSeq:
covariates = gbd.get_covariate_metadata()
clean_names = clean_entity_list(covariates.covariate_name)
covariates.covariate_name = clean_names
dup_indices = get_duplicate_indices(clean_names)
covariates = covariates.drop(dup_indices).reset_index(drop=True)
vals: CovariateData = list(
zip(
covariates.covariate_name,
covariates.covariate_id,
covariates.by_age,
covariates.by_sex,
covariates.dichotomous,
)
)
return vals