Source code for glypy.io.glycoct

# coding: utf-8
'''
A parser for :title-reference:`GlycoCT{condensed}` format.

:title-reference:`GlycoCT{condensed}` is a multi-line format for representing
glycan structures and compositions published in [1]. The format is intended to
be human-readable, easily compressed, and includes a canonicalization algorithm
to ensure that there is only a single representation for a glycan structure.

:title-reference:`GlycoCT{condensed}` can represent glycan structures with ambiguous
or repeating sub-units. The specification includes additional section directives with
support for stochastic sub-units as well as disjoint subgraphs, though these have not
been implemented in :mod:`glypy`.


References
----------
[1] Herget, S., Ranzinger, R., Maass, K., & Lieth, C.-W. V. D. (2008).
    GlycoCT-a unifying sequence format for carbohydrates.
    Carbohydrate Research, 343(12), 2162–2171.
    https://doi.org/10.1016/j.carres.2008.03.011

'''

import re
import warnings
from collections import defaultdict, Counter, deque, namedtuple, OrderedDict
from functools import cmp_to_key

try:
    from collections.abc import Iterator
except ImportError:
    from collections import Iterator

from glypy.utils import (
    opener, StringIO, root as rootp, tree as treep,
    make_counter, invert_dict, uid,
    RootProtocolNotSupportedError)
from glypy.utils.multimap import OrderedMultiMap
from glypy.structure import monosaccharide, substituent, glycan, Modification, constants, UnknownPosition, NoPosition
from glypy.structure.link import Link, AmbiguousLink
from .format_constants_map import (anomer_map, superclass_map,
                                   link_replacement_composition_map,
                                   modification_map, linkage_type_map)
from .file_utils import ParserError
from .tree_builder_utils import (
    decorate_tree,
    undecorate_tree,
    find_root,
    try_int,
    StructurePrecisionEnum,
    AbstractGraphEntryEnum, NodeCollection)
from glypy.composition import Composition

# Python 2 compatibility: rebind ``range`` to the lazy ``xrange`` when that
# name exists. On interpreters without ``xrange`` the lookup raises
# NameError and the built-in ``range`` is left untouched.
try:
    range = xrange
except NameError:
    pass


# Module-level handle on the built-in ``id``.
# NOTE(review): presumably kept because many functions in this module take a
# parameter named ``id``, which shadows the built-in -- confirm against usage.
__id = id

# Short aliases for the structure classes used throughout this module.
Glycan = glycan.Glycan
Monosaccharide = monosaccharide.Monosaccharide
Substituent = substituent.Substituent

Configuration = constants.Configuration
Stem = constants.Stem


# Names used as values of the reader's ``state`` attribute. The reader
# initializes its state to START.
# NOTE(review): the three-letter names appear to mirror GlycoCT section
# headers (RES, LIN, REP, ...) -- confirm against the reader's transitions.
START = "!START"
REPINNER = "!REPINNER"
UNDINNER = "!UNDINNER"
RES = "RES"
LIN = "LIN"
REP = "REP"
ALT = "ALT"
UND = "UND"
ISO = "ISO"
NON = "NON"

# NOTE(review): presumably the states in which a record is allowed to
# end -- the consumer of this set is not visible here, confirm.
TERMINAL_STATES = {
    RES,
    LIN,
    ISO,
    NON
}

# Single-letter codes. NOTE(review): presumably the node-type letter that
# follows the index on a RES line (compare ``repeat_line_pattern``, which
# matches ``<n>r:r<m>``) -- confirm against the reader.
# The spelling ``subsituent_start`` is part of the module's public surface
# and is left as-is.
subsituent_start = "s"
base_start = "b"
repeat_start = "r"
alternative_start = "a"

#: Pattern for parsing the lines of the RES section corresponding
#: to individual |Monosaccharide| residues
res_pattern = re.compile(
    r'''
    (?P<anomer>[abxo])?
    (?P<conf_stem>(?:-[dlx][a-z]+)+)?-?
    (?P<superclass>[A-Z]+)-?
    (?P<indices>[0-9x]+:[0-9x]+)
    (?P<modifications>(\|[0-9x,]+:[0-9a-z]+)+)?
    ''', re.VERBOSE)

#: Pattern for parsing the potentially repeated |Configuration| and |Stem|
#: regions of the lines of the RES section.
conf_stem_pattern = re.compile(r'(?P<config>[dlx])(?P<stem>[a-z]+)')

#: Pattern for parsing modifications found on monosaccharide residue
#: lines in the RES section
modification_pattern = re.compile(r"\|?([0-9,x]+):([^\|;\n]+)")


#: Pattern for parsing |Link| lines found in the LIN section. The leading
#: document index is optional; both attachment positions may be a run of
#: digits, ``-`` and ``|`` characters.
link_pattern = re.compile(
    r'''(?P<doc_index>\d+)?:
    (?P<parent_residue_index>\d+)
    (?P<parent_atom_replaced>[odhnx])
    \((?P<parent_attachment_position>-?[0-9\-\|]+)[\+\-]
        (?P<child_attachment_position>-?[0-9\-\|]+)\)
    (?P<child_residue_index>\d+)
    (?P<child_atom_replaced>[odhnx])
        ''', re.VERBOSE)


#: Special truncation of the :data:`link_pattern` which is used on
#: REP header sections. It has no ``doc_index`` group and no leading ``:``.
internal_link_pattern = re.compile(
    r'''(?P<parent_residue_index>\d+)
    (?P<parent_atom_replaced>[odhnx])
    \((?P<parent_attachment_position>-?[0-9\-\|]+)[\+\-]
        (?P<child_attachment_position>-?[0-9\-\|]+)\)
    (?P<child_residue_index>\d+)
    (?P<child_atom_replaced>[odhnx])
    ''',
    re.VERBOSE)

#: Pattern for interpreting the REP# instance header section. Captures the
#: repeat index, the raw internal linkage text, and the lower and upper
#: multitude, either of which may be negative (e.g. ``-1``).
rep_header_pattern = re.compile(
    r'''REP(?P<repeat_index>\d+):
    (?P<internal_linkage>.+)
    =(?P<lower_multitude>-?\d+)-(?P<higher_multitude>-?\d+)''', re.VERBOSE)

#: Pattern matching a line that starts with ``<graph_index>r:r<repeat_index>``.
#: NOTE(review): presumably the RES-section entry that refers to a repeating
#: unit -- confirm against the reader.
repeat_line_pattern = re.compile(r"^(?P<graph_index>\d+)r:r(?P<repeat_index>\d+)")

#: Pattern for a ``UND<und_index>:<major>:<minor>`` header, where ``major``
#: and ``minor`` are non-negative decimal numbers with an optional
#: fractional part.
und_header_pattern = re.compile(r'''UND(?P<und_index>\d+):
    (?P<major>\d+(\.\d*)?):
    (?P<minor>\d+(\.\d*)?)
    ''', re.VERBOSE)

#: Reduced form of :data:`link_pattern` that carries only the replaced atoms
#: and the attachment positions, with no residue indices or document index.
#: NOTE(review): presumably used for the linkage lines of UND sections --
#: confirm against the reader.
und_link_pattern = re.compile(r'''
    (?P<parent_atom_replaced>[odhnx])
    \((?P<parent_attachment_position>-?[0-9\-\|]+)[\+\-]
        (?P<child_attachment_position>-?[0-9\-\|]+)\)
    (?P<child_atom_replaced>[odhnx])
    ''', re.VERBOSE)


class GlycoCTError(ParserError):
    """Base error for GlycoCT-based parsing exceptions.

    Derives from :class:`ParserError`, so handlers written against the
    generic parser error also catch GlycoCT-specific failures.
    """
    pass
class GlycoCTSectionUnsupported(GlycoCTError):
    '''Indicates that the GlycoCT parser has encountered a section
    that it does not know how to parse.
    '''
    pass


class DeferredRetrieval(object):
    """Callback object to invoke a :class:`GlycoCTGraph` instance's :meth:`get_node` method
    with a set of stored parameters at a later time.

    Attributes
    ----------
    direction : AbstractGraphEntryEnum
        The direction with which to retrieve the node
    graph : GlycoCTGraph
        The node graph to call :meth:`GlycoCTGraph.get_node` with
    id : object
        The id of the node to retrieve
    """

    def __init__(self, graph, id, direction=None):
        self.graph = graph
        self.id = id
        self.direction = direction

    def __call__(self):
        # The lookup happens at call time, not at construction time.
        return self.graph.get_node(self.id, self.direction)

    def __repr__(self):
        return "{s.__class__.__name__}({s.graph}, {s.id}, {s.direction})".format(
            s=self)


class GlycoCTGraph(object):
    """A graph to store nodes from parsing GlycoCT text in.

    Implements a Mapping interface

    Attributes
    ----------
    graph : dict
        Mapping from node id to node-like objects
    id: tuple
        A pair of (class, unique id), where the second element comes from
        :func:`uid`
    """

    def __init__(self, graph=None):
        if graph is None:
            graph = dict()
        self.graph = graph
        self.id = (self.__class__, uid())

    def __repr__(self):
        return "{self.__class__.__name__}({self.graph})".format(self=self)

    def __contains__(self, k):
        # Unlike get_node, the key is NOT cast to int here.
        return k in self.graph

    def __getitem__(self, k):
        return self.get_node(k)

    def __setitem__(self, k, v):
        self.put_node(k, v)

    def __len__(self):
        return len(self.graph)

    def keys(self):
        '''See :meth:`~.Mapping.keys`
        '''
        return self.graph.keys()

    def values(self):
        '''See :meth:`~.Mapping.values`
        '''
        return self.graph.values()

    def items(self):
        '''See :meth:`~.Mapping.items`
        '''
        return self.graph.items()

    def clear(self):
        '''See :meth:`~.MutableMapping.clear`
        '''
        self.graph.clear()

    def __iter__(self):
        return iter(self.graph)

    def get_node(self, id, direction=None):
        """Get a node by its id value.

        Parameters
        ----------
        id : object
            The node's id. Will be cast to an :class:`int`
        direction : object, optional
            Included for compatibility, ignored.

        Returns
        -------
        :class:`~.Monosaccharide` or :class:`~.Substituent`

        Raises
        ------
        KeyError:
            If no node is stored under `id`.
        """
        id = int(id)
        return self.graph[id]

    def put_node(self, id, value):
        """Store a node for the given id value.

        Parameters
        ----------
        id : object
            The node's id. Will be cast to an :class:`int`
        value : :class:`~.Monosaccharide` or :class:`~.Substituent`
            The node to store
        """
        id = int(id)
        self.graph[id] = value

    def form_link(self, parent, child, parent_position, child_position, parent_loss, child_loss,
                  parent_linkage_type=None, child_linkage_type=None, id=None):
        """Form a :class:`~.Link` between `parent` and `child` with the specified
        parameters.

        If more than one position is passed for `parent_position` or `child_position`,
        an :class:`~.AmbiguousLink` will be created instead.

        Parameters
        ----------
        parent : :class:`~.Monosaccharide` or :class:`~.Substituent`
            The parent node in the bond
        child : :class:`~.Monosaccharide` or :class:`~.Substituent`
            The child node in the bond
        parent_position : list
            The set of possible positions on the parent node to attach to.
        child_position : list
            The set of possible positions on the child node to attach to.
        parent_loss : str or :class:`~.Composition`
            The composition lost from the parent node
        child_loss : str or :class:`~.Composition`
            The composition lost from the child node
        parent_linkage_type : :class:`~.EnumValue`, optional
            A :class:`~.LinkageType` entry describing how the linkage is formed on the parent
        child_linkage_type : :class:`~.EnumValue`, optional
            A :class:`~.LinkageType` entry describing how the linkage is formed on the child
        id : object, optional
            The within-graph unique identifier of the :class:`~.Link` object. Only
            forwarded when an :class:`~.AmbiguousLink` is built.

        Returns
        -------
        :class:`~.Link`
        """
        if parent.node_type is Substituent.node_type and\
                child.node_type is Monosaccharide.node_type:
            warnings.warn(
                "A monosaccharide with a substituent parent has been detected. "
                "These structures are not fully supported and may not traverse as expected "
                "by default.", stacklevel=7)
        if len(parent_position) > 1 or len(child_position) > 1:
            link_obj = AmbiguousLink(
                parent, child,
                parent_position=list(map(int, parent_position)),
                child_position=list(map(int, child_position)),
                parent_loss=parent_loss, child_loss=child_loss, id=id,
                parent_linkage_type=parent_linkage_type,
                child_linkage_type=child_linkage_type)
            link_obj.find_open_position()
        else:
            # NOTE(review): ``id`` is not forwarded on this branch, unlike the
            # AmbiguousLink branch above -- confirm whether that is intended.
            link_obj = Link(
                parent, child, parent_position=int(parent_position[0]),
                child_position=int(child_position[0]),
                parent_loss=parent_loss, child_loss=child_loss,
                parent_linkage_type=parent_linkage_type,
                child_linkage_type=child_linkage_type)
        return link_obj

    def deferred_retrieval(self, id, direction=None):
        """Construct a :class:`DeferredRetrieval` instance to carry out
        the :meth:`get_node` at a later time.

        Parameters
        ----------
        id : object
            The node's id. Will be cast to an :class:`int` when the retrieval runs
        direction : object, optional
            Included for compatibility, ignored.

        Returns
        -------
        :class:`DeferredRetrieval`
        """
        return DeferredRetrieval(self, id, direction)

    def __root__(self):
        return self.find_root_nodes()[0]

    def find_root_nodes(self):
        """Find "root" nodes within the graph.

        A node counts as a root when ``node.parents()`` is empty. Entries
        that are themselves :class:`GlycoCTGraph` instances are judged by
        the parents of their own root. If nothing qualifies, the entry with
        the smallest key is returned as the only root.

        Returns
        -------
        list
        """
        roots = []
        for _index, node in self.items():
            try:
                if node.parents():
                    continue
            except AttributeError:
                # Only nested graphs are expected to lack ``parents``;
                # anything else is a genuine error.
                if not isinstance(node, GlycoCTGraph):
                    raise
                else:
                    if rootp(node).parents():
                        continue
            roots.append(node)
        if not roots:
            roots.append(sorted(self.items())[0][1])
        return roots

    def visit(self, node, visited=None, fn=None):
        """Visit `node`, calling `fn` on it, and then call :meth:`visit`
        on each connected node from `node` that had not previously been
        visited (tracked in `visited`). If a `node` is actually a :class:`GlycoCTGraph`,
        it will be traversed.

        Parameters
        ----------
        node : :class:`~.Monosaccharide` or :class:`~.Substituent`
            The node to visit.
        visited : :class:`set`, optional
            The set of previously visited node ids. If not provided, an empty
            set will be used.
        fn : :class:`callable`, optional
            The function to call on each node. It is called as ``fn(node, visited)``.

        Returns
        -------
        :class:`set`:
            The ids of the visited nodes.
        """
        if visited is None:
            visited = set()
        if isinstance(node, GlycoCTGraph):
            # A nested graph has no id of its own to record; descend into
            # each of its roots instead.
            for node in node.find_root_nodes():
                self.visit(node, visited, fn)
            return visited
        visited.add(node.id)
        if fn is not None:
            fn(node, visited)
        for _position, link in node.links.items():
            ref = link.to(node)
            if ref.id in visited:
                continue
            else:
                self.visit(ref, visited, fn)
        try:
            for _position, link in node.substituent_links.items():
                ref = link.to(node)
                if ref.id in visited:
                    continue
                else:
                    self.visit(ref, visited, fn)
        except AttributeError:
            # Nodes without a ``substituent_links`` attribute are skipped.
            pass
        return visited

    def is_fully_connected(self):
        """Check that the graph is fully connected: a traversal started from
        the first root node must reach at least as many nodes as the graph
        holds.

        Returns
        -------
        bool
        """
        roots = self.find_root_nodes()
        visited = self.visit(roots[0])
        return len(visited) >= len(self)


