# File size: 14,519 bytes; revision 3772986 (web-viewer header; the line-number gutter was dropped)
"""
Geneformer in silico perturber stats generator.

Usage:
  from geneformer import InSilicoPerturberStats
  ispstats = InSilicoPerturberStats(mode="goal_state_shift",
                                    combos=0,
                                    anchor_gene=None,
                                    cell_states_to_model={"disease":(["dcm"],["ctrl"],["hcm"])})
  ispstats.get_stats("path/to/input_data",
                     None,
                     "path/to/output_directory",
                     "output_prefix")
"""
import os
import logging
import numpy as np
import pandas as pd
import pickle
import statsmodels.stats.multitest as smt
from pathlib import Path
from scipy.stats import ranksums
from tqdm.notebook import trange
from .tokenizer import TOKEN_DICTIONARY_FILE
GENE_NAME_ID_DICTIONARY_FILE = Path(__file__).parent / "gene_name_id_dict.pkl"
logger = logging.getLogger(__name__)
# invert dictionary keys/values
def invert_dict(dictionary):
    """
    Return a new dictionary mapping each value of `dictionary` to its key.

    If several keys share the same value, the key seen last wins.
    """
    inverted = {}
    for key, value in dictionary.items():
        inverted[value] = key
    return inverted
# read raw dictionary files
def read_dictionaries(dir, cell_or_gene_emb):
    """
    Load every "*_raw.pickle" file found directly inside `dir`.

    Parameters
    ----------
    dir : str or Path
        Directory to scan; files not ending in "_raw.pickle" are ignored.
    cell_or_gene_emb : str
        When equal to "cell", each loaded dictionary is reduced to its
        non-empty entries whose key contains "cell_emb" and appended to the
        result. Any other value loads the files but collects nothing.

    Returns
    -------
    list of dict
        One filtered dictionary per matching file.
    """
    collected = []
    raw_names = (name for name in os.listdir(dir) if name.endswith("_raw.pickle"))
    for name in raw_names:
        with open(f"{dir}/{name}", "rb") as handle:
            raw_cos_sims = pickle.load(handle)
        if cell_or_gene_emb == "cell":
            collected.append(
                {
                    key: values
                    for key, values in raw_cos_sims.items()
                    if values and "cell_emb" in key
                }
            )
    return collected
# get complete gene list
def get_gene_list(dict_list):
    """
    Return the sorted list of unique first key elements across `dict_list`.

    Only entries with a truthy (non-empty) value contribute; the first
    element of each key tuple is taken as the gene identifier.
    """
    unique_genes = {
        key[0]
        for cos_sims in dict_list
        for key, values in cos_sims.items()
        if values
    }
    return sorted(unique_genes)
def n_detections(token, dict_list):
    """
    Count the values stored under (token, "cell_emb") across `dict_list`.

    Dictionaries that lack the key contribute zero.
    """
    lookup_key = (token, "cell_emb")
    return sum(len(cos_sims.get(lookup_key, [])) for cos_sims in dict_list)
def get_fdr(pvalues):
    """
    Return the "fdr_bh"-corrected p-values for `pvalues` as a plain list.

    The correction is delegated to statsmodels' multipletests (alpha=0.05);
    only the second element of its result is kept.
    """
    correction = smt.multipletests(pvalues, alpha=0.05, method="fdr_bh")
    adjusted_pvalues = correction[1]
    return list(adjusted_pvalues)
