diff --git a/gnomad_qc/v5/create_release/create_release_sites_ht.py b/gnomad_qc/v5/create_release/create_release_sites_ht.py new file mode 100644 index 000000000..f901324c3 --- /dev/null +++ b/gnomad_qc/v5/create_release/create_release_sites_ht.py @@ -0,0 +1,1197 @@ +"""Script to create release sites HT for v5 genomes.""" + +import argparse +import json +import logging +from datetime import datetime +from functools import reduce +from typing import Any, Dict, List, Optional, Union + +import hail as hl +from gnomad.resources.grch38.reference_data import ( + dbsnp, + lcr_intervals, + seg_dup_intervals, +) +from gnomad.utils.annotations import region_flag_expr +from gnomad.utils.vcf import ALLELE_TYPE_FIELDS, AS_FIELDS, SITE_FIELDS +from gnomad.utils.vep import update_loftee_end_trunc_filter +from hail.utils import new_temp_file + +from gnomad_qc.v5.annotations.insilico_predictors import get_sift_polyphen_from_vep + +# TODO: Add all these utils. +from gnomad_qc.v5.create_release.create_release_utils import ( + DBSNP_VERSION, + GENCODE_VERSION, + GENCODE_VERSIONS, + MANE_SELECT_VERSION, + MANE_SELECT_VERSIONS, + POLYPHEN_VERSION, + SEQREPO_VERSION, + SIFT_VERSION, + VEP_VERSIONS_TO_ADD, + VRS_PYTHON_VERSION, + VRS_SCHEMA_VERSION, + remove_missing_vep_fields, +) +from gnomad_qc.v5.resources.annotations import ( + get_freq, + get_info_ht, + get_insilico_predictors, + get_vep, + get_vrs, +) +from gnomad_qc.v5.resources.basics import qc_temp_prefix +from gnomad_qc.v5.resources.constants import ( + CURRENT_RELEASE, + DEFAULT_VEP_VERSION, + GNOMAD_TMP_BUCKET, + WORKSPACE_BUCKET, +) +from gnomad_qc.v5.resources.release import ( + FREQUENCY_README, + included_datasets_json_path, + release_sites, +) +from gnomad_qc.v5.resources.variant_qc import final_filter + +logging.basicConfig( + format="%(asctime)s (%(name)s %(lineno)s): %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", +) +logger = logging.getLogger("create_release_ht") +logger.setLevel(logging.INFO) +# TODO: check AS 
# FIELDS AND SITE FIELDS

INSILICO_PREDICTORS = ["cadd", "revel", "spliceai", "pangolin", "phylop"]


# check release config
def get_finalized_schema(version: str) -> Dict[str, List[str]]:
    """
    Get finalized schema for release HT based on version.

    Releases listed in ``VEP_VERSIONS_TO_ADD`` also get one extra
    ``vep<version>`` row field and a matching ``vep<version>_globals`` global
    per additional VEP version.

    :param version: Release version (e.g., "5.0").
    :return: Dictionary with "globals" and "rows" lists of field names.
    """
    # Extra VEP versions shipped alongside the default one for this release.
    extra_vep_versions = VEP_VERSIONS_TO_ADD.get(version, [])

    globals_list = [
        "freq_meta",
        "freq_index_dict",
        "freq_meta_sample_count",
        "faf_meta",
        "faf_index_dict",
        "age_distribution",
        "aou_downsamplings",
        "filtering_model",
        "inbreeding_coeff_cutoff",
        "tool_versions",
        "vrs_versions",
        "vep_globals",
        # Additional VEP-version globals sit between the base globals and the
        # trailing release-metadata fields, matching the released field order.
        *[f"vep{v}_globals" for v in extra_vep_versions],
        "frequency_README",
        "date",
        "version",
    ]
    rows_list = [
        "freq",
        "grpmax",
        "faf",
        "fafmax",
        "a_index",
        "was_split",
        "rsid",
        "filters",
        "info",
        "vep",
        "vqsr_results",
        "region_flags",
        "allele_info",
        "histograms",
        "in_silico_predictors",
        *[f"vep{v}" for v in extra_vep_versions],
    ]

    return {"globals": globals_list, "rows": rows_list}


# Default schema for backward compatibility.
FINALIZED_SCHEMA = get_finalized_schema(CURRENT_RELEASE)


def get_tables_for_release(version: str) -> List[str]:
    """
    Get list of tables to include in release based on version.

    :param version: Release version (e.g., "5.0").
    :return: List of table names to include in release.
    """
    base_tables = [
        "dbsnp",
        "filters",
        "freq",
        "info",
        "region_flags",
        "in_silico",
        "vep",
    ]
    # Append one table name per additional VEP version for this release.
    return base_tables + [f"vep{v}" for v in VEP_VERSIONS_TO_ADD.get(version, [])]