class GlycoCTGraphStack(GlycoCTGraph):
    """Represent a stack of :class:`GlycoCTGraph` instances, which
    may be nested inside another graph.

    Attributes
    ----------
    stack: :class:`~.deque`
        The stack of :class:`GlycoCTGraph` instances in the current state
    history: list
        The historical sequence of :class:`GlycoCTGraph` instances, added to but never
        removed.
    """

    def __init__(self, stack=None, parent=None):  # pylint: disable=super-init-not-called
        if stack is None:
            stack = deque([GlycoCTSubgraph(parent=parent)])
        else:  # pragma: no cover
            # Wrap each provided level in a subgraph whose parent is the
            # level wrapped just before it.
            _stack = deque()
            _parent = parent
            for level in stack:
                _stack.append(GlycoCTSubgraph(level, parent=_parent))
                _parent = _stack[-1]
            stack = _stack
        self.stack = stack
        self.history = list(stack)
        self.id = (self.__class__, uid())

    @property
    def graph(self):
        """The top :class:`GlycoCTGraph` on the stack

        Returns
        -------
        :class:`GlycoCTGraph`
        """
        return self.stack[-1]

    @property
    def parent(self):
        """The graph that contains this one, read from the bottom of the stack.

        Returns
        -------
        :class:`GlycoCTGraph`
        """
        return self.stack[0].parent

    @parent.setter
    def parent(self, value):
        self.stack[0].parent = value

    def get_node(self, id, direction=None):
        # Search the most recently added level first; a level that does not
        # hold `id` signals that with KeyError.
        for level in reversed(self.history):
            try:
                return level.get_node(id, direction)
            except KeyError:
                continue
        raise KeyError(id)

    def deferred_retrieval(self, id, direction=None):
        return DeferredRetrieval(self, id, direction)

    def add(self, subgraph, parent=None):  # pragma: no cover
        """Add `subgraph` to :attr:`stack`, setting the graph's parent
        if it does not already have one.

        Parameters
        ----------
        subgraph: :class:`GlycoCTGraph`
            The graph to add to the stack
        parent: :class:`GlycoCTGraph`, optional
            The parent to set for `subgraph`. If :const:`None`, this defaults to `self`.
        """
        if subgraph.parent is None:
            if parent is None:
                parent = self
            subgraph.parent = parent
        self.push_level(subgraph)

    def push_level(self, subgraph):
        """Push `subgraph` onto the stack, and the history.

        Parameters
        ----------
        subgraph : :class:`GlycoCTGraph`
            The graph to add.
        """
        self.stack.append(subgraph)
        self.history.append(subgraph)

    def pop_level(self):
        """Pop the last item from :attr:`stack`. :attr:`history` is left unchanged.

        Returns
        -------
        :class:`GlycoCTGraph`
        """
        return self.stack.pop()

    def clear(self):
        # Reset to a single fresh, parentless subgraph.
        self.stack = deque([GlycoCTSubgraph(parent=None)])
        self.history = list(self.stack)

    def find_subgraph_containing(self, id):
        """Find the first subgraph which contains a node with the query `id`,
        searching from the most recently added level backwards.

        Parameters
        ----------
        id : tuple
            The node id to find.

        Returns
        -------
        :class:`GlycoCTGraph`

        Raises
        ------
        KeyError:
            If the `id` is not found.
        """
        for level in reversed(self.history):
            if id in level:
                return level
        raise KeyError(id)

    def __repr__(self):
        return "{self.__class__.__name__}({self.history})".format(self=self)

    def _count_subgraph_nodes(self):
        # Group the stacked layers by the id of their parent graph.
        layer_relations = defaultdict(list)
        root_layer = self.stack[0]
        for layer in self.stack:
            if layer.parent is None:
                continue
            else:
                key = layer.parent.id
                layer_relations[key].append(layer)
        # repeated subgraphs add to the node count on a layer, but
        # the root layer is also the parent of all other subgraphs
        # which do not have nodes. This is a hacky solution to
        # count just those with nodes in the root layer.
        first_children = layer_relations.pop(root_layer.id, [])
        adjustment = 0
        for child in first_children:
            if isinstance(child, RepeatedGlycoCTSubgraph):
                adjustment += 1
        return sum(map(len, layer_relations.values())) + adjustment

    def __len__(self):
        # Total entries over all stacked layers, minus the entries that
        # stand for nested subgraphs.
        return sum(map(len, self.stack)) - self._count_subgraph_nodes()

    def find_root_nodes(self):
        return self.stack[0].find_root_nodes()

    def is_fully_connected(self):
        roots = self.find_root_nodes()
        visited = self.visit(roots[0])
        return len(visited) >= len(self)


class GlycoCTSubgraph(GlycoCTGraph):
    """A :class:`GlycoCTGraph` which has a :attr:`parent` for use with :class:`GlycoCTGraphStack`.

    Attributes
    ----------
    parent: :class:`GlycoCTGraph`
        The graph that contains this one.
    """

    def __init__(self, graph=None, parent=None):
        super(GlycoCTSubgraph, self).__init__(graph)
        # Guards against passing the node mapping in the `parent` slot.
        assert not isinstance(parent, dict)
        self.parent = parent

    def postprocess(self):
        """Apply arbitrary post-processing before finalizing the subgraph.

        To be overridden by subclasses. Base implementation is a no-op.
        """
        pass


