# Source code for orangecanvas.scheme.scheme

"""
===============
Scheme Workflow
===============

The :class:`Scheme` class defines a DAG (Directed Acyclic Graph) workflow.

"""
import types
import logging
from contextlib import ExitStack
from itertools import product
from operator import itemgetter
from collections import deque

import typing
from typing import List, Tuple, Optional, Set, Dict, Any, Mapping

from AnyQt.QtCore import QObject, QCoreApplication
from AnyQt.QtCore import pyqtSignal as Signal, pyqtProperty as Property

from .node import SchemeNode
from .link import SchemeLink, compatible_channels, resolved_valid_types, \
    _classify_connection
from .annotations import BaseSchemeAnnotation

from ..utils import check_arg, type_lookup_

from .errors import (
    SchemeCycleError, IncompatibleChannelTypeError, SinkChannelError,
    DuplicatedLinkError
)
from . import events


from ..registry import WidgetDescription, InputSignal, OutputSignal

if typing.TYPE_CHECKING:
    # Generic type variable. It is only defined for static type checkers;
    # in the code visible here it is referenced solely from `# type:`
    # comments, so it is never needed at runtime.
    T = typing.TypeVar("T")

# Module level logger, named after this module.
log = logging.getLogger(__name__)


class Scheme(QObject):
    """
    A :class:`QObject` subclass representing the scheme widget workflow
    with annotations.

    Parameters
    ----------
    parent : :class:`QObject`
        A parent QObject item (default `None`).
    title : str
        The scheme title.
    description : str
        A longer description of the scheme.
    env : Mapping[str, Any], optional
        Extra workflow environment definition (application defined).
        The mapping is copied; `None` (the default) means an empty
        environment.

    Attributes
    ----------
    nodes : list of :class:`.SchemeNode`
        A list of all the nodes in the scheme.
    links : list of :class:`.SchemeLink`
        A list of all links in the scheme.
    annotations : list of :class:`BaseSchemeAnnotation`
        A list of all the annotations in the scheme.

    """
    # Flags indicating if loops are allowed in the workflow.
    NoLoops, AllowLoops, AllowSelfLoops = 0, 1, 2

    # Signal emitted when a `node` is added to the scheme.
    node_added = Signal(SchemeNode)

    # Signal emitted when a `node` is removed from the scheme.
    node_removed = Signal(SchemeNode)

    # Signal emitted when a `link` is added to the scheme.
    link_added = Signal(SchemeLink)

    # Signal emitted when a `link` is removed from the scheme.
    link_removed = Signal(SchemeLink)

    # Signal emitted when a `annotation` is added to the scheme.
    annotation_added = Signal(BaseSchemeAnnotation)

    # Signal emitted when a `annotation` is removed from the scheme.
    annotation_removed = Signal(BaseSchemeAnnotation)

    # Signal emitted when the title of scheme changes.
    title_changed = Signal(str)

    # Signal emitted when the description of scheme changes.
    description_changed = Signal(str)

    #: Signal emitted when the associated runtime environment changes
    runtime_env_changed = Signal(str, object, object)

    def __init__(self, parent=None, title="", description="", env=None,
                 **kwargs):
        # type: (Optional[QObject], str, str, Optional[Mapping[str, Any]], Any) -> None
        super().__init__(parent, **kwargs)
        #: Workflow title (empty string by default).
        self.__title = title or ""
        #: Workflow description (empty string by default).
        self.__description = description or ""
        self.__annotations = []  # type: List[BaseSchemeAnnotation]
        self.__nodes = []  # type: List[SchemeNode]
        self.__links = []  # type: List[SchemeLink]
        self.__loop_flags = Scheme.NoLoops
        # Always keep a private copy so later changes to the caller's
        # mapping do not leak into the scheme (and vice versa).
        self.__env = dict(env) if env is not None else {}  # type: Dict[str, Any]

    @property
    def nodes(self):
        # type: () -> List[SchemeNode]
        """
        A list of all nodes (:class:`.SchemeNode`) currently in the scheme.

        The returned list is a copy; modifying it does not change the scheme.
        """
        return list(self.__nodes)

    @property
    def links(self):
        # type: () -> List[SchemeLink]
        """
        A list of all links (:class:`.SchemeLink`) currently in the scheme.

        The returned list is a copy; modifying it does not change the scheme.
        """
        return list(self.__links)

    @property
    def annotations(self):
        # type: () -> List[BaseSchemeAnnotation]
        """
        A list of all annotations (:class:`.BaseSchemeAnnotation`) in the
        scheme.

        The returned list is a copy; modifying it does not change the scheme.
        """
        return list(self.__annotations)

    def set_loop_flags(self, flags):
        """
        Set the loop flags; a bitwise combination of `Scheme.NoLoops`,
        `Scheme.AllowLoops` and `Scheme.AllowSelfLoops` that is consulted
        by :func:`check_connect`.
        """
        self.__loop_flags = flags

    def loop_flags(self):
        """
        Return the current loop flags (see :func:`set_loop_flags`).
        """
        return self.__loop_flags

    def set_title(self, title):
        # type: (str) -> None
        """
        Set the scheme title text.

        `title_changed` is emitted only if the title actually changes.
        """
        if self.__title != title:
            self.__title = title
            self.title_changed.emit(title)

    def title(self):
        """
        The title (human readable string) of the scheme.
        """
        return self.__title

    title = Property(str, fget=title, fset=set_title)  # type: ignore

    def set_description(self, description):
        # type: (str) -> None
        """
        Set the scheme description text.

        `description_changed` is emitted only if the text actually changes.
        """
        if self.__description != description:
            self.__description = description
            self.description_changed.emit(description)

    def description(self):
        """
        Scheme description text.
        """
        return self.__description

    description = Property(  # type: ignore
        str, fget=description, fset=set_description)

    def add_node(self, node):
        # type: (SchemeNode) -> None
        """
        Add a node to the scheme. An error is raised if the node is
        already in the scheme.

        A `NodeAdded` event is sent to the scheme before the `node_added`
        signal is emitted.

        Parameters
        ----------
        node : :class:`.SchemeNode`
            Node instance to add to the scheme.

        """
        assert isinstance(node, SchemeNode)
        check_arg(node not in self.__nodes,
                  "Node already in scheme.")
        self.__nodes.append(node)

        ev = events.NodeEvent(events.NodeEvent.NodeAdded, node)
        QCoreApplication.sendEvent(self, ev)

        # Lazy %-style arguments: the message is only formatted when the
        # record is actually emitted.
        log.info("Added node %r to scheme %r.", node.title, self.title)
        self.node_added.emit(node)

    def new_node(self, description, title=None, position=None,
                 properties=None):
        # type: (WidgetDescription, Optional[str], Optional[Tuple[float, float]], Optional[dict]) -> SchemeNode
        """
        Create a new :class:`.SchemeNode` and add it to the scheme.

        Same as::

            scheme.add_node(SchemeNode(description, title, position,
                                       properties))

        Parameters
        ----------
        description : :class:`WidgetDescription`
            The new node's description.
        title : str, optional
            Optional new nodes title. By default `description.name` is used.
        position : Tuple[float, float]
            Optional position in a 2D space.
        properties : dict, optional
            A dictionary of optional extra properties.

        Returns
        -------
        node : :class:`.SchemeNode`
            The newly created (and already added) node.

        Raises
        ------
        TypeError
            If `description` is not a :class:`WidgetDescription`.

        See also
        --------
        .SchemeNode, Scheme.add_node

        """
        if not isinstance(description, WidgetDescription):
            raise TypeError("Expected %r, got %r." %
                            (WidgetDescription, type(description)))

        node = SchemeNode(description, title=title, position=position,
                          properties=properties)
        self.add_node(node)
        return node

    def remove_node(self, node):
        # type: (SchemeNode) -> SchemeNode
        """
        Remove a `node` from the scheme. All links into and out of the
        `node` are also removed. If the node in not in the scheme an error
        is raised.

        A `NodeRemoved` event is sent to the scheme before the
        `node_removed` signal is emitted.

        Parameters
        ----------
        node : :class:`.SchemeNode`
            Node instance to remove.

        Returns
        -------
        node : :class:`.SchemeNode`
            The removed node.

        """
        check_arg(node in self.__nodes,
                  "Node is not in the scheme.")

        # Links must go first so no link ever references a missing node.
        self.__remove_node_links(node)
        self.__nodes.remove(node)

        ev = events.NodeEvent(events.NodeEvent.NodeRemoved, node)
        QCoreApplication.sendEvent(self, ev)

        log.info("Removed node %r from scheme %r.", node.title, self.title)
        self.node_removed.emit(node)
        return node

    def __remove_node_links(self, node):
        # type: (SchemeNode) -> None
        """
        Remove all links for node.

        Outgoing links are removed before incoming ones. A link from the
        node to itself is counted once (as an outgoing link).
        """
        links_in, links_out = [], []
        for link in self.__links:
            if link.source_node is node:
                links_out.append(link)
            elif link.sink_node is node:
                links_in.append(link)

        for link in links_out + links_in:
            self.remove_link(link)

    def check_connect(self, link):
        # type: (SchemeLink) -> None
        """
        Check if the `link` can be added to the scheme and raise an
        appropriate exception.

        Can raise:
            - :class:`.SchemeCycleError` if the `link` would introduce a loop
              in the graph which does not allow loops.
            - :class:`.IncompatibleChannelTypeError` if the channel types are
              not compatible
            - :class:`.SinkChannelError` if a sink channel has a `Single` flag
              specification and the channel is already connected.
            - :class:`.DuplicatedLinkError` if a `link` duplicates an already
              present link.

        """
        flags = self.loop_flags()
        if not flags & Scheme.AllowSelfLoops and \
                link.source_node is link.sink_node:
            raise SchemeCycleError("Cannot create self cycle in the scheme")
        elif not flags & Scheme.AllowLoops and self.creates_cycle(link):
            raise SchemeCycleError("Cannot create cycles in the scheme")

        if not self.compatible_channels(link):
            raise IncompatibleChannelTypeError(
                "Cannot connect %r to %r."
                % (link.source_channel.type, link.sink_channel.type)
            )

        # An identical link (same endpoints and channels) must not exist.
        links = self.find_links(source_node=link.source_node,
                                source_channel=link.source_channel,
                                sink_node=link.sink_node,
                                sink_channel=link.sink_channel)

        if links:
            raise DuplicatedLinkError(
                "A link from %r (%r) -> %r (%r) already exists"
                % (link.source_node.title, link.source_channel.name,
                   link.sink_node.title, link.sink_channel.name)
            )

        # A `single` sink channel accepts at most one incoming link.
        if link.sink_channel.single:
            links = self.find_links(sink_node=link.sink_node,
                                    sink_channel=link.sink_channel)
            if links:
                raise SinkChannelError(
                    "%r is already connected." % link.sink_channel.name
                )

    def creates_cycle(self, link):
        # type: (SchemeLink) -> bool
        """
        Return `True` if `link` would introduce a cycle in the scheme.

        This is the case when the link's sink node is the source node
        itself or one of the source node's upstream nodes.

        Parameters
        ----------
        link : :class:`.SchemeLink`

        """
        assert isinstance(link, SchemeLink)
        source_node, sink_node = link.source_node, link.sink_node
        upstream = self.upstream_nodes(source_node)
        upstream.add(source_node)
        return sink_node in upstream

    def compatible_channels(self, link):
        # type: (SchemeLink) -> bool
        """
        Return `True` if the channels in `link` have compatible types.

        Parameters
        ----------
        link : :class:`.SchemeLink`

        """
        assert isinstance(link, SchemeLink)
        return compatible_channels(link.source_channel, link.sink_channel)

    def can_connect(self, link):
        # type: (SchemeLink) -> bool
        """
        Return `True` if `link` can be added to the scheme.

        See also
        --------
        Scheme.check_connect

        """
        assert isinstance(link, SchemeLink)
        try:
            self.check_connect(link)
        except (SchemeCycleError, IncompatibleChannelTypeError,
                SinkChannelError, DuplicatedLinkError):
            return False
        else:
            return True

    def upstream_nodes(self, start_node):
        # type: (SchemeNode) -> Set[SchemeNode]
        """
        Return a set of all nodes upstream from `start_node` (i.e.
        all ancestor nodes). `start_node` itself is never in the result.

        Parameters
        ----------
        start_node : :class:`.SchemeNode`

        """
        # Breadth first traversal along input links. Nodes are marked as
        # visited when enqueued so each node is expanded at most once.
        visited = {start_node}  # type: Set[SchemeNode]
        queue = deque([start_node])
        while queue:
            node = queue.popleft()
            for link in self.input_links(node):
                source_node = link.source_node
                if source_node not in visited:
                    visited.add(source_node)
                    queue.append(source_node)

        visited.remove(start_node)
        return visited

    def downstream_nodes(self, start_node):
        # type: (SchemeNode) -> Set[SchemeNode]
        """
        Return a set of all nodes downstream from `start_node`.
        `start_node` itself is never in the result.

        Parameters
        ----------
        start_node : :class:`.SchemeNode`

        """
        # Breadth first traversal along output links. Nodes are marked as
        # visited when enqueued so each node is expanded at most once.
        visited = {start_node}  # type: Set[SchemeNode]
        queue = deque([start_node])
        while queue:
            node = queue.popleft()
            for link in self.output_links(node):
                sink_node = link.sink_node
                if sink_node not in visited:
                    visited.add(sink_node)
                    queue.append(sink_node)

        visited.remove(start_node)
        return visited

    def is_ancestor(self, node, child):
        # type: (SchemeNode, SchemeNode) -> bool
        """
        Return True if `node` is an ancestor node of `child` (is upstream
        of the child in the workflow). Both nodes must be in the scheme.

        Parameters
        ----------
        node : :class:`.SchemeNode`
        child : :class:`.SchemeNode`

        """
        return child in self.downstream_nodes(node)

    def children(self, node):
        # type: (SchemeNode) -> Set[SchemeNode]
        """
        Return a set of all children of `node` (the sink nodes of its
        output links).
        """
        return {link.sink_node for link in self.output_links(node)}

    def parents(self, node):
        # type: (SchemeNode) -> Set[SchemeNode]
        """
        Return a set of all parents of `node` (the source nodes of its
        input links).
        """
        return {link.source_node for link in self.input_links(node)}

    def find_links(self, source_node=None, source_channel=None,
                   sink_node=None, sink_channel=None):
        # type: (Optional[SchemeNode], Optional[OutputSignal], Optional[SchemeNode], Optional[InputSignal]) -> List[SchemeLink]
        """
        Return all links in the scheme matching the given query.

        Every argument that is not `None` must compare equal to the
        corresponding attribute of a link; `None` matches anything. Links
        are returned in the order they are stored in the scheme.
        """
        # TODO: Speedup - keep index of links by nodes and channels
        def match(query, value):
            # type: (Optional[T], T) -> bool
            return query is None or value == query

        return [
            link for link in self.__links
            if match(source_node, link.source_node)
            and match(sink_node, link.sink_node)
            and match(source_channel, link.source_channel)
            and match(sink_channel, link.sink_channel)
        ]

    def add_annotation(self, annotation):
        # type: (BaseSchemeAnnotation) -> None
        """
        Add an annotation (:class:`BaseSchemeAnnotation` subclass) instance
        to the scheme.

        Raises
        ------
        ValueError
            If the annotation is already in the scheme.

        """
        assert isinstance(annotation, BaseSchemeAnnotation)
        if annotation in self.__annotations:
            raise ValueError("Cannot add the same annotation multiple times")
        self.__annotations.append(annotation)
        ev = events.AnnotationEvent(events.AnnotationEvent.AnnotationAdded,
                                    annotation)
        QCoreApplication.sendEvent(self, ev)
        self.annotation_added.emit(annotation)

    def remove_annotation(self, annotation):
        # type: (BaseSchemeAnnotation) -> None
        """
        Remove the `annotation` instance from the scheme.

        Raises
        ------
        ValueError
            If the annotation is not in the scheme (raised by
            ``list.remove``).

        """
        self.__annotations.remove(annotation)
        ev = events.AnnotationEvent(events.AnnotationEvent.AnnotationRemoved,
                                    annotation)
        QCoreApplication.sendEvent(self, ev)
        self.annotation_removed.emit(annotation)

    def clear(self):
        # type: () -> None
        """
        Remove all nodes, links, and annotation items from the scheme.

        Nodes without outgoing links (terminal nodes) are removed first,
        pass after pass. If the remaining nodes form a cycle (possible when
        loops are allowed) no node is terminal; one remaining node is then
        removed to break the cycle so that the method always terminates.
        """
        def is_terminal(node):
            # type: (SchemeNode) -> bool
            return not bool(self.find_links(source_node=node))

        while self.nodes:
            removed_any = False
            # `self.nodes` is a copy, so removing while iterating is safe.
            # `is_terminal` is evaluated right before each removal, hence
            # nodes that became terminal earlier in this pass are removed
            # in the same pass.
            for node in self.nodes:
                if is_terminal(node):
                    self.remove_node(node)
                    removed_any = True
            if not removed_any:
                # Every remaining node still has an outgoing link, i.e.
                # they are all part of (or feed into) a cycle. Removing a
                # node also removes its links, which breaks the cycle.
                self.remove_node(self.nodes[0])

        for annotation in self.annotations:
            self.remove_annotation(annotation)

        assert not (self.nodes or self.links or self.annotations)

    def sync_node_properties(self):
        # type: () -> None
        """
        Called before saving, allowing a subclass to update/sync.

        The default implementation does nothing.

        """
        pass

    def save_to(self, stream, pretty=True, **kwargs):
        """
        Save the scheme as an xml formatted file to `stream`

        If `stream` is a `str` it is treated as a file name; the file is
        opened for binary writing and closed again when done. `pretty` and
        any extra keyword arguments are passed on to
        `readwrite.scheme_to_ows_stream`.

        See also
        --------
        readwrite.scheme_to_ows_stream
        """
        with ExitStack() as exitstack:
            if isinstance(stream, str):
                stream = exitstack.enter_context(open(stream, "wb"))
            self.sync_node_properties()
            readwrite.scheme_to_ows_stream(self, stream, pretty, **kwargs)

    def load_from(self, stream, *args, **kwargs):
        """
        Load the scheme from xml formatted `stream`.

        If `stream` is a `str` it is treated as a file name; the file is
        opened for binary reading and closed again when done.
        Any extra arguments are passed to `readwrite.scheme_load`

        Raises
        ------
        ValueError
            If the scheme already contains nodes, links or annotations.

        See Also
        --------
        readwrite.scheme_load
        """
        if self.__nodes or self.__links or self.__annotations:
            raise ValueError("Scheme is not empty.")

        with ExitStack() as exitstack:
            if isinstance(stream, str):
                stream = exitstack.enter_context(open(stream, "rb"))
            readwrite.scheme_load(self, stream, *args, **kwargs)

    def set_runtime_env(self, key, value):
        # type: (str, Any) -> None
        """
        Set a runtime environment variable `key` to `value`

        `runtime_env_changed` is emitted with `(key, value, oldvalue)`
        only if `value` differs from the current value (a missing key
        counts as `None`).
        """
        oldvalue = self.__env.get(key, None)
        if value != oldvalue:
            self.__env[key] = value
            self.runtime_env_changed.emit(key, value, oldvalue)

    def get_runtime_env(self, key, default=None):
        # type: (str, Any) -> Any
        """
        Return a runtime environment variable for `key`.

        `default` is returned if `key` is not set.
        """
        return self.__env.get(key, default)

    def runtime_env(self):
        # type: () -> Mapping[str, Any]
        """
        Return (a view to) the full runtime environment.

        The return value is a types.MappingProxyType of the
        underlying environment dictionary. Changes to the env.
        will be reflected in it.
        """
        return types.MappingProxyType(self.__env)

    class WindowGroup(types.SimpleNamespace):
        """
        A named group of node window states.

        Attributes
        ----------
        name : str
            The group name.
        default : bool
            The `default` flag of the group.
        state : List[Tuple[SchemeNode, bytes]]
            Per node state entries.
        """
        name = None     # type: str
        default = None  # type: bool
        state = None    # type: List[Tuple[SchemeNode, bytes]]

        def __init__(self, name="", default=False, state=None):
            # A fresh list per instance; a shared `[]` default would be
            # visible to (and mutated through) every default constructed
            # group. An explicitly passed list is stored as is.
            if state is None:
                state = []
            super().__init__(name=name, default=default, state=state)

    def window_group_presets(self):
        # type: () -> List[Scheme.WindowGroup]
        """
        Return a collection of preset window groups and their encoded states.

        The presets are read from the `_presets` property (see
        :func:`set_window_group_presets`); an empty list is returned if
        none were set.
        """
        return self.property("_presets") or []

    def set_window_group_presets(self, groups):
        # type: (List[Scheme.WindowGroup]) -> None
        """
        Store `groups` as the preset window groups (in the `_presets`
        property).
        """
        self.setProperty("_presets", groups)
from . import readwrite