# Default tables for backward compatibility.
+TABLES_FOR_RELEASE = get_tables_for_release(CURRENT_RELEASE) + + +# Config is added as a function, so it is not evaluated until the function is called. +def get_config( + tables_for_join: List[str], + release_exists: bool = False, + version: str = CURRENT_RELEASE, + base_release_version: Optional[str] = None, + test: bool = False, + environment: str = "rwb", +) -> Dict[str, Dict[str, hl.expr.Expression]]: + """ + Get configuration dictionary for specified data type. + + Format: + + .. code-block:: + + '': { + 'ht': '', + 'path': 'gs://path/to/hail_table.ht', + 'select': '', + 'field_name': '', + 'custom_select': '', + 'select_globals': '' + 'custom_globals_select': '' + + }, + + .. warning:: + + The 'in_silico' key's 'ht' logic is handled separately because it is a list of + HTs. In this list, the phyloP HT is keyed by locus only and thus the 'ht' code + below sets the join key to 1, which will grab the first key of + ht.key.dtype.values() e.g. 'locus', when an HT's keys are not {'locus', + 'alleles'}. + All future in_silico predictors should have the keys confirmed to be 'locus' + with or without 'alleles' before using this logic. + + :param tables_for_join: List of tables to join. + :param release_exists: Whether the release HT already exists. If True, uses the + specified `version` as the base release HT. Mutually exclusive with + `base_release_version`. + :param version: Release version for output (e.g., "5.0"). + :param base_release_version: Specific release version to use as the base HT + (e.g., "5.0"). When provided, loads that version's release HT as the base. + Mutually exclusive with `release_exists`. + :param test: Whether or not to use the test versions of the datasets when available. + :param environment: Environment to use. Default is "rwb". + :return: Dict of dataset's configs. + """ + if base_release_version is not None and release_exists: + raise ValueError( + "Cannot specify both --release-exists and --base-release-version. 
" + "Use --base-release-version to specify a specific version to use as the " + "base, or use --release-exists to use the output version as the base." + ) + + config = {} + + if "dbsnp" in tables_for_join: + config["dbsnp"] = { + "ht": dbsnp.ht(), + "path": dbsnp.path, + "select": ["rsid"], + } + if "filters" in tables_for_join or "info" in tables_for_join: + config["filters"] = { + "ht": final_filter().ht(), + "path": final_filter().path, + "select": ["filters"], + "custom_select": custom_filters_select, + "custom_globals_select": custom_filters_select_globals, + } + if "in_silico" in tables_for_join: + config["in_silico"] = { + "ht": reduce( + ( + lambda joined_ht, ht: ( + joined_ht.join(ht, "outer") + if set(ht.key) == {"locus", "alleles"} + else joined_ht.join(ht, "outer", _join_key=1) + ) + ), + [ + get_insilico_predictors(predictor=predictor).ht() + for predictor in INSILICO_PREDICTORS + ], + ), + "path": [ + get_insilico_predictors(predictor=predictor).path + for predictor in INSILICO_PREDICTORS + ], + "field_name": "in_silico_predictors", + "select": [ + "cadd", + "revel_max", + "spliceai_ds_max", + "pangolin_largest_ds", + "phylop", + ], + "custom_select": custom_in_silico_select, + "select_globals": [ + "cadd_version", + "revel_version", + "spliceai_version", + "pangolin_version", + "phylop_version", + ], + "global_name": "tool_versions", + } + if "info" in tables_for_join: + config["info"] = { + "ht": get_info_ht(test=test, environment=environment).ht(), + "path": get_info_ht(test=test, environment=environment).path, + "select": ["was_split", "a_index"], + "custom_select": custom_info_select, + } + if "freq" in tables_for_join or "info" in tables_for_join: + config["freq"] = { + "ht": get_freq( + test=test, + data_type="genomes", + data_set="merged", + environment=environment, + ).ht(), + "path": get_freq( + test=test, + data_type="genomes", + data_set="merged", + environment=environment, + ).path, + "select": [ + "freq", + "faf", + "histograms", + 
"grpmax", + "fafmax", + ], + "select_globals": [ + "freq_meta", + "freq_index_dict", + "freq_meta_sample_count", + "faf_meta", + "faf_index_dict", + "age_distribution", + "aou_downsamplings", + ], + } + if "vep" in tables_for_join: + config["vep"] = { + # TODO: add vep version param. + "ht": get_vep( + vep_version=DEFAULT_VEP_VERSION, environment=environment + ).ht(), + "path": get_vep( + vep_version=DEFAULT_VEP_VERSION, environment=environment + ).path, + "select": ["vep"], + "custom_select": get_custom_vep_select(DEFAULT_VEP_VERSION), + "custom_globals_select": get_custom_vep_globals_select(), + "global_name": "vep_globals", + } + + # Dynamically add additional VEP versions based on release version. + if version in VEP_VERSIONS_TO_ADD: + for vep_version in VEP_VERSIONS_TO_ADD[version]: + vep_table_name = f"vep{vep_version}" + if vep_table_name in tables_for_join: + config[vep_table_name] = { + "ht": ( + get_vep( + vep_version=vep_version, test=test, environment=environment + ).ht() + ), + "path": ( + get_vep( + vep_version=vep_version, test=test, environment=environment + ).path + ), + "select": [], + "custom_select": get_custom_vep_select(vep_version), + "custom_globals_select": get_custom_vep_globals_select(), + "global_name": f"{vep_table_name}_globals", + } + if "region_flags" in tables_for_join: + config["region_flags"] = { + "ht": get_freq( + test=test, + data_type="genomes", + data_set="merged", + environment=environment, + ).ht(), + "path": get_freq( + test=test, + data_type="genomes", + data_set="merged", + environment=environment, + ).path, + "custom_select": custom_region_flags_select, + } + + if release_exists or base_release_version is not None: + # Determine which release version to use as the base HT. 
+ base_version = base_release_version or version + release_res = release_sites(environment=environment).versions[base_version] + release_ht = release_res.ht() + config["release"] = { + "ht": release_ht, + "path": release_res.path, + "select": [r for r in release_ht.row_value], + "select_globals": [g for g in release_ht.globals], + } + return config + + +def custom_in_silico_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]: + """ + Get in silico predictors from the VEP resource for the given table. + + This function currently selects only SIFT and Polyphen from VEP. + + :param ht: VEP Hail Table. + :return: Select expression dict. + """ + # TODO: edit get_vep with params + vep_in_silico = get_sift_polyphen_from_vep(get_vep(environment=environment).ht()) + selects = { + "sift_max": vep_in_silico[ht.key].sift_max, + "polyphen_max": vep_in_silico[ht.key].polyphen_max, + } + return selects + + +def custom_region_flags_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]: + """ + Select region flags for release. + + :param ht: Hail Table. + :return: Select expression dict. + """ + selects = { + "region_flags": region_flag_expr( + ht, + non_par=True, + prob_regions={"lcr": lcr_intervals.ht(), "segdup": seg_dup_intervals.ht()}, + ) + } + + return selects + + +def custom_filters_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]: + """ + Select gnomAD filter HT fields for release dataset. + + Extract "results" field and rename based on filtering method. + + :param ht: Filters Hail Table. + :return: Select expression dict. 
+ """ + filter_to_name = { + "RF": "random_forest_results", + "AS_VQSR": "vqsr_results", + "IF": "isolation_forest_results", + } + + filter_name = hl.eval(ht.filtering_model.filter_name) + + if filter_name not in filter_to_name: + raise ValueError(f"Filtering method '{filter_name}' not recognized.") + + name = filter_to_name[filter_name] + selects = {name: ht.results.annotate(**ht.training_info)} + + return selects + + +def custom_filters_select_globals( + globals_expr: hl.expr.StructExpression, +) -> Dict[str, hl.expr.Expression]: + """ + Select filter HT globals for release dataset. + + :param globals_expr: Globals struct from a Filters Hail Table. + :return: Select expression dict. + """ + selects = { + "filtering_model": hl.struct( + **{ + "filter_name": globals_expr.filtering_model.filter_name, + "score_name": globals_expr.filtering_model.score_name, + "snv_cutoff": globals_expr.filtering_model.snv_cutoff.drop("bin_id"), + "indel_cutoff": globals_expr.filtering_model.indel_cutoff.drop( + "bin_id" + ), + "snv_training_variables": ( + globals_expr.filtering_model.snv_training_variables + ), + "indel_training_variables": ( + globals_expr.filtering_model.indel_training_variables + ), + } + ), + "inbreeding_coeff_cutoff": globals_expr.inbreeding_coeff_cutoff, + } + + return selects + + +def custom_info_select( + ht: hl.Table, config: Dict[str, Dict[str, Any]] +) -> Dict[str, hl.expr.Expression]: + """ + Select fields for info Hail Table annotation in release. + + The info field requires fields from the freq HT and the filters HT so those are + pulled in here along with all info HT fields. It also adds the `allele_info` struct + to release HT. + + :param ht: Info Hail Table. + :return: Select expression dict. + """ + # Create a dict of the fields from the filters HT that we want to add to the info. 
+ filters_ht = config.get("filters")["ht"] + + score_name = hl.eval(filters_ht.filtering_model.score_name) + filters_ht = filters_ht.transmute(**filters_ht.truth_sets) + filters = filters_ht[ht.key] + filters_info_fields = [ + "singleton", + "transmitted_singleton", + # "sibling_singleton", + "omni", + "mills", + "monoallelic", + "only_het", + ] + # if data_type == "genomes": + # filters_info_fields.remove("sibling_singleton") + filters_info_dict = {field: filters[field] for field in filters_info_fields} + filters_info_dict[score_name] = filters[score_name] + + # Create a dict of the fields from the freq HT that we want to add to the info. + freq_ht = config.get("freq")["ht"] + # TODO: will change back to 'inbreeding_coeff' once we have the new freq_ht? + freq_info_dict = {"InbreedingCoeff": freq_ht[ht.key]["InbreedingCoeff"]} + + # Create a dict of the fields from the VRS HT that we want to add to the info. + vrs_ht = get_vrs(data_type="genomes").ht() + + # This is the schema of the updated VRS HT generated by Kyle for v4.1.1 and beyond versions: + # 'info': struct { + # VRS_Allele_IDs: array, + # VRS_Starts: array, + # VRS_Ends: array, + # VRS_States: array, + # VRS_Lengths: array, + # VRS_RepeatSubunitLengths: array + # } + # Rename info to `vrs`. + vrs_ht = vrs_ht.transmute(vrs=vrs_ht.info) + + vrs_info_fields = {"vrs": vrs_ht[ht.key].vrs} + + # Create a dict of the fields from the info HT that we want keep in the info. + # v3 info HT has no SOR or AS_SOR fields. They are computed by VQSR, so we can + # grab them from the filters HT. + # TODO: Check back on v4 logic, AS_SOR is already on info ht, is SOR needed? + info_struct = hl.struct(**ht.info, SOR=filters.SOR) + + info_dict = {field: info_struct[field] for field in SITE_FIELDS + AS_FIELDS} + info_dict.update(filters_info_dict) + info_dict.update(freq_info_dict) + info_dict.update(vrs_info_fields) + + # Select the info and allele info annotations. 
We drop nonsplit_alleles from + # allele_info so that we don't release alleles that are found in non-releasable + # samples. + selects = {"info": hl.struct(**info_dict)} + # TODO: Check back on v4 logic. + selects["allele_info"] = ht.allele_info.drop("nonsplit_alleles") + + return selects + + +def get_custom_vep_globals_select(): + """ + Get custom globals select function for VEP globals. + + :return: Custom globals select function. + """ + + def custom_vep_globals_select( + globals_expr: hl.expr.StructExpression, + ) -> Dict[str, hl.expr.Expression]: + """ + Select globals for VEP Hail Table annotation in release. + + :param globals_expr: Globals struct from a VEP Hail Table. + :return: Select expression dict. + """ + return { + "vep_version": globals_expr.vep_version, + "vep_help": globals_expr.vep_help, + "vep_config": globals_expr.vep_config, + } + + return custom_vep_globals_select + + +def get_custom_vep_select(vep_version: str): + """ + Get custom VEP select function for a given VEP version. + + :param vep_version: VEP version (e.g., "105", "115"). + :return: Custom select function for the VEP version. + """ + + def custom_vep_version_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]: + """ + Select fields for VEP Hail Table annotation in release. + + This custom select function does the following: + + - For VEP 105: Removes fields from VEP annotations that have been excluded + in past releases or are missing in all rows. + - Drops sift_prediction, sift_score, polyphen_prediction, and polyphen_score + fields from the transcript_consequences array. + - Updates the loftee annotations to use a GERP score threshold of 0 instead + of the default applied by the LOFTEE plugin. + + :param ht: VEP Hail table. + :return: Select expression dict. + """ + # Only apply remove_missing_vep_fields for VEP 105. 
+ if vep_version == "105": + vep_expr = remove_missing_vep_fields(ht.vep) + else: + vep_expr = ht.vep + + field_name = ( + f"vep{vep_version}" if vep_version != DEFAULT_VEP_VERSION else "vep" + ) + csqs_expr = vep_expr.transcript_consequences.map( + lambda x: update_loftee_end_trunc_filter(x) + ) + + # Only drop sift/polyphen fields for default VEP version. + if vep_version == DEFAULT_VEP_VERSION: + csqs_expr = csqs_expr.map( + lambda x: x.drop( + "sift_prediction", + "sift_score", + "polyphen_prediction", + "polyphen_score", + ) + ) + + return {field_name: vep_expr.annotate(transcript_consequences=csqs_expr)} + + return custom_vep_version_select + + +def get_select_global_fields( + ht: hl.Table, + config: Dict[str, Dict[str, Any]], + tables_for_join: List[str] = TABLES_FOR_RELEASE, +) -> Dict[str, hl.expr.Expression]: + """ + Generate a dictionary of globals to select by checking the config of all tables joined. + + .. note:: + + This function will place the globals within the select_globals value above + any globals returned from custom_select_globals. If ordering is important, use + only custom_select_globals. + + :param ht: Final joined HT with globals. + :param config: Dictionary with configuration options for each dataset. Expects the + dataset name (matching values in `tables_for_join`) as the key and a dictionary + of options as the value, where the options include any of the following keys: + 'select_globals', 'custom_globals_select', 'global_name'. + :param tables_for_join: List of tables to join into final release HT. + :return: Select mapping from global annotation name to `ht` annotation. + """ + t_globals = [] + for t in tables_for_join: + select_globals = {} + t_config = config.get(t) + if t_config is None: + available_tables = ", ".join(sorted(config.keys())) + raise KeyError( + f"Table '{t}' not found in configuration. 
Available tables: {available_tables}" + ) + if "select_globals" in t_config: + select_globals = get_select_fields(t_config["select_globals"], ht) + if t_config.get("custom_globals_select"): + custom_globals_select_fn = t_config["custom_globals_select"] + # Use source HT's globals via index_globals() so + # custom_globals_select reads from the original table, not + # the joined HT where globals with the same name (e.g., + # vep_version) may have been overwritten. + source_globals = t_config["ht"].index_globals() if "ht" in t_config else ht + select_globals = { + **select_globals, + **custom_globals_select_fn(source_globals), + } + if "global_name" in t_config: + global_name = t_config.get("global_name") + select_globals = {global_name: hl.struct(**select_globals)} + t_globals.append(select_globals) + + t_globals = reduce(lambda a, b: dict(a, **b), t_globals) + + return t_globals + + +def get_select_fields( + selects: Union[List, Dict], base_ht: hl.Table +) -> Dict[str, hl.expr.Expression]: + """ + Generate a select dict from traversing the `base_ht` and extracting annotations. + + :param selects: Mapping or list of selections. + :param base_ht: Base Hail Table to traverse. + :return: select Mapping from annotation name to `base_ht` annotation. + """ + select_fields = {} + if selects is not None: + if isinstance(selects, list): + select_fields = {selection: base_ht[selection] for selection in selects} + else: + raise ValueError(f"Invalid selects type: {type(selects)}") + return select_fields + + +def get_final_ht_fields( + ht: hl.Table, + schema: Dict[str, List[str]] = FINALIZED_SCHEMA, +) -> Dict[str, List[str]]: + """ + Get the final fields for the release HT. + + Create a dictionary of lists of fields that are in the `schema` and are + present in the HT. If a field is not present in the HT, log a warning. + + :param ht: Hail Table. + :param schema: Schema for the release HT. + :return: Dict of final fields for the release HT. 
+ """ + final_fields = {"rows": [], "globals": []} + for field in schema["rows"]: + if field in ht.row: + final_fields["rows"].append(field) + else: + logger.warning(f"Field {field} from schema not found in HT.") + for field in schema["globals"]: + if field in ht.globals: + final_fields["globals"].append(field) + else: + logger.warning(f"Global {field} from schema not found in HT.") + + return final_fields + + +def get_ht( + dataset: str, + config: Dict[str, Dict[str, Any]], + use_config_ht: bool = False, + checkpoint: bool = False, + test: bool = False, + _intervals: Optional[Any] = None, + base_ht_filter: Optional[hl.Table] = None, +) -> hl.Table: + """ + Return the appropriate Hail table with selects applied. + + :param dataset: Hail Table to join. + :param config: Dictionary with configuration options for each dataset. Expects the + dataset name as the key and a dictionary of options as the value, where the + options include some of the following: 'ht', 'path', 'select', 'field_name', + 'custom_select'. + :param use_config_ht: Whether to use the 'ht' in the dataset config instead of + reading in `path`. Default is False. + :param checkpoint: Whether to checkpoint the Hail Table. Default is False. + :param test: Whether call is for a test run. If True, the dataset will be filtered + to PCSK9. Default is False. + :param _intervals: Intervals for reading in hail Table. Used to optimize join. + :param base_ht_filter: Optional Hail Table to filter on before joining. Default is + None. + :return: Hail Table with fields to select. + """ + logger.info("Getting the %s dataset and its selected annotations...", dataset) + dataset_config = config[dataset] + + if use_config_ht: + ht = dataset_config["ht"] + else: + logger.info("Reading in %s", dataset) + ht = hl.read_table(dataset_config["path"], _intervals=_intervals) + + if test: + # Keep only PCSK9. 
+ ht = hl.filter_intervals( + ht, + [ + hl.parse_locus_interval( + "chr1:55039447-55064852", reference_genome="GRCh38" + ) + ], + ) + + select_fields = get_select_fields(dataset_config.get("select"), ht) + + if "custom_select" in dataset_config: + custom_select_fn = dataset_config["custom_select"] + select_fields = { + **select_fields, + **custom_select_fn(ht, config=config), + } + + if "field_name" in dataset_config: + field_name = dataset_config.get("field_name") + select_query = {field_name: hl.struct(**select_fields)} + else: + select_query = select_fields + + ht = ht.select(**select_query) + + if base_ht_filter: + ht_key = list(ht.key) + if list(base_ht_filter.key) != ht_key: + base_ht_filter = base_ht_filter.key_by(*ht_key) + ht = ht.semi_join(base_ht_filter) + + if checkpoint: + ht = ht.checkpoint(new_temp_file(f"{dataset}.for_release", "ht")) + + return ht + + +def join_hts( + base_table: str, + tables: List[str], + config: Dict[str, Dict[str, Any]], + version: str, + new_partition_percent: Optional[float] = None, + new_n_partitions: Optional[float] = None, + checkpoint_tables: bool = False, + track_included_datasets: bool = False, + use_annotate: bool = False, + test: bool = False, + environment: str = "rwb", +) -> hl.Table: + """ + Outer join a list of Hail Tables. + + :param base_table: Dataset to use for interval partitioning. + :param tables: List of tables to join. + :param config: Dictionary with configuration options for each dataset. Expects the + dataset name (matching values in `tables`) as the key and a dictionary of + options as the value, where the options include some of the following: 'ht', + 'path', 'select', 'field_name', 'custom_select', 'select_globals', + 'custom_globals_select', 'global_name'. + :param version: Release version. + :param new_partition_percent: Percent of base_table partitions used for final + release Hail Table. + :param new_n_partitions: Number of partitions for final release Hail Table. 
+ :param checkpoint_tables: Whether to checkpoint the tables before join. Default is + False. + :param track_included_datasets: Whether to track the datasets included in the + release. This is used to update the included_datasets json file. Default is + False. + :param use_annotate: Whether to use annotate instead of join. Default is False. + :param test: Whether this is for a test run. Default is False. + :param environment: Environment to use. Default is "rwb". + :return: Hail Table with datasets joined. + """ + logger.info( + "Reading in %s to determine partition intervals for efficient join", + base_table, + ) + + partition_intervals = None + if new_n_partitions or new_partition_percent: + base_ht = get_ht( + dataset=base_table, + config=config, + use_config_ht=True, + test=test, + ) + new_n_partitions = new_n_partitions or base_ht.n_partitions() + if new_partition_percent: + new_n_partitions = int(base_ht.n_partitions() * new_partition_percent) + + partition_intervals = base_ht._calculate_new_partitions(new_n_partitions) + + base_ht = get_ht( + dataset=base_table, + config=config, + checkpoint=checkpoint_tables, + _intervals=partition_intervals, + test=test, + ) + + # Remove base_table from tables if it is in the list and add it to the front. + if base_table in tables: + tables.remove(base_table) + + logger.info("Joining datasets: %s...", tables) + hts = [base_ht] + [ + get_ht( + dataset=table, + config=config, + # There is no single path for insilico predictors, so we need to handle + # this case separately. 
+ use_config_ht=True if table in ["in_silico", "exomes_an"] else False, + checkpoint=checkpoint_tables, + _intervals=partition_intervals, + test=test, + base_ht_filter=base_ht.select(), + ) + for table in tables + ] + + if use_annotate: + joined_ht = reduce( + lambda joined_ht, ht: joined_ht.annotate(**ht[joined_ht.key]), + hts, + ) + hts = [g_ht.index_globals() for g_ht in hts] + global_struct = reduce(lambda g, ann_g: g.annotate(**ann_g), hts) + joined_ht = joined_ht.select_globals(**global_struct) + else: + joined_ht = reduce((lambda joined_ht, ht: joined_ht.join(ht, "left")), hts) + + joined_ht = joined_ht.select_globals( + **get_select_global_fields(joined_ht, config, [base_table] + tables) + ) + + if track_included_datasets: + # Track the datasets we've added as well as the source paths. + # If release HT is included in tables, read in the included datasets json + # and update the keys to the path for any new tables + included_datasets = {} + if "release" in tables: + with hl.utils.hadoop_open( + included_datasets_json_path( + test=test, + release_version=hl.eval(config["release"]["ht"].version), + environment=environment, + ) + ) as f: + included_datasets = json.loads(f.read()) + + included_datasets.update({t: config[t]["path"] for t in tables}) + + with hl.utils.hadoop_open( + included_datasets_json_path( + test=test, release_version=version, environment=environment + ), + "w", + ) as f: + f.write(hl.eval(hl.json(included_datasets))) + + return joined_ht + + +def check_duplicate_rows_in_config_hts( + config: dict, +) -> None: + """ + Check all Hail Tables in a config dict for duplicate rows and raises a ValueError if any HT contains duplicate rows. + + :param config: Dictionary with configuration options for each dataset. Expects the + dataset name as the key and a dictionary of options as the value. Options that include a + value for 'ht' will be checked for duplicate rows. + :param logger: Logger object. + :return: None. 
+ """ + dup_errors = [] + + for name, cfg in config.items(): + if "ht" not in cfg: + continue + + ht = cfg["ht"] + total_count = ht.count() + distinct_count = ht.select().distinct().count() + + if distinct_count != total_count: + dup_errors.append( + f"HT {name} has {distinct_count} distinct rows but " + f"{total_count} total rows." + ) + else: + logger.info(f"HT {name} has no duplicate rows.") + + if dup_errors: + raise ValueError("\n".join(dup_errors)) + + +def add_global_annotations(ht: hl.Table, version: str) -> hl.Table: + """ + Add global version annotations to the release HT. + + Adds: + - VEP metadata (gencode + MANE Select versions) + - Tool versions (dbSNP, SIFT, PolyPhen) + - VRS-related versioning + - Release date and version + - Frequency README + :param ht: Hail Table. + :param version: Version of the release. + :return: Hail Table with global version annotations. + """ + # Add additional globals that were not present on the joined HTs. + globals_to_annotate = { + "vep_globals": ht.vep_globals.annotate( + gencode_version=GENCODE_VERSION, + mane_select_version=MANE_SELECT_VERSION, + ), + "tool_versions": ht.tool_versions.annotate( + dbsnp_version=DBSNP_VERSION, + sift_version=SIFT_VERSION, + polyphen_version=POLYPHEN_VERSION, + ), + "vrs_versions": hl.struct( + **{ + "vrs_schema_version": VRS_SCHEMA_VERSION, + "vrs_python_version": VRS_PYTHON_VERSION, + "seqrepo_version": SEQREPO_VERSION, + }, + ), + "date": datetime.now().isoformat(), + "version": version, + "frequency_README": FREQUENCY_README, + } + + # Add additional VEP version globals annotations if they exist + if version in VEP_VERSIONS_TO_ADD: + for vep_version in VEP_VERSIONS_TO_ADD[version]: + vep_globals_name = f"vep{vep_version}_globals" + if vep_globals_name in ht.globals: + globals_to_annotate[vep_globals_name] = ht[vep_globals_name].annotate( + gencode_version=GENCODE_VERSIONS[vep_version], + mane_select_version=MANE_SELECT_VERSIONS[vep_version], + ) + + ht = 
ht.annotate_globals(**globals_to_annotate) + + return ht + + +def main(args): + """Create release ht.""" + if args.rwb: + environment = "rwb" + hl.init( + log="/home/jupyter/workspaces/gnomadproduction/create_release_ht.log", + tmp_dir=f"gs://{WORKSPACE_BUCKET}/tmp/4_day", + ) + else: + environment = "batch" + hl.init( + tmp_dir=f"gs://{GNOMAD_TMP_BUCKET}-4day", + log="create_release_ht.log", + ) + # TODO: Add machine configurations for Batch. + hl.default_reference("GRCh38") + overwrite = args.overwrite + test = args.test + + # Determine tables to join based on version if not explicitly provided. + tables_for_join = ( + args.tables_for_join + if args.tables_for_join + else get_tables_for_release(args.version) + ) + + # Get schema based on version. + finalized_schema = get_finalized_schema(args.version) + + logger.info( + "Getting config for release HT to check Tables for duplicate variants..." + ) + config = get_config( + tables_for_join=tables_for_join, + release_exists=args.release_exists, + version=args.version, + base_release_version=args.base_release_version, + test=test, + environment=environment, + ) + + # Optionally check all config HTs for duplicate keys before joining. + # Skip for test runs or when --no-dup-check is specified. + if not args.test and not args.no_dup_check: + logger.info("Checking config HTs for duplicate keys...") + check_duplicate_rows_in_config_hts(config) + + logger.info("Creating release HT...") + ht = join_hts( + args.base_table, + tables_for_join, + config, + version=args.version, + new_partition_percent=args.new_partition_percent, + track_included_datasets=True, + # Use annotate if the release exists or the base release version is specified + # because we want to replace the annotations instead of joining them. + use_annotate=args.release_exists or args.base_release_version, + test=test, + environment=environment, + ) + + # TODO: Check if this filter is still needed. 
+ # Filter out chrM, AS_lowqual sites (these sites are dropped in the final_filters HT + # so will not have information in `filters`) and AC_raw == 0. + logger.info("Filtering out chrM, AS_lowqual, and AC_raw == 0 sites...") + ht = hl.filter_intervals(ht, [hl.parse_locus_interval("chrM")], keep=False) + ht = ht.filter(hl.is_defined(ht.filters) & (ht.freq[1].AC > 0)) + + logger.info("Finalizing the release HT globals...") + ht = add_global_annotations(ht, version=args.version) + + # Organize the fields in the release HT to match the order of finalized_schema when + # the fields are present in the HT. + final_fields = get_final_ht_fields(ht, schema=finalized_schema) + ht = ht.select(*final_fields["rows"]).select_globals(*final_fields["globals"]) + + output_path = ( + f"{qc_temp_prefix(environment=environment)}release/gnomad.genomes.sites.test.{datetime.today().strftime('%Y-%m-%d')}.ht" + if test + else release_sites(environment=environment, public=False).path + ) + + logger.info(f"Writing out release HT to %s", output_path) + ht = ht.naive_coalesce(args.n_partitions).checkpoint( + output_path, + args.overwrite, + ) + + logger.info("Final release HT schema:") + ht.describe() + + logger.info("Final variant count: %d", ht.count()) + + +def get_script_argument_parser() -> argparse.ArgumentParser: + """Get script argument parser.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--rwb", + help="Run the script in RWB environment.", + action="store_true", + ) + parser.add_argument( + "--new-partition-percent", + help=( + "Percent of start dataset partitions to use for release HT. 
Default is 1.1" + " (110%)" + ), + type=float, + default=1.1, + ) + parser.add_argument("--overwrite", help="Overwrite data", action="store_true") + parser.add_argument( + "-v", + "--version", + help="The version of gnomAD.", + default=CURRENT_RELEASE, + ) + parser.add_argument( + "-t", + "--test", + help="Runs a test on PCSK9 region, chr1:55039447-55064852", + action="store_true", + ) + + parser.add_argument( + "-j", + "--tables-for-join", + help="Tables to join for release", + default=None, + type=str, + nargs="+", + ) + parser.add_argument( + "-b", + "--base-table", + help="Base table for interval partition calculation.", + default="freq", + ) + parser.add_argument( + "--release-exists", + help=( + "Use the release version specified by --version as the base HT. When " + "specified, loads the release HT for the version being updated and uses it " + "as the base for joining other tables. This is useful when updating an " + "existing release in place. Mutually exclusive with --base-release-version." + ), + action="store_true", + ) + parser.add_argument( + "--base-release-version", + help=( + "Specific release version to use as the base HT (e.g., '4.1'). When " + "specified, loads that version's release HT as the base for joining other " + "tables. This is useful when creating a new release version (specified by " + "--version) from an existing release version. For example, to create " + "v4.1.1 from v4.1 base, use --base-release-version 4.1 --version 4.1.1. " + "Mutually exclusive with --release-exists." 
+ ), + type=str, + default=None, + ) + + parser.add_argument( + "--n-partitions", + help="Number of partitions to naive coalesce the release Table to.", + type=int, + default=10000, + ) + parser.add_argument( + "--no-dup-check", + help="Skip duplicate check.", + action="store_true", + ) + + return parser + + +if __name__ == "__main__": + parser = get_script_argument_parser() + args = parser.parse_args() + + main(args) diff --git a/gnomad_qc/v5/create_release/create_release_utils.py b/gnomad_qc/v5/create_release/create_release_utils.py new file mode 100644 index 000000000..9b463dec6 --- /dev/null +++ b/gnomad_qc/v5/create_release/create_release_utils.py @@ -0,0 +1,30 @@ +"""Common utils for creating gnomAD v5 genome release.""" + +import hail as hl + +from gnomad_qc.v5.resources.constants import DEFAULT_VEP_VERSION + +# Tool versions used in the release that were not on the source tables. +DBSNP_VERSION = "b156" +SIFT_VERSION = "5.2.2" +POLYPHEN_VERSION = "2.2.2" +VRS_SCHEMA_VERSION = "2.0.1" +VRS_PYTHON_VERSION = "2.2.0" +SEQREPO_VERSION = "2024-12-20" + +# GENCODE versions by VEP version. +GENCODE_VERSIONS = { + "105": "Release 39", + "115": "Release 49", +} +# MANE Select versions by VEP version. +MANE_SELECT_VERSIONS = { + "105": "v0.95", + "115": "v1.4", +} +# Backward compatibility: default to DEFAULT_VEP_VERSION. +GENCODE_VERSION = GENCODE_VERSIONS[DEFAULT_VEP_VERSION] +MANE_SELECT_VERSION = MANE_SELECT_VERSIONS[DEFAULT_VEP_VERSION] + +# NOTE: VEP 115 annotations were added in v4.1.1. 
VEP_VERSIONS_TO_ADD = {"5.0": ["115"]}