# Lower and upper repetition count of a repeated subgraph.
RepeatedMultitude = namedtuple("RepeatedMultitude", "lower upper")
class RepeatedGlycoCTSubgraph(GlycoCTSubgraph):
    """Implements the machinery for representing a repeated subgraph in GlycoCT.

    Attributes
    ----------
    graph_index: int
        The index given for this unit at construction time.
    repeat_index: int
        The ``i``th repeating subgraph in the graph.
    internal_linkage: object
        The linkage connecting two repetitions of the subgraph
    external_linkage: object
        The linkage connecting from the final repetition and the outside
        nodes.
    multitude: :class:`RepeatedMultitude`
        Holds the lower and upper range of multiplicities this subgraph may
        be repeated to. ``-1`` marks an unknown bound.
    original_graph: dict or None
        A copy of the node mapping taken at the start of :meth:`postprocess`;
        :const:`None` before that.
    repetitions: :class:`~.OrderedDict`
        The repetitions of this subgraph, materialized during :meth:`postprocess`
    postponed: :class:`~.deque`
        A queue of post-processing callbacks.
    """

    def __init__(self, graph_index, repeat_index, internal_linkage=None,
                 external_linkage=None, multitude=None, graph=None, parent=None):
        super(RepeatedGlycoCTSubgraph, self).__init__(graph, parent)
        if multitude is None:
            multitude = RepeatedMultitude(-1, -1)
        else:
            multitude = RepeatedMultitude(*multitude)
        self.graph_index = graph_index
        self.repeat_index = repeat_index
        self.internal_linkage = internal_linkage
        self.external_linkage = external_linkage
        self.multitude = multitude
        self.original_graph = None
        self.repetitions = OrderedDict()
        self.postponed = deque()

    def __repr__(self):
        rep = super(RepeatedGlycoCTSubgraph, self).__repr__()
        # Replace the closing parenthesis with ", <repeat_index>)"
        rep = "%s, %d)" % (rep[:-1], self.repeat_index)
        return rep

    @property
    def terminal_node_index(self):
        """The index of the node that will connect to either
        the external or internal target node.

        Returns
        -------
        int
        """
        return int(self.external_linkage['parent_residue_index'])

    @property
    def last_repeat_index(self):
        """The last repeat's index in :attr:`repetitions`

        Returns
        -------
        int
        """
        sub_unit_indices = sorted(self.repetitions.keys())
        terminal_unit_ix = sub_unit_indices[-1]
        return terminal_unit_ix

    @property
    def terminal_node(self):
        """Retrieves the terminal residue from the subgraph, where
        outgoing connections start from

        Returns
        -------
        MoleculeBase
        """
        terminal_unit_ix = self.last_repeat_index
        parent_graph = self.repetitions[terminal_unit_ix]
        parent = parent_graph.get((terminal_unit_ix, self.terminal_node_index))
        return parent

    @property
    def origin_node_index(self):
        """The index of the node that receives the incoming connection,
        read from :attr:`external_linkage`.

        Returns
        -------
        int
        """
        return int(self.external_linkage['child_residue_index'])

    @property
    def first_repeat_index(self):
        """The first repeat's index in :attr:`repetitions`

        Returns
        -------
        int
        """
        sub_unit_indices = sorted(self.repetitions.keys())
        return sub_unit_indices[0]

    @property
    def origin_node(self):
        """Retrieves the root residue from the subgraph, where
        the incoming external connection ends at.

        Returns
        -------
        MoleculeBase
        """
        root_unit_ix = self.first_repeat_index
        child_graph = self.repetitions[root_unit_ix]
        try:
            child = child_graph.get((root_unit_ix, self.origin_node_index))
        except IndexError:
            # the root of this subgraph is nested, coming from an expanded
            # inner subgraph. The index labels recorded in the section header
            # are not accurate anymore. Attempt to recover by returning the
            # root node of this subgraph
            if isinstance(child_graph.root.id[1], tuple):
                return child_graph.root
            else:
                raise
        return child

    def postpone(self, f, args):
        """Queue a callable `f` with `args` to be run during postprocessing.

        Parameters
        ----------
        f : :class:`Callable`
            Some callable object
        args : :class:`Iterable`
            The arguments to call `f` with
        """
        self.postponed.append((f, args))

    def complete_postponed_tasks(self):
        """Call all of the postponed tasks in the order they were queued,
        removing each one from the queue before it runs.
        """
        while self.postponed:
            f, args = self.postponed.popleft()
            f(*args)

    def __contains__(self, k):
        # After postprocessing, membership is answered from the snapshot of
        # the node mapping taken before expansion.
        if self.original_graph is not None:
            return k in self.original_graph
        else:
            return k in self.graph

    @property
    def structure_precision(self):
        """How precisely the repeat count is known.

        Returns
        -------
        :class:`StructurePrecisionEnum`
            ``unknown`` if either bound is ``-1``, ``exact`` if both bounds
            are equal, otherwise ``ranging``.
        """
        if -1 in self.multitude:
            return StructurePrecisionEnum.unknown
        elif self.multitude.lower == self.multitude.upper:
            return StructurePrecisionEnum.exact
        return StructurePrecisionEnum.ranging

    def _build_minigraph(self, graph):
        # Build a cloned node collection rooted at the entry of `graph` with
        # the smallest key. Nested graphs are post-processed and replaced
        # in `graph` (in place) by their origin node first.
        sub_unit_indices = sorted(map(try_int, graph.keys()))
        for key, node in list(graph.items()):
            if isinstance(node, GlycoCTGraph):
                node.postprocess()
                graph[key] = node.origin_node
        try:
            root = find_root(graph[sub_unit_indices[0]])
        except RootProtocolNotSupportedError:  # pragma: no cover
            raise GlycoCTError("Could not locate subgraph root")
        glycan_graph = NodeCollection.from_node(root).clone()
        return glycan_graph

    def _duplicate_graph(self, graph):
        # Map every key of `graph` to the matching node of a cloned copy.
        glycan_graph = self._build_minigraph(graph)
        duplicate_graph = {}
        for k, v in graph.items():
            if isinstance(v, GlycoCTSubgraph):
                duplicate_graph[k] = v
            else:
                try:
                    duplicate_graph[k] = glycan_graph.get(v.id)
                except IndexError:
                    # if the subgraph intersperses repeats and
                    # residues, they won't be connected in the
                    # Glycan object, so we must read them back
                    # from the original graph
                    duplicate_graph[k] = graph[k]
        return duplicate_graph

    def postprocess(self, n=None):  # pylint: disable=arguments-differ
        """Materialize `n` copies of the subgraph into :attr:`repetitions`,
        join consecutive copies with :attr:`internal_linkage`, then run the
        postponed tasks.

        Parameters
        ----------
        n : int, optional
            The number of repetitions to build. Defaults to the upper bound
            of :attr:`multitude` when it is known, else the lower bound when
            it is known, else 1.

        Raises
        ------
        GlycoCTError:
            If both bounds are known and `n` lies outside them, or if the
            subgraph's root cannot be located.
        """
        if n is None:
            if self.multitude.upper != -1:
                n = self.multitude.upper
            elif self.multitude.lower != -1:
                n = self.multitude.lower
            else:
                n = 1
        if self.structure_precision is not StructurePrecisionEnum.unknown:
            if not (self.multitude.lower <= n <= self.multitude.upper):  # pragma: no cover
                raise GlycoCTError("{} is not within the range of {}".format(n, self.multitude))
        self.original_graph = self._duplicate_graph(self.graph)
        glycan_graph = self._build_minigraph(self.graph)
        graph = OrderedDict({1: glycan_graph.clone(index_method=None)})
        decorate_tree(graph[1], 1)

        parent_residue_index = self.terminal_node_index
        parent_atom_replaced = link_replacement_composition_map[self.internal_linkage["parent_atom_replaced"]]
        parent_linkage_type = linkage_type_map[self.internal_linkage["parent_atom_replaced"]]
        parent_attachment_position = self.internal_linkage["parent_attachment_position"]

        child_residue_index = self.origin_node_index
        child_atom_replaced = link_replacement_composition_map[self.internal_linkage["child_atom_replaced"]]
        child_linkage_type = linkage_type_map[self.internal_linkage["child_atom_replaced"]]
        child_attachment_position = self.internal_linkage["child_attachment_position"]

        # Links between consecutive copies are only recorded here and formed
        # after every copy has been created.
        op_stack = []
        for i in range(2, n + 1):
            graph[i] = glycan_graph.clone(index_method=None)
            graph[i] = decorate_tree(graph[i], i)
            parent_graph = graph[i - 1]
            child_graph = graph[i]
            parent_node = parent_graph.get((i - 1, parent_residue_index))
            child_node = child_graph.get((i, child_residue_index))
            op_stack.append(
                (self.form_link, [parent_node, child_node],
                 dict(parent_position=parent_attachment_position,
                      child_position=child_attachment_position,
                      parent_loss=parent_atom_replaced,
                      child_loss=child_atom_replaced,
                      parent_linkage_type=parent_linkage_type,
                      child_linkage_type=child_linkage_type)))
        for op in op_stack:
            f, args, kwargs = op
            f(*args, **kwargs)
        self.repetitions = graph
        self.complete_postponed_tasks()

    def handle_incoming_link(self, parent_getter, parent_position, parent_loss, child_position,
                             child_loss, id=None):
        """Link the node produced by `parent_getter` to this subgraph's
        :attr:`origin_node`.

        When `parent_loss` equals ``Composition("H")``, `child_loss` is
        replaced by ``Composition("OH")`` before the link is formed.
        """
        child = self.origin_node
        parent = parent_getter()
        if parent_loss == Composition("H"):
            child_loss = Composition("OH")
        self.form_link(
            parent, child,
            parent_position=parent_position, child_position=child_position,
            parent_loss=parent_loss, child_loss=child_loss, id=id)

    def handle_outgoing_link(self, child_getter, parent_position, parent_loss, child_position,
                             child_loss, id=None):
        """Link this subgraph's :attr:`terminal_node` to the node produced by
        `child_getter`.

        If the child is itself a :class:`RepeatedGlycoCTSubgraph`, the link
        is delegated to its :meth:`handle_incoming_link`.
        """
        child = child_getter()
        parent = self.terminal_node
        if isinstance(child, RepeatedGlycoCTSubgraph):
            child.handle_incoming_link(
                lambda: parent,
                parent_position=parent_position, child_position=child_position,
                parent_loss=parent_loss, child_loss=child_loss, id=id)
        else:
            self.form_link(
                parent, child,
                parent_position=parent_position, child_position=child_position,
                parent_loss=parent_loss, child_loss=child_loss, id=id)

    def handle_abstract_subgraph_link(self, parent_getter, child_getter, parent_position,
                                      parent_loss, child_position, child_loss, id=None):
        """Link the nodes produced by `parent_getter` and `child_getter`."""
        parent = parent_getter()
        child = child_getter()
        self.form_link(
            parent, child,
            parent_position=parent_position, child_position=child_position,
            parent_loss=parent_loss, child_loss=child_loss, id=id)

    def get_node(self, id, direction=None):
        """Get a node by its id, resolved according to `direction`.

        Parameters
        ----------
        id : object
            The node's id. Will be cast to an :class:`int`
        direction : :class:`AbstractGraphEntryEnum`, optional
            ``parent`` reads from the first repetition, ``child`` from the
            last repetition, and ``internal`` or :const:`None` from the
            unexpanded node mapping.

        Raises
        ------
        Exception:
            If `direction` is not one of the values above.
        """
        id = int(id)
        if direction == AbstractGraphEntryEnum.parent:
            child_graph = self.repetitions[self.first_repeat_index]
            child = child_graph.get((self.first_repeat_index, id))
            return child
        elif direction == AbstractGraphEntryEnum.child:
            parent_graph = self.repetitions[self.last_repeat_index]
            parent = parent_graph.get((self.last_repeat_index, id))
            return parent
        elif direction == AbstractGraphEntryEnum.internal or direction is None:
            return self.graph[id]
        else:
            raise Exception("Unknown direction %s" % direction)

    def __root__(self):  # pragma: no cover
        root_node = find_root(self.repetitions[self.first_repeat_index])
        return root_node

    def find_root_nodes(self):
        """Find "root" nodes across all materialized repetitions, falling
        back to the base implementation when nothing has been materialized.

        Returns
        -------
        list
        """
        if not self.repetitions:
            return super(RepeatedGlycoCTSubgraph, self).find_root_nodes()
        roots = []
        for _rep_index, node_set in self.repetitions.items():
            for node in node_set:
                try:
                    if node.parents():
                        continue
                except AttributeError:
                    if not isinstance(node, GlycoCTGraph):
                        raise
                    else:
                        if rootp(node).parents():
                            continue
                roots.append(node)
        if not roots:
            roots.append(sorted(self.items())[0][1])
        return roots

    def prepare_glycan(self):
        """Build a |Glycan| rooted at the first repetition, after calling
        ``deindex()`` on that repetition.

        Returns
        -------
        :class:`~.Glycan`
        """
        subglycan_start = self.repetitions[self.first_repeat_index]
        subglycan_start.deindex()
        return Glycan(subglycan_start.root, index_method=None)
# Pair of values attached to an undetermined (UND) subgraph; the field names
# match the ``major``/``minor`` groups captured from a UND header.
UndeterminedProbability = namedtuple(
    typename="UndeterminedProbability", field_names="major minor")


# Record describing one linkage: its id, then the residue index, replaced
# atom and attachment position for the parent and for the child, then the
# linkage type on each side.
LinkageSpecification = namedtuple(
    typename="LinkageSpecification",
    field_names=(
        "id",
        "parent_residue_index",
        "parent_atom_replaced",
        "parent_attachment_position",
        "child_residue_index",
        "child_atom_replaced",
        "child_attachment_position",
        "parent_linkage_type",
        "child_linkage_type",
    ))
class UndeterminedGlycoCTSubgraph(GlycoCTSubgraph):
    """A subgraph from a ``UND`` section, attached to one of several possible
    parent nodes through an :class:`~.AmbiguousLink`.

    Attributes
    ----------
    und_index: int
        The index of the ``UND`` section this subgraph was built from.
    probability: :class:`UndeterminedProbability`
        The probability pair given for the section. Defaults to
        ``(100., 100.)`` when not provided.
    parent_ids: list
        The ids of the candidate parent nodes, looked up on :attr:`parent`
        during :meth:`postprocess`.
    subtree_linkages: list
        The linkage descriptions for the section. Only the first entry is
        used by :meth:`postprocess`.
    """

    def __init__(self, und_index, probability=None, parent_ids=None, subtree_linkages=None,
                 graph=None, parent=None):
        super(UndeterminedGlycoCTSubgraph, self).__init__(graph, parent)
        if probability is None:
            probability = UndeterminedProbability(100., 100.)
        if parent_ids is None:
            parent_ids = []
        if subtree_linkages is None:
            subtree_linkages = []
        # Keep the section index and probability: they were previously
        # accepted (and defaulted) but then discarded.
        self.und_index = und_index
        self.probability = probability
        self.parent_ids = parent_ids
        self.subtree_linkages = subtree_linkages

    def __root__(self):
        # The root is the entry stored under the smallest key.
        return self[sorted(self.keys())[0]]

    def postprocess(self):
        """Connect the root of this subgraph to its candidate parents with an
        :class:`~.AmbiguousLink` built from the first entry of
        :attr:`subtree_linkages`.

        Raises
        ------
        GlycoCTError:
            If the subgraph's root cannot be located.
        ValueError:
            Re-raised from ``find_open_position`` when the position-1
            fallback below does not apply.
        """
        parents = [
            self.parent.get_node(parent_id) for parent_id in self.parent_ids]
        try:
            child = [rootp(self)]
        except RootProtocolNotSupportedError:  # pragma: no cover
            raise GlycoCTError("Could not locate subgraph root")
        linkage = self.subtree_linkages[0]

        parent_loss = linkage["parent_atom_replaced"]
        parent_position = linkage["parent_attachment_position"]

        child_loss = linkage["child_atom_replaced"]
        child_position = linkage["child_attachment_position"]

        link_obj = AmbiguousLink(
            parents, child,
            parent_position=list(map(int, parent_position)),
            child_position=list(map(int, child_position)),
            parent_loss=parent_loss, child_loss=child_loss)
        try:
            link_obj.find_open_position()
        except ValueError:  # pragma: no cover
            # Fallback: when the child would attach at position 1 and that
            # position carries an acidic modification, retry at position 2.
            if link_obj.child_position == 1 and Modification.Acidic in link_obj.child.modifications[1]:
                link_obj.child_position = 2
                ix = link_obj.child_position_choices.index(1)
                link_obj.child_position_choices.pop(ix)
                link_obj.child_position_choices.insert(ix, 2)
                link_obj.apply()
                link_obj.find_open_position()
            else:
                raise
