Source code for orangecanvas.scheme.signalmanager

"""
=================================
SignalManager (``signalmanager``)
=================================

A SignalManager instance handles the runtime signal propagation between
widgets in a scheme workflow.

"""
import os
import logging
import warnings
import enum
import functools

from collections import defaultdict
from operator import attrgetter
from functools import partial, reduce
from itertools import chain

import typing
from typing import (
    Any, Optional, List, NamedTuple, Set, Dict, Callable,
    Sequence, Union, DefaultDict, Type
)

from AnyQt.QtCore import QObject, QTimer, QSettings, QEvent
from AnyQt.QtCore import pyqtSignal, pyqtSlot as Slot

from . import LinkEvent
from ..utils import unique, mapping_get, group_by_all
from ..registry import OutputSignal, InputSignal
from .scheme import Scheme, SchemeNode, SchemeLink
from ..utils.graph import traverse_bf, strongly_connected_components

if typing.TYPE_CHECKING:
    V = typing.TypeVar("V")
    K = typing.TypeVar("K")

log = logging.getLogger(__name__)


[docs]class Signal( NamedTuple( "Signal", ( ("link", SchemeLink), ("value", Any), ("id", Any), ("index", int), )) ): """ A signal sent via a link between two nodes. Attributes ---------- link : SchemeLink The link on which the signal is sent value : Any The signal value id : Any .. deprecated:: 0.1.19 index: int Position of the link in sink_node's input links at the time the signal is enqueued or -1 if not applicable. See also -------- InputSignal.flags, OutputSignal.flags """ def __new__(cls, link: SchemeLink, value: Any, id: Any = None, index: int = -1): return super().__new__(cls, link, value, id, index) @property def channel(self) -> InputSignal: """Alias for `self.link.sink_channel`""" return self.link.sink_channel New: 'Type[New]' Update: 'Type[Update]' Close: 'Type[Close]'
class New(Signal): ... class Update(Signal): ... class Close(Signal): ... Signal.New = New Signal.Update = Update Signal.Close = Close is_enabled = attrgetter("enabled") class _LazyValueType: """ LazyValue is an abstract type for wrapper for lazy evaluation of signals. LazyValue is intended for situations in which computation of outputs is reasonably fast, but we won't to compute it only if the output is connected to some input, in order to save memory. Assume the widget has a method `commit` that outputs the sum of two objects, `self.a` and `self.b`. The output signal is names `signal_name` and its type is `SomeType`. ``` def commit(self): self.send(self.Outputs.signal_name, self.a + self.b) ``` To use lazy values, we modify the method as follows. ``` def commit(self): def f(): return self.a + self.b self.send(self.Outputs.signal_name, LazySignal[SomeType](f)) ``` The lazy function receives no arguments, so `commit` will often prepare some data accessible through closure or default arguments. After calling the function, LazyValue will release the reference to function, which in turn releases references to any data from closure or arguments. LazyValue is a singleton, used in similar way as generic classes from typing. "Indexing" returns an instance of (internal) class `LazyValue_`. Indexing is cached; `LazyValue[SomeType]` always returns the same object. LazySignal[SomeType] (that is: LazyValue_) has a constructor that expects the following arguments. - A function that computes the actual value. This function must expect no arguments, but will usually get data (for instance `self`, in the above example) from closure. - An optional function that can be called to interrupt the computation. This function is called when the signal is deleted. - Optional extra arguments that are stored as LazyValue's attributes. These are not accessible by the above function and are primarily intended to be used in output summaries. Properties: - `is_cached()`, which returns `True` if the value is already computed. Functions for output summaries can use this to show more information if the value is available, and avoid computing it when not. Methods: - `get_value()` returns the actual value by calling the function, if the value has not been computed yet, or providing the cached value. - `type()` returns the type of the lazy signal (e.g. `SomeType`, in above case. """ class LazyValueMeta(type): def __repr__(cls): """ Pretty-prints the LazyValue[SomeType] as "LazyValue[SomeType]" instead of generic `LazyValue_`. """ return f"LazyValue[{cls.type().__name__}]" @classmethod def is_lazy(cls, value): """ Tells whether the given value is lazy. ``` >>> def f(): ... return 12 ... >>> lazy = LazyValue[int](f) >>> eager = f() >>> LazyValue.is_lazy(lazy) True >>> LazyValue.is_lazy(eager) False ``` """ return isinstance(type(value), cls.LazyValueMeta) @classmethod @functools.lru_cache(maxsize=None) def __getitem__(cls, type_): # This is cached, so that it always returns the same class for the # same type. # >>> t1 = LazyValue[int] # >>> t2 = LazyValue[int] # >>> t1 is t2 # True class LazyValue_(metaclass=cls.LazyValueMeta): __type = type_ def __init__(self, func: Callable, interrupt=None, **extra_attrs): self.__func = func self.__cached = None self.interrupt = interrupt self.__dict__.update(extra_attrs) def __del__(self): if self.interrupt is not None: self.interrupt() @property def is_cached(self): return self.__func is None @classmethod def type(cls): return cls.__type def get_value(self): if self.__func is not None: self.__cached = self.__func() # This frees any references to closure and arguments self.__func = None return self.__cached return LazyValue_ LazyValue = _LazyValueType() class _OutputState: """Output state for a single node/channel""" __slots__ = ('flags', 'outputs') #: Flag indicating the output on the channel is invalidated. Invalidated = 1 def __init__(self): self.outputs = defaultdict() self.flags = 0 def __repr__(self): return "State(flags={}, outputs={!r})".format( self.flags, dict(self.outputs) ) __str__ = __repr__ class _LinkExtra: """Extra data tracked for a SchemeLink""" __slots__ = ("flags",) DidScheduleNew = 1 def __init__(self, flags=0): self.flags = flags
[docs]class SignalManager(QObject): """ SignalManager handles the runtime signal propagation for a :class:`.Scheme` instance. Note ---- If a scheme instance is passed as a parent to the constructor it is also set as the workflow model. """
[docs] class State(enum.IntEnum): """ SignalManager state flags. .. seealso:: :func:`SignalManager.state()` """ #: The manager is running, i.e. it propagates signals Running = 0 #: The manager is stopped. It does not track node output changes, #: and does not deliver signals to dependent nodes Stopped = 1 #: The manager is paused. It still tracks node output changes, but #: does not deliver new signals to dependent nodes. The pending signals #: will be delivered once it enters Running state again Paused = 2
#: The manager is running, i.e. it propagates signals Running = State.Running #: The manager is stopped. It does not track node ouput changes, #: and does not deliver signals to dependent nodes Stopped = State.Stopped #: The manager is paused. It still tracks node output changes, but #: does not deliver new signals to dependent nodes. The pending signals #: will be delivered once it enters Running state again Paused = State.Paused # unused; back-compatibility Error = 3
[docs] class RuntimeState(enum.IntEnum): """ SignalManager runtime state. See Also -------- SignalManager.runtime_state """ #: Waiting, idle state. The signal queue is empty Waiting = 0 #: ... Processing = 1
Waiting = RuntimeState.Waiting Processing = RuntimeState.Processing #: Emitted when the state of the signal manager changes. stateChanged = pyqtSignal(int) #: Emitted when signals are added to the queue. updatesPending = pyqtSignal() #: Emitted right before a `SchemeNode` instance has its inputs updated. processingStarted = pyqtSignal([], [SchemeNode]) #: Emitted right after a `SchemeNode` instance has had its inputs updated. processingFinished = pyqtSignal([], [SchemeNode]) #: Emitted when `SignalManager`'s runtime state changes. runtimeStateChanged = pyqtSignal(int) #: Emitted when the execution finishes (there are no more nodes that #: need to run). Note: the nodes can activate again due to user #: interaction or other scheduled events, i.e. finished is not a definitive #: state. Use at your own discretion. finished = pyqtSignal() #: Emitted when starting initial execution and when resuming after already #: emitting `finished`. started = pyqtSignal() def __init__(self, parent=None, *, max_running=None, **kwargs): # type: (Optional[QObject], Optional[int], Any) -> None super().__init__(parent, **kwargs) self.__workflow = None # type: Optional[Scheme] self.__input_queue = [] # type: List[Signal] # mapping a node to its current outputs self.__node_outputs = {} # type: Dict[SchemeNode, DefaultDict[OutputSignal, _OutputState]] #: Extra link state self.__link_extra = defaultdict(_LinkExtra) # type: DefaultDict[SchemeLink, _LinkExtra] self.__state = SignalManager.Running self.__runtime_state = SignalManager.Waiting self.__update_timer = QTimer(self, interval=100, singleShot=True) self.__update_timer.timeout.connect(self.__process_next) self.__max_running = max_running self.__has_finished = True if isinstance(parent, Scheme): self.set_workflow(parent) def _can_process(self): # type: () -> bool """ Return a bool indicating if the manger can enter the main processing loop. """ return self.__state not in [SignalManager.Error, SignalManager.Stopped]
[docs] def workflow(self): # type: () -> Optional[Scheme] """ Return the :class:`Scheme` instance. """ return self.__workflow
#: Alias scheme = workflow
[docs] def set_workflow(self, workflow): # type: (Scheme) -> None """ Set the workflow model. Parameters ---------- workflow : Scheme """ if workflow is self.__workflow: return if self.__workflow is not None: for node in self.__workflow.nodes: node.state_changed.disconnect(self._update) node.removeEventFilter(self) for link in self.__workflow.links: self.__on_link_removed(link) self.__workflow.node_added.disconnect(self.__on_node_added) self.__workflow.node_removed.disconnect(self.__on_node_removed) self.__workflow.link_added.disconnect(self.__on_link_added) self.__workflow.link_removed.disconnect(self.__on_link_removed) self.__workflow.removeEventFilter(self) self.__node_outputs = {} self.__input_queue = [] self.__workflow = workflow if workflow is not None: workflow.node_added.connect(self.__on_node_added) workflow.node_removed.connect(self.__on_node_removed) workflow.link_added.connect(self.__on_link_added) workflow.link_removed.connect(self.__on_link_removed) for node in workflow.nodes: self.__node_outputs[node] = defaultdict(_OutputState) node.state_changed.connect(self._update) node.installEventFilter(self) for link in workflow.links: self.__on_link_added(link) workflow.installEventFilter(self)
[docs] def has_pending(self): # type: () -> bool """ Does the manager have any signals to deliver? """ return bool(self.__input_queue)
[docs] def start(self): # type: () -> None """ Start the update loop. Note ---- The updates will not happen until the control reaches the Qt event loop. """ if self.__state != SignalManager.Running: self.__state = SignalManager.Running self.stateChanged.emit(SignalManager.Running) self._update()
[docs] def stop(self): # type: () -> None """ Stop the update loop. Note ---- If the `SignalManager` is currently in `process_queues` it will still update all current pending signals, but will not re-enter until `start()` is called again. """ if self.__state != SignalManager.Stopped: self.__state = SignalManager.Stopped self.stateChanged.emit(SignalManager.Stopped) self.__update_timer.stop()
[docs] def pause(self): # type: () -> None """ Pause the delivery of signals. """ if self.__state != SignalManager.Paused: self.__state = SignalManager.Paused self.stateChanged.emit(SignalManager.Paused) self.__update_timer.stop()
[docs] def resume(self): # type: () -> None """ Resume the delivery of signals. """ if self.__state == SignalManager.Paused: self.__state = SignalManager.Running self.stateChanged.emit(self.__state) self._update()
[docs] def step(self): # type: () -> None """ Deliver signals to a single node (only applicable while the `state()` is `Paused`). """ if self.__state == SignalManager.Paused: self.process_queued()
[docs] def state(self): # type: () -> State """ Return the current state. Return ------ state : SignalManager.State """ return self.__state
def _set_runtime_state(self, state): # type: (Union[RuntimeState, int]) -> None """ Set the runtime state. Should only be called by `SignalManager` implementations. """ state = SignalManager.RuntimeState(state) if self.__runtime_state != state: self.__runtime_state = state self.runtimeStateChanged.emit(self.__runtime_state)
[docs] def runtime_state(self): # type: () -> RuntimeState """ Return the runtime state. This can be `SignalManager.Waiting` or `SignalManager.Processing`. """ return self.__runtime_state
def __on_node_removed(self, node): # type: (SchemeNode) -> None # remove all pending input signals for node so we don't get # stale references in process_node. # NOTE: This does not remove output signals for this node. In # particular the final 'None' will be delivered to the sink # nodes even after the source node is no longer in the scheme. log.info("Removing pending signals for '%s'.", node.title) self.remove_pending_signals(node) del self.__node_outputs[node] node.state_changed.disconnect(self._update) node.removeEventFilter(self) def __on_node_added(self, node): # type: (SchemeNode) -> None self.__node_outputs[node] = defaultdict(_OutputState) # schedule update pass on state change node.state_changed.connect(self._update) node.installEventFilter(self) def __on_link_added(self, link): # type: (SchemeLink) -> None # push all current source values to the sink link.set_runtime_state(SchemeLink.Empty) state = self.__node_outputs[link.source_node][link.source_channel] link.set_runtime_state_flag( SchemeLink.Invalidated, bool(state.flags & _OutputState.Invalidated) ) signals: List[Signal] = [Signal.New(*s) for s in self.signals_on_link(link)] if not link.is_enabled(): # Send New signals even if disabled. This is changed behaviour # from <0.1.19 where signals were only sent when link was enabled. # Because we need to maintain input consistency we cannot use the # current signal value so replace it with None. signals = [s._replace(value=None) for s in signals] log.info("Scheduling signal data update for '%s'.", link) self._schedule(signals) link.enabled_changed.connect(self.__on_link_enabled_changed) def __on_link_removed(self, link): # type: (SchemeLink) -> None link.enabled_changed.disconnect(self.__on_link_enabled_changed) self.__link_extra.pop(link, None)
[docs] def eventFilter(self, recv: QObject, event: QEvent) -> bool: etype = event.type() if etype == LinkEvent.InputLinkRemoved: event = typing.cast(LinkEvent, event) link = event.link() log.info("Scheduling close signal (%s).", link) signals: List[Signal] = [Signal.Close(link, None, id, event.pos()) for id in self.link_contents(link)] self._schedule(signals) return super().eventFilter(recv, event)
def __on_link_enabled_changed(self, enabled): if enabled: link = self.sender() log.info("Link %s enabled. Scheduling signal data update.", link) self._update_link(link)
[docs] def send(self, node, channel, value, *args, **kwargs): # type: (SchemeNode, OutputSignal, Any, Any, Any) -> None """ Send the `value` on the output `channel` from `node`. Schedule the signal delivery to all dependent nodes Parameters ---------- node : SchemeNode The originating node. channel : OutputSignal The nodes output on which the value is sent. value : Any The value to send, id : Any Signal id. .. deprecated:: 0.1.19 """ if self.__workflow is None: raise RuntimeError("'send' called with no workflow!.") # parse deprecated id parameter from *args, **kwargs. def _id_(id): return id try: id = _id_(*args, **kwargs) except TypeError: id = None else: warnings.warn( "`id` parameter is deprecated and will be removed in v0.2", FutureWarning, stacklevel=2 ) log.debug("%r sending %r (id: %r) on channel %r", node.title, type(value), id, channel.name) scheme = self.__workflow state = self.__node_outputs[node][channel] if state.outputs and id not in state.outputs: raise RuntimeError( "Sending multiple values on the same output channel via " "different ids is no longer supported." ) sigtype: Type[Signal] if id in state.outputs: sigtype = Signal.Update else: sigtype = Signal.New state.outputs[id] = value assert len(state.outputs) == 1 # clear invalidated flag if state.flags & _OutputState.Invalidated: log.debug("%r clear invalidated flag on channel %r", node.title, channel.name) state.flags &= ~_OutputState.Invalidated links = scheme.find_links(source_node=node, source_channel=channel) signals = [] for link in links: extra = self.__link_extra[link] links_in = scheme.find_links(sink_node=link.sink_node) index = links_in.index(link) if not link.is_enabled() and not extra.flags & _LinkExtra.DidScheduleNew: # Send Signal.New with None value. Proper update will be done # when/if the link is re-enabled. signal = Signal.New(link, None, id, index=index) elif link.is_enabled(): signal = sigtype(link, value, id, index=index) else: continue signals.append(signal) link.set_runtime_state_flag(SchemeLink.Invalidated, False) self._schedule(signals)
[docs] def invalidate(self, node, channel): # type: (SchemeNode, OutputSignal) -> None """ Invalidate the `channel` on `node`. The channel is effectively considered changed but unavailable until a new value is sent via `send`. While this state is set the dependent nodes will not be updated. All links originating with this node/channel will be marked with `SchemeLink.Invalidated` flag until a new value is sent with `send`. Parameters ---------- node: SchemeNode The originating node. channel: OutputSignal The channel to invalidate. .. versionadded:: 0.1.8 """ log.debug("%r invalidating channel %r", node.title, channel.name) self.__node_outputs[node][channel].flags |= _OutputState.Invalidated if self.__workflow is None: return links = self.__workflow.find_links( source_node=node, source_channel=channel ) for link in links: link.set_runtime_state(link.runtime_state() | link.Invalidated)
def _schedule(self, signals): # type: (List[Signal]) -> None """ Schedule a list of :class:`Signal` for delivery. """ self.__input_queue.extend(signals) for sig in signals: if isinstance(sig, Signal.New): extra = self.__link_extra[sig.link] extra.flags |= _LinkExtra.DidScheduleNew for link in {sig.link for sig in signals}: # update the SchemeLink's runtime state flags contents = self.link_contents(link) if any(value is not None for value in contents.values()): state = SchemeLink.Active else: state = SchemeLink.Empty link.set_runtime_state(state | SchemeLink.Pending) for node in {sig.link.sink_node for sig in signals}: # type: SchemeNode # update the SchemeNodes's runtime state flags node.set_state_flags(SchemeNode.Pending, True) if signals: self.updatesPending.emit() self._update() def _update_link(self, link): # type: (SchemeLink) -> None """ Schedule update of a single link. """ self._schedule([Signal.Update(*s) for s in self.signals_on_link(link)])
[docs] def process_queued(self, max_nodes=None): # type: (Any) -> None """ Process queued signals. Take the first eligible node from the pending input queue and deliver all scheduled signals. """ if not (max_nodes is None or max_nodes == 1): warnings.warn( "`max_nodes` is deprecated and will be removed in the future", FutureWarning, stacklevel=2) if self.__runtime_state == SignalManager.Processing: raise RuntimeError("Cannot re-enter 'process_queued'") if not self._can_process(): raise RuntimeError("Can't process in state %i" % self.__state) self.process_next()
[docs] def process_next(self): # type: () -> bool """ Process queued signals. Take the first eligible node from the pending input queue and deliver all scheduled signals for it and return `True`. If no node is eligible for update do nothing and return `False`. """ return self.__process_next_helper(use_max_active=False)
[docs] def process_node(self, node): # type: (SchemeNode) -> None """ Process pending input signals for `node`. """ assert self.__runtime_state != SignalManager.Processing signals_in = self.pending_input_signals(node) self.remove_pending_signals(node) signals_in = self.compress_signals(signals_in) log.debug("Processing %r, sending %i signals.", node.title, len(signals_in)) # Clear the link's pending flag. for link in {sig.link for sig in signals_in}: link.set_runtime_state(link.runtime_state() & ~SchemeLink.Pending) def process_dynamic(signals): # type: (List[Signal]) -> List[Signal] """ Process dynamic signals; Update the link's dynamic_enabled flag if the value is valid; replace values that do not type check with `None` """ res = [] for sig in signals: # Check and update the dynamic link state link = sig.link if sig.link.is_dynamic(): enabled = can_enable_dynamic(link, sig.value) link.set_dynamic_enabled(enabled) if not enabled: # Send None instead (clear the link) sig = sig._replace(value=None) res.append(sig) return res signals_in = process_dynamic(signals_in) assert ({sig.link for sig in self.__input_queue} .intersection({sig.link for sig in signals_in}) == set([])) self._set_runtime_state(SignalManager.Processing) self.processingStarted.emit() self.processingStarted[SchemeNode].emit(node) try: self.send_to_node(node, signals_in) finally: node.set_state_flags(SchemeNode.Pending, False) self.processingFinished.emit() self.processingFinished[SchemeNode].emit(node) self._set_runtime_state(SignalManager.Waiting)
[docs] def compress_signals(self, signals): # type: (List[Signal]) -> List[Signal] """ Compress a list of :class:`Signal` instances to be delivered. Before the signal values are delivered to the sink node they can be optionally `compressed`, i.e. values can be merged or dropped depending on the execution semantics. The input list is in the order that the signals were enqueued. The base implementation returns the list unmodified. Parameters ---------- signals : List[Signal] Return ------ signals : List[Signal] """ return signals
[docs] def send_to_node(self, node, signals): # type: (SchemeNode, List[Signal]) -> None """ Abstract. Reimplement in subclass. Send/notify the `node` instance (or whatever object/instance it is a representation of) that it has new inputs as represented by the `signals` list). Parameters ---------- node : SchemeNode signals : List[Signal] """ raise NotImplementedError
[docs] def is_pending(self, node): # type: (SchemeNode) -> bool """ Is `node` (class:`SchemeNode`) scheduled for processing (i.e. it has incoming pending signals). Parameters ---------- node : SchemeNode Returns ------- pending : bool """ return node in [signal.link.sink_node for signal in self.__input_queue]
[docs] def pending_nodes(self): # type: () -> List[SchemeNode] """ Return a list of pending nodes. The nodes are returned in the order they were enqueued for signal delivery. Returns ------- nodes : List[SchemeNode] """ return list(unique(sig.link.sink_node for sig in self.__input_queue))
[docs] def pending_input_signals(self, node): # type: (SchemeNode) -> List[Signal] """ Return a list of pending input signals for node. """ return [signal for signal in self.__input_queue if node is signal.link.sink_node]
[docs] def remove_pending_signals(self, node): # type: (SchemeNode) -> None """ Remove pending signals for `node`. """ for signal in self.pending_input_signals(node): try: self.__input_queue.remove(signal) except ValueError: pass
def __nodes(self): # type: () -> Sequence[SchemeNode] return self.__workflow.nodes if self.__workflow else []
[docs] def blocking_nodes(self): # type: () -> List[SchemeNode] """ Return a list of nodes in a blocking state. """ return [node for node in self.__nodes() if self.is_blocking(node)]
[docs] def invalidated_nodes(self): # type: () -> List[SchemeNode] """ Return a list of invalidated nodes. .. versionadded:: 0.1.8 """ return [node for node in self.__nodes() if self.has_invalidated_outputs(node) or self.is_invalidated(node)]
[docs] def active_nodes(self): # type: () -> List[SchemeNode] """ Return a list of active nodes. .. versionadded:: 0.1.8 """ return [node for node in self.__nodes() if self.is_active(node)]
[docs] def is_blocking(self, node): # type: (SchemeNode) -> bool """ Is the node in `blocking` state. Is it currently in a state where will produce new outputs and therefore no signals should be delivered to dependent nodes until it does so. Also no signals will be delivered to the node until it exits this state. The default implementation returns False. .. deprecated:: 0.1.8 Use a combination of `is_invalidated` and `is_ready`. """ return False
[docs] def is_ready(self, node: SchemeNode) -> bool: """ Is the node in a state where it can receive inputs. Re-implement this method in as subclass to prevent specific nodes from being considered for input update (e.g. they are still initializing runtime resources, executing a non-interruptable task, ...) Note that whenever the implicit state changes the `post_update_request` should be called. The default implementation returns the state of the node's `SchemeNode.NotReady` flag. Parameters ---------- node: SchemeNode """ return not node.test_state_flags(SchemeNode.NotReady)
[docs] def is_invalidated(self, node: SchemeNode) -> bool: """ Is the node marked as invalidated. Parameters ---------- node : SchemeNode Returns ------- state: bool """ return node.test_state_flags(SchemeNode.Invalidated)
[docs] def has_invalidated_outputs(self, node): # type: (SchemeNode) -> bool """ Does node have any explicitly invalidated outputs. Parameters ---------- node: SchemeNode Returns ------- state: bool See also -------- invalidate .. versionadded:: 0.1.8 """ out = self.__node_outputs.get(node) if out is not None: return any(state.flags & _OutputState.Invalidated for state in out.values()) else: return False
[docs] def has_invalidated_inputs(self, node): # type: (SchemeNode) -> bool """ Does the node have any immediate ancestor with invalidated outputs. Parameters ---------- node : SchemeNode Returns ------- state: bool Note ---- The node's ancestors are only computed over enabled links. .. versionadded:: 0.1.8 """ if self.__workflow is None: return False workflow = self.__workflow return any(self.has_invalidated_outputs(link.source_node) for link in workflow.find_links(sink_node=node) if link.is_enabled())
[docs] def is_active(self, node): # type: (SchemeNode) -> bool """ Is the node considered active (executing a task). Parameters ---------- node: SchemeNode Returns ------- active: bool """ return bool(node.state() & SchemeNode.Running)
[docs] def node_update_front(self): # type: () -> Sequence[SchemeNode] """ Return a list of nodes on the update front, i.e. nodes scheduled for an update that have no ancestor which is either itself scheduled for update or is in a blocking state). Note ---- The node's ancestors are only computed over enabled links. """ if self.__workflow is None: return [] workflow = self.__workflow expand = partial(expand_node, workflow) components = strongly_connected_components(workflow.nodes, expand) node_scc = {node: scc for scc in components for node in scc} def isincycle(node): # type: (SchemeNode) -> bool return len(node_scc[node]) > 1 def dependents(node): # type: (SchemeNode) -> List[SchemeNode] return dependent_nodes(workflow, node) # A list of all nodes currently active/executing a non-interruptable # task. blocking_nodes = set(self.blocking_nodes()) # nodes marked as having invalidated outputs (not yet available) invalidated_nodes = set(self.invalidated_nodes()) #: transitive invalidated nodes (including the legacy self.is_blocked #: behaviour - blocked nodes are both invalidated and cannot receive #: new inputs) invalidated_ = reduce( set.union, map(dependents, invalidated_nodes | blocking_nodes), set([]), ) # type: Set[SchemeNode] pending = self.pending_nodes() pending_ = set() for n in pending: depend = set(dependents(n)) if isincycle(n): # a pending node in a cycle would would have a circular # dependency on itself, preventing any progress being made # by the workflow execution. cc = node_scc[n] depend -= set(cc) pending_.update(depend) def has_invalidated_ancestor(node): # type: (SchemeNode) -> bool return node in invalidated_ def has_pending_ancestor(node): # type: (SchemeNode) -> bool return node in pending_ #: nodes that are eligible for update. ready = list(filter( lambda node: not has_pending_ancestor(node) and not has_invalidated_ancestor(node) and not self.is_blocking(node), pending )) return ready
@Slot() def __process_next(self): if not self.__state == SignalManager.Running: log.debug("Received 'UpdateRequest' while not in 'Running' state") return if self.__runtime_state == SignalManager.Processing: # This happens if QCoreApplication.processEvents is called from # the input handlers. A `__process_next` must be rescheduled when # exiting process_queued. log.warning("Received 'UpdateRequest' while in 'process_queued'. " "An update will be re-scheduled when exiting the " "current update.") return if not self.__input_queue: return if self.__has_finished: self.__has_finished = False self.started.emit() if self.__process_next_helper(use_max_active=True): # Schedule another update (will be a noop if nothing to do). self._update() def __process_next_helper(self, use_max_active=True) -> bool: eligible = [n for n in self.node_update_front() if self.is_ready(n)] if not eligible: return False max_active = self.max_active() nactive = len(set(self.active_nodes()) | set(self.blocking_nodes())) log.debug( "Process next, queued signals: %i, nactive: %i " "(max_active: %i)", len(self.__input_queue), nactive, max_active ) _ = lambda nodes: list(map(attrgetter('title'), nodes)) log.debug("Pending nodes: %s", _(self.pending_nodes())) log.debug("Blocking nodes: %s", _(self.blocking_nodes())) log.debug("Invalidated nodes: %s", _(self.invalidated_nodes())) log.debug("Nodes ready for update: %s", _(eligible)) # Select an node that is already running (effectively cancelling # already executing tasks that are immediately updatable) selected_node = None # type: Optional[SchemeNode] for node in eligible: if self.is_active(node): selected_node = node break # Return if over committed, except in the case that the selected_node # is already active. if use_max_active and nactive >= max_active and selected_node is None: return False if selected_node is None: selected_node = eligible[0] self.process_node(selected_node) self.__maybe_emit_finished() return True def _update(self): # type: () -> None """ Schedule processing at a later time. """ if self.__state == SignalManager.Running and \ not self.__update_timer.isActive(): self.__update_timer.start() def __maybe_emit_finished(self): if self.__has_finished: # already emitted finished return if any(chain(self.active_nodes(), self.blocking_nodes(), self.pending_nodes())): return self.__has_finished = True self.finished.emit()
[docs] def post_update_request(self): """ Schedule an update pass. Call this method whenever: * a node's outputs change (note that this is already done by `send`) * any change in the node that influences its eligibility to be picked for an input update (is_ready, is_blocking ...). Multiple update requests are merged into one. """ self._update()
def set_max_active(self, val: int) -> None: if self.__max_running != val: self.__max_running = val self._update() def max_active(self) -> int: value = self.__max_running # type: Optional[int] if value is None: value = mapping_get(os.environ, "MAX_ACTIVE_NODES", int, None) if value is None: s = QSettings() s.beginGroup(__name__) value = s.value("max-active-nodes", defaultValue=1, type=int) if value < 0: ccount = os.cpu_count() if ccount is None: return 1 else: return max(1, ccount + value) else: return max(1, value)
def can_enable_dynamic(link, value): # type: (SchemeLink, Any) -> bool """ Can the a dynamic `link` (:class:`SchemeLink`) be enabled for`value`. """ if LazyValue.is_lazy(value): value = value.get_value() return isinstance(value, link.sink_types()) def compress_signals(signals: List[Signal]) -> List[Signal]: """ Compress a list of signals by dropping 'stale' signals. * Multiple consecutive updates are dropped - preserving only the latest, except when one of the updates had `None` value in which case the `None` update signal is preserved (by historical convention this meant a reset of the input for pending nodes). So for instance if a link had: `1, 2, None, 3` scheduled updates then the list would be compressed to `None, 3`. * Updates preceding a Close signal are dropped - only Close is preserved. See Also -------- SignalManager.compress_signals """ # group by key in reverse order (to preserve order of last update) groups = group_by_all(reversed(signals), key=lambda sig: (sig.link, sig.id)) out: List[Signal] = [] id_to_index = {id(s): i for i, s in enumerate(signals)} for _, signals_rev in groups: signals = compress_single(list(reversed(signals_rev))) out.extend(reversed(signals)) out = list(reversed(out)) assert all(id(s) in id_to_index for s in out), 'Must preserve signal id' # maintain relative order of (surviving) signals return sorted(out, key=lambda s: id_to_index[id(s)]) def compress_single(signals: List[Signal]) -> List[Signal]: def is_none_update(signal: 'Optional[Signal]') -> bool: return is_update(signal) and signal is not None and signal.value is None def is_update(signal: 'Optional[Signal]') -> bool: return isinstance(signal, Update) or type(signal) is Signal def is_close(signal: 'Optional[Signal]') -> bool: return isinstance(signal, Close) out: List[Signal] = [] # 1.) Merge all consecutive updates for i, sig in enumerate(signals): prev = out[-1] if out else None prev_prev = out[-2] if len(out) > 1 else None if is_none_update(prev_prev) and is_update(prev) and is_none_update(sig): # ..., None, X, None --> ..., None out[-2:] = [sig] elif is_none_update(prev_prev) and is_update(prev) and is_update(sig): # ..., None, X, Y -> ..., None, Y out[-1] = sig elif is_none_update(prev) and is_none_update(sig): # ..., None, None -> ..., None out[-1] = sig elif is_none_update(prev) and is_update(sig): # ..., None, X -> ..., None, X out.append(sig) elif is_update(prev) and is_update(sig): # ..., X, Y -> ..., Y out[-1] = sig else: # ..., X -> ..., X out.append(sig) signals = out # Sanity check. There cannot be more then 2 consecutive updates in the # compressed signals queue. for i in range(len(signals) - 3): assert not all(map(is_update, signals[i: i + 3])) out: List[Signal] = [] # 2.) Drop all Update preceding a Close for i, sig in enumerate(signals): prev = out[-1] if out else None prev_prev = out[-2] if len(out) > 1 else None if is_update(prev_prev) and is_update(prev) and is_close(sig): # ..., Y, X, Close --> ..., Close assert is_none_update(prev_prev) out[-2:] = [sig] elif is_update(prev) and is_close(sig): # ..., X, Close -> ..., Close out[-1] = sig else: # ..., X -> ..., X out.append(sig) return out def expand_node(workflow, node): # type: (Scheme, SchemeNode) -> List[SchemeNode] return [link.sink_node for link in workflow.find_links(source_node=node) if link.enabled] def dependent_nodes(scheme, node): # type: (Scheme, SchemeNode) -> List[SchemeNode] """ Return a list of all nodes (in breadth first order) in `scheme` that are dependent on `node`, Note ---- This does not include nodes only reachable by disables links. """ nodes = list(traverse_bf(node, partial(expand_node, scheme))) assert nodes[0] is node # Remove the first item (`node`). return nodes[1:]