# ---- gnomad_qc/v5/resources/annotations.py ----
from gnomad_qc.v5.resources.constants import (
    ANNOTATION_VERSIONS,
    CURRENT_ANNOTATION_VERSION,
    CURRENT_VEP_ANNOTATION_VERSION,
    DEFAULT_VEP_VERSION,
    GNOMAD_BUCKET,
    VEP_ANNOTATION_VERSIONS,
    WORKSPACE_BUCKET,
)

# ... (lines omitted in this chunk) ...

aou_annotated_sites_only_vcf = (
    f"gs://{WORKSPACE_BUCKET}/echo_full_gnomad_annotated.sites-only.vcf.gz"
)


def _vep_resource(
    file_suffix: str = "",
    test: bool = False,
    vep_version: str = DEFAULT_VEP_VERSION,
    environment: str = "rwb",
) -> VersionedTableResource:
    """
    Build a VEP VersionedTableResource shared by get_vep and validate_vep_path.

    :param file_suffix: Suffix inserted before the ".ht" extension ("" for the
        main VEP Table, ".validate" for the validation-count Table).
    :param test: Whether to use a tmp path for analysis of the test Table
        instead of the full v5 Table.
    :param vep_version: VEP version to use (e.g., "105", "115").
    :param environment: Environment to use. Must be one of "rwb", "batch", or
        "dataproc".
    :return: gnomAD v5 VEP VersionedTableResource.
    """
    # The default VEP version gets no postfix in the file name, keeping paths
    # backward compatible with single-VEP-version releases.
    vep_version_postfix = "" if vep_version == DEFAULT_VEP_VERSION else vep_version
    return VersionedTableResource(
        CURRENT_VEP_ANNOTATION_VERSION[vep_version],
        {
            version: TableResource(
                path=(
                    f"{_annotations_root(version, test=test, environment=environment)}"
                    f"/gnomad.genomes.v{version}.vep{vep_version_postfix}"
                    f"{file_suffix}.ht"
                )
            )
            for version in VEP_ANNOTATION_VERSIONS[vep_version]
        },
    )