def _build_graph(glycoct_str):  # pragma: no cover
    """Parse `glycoct_str` with a non-completing reader and return its first result."""
    reader = GlycoCTReader(StringIO(glycoct_str), completes=False)
    return next(reader)


def extract_composition(parser):
    """Tally the root nodes of every layer of `parser` into a composition.

    Links in which the root of a nested layer is the child are broken first
    (with ``refund=True``). Each layer is then asked for its root nodes, and
    every one is converted to a residue and counted.

    Parameters
    ----------
    parser : object
        Must expose ``stack``, an iterable of layers that support the root
        protocol and ``find_root_nodes()``.

    Returns
    -------
    GlycanComposition

    Raises
    ------
    ValueError
        If a root node is neither a ``Monosaccharide`` nor a ``Substituent``
    """
    from glypy.structure.glycan_composition import (
        GlycanComposition, MonosaccharideResidue, SubstituentResidue)

    composition = GlycanComposition()

    # remove links between layers in the stack
    nested_layers = list(parser.stack)[1:]
    for nested in nested_layers:
        layer_root = rootp(nested)
        # Copy the items first: breaking a link mutates the link collection.
        for _position, link in list(layer_root.links.items()):
            if link.is_child(layer_root):
                link.break_link(refund=True)

    for layer in parser.stack:
        for node in layer.find_root_nodes():
            if isinstance(node, Monosaccharide):
                residue = MonosaccharideResidue.from_monosaccharide(node)
            elif isinstance(node, Substituent):
                residue = SubstituentResidue(node.name)
            else:
                raise ValueError(node)
            composition[residue] += 1
    return composition
