Source code for lpath.match

"""
Pattern match your extracted trajectories and cluster pathways classes.
"""
# Code that pattern matches states a trajectory has been through
# and then cluster them into fundamental few pathways.
#
# This is pulled from 'version 6', which expands on reassigning and
# allows for > 9 states.
#

import pickle
from os.path import exists
from shutil import copyfile

import numpy
import pylcs
import matplotlib.pyplot as plt
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform
from sklearn.metrics import pairwise_distances
from tqdm.auto import tqdm, trange
from timedinput import timedinput

from lpath.extloader import *
from lpath.io import load_file, default_dendrogram_colors

from ._logger import Logger

log = Logger().get_logger(__name__)


[docs] def tostr(b): """ Convert a nonstandard string object ``b`` to str with the handling of the case where ``b`` is bytes. """ if b is None: return None elif isinstance(b, bytes): return b.decode('utf-8') else: return str(b)
[docs] def condense_string(string: str, n: int) -> str: """ Function that takes in a string and remove any consecutive duplicates. Starts from 1, slowly works up to n. Parameters ---------- string : str Input string. n : int How many consecutive duplicates to remove. 0 for none. 1 for just consecutive characters, 2 for consecutive pairs (e.g., ABABABA --> AB) """ for n_idx in range(1, n+1): result = "" count = 0 while count < len(string): if (count + n_idx <= len(string) and string[count:count+n_idx] == string[count+n_idx:count+2*n_idx]): count += n_idx else: result += string[count] count += 1 string = result return string
[docs] def calc_dist(seq1, seq2, dictionary, pbar, condense=0): """ Pattern match and calculate the similarity between two ``state string`` sequences. Parameters ---------- seq1 : numpy.ndarray First string to be compared. seq2 : numpy.ndarray Second string to be compared. dictionary : dict Dictionary mapping ``state_id`` (float/int) to ``state string`` (characters). pbar : tqdm.tqdm A tqdm.tqdm object for the progress bar. condense : int, default: 0 Set to a positive int to shorten consecutive characters in state strings. Returns ------- 1 - similarity : float Similarity score. """ # Remove all instances of "unknown" state, which is always the last entry in the dictionary. seq1 = seq1[seq1 < len(dictionary) - 1] seq1_str = "".join(dictionary[x] for x in seq1) seq2 = seq2[seq2 < len(dictionary) - 1] seq2_str = "".join(dictionary[x] for x in seq2) seq1_str = condense_string(seq1_str, condense) seq2_str = condense_string(seq2_str, condense) km = pylcs.lcs_sequence_length(seq1_str, seq2_str) similarity = (2 * km) / (len(seq1_str) + len(seq2_str) - (abs(len(seq1_str) - len(seq2_str)) / 2)) pbar.update(1) return 1 - similarity
[docs] def calc_dist_substr(seq1, seq2, dictionary, pbar, condense=0): """ Pattern match and calculate the similarity between two ``state string`` substrings. Used when you're comparing segment ids. Parameters ---------- seq1 : numpy.ndarray First string to be compared. seq2 : numpy.ndarray Second string to be compared. dictionary : dict Dictionary mapping ``state_id`` (float/int) to ``state string`` (characters). pbar : tqdm.tqdm A tqdm.tqdm object for the progress bar. condense : int, default: 0 Set to a positive int to shorten consecutive characters in state strings. Returns ------- 1 - similarity : float Similarity score. """ # Remove all instances of initial/basis states. # seq1 = seq1[seq1 > 0] seq1_str = "".join(dictionary[x] for x in seq1) # seq2 = seq2[seq2 > 0] seq2_str = "".join(dictionary[x] for x in seq2) seq1_str = condense_string(seq1_str, condense) seq2_str = condense_string(seq2_str, condense) km = pylcs.lcs_string_length(seq1_str, seq2_str) similarity = (2 * km) / (len(seq1_str) + len(seq2_str) - (abs(len(seq1_str) - len(seq2_str)) / 2)) pbar.update(1) return 1 - similarity
[docs] def calc_dist_vanilla(seq1, seq2, dictionary, pbar, condense=0): """ Pattern match and calculate the similarity between two ``state string`` sequences. This version does not include the reward term for segments of different length. Parameters ---------- seq1 : numpy.ndarray First string to be compared. seq2 : numpy.ndarray Second string to be compared. dictionary : dict Dictionary mapping ``state_id`` (float/int) to ``state string`` (characters). pbar : tqdm.tqdm A tqdm.tqdm object for the progress bar. condense : int, default: 0 Set to a positive int to shorten consecutive characters in state strings. Returns ------- 1 - similarity : float Similarity score. """ # Remove all instances of "unknown" state, which is always the last entry in the dictionary. seq1 = seq1[seq1 < len(dictionary) - 1] seq1_str = "".join(dictionary[x] for x in seq1) seq2 = seq2[seq2 < len(dictionary) - 1] seq2_str = "".join(dictionary[x] for x in seq2) seq1_str = condense_string(seq1_str, condense) seq2_str = condense_string(seq2_str, condense) km = pylcs.lcs_sequence_length(seq1_str, seq2_str) similarity = (2 * km) / (len(seq1_str) + len(seq2_str)) pbar.update(1) return 1 - similarity
[docs] def calc_dist_substr_vanilla(seq1, seq2, dictionary, pbar, condense=0): """ Pattern match and calculate the similarity between two ``state string`` substrings. Used when you're comparing segment ids. This version does not include the reward term for segments of different length. Parameters ---------- seq1 : numpy.ndarray First string to be compared. seq2 : numpy.ndarray Second string to be compared. dictionary : dict Dictionary mapping ``state_id`` (float/int) to ``state string`` (characters). pbar : tqdm.tqdm A tqdm.tqdm object for the progress bar. condense : int, default: 0 Set to a positive int to shorten consecutive characters in state strings. Returns ------- 1 - similarity : float Similarity score. """ # Remove all instances of initial/basis states. # seq1 = seq1[seq1 > 0] seq1_str = "".join(dictionary[x] for x in seq1) # seq2 = seq2[seq2 > 0] seq2_str = "".join(dictionary[x] for x in seq2) seq1_str = condense_string(seq1_str, condense) seq2_str = condense_string(seq2_str, condense) km = pylcs.lcs_string_length(seq1_str, seq2_str) similarity = (2 * km) / (len(seq1_str) + len(seq2_str)) pbar.update(1) return 1 - similarity
[docs] def determine_reassign(reassign_method): """ Argument processing to determine function to reassign trajectories. Parameters ---------- reassign_method : str , default: 'reassign_identity' String from argument.reassign_identity, straight from argparser. Returns ------- reassign : function The reassignment function. """ # Dealing with the preset assign_method preset_reassign = { 'reassign_identity': reassign_identity, 'reassign_statelabel': reassign_statelabel, 'reassign_custom': reassign_custom, 'reassign_segid': reassign_segid, } if reassign_method in preset_reassign.keys(): reassign = preset_reassign[reassign_method] else: import sys import os sys.path.append(os.getcwd()) reassign = get_object(reassign_method) log.info(f'INFO: Replaced reassign() with {reassign_method}') return reassign
[docs] def determine_metric(match_metric, match_vanilla): """ Argument processing to determine function to reassign trajectories. Parameters ---------- match_metric : str , default: 'longest_common_subsequence' String from argument.match_metric, straight from argparser. match_vanilla : bool, default: False Which similarity metric to use. False to use similarity metric with reward term. Returns ------- metric : function The matching function. """ # Dealing with the preset assign_method subsequence_metric = { 'longest_common_subsequence': calc_dist, 'longest_common_subsequence_vanilla': calc_dist_vanilla, } substring_metric = { 'longest_common_substring': calc_dist_substr, 'longest_common_substring_vanilla': calc_dist_substr_vanilla, } preset_metric = {**subsequence_metric, **substring_metric} if match_metric in preset_metric.keys(): metric = preset_metric[match_metric] else: import sys import os sys.path.append(os.getcwd()) metric = get_object(match_metric) log.info(f'INFO: Replaced match_metric with {match_metric}') # Dealing with cases where you called the non-vanilla versions (`--substring` or `--subsequence`), # but also called `--match-reward-off`. The `--match-reward-off` will take priority. if match_vanilla is True: if metric in subsequence_metric.values(): metric = calc_dist_vanilla elif metric in substring_metric.values(): metric = calc_dist_substr_vanilla return metric
[docs] def load_data(file_name): """ Load in the pickle data from ``extract``. Parameters ---------- file_name: str File name of the pickle object from ``extract`` Returns ------- data : list A list with the data necessary to reassign, as extracted from ``output.pickle``. pathways : numpy.ndarray An empty array with shapes for iter_id/seg_id/state_id/pcoord_or_auxdata/frame#/weight. """ data = load_file(file_name, 1) npathways = len(data) assert npathways, "Pickle object is empty. Are you sure there are transitions?" lpathways = max([len(i) for i in data]) n = len(data[0][0]) pathways = numpy.zeros((npathways, lpathways, n), dtype=object) # This "Pathways" array should be Iter/Seg/State/auxdata_or_pcoord/frame#/weight log.debug(f'Loaded pickle object.') return data, pathways
[docs] def reassign_custom(data, pathways, dictionary, assign_file=None): """ Reclassify/assign frames into different states. This is highly specific to the system. If w_assign's definition is sufficient, you can proceed with what's made in the previous step using ``reassign_identity``. In this example, the dictionary maps state idx to its corresponding ``state_string``. We suggest using alphabets as states. Parameters ---------- data : list An array with the data necessary to reassign, as extracted from ``output.pickle``. pathways : numpy.ndarray An empty array with shapes for iter_id/seg_id/state_id/pcoord_or_auxdata/frame#/weight. dictionary : dict An empty dictionary obj for mapping ``state_id`` with ``state string``. The last entry in the dictionary should be the "unknown" state. assign_file : str, default : None A string pointing to the ``assign.h5`` file. Needed as a parameter for all functions, but is ignored if it's an MD trajectory. Returns ------- dictionary : dict A dictionary mapping each ``state_id`` (float/int) with a ``state string`` (character). The last entry in the dictionary should be the "unknown" state. """ # Other example for grouping multiple states into one. for idx, pathway in enumerate(data): # The following shows how you can "merge" multiple states into # a single one. pathway = numpy.asarray(pathway) # Further downsizing... to if pcoord is less than 5 first_contact = numpy.where(pathway[:, 3] < 5)[0][0] for jdx, frame in enumerate(pathway): # First copy all columns over pathways[idx, jdx] = frame # ortho is assigned to state 0 if frame[2] in [1, 3, 4, 6, 7, 9]: frame[2] = 0 # para is assigned to state 1 elif frame[2] in [2, 5, 8]: frame[2] = 1 # Unknown state is assigned 2 if jdx < first_contact: frame[2] = 2 pathways[idx, jdx] = frame # Generating a dictionary mapping each state dictionary = {0: 'A', 1: 'B', 2: '!'} return dictionary
[docs] def reassign_statelabel(data, pathways, dictionary, assign_file): """ Use ``assign.h5`` states as is with ``state_labels``. Does not reclassify/assign frames into new states. In this example, the dictionary maps state idx to its ``state_labels``, as defined in the assign.h5. We suggest using alphabets as ``state_labels`` to allow for more than 9 states. Parameters ---------- data : list An list with the data necessary to reassign, as extracted from ``output.pickle``. pathways : numpy.ndarray An empty array with shapes for iter_id/seg_id/state_id/pcoord_or_auxdata/frame#/weight. dictionary : dict An empty dictionary obj for mapping ``state_id`` with "state string". assign_file : str A string pointing to the ``assign.h5`` file. Needed as a parameter, but ignored if it's an MD trajectory. Returns ------- dictionary : dict A dictionary mapping each ``state_id`` (float/int) with a state string (character). """ for idx, pathway in enumerate(data): pathway = numpy.asarray(pathway) for jdx, frame in enumerate(pathway): pathways[idx, jdx] = frame try: import h5py with h5py.File(assign_file) as f: for idx, state in enumerate(f['state_labels'][:]): dictionary[idx] = tostr(state) dictionary[len(dictionary)] = '!' # Unknown state except ModuleNotFoundError: raise ModuleNotFoundError('Could not import h5py. Exiting out.') return dictionary
[docs] def reassign_segid(data, pathways, dictionary, assign_file=None): """ Use seg ids as state labels. Parameters ---------- data : list An list with the data necessary to reassign, as extracted from ``output.pickle``. pathways : numpy.ndarray An empty array with shapes for iter_id/seg_id/state_id/pcoord_or_auxdata/frame#/weight. dictionary : dict An empty dictionary obj for mapping ``state_id`` with ``state string``. assign_file : str A string pointing to the ``assign.h5`` file. Needed as a parameter, but ignored if it's an MD trajectory. Returns ------- dictionary : dict A dictionary mapping each ``state_id`` (float/int) with a `state string` (character). """ for idx, pathway in enumerate(data): pathway = numpy.asarray(pathway) for jdx, frame in enumerate(pathway): pathways[idx, jdx] = frame # Copy everything... pathways[idx, jdx, 2] = frame[1] # Replace states with seg_id n_states = int(max([seg[2] for traj in pathways for seg in traj])) + 1 for idx in range(n_states): dictionary[idx] = chr(idx + 65) # Map seg_id to a unique character dictionary[n_states] = '!' # Unknown state return dictionary
[docs] def reassign_identity(data, pathways, dictionary, assign_file=None): """ Use assign.h5 states as is. Does not attempt to map assignment to ``state_labels`` from assign.h5. Parameters ---------- data : list An list with the data necessary to reassign, as extracted from ``output.pickle``. pathways : numpy.ndarray An empty array with shapes for iter_id/seg_id/state_id/pcoord_or_auxdata/frame#/weight. dictionary : dict An empty dictionary obj for mapping ``state_id`` with ``state string``. assign_file : str A string pointing to the ``assign.h5`` file. Needed as a parameter, but ignored if it's an MD trajectory. Returns ------- dictionary : dict A dictionary mapping each ``state_id`` (float/int) with a `state string` (character). """ for idx, pathway in enumerate(data): pathway = numpy.asarray(pathway) for jdx, frame in enumerate(pathway): pathways[idx, jdx] = frame n_states = int(max([seg[2] for traj in pathways for seg in traj])) + 1 for idx in range(n_states): dictionary[idx] = str(idx) dictionary[n_states] = '!' # Unknown state return dictionary
[docs] def process_shorter_traj(pathways, dictionary, threshold_length, remove_ends): """ Assigns a non-state to pathways which are shorter than the max length. Parameters ---------- pathways : numpy.ndarray or list An array with shapes for iter_id/seg_id/state_id/pcoord_or_auxdata/frame#/weight. dictionary: dict Maps each state_id to a corresponding string. threshold_length: int or float, default: 0 A parameter such that trajectories < threshold_length are excluded from pattern matching. remove_ends: bool, default: False If True, remove the first and last frames (source and target frames). """ del_list = [] empty_row = numpy.zeros(len(pathways[0, 0])) empty_row[2] = len(dictionary) - 1 for idx, pathway in enumerate(pathways): count = 0 for frame in pathway: if frame[0] == 0: # If no iteration number (i.e., a dummy frame) frame[2] = len(dictionary) - 1 # Mark with the last entry else: count += 1 if count < threshold_length: del_list.append(idx) if remove_ends: # print(f'{idx} {count} : {len(pathways[idx])}') # if count > len(pathways[idx]): # print(pathways[idx]) pathways[idx, 0] = empty_row pathways[idx, count-1] = empty_row if remove_ends: # Delete in one go. pathways = numpy.delete(pathways, [0, -1], axis=1) if len(del_list) > 0: pathways = numpy.delete(pathways, del_list, axis=0) log.debug(f'Indices of pathways removed: {del_list}.') log.info(f'Removed {len(del_list)} trajectories of length < {threshold_length} frames.') return pathways
[docs] def gen_dist_matrix(pathways, dictionary, file_name='succ_traj/distmat.npy', remake=True, metric=calc_dist, condense=0, n_jobs=None): """ Generate the path_string to path_string similarity distance matrix. Parameters ---------- pathways : numpy.ndarray An array with all the sequences to be compared. dictionary : dict A dictionary to map pathways states to characters. file_name : str, default : 'distmat.npy' The file to output the distance matrix. remake : bool, default : True Indicates whether to remake distance matrix or not. metric : bool, default : calc_dist Metric function to use. condense : int, default: 0 Set to a positive int to shorten consecutive characters in state strings. n_jobs : int, default : None Number of jobs to run for the pairwise_distances() calculation. The default issues one job. Returns ------- distmat : numpy.ndarray A condensed form of the distance matrix (Upper Triangle). weights : ndarray An array of the weights of each successful pathway (as taken from the last frame). """ weights = [] path_strings = [] if metric: for pathway in pathways: # remove weights for "non-existent" iters (unknown state) nonzero = pathway[pathway[:, 2] < len(dictionary) - 1] weights.append(nonzero[-1][-1]) # Create path_strings path_strings.append(pathway[:, 2]) else: for pathway in pathways: weights.append(pathway[-1][-1]) # Create path_strings path_strings.append(pathway[:, 2]) weights = numpy.asarray(weights) if not exists(file_name) or remake is True: log.debug(f'Proceeding to calculate distance matrix.') pbar = tqdm(total=int((len(path_strings) * (len(path_strings) - 1))), leave=False) # sklearn.metrics.pairwise_distances() brute-forces it by running all n^2 calculations! distmat = pairwise_distances( X=path_strings, metric=lambda x, y: metric(x, y, dictionary, pbar, condense), n_jobs=n_jobs, ) numpy.save(file_name, distmat) else: distmat = numpy.load(file_name) log.debug(f'Loaded precalculated distance matrix.') distmat = squareform(distmat, checks=False) return distmat, weights
[docs] def calc_linkage(distmat): """ Given distance matrix, calculate and return the linkage. """ return sch.linkage(distmat, method='ward')
[docs] def visualize(z, threshold, out_path="plots", show_fig=True, mpl_colors=None, ax=None): """ Visualize the Dendrogram to determine hyper-parameters (n-clusters). Theoretically done only once to check. Returns ------- plt.gca() : matplotlib.Axes A matplotlib.Axes object, which should be the axes which is used to plot the dendrogram. """ # Clean slate. if ax is None: log.debug('Clearing current plot axis for dendrogram.') plt.cla() # Plot dendrogram try: sch.set_link_color_palette(mpl_colors[:-1]) with plt.rc_context({'lines.linewidth': 2}): sch.dendrogram(z, no_labels=True, color_threshold=threshold, above_threshold_color=mpl_colors[-1], ax=ax) except RecursionError as e: # Catch cases where are too many branches in the dendrogram for default recursion to work. import sys sys.setrecursionlimit(100000) log.warning(e) log.warning(f'Dendrogram too complex to plot with default settings. Upping the recursion limit.') with plt.rc_context({'lines.linewidth': 2}): sch.dendrogram(z, no_labels=True, color_threshold=threshold, above_threshold_color=mpl_colors[-1], ax=ax) plt.axhline(y=threshold, c="k") plt.ylabel("distance") plt.xlabel("pathways") plt.savefig(f"{out_path}/dendrogram.pdf") if show_fig: plt.show() return plt.gca()
[docs] def hcluster(z, n_clusters): """ Scikit-learn Hierarchical Clustering of the different pathways. """ # (Hyper Parameter t=number of cluster) cluster_labels = sch.fcluster(z, t=n_clusters, criterion="maxclust") return cluster_labels - 1
[docs] def determine_clusters(cluster_labels, clusters=None): """ Determine how many clusters to output. Parameters ---------- cluster_labels : numpy.ndarray An array with cluster assignments for each pathway. clusters : list or None Straight from the argparser. Returns ------- clusters : list A list of clusters to output. """ if clusters is None: clusters = list(range(max(cluster_labels) + 1)) elif not isinstance(clusters, list): try: list(clusters) except TypeError: raise TypeError( "Provided cluster numbers don't work. Provide a list desired of cluster numbers or 'None' to output " "all clusters." ) return clusters
[docs] def export_pickle(pathways, output_path): """ Option to output the reassigned pickle object. Parameters ---------- pathways : numpy.ndarray A reassigned pathway object output_path : str Path to output pickle object. """ with open(output_path, 'wb') as f: pickle.dump(pathways, f)
[docs] def select_rep(data_arr, weights, cluster_labels, icluster): """ Small function to determine representative array/weight Parameters ---------- data_arr : numpy.ndarray The array with all the pathways. weights : numpy.ndarray Weight information of the pathways. cluster_labels : numpy.ndarray An array with cluster assignments for each pathway. icluster : int Index of cluster to look at. Returns ------- data_cl : list A list of pathways from icluster. rep_weight : float The weight of the representative structure of icluster. """ selected_cluster = cluster_labels == icluster cluster_arr = numpy.array(data_arr, dtype=object)[selected_cluster] data_cl_trim = [pathway[pathway[:, 0] != 0] for pathway in cluster_arr] weights_cl = weights[selected_cluster] rep_weight = data_cl_trim[numpy.argmax(weights_cl)][-1] return data_cl_trim, rep_weight
[docs] def export_std_files(data_arr, weights, cluster_labels, clusters=None, out_dir="succ_traj"): """ Export data for standard simulations. Parameters ---------- data_arr : numpy.ndarray The array with all the pathways. weights : numpy.ndarray Weight information of the pathways. cluster_labels : numpy.ndarray An array with cluster assignments for each pathway. clusters : list or None A list of clusters to output, straight from the argparser. out_dir : str Directory to output files. """ clusters = determine_clusters(cluster_labels, clusters) representative_file = f'{out_dir}/representative_segments.txt' representative_list = [] for icluster in clusters: trace_out_list = [] data_cl, rep_weight = select_rep(data_arr, weights, cluster_labels, icluster) log.info(f'cluster {icluster} representative weight: {rep_weight}') representative_list.append(f'{rep_weight}\n') for idx, item in enumerate(data_cl): trace_out_list.append(list(numpy.array(item)[:, :2])) with open(representative_file, 'w') as f: f.writelines(representative_list)
[docs] def export_we_files(data_arr, weights, cluster_labels, clusters, file_pattern="west_succ_c{}.h5", out_dir="succ_traj", west_name='west.h5'): """ Export each group of successful trajectories into independent west.h5 file. Parameters ---------- data_arr : numpy.ndarray The array with all the pathways. weights : numpy.ndarray Weight information of the pathways. cluster_labels : numpy.ndarray An array with cluster assignments for each pathway. clusters : list or None A list of clusters to output. file_pattern : str String pattern of how files should be outputted. out_dir : str Directory to output files. west_name : str Name of west.h5 file to use as base. """ try: import h5py except ModuleNotFoundError: raise ModuleNotFoundError('Could not import h5py. Exiting out.') clusters = determine_clusters(cluster_labels, clusters) representative_file = f'{out_dir}' + '/representative_segments.txt' representative_list = [] for icluster in clusters: new_file = f'{out_dir}/' + file_pattern.format(str(icluster)) if not exists(new_file): copyfile(west_name, new_file) first_iter = 1 with h5py.File(west_name, "r") as h5_file: last_iter = len(h5_file['iterations']) # Identify constituents of a cluster to output. trace_out_list = [] data_cl, rep_weight = select_rep(data_arr, weights, cluster_labels, icluster) log.info(f'cluster {icluster} representative weight: {rep_weight}') representative_list.append(f'{rep_weight}\n') for idx, item in enumerate(data_cl): trace_out_list.append(list(numpy.array(item)[:, :2])) # tqdm load bar, working backwards tqdm_iter = trange(last_iter, first_iter - 1, -1, desc=f'c{icluster} iterations') exclusive_set = {tuple(pair) for ilist in trace_out_list for pair in ilist} with h5py.File(new_file, "r+") as h5file: for n_iter in tqdm_iter: for n_seg in trange(len(h5file[f'iterations/iter_{n_iter:>08}/seg_index']), desc=f'iter {n_iter} segments', leave=False, delay=1): if (n_iter, n_seg) not in exclusive_set: h5file[f"iterations/iter_{n_iter:>08}/seg_index"]["weight", n_seg] = 0 with open(representative_file, 'w') as f: f.writelines(representative_list)
[docs] def determine_rerun(z, out_path='plots', mpl_colors=default_dendrogram_colors, ax=None, timeout=None): """ Asks if you want to regenerate the dendrogram. Parameters ---------- z : numpy.ndarray A numpy.ndarray from sch.linkage. out_path : str, default: 'plots' Path to output plots. mpl_colors : list or default_dendrogram_colors A list of colors for coloring the dendrogram. ax : matplotlib.Axes, Default: None Matplotlib.Axes object to be inherited. timeout : int, default: 30 Input timeout in seconds. """ if timeout is None: timeout = 30 while True: try: ans = timedinput('Do you want to regenerate the graph with a new threshold (y/[n])?\n', timeout=timeout, default='N') if ans == 'y' or ans == 'Y': ans2 = timedinput('What new threshold would you like?\n', timeout=15, default=0.5) try: ax = visualize(z, out_path=out_path, threshold=float(ans2), show_fig=True, mpl_colors=mpl_colors, ax=ax) return ax except ValueError: determine_rerun(z, out_path=out_path, mpl_colors=mpl_colors, ax=ax, timeout=timeout) elif ans == 'n' or ans == 'N' or ans == '': return None else: log.warning("Invalid input.\n") except KeyboardInterrupt: sys.exit(0)
[docs] def ask_number_clusters(num_clusters=None, timeout=None): """ Asks how many clusters you want to separate the trajectories into. """ if timeout is None: timeout = 15 if not num_clusters: while True: try: ans = timedinput('How many clusters would you like to separate the pathways into?\n', timeout=timeout, default=2) try: ans = int(ans) return ans except ValueError: log.warning("Invalid input.\n") except KeyboardInterrupt: sys.exit(0) else: return num_clusters
[docs] def report_statistics(n_clusters, cluster_labels, weights, segid_status=False): """ Report statistics about the final clusters. Parameters ---------- n_clusters : int Number of clusters. cluster_labels : numpy.ndarray An array mapping pathways to cluster weights : numpy.ndarray Weight information segid_status : bool, default: False Status of whether we're using seg_ids or not. """ # Initialize the dictionary with 0 weight. 0-based for cl. final_dictionary = dict() counts = dict() uniques = dict() for j in range(n_clusters): final_dictionary[j] = 0 counts[j] = 0 for (cl, weight) in zip(cluster_labels, weights): final_dictionary[cl] += weight counts[cl] += 1 if segid_status: for cluster, unique_count in zip(*numpy.unique(cluster_labels, return_counts=True)): uniques[cluster] = unique_count else: for cl in range(n_clusters): uniques[cl] = 'N/A' report = '\n' report += f'===LPATH Pattern Matching Statistics===\n' report += f' Total Number of clusters: {n_clusters}\n' for (key, val) in final_dictionary.items(): report += f' Weight/count/unique count of cluster {key}: {val:.8e} / {counts[key]} / {uniques[key]}\n' log.info(report)
[docs] def main(arguments): """ Main function that executes the whole `match` step. Pathways are processed in the following order: 1. Assign extra "padding" frames as unknown state for segments too short. 2. Remove pathways that are too short (if specified). 3. Remove end frames (source and target states). 4. During pairwise calculation, condense the frames during comparison and remove frames in "unknown" state. Parameters ---------- arguments : argparse.Namespace A Namespace object will all the necessary parameters. """ # Dealing with the preset assign_method reassign = determine_reassign(arguments.reassign_method) metric = determine_metric(arguments.match_metric, arguments.match_vanilla) # Prepping the data + Calculating the distance matrix data, pathways = load_data(arguments.extract_output) dictionary = {} # Reassignment... (or not) dictionary = reassign(data, pathways, dictionary, arguments.assign_name) # system-specific reassignment of states if len(dictionary) < 3: log.warning(f'Only {len(dictionary)} states defined, including the "unknown" state. ' 'This will likely produce bad clustering results and you should considering reassigning to more ' 'intermediate states using a modified ``--reassign-method``.') log.debug(f'Completed reassignment.') # Cleanup test_obj = process_shorter_traj(pathways, dictionary, arguments.exclude_short, arguments.remove_ends) log.debug(f'Cleaned up trajectories.') dist_matrix, weights = gen_dist_matrix(test_obj, dictionary, file_name=arguments.dmatrix_save, remake=arguments.dmatrix_remake, # Calculate distance matrix metric=metric, # Which metric to use condense=arguments.condense, # Whether to condense consecutive state strings n_jobs=arguments.dmatrix_parallel) # Number of jobs for pairwise_distance log.debug(f'Generated distance matrix.') # Visualize the Dendrogram and determine how clusters used to group successful trajectories z = calc_linkage(dist_matrix) ax = visualize(z, threshold=arguments.dendrogram_threshold, out_path=arguments.out_path, show_fig=arguments.dendrogram_show, mpl_colors=arguments.mpl_colors) ax = determine_rerun(z, out_path=arguments.out_path, mpl_colors=arguments.mpl_colors, ax=ax, timeout=arguments.plot_timeout) n_clusters = ask_number_clusters(arguments.num_clusters, timeout=arguments.plot_timeout) cluster_labels = hcluster(z, n_clusters) # Report statistics if arguments.stats: log.debug('Reporting statistics') if arguments.reassign_method == 'reassign_segid': segid_status = True else: segid_status = False report_statistics(n_clusters, cluster_labels, weights, segid_status) # Output cluster labels and reassigned pickle object log.info('Outputting files') export_pickle(test_obj, arguments.output_pickle) numpy.save(arguments.cl_output, cluster_labels) # Following exports each cluster to its own h5 file, all weights of segments not in that group = 0. if arguments.export_h5: export_we_files( test_obj, weights, cluster_labels, clusters=arguments.clusters, out_dir=arguments.out_dir, file_pattern=arguments.file_pattern, west_name=arguments.west_name, ) else: export_std_files( test_obj, weights, cluster_labels, clusters=arguments.clusters, out_dir=arguments.out_dir, )