def get_vep(
    test: bool = False, vep_version: str = DEFAULT_VEP_VERSION, environment: str = "rwb"
) -> VersionedTableResource:
    """
    Get the gnomAD v5 VEP annotation VersionedTableResource.

    :param test: Whether to use a tmp path for analysis of the test Table instead of
        the full v5 Table.
    :param vep_version: VEP version to use (e.g., "105", "115"). Default is "105".
    :param environment: Environment to use. Default is "rwb". Must be one of "rwb",
        "batch", or "dataproc".
    :return: gnomAD v5 VEP VersionedTableResource.
    """
    return _vep_resource(
        file_suffix="", test=test, vep_version=vep_version, environment=environment
    )


def validate_vep_path(
    test: bool = False, vep_version: str = DEFAULT_VEP_VERSION, environment: str = "rwb"
) -> VersionedTableResource:
    """
    Get the gnomAD v5 VEP annotation VersionedTableResource for validation counts.

    :param test: Whether to use a tmp path for analysis of the test VDS instead of the
        full v5 VDS.
    :param vep_version: VEP version to use (e.g., "105", "115"). Default is "105".
    :param environment: Environment to use. Default is "rwb". Must be one of "rwb",
        "batch", or "dataproc".
    :return: gnomAD v5 VEP VersionedTableResource containing validity check.
    """
    return _vep_resource(
        file_suffix=".validate",
        test=test,
        vep_version=vep_version,
        environment=environment,
    )