class GlycoCTReader(GlycoCTGraphStack, Iterator):
    """Parse :title-reference:`GlycoCT{condensed}` text data into |Glycan| objects.

    The parser implements the :class:`Iterator` interface, yielding successive
    glycans from a text stream. The stream is tokenized on whitespace and
    ``;``, and a new record begins when a ``RES`` token is seen after a
    previous record's ``RES`` or ``LIN`` section.

    The parser can understand fully specified and partially ambiguous structures.
    When :attr:`allow_repeats` is |True| and a ``REP`` section is encountered, it will
    be expanded to its minimum multiplicity, or 1 if the minimum is unknown. ``UND`` sections
    will be connected to the main graph by :class:`~.AmbiguousLink` instead of :class:`~.Link`
    objects.

    Attributes
    ----------
    allow_repeats : :class:`bool`
        Whether or not to permit ``REP`` sections. Defaults to |True|
    completes : :class:`bool`
        Whether or not to translate the built graph into a |Glycan| object. Defaults to |True|
    handle : file-like
        The text file being read from
    in_repeat : :class:`bool`
        Indicates the parser is currently parsing a ``REP`` section's sub-graph
    in_undetermined : bool
        Indicates the parser is currently parsing a ``UND`` section's sub-graph
    postponed : list
        Holds all the deferred operations for the top-most graph as :class:`callable` objects
    root : :class:`Monosaccharide`
        The root node of the produced graph
    state : str
        The current state of the parser's state machine
    structure_class : type
        The |Glycan| sub-class to produce
    repeats : dict
        Maps ``REP`` index to :class:`RepeatedGlycoCTSubgraph`
    undetermineds : dict
        Maps ``UND`` section index to :class:`UndeterminedGlycoCTSubgraph`
    """

    @classmethod
    def loads(cls, glycoct_str, structure_class=Glycan, allow_repeats=True):
        '''Parse results from |str|'''
        rep = StringIO(glycoct_str)
        return cls(rep, structure_class=structure_class, allow_repeats=allow_repeats)

    def __init__(self, stream, structure_class=Glycan, allow_repeats=True, completes=True):
        super(GlycoCTReader, self).__init__()
        self._state = None
        self.state = START
        self.completes = completes
        self.handle = opener(stream, "r")

        self.in_repeat = False
        self.in_undetermined = False
        self.repeats = {}
        self.undetermineds = {}
        self.postponed = []

        self.root = None
        self._iter = None
        self.allow_repeats = allow_repeats
        self.structure_class = structure_class
        self._index = 0
        self._source_line = 0
        # The segment most recently produced by :meth:`_read`
        self._current_segment = None
        self._segment_iterator = None
        self._output_queue = deque()

    def _read(self):
        '''Yield each non-empty token of :attr:`handle`, splitting every
        line on whitespace and ``;`` and counting lines as they are read.
        '''
        for line in self.handle:
            self._source_line += 1
            for segment in re.split(r"\s|;", line):
                if not segment.strip():
                    continue
                self._current_segment = segment
                yield segment

    def _reset(self):
        '''Discard all per-structure state so the next record starts clean.'''
        self.clear()
        self.root = None
        self.postponed = []
        self.repeats.clear()
        self.undetermineds.clear()
        self.in_repeat = False
        # Must be cleared alongside ``in_repeat``, otherwise a ``UNDn:`` header
        # in a later record would be accepted without a preceding ``UND``.
        self.in_undetermined = False
        self._index += 1

    def reset(self):
        '''Reset per-structure state, but only when :attr:`completes` is set.'''
        if self.completes:
            self._reset()

    def __iter__(self):
        '''
        Calls :meth:`parse` and stores it for reuse with :meth:`__next__`
        '''
        if self._iter is None:
            self._iter = self.parse()
        return self._iter

    def next(self):
        '''
        Calls :meth:`parse` if the internal iterator has not been instantiated
        '''
        if self._iter is None:
            iter(self)
        return next(self._iter)

    #: Alias for next. Supports Py3 Iterator interface
    __next__ = next

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state = value

    def _parse_modifications(self, residue_dict):
        '''Build the position -> modification map for a residue.

        Returns
        -------
        tuple
            ``(modifications, is_reduced)`` where `is_reduced` is a
            :class:`~.ReducedEnd` when an ``aldi`` modification was found at
            position 1 (it is then removed from the map), otherwise |None|
        '''
        mods = residue_dict["modifications"]
        modifications = OrderedMultiMap()
        if mods is not None:
            for modp, mod in modification_pattern.findall(mods):
                positions = modp.split(",")
                if len(positions) > 1:
                    warnings.warn("Multi-site Modifications are not fully supported")
                for p in positions:
                    modifications[try_int(p)] = modification_map[mod]

        is_reduced = "aldi" in modifications[1]
        if is_reduced:
            modifications.pop(1, "aldi")
            is_reduced = monosaccharide.ReducedEnd()
        else:
            is_reduced = None
        return modifications, is_reduced

    def _parse_conf_stem(self, residue_dict):
        '''Translate the ``conf_stem`` field into ``(stems, configurations)``.

        When the field is absent, both default to a single unknown (``'x'``)
        entry.
        '''
        conf_stem = residue_dict["conf_stem"]
        if conf_stem is not None:
            config, stem = zip(*conf_stem_pattern.findall(conf_stem))
        else:
            config = ('x',)
            stem = ('x',)
        stem_ = tuple(Stem[s] for s in stem)
        configuration_ = tuple(Configuration[c] for c in config)
        return stem_, configuration_

    def handle_residue_line(self, line):
        '''
        Handle a base line, creates an instance of |Monosaccharide|
        and adds it to :attr:`graph` at the given index.

        The first residue seen becomes :attr:`root`.

        Called by :meth:`parse`
        '''
        _, ix, residue_str = re.split(r"^(\d+)b", line, maxsplit=1)
        residue_dict = res_pattern.search(residue_str).groupdict()

        modifications, is_reduced = self._parse_modifications(residue_dict)
        stem_, configuration_ = self._parse_conf_stem(residue_dict)

        ring_start_, ring_end_ = [
            (try_int(i) if i != 'x' else UnknownPosition)
            for i in residue_dict["indices"].split(":")]

        anomer_ = anomer_map[residue_dict['anomer']]
        super_class_ = superclass_map[residue_dict['superclass']]
        ix = int(ix)
        residue = monosaccharide.Monosaccharide(
            fast=True, stem=stem_, modifications=modifications,
            reduced=is_reduced, configuration=configuration_,
            ring_start=ring_start_, ring_end=ring_end_,
            anomer=anomer_, superclass=super_class_, id=ix)

        self.put_node(ix, residue)

        if self.root is None:
            self.root = residue

    def handle_residue_substituent(self, line):
        '''
        Handle a substituent line, creates an instance of |Substituent|
        and adds it to :attr:`graph` at the given index. The |Substituent| object is not yet linked
        to a |Monosaccharide| instance.

        Called by :meth:`parse`
        '''
        _, ix, subsituent_str = re.split(r"^(\d+)s:", line, maxsplit=1)
        sub = Substituent(subsituent_str.strip())
        self[int(ix)] = sub

    def handle_blank(self):
        '''Finish the structure currently being built.'''
        self._complete_structure()

    def enter_res(self):
        '''Transition into a ``RES`` section.

        Arriving from a terminal state means a new record is starting, so
        the structure accumulated so far is completed first.

        Raises
        ------
        GlycoCTError
            If ``RES`` is not a legal successor of the current state
        '''
        if self.state in (START, REPINNER, UNDINNER):
            pass
        elif self.state in TERMINAL_STATES:
            # A fresh record is never inside a REP or UND section.
            self.in_repeat = False
            self.in_undetermined = False
            self._complete_structure()
        else:
            raise GlycoCTError("Invalid State Transition at line %d" % self._source_line)
        self.state = RES

    def enter_lin(self):
        '''Transition into a ``LIN`` section, which must follow ``RES``.'''
        if self.state != RES:
            raise GlycoCTError("LIN before RES at line %d" % self._source_line)
        self.state = LIN

    def enter_rep(self):
        '''Transition into a ``REP`` section.

        Raises
        ------
        GlycoCTSectionUnsupported
            If :attr:`allow_repeats` is false
        '''
        if not self.allow_repeats:
            raise GlycoCTSectionUnsupported(
                "Repeat are not allowed (set allow_repeats=True to allow them) at line %d" % self._source_line)
        self.state = REP
        self.in_repeat = True

    def enter_und(self):
        '''Transition into a ``UND`` section.'''
        self.state = UND
        self.in_undetermined = True

    def parse_link(self, line):
        '''Parse one ``LIN`` entry into a :class:`LinkageSpecification`.

        Attachment positions are split on ``|`` into lists of :class:`int`.
        A linkage type that is missing from ``linkage_type_map`` falls back
        to ``constants.LinkageType.x``.

        Raises
        ------
        GlycoCTError
            If `line` does not match the link pattern
        '''
        link_dict = link_pattern.search(line)
        if link_dict is not None:
            link_dict = link_dict.groupdict()
        else:
            raise GlycoCTError("Could not interpret link", line)
        id = link_dict['doc_index']
        parent_residue_index = int(link_dict['parent_residue_index'])
        child_residue_index = int(link_dict['child_residue_index'])

        parent_atom_replaced = link_replacement_composition_map[link_dict["parent_atom_replaced"]]
        parent_attachment_position = list(map(int, link_dict["parent_attachment_position"].split("|")))
        try:
            parent_linkage_type = linkage_type_map[link_dict['parent_atom_replaced']]
        except KeyError:
            parent_linkage_type = constants.LinkageType.x

        child_atom_replaced = link_replacement_composition_map[link_dict["child_atom_replaced"]]
        child_attachment_position = list(map(int, link_dict["child_attachment_position"].split("|")))
        try:
            child_linkage_type = linkage_type_map[link_dict['child_atom_replaced']]
        except KeyError:
            child_linkage_type = constants.LinkageType.x

        return LinkageSpecification(
            id, parent_residue_index, parent_atom_replaced, parent_attachment_position,
            child_residue_index, child_atom_replaced, child_attachment_position,
            parent_linkage_type, child_linkage_type)

    def handle_linkage(self, line):
        '''
        Handle a linkage line, creates an instance of |Link| and
        attaches it to the two referenced nodes in :attr:`graph`. The parent node is always
        an instance of |Monosaccharide|, and the child node
        may either be an instance of |Monosaccharide| or |Substituent|.

        When either endpoint is a :class:`RepeatedGlycoCTSubgraph`, the link
        is not formed immediately but postponed on that sub-graph.

        Called by :meth:`parse`

        See also |Link| for more information on the impact of instantiating
        a |Link| object.
        '''
        id, parent_residue_index, parent_atom_replaced, parent_attachment_position,\
            child_residue_index, child_atom_replaced, child_attachment_position,\
            parent_linkage_type, child_linkage_type = self.parse_link(line)

        parent = self.get_node(parent_residue_index)
        child = self.get_node(child_residue_index)

        is_parent_repeat = isinstance(parent, RepeatedGlycoCTSubgraph)
        is_child_repeat = isinstance(child, RepeatedGlycoCTSubgraph)

        if is_parent_repeat and is_child_repeat:
            # The repeat with the larger repeat index owns the deferred link.
            inner = max([parent, child], key=lambda x: x.repeat_index)

            if child == inner:
                def child_getter():
                    return child.origin_node

                def parent_getter():
                    return parent.get_node(
                        parent.terminal_node_index,
                        AbstractGraphEntryEnum.internal)
            else:
                def child_getter():
                    return child.get_node(
                        child.origin_node_index,
                        AbstractGraphEntryEnum.internal)

                def parent_getter():
                    return parent.terminal_node

            if parent_atom_replaced == child_atom_replaced == Composition('H'):
                parent_atom_replaced = Composition('H')
                child_atom_replaced = Composition('OH')

            inner.postpone(
                inner.handle_abstract_subgraph_link,
                (parent_getter, child_getter,
                 parent_attachment_position, parent_atom_replaced,
                 child_attachment_position, child_atom_replaced,
                 id)
            )
        elif is_parent_repeat:
            parent.postpone(parent.handle_outgoing_link, (
                self.deferred_retrieval(child_residue_index),
                parent_attachment_position, parent_atom_replaced,
                child_attachment_position, child_atom_replaced,
                id)
            )
        elif is_child_repeat:
            child.postpone(child.handle_incoming_link, (
                self.deferred_retrieval(parent_residue_index),
                parent_attachment_position, parent_atom_replaced,
                child_attachment_position, child_atom_replaced,
                id)
            )
        else:
            self.form_link(
                parent, child,
                parent_position=parent_attachment_position,
                child_position=child_attachment_position,
                parent_loss=parent_atom_replaced,
                child_loss=child_atom_replaced,
                id=id,
                parent_linkage_type=parent_linkage_type,
                child_linkage_type=child_linkage_type)

    def handle_repeat_stub(self, line):
        '''Handle an ``r:`` entry of a ``RES`` section by registering an
        empty :class:`RepeatedGlycoCTSubgraph` under both its graph index
        and its repeat index.

        Raises
        ------
        GlycoCTSectionUnsupported
            If :attr:`allow_repeats` is false
        '''
        if not self.allow_repeats:
            raise GlycoCTSectionUnsupported(
                "Repeat are not allowed (set allow_repeats=True to allow them)")
        match = repeat_line_pattern.search(line).groupdict()

        graph_index = try_int(match['graph_index'])
        repeat_index = try_int(match["repeat_index"])

        repeat = RepeatedGlycoCTSubgraph(
            int(graph_index), int(repeat_index), parent=self.graph)
        repeat._index = self._index

        self[graph_index] = repeat
        self.repeats[repeat_index] = repeat

        if self.root is None:
            self.root = repeat

    def handle_repeat_inner(self, line):
        '''Handle a ``REPn:`` header, making the matching repeat record the
        current level and storing its linkage and multitude.

        Raises
        ------
        GlycoCTError
            If the header appears outside of a ``REP`` section
        '''
        if not self.in_repeat:
            raise GlycoCTError(
                "Encountered %r outside of REP at line %d" % (
                    line, self._source_line))
        header_dict = rep_header_pattern.search(line).groupdict()

        repeat_index = int(header_dict['repeat_index'])
        repeat_record = self.repeats[repeat_index]
        self.push_level(repeat_record)
        linkage = internal_link_pattern.search(header_dict['internal_linkage']).groupdict()

        repeat_record.internal_linkage = linkage
        repeat_record.external_linkage = linkage
        repeat_record.multitude = RepeatedMultitude(
            try_int(header_dict['lower_multitude']),
            try_int(header_dict['higher_multitude']))
        self.state = REPINNER

    def handle_und_inner(self, line):
        '''Handle a ``UNDn:`` header together with the two tokens that
        follow it (the parent id list and the sub-tree linkage), and make
        the resulting :class:`UndeterminedGlycoCTSubgraph` the current level.

        Raises
        ------
        GlycoCTError
            If the header appears outside of a ``UND`` section, if the input
            ends before both follow-up tokens are read, or if the sub-tree
            linkage cannot be interpreted
        '''
        if not self.in_undetermined:
            raise GlycoCTError("Encountered %r outside of UND at line %d" % (
                line, self._source_line))
        header_dict = und_header_pattern.search(line).groupdict()

        # This method runs inside the :meth:`parse` generator. Letting
        # StopIteration escape from here would surface as an opaque
        # RuntimeError (PEP 479), so report truncated input explicitly.
        try:
            parent_line = next(self._segment_iterator)
            subtree_linkage_line = next(self._segment_iterator)
        except StopIteration:
            raise GlycoCTError(
                "Unexpected end of input while reading UND section %r at line %d" % (
                    line, self._source_line))

        ids = list(map(int, parent_line.split(":")[1].split("|")))
        subtree_linkages = []
        match = und_link_pattern.search(subtree_linkage_line.split(":")[1])
        if match is None:
            raise GlycoCTError("Could not interpret UND SubtreeLinkage %r at line %d" % (
                subtree_linkage_line, self._source_line))
        else:
            link_dict = match.groupdict()
            link_dict["parent_atom_replaced"] = link_replacement_composition_map[
                link_dict["parent_atom_replaced"]]
            link_dict["parent_attachment_position"] = list(
                map(int, link_dict["parent_attachment_position"].split("|")))
            link_dict["child_atom_replaced"] = link_replacement_composition_map[
                link_dict["child_atom_replaced"]]
            link_dict["child_attachment_position"] = list(
                map(int, link_dict["child_attachment_position"].split("|")))
            subtree_linkages.append(link_dict)

        und_index = int(header_dict['und_index'])
        prob = UndeterminedProbability(float(header_dict['major']), float(header_dict['minor']))
        record = UndeterminedGlycoCTSubgraph(
            und_index, prob, parent_ids=ids,
            subtree_linkages=subtree_linkages,
            parent=self)
        self.undetermineds[und_index] = record
        self.push_level(record)
        self.state = UNDINNER

    def _complete_structure(self):
        '''Queue the finished structure for output.

        When :attr:`completes` is set the post-processed result is queued
        (if there is one) and the parser is reset; otherwise the parser
        itself is queued.
        '''
        if self.completes:
            result = self.postprocess()
            if result is not None:
                self._output_queue.append(result)
            self.reset()
        else:
            self._output_queue.append(self)

    def postprocess(self):
        '''
        Handle all deferred operations such as binding together and expanding
        repeating units. Removes any distinguishing markers on node ids,
        and constructs a new instance of :attr:`structure_class` from the accumulated
        graph

        Returns
        -------
        Glycan
            |None| when no root was ever set. When the graph is not fully
            connected, the result of :func:`extract_composition` is returned
            instead of a structure.
        '''
        for level in reversed(self.history):
            level.postprocess()

        for postop in self.postponed:
            postop[0](*postop[1:])

        if self.root is None:
            return None
        if self.is_fully_connected():
            try:
                inst = undecorate_tree(
                    self.structure_class(
                        root=rootp(self.root), index_method=None)
                ).reindex()
                return inst
            except RootProtocolNotSupportedError:  # pragma: no cover
                raise GlycoCTError("Could not locate graph root")
        else:
            # A disconnected graph cannot be returned as one structure, so
            # fall back to a composition.
            return extract_composition(self)

    def parse(self):
        '''
        Returns an iterator that yields each complete :class:`Glycan` instance
        from the underlying text stream.
        '''
        # Create a reference to the segment iterator
        # as late as possible, but bind it to the object
        # state so it can be referenced independent of this
        # outermost loop
        self._segment_iterator = self._read()
        for line in self._segment_iterator:
            if line.strip() == "":
                self.handle_blank()
                while self._output_queue:
                    yield self._output_queue.popleft()

            elif RES == line.strip():
                self.enter_res()
                while self._output_queue:
                    yield self._output_queue.popleft()

            elif LIN == line.strip():
                self.enter_lin()

            elif REP == line.strip():
                self.enter_rep()

            elif UND == line.strip():
                self.enter_und()

            # REP definition block
            elif line.strip()[:3] == REP:
                self.handle_repeat_inner(line)

            # UND definition block
            elif line.strip()[:3] == UND:
                self.handle_und_inner(line)

            elif ALT == line.strip():
                raise GlycoCTSectionUnsupported(ALT)

            elif re.search(r"^(\d+)b", line) and self.state == RES:
                self.handle_residue_line(line)

            elif re.search(r"^(\d+)s:", line) and self.state == RES:
                self.handle_residue_substituent(line)

            elif re.search(r"^(\d+)r:", line) and self.state == RES:
                self.handle_repeat_stub(line)

            elif re.search(r"^(\d+):(\d+)", line) and self.state == LIN:
                self.handle_linkage(line)
            else:
                raise GlycoCTError("Unknown format error: %s on line %d" % (line, self._source_line))

        # Flush whatever was being built when the input ran out.
        if self.root is not None:
            self._complete_structure()
        while self._output_queue:
            yield self._output_queue.popleft()
# Short alias for the reader class.
GlycoCT = GlycoCTReader


def read(stream, structure_class=Glycan, allow_repeats=True):
    """Construct a :class:`GlycoCTReader` over `stream`.

    The arguments are forwarded unchanged to the reader's constructor and
    the reader itself is returned.
    """
    reader = GlycoCTReader(
        stream,
        structure_class=structure_class,
        allow_repeats=allow_repeats)
    return reader
def load(stream, structure_class=Glycan, allow_repeats=True, allow_multiple=True):  # pragma: no cover
    """Read all structures from the provided text stream.

    Parameters
    ----------
    stream : file-like
        The text stream to parse structures from
    structure_class : type, optional
        :class:`~.Glycan` subclass to use
    allow_repeats : bool, optional
        Whether or not to allow ``REP`` sections
    allow_multiple : bool, optional
        When false, only the first structure is read and returned

    Returns
    -------
    :class:`~.Glycan` or :class:`list` of :class:`~.Glycan`
        A single structure when only one is available (or requested),
        otherwise a list of every structure in the stream
    """
    reader = GlycoCTReader(
        stream, structure_class=structure_class, allow_repeats=allow_repeats)
    first = next(reader)
    if not allow_multiple:
        return first
    try:
        second = next(reader)
    except StopIteration:
        # Only one structure in the stream: hand it back unwrapped.
        return first
    structures = [first, second]
    structures.extend(reader)
    return structures
def loads(text, structure_class=Glycan, allow_repeats=True, allow_multiple=True):
    """Read all structures from the provided text string.

    The string is wrapped in a :class:`StringIO` buffer and handed to
    :func:`load` together with the remaining arguments.

    Parameters
    ----------
    text : str
        The text to parse structures from
    structure_class : type, optional
        :class:`~.Glycan` subclass to use
    allow_repeats : bool, optional
        Whether or not to allow ``REP`` sections
    allow_multiple : bool, optional
        Passed through to :func:`load`

    Returns
    -------
    :class:`~.Glycan` or :class:`list` of :class:`~.Glycan`
    """
    return load(
        StringIO(text),
        structure_class,
        allow_repeats,
        allow_multiple)
