|
|
|
|
|
from dataclasses import dataclass |
|
import numpy as np |
|
import scipy.linalg as la |
|
from scipy.signal import find_peaks |
|
from math import ceil |
|
|
|
|
|
|
|
|
|
def thin_peaks(peak_list, dmin=10, voxel_size=(1,1,1), return_larger_peaks=False):
    """
    Remove peaks that lie closer than `dmin` to a brighter peak.

    Peaks are compared pairwise in list order. Whenever two peaks that are both
    still present are closer than `dmin`, the dimmer one is removed and the
    brighter one is recorded as the peak that screened it.

    Args:
    - peak_list (list of PeakData): Each element provides:
        - pos (sequence of float): 3D coordinates of the peak.
        - intensity (float): The intensity value of the peak.
        - key (tuple): A hashable identifier for the peak (#trace, #peak).
    - dmin (float, optional): Minimum distance between peaks. Peaks closer than
      this threshold are thinned. Defaults to 10.
    - voxel_size (sequence of float, optional): Per-axis scale factors that the
      coordinate differences are multiplied by before the distance is taken.
      Defaults to (1, 1, 1).
    - return_larger_peaks (bool, optional): Also return, for each removed peak,
      the key of the brighter peak that caused its removal.

    Returns:
    - list of tuples: Keys of the removed peaks, in order of removal.
    if return_larger_peaks
    - list of tuples: Keys of the screening (brighter) peaks, aligned
      index-by-index with the list of removed keys.

    Notes:
    - The distance is the L2 norm of the voxel-scaled coordinate difference.
    - When two peaks within `dmin` have equal intensity, the second peak of the
      pair under comparison (inner-loop index) is the one removed.
    - A peak that has already been removed is never compared again.
    """
    scale = np.array(voxel_size)
    # Convert every position once instead of once per pair.
    positions = [np.array(peak.pos) for peak in peak_list]

    removed_peaks = []
    removed_larger_peaks = []
    # Set mirror of removed_peaks: O(1) membership tests, while the lists keep
    # the removal order that callers rely on.
    removed_keys = set()

    for i, peak_i in enumerate(peak_list):
        if peak_i.key in removed_keys:
            continue
        for j, peak_j in enumerate(peak_list):
            if i == j or peak_j.key in removed_keys:
                continue
            d = la.norm((positions[i] - positions[j]) * scale)
            if d < dmin:
                if peak_i.intensity < peak_j.intensity:
                    # Peak i is screened by j; nothing more to compare for i.
                    removed_keys.add(peak_i.key)
                    removed_peaks.append(peak_i.key)
                    removed_larger_peaks.append(peak_j.key)
                    break
                # Otherwise (including ties) peak j is screened by i.
                removed_keys.add(peak_j.key)
                removed_peaks.append(peak_j.key)
                removed_larger_peaks.append(peak_i.key)

    if return_larger_peaks:
        return removed_peaks, removed_larger_peaks
    return removed_peaks
|
|
|
|
|
@dataclass
class CellData:
    """Container for everything recorded about one cell.

    Attributes:
        pathdata_list (list): The PathData objects of the cell, one per traced path.
    """

    pathdata_list: list
|
|
|
@dataclass
class RemovedPeakData:
    """Record of a peak that was discarded in favour of a nearby larger peak.

    Attributes:
        idx (int): Index of the removed peak along its own path.
        screening_peak (tuple): (path_idx, position along path) of the peak that screened it.
    """

    idx: int
    screening_peak: tuple
|
|
|
@dataclass
class PathData:
    """Per-path record: peaks, defining points, fluorescence values and length.

    Attributes:
        peaks (list): Retained peaks of the path, given as indices into
            `points` / `o_intensity`.
        removed_peaks (list): Peaks of the path that were removed because of a
            nearby larger peak.
        points (list): Points defining the path.
        o_intensity (list): (Unnormalized) fluorescence intensity values along the path.
        SC_length (float): Length of the path.
    """

    peaks: list
    removed_peaks: list
    points: list
    o_intensity: list
    SC_length: float
|
|
|
@dataclass
class PeakData:
    """A single candidate peak in the form consumed by `thin_peaks`.

    Attributes:
        pos (tuple): Coordinates of the peak.
        intensity (float): Intensity value of the peak.
        key (tuple): Identifier of the peak, e.g. (#trace, #peak).
    """

    pos: tuple
    intensity: float
    key: tuple