# ---- gnomad_qc/v5/resources/constants.py ----
AOU_LOW_QUALITY_PATH = f"gs://{AOU_BUCKET}/known_issues/wgs_v8_known_issue_1.txt"
AOU_GENOMIC_METRICS_PATH = f"gs://{AOU_WGS_AUX_BUCKET}/qc/genomic_metrics.tsv"

VRS_ANNOTATION_VERSIONS = ["5.0"]
CURRENT_VRS_ANNOTATION_VERSION = "5.0"
# TODO: review what should be used here for 5.0.
VEP_ANNOTATION_VERSIONS = {"105": ["5.0"], "115": ["5.0"]}
CURRENT_VEP_ANNOTATION_VERSION = {"105": "5.0", "115": "5.0"}

# NOTE: The default VEP version is VEP 105.
DEFAULT_VEP_VERSION = "105"


# Release constants.
RELEASES = ["5.0"]
CURRENT_RELEASE = RELEASES[-1]


# ---- gnomad_qc/v5/resources/release.py ----
import logging

from gnomad.resources.grch38.gnomad import all_sites_an, coverage, public_release
from gnomad.resources.resource_utils import (
    DataException,
    TableResource,
    VersionedTableResource,
)
from gnomad.utils.file_utils import file_exists

from gnomad_qc.v5.resources.basics import qc_temp_prefix
from gnomad_qc.v5.resources.constants import (
    # ... (import names above this line are outside this chunk) ...
    CURRENT_COVERAGE_RELEASE,
    CURRENT_RELEASE,
    GNOMAD_BUCKET,
    RELEASES,
    WORKSPACE_BUCKET,
)