def detect_glycoct(string):
    """Return whether `string`, ignoring leading whitespace, starts with ``RES``."""
    return string.lstrip()[:3] == "RES"


invert_anomer_map = invert_dict(anomer_map)
invert_superclass_map = invert_dict(superclass_map)


def _sign(value):
    """Collapse a numeric difference to -1, 0 or 1."""
    return (value > 0) - (value < 0)


class DictTree(object):
    """A two-level mapping: writer state -> {key: value}.

    Writes go into the sub-dictionary of the current :attr:`state`; reads
    through ``[]`` search every sub-dictionary for the key before falling
    back to treating the key as a state name.
    """

    def __init__(self, state=START, store=None):
        if store is None:
            store = {}
        self.store = defaultdict(dict, store)
        self.state = state

    def __getitem__(self, key):
        for subtree in self.store.values():
            try:
                return subtree[key]
            except KeyError:
                continue
        # Not an entry of any sub-dictionary: treat `key` as a state name.
        # Because `store` is a defaultdict this creates an empty
        # sub-dictionary for unknown states rather than raising.
        return self.store[key]

    def __setitem__(self, key, value):
        self.store[self.state][key] = value

    def __len__(self):
        # Count the entries of every sub-dictionary. Iterating the store
        # directly would measure the state keys instead.
        return sum(map(len, self.store.values()))

    def __iter__(self):
        return iter(self.store)

    def keys(self):
        return self.store.keys()

    def items(self):
        """Return a list of the ``(key, value)`` pairs of every sub-dictionary.

        A list is built explicitly because dictionary views cannot be
        concatenated with ``+`` on Python 3; an empty store gives ``[]``.
        """
        acc = []
        for subtree in self.store.values():
            acc.extend(subtree.items())
        return acc

    def __contains__(self, key):
        # NOTE(review): this tests the state names only, not the entries
        # that ``__getitem__`` can find - confirm that this is intended.
        return key in self.store

    def get(self, key, subtree=None, default=None):
        if subtree is None:
            subtree = self.state
        return self.store[subtree].get(key, default)


class GlycoCTWriterBase(object):
    """Write a structure to a buffer as ``RES`` and ``LIN`` sections.

    Attributes
    ----------
    buffer : file-like
        The buffer to write structures to. If :attr:`nobuffer` is |True|, this will be
        a :class:`~.StringIO` object which will be returned on each write.
    dependencies : :class:`defaultdict` of :class:`dict`
        Track the relationships between child nodes and their parents. Used during linkage
        writing
    full : :class:`bool`
        Whether or not to traverse :class:`~.Monosaccharide`-:class:`~.Monosaccharide` linkages.
    index_to_residue : :class:`DictTree`
        A state-specific mapping from index to :attr:`~.Monosaccharide.id`.
    lin_accumulator : list
        Accumulator list of :class:`~.Link` objects.
    lin_counter : function
        A stateful counter which when called returns the next integer in a sequence
        used to index entries in the `LIN` section
    nobuffer : bool
        Whether or not the writer was initialized with a write-able buffer
    res_counter : function
        A stateful counter which when called returns the next integer in a sequence
        used to index entries in the `RES` section
    residue_to_index : :class:`DictTree`
        A state-specific mapping from :attr:`~.Monosaccharide.id` to index.
    state : str
        The current state of the writer
    structure : :class:`~.SaccharideCollection`
        The structure currently being written. May be a :class:`~.Monosaccharide`,
        :class:`~.Glycan`, :class:`~.GlycanComposition`.
    und_counter : function
        A stateful counter which when called returns the next integer in a sequence
        used to index `UND` sections.
    """

    def __init__(self, structure=None, buffer=None, full=True):
        self.nobuffer = False
        if buffer is None:
            self.nobuffer = True
            buffer = StringIO()

        self.buffer = buffer
        self.structure = structure
        self.full = full
        self.state = START

        self._initialize_counters()
        self._initialize_index_tree()
        self._initialize_link_trackers()

    def _initialize_counters(self):
        self.res_counter = make_counter()
        self.lin_counter = make_counter()
        self.und_counter = make_counter()

    def _initialize_index_tree(self):
        # Look-ups for mapping RES nodes to objects by section index and id,
        # respectively
        self.index_to_residue = DictTree(self.state)
        self.residue_to_index = DictTree(self.state)

    def _initialize_link_trackers(self):
        # Accumulator for linkage indices and mapping linkage indices to
        # dependent RES indices
        self.lin_accumulator = []
        self.dependencies = defaultdict(dict)

    @property
    def structure(self):
        return self._structure

    @structure.setter
    def structure(self, value):
        if value is None:
            self._structure = value
            return
        try:
            structure = treep(value)
        except TypeError:
            try:
                root = rootp(value)
                structure = Glycan(root, index_method=None)
            except TypeError:
                raise TypeError("Could not extract or construct a tree structure from %r" % value)
        self._structure = structure

    def _reset(self):
        self.state = START
        self._initialize_counters()
        self._initialize_index_tree()
        self._initialize_link_trackers()
        if self.nobuffer:
            self.buffer = StringIO()

    def _glycoct_sigils(self, link):
        '''
        Helper method for determining which GlycoCT symbols and losses to present
        '''
        parent_loss_str, child_loss_str = link._glycoct_sigils()
        return parent_loss_str, child_loss_str

    def handle_link(self, link, ix, parent_ix, child_ix):
        """Format one ``LIN`` entry for `link`.

        Ambiguous linkages render each side's position choices joined by
        ``|``; otherwise the single parent and child positions are used.
        """
        parent_loss_str, child_loss_str = self._glycoct_sigils(link)
        rep = "{ix}:{parent_ix}{parent_loss}({parent_position}+{child_position}){child_ix}{child_loss}"
        if link.has_ambiguous_linkage():
            parent_position = '|'.join(map(str, link.parent_position_choices))
            child_position = '|'.join(map(str, link.child_position_choices))
        else:
            parent_position = link.parent_position
            child_position = link.child_position
        return rep.format(
            ix=ix,
            parent_ix=parent_ix,
            parent_loss=parent_loss_str,
            parent_position=parent_position,
            child_ix=child_ix,
            child_loss=child_loss_str,
            child_position=child_position)

    def handle_substituent(self, substituent):  # pylint: disable=redefined-outer-name
        return "s:{0}".format(substituent.name.replace("_", "-"))

    def _format_monosaccharide(self, monosaccharide):  # pylint: disable=redefined-outer-name
        """Return ``(residue_line, index)`` for `monosaccharide`, consuming one ``RES`` index."""
        residue_template = "{ix}b:{anomer}{conf_stem}{superclass}-{ring_start}:{ring_end}{modifications}"

        # This index is reused many times
        monosaccharide_index = self.res_counter()

        # Format individual fields
        anomer = invert_anomer_map[monosaccharide.anomer]
        conf_stem = ''.join("-{0}{1}".format(c.name, s.name)
                            for c, s in zip(monosaccharide.configuration, monosaccharide.stem))
        if None in monosaccharide.configuration and None in monosaccharide.stem:
            conf_stem = ''
        superclass = "-" + invert_superclass_map[monosaccharide.superclass]

        modifications = '|'.join(
            "{0}:{1}".format(k, v.name) for k, v in monosaccharide.modifications.items())

        null_positions = (UnknownPosition, NoPosition)

        modifications = "|" + modifications if modifications != "" else ""
        ring_start = monosaccharide.ring_start if monosaccharide.ring_start not in null_positions else 'x'
        ring_end = monosaccharide.ring_end if monosaccharide.ring_end not in null_positions else 'x'

        # The complete monosaccharide residue line
        residue_str = residue_template.format(ix=monosaccharide_index, anomer=anomer, conf_stem=conf_stem,
                                              superclass=superclass, modifications=modifications,
                                              ring_start=ring_start, ring_end=ring_end)
        return residue_str, monosaccharide_index

    def handle_monosaccharide(self, monosaccharide):  # pylint: disable=redefined-outer-name
        """Return ``[res_lines, lin_lines, index]`` for a residue and its substituents."""
        residue_str, monosaccharide_index = self._format_monosaccharide(monosaccharide)
        res = [residue_str]
        lin = []
        visited_subst = dict()
        # Construct the substituent lines
        # and their links
        for _lin_pos, link_obj in monosaccharide.substituent_links.items():
            sub = link_obj.to(monosaccharide)
            # A substituent reached through several links is only written
            # once, but every link to it is still emitted.
            if sub.id not in visited_subst:
                sub_index = self.res_counter()
                subst_str = str(sub_index) + self.handle_substituent(sub)
                res.append(subst_str)
                visited_subst[sub.id] = sub_index
            lin.append(
                self.handle_link(
                    link_obj, self.lin_counter(), monosaccharide_index, visited_subst[sub.id]))

        return [res, lin, monosaccharide_index]

    def handle_glycan(self, structure):  # pragma: no cover
        """Write the ``RES`` and ``LIN`` sections for `structure`; return the buffer."""
        if structure is None:
            raise GlycoCTError("No structure is ready to be written.")

        self.lin_accumulator = []

        self.buffer.write("RES\n")

        visited = set()
        for node in (structure):
            if node.id in visited:
                continue
            visited.add(node.id)
            res, lin, index = self.handle_monosaccharide(node)

            self.lin_accumulator.append((index, lin))
            self.residue_to_index[node.id] = index
            self.index_to_residue[index] = node

            if self.full:
                for _pos, link in node.links.items():
                    if link.is_child(node):
                        continue
                    self.dependencies[link.child.id][node.id] = ((self.lin_counter(), link))
            for line in res:
                self.buffer.write(line + '\n')

            # If this serialization is not meant to be full
            # do not visit residues beyond the first.
            if not self.full:
                break

        self.buffer.write("LIN\n")
        for res_ix, links in self.lin_accumulator:
            for line in links:
                self.buffer.write(line + '\n')
            residue = self.index_to_residue[res_ix]
            if self.full:
                for _pos, link in residue.links.items():
                    if link.is_child(residue):
                        continue
                    child_res = link.child
                    ix, link = self.dependencies[child_res.id][residue.id]
                    self.buffer.write(
                        self.handle_link(link, ix, res_ix, self.residue_to_index[child_res.id]) + "\n")
        return self.buffer

    def begin_underdetermined(self):
        if self.state != UND:
            self.buffer.write("UND\n")
            self.state = UND
            self.index_to_residue.state = UND
            self.residue_to_index.state = UND

    def _format_subtree_linkage(self, linkage_args):
        (parent_link_type, parent_position, child_position, child_link_type) = linkage_args
        return "%s(%d+%d)%s" % (parent_link_type, parent_position, child_position, child_link_type)

    def _get_viable_und_parents(self):
        valid_parent_inds = []
        for index, node in self.index_to_residue[START].items():
            if node.node_type == Monosaccharide.node_type:
                valid_parent_inds.append(index)
        return valid_parent_inds

    def handle_und_header(self, major_probability=100.0, minor_probability=100.0, parent_ids=None,
                          subtree_linkage_args=None):
        """Write the three header lines of one ``UND`` entry.

        `parent_ids` defaults to every monosaccharide index recorded under
        the ``START`` state. `subtree_linkage_args` must be a 4-tuple as
        unpacked by :meth:`_format_subtree_linkage`.
        """
        if parent_ids is None:
            parent_ids = list(self._get_viable_und_parents())
        index = self.und_counter()
        self.buffer.write("UND%d:%0.1f:%0.1f\n" % (index, major_probability, minor_probability))
        self.buffer.write(
            "ParentIDs:%s\n" % ('|'.join(map(str, parent_ids))))
        self.buffer.write("SubtreeLinkageID1:%s\n" % (
            self._format_subtree_linkage(subtree_linkage_args)))

    def dump(self):
        """Serialize :attr:`structure`.

        Returns the text (and resets the writer) when no buffer was
        supplied at construction, otherwise the buffer itself.
        """
        buffer = self.handle_glycan(self.structure)
        if self.nobuffer:
            value = buffer.getvalue()
            self._reset()
            return value
        return buffer

    def write(self, structure):
        self.structure = structure
        self._reset()
        return self.dump()

    def embed(self, writer):
        """Share this writer's counters, index map, buffer and state with `writer`."""
        writer.res_counter = self.res_counter
        writer.lin_counter = self.lin_counter
        writer.und_counter = self.und_counter
        writer.index_to_residue = self.index_to_residue
        writer.buffer = self.buffer
        writer.state = self.state
        return writer

    def _determine_und_linkage_type_glycan_composition(self, glycan_composition):
        if len(glycan_composition) == 1:
            keys = list(glycan_composition)
            key = keys[0]
            if key.node_type == Substituent.node_type:
                return "d", "n"
            else:
                return "o", "d"
        else:
            return "o", "d"

    def add_glycan_composition(self, glycan_composition):
        for m in OrderingComparisonContext(self).sort_residues(glycan_composition):
            for _i in range(glycan_composition[m]):
                self.add_glycan_composition_single({m: 1})

    def add_glycan_composition_single(self, glycan_composition):
        self.begin_underdetermined()
        linkage_types = self._determine_und_linkage_type_glycan_composition(glycan_composition)
        self.handle_und_header(subtree_linkage_args=(linkage_types[0], -1, -1, linkage_types[1]))
        writer = GlycanCompositionGlycoCTWriter(
            glycan_composition, self.buffer)
        self.embed(writer)
        writer.handle_glycan(glycan_composition)


