Source code for glypy.algorithms.subtree_search.inclusion

"""Implementation of explicit sub-graph inclusion and co-traversal algorithms.

Graph inclusion here means that there exists a path through graph A that can be
mapped onto equivalent nodes in graph B that are connected in the same way.

"""
from collections import deque, defaultdict

from glypy.structure import UnknownPosition
from glypy.algorithms.similarity import commutative_similarity, commutative_similarity_score_with_tolerance
from glypy.utils import root


class TopologicalInclusionMatcher(object):
    """A recursive topological traversal.

    Attributes
    ----------
    target: :class:`~.Monosaccharide`
        The target to test for inclusion with
    reference: :class:`~.Monosaccharide`
        The reference to test for inclusion in
    substituents: bool
        Whether to examine substituents when computing similarity and inclusion.
    tolerance: float
        The magnitude of the similarity error to tolerate.
    visited: set
        The set of target and reference node id pairs that have already been considered.

    """
    def __init__(self, target, reference, substituents=True, tolerance=0, visited=None):
        self.target = target
        self.reference = reference
        self.substituents = substituents
        self.tolerance = tolerance
        self.visited = visited or set()

    @classmethod
    def compare(cls, target, reference, substituents=True, tolerance=0, visited=None):
        '''
        A generalization of :meth:`~Monosaccharide.topological_equality` which allows for ``target``
        to be matched to ``reference``, but for ``reference`` to include more. Consequently,
        this method is not commutative.

        Parameters
        ----------
        target: :class:`~.Monosaccharide`
            The monosaccharide to compare
        reference: :class:`~.Monosaccharide`
            The monosaccharide to compare against
        substituents: :class:`bool`, optional
            Whether or not to compare with the substituents of each node. Defaults
            to :const:`True`.
        tolerance: :class:`float`, optional
            The maximum difference to permit between nodes for inclusion. Defaults
            to 0

        Returns
        -------
        :class:`float`
            The inclusion score. Greater than 0 indicates topological inclusion,
            though larger corresponds to better alignment.

        See Also
        --------
        :func:`~.commutative_similarity_score_with_tolerance`
        exact_ordering_inclusion
        '''
        inst = cls(
            target, reference, substituents=substituents,
            tolerance=tolerance, visited=visited)
        score = inst.test()
        return score

    def test_similarity(self):
        """Calculate the commutative similarity between :attr:`target` and
        :attr:`reference`.

        Returns
        -------
        float

        See Also
        --------
        :func:`~.commutative_similarity_score_with_tolerance`
        """
        score, similar = commutative_similarity_score_with_tolerance(
            self.target, self.reference, self.tolerance,
            include_substituents=self.substituents)
        return score, similar

    def test_children(self):
        """Find the optimal similarity pairing between descendents of
        :attr:`target` and :attr:`reference`.

        """
        match_index = self._build_child_pair_score_map()
        required_nodes = {node.id for p, node in self.target.children()}
        optimal_assignment, score = self.optimal_assignment(match_index, required_nodes)
        return optimal_assignment, score, bool(required_nodes)

    def test(self):
        """Compute the similarity metric for the current node pair.

        Returns
        -------
        float:
            The similarity score for the current pair.
        """
        key = (self.target.id, self.reference.id)
        if key in self.visited:
            return True
        self.visited.add(key)
        pair_score, similar = self.test_similarity()
        if similar:
            child_pairs, score, had_children = self.test_children()
            if child_pairs:
                return pair_score + score
            elif not had_children:
                return pair_score
            else:
                return 0
        return 0

    def _build_child_pair_score_map(self):
        combinations = dict()
        for a_pos, a_child in self.target.children():
            for b_pos, b_child in self.reference.children():
                inclusion_score = self.compare(
                    a_child, b_child, substituents=self.substituents,
                    tolerance=self.tolerance, visited=self.visited)
                if inclusion_score > 0:
                    combinations[a_child.id, b_child.id] = inclusion_score
        return combinations

    def build_unique_index_pairs(self, pairs):
        '''
        Generate all unique non-overlapping sets of pairs, given in
        `pairs`
        '''
        depth = 0
        pairings = defaultdict(set)
        for a, b in pairs:
            pairings[a].add(b)
        next_current = [()]
        options_a = list(pairings)
        partial_solutions = set()
        while depth < len(options_a):
            for current in next_current:
                if len(current) > 0:
                    current_a, current_b = map(set, zip(*current))
                else:
                    current_b = set()
                components = set()
                a = options_a[depth]
                for b in pairings[a] - current_b:
                    components.add((current + ((a, b),)))
                partial_solutions.update(components)
            depth += 1
            next_current = partial_solutions
            partial_solutions = set()
        return list(next_current)

    def optimal_assignment(self, assignments, required_nodes=None):
        index_pair_sets = self.build_unique_index_pairs(assignments)
        best_score = -float('inf')
        best_mapping = None
        for assignment in index_pair_sets:
            current_score = 0
            has_nodes = set()
            for ix in assignment:
                current_score += assignments[ix]
                has_nodes.add(ix[0])
            if current_score > best_score and has_nodes == required_nodes:
                best_score = current_score
                best_mapping = assignment
        return best_mapping, best_score