logger = logging.getLogger("release_resources")
logger.setLevel(logging.INFO)

# User-facing documentation stored as a global on the release HT.
# NOTE(review): the worked examples below index freq_index_dict with
# hyphen-delimited keys ('afr-XX-adj') while the listed key combinations use
# underscores ('afr_adj', 'non_aou_XY_adj') — confirm the delimiter actually
# produced for freq_index_dict before release; the string itself is release
# content and is intentionally left unmodified here.
FREQUENCY_README = """
The 'freq' row annotation is an array that contains allele frequency information. Each element of the array is a struct that contains the alternate allele count (AC), alternate allele frequency (AF), total number of alleles (AN), and number of homozygous alternate individuals (homozygote_count) for a specific sample grouping.

Use the 'freq_index_dict' global annotation to retrieve frequency information for a specific group of samples from the 'freq' array. This global annotation is a dictionary keyed by sample
grouping combinations whose values are the combination's index in the 'freq' array.

The available key combinations for the 'freq_index_dict' are as follows:

group, e.g. “adj”, “raw”
sex_group, e.g. “XX_adj”
subset_group, e.g. “non_aou_raw”
gen-anc_group, e.g. “afr_adj”
gen-anc_sex_group, e.g. “ami_XX_adj”
subset_gen-anc_group, e.g. “non_aou_sas_adj”
subset_sex_group, e.g. “non_aou_XY_adj”
subset_gen-anc_sex_group, e.g. “non_aou_mid_XX_adj”,
downsampling_group_gen-anc, e.g. “1373_afr_adj”

The example below shows how to access the entry of the high quality genotypes
(group: adj) of XX individuals (sex: XX) labeled as AFR (gen_anc: AFR)
in the HT:

    # Use the key 'afr-XX-adj' to retrieve the index of this groups frequency data in 'freq'
    ht = ht.annotate(afr_XX_freq=ht.freq[ht.freq_index_dict['afr-XX-adj']])

The above example will retrieve the entire frequency struct for each variant. To grab a
certain statistic, such as AC, specify the statistic after the value:

    ht = ht.annotate(afr_XX_AC=ht.freq[ht.freq_index_dict['afr-XX-adj']].AC)

This same approach can be applied to the filtering allele frequency (FAF) array, 'faf',
by using the 'faf_index_dict'. For more information, please visit the FAQ page:
https://gnomad.broadinstitute.org/help#technical-details:~:text=How%20do%20I%20access%20the%20gnomAD%20Hail%20Table%20frequency%20annotation%3F
"""