class GlycanCompositionGlycoCTWriter(GlycoCTWriterBase):
    """Writer whose :attr:`structure` is always coerced to a ``GlycanComposition``."""

    def __init__(self, structure=None, buffer=None, full=True, standardize=False):
        super(GlycanCompositionGlycoCTWriter, self).__init__(structure, buffer, full)
        self.standardize = standardize

    @property
    def structure(self):
        return self._structure

    @structure.setter
    def structure(self, value):
        from glypy.structure.glycan_composition import GlycanComposition
        if value is None:
            self._structure = value
            return
        if isinstance(value, (GlycanComposition, dict)):
            value = GlycanComposition(value)
        else:
            try:
                structure = treep(value)
            except TypeError:
                try:
                    root = rootp(value)
                    structure = Glycan(root, index_method=None)
                except TypeError:
                    raise TypeError("Could not extract or construct a tree structure from %r" % value)
            value = GlycanComposition.from_glycan(structure)
        self._structure = value

    def _standardize_substituent_linkage(self, link):
        # When standardizing, an ``n_acetyl`` child on a HexNAc parent with
        # parent position -1 has that position rewritten to 2 (in place).
        if self.standardize:
            from glypy.io.nomenclature import identity
            from glypy.structure.named_structures import monosaccharides
            try:
                if link.child.name == 'n_acetyl':
                    if identity.is_a(link.parent, monosaccharides.HexNAc):
                        if link.parent_position == -1:
                            link.parent_position = 2
            except AttributeError:
                pass

    def _unspool(self, mapping):
        """Yield each key of `mapping` ``count`` times, in reverse residue order."""
        sorter = OrderingComparisonContext(self)
        order = sorter.sort_residues(mapping.keys(), reverse=True)
        for key in order:
            count = mapping[key]
            if count < 1:
                continue
            for _i in range(count):
                yield key

    def _write_und_subgraph(self, substituent):  # pylint: disable=redefined-outer-name
        self.handle_und_header(subtree_linkage_args=('o', -1, 1, 'n'))
        self.buffer.write("RES\n")
        sub_index = self.res_counter()
        subst_str = str(sub_index) + self.handle_substituent(substituent)
        self.buffer.write("%s\n" % subst_str)

    def handle_link(self, link, ix, parent_ix, child_ix):
        self._standardize_substituent_linkage(link)
        return super(GlycanCompositionGlycoCTWriter, self).handle_link(link, ix, parent_ix, child_ix)

    def handle_glycan(self, structure):
        """Write a composition: residues first, then free substituents.

        Raises :class:`TypeError` when the composition contains a
        ``MolecularComposition`` entry.
        """
        if structure is None:
            raise GlycoCTError("No structure is ready to be written.")
        from glypy.structure.glycan_composition import (
            SubstituentResidue, MolecularComposition)
        self.lin_accumulator = []

        disconnected_substituents = []
        molecules = []
        nodes = []

        for node in self._unspool(structure):
            if isinstance(node, SubstituentResidue):
                disconnected_substituents.append(node)
                continue
            elif isinstance(node, MolecularComposition):
                molecules.append(node)
                continue
            else:
                nodes.append(node)
        if nodes:
            self.buffer.write("RES\n")
            for node in nodes:
                res, lin, index = self.handle_monosaccharide(node)

                self.lin_accumulator.append((index, lin))
                self.residue_to_index[node.id] = index
                self.index_to_residue[index] = node

                if self.full:
                    for _pos, link in node.links.items():
                        if link.is_child(node):
                            continue
                        self.dependencies[link.child.id][node.id] = ((self.lin_counter(), link))
                for line in res:
                    self.buffer.write(line + '\n')

                # If this serialization is not meant to be full
                # do not visit residues beyond the first.
                if not self.full:
                    break
            # Only emit a LIN section when at least one residue has
            # linkage lines to write.
            if any(linkages for res_ix, linkages in self.lin_accumulator):
                self.buffer.write("LIN\n")
                for res_ix, links in self.lin_accumulator:
                    for line in links:
                        self.buffer.write(line + '\n')
                    residue = self.index_to_residue[res_ix]
                    if self.full:
                        for _pos, link in residue.links.items():
                            if link.is_child(residue):
                                continue
                            child_res = link.child
                            ix, link = self.dependencies[child_res.id][residue.id]
                            self.buffer.write(
                                self.handle_link(
                                    link, ix, res_ix, self.residue_to_index[child_res.id]) + "\n")

            if disconnected_substituents:
                self.begin_underdetermined()
                for node in disconnected_substituents:
                    self._write_und_subgraph(node)
        elif disconnected_substituents:
            for subst in disconnected_substituents:
                self.buffer.write("RES\n")
                sub_index = self.res_counter()
                subst_str = str(sub_index) + self.handle_substituent(subst)
                self.buffer.write("%s\n" % subst_str)
        if molecules:
            raise TypeError("Cannot serialize MolecularComposition to GlycoCT")
        return self.buffer


def all_node_depth(node, visited=None):
    """Return 1 plus the largest depth found among `node`'s children and substituents.

    Nodes whose ``id`` is already in `visited` contribute 0. The `visited`
    set is shared by the whole recursion.
    """
    if visited is None:
        visited = set()
    if node.id in visited:  # pragma: no cover
        return 0
    visited.add(node.id)
    depth_count = 1
    children = list(node.children())
    try:
        children += list(node.substituents())
    except AttributeError:
        pass
    if children:
        depth_count += max(all_node_depth(ch, visited) for p, ch in children)
    return depth_count


class OrderingComparisonContext(object):
    """Comparison functions used to order residues and links of ``parent.structure``."""

    def __init__(self, parent):
        self.parent = parent
        if isinstance(self.structure, Glycan) and not self.structure.has_index():
            self.structure.reindex()
        self.branch_to_terminal_count = self.build_branch_to_terminal_count()

    @property
    def structure(self):
        return self.parent.structure

    def get_branch_from_link_label(self, link):
        return link.label[0]

    def build_branch_to_terminal_count(self):
        counter = Counter()
        try:
            for key in sorted(self.structure.branch_parent_map.keys(), reverse=True):
                parent = self.structure.branch_parent_map[key]
                counter[parent] += counter[key] + 1
        except AttributeError:
            # The structure has no ``branch_parent_map``; leave the counts empty.
            pass
        return counter

    def _longest_child_branch(self, residue):
        """Largest :func:`all_node_depth` among the residue's children, 0 if none."""
        try:
            return max((all_node_depth(cr) for p, cr in residue.children()))
        except ValueError:
            return 0

    def _max_terminal_count(self, residue):
        """Largest branch-terminal count over the links `residue` is parent of."""
        best = 0
        for link in residue.links.values():
            if link.is_parent(residue):
                branch_label = self.get_branch_from_link_label(link)
                best = max(best, self.branch_to_terminal_count[branch_label])
        return best

    def _subtree_text(self, residue):
        """Serialize the subtree of the structure rooted at `residue`."""
        return GlycoCTWriter(Glycan.subtree_from(self.structure, residue)).dump()

    def _residue_diff(self, res_a, res_b):
        """Return every residue-ordering criterion at once, for inspection.

        The result is ``(diff_child_res, diff_longest_branch,
        diff_n_branches_from, subtree_diff, subtree_a, subtree_b)``. The two
        subtree strings are :const:`None` when ``res_a == res_b``, since no
        serialization is performed in that case.
        """
        diff_child_res = _sign(all_node_depth(res_a) - all_node_depth(res_b))

        branch_length_a = self._longest_child_branch(res_a)
        branch_length_b = self._longest_child_branch(res_b)
        diff_longest_branch = _sign(branch_length_a - branch_length_b)

        n_branches_from_a = self._max_terminal_count(res_a)
        n_branches_from_b = self._max_terminal_count(res_b)
        diff_n_branches_from = _sign(n_branches_from_a - n_branches_from_b)

        # Bind both names up front so the return below is always valid.
        subtree_a = subtree_b = None
        if res_a == res_b:
            subtree_diff = 0
        else:
            subtree_a = self._subtree_text(res_a)
            subtree_b = self._subtree_text(res_b)
            # cmp(subtree_b, subtree_a)
            subtree_diff = (subtree_b > subtree_a) - (subtree_b < subtree_a)
        return (diff_child_res, diff_longest_branch, diff_n_branches_from,
                subtree_diff, subtree_a, subtree_b)

    def _compare_residue_ordering(self, res_a, res_b):
        """Three-way comparison of two residues.

        Criteria are tried in order and the first non-zero one wins: node
        depth, longest child branch, branch-terminal count, and finally the
        serialized subtree text.
        """
        diff_child_res = all_node_depth(res_a) - all_node_depth(res_b)
        if diff_child_res != 0:
            return _sign(diff_child_res)

        branch_length_a = self._longest_child_branch(res_a)
        branch_length_b = self._longest_child_branch(res_b)
        diff_longest_branch = branch_length_a - branch_length_b
        if diff_longest_branch != 0:
            return _sign(diff_longest_branch)

        n_branches_from_a = self._max_terminal_count(res_a)
        n_branches_from_b = self._max_terminal_count(res_b)
        diff_n_branches_from = n_branches_from_a - n_branches_from_b
        if diff_n_branches_from != 0:
            return _sign(diff_n_branches_from)

        if res_a == res_b:
            return 0
        subtree_a = self._subtree_text(res_a)
        subtree_b = self._subtree_text(res_b)
        return (subtree_b > subtree_a) - (subtree_b < subtree_a)

    def compare_residue_ordering(self, res_a, res_b):
        ordered = self._compare_residue_ordering(res_a, res_b)
        return ordered

    def _link_diff(self, link_a, link_b):  # pragma: no cover
        """Return every link-ordering criterion at once, for inspection."""
        parent_pos_a = link_a.parent_position
        parent_pos_b = link_b.parent_position
        try:
            diff_parent = parent_pos_a - parent_pos_b
        except TypeError as e:
            print(parent_pos_a, parent_pos_b, link_a, link_b)
            raise e
        diff_parent = _sign(diff_parent)

        diff_child = _sign(link_a.child_position - link_b.child_position)

        sigils_a = link_a._glycoct_sigils()
        sigils_b = link_b._glycoct_sigils()
        diff_sig0 = 0
        if sigils_a[0] != sigils_b[0]:
            diff_sig0 = _sign(ord(sigils_a[0]) - ord(sigils_b[0]))
        diff_sig1 = 0
        if sigils_a[1] != sigils_b[1]:
            diff_sig1 = _sign(ord(sigils_a[1]) - ord(sigils_b[1]))

        ordered = self.compare_residue_ordering(link_a.child, link_b.child)
        return (diff_parent, diff_child, diff_sig0, diff_sig1, ordered)

    def _compare_link_ordering(self, link_a, link_b):
        """Three-way comparison of two links.

        Criteria are tried in order: parent position, child position, parent
        sigil, child sigil, then the ordering of the child residues.
        """
        # Ignoring # of links for now since it is difficult
        # to compute
        parent_pos_a = link_a.parent_position
        parent_pos_b = link_b.parent_position
        try:
            diff_parent = parent_pos_a - parent_pos_b
        except TypeError as e:
            print(parent_pos_a, parent_pos_b, link_a, link_b)
            raise e
        if diff_parent != 0:
            return _sign(diff_parent)

        diff_child = link_a.child_position - link_b.child_position
        if diff_child != 0:
            return _sign(diff_child)

        sigils_a = link_a._glycoct_sigils()
        sigils_b = link_b._glycoct_sigils()
        if sigils_a[0] != sigils_b[0]:
            return _sign(ord(sigils_a[0]) - ord(sigils_b[0]))
        if sigils_a[1] != sigils_b[1]:
            return _sign(ord(sigils_a[1]) - ord(sigils_b[1]))

        ordered = self.compare_residue_ordering(link_a.child, link_b.child)
        return ordered

    def compare_link_ordering(self, link_a, link_b):
        ordered = self._compare_link_ordering(link_a, link_b)
        return ordered

    def sort_links(self, links, reverse=False):
        return sorted(links, key=cmp_to_key(self.compare_link_ordering), reverse=reverse)

    def sort_residues(self, residues, reverse=False):
        return sorted(residues, key=cmp_to_key(self.compare_residue_ordering), reverse=reverse)