topological_inclusion = TopologicalInclusionMatcher.compare


[docs]def exact_ordering_inclusion(target, reference, substituents=True, tolerance=0, visited=None): ''' A generalization of :meth:`~Monosaccharide.exact_ordering_equality` which allows for ``target`` to be matched to ``reference``, but for ``reference`` to include more. Consequently, this method is not commutative. Parameters ---------- target: :class:`~.Monosaccharide` The monosaccharide to compare reference: :class:`~.Monosaccharide` The monosaccharide to compare against substituents: :class:`bool`, optional Whether or not to compare with the substituents of each node. Defaults to :const:`True`. tolerance: :class:`float`, optional The maximum difference to permit between nodes for inclusion. Defaults to 0 Returns ------- :class:`float`: The similarity score between the two structures. A score of 0 means no inclusion, and a non-zero score means inclusion, with larger scores indicating more node pairs matching. See Also -------- commutative_similarity_score_with_tolerance topological_inclusion ''' if visited is None: visited = set() if (target.id, reference.id) in visited: return True node_score, similar = commutative_similarity_score_with_tolerance( target, reference, tolerance, include_substituents=substituents) if similar: if substituents: reference_substituents = dict(reference.substituents()) for a_pos, a_sub in target.substituents(): b_sub = reference_substituents.get(a_pos) if b_sub is None: # pragma: no cover return 0 if a_sub != b_sub: # pragma: no cover return 0 reference_mods = dict(reference.modifications.items()) for a_pos, a_mod in target.modifications.items(): b_mod = reference_mods.get(a_pos) if b_mod is None: # pragma: no cover return 0 if a_mod != b_mod: # pragma: no cover return 0 reference_children = dict(reference.children()) for pos, a_child in target.children(): b_child = reference_children.get(pos) if b_child is None: # pragma: no cover return 0 if a_child[0] == b_child[0]: match_score = exact_ordering_inclusion(a_child, b_child, substituents=substituents, tolerance=tolerance, visited=visited) if not match_score: return 0 node_score += match_score else: return 0 return node_score
[docs]def subtree_of(subtree, tree, exact=False, include_substituents=True, tolerance=0): ''' Test to see if `subtree` is included in `tree` anywhere. Returns the node id number of the first occurence of `subtree` included in `tree` or |None| if it is not found. Parameters ---------- subtree: :class:`~.Glycan` The structure to search for. The search attempts to match the complete structure of subtree. tree: :class:`~.Glycan` The sturcture to search in. The search iterates over each residue in `tree` and calls a comparator function, comparing the `subtree` to the substructure rooted at that residue. exact: :class:`bool` If |True|, use :func:`exact_ordering_inclusion` to compare nodes. Otherwise use :func:`topological_inclusion`. Defaults to |False|. Returns ------- |int| or |None| if no match ''' if exact: comparator = exact_ordering_inclusion else: comparator = topological_inclusion tree_root = root(subtree) for node in tree: if comparator(tree_root, node, substituents=include_substituents, tolerance=tolerance): return node.id return None
[docs]def find_matching_subtree_roots(subtree, tree, exact=False, include_substituents=True, tolerance=0): ''' Find the list of nodes where occurences of `subtree` included in `tree` are rooted. Parameters ---------- subtree: :class:`~.Glycan` The structure to search for. The search attempts to match the complete structure of subtree. tree: :class:`~.Glycan` The sturcture to search in. The search iterates over each residue in `tree` and calls a comparator function, comparing the `subtree` to the substructure rooted at that residue. exact: :class:`bool` If |True|, use :func:`exact_ordering_inclusion` to compare nodes. Otherwise use :func:`topological_inclusion`. Defaults to |False|. Returns ------- :class:`list` of :class:`~.Monosaccharide` ''' if exact: comparator = exact_ordering_inclusion else: comparator = topological_inclusion tree_root = root(subtree) matched_nodes = [] for node in tree: if comparator(tree_root, node, substituents=include_substituents, tolerance=tolerance): matched_nodes.append(node) return matched_nodes
[docs]def walk_with(query, reference, visited=None, comparator=commutative_similarity, include_substituents=True): """Walk the `query` along `reference`, yielding successive matched nodes along a subtree of `reference` using a :term:`Exact Matching` traversal. This function provides slightly more detail than :func:`subtree_of` as it exposes every step along the subgraph traversal, rather than just the root. Parameters ---------- query : :class:`Glycan` The query structure to search with reference : :class:`Glycan` The reference structure to search in visited : :class:`set`, optional The set of node id pairs to ignore comparator : :class:`Callable`, optional The :class:`Callable` object which can compare two :class:`~.Monosaccharide` Yields ------ (:class:`~.Monosaccharide`, :class:`~.Monosaccharide`) Pairs of matched nodes along a path """ if visited is None: visited = set() query_root = root(query) reference_root = root(reference) node_stack = deque() if comparator(query_root, reference_root, include_substituents=include_substituents): node_stack.append((reference_root, query_root)) while len(node_stack) != 0: rnode, qnode = node_stack.pop() key = (rnode.id, qnode.id) if key in visited: continue visited.add(key) # yield the next step along the matched path yield rnode, qnode for p, rlink in rnode.links.items(): rparent = rlink.parent qparent = None rchild = rlink.child qchild = None # query link position is known if p != UnknownPosition: # check the parent of the link for matches, potentially flowing # through a cycle if one is present or flowing down the tree if # not starting at the root for qlink in qnode.links[p]: if comparator(qlink.parent, rparent): qparent = qlink.parent break if qparent is not None: key = (rparent.id, qparent.id) if key not in visited: node_stack.append((rparent, qparent)) # check the child of the link for matches, proceding down the tree for qlink in qnode.links[p]: if comparator(qlink.child, rchild): qchild = qlink.child break if qchild is not None: key = (rchild.id, qchild.id) if key not in visited: node_stack.append((rchild, qchild)) else: for qlink in qnode.links.values(): if comparator(qlink.parent, rparent) and comparator(qlink.child, rchild): if rnode is rparent: if (rchild.id, qlink.child.id) in visited: continue elif rnode is rchild: if (rparent.id, qlink.parent.id) in visited: continue qparent = qlink.parent qchild = qlink.child break if qparent is not None: key = (rparent.id, qparent.id) if key not in visited: node_stack.append((rparent, qparent)) key = (rchild.id, qchild.id) if key not in visited: node_stack.append((rchild, qchild))