# NOTE(review): only the signature head and closing lines of _release_root are
# visible in this chunk; the body between them is outside this view.
def _release_root(
    version: str = CURRENT_RELEASE,
    # ... (lines omitted in this chunk) ...
            for release in ALL_SITES_AN_RELEASES["genomes"]
        },
    )


def included_datasets_json_path(
    test: bool = False,
    release_version: str = CURRENT_RELEASE,
    environment: str = "rwb",
) -> str:
    """
    Fetch filepath for the JSON containing all datasets used in the release.

    :param test: Whether to use a tmp path for testing. Default is False.
    :param release_version: Release version. Default is CURRENT_RELEASE.
    :param environment: Environment to use. Default is "rwb".
    :return: File path for release versions included datasets JSON
    """
    return f"{_release_root(release_version, test=test, data_type='genomes', extension='json', environment=environment)}/gnomad.genomes.v{release_version}.included_datasets.json"


def release_ht_path(
    release_version: str = CURRENT_RELEASE,
    public: bool = True,
    test: bool = False,
    environment: str = "rwb",
) -> str:
    """
    Fetch filepath for release (variant-only) Hail Tables.

    :param release_version: Release version. Default is CURRENT_RELEASE.
    :param public: Whether release sites Table path returned is from public instead of private
        bucket. Default is True.
    :param test: Whether to use a tmp path for testing. Default is False.
    :param environment: Environment to use. Default is "rwb".
    :return: File path for desired release Hail Table.
    """
    if public:
        # Prefer the registered public resource when its file actually exists;
        # note file_exists performs a GCS existence check on every call.
        res = public_release(data_type="genomes").versions
        if release_version in res and file_exists(res[release_version].path):
            return res[release_version].path
        else:
            # Fall back to the conventional requester-pays path for versions
            # not (yet) registered in gnomad_methods.
            return f"gs://gnomad-public-requester-pays/release/{release_version}/ht/genomes/gnomad.genomes.v{release_version}.sites.ht"
    else:
        # Note: This function was not used to write out the v4.0 joint release HT. That
        # is gs://gnomad/release/4.0/ht/joint/gnomad.joint.v4.0.faf.filtered.ht
        return f"{_release_root(version=release_version, test=test, data_type='genomes', environment=environment)}/gnomad.genomes.v{release_version}.sites.ht"


def release_sites(
    public: bool = True,
    test: bool = False,
    environment: str = "rwb",
) -> VersionedTableResource:
    """
    Retrieve versioned resource for sites-only release Table.

    :param public: Whether release sites Table path returned is from public or private
        bucket. Default is True.
    :param test: Whether to use a tmp path for testing. Default is False.
    :param environment: Environment to use. Default is "rwb".
    :return: Sites-only release Table.
    """
    return VersionedTableResource(
        default_version=CURRENT_RELEASE,
        versions={
            release: TableResource(
                path=release_ht_path(
                    release_version=release,
                    public=public,
                    test=test,
                    environment=environment,
                )
            )
            for release in RELEASES
        },
    )