class SubtreeJourney(object):
    """A thin sequence wrapper around an ordered collection of links."""

    def __init__(self, links):
        self.links = links

    def __repr__(self):
        template = "{self.__class__.__name__}({self.links!r})"
        return template.format(self=self)

    def __iter__(self):
        return iter(self.links)

    def __len__(self):
        return len(self.links)

    def __getitem__(self, i):
        return self.links[i]


class OrderedSubtreeTraverser(object):
    """Collect the links below a starting link in the order given by an ordering context."""

    def __init__(self, ordering_context):
        self.ordering_context = ordering_context

    def visit_link(self, link, ignore_ambiguous=False):
        if isinstance(link, AmbiguousLink) and len(link.parent_choices) and not ignore_ambiguous:
            # Ambiguous links are walked by a fresh traverser and returned
            # as a single nested journey.
            subtree = OrderedSubtreeTraverser(self.ordering_context)
            return [SubtreeJourney(subtree.visit_subtree(link, True))]
        node = link.child
        if node.node_type is Monosaccharide.node_type:
            next_links = self.visit_monosaccharide(node)
        else:
            next_links = self.visit_substituent(node)
        return next_links

    def visit_monosaccharide(self, monosaccharide):
        link_collection = list(monosaccharide.substituent_links.values())
        link_collection.extend(
            [cl for p, cl in monosaccharide.children(links=True)])
        links = self.ordering_context.sort_links(link_collection)
        return links[::-1]

    def visit_substituent(self, substituent):
        links = self.ordering_context.sort_links(
            [cl for p, cl in substituent.children(links=True)])
        return links[::-1]

    def visit_subtree(self, link, ignore_ambiguous=False):
        """Return a deque of the links reachable from `link`, in visiting order."""
        visited = set()
        link_queue = deque([link])
        journey = deque()
        while link_queue:
            link = link_queue.popleft()
            # Explicitly add before skipping to avoid double-writing
            # residues, but including multiple-link cases
            journey.append(link)
            if isinstance(link, SubtreeJourney):
                continue
            if link.child.id in visited:
                continue
            visited.add(link.child.id)
            link_queue.extend(
                self.visit_link(
                    link, ignore_ambiguous=ignore_ambiguous))
            # Only the first expanded link may have its ambiguity ignored.
            ignore_ambiguous &= False
        return journey


class OrderRespectingGlycoCTWriter(GlycoCTWriterBase):
    """Writer that emits residues and links in the order defined by an ordering context."""

    def __init__(self, structure, buffer=None, full=True):
        super(OrderRespectingGlycoCTWriter, self).__init__(structure, buffer, full)
        self.ordering_context = OrderingComparisonContext(self)
        self.link_queue = deque()

    def handle_monosaccharide(self, monosaccharide):
        """Index `monosaccharide`, queue its sorted links, and return its ``RES`` line."""
        residue_str, monosaccharide_index = self._format_monosaccharide(monosaccharide)
        self.index_to_residue[monosaccharide_index] = monosaccharide
        self.residue_to_index[monosaccharide.id] = monosaccharide_index
        link_collection = list(monosaccharide.substituent_links.values())
        if self.full:
            link_collection.extend([cl for p, cl in monosaccharide.children(links=True)])
        links = self.ordering_context.sort_links(link_collection)
        # extendleft reverses its argument, so reversing first keeps the
        # sorted order at the front of the queue.
        self.link_queue.extendleft(links[::-1])
        return residue_str

    def handle_substituent(self, substituent):
        """Index `substituent`, queue its sorted links, and return its ``RES`` line."""
        substituent_index = self.res_counter()
        self.index_to_residue[substituent_index] = substituent
        self.residue_to_index[substituent.id] = substituent_index
        subst_str = "%ss:%s" % (substituent_index, substituent.name.replace("_", "-"))
        links = self.ordering_context.sort_links([cl for p, cl in substituent.children(links=True)])
        self.link_queue.extendleft(links[::-1])
        return subst_str

    def handle_glycan(self, structure):
        if structure is None:
            raise GlycoCTError("No structure is ready to be written.")
        self.process_graph(structure.root)
        return self.buffer

    def process_graph(self, root, visited=None, link_queue=None):
        """Write ``RES`` then ``LIN`` for the graph reachable from `root`.

        Returns ``(visited, underdetermined_subtrees)``; the second element
        is always empty in this class.
        """
        if visited is None:
            visited = set()
        if link_queue is None:
            link_queue = self.link_queue
        links_in_order = []
        underdetermined_subtrees = []

        self.buffer.write("RES\n")
        if root.node_type is Monosaccharide.node_type:
            res_str = self.handle_monosaccharide(root)
            self.buffer.write(res_str + "\n")
        else:
            res_str = self.handle_substituent(root)
            self.buffer.write(res_str + "\n")

        while link_queue:
            link = link_queue.popleft()
            # Explicitly add before skipping to avoid double-writing
            # residues, but including multiple-link cases
            links_in_order.append(link)
            if link.child.id in visited:
                continue
            visited.add(link.child.id)
            if link.child.node_type is Monosaccharide.node_type:
                line = self.handle_monosaccharide(link.child)
            else:
                line = self.handle_substituent(link.child)
            self.buffer.write(line + "\n")

        self.buffer.write("LIN\n")
        for link in links_in_order:
            if not link.is_substituent_link() and not self.full:
                continue
            parent_ix = self.residue_to_index[link.parent.id]
            child_ix = self.residue_to_index[link.child.id]
            line = self.handle_link(
                link, self.lin_counter(), parent_ix, child_ix)
            self.buffer.write(line + "\n")
        return visited, underdetermined_subtrees


class UNDOrderRespectingGlycoCTWriter(OrderRespectingGlycoCTWriter):
    """Ordered writer that moves multi-parent ambiguous links into ``UND`` sections."""

    def gobble_subtree(self, link):
        traverser = OrderedSubtreeTraverser(self.ordering_context)
        return SubtreeJourney(traverser.visit_subtree(link, True))

    def process_graph(self, root, visited=None, link_queue=None):
        """Write ``RES`` then ``LIN`` for the graph reachable from `root`.

        Ambiguous links with more than one parent choice are not written
        here; their subtrees are collected and returned as the second
        element of ``(visited, underdetermined_subtrees)``.
        """
        if visited is None:
            visited = set()
        if link_queue is None:
            link_queue = self.link_queue
        links_in_order = []
        underdetermined_subtrees = []

        self.buffer.write("RES\n")
        if root.node_type is Monosaccharide.node_type:
            res_str = self.handle_monosaccharide(root)
            self.buffer.write(res_str + "\n")
        else:
            res_str = self.handle_substituent(root)
            self.buffer.write(res_str + "\n")

        while link_queue:
            link = link_queue.popleft()
            if isinstance(link, AmbiguousLink) and len(link.parent_choices) > 1:
                underdetermined_subtrees.append(self.gobble_subtree(link))
                continue
            # Explicitly add before skipping to avoid double-writing
            # residues, but including multiple-link cases
            links_in_order.append(link)
            if link.child.id in visited:
                continue
            visited.add(link.child.id)
            if link.child.node_type is Monosaccharide.node_type:
                line = self.handle_monosaccharide(link.child)
            else:
                line = self.handle_substituent(link.child)
            self.buffer.write(line + "\n")

        self.buffer.write("LIN\n")
        for link in links_in_order:
            if not link.is_substituent_link() and not self.full:
                continue
            parent_ix = self.residue_to_index[link.parent.id]
            child_ix = self.residue_to_index[link.child.id]
            line = self.handle_link(
                link, self.lin_counter(), parent_ix, child_ix)
            self.buffer.write(line + "\n")
        return visited, underdetermined_subtrees

    def handle_glycan(self, structure):
        """Write the determined graph, then one ``UND`` entry per collected subtree."""
        if structure is None:
            raise GlycoCTError("No structure is ready to be written.")
        visited, underdetermined_subtrees = self.process_graph(structure.root)
        if underdetermined_subtrees:
            self.begin_underdetermined()
            for i, und in enumerate(underdetermined_subtrees, 1):
                und = deque(und)
                self.buffer.write("UND%d:100.0:100.0\n" % i)
                base_link = und.popleft()
                parent_ids = "|".join(
                    [str(self.residue_to_index[p.id]) for p in base_link.parent_choices])
                self.buffer.write("ParentIDs:%s\n" % parent_ids)
                # Format with placeholder indices, then drop the leading
                # "-:" so only the linkage description remains.
                subtree_linkage = self.handle_link(base_link, '-', '', '')[2:]
                self.buffer.write("SubtreeLinkageID1:%s\n" % subtree_linkage)
                # Swap in an empty queue while the subtree is written from
                # its own pre-computed journey, then restore the outer one.
                outer_link_deque = self.link_queue
                self.link_queue = deque([])
                child = base_link.child
                self.process_graph(child, visited=visited, link_queue=und)
                self.link_queue = outer_link_deque
        return self.buffer


GlycoCTWriter = UNDOrderRespectingGlycoCTWriter
def dump(structure, buffer=None):
    '''
    Serialize the |Glycan| into :title-reference:`GlycoCT{condensed}`, using
    `buffer` to store the result. If `buffer` is |None|, then the function will
    operate on a newly created :class:`StringIO` object.

    A :class:`GlycanComposition` is written with
    :class:`GlycanCompositionGlycoCTWriter`; anything else is written with
    :class:`GlycoCTWriter`.

    Parameters
    ----------
    structure: |Glycan|
        The structure to serialize
    buffer: file-like or None
        The stream to write the serialized structure to. If |None|, uses an instance
        of :class:`StringIO`

    Returns
    -------
    file-like or str if ``buffer`` is :const:`None`
    '''
    from glypy import GlycanComposition
    if isinstance(structure, GlycanComposition):
        writer_type = GlycanCompositionGlycoCTWriter
    else:
        writer_type = GlycoCTWriter
    writer = writer_type(structure, buffer)
    return writer.dump()
def dumps(structure):
    '''
    Serialize the |Glycan| into :title-reference:`GlycoCT{condensed}`, returning
    the text as a string.

    A :class:`GlycanComposition` is written with
    :class:`GlycanCompositionGlycoCTWriter`; anything else is written with
    :class:`GlycoCTWriter`. No buffer is supplied to the writer in either case.

    Parameters
    ----------
    structure: |Glycan|
        The structure to serialize

    Returns
    -------
    str
    '''
    from glypy import GlycanComposition
    if isinstance(structure, GlycanComposition):
        writer_type = GlycanCompositionGlycoCTWriter
    else:
        writer_type = GlycoCTWriter
    writer = writer_type(structure, None)
    return writer.dump()
def _postprocessed_single_monosaccharide(monosaccharide, convert=True):
    """Produce a single-line GlycoCT rendering of one monosaccharide.

    When `convert` is true the argument is serialized with a non-full
    :class:`GlycoCTWriterBase`; otherwise it is taken to already be the
    serialized text. Newlines become spaces; if the result ends with
    ``"LIN "`` every ``" LIN "`` is removed, otherwise the text is stripped.
    """
    if convert:
        text = GlycoCTWriterBase(monosaccharide, None, full=False).dump()
    else:
        text = monosaccharide
    flattened = text.replace("\n", " ")
    if flattened.endswith("LIN "):
        return flattened.replace(" LIN ", "")
    return flattened.strip()


Monosaccharide.register_serializer("glycoct", _postprocessed_single_monosaccharide)
Glycan.register_serializer("glycoct", dumps)


def canonicalize(structure):
    """Round-trip `structure` through :func:`dumps` and :func:`loads`."""
    serialized = dumps(structure)
    return loads(serialized)