def isp_stats(cos_sims_df, dict_list, cell_states_to_model):
    """
    Compute goal-state-shift statistics for every gene in `cos_sims_df`.

    For each gene, the (goal_end, alt_end, start_state) tuples stored under
    (token, "cell_emb") are pooled across `dict_list`. The gene's goal-end and
    alt-end values are averaged and compared with `ranksums` against the
    pooled values of all genes (the "random" background). The resulting
    p-values are corrected with `get_fdr`.

    Parameters
    ----------
    cos_sims_df : pd.DataFrame
        Frame with "Gene", "Gene_name" and "Ensembl_ID" columns; rows are
        processed in positional order.
    dict_list : list of dict
        Dictionaries keyed by (token, "cell_emb") whose values are lists of
        3-tuples (goal_end, alt_end, start_state).
    cell_states_to_model : dict or None
        Not used by this function; kept so the signature stays unchanged.

    Returns
    -------
    pd.DataFrame
        One row per gene with columns "Gene", "Gene_name", "Ensembl_ID",
        "Shift_from_goal_end", "Shift_from_alt_end",
        "Goal_end_vs_random_pval", "Alt_end_vs_random_pval",
        "Goal_end_FDR" and "Alt_end_FDR".
    """
    # `random` is not imported at file level; import it here so the
    # downsampling branch no longer fails with NameError.
    import random

    max_background = 100_000
    names = ["Gene",
             "Gene_name",
             "Ensembl_ID",
             "Shift_from_goal_end",
             "Shift_from_alt_end",
             "Goal_end_vs_random_pval",
             "Alt_end_vs_random_pval"]

    genes = list(cos_sims_df["Gene"])
    gene_names = list(cos_sims_df["Gene_name"])
    ensembl_ids = list(cos_sims_df["Ensembl_ID"])
    n_genes = len(genes)

    # pool the tuples of all genes to form the random background
    random_tuples = []
    for i in trange(n_genes):
        for dict_i in dict_list:
            random_tuples += dict_i.get((genes[i], "cell_emb"), [])

    # downsample to improve speed of ranksums; a private generator seeded
    # with 42 draws the same sample as random.seed(42) + random.sample
    # would, without resetting the global random state
    if len(random_tuples) > max_background:
        random_tuples = random.Random(42).sample(random_tuples, k=max_background)
    goal_end_random_megalist = [goal_end for goal_end, alt_end, start_state in random_tuples]
    alt_end_random_megalist = [alt_end for goal_end, alt_end, start_state in random_tuples]

    # collect per-gene results in plain lists and build the frame once
    # (concatenating one-row frames inside the loop is quadratic)
    goal_means = []
    alt_means = []
    goal_pvals = []
    alt_pvals = []
    for i in trange(n_genes):
        token_tuples = []
        for dict_i in dict_list:
            token_tuples += dict_i.get((genes[i], "cell_emb"), [])

        goal_end_cos_sim_megalist = [goal_end for goal_end, alt_end, start_state in token_tuples]
        alt_end_cos_sim_megalist = [alt_end for goal_end, alt_end, start_state in token_tuples]

        goal_means.append(np.mean(goal_end_cos_sim_megalist))
        alt_means.append(np.mean(alt_end_cos_sim_megalist))
        goal_pvals.append(ranksums(goal_end_random_megalist, goal_end_cos_sim_megalist).pvalue)
        alt_pvals.append(ranksums(alt_end_random_megalist, alt_end_cos_sim_megalist).pvalue)

    cos_sims_full_df = pd.DataFrame(
        {
            "Gene": genes,
            "Gene_name": gene_names,
            "Ensembl_ID": ensembl_ids,
            "Shift_from_goal_end": goal_means,
            "Shift_from_alt_end": alt_means,
            "Goal_end_vs_random_pval": goal_pvals,
            "Alt_end_vs_random_pval": alt_pvals,
        },
        columns=names,
    )

    cos_sims_full_df["Goal_end_FDR"] = get_fdr(list(cos_sims_full_df["Goal_end_vs_random_pval"]))
    cos_sims_full_df["Alt_end_FDR"] = get_fdr(list(cos_sims_full_df["Alt_end_vs_random_pval"]))

    return cos_sims_full_df
