class DataImporter:
"""
A class for importing, cleaning, and preprocessing multi-omic data for downstream analysis,
including support for incorporating graph-based features from protein-protein interaction networks.
Attributes:
path (str): The base directory path where data is stored.
data_types (list[str]): A list of data modalities to import (e.g., 'rna', 'methylation').
log_transform (bool): If True, apply log transformation to the data.
concatenate (bool): If True, concatenate features from different modalities.
        restrict_to_features (str): Path to a file containing a user-defined list of features to restrict the analysis to (default: None).
min_features (int): The minimum number of features to retain after filtering.
        top_percentile (float): The top percentile of features (ranked by Laplacian score) to retain during feature selection.
        correlation_threshold (float): The correlation threshold above which highly redundant features are dropped.
        variance_threshold (float): The variance quantile below which low-variance features are removed.
        na_threshold (float): The maximum fraction of NA values allowed per feature; features exceeding it are removed.
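        downsample (int): If greater than 0, randomly draw this many training samples (default: 0, i.e. no downsampling).
        processed_dir (str): Name of the subdirectory (under `path`) for processed data (default: "processed").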
string_organism (int): STRING organism (species) id (default: 9606 (human)).
string_node_name (str): The type of node names used in the graph. Available options: "gene_name", "gene_id" (default: "gene_name").
Methods:
import_data():
The primary method to orchestrate the data import and preprocessing workflow. It follows these steps:
1. Validates the presence of required data files in training and testing directories.
2. Imports data using `read_data` for both training and testing sets.
3. Cleans and preprocesses the data through `cleanup_data`.
4. Processes data to align features and samples across modalities using `process_data`.
5. Harmonizes training and testing datasets to have the same features using `harmonize`.
6. Optionally applies log transformation.
7. Normalizes the data.
8. Encodes labels and prepares PyTorch datasets.
9. Returns PyTorch datasets for training and testing.
validate_data_folders(training_path, testing_path):
Checks for the presence of required data files in specified directories.
read_data(folder_path):
Reads and imports data files for a given modality from a specified folder.
cleanup_data(df_dict):
            Cleans dataframes by removing low-variance features, dropping features with too many NA values,
            and imputing the remaining missing values with feature medians.
process_data(data, split='train'):
Prepares the data for model input by cleaning, filtering, and selecting features and samples.
select_features(dat):
            Performs unsupervised feature selection: ranks features by Laplacian score, keeps those in the
            top percentile range, optionally drops features that are highly correlated with a higher-ranking
            feature (based on a correlation threshold), and always retains at least the user-requested
            minimum number of top features.
harmonize(dat1, dat2):
Aligns the feature sets of two datasets (e.g., training and testing) to have the same features.
transform_data(data):
Applies log transformation to the data matrices.
normalize_data(data, scaler_type="standard", fit=True):
Applies normalization to the data matrices.
get_labels(dat, ann):
Aligns and subsets annotations to match the samples present in the data matrices.
get_torch_dataset(dat, ann, samples, feature_ann):
Prepares and returns PyTorch datasets for the imported and processed data.
encode_labels(df):
Encodes categorical labels in the annotation dataframe.
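    Example:
        A minimal usage sketch (the dataset path and modality names below are illustrative
        assumptions; the folder must contain 'train' and 'test' subdirectories, each with
        'clin.csv' plus one CSV file per data type):

            importer = DataImporter(path="my_dataset",
                                    data_types=["rna", "methylation"],
                                    log_transform=True,
                                    top_percentile=20)
            train_dataset, test_dataset = importer.import_data()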
"""
    def __init__(self, path, data_types, processed_dir="processed", log_transform=False, concatenate=False,
                 restrict_to_features=None, min_features=None, top_percentile=20, correlation_threshold=0.9,
                 variance_threshold=0.01, na_threshold=0.1, downsample=0):
self.path = path
self.data_types = data_types
self.processed_dir = os.path.join(self.path, processed_dir)
self.concatenate = concatenate
self.min_features = min_features
self.top_percentile = top_percentile
self.correlation_threshold = correlation_threshold
self.variance_threshold = variance_threshold
self.na_threshold = na_threshold
self.log_transform = log_transform
# Initialize a dictionary to store the label encoders
self.encoders = {} # used if labels are categorical
# initialize data scalers
self.scalers = None
# initialize data transformers
self.transformers = None
self.downsample = downsample
# read user-specified feature list to restrict the analysis to that
self.restrict_to_features = restrict_to_features
self.get_user_features()
        # For each feature in the input training data, keep a log of what happens to it:
        # record metrics such as Laplacian score and variance, and whether the feature is
        # dropped due to these metrics or due to high correlation with a higher-ranking feature.
self.feature_logs = {}
def get_user_features(self):
"""
Load and process user-specified features from a file.
"""
if self.restrict_to_features is not None:
if not os.path.isfile(self.restrict_to_features):
raise FileNotFoundError(f"File not found: {self.restrict_to_features}")
try:
with open(self.restrict_to_features, 'r') as fp:
# Read and process the file
feature_list = [x.strip() for x in fp.read().splitlines() if x.strip()]
# Ensure uniqueness and assign
self.restrict_to_features = np.unique(feature_list)
except Exception as e:
print(f"An error occurred while processing the file: {e}")
else:
self.restrict_to_features = None
def import_data(self):
print("\n[INFO] ================= Importing Data =================")
training_path = os.path.join(self.path, 'train')
testing_path = os.path.join(self.path, 'test')
self.validate_data_folders(training_path, testing_path)
        # raw data matrices as they exist in the data path
train_dat = self.read_data(training_path)
test_dat = self.read_data(testing_path)
if self.downsample > 0:
print("[INFO] Randomly drawing",self.downsample,"samples for training")
train_dat = self.subsample(train_dat, self.downsample)
if self.restrict_to_features is not None:
train_dat = self.filter_by_features(train_dat, self.restrict_to_features)
test_dat = self.filter_by_features(test_dat, self.restrict_to_features)
        # check for any problems with the input files
self.validate_input_data(train_dat, test_dat)
# cleanup uninformative features/samples, subset annotation data, do feature selection on training data
train_dat, train_ann, train_samples, train_features = self.process_data(train_dat, split = 'train')
test_dat, test_ann, test_samples, test_features = self.process_data(test_dat, split = 'test')
# harmonize feature sets in train/test
train_dat, test_dat = self.harmonize(train_dat, test_dat)
train_feature_ann = {}
test_feature_ann = {}
# log_transform
if self.log_transform:
print("[INFO] transforming data to log scale")
train_dat = self.transform_data(train_dat)
test_dat = self.transform_data(test_dat)
        # Normalize the training data; for the test data, apply the normalization
        # factors learned from the training data (see fit=False below).
train_dat = self.normalize_data(train_dat, scaler_type="standard", fit=True)
test_dat = self.normalize_data(test_dat, scaler_type="standard", fit=False)
        # encode the variable annotations, then convert data matrices and annotations to PyTorch datasets
training_dataset = self.get_torch_dataset(train_dat, train_ann, train_samples, train_feature_ann)
testing_dataset = self.get_torch_dataset(test_dat, test_ann, test_samples, test_feature_ann)
        # NOTE: Exporting to disk happens in get_torch_dataset, so this concatenation is not reflected in the exported data.
        # TODO: Find a better way to do early integration, or move the concatenation into get_torch_dataset; otherwise it is ignored.
# for early fusion, concatenate all data matrices and feature lists
if self.concatenate:
training_dataset.dat = {'all': torch.cat([training_dataset.dat[x] for x in training_dataset.dat.keys()], dim = 1)}
training_dataset.features = {'all': list(chain(*training_dataset.features.values()))}
testing_dataset.dat = {'all': torch.cat([testing_dataset.dat[x] for x in testing_dataset.dat.keys()], dim = 1)}
testing_dataset.features = {'all': list(chain(*testing_dataset.features.values()))}
print("[INFO] Training Data Stats: ", training_dataset.get_dataset_stats())
print("[INFO] Test Data Stats: ", testing_dataset.get_dataset_stats())
print("[INFO] Merging Feature Logs...")
logs = self.feature_logs
self.feature_logs = {x: pd.merge(logs['cleanup'][x],
logs['select_features'][x],
on = 'feature', how = 'outer',
suffixes=['_cleanup', '_laplacian']) for x in self.data_types}
print("[INFO] Data import successful.")
return training_dataset, testing_dataset
def validate_data_folders(self, training_path, testing_path):
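        """
        Check that both the training and testing folders contain 'clin.csv' and one
        CSV file per requested data type; raise a ValueError listing any missing files.
        """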
print("[INFO] Validating data folders...")
training_files = set(os.listdir(training_path))
testing_files = set(os.listdir(testing_path))
required_files = {'clin.csv'} | {f"{dt}.csv" for dt in self.data_types}
if not required_files.issubset(training_files):
missing_files = required_files - training_files
raise ValueError(f"Missing files in training folder: {', '.join(missing_files)}")
if not required_files.issubset(testing_files):
missing_files = required_files - testing_files
raise ValueError(f"Missing files in testing folder: {', '.join(missing_files)}")
def read_data(self, folder_path):
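        """
        Read 'clin.csv' and one CSV file per data type from `folder_path`, using the
        first column as the index (features for omics layers, sample labels for clinical
        data); omics matrices keep samples as columns. Returns a dict keyed by file name.
        """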
data = {}
required_files = {'clin.csv'} | {f"{dt}.csv" for dt in self.data_types}
print("\n[INFO] ----------------- Reading Data ----------------- ")
for file in required_files:
file_path = os.path.join(folder_path, file)
file_name = os.path.splitext(file)[0]
print(f"[INFO] Importing {file_path}...")
data[file_name] = pd.read_csv(file_path, index_col=0)
return data
# randomly draw N samples; return subset of dat (output of read_data)
def subsample(self, dat, N):
clin = dat['clin'].sample(N)
dat_sub = {x: dat[x][clin.index] for x in self.data_types}
dat_sub['clin'] = clin
return dat_sub
def filter_by_features(self, dat, features):
"""
        If the user has provided a list of features to restrict the analysis to,
        subset the train/test data to only include those features.
"""
dat_filtered = {
key: df if key == "clin" else df.loc[df.index.intersection(features)]
for key, df in dat.items()
}
print("[INFO] The initial features are filtered to include user-provided features only")
for key, df in dat_filtered.items():
remaining_features = len(df.index)
print(f"In layer '{key}', {remaining_features} features are remaining after filtering.")
return dat_filtered
def process_data(self, data, split = 'train'):
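        """
        Clean the omics matrices, align samples with the clinical annotations and, for the
        training split only (and if top_percentile is set), run unsupervised feature selection.
        Returns the processed data dict, the matching annotations, the retained sample list
        and the per-layer feature index.
        """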
print(f"\n[INFO] ----------------- Processing Data ({split}) ----------------- ")
# remove uninformative features and samples with no information (from data matrices)
dat = self.cleanup_data({x: data[x] for x in self.data_types})
ann = data['clin']
dat, ann, samples = self.get_labels(dat, ann)
# do feature selection: only applied to training data
if split == 'train':
if self.top_percentile:
dat = self.select_features(dat)
features = {x: dat[x].index for x in dat.keys()}
return dat, ann, samples, features
def cleanup_data(self, df_dict):
print("\n[INFO] ----------------- Cleaning Up Data ----------------- ")
cleaned_dfs = {}
sample_masks = []
feature_logs = {} # keep track of feature variation/NA value scores
# First pass: remove near-zero-variation features and create masks for informative samples
for key, df in df_dict.items():
print("\n[INFO] working on layer: ",key)
original_features_count = df.shape[0]
# Compute variances and NA percentages for each feature in the DataFrame
feature_variances = df.var(axis=1)
na_percentages = df.isna().mean(axis=1)
# Combine variances and NA percentages into a single DataFrame for logging
log_df = pd.DataFrame({ 'feature': df.index, 'na_percent': na_percentages, 'variance': feature_variances, 'selected': False})
            # Step 1: Filter features using both the variance and NA-percentage thresholds,
            # keeping only the features that meet both criteria
df = df.loc[(feature_variances > feature_variances.quantile(self.variance_threshold)) & (na_percentages < self.na_threshold)]
# set selected features to True
log_df['selected'] = (log_df['variance'] > feature_variances.quantile(self.variance_threshold)) & (log_df['na_percent'] < self.na_threshold)
feature_logs[key] = log_df
            # Step 2: Fill NA values with the median of the feature
# Check if there are any NA values in the DataFrame
if np.sum(df.isna().sum()) > 0:
missing_rows = df.isna().any(axis=1)
print("[INFO] Imputing NA values to median of features, affected # of cells in the matrix", np.sum(df.isna().sum()), " # of rows:",sum(missing_rows))
# Calculate medians for each 'column' (originally rows) and fill NAs
# Note: After transposition, operations are more efficient
df_T = df.T
medians_T = df_T.median(axis=0)
df_T.fillna(medians_T, inplace=True)
df = df_T.T
print("[INFO] Number of NA values: ",np.sum(df.isna().sum()))
removed_features_count = original_features_count - df.shape[0]
print(f"[INFO] DataFrame {key} - Removed {removed_features_count} features.")
            # Step 3: Create masks for informative samples
# Compute standard deviation of samples (along columns)
sample_stdevs = df.std(axis=0)
# Create mask for samples that do not have std dev of 0 or NaN
mask = np.logical_and(sample_stdevs != 0, np.logical_not(np.isnan(sample_stdevs)))
sample_masks.append(mask)
cleaned_dfs[key] = df
# Find samples that are informative in all dataframes
common_mask = pd.DataFrame(sample_masks).all()
# Second pass: apply common mask to all dataframes
for key in cleaned_dfs.keys():
original_samples_count = cleaned_dfs[key].shape[1]
cleaned_dfs[key] = cleaned_dfs[key].loc[:, common_mask]
removed_samples_count = original_samples_count - cleaned_dfs[key].shape[1]
print(f"[INFO] DataFrame {key} - Removed {removed_samples_count} samples ({removed_samples_count / original_samples_count * 100:.2f}%).")
# update feature logs from this process
self.feature_logs['cleanup'] = feature_logs
return cleaned_dfs
def get_labels(self, dat, ann):
# subset samples and reorder annotations for the samples
samples = list(reduce(set.intersection, [set(item) for item in [dat[x].columns for x in dat.keys()]]))
samples = list(set(ann.index).intersection(samples))
dat = {x: dat[x][samples] for x in dat.keys()}
ann = ann.loc[samples]
return dat, ann, samples
# unsupervised feature selection using laplacian score and correlation filters (optional)
def select_features(self, dat):
        counts = {x: max(int(dat[x].shape[0] * self.top_percentile / 100), self.min_features or 0) for x in dat.keys()}  # "or 0" guards against min_features being None
dat_filtered = {}
feature_logs = {} # feature log for each layer
for layer in dat.keys():
# filter features in the layer and keep a log of filtering process; notice we provide a transposed matrix
X_filt, log_df = filter_by_laplacian(X = dat[layer].T, layer = layer,
topN=counts[layer], correlation_threshold = self.correlation_threshold)
            dat_filtered[layer] = X_filt.T  # transpose back after Laplacian filtering
feature_logs[layer] = log_df
# update main feature logs with events from this function
self.feature_logs['select_features'] = feature_logs
return dat_filtered
def harmonize(self, dat1, dat2):
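        """
        Subset both data dictionaries to the features shared per layer so that the
        training and testing matrices end up with identical feature sets.
        """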
print("\n[INFO] ----------------- Harmonizing Data Sets ----------------- ")
# Get common features
common_features = {x: dat1[x].index.intersection(dat2[x].index) for x in self.data_types}
# Subset both datasets to only include common features
dat1 = {x: dat1[x].loc[common_features[x]] for x in dat1.keys()}
dat2 = {x: dat2[x].loc[common_features[x]] for x in dat2.keys()}
print("\n[INFO] ----------------- Finished Harmonizing ----------------- ")
return dat1, dat2
def transform_data(self, data):
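        """Apply a log1p transformation element-wise to every data matrix."""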
        transformed_data = {x: np.log1p(data[x]) for x in data.keys()}  # log1p is element-wise, so no transposition is needed
return transformed_data
def normalize_data(self, data, scaler_type="standard", fit=True):
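        """
        Scale each data matrix feature-wise with standard or min-max scaling. With fit=True,
        fit one scaler per layer and store it in self.scalers; with fit=False, reuse the
        previously fitted scalers (e.g. to apply training-set statistics to the test set).
        """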
print("\n[INFO] ----------------- Normalizing Data ----------------- ")
        # Note the matrix transpositions when fitting/transforming and again afterwards:
        # the data matrices store features in rows, while the scalers expect features in columns.
if fit:
if scaler_type == "standard":
self.scalers = {x: StandardScaler().fit(data[x].T) for x in data.keys()}
elif scaler_type == "min_max":
self.scalers = {x: MinMaxScaler().fit(data[x].T) for x in data.keys()}
else:
raise ValueError("Invalid scaler_type. Choose 'standard' or 'min_max'.")
normalized_data = {x: pd.DataFrame(self.scalers[x].transform(data[x].T),
index=data[x].columns,
columns=data[x].index).T
for x in data.keys()}
return normalized_data
def get_torch_dataset(self, dat, ann, samples, feature_ann):
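        """
        Convert the processed data and annotations into a MultiOmicDataset: each data matrix
        becomes a float tensor (samples x features), and annotation columns are encoded
        (categorical ones ordinal-encoded) and stored as per-column tensors.
        """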
features = {x: dat[x].index for x in dat.keys()}
dat = {x: torch.from_numpy(np.array(dat[x].T)).float() for x in dat.keys()}
ann, variable_types, label_mappings = self.encode_labels(ann)
        # Convert each annotation column to a tensor
ann = {col: torch.from_numpy(ann[col].values) for col in ann.columns}
return MultiOmicDataset(dat, ann, variable_types, features, samples, label_mappings)
def encode_labels(self, df):
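        """
        Ordinal-encode the categorical columns of the annotation dataframe, re-using
        previously fitted encoders when available. Returns the encoded dataframe, a dict
        marking each column as 'categorical' or 'numerical', and the code-to-label mappings.
        """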
label_mappings = {}
def encode_column(series):
nonlocal label_mappings # Declare as nonlocal so that we can modify it
# Fill NA values with 'missing'
# series = series.fillna('missing')
if series.name not in self.encoders:
self.encoders[series.name] = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
encoded_series = self.encoders[series.name].fit_transform(series.to_frame())
else:
encoded_series = self.encoders[series.name].transform(series.to_frame())
# also save label mappings
label_mappings[series.name] = {
int(code): label for code, label in enumerate(self.encoders[series.name].categories_[0])
}
return encoded_series.ravel()
# Select only the categorical columns
df_categorical = df.select_dtypes(include=['object', 'category']).apply(encode_column)
# Combine the encoded categorical data with the numerical data
df_encoded = pd.concat([df.select_dtypes(exclude=['object', 'category']), df_categorical], axis=1)
# Store the variable types
variable_types = {col: 'categorical' for col in df_categorical.columns}
variable_types.update({col: 'numerical' for col in df.select_dtypes(exclude=['object', 'category']).columns})
return df_encoded, variable_types, label_mappings
def validate_input_data(self, train_dat, test_dat):
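        """
        Sanity-check the imported train/test data: row identifiers must be unique, sample
        labels must overlap between clin.csv and each omics file, and each layer must share
        features between train and test. Warnings are printed; errors raise an exception.
        """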
print("\n[INFO] ----------------- Checking for problems with the input data ----------------- ")
errors = []
warnings = []
def check_rownames(dat, split):
            # Check 1: Validate that the row identifiers (first column) are unique
for file_name, df in dat.items():
if not df.index.is_unique:
identifier_type = "Sample labels" if file_name == 'clin' else "Feature names"
errors.append(f"Error in {split}/{file_name}.csv: {identifier_type} in the first column must be unique.")
def check_sample_labels(dat, split):
clin_samples = set(dat['clin'].index)
for file_name, df in dat.items():
if file_name != 'clin':
omics_samples = set(df.columns)
matching_samples = clin_samples.intersection(omics_samples)
if not matching_samples:
errors.append(f"Error: No matching sample labels found between {split}/clin.csv and {split}/{file_name}.csv.")
elif len(matching_samples) < len(clin_samples):
missing_samples = clin_samples - matching_samples
warnings.append(f"Warning: Some sample labels in {split}/clin.csv are missing in {split}/{file_name}.csv: {missing_samples}")
def check_common_features(train_dat, test_dat):
for file_name in train_dat:
if file_name != 'clin' and file_name in test_dat:
train_features = set(train_dat[file_name].index)
test_features = set(test_dat[file_name].index)
common_features = train_features.intersection(test_features)
if not common_features:
errors.append(f"Error: No common features found between train/{file_name}.csv and test/{file_name}.csv.")
check_rownames(train_dat, 'train')
check_rownames(test_dat, 'test')
check_sample_labels(train_dat, 'train')
check_sample_labels(test_dat, 'test')
check_common_features(train_dat, test_dat)
# Handle errors and warnings
if warnings:
print("\n[WARNING] Warnings:\n")
for i, warning in enumerate(warnings, 1):
print(f"[WARNING] {i}. {warning}")
if errors:
print("[INFO] Found problems with the input data:\n")
for i, error in enumerate(errors, 1):
print(f"[ERROR] {i}. {error}")
raise Exception("[ERROR] Please correct the above errors and try again.")
if not warnings and not errors:
print("[INFO] Data structure is valid with no errors or warnings.")