|
|
|
|
|
def find_peaks2(v, distance=5, prominence=0.5):
    """
    Find peaks in a 1D array, including peaks that sit on the array boundary.

    The input is padded at both ends with its own minimum value so that the
    first and last samples can be detected as peaks. Peaks are located in the
    padded array with `scipy.signal.find_peaks` and mapped back to indices of
    the original input.

    Args:
    - v (array-like): 1D input in which to find peaks. Must be non-empty.
    - distance (int, optional): Minimum number of array elements that separate two peaks. Defaults to 5.
    - prominence (float, optional): Minimum prominence required for a peak to be identified. Defaults to 0.5.

    Returns:
    - list of int: Indices of the identified peaks in the original input array.
    - dict: Properties of the identified peaks as returned by
      `scipy.signal.find_peaks` for the padded array. Any index-valued entries
      (e.g. the prominence bases) therefore refer to the padded array, i.e. they
      are offset by `ceil(distance) + 1` relative to `v`.

    Raises:
    - ValueError: If `v` is empty.
    - RuntimeError: If a detected peak maps outside the original array.
    """
    v = np.asarray(v)
    if v.size == 0:
        raise ValueError("find_peaks2 requires a non-empty input array")

    pad = int(ceil(distance)) + 1
    # Flat padding at the global minimum: it can never be a peak itself, but it
    # gives the boundary samples a lower neighbour on their outer side.
    padding = np.ones((pad,), dtype=v.dtype) * np.min(v)
    v_ext = np.concatenate([padding, v, padding])

    ext_peaks, properties = find_peaks(v_ext, distance=distance, prominence=prominence)

    n_peaks = []
    for i in ext_peaks - pad:
        if 0 <= i < len(v):
            n_peaks.append(i)
        else:
            # Sanity check: would mean a peak was found inside the padding.
            raise RuntimeError(
                "find_peaks2: peak index %d lies outside the input array of length %d" % (i, len(v))
            )
    return n_peaks, properties
|
|
|
|
|
def process_cell_traces(all_paths, path_lengths, measured_trace_fluorescence, dmin=10):
    """
    Process the traces of one cell to extract peak information and organize the data.

    For every path the fluorescence trace is standardized (zero mean, unit
    standard deviation) and peaks are located with `find_peaks2`. The peaks of
    all paths are then pooled and thinned with `thin_peaks`, so that a peak is
    dropped when a brighter peak (possibly on another path) lies within `dmin`.
    The retained and removed peaks are finally collected per path.

    Args:
        all_paths (list of list of tuples): A list containing paths, where each path is
            represented as a list of 3D coordinate tuples.
        path_lengths (list of float): List of path lengths corresponding to the provided paths.
        measured_trace_fluorescence (list of list of float): A list containing fluorescence
            data corresponding to each path point.
        dmin (float): Distance below which brighter peaks screen less bright ones.

    Returns:
        CellData: An object containing organized peak and path data for a given cell.
        Each PathData holds the retained peak indices in `peaks` and a
        RemovedPeakData entry for every screened peak in `removed_peaks`.

    Note:
        - The function assumes that each path and its corresponding length and fluorescence data
          are positioned at the same index in their respective lists.
        - A trace with zero standard deviation makes the normalization divide by zero.
    """
    # Step 1: detect candidate peaks on each trace independently.
    # Each entry is (peak indices, peak coordinates, raw peak intensities).
    cell_peaks = []
    for points, o_intensity in zip(all_paths, measured_trace_fluorescence):
        intensity_normalized = (o_intensity - np.mean(o_intensity))/np.std(o_intensity)

        p, _ = find_peaks2(intensity_normalized, distance=5, prominence=0.5*np.std(intensity_normalized))
        peaks = np.array(p, dtype=np.int32)

        # Thinning compares the raw (unnormalized) intensities.
        peak_mean_heights = [o_intensity[u] for u in peaks]
        peak_points = [points[u] for u in peaks]

        cell_peaks.append((peaks, peak_points, peak_mean_heights))

    # Step 2: pool the peaks of all paths; key = (path index, peak number on that path).
    to_thin = [
        PeakData(pos=pos, intensity=height, key=(path_idx, peak_idx))
        for path_idx, (_, peak_points, peak_heights) in enumerate(cell_peaks)
        for peak_idx, (pos, height) in enumerate(zip(peak_points, peak_heights))
    ]

    removed_keys, screening_keys = thin_peaks(to_thin, return_larger_peaks=True, dmin=dmin)

    # Map each removed key to the key of its screening peak for O(1) lookup.
    # setdefault keeps the first occurrence, matching a list.index() lookup.
    screened_by = {}
    for removed_key, screening_key in zip(removed_keys, screening_keys):
        screened_by.setdefault(removed_key, screening_key)

    # Step 3: split each path's peaks into retained and removed.
    retained_per_path = []
    removed_per_path = []
    for path_idx, (path_peaks, _, _) in enumerate(cell_peaks):
        path_retained_peaks = []
        path_removed_peaks = []
        for peak_idx, peak_pos in enumerate(path_peaks):
            key = (path_idx, peak_idx)
            if key not in screened_by:
                path_retained_peaks.append(peak_pos)
            else:
                larger_path, larger_idx = screened_by[key]
                # Store the screening peak by its position along its own path,
                # not by its peak number.
                path_removed_peaks.append(
                    RemovedPeakData(
                        idx=peak_pos,
                        screening_peak=(larger_path, cell_peaks[larger_path][0][larger_idx]),
                    )
                )
        retained_per_path.append(path_retained_peaks)
        removed_per_path.append(path_removed_peaks)

    # Step 4: assemble one PathData per input path.
    pathdata_list = []
    for k, points in enumerate(all_paths):
        pathdata_list.append(
            PathData(
                peaks=retained_per_path[k],
                removed_peaks=removed_per_path[k],
                points=points,
                o_intensity=measured_trace_fluorescence[k],
                SC_length=path_lengths[k],
            )
        )

    return CellData(pathdata_list=pathdata_list)
|
|
|
|
|
# Default fraction of the maximum value used as the cutoff in focus_criterion.
alpha_max = 0.4


def focus_criterion(pos, v, alpha=alpha_max):
    """
    Select the positions whose values reach a fraction of the largest value.

    The cutoff is `alpha` times the maximum of `v`; a position is kept when its
    value is greater than or equal to that cutoff.

    Args:
    - pos (numpy.ndarray): Array of positions, aligned element-wise with `v`.
    - v (numpy.ndarray): 1D array of values, e.g., intensities.
    - alpha (float, optional): Scaling factor for the cutoff. Defaults to `alpha_max`.

    Returns:
    - numpy.ndarray: Positions whose value in `v` is at least the cutoff. An
      empty int32 array is returned when `v` is empty.
    """
    # Nothing to compare against: np.max would fail on an empty array.
    if not len(v):
        return np.array([], dtype=np.int32)

    cutoff = alpha * np.max(v)
    keep = v >= cutoff
    return np.array(pos[keep])
|
|
|
def analyse_celldata(cell_data, config):
    """
    Analyse the provided cell data to extract focus-related information.

    For every path, the retained peaks are classified as foci according to the
    configured threshold mode, and the removed (screened) peaks whose raw
    intensity is above the same absolute threshold are reported separately.

    Args:
        cell_data (CellData): An instance of the CellData class containing path data information.
        config (dictionary): Configuration dictionary containing 'peak_threshold' and 'threshold_type'
            'peak_threshold' (float) - threshold for calling peaks as foci
            'threshold_type' (str) = 'per-trace' or 'per-cell'
                'per-trace': the threshold is relative to the brightest retained
                    peak of the same trace.
                'per-cell': the threshold is relative to the largest
                    mean-subtracted intensity found on any trace of the cell.

    Returns:
        tuple: A tuple of six lists, each with one entry per path:
            - foci_abs_intensity (list): Raw (absolute) intensities of the detected foci.
            - foci_pos (list): Positions of the detected foci along the path, in units of SC_length.
            - foci_pos_index (list): Indices of the detected foci.
            - screened_foci_data (list): List of RemovedPeakData indicating positions of removed peaks and the index of the larger peak
            - trace_median_intensities (list): Per-trace median intensity
            - trace_thresholds (list): Per-trace absolute threshold for calling peaks as foci
              (None for a trace without retained peaks in 'per-trace' mode)

    Raises:
        KeyError: If `config` lacks 'peak_threshold' or 'threshold_type'.
        NotImplementedError: If 'threshold_type' is neither 'per-trace' nor 'per-cell'.
    """
    foci_abs_intensity = []
    foci_pos = []
    foci_pos_index = []
    screened_foci_data = []
    trace_median_intensities = []
    trace_thresholds = []

    peak_threshold = config['peak_threshold']
    threshold_type = config['threshold_type']

    if threshold_type not in ('per-trace', 'per-cell'):
        raise NotImplementedError(f"unsupported threshold_type: {threshold_type!r}")

    per_trace = threshold_type == 'per-trace'

    # 'per-cell' needs the largest mean-subtracted intensity over all traces
    # before any single trace can be thresholded.
    max_cell_intensity = float("-inf")
    if not per_trace:
        for path_data in cell_data.pathdata_list:
            h = np.array(path_data.o_intensity)
            h = h - np.mean(h)
            max_cell_intensity = max(max_cell_intensity, np.max(h))

    for path_data in cell_data.pathdata_list:
        peaks = np.array(path_data.peaks, dtype=np.int32)
        intensity = np.array(path_data.o_intensity)
        trace_mean = np.mean(intensity)
        h = intensity - trace_mean

        if per_trace:
            # Call peaks as foci if their standardized height reaches
            # peak_threshold times the standardized height of the brightest peak.
            h = h/np.std(h)
            foci_idx = focus_criterion(peaks, h[peaks], peak_threshold)

            if len(peaks):
                # The same criterion expressed on the raw intensity scale.
                threshold = (1-peak_threshold)*trace_mean + peak_threshold*np.max(intensity[peaks])
                trace_thresholds.append(threshold)
            else:
                # No retained peak to scale against: report no threshold and
                # let every removed peak pass the screening test below.
                threshold = float('-inf')
                trace_thresholds.append(None)
        else:
            # Call peaks as foci if intensity - trace_mean > peak_threshold * max_cell_intensity.
            foci_idx = peaks[h[peaks]>peak_threshold*max_cell_intensity]
            threshold = trace_mean + peak_threshold*max_cell_intensity
            trace_thresholds.append(threshold)

        # Removed peaks that would themselves have been bright enough.
        removed_peaks = path_data.removed_peaks
        removed_peaks_idx = np.array([u.idx for u in removed_peaks], dtype=np.int32)
        removed_peak_heights = intensity[removed_peaks_idx]
        screened_foci_idx = np.where(removed_peak_heights>threshold)[0]
        screened_foci_data.append([removed_peaks[i] for i in screened_foci_idx])

        # Convert point index to a position along the path.
        pos_abs = (foci_idx/len(path_data.points))*path_data.SC_length
        foci_pos.append(pos_abs)
        foci_abs_intensity.append(intensity[foci_idx])

        foci_pos_index.append(foci_idx)
        trace_median_intensities.append(np.median(path_data.o_intensity))

    return foci_abs_intensity, foci_pos, foci_pos_index, screened_foci_data, trace_median_intensities, trace_thresholds
|
|
|
def analyse_traces(all_paths, path_lengths, measured_trace_fluorescence, config):
    """
    Extract peaks from the traces of one cell and analyse them in a single call.

    Args:
        all_paths (list): Paths of the cell, passed through to `process_cell_traces`.
        path_lengths (list): Length of each path, passed through to `process_cell_traces`.
        measured_trace_fluorescence (list): Fluorescence trace of each path,
            passed through to `process_cell_traces`.
        config (dictionary): Must contain 'screening_distance' (used as `dmin`
            for `process_cell_traces`); the whole dictionary is also passed to
            `analyse_celldata`.

    Returns:
        The result of `analyse_celldata` for the processed cell.
    """
    screening_distance = config['screening_distance']
    processed_cell = process_cell_traces(
        all_paths,
        path_lengths,
        measured_trace_fluorescence,
        dmin=screening_distance,
    )
    return analyse_celldata(processed_cell, config)
|
|
|
|
|
|
|
|
|
|