def isp_stats_vs_null(cos_sims_df, dict_list, null_dict_list):
    """
    Compare each gene's perturbation shifts against a null distribution.

    For every gene in `cos_sims_df`, the values stored under
    (token, "cell_emb") are pooled across `dict_list` (test) and across
    `null_dict_list` (null) and compared with `ranksums`.

    Parameters
    ----------
    cos_sims_df : pd.DataFrame
        Frame with a "Gene" column; rows are processed in positional order.
    dict_list : list of dict
        Test dictionaries keyed by (token, "cell_emb").
    null_dict_list : list of dict
        Null-distribution dictionaries keyed by (token, "cell_emb").

    Returns
    -------
    pd.DataFrame
        Copy of `cos_sims_df` with added columns "Shift_avg", "Shift_pval",
        "Null_avg", "N_Detections", "N_Detections_null" and "Shift_FDR".
        Genes with no test or no null values get NaN for the statistics
        that cannot be computed.
    """
    genes = list(cos_sims_df["Gene"])
    n_genes = len(genes)

    # pre-allocate result arrays; NaN marks "could not be computed"
    shift_avg = np.full(n_genes, np.nan, dtype=float)
    shift_pval = np.full(n_genes, np.nan, dtype=float)
    null_avg = np.full(n_genes, np.nan, dtype=float)
    n_detections_test = np.zeros(n_genes, dtype="uint32")
    n_detections_null = np.zeros(n_genes, dtype="uint32")

    for i in trange(n_genes):
        lookup_key = (genes[i], "cell_emb")
        token_shifts = []
        null_shifts = []

        for dict_i in dict_list:
            token_shifts += dict_i.get(lookup_key, [])

        for dict_i in null_dict_list:
            null_shifts += dict_i.get(lookup_key, [])

        n_detections_test[i] = len(token_shifts)
        n_detections_null[i] = len(null_shifts)
        if token_shifts:
            shift_avg[i] = np.mean(token_shifts)
        if null_shifts:
            null_avg[i] = np.mean(null_shifts)
        # the rank-sum test needs observations on both sides
        if token_shifts and null_shifts:
            shift_pval[i] = ranksums(token_shifts,
                                     null_shifts, nan_policy="omit").pvalue

    # correct only the p-values that exist: a NaN fed into the correction
    # would otherwise propagate to every corrected value
    shift_fdr = np.full(n_genes, np.nan, dtype=float)
    testable = np.isfinite(shift_pval)
    if testable.any():
        shift_fdr[testable] = get_fdr(shift_pval[testable])

    cos_sims_full_df = cos_sims_df.copy()
    cos_sims_full_df["Shift_avg"] = shift_avg
    cos_sims_full_df["Shift_pval"] = shift_pval
    cos_sims_full_df["Null_avg"] = null_avg
    cos_sims_full_df["N_Detections"] = n_detections_test
    cos_sims_full_df["N_Detections_null"] = n_detections_null
    cos_sims_full_df["Shift_FDR"] = shift_fdr
    return cos_sims_full_df
class InSilicoPerturberStats:
    """Generate summary statistics from in silico perturbation outputs."""

    # allowed values per option; a type entry (e.g. str, dict) means
    # "any instance of that type"
    valid_option_dict = {
        "mode": {"goal_state_shift","vs_null","vs_random"},
        "combos": {0,1,2},
        "anchor_gene": {None, str},
        "cell_states_to_model": {None, dict},
    }

    def __init__(
        self,
        mode="vs_random",
        combos=0,
        anchor_gene=None,
        cell_states_to_model=None,
        token_dictionary_file=TOKEN_DICTIONARY_FILE,
        gene_name_id_dictionary_file=GENE_NAME_ID_DICTIONARY_FILE,
    ):
        """
        Initialize in silico perturber stats generator.

        Parameters
        ----------
        mode : {"goal_state_shift","vs_null","vs_random"}
            Type of stats.
            "goal_state_shift": perturbation vs. random for desired cell state shift
            "vs_null": perturbation vs. null from provided null distribution dataset
            "vs_random": perturbation vs. random gene perturbations in that cell (no goal direction)
            Note: get_stats currently supports only "goal_state_shift" and "vs_null".
        combos : {0,1,2}
            Whether to perturb genes individually (0), in pairs (1), or in triplets (2).
        anchor_gene : None, str
            ENSEMBL ID of gene to use as anchor in combination perturbations.
            For example, if combos=1 and anchor_gene="ENSG00000148400":
                anchor gene will be perturbed in combination with each other gene.
            Reset to None (with a warning) when cell_states_to_model is given.
        cell_states_to_model: None, dict
            Cell states to model if testing perturbations that achieve goal state change.
            Single-item dictionary with key being cell attribute (e.g. "disease").
            Value is tuple of three lists indicating start state, goal end state, and alternate possible end states.
        token_dictionary_file : Path
            Path to pickle file containing token dictionary (Ensembl ID:token).
        gene_name_id_dictionary_file : Path
            Path to pickle file containing gene name to ID dictionary (gene name:Ensembl ID).

        Raises
        ------
        ValueError
            If any option is invalid (see validate_options).
        KeyError
            If anchor_gene is not present in the token dictionary.
        """

        self.mode = mode
        self.combos = combos
        self.anchor_gene = anchor_gene
        self.cell_states_to_model = cell_states_to_model

        self.validate_options()

        # NOTE: pickle.load can execute arbitrary code; only point these
        # arguments at trusted files.
        # load token dictionary (Ensembl IDs:token)
        with open(token_dictionary_file, "rb") as f:
            self.gene_token_dict = pickle.load(f)

        # load gene name dictionary (gene name:Ensembl ID)
        with open(gene_name_id_dictionary_file, "rb") as f:
            self.gene_name_id_dict = pickle.load(f)

        # validate_options may have reset self.anchor_gene to None, so the
        # attribute (not the constructor argument) decides the anchor token
        if self.anchor_gene is None:
            self.anchor_token = None
        else:
            self.anchor_token = self.gene_token_dict[self.anchor_gene]

    @staticmethod
    def _cell_states_are_valid(cell_states):
        """Return True if `cell_states` has the documented single-item layout."""
        if not isinstance(cell_states, dict) or len(cell_states) != 1:
            return False
        (value,) = cell_states.values()
        if not (isinstance(value, tuple) and len(value) == 3):
            return False
        if not all(isinstance(states, list) for states in value):
            return False
        start_states, goal_states, alt_states = value
        if len(start_states) != 1 or len(goal_states) != 1:
            return False
        all_values = start_states + goal_states + alt_states
        try:
            return len(all_values) == len(set(all_values))
        except TypeError:
            # unhashable state labels cannot be checked for uniqueness
            return False

    def validate_options(self):
        """
        Check the constructor options against valid_option_dict.

        Raises
        ------
        ValueError
            If an option has a disallowed value or cell_states_to_model does
            not follow the documented layout.
        """
        for attr_name,valid_options in self.valid_option_dict.items():
            attr_value = getattr(self, attr_name)
            # lists/dicts are unhashable and cannot be tested for set membership
            if not isinstance(attr_value, (list, dict)):
                if attr_value in valid_options:
                    continue
            # a type listed among the options accepts any instance of it
            if any(isinstance(option, type) and isinstance(attr_value, option)
                   for option in valid_options):
                continue
            message = (
                f"Invalid option for {attr_name}. " \
                f"Valid options for {attr_name}: {valid_options}"
            )
            logger.error(message)
            raise ValueError(message)

        if self.cell_states_to_model is not None:
            if not self._cell_states_are_valid(self.cell_states_to_model):
                message = (
                    "Cell states to model must be a single-item dictionary with " \
                    "key being cell attribute (e.g. 'disease') and value being " \
                    "tuple of three lists indicating start state, goal end state, and alternate possible end states. " \
                    "Values should all be unique. " \
                    "For example: {'disease':(['start_state'],['ctrl'],['alt_end'])}")
                logger.error(message)
                raise ValueError(message)
            if self.anchor_gene is not None:
                self.anchor_gene = None
                logger.warning(
                    "anchor_gene set to None. " \
                    "Currently, anchor gene not available " \
                    "when modeling multiple cell states.")

    def get_stats(self,
                  input_data_directory,
                  null_dist_data_directory,
                  output_directory,
                  output_prefix):
        """
        Get stats for in silico perturbation data and save as results in output_directory.

        Parameters
        ----------
        input_data_directory : Path
            Path to directory containing cos_sim dictionary inputs
        null_dist_data_directory : Path
            Path to directory containing null distribution cos_sim dictionary inputs
            (required when mode is "vs_null", otherwise ignored)
        output_directory : Path
            Path to directory where perturbation data will be saved as .csv
        output_prefix : str
            Prefix for output .csv file

        Raises
        ------
        NotImplementedError
            If the configured mode is not "goal_state_shift" or "vs_null".
        ValueError
            If mode is "vs_null" and null_dist_data_directory is None.
        """

        if self.mode not in ["goal_state_shift", "vs_null"]:
            message = (
                "Currently, only modes available are stats for goal_state_shift "
                "and comparing vs a null distribution.")
            logger.error(message)
            raise NotImplementedError(message)

        if self.mode == "vs_null" and null_dist_data_directory is None:
            message = "null_dist_data_directory must be provided when mode is 'vs_null'."
            logger.error(message)
            raise ValueError(message)

        self.gene_token_id_dict = invert_dict(self.gene_token_dict)
        self.gene_id_name_dict = invert_dict(self.gene_name_id_dict)

        # the input dictionaries must be read before the gene list can be built
        dict_list = read_dictionaries(input_data_directory, "cell")

        # obtain total gene list
        gene_list = get_gene_list(dict_list)

        # initiate results dataframe
        # for tuple entries the second element is used for the Ensembl ID
        cos_sims_df_initial = pd.DataFrame(
            {
                "Gene": gene_list,
                "Gene_name": [self.token_to_gene_name(item)
                              for item in gene_list],
                "Ensembl_ID": [self.gene_token_id_dict[genes[1]]
                               if isinstance(genes,tuple) else
                               self.gene_token_id_dict[genes]
                               for genes in gene_list],
            },
            index=range(len(gene_list)),
        )

        if self.mode == "goal_state_shift":
            cos_sims_df = isp_stats(cos_sims_df_initial, dict_list, self.cell_states_to_model)

            # quantify number of detections of each gene
            cos_sims_df["N_Detections"] = [n_detections(i, dict_list) for i in cos_sims_df["Gene"]]

            # sort by shift to desired state
            cos_sims_df = cos_sims_df.sort_values(by=["Shift_from_goal_end",
                                                      "Goal_end_FDR"])
        elif self.mode == "vs_null":
            null_dict_list = read_dictionaries(null_dist_data_directory, "cell")
            cos_sims_df = isp_stats_vs_null(cos_sims_df_initial, dict_list,
                                            null_dict_list)

        # save perturbation stats to output_path
        output_path = (Path(output_directory) / output_prefix).with_suffix(".csv")
        cos_sims_df.to_csv(output_path)

    def token_to_gene_name(self, item):
        """
        Map a token, or a tuple of tokens, to gene name(s).

        Tokens missing from either lookup dictionary map to NaN. Values
        that are neither integers nor tuples yield None.
        """
        def lookup(token):
            ensembl_id = self.gene_token_id_dict.get(token, np.nan)
            return self.gene_id_name_dict.get(ensembl_id, np.nan)

        # numpy integers are accepted as well as built-in ints
        if isinstance(item, (int, np.integer)):
            return lookup(item)
        if isinstance(item,tuple):
            return tuple([lookup(i) for i in item])
        return None
|