"""
=========================
User Interaction Handlers
=========================
User interaction handlers for a :class:`~.SchemeEditWidget`.
User interactions encapsulate the logic of user interactions with the
scheme document.
All interactions are subclasses of :class:`UserInteraction`.
"""
import typing
from typing import Optional, Any, Tuple, List, Set, Iterable, Sequence, Dict
import abc
import logging
from functools import reduce
from AnyQt.QtWidgets import (
QApplication, QGraphicsRectItem, QGraphicsSceneMouseEvent,
QGraphicsSceneContextMenuEvent, QWidget, QGraphicsItem,
QGraphicsSceneDragDropEvent, QMenu, QAction, QWidgetAction, QLabel
)
from AnyQt.QtGui import QPen, QBrush, QColor, QFontMetrics, QKeyEvent, QFont
from AnyQt.QtCore import (
Qt, QObject, QCoreApplication, QSizeF, QPointF, QRect, QRectF, QLineF,
QPoint, QMimeData,
)
from AnyQt.QtCore import pyqtSignal as Signal
from orangecanvas.document.commands import UndoCommand
from .usagestatistics import UsageStatistics
from ..registry.description import WidgetDescription, OutputSignal, InputSignal
from ..registry.qt import QtWidgetRegistry, tooltip_helper, whats_this_helper
from .. import scheme
from ..scheme import (
SchemeNode as Node, SchemeLink as Link, Scheme, WorkflowEvent,
compatible_channels
)
from ..canvas import items
from ..canvas.items import controlpoints
from ..gui.quickhelp import QuickHelpTipEvent
from . import commands
from .editlinksdialog import EditLinksDialog
from ..utils import unique
from ..utils.pkgmeta import EntryPoint, entry_points
if typing.TYPE_CHECKING:
from .schemeedit import SchemeEditWidget
A = typing.TypeVar("A")
#: Output/Input pair of a link
OIPair = Tuple[OutputSignal, InputSignal]
log = logging.getLogger(__name__)
def assert_not_none(optional):
# type: (Optional[A]) -> A
assert optional is not None
return optional
[docs]class UserInteraction(QObject):
"""
Base class for user interaction handlers.
Parameters
----------
document : :class:`~.SchemeEditWidget`
An scheme editor instance with which the user is interacting.
parent : :class:`QObject`, optional
A parent QObject
deleteOnEnd : bool, optional
Should the UserInteraction be deleted when it finishes (``True``
by default).
"""
# Cancel reason flags
#: No specified reason
NoReason = 0
#: User canceled the operation (e.g. pressing ESC)
UserCancelReason = 1
#: Another interaction was set
InteractionOverrideReason = 3
#: An internal error occurred
ErrorReason = 4
#: Other (unspecified) reason
OtherReason = 5
#: Emitted when the interaction is set on the scene.
started = Signal()
#: Emitted when the interaction finishes successfully.
finished = Signal()
#: Emitted when the interaction ends (canceled or finished)
ended = Signal()
#: Emitted when the interaction is canceled.
canceled = Signal([], [int])
def __init__(self, document, parent=None, deleteOnEnd=True):
# type: ('SchemeEditWidget', Optional[QObject], bool) -> None
super().__init__(parent)
self.document = document
self.scene = document.scene()
scheme_ = document.scheme()
assert scheme_ is not None
self.scheme = scheme_ # type: scheme.Scheme
self.suggestions = document.suggestions()
self.deleteOnEnd = deleteOnEnd
self.cancelOnEsc = False
self.__finished = False
self.__canceled = False
self.__cancelReason = self.NoReason
[docs] def start(self):
# type: () -> None
"""
Start the interaction. This is called by the :class:`CanvasScene` when
the interaction is installed.
.. note:: Must be called from subclass implementations.
"""
self.started.emit()
[docs] def end(self):
# type: () -> None
"""
Finish the interaction. Restore any leftover state in this method.
.. note:: This gets called from the default :func:`cancel`
implementation.
"""
self.__finished = True
if self.scene.user_interaction_handler is self:
self.scene.set_user_interaction_handler(None)
if self.__canceled:
self.canceled.emit()
self.canceled[int].emit(self.__cancelReason)
else:
self.finished.emit()
self.ended.emit()
if self.deleteOnEnd:
self.deleteLater()
[docs] def cancel(self, reason=OtherReason):
# type: (int) -> None
"""
Cancel the interaction with `reason`.
"""
self.__canceled = True
self.__cancelReason = reason
self.end()
[docs] def isFinished(self):
# type: () -> bool
"""
Is the interaction finished.
"""
return self.__finished
[docs] def isCanceled(self):
# type: () -> bool
"""
Was the interaction canceled.
"""
return self.__canceled
[docs] def cancelReason(self):
# type: () -> int
"""
Return the reason the interaction was canceled.
"""
return self.__cancelReason
[docs] def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
"""
Handle a `QGraphicsScene.mousePressEvent`.
"""
return False
[docs] def mouseMoveEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
"""
Handle a `GraphicsScene.mouseMoveEvent`.
"""
return False
[docs] def mouseReleaseEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
"""
Handle a `QGraphicsScene.mouseReleaseEvent`.
"""
return False
[docs] def mouseDoubleClickEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
"""
Handle a `QGraphicsScene.mouseDoubleClickEvent`.
"""
return False
[docs] def keyPressEvent(self, event):
# type: (QKeyEvent) -> bool
"""
Handle a `QGraphicsScene.keyPressEvent`
"""
if self.cancelOnEsc and event.key() == Qt.Key_Escape:
self.cancel(self.UserCancelReason)
return False
[docs] def keyReleaseEvent(self, event):
# type: (QKeyEvent) -> bool
"""
Handle a `QGraphicsScene.keyPressEvent`
"""
return False
[docs] def dragEnterEvent(self, event):
# type: (QGraphicsSceneDragDropEvent) -> bool
"""
Handle a `QGraphicsScene.dragEnterEvent`
.. versionadded:: 0.1.20
"""
return False
[docs] def dragMoveEvent(self, event):
# type: (QGraphicsSceneDragDropEvent) -> bool
"""
Handle a `QGraphicsScene.dragMoveEvent`
.. versionadded:: 0.1.20
"""
return False
[docs] def dragLeaveEvent(self, event):
# type: (QGraphicsSceneDragDropEvent) -> bool
"""
Handle a `QGraphicsScene.dragLeaveEvent`
.. versionadded:: 0.1.20
"""
return False
[docs] def dropEvent(self, event):
# type: (QGraphicsSceneDragDropEvent) -> bool
"""
Handle a `QGraphicsScene.dropEvent`
.. versionadded:: 0.1.20
"""
return False
class NoPossibleLinksError(ValueError):
pass
class UserCanceledError(ValueError):
pass
def reversed_arguments(func):
"""
Return a function with reversed argument order.
"""
def wrapped(*args):
return func(*reversed(args))
return wrapped
class NewLinkAction(UserInteraction):
"""
User drags a new link from an existing `NodeAnchorItem` to create
a connection between two existing nodes or to a new node if the release
is over an empty area, in which case a quick menu for new node selection
is presented to the user.
"""
# direction of the drag
FROM_SOURCE = 1
FROM_SINK = 2
def __init__(self, document, *args, **kwargs):
super().__init__(document, *args, **kwargs)
self.from_item = None # type: Optional[items.NodeItem]
self.from_signal = None # type: Optional[Union[InputSignal, OutputSignal]]
self.direction = 0 # type: int
self.showing_incompatible_widget = False # type: bool
# An `NodeItem` currently under the mouse as a possible
# link drop target.
self.current_target_item = None # type: Optional[items.NodeItem]
# A temporary `LinkItem` used while dragging.
self.tmp_link_item = None # type: Optional[items.LinkItem]
# An temporary `AnchorPoint` inserted into `current_target_item`
self.tmp_anchor_point = None # type: Optional[items.AnchorPoint]
# An `AnchorPoint` following the mouse cursor
self.cursor_anchor_point = None # type: Optional[items.AnchorPoint]
# An UndoCommand
self.macro = None # type: Optional[UndoCommand]
# Cache viable signals of currently hovered node
self.__target_compatible_signals: Sequence[Tuple[OutputSignal, InputSignal]] = []
self.cancelOnEsc = True
def remove_tmp_anchor(self):
# type: () -> None
"""
Remove a temporary anchor point from the current target item.
"""
assert self.current_target_item is not None
assert self.tmp_anchor_point is not None
if self.direction == self.FROM_SOURCE:
self.current_target_item.removeInputAnchor(self.tmp_anchor_point)
else:
self.current_target_item.removeOutputAnchor(self.tmp_anchor_point)
self.tmp_anchor_point = None
def update_tmp_anchor(self, item, scenePos):
# type: (items.NodeItem, QPointF) -> None
"""
If hovering over a new compatible channel, move it.
"""
assert self.tmp_anchor_point is not None
if self.direction == self.FROM_SOURCE:
signal = item.inputAnchorItem.signalAtPos(scenePos,
self.__target_compatible_signals)
else:
signal = item.outputAnchorItem.signalAtPos(scenePos,
self.__target_compatible_signals)
self.tmp_anchor_point.setSignal(signal)
def create_tmp_anchor(self, item, scenePos):
# type: (items.NodeItem, QPointF) -> None
"""
Create a new tmp anchor at the `item` (:class:`NodeItem`).
"""
assert self.tmp_anchor_point is None
if self.direction == self.FROM_SOURCE:
anchor = item.inputAnchorItem
signal = anchor.signalAtPos(scenePos,
self.__target_compatible_signals)
self.tmp_anchor_point = item.newInputAnchor(signal)
else:
anchor = item.outputAnchorItem
signal = anchor.signalAtPos(scenePos,
self.__target_compatible_signals)
self.tmp_anchor_point = item.newOutputAnchor(signal)
def __possible_connection_signal_pairs(
self, target_item: items.NodeItem
) -> Sequence[Tuple[OutputSignal, InputSignal]]:
"""
Return possible connection signal pairs between current
`self.from_item` and `target_item`.
"""
if self.from_item is None:
return []
node1 = self.scene.node_for_item(self.from_item)
node2 = self.scene.node_for_item(target_item)
if self.direction == self.FROM_SOURCE:
links = self.scheme.propose_links(node1, node2,
source_signal=self.from_signal)
else:
links = self.scheme.propose_links(node2, node1,
sink_signal=self.from_signal)
return [(s1, s2) for s1, s2, _ in links]
def can_connect(self, target_item):
# type: (items.NodeItem) -> bool
"""
Is the connection between `self.from_item` (item where the drag
started) and `target_item` possible.
"""
return bool(self.__possible_connection_signal_pairs(target_item))
def set_link_target_anchor(self, anchor):
# type: (items.AnchorPoint) -> None
"""
Set the temp line target anchor.
"""
assert self.tmp_link_item is not None
if self.direction == self.FROM_SOURCE:
self.tmp_link_item.setSinkItem(None, anchor=anchor)
else:
self.tmp_link_item.setSourceItem(None, anchor=anchor)
def target_node_item_at(self, pos):
# type: (QPointF) -> Optional[items.NodeItem]
"""
Return a suitable :class:`NodeItem` at position `pos` on which
a link can be dropped.
"""
# Test for a suitable `NodeAnchorItem` or `NodeItem` at pos.
if self.direction == self.FROM_SOURCE:
anchor_type = items.SinkAnchorItem
else:
anchor_type = items.SourceAnchorItem
item = self.scene.item_at(pos, (anchor_type, items.NodeItem))
if isinstance(item, anchor_type):
return item.parentNodeItem()
elif isinstance(item, items.NodeItem):
return item
else:
return None
def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
anchor_item = self.scene.item_at(
event.scenePos(), items.NodeAnchorItem
)
if anchor_item is not None and event.button() == Qt.LeftButton:
# Start a new link starting at item
self.from_item = anchor_item.parentNodeItem()
if isinstance(anchor_item, items.SourceAnchorItem):
self.direction = NewLinkAction.FROM_SOURCE
else:
self.direction = NewLinkAction.FROM_SINK
event.accept()
helpevent = QuickHelpTipEvent(
self.tr("Create a new link"),
self.tr('<h3>Create new link</h3>'
'<p>Drag a link to an existing node or release on '
'an empty spot to create a new node.</p>'
'<p>Hold Shift when releasing the mouse button to '
'edit connections.</p>'
# '<a href="help://orange-canvas/create-new-links">'
# 'More ...</a>'
)
)
QCoreApplication.postEvent(self.document, helpevent)
return True
else:
# Whoever put us in charge did not know what he was doing.
self.cancel(self.ErrorReason)
return False
def mouseMoveEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if self.tmp_link_item is None:
# On first mouse move event create the temp link item and
# initialize it to follow the `cursor_anchor_point`.
self.tmp_link_item = items.LinkItem()
# An anchor under the cursor for the duration of this action.
self.cursor_anchor_point = items.AnchorPoint()
self.cursor_anchor_point.setPos(event.scenePos())
# Set the `fixed` end of the temp link (where the drag started).
scenePos = event.scenePos()
if self.direction == self.FROM_SOURCE:
anchor = self.from_item.outputAnchorItem
else:
anchor = self.from_item.inputAnchorItem
anchor.setHovered(False)
anchor.setCompatibleSignals(None)
if anchor.anchorOpen():
signal = anchor.signalAtPos(scenePos)
anchor.setKeepAnchorOpen(signal)
else:
signal = None
self.from_signal = signal
if self.direction == self.FROM_SOURCE:
self.tmp_link_item.setSourceItem(self.from_item, signal)
else:
self.tmp_link_item.setSinkItem(self.from_item, signal)
self.set_link_target_anchor(self.cursor_anchor_point)
self.scene.addItem(self.tmp_link_item)
assert self.cursor_anchor_point is not None
# `NodeItem` at the cursor position
item = self.target_node_item_at(event.scenePos())
if self.current_target_item is not None and \
(item is None or item is not self.current_target_item):
# `current_target_item` is no longer under the mouse cursor
# (was replaced by another item or the the cursor was moved over
# an empty scene spot.
log.info("%r is no longer the target.", self.current_target_item)
if self.direction == self.FROM_SOURCE:
anchor = self.current_target_item.inputAnchorItem
else:
anchor = self.current_target_item.outputAnchorItem
if self.showing_incompatible_widget:
anchor.setIncompatible(False)
self.showing_incompatible_widget = False
else:
self.remove_tmp_anchor()
anchor.setHovered(False)
anchor.setCompatibleSignals(None)
self.current_target_item = None
if item is not None and item is not self.from_item:
# The mouse is over a node item (different from the starting node)
if self.current_target_item is item:
# Mouse is over the same item
scenePos = event.scenePos()
# Move to new potential anchor
if not self.showing_incompatible_widget:
self.update_tmp_anchor(item, scenePos)
else:
self.set_link_target_anchor(self.cursor_anchor_point)
elif self.can_connect(item):
# Mouse is over a new item
links = self.__possible_connection_signal_pairs(item)
log.info("%r is the new target.", item)
if self.direction == self.FROM_SOURCE:
self.__target_compatible_signals = [s2 for s1, s2 in links]
item.inputAnchorItem.setCompatibleSignals(
self.__target_compatible_signals)
item.inputAnchorItem.setHovered(True)
else:
self.__target_compatible_signals = [s1 for s1, s2 in links]
item.outputAnchorItem.setCompatibleSignals(
self.__target_compatible_signals)
item.outputAnchorItem.setHovered(True)
scenePos = event.scenePos()
self.create_tmp_anchor(item, scenePos)
self.set_link_target_anchor(
assert_not_none(self.tmp_anchor_point)
)
self.current_target_item = item
self.showing_incompatible_widget = False
else:
log.info("%r does not have compatible channels", item)
self.__target_compatible_signals = []
if self.direction == self.FROM_SOURCE:
anchor = item.inputAnchorItem
else:
anchor = item.outputAnchorItem
anchor.setCompatibleSignals(
self.__target_compatible_signals)
anchor.setHovered(True)
anchor.setIncompatible(True)
self.showing_incompatible_widget = True
self.set_link_target_anchor(self.cursor_anchor_point)
self.current_target_item = item
else:
self.showing_incompatible_widget = item is not None
self.__target_compatible_signals = []
self.set_link_target_anchor(self.cursor_anchor_point)
self.cursor_anchor_point.setPos(event.scenePos())
return True
def mouseReleaseEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if self.tmp_link_item is not None:
item = self.target_node_item_at(event.scenePos())
node = None # type: Optional[Node]
stack = self.document.undoStack()
self.macro = UndoCommand(self.tr("Add link"))
if item:
# If the release was over a node item then connect them
node = self.scene.node_for_item(item)
else:
# Release on an empty canvas part
# Show a quick menu popup for a new widget creation.
try:
node = self.create_new(event)
except Exception:
log.error("Failed to create a new node, ending.",
exc_info=True)
node = None
if node is not None:
commands.AddNodeCommand(self.scheme, node,
parent=self.macro)
if node is not None and not self.showing_incompatible_widget:
if self.direction == self.FROM_SOURCE:
source_node = self.scene.node_for_item(self.from_item)
source_signal = self.from_signal
sink_node = node
if item is not None and item.inputAnchorItem.anchorOpen():
sink_signal = item.inputAnchorItem.signalAtPos(
event.scenePos(),
self.__target_compatible_signals
)
else:
sink_signal = None
else:
source_node = node
if item is not None and item.outputAnchorItem.anchorOpen():
source_signal = item.outputAnchorItem.signalAtPos(
event.scenePos(),
self.__target_compatible_signals
)
else:
source_signal = None
sink_node = self.scene.node_for_item(self.from_item)
sink_signal = self.from_signal
self.suggestions.set_direction(self.direction)
self.connect_nodes(source_node, sink_node,
source_signal, sink_signal)
self.reset_open_anchor()
if not self.isCanceled() or not self.isFinished() and \
self.macro is not None:
# Push (commit) the add link/node action on the stack.
stack.push(self.macro)
self.end()
return True
else:
self.end()
return False
def create_new(self, event):
# type: (QGraphicsSceneMouseEvent) -> Optional[Node]
"""
Create and return a new node with a `QuickMenu`.
"""
pos = event.screenPos()
menu = self.document.quickMenu()
node = self.scene.node_for_item(self.from_item)
from_signal = self.from_signal
from_desc = node.description
def is_compatible(
source_signal: OutputSignal,
source: WidgetDescription,
sink: WidgetDescription,
sink_signal: InputSignal
) -> bool:
return any(scheme.compatible_channels(output, input)
for output
in ([source_signal] if source_signal else source.outputs)
for input
in ([sink_signal] if sink_signal else sink.inputs))
from_sink = self.direction == self.FROM_SINK
if from_sink:
# Reverse the argument order.
is_compatible = reversed_arguments(is_compatible)
suggestion_sort = self.suggestions.get_source_suggestions(from_desc.name)
else:
suggestion_sort = self.suggestions.get_sink_suggestions(from_desc.name)
def sort(left, right):
# list stores frequencies, so sign is flipped
return suggestion_sort[left] > suggestion_sort[right]
menu.setSortingFunc(sort)
def filter(index):
desc = index.data(QtWidgetRegistry.WIDGET_DESC_ROLE)
if isinstance(desc, WidgetDescription):
return is_compatible(from_signal, from_desc, desc, None)
else:
return False
menu.setFilterFunc(filter)
menu.triggerSearch()
try:
action = menu.exec(pos)
finally:
menu.setFilterFunc(None)
if action:
item = action.property("item")
desc = item.data(QtWidgetRegistry.WIDGET_DESC_ROLE)
pos = event.scenePos()
# a new widget should be placed so that the connection
# stays as it was
offset = 31 * (-1 if self.direction == self.FROM_SINK else
1 if self.direction == self.FROM_SOURCE else 0)
statistics = self.document.usageStatistics()
statistics.begin_extend_action(from_sink, node)
node = self.document.newNodeHelper(desc,
position=(pos.x() + offset,
pos.y()))
return node
else:
return None
def connect_nodes(
self, source_node: Node, sink_node: Node,
source_signal: Optional[OutputSignal] = None,
sink_signal: Optional[InputSignal] = None
) -> None:
"""
Connect `source_node` to `sink_node`. If there are more then one
equally weighted and non conflicting links possible present a
detailed dialog for link editing.
"""
UsageStatistics.set_sink_anchor_open(sink_signal is not None)
UsageStatistics.set_source_anchor_open(source_signal is not None)
try:
possible = self.scheme.propose_links(source_node, sink_node,
source_signal, sink_signal)
log.debug("proposed (weighted) links: %r",
[(s1.name, s2.name, w) for s1, s2, w in possible])
if not possible:
raise NoPossibleLinksError
source, sink, w = possible[0]
# just a list of signal tuples for now, will be converted
# to SchemeLinks later
links_to_add = [] # type: List[Link]
links_to_remove = [] # type: List[Link]
show_link_dialog = False
# Ambiguous new link request.
if len(possible) >= 2:
# Check for possible ties in the proposed link weights
_, _, w2 = possible[1]
if w == w2:
show_link_dialog = True
# Check for destructive action (i.e. would the new link
# replace a previous link) except for explicit only link
# candidates
if sink.single and w2 > 0 and \
self.scheme.find_links(sink_node=sink_node,
sink_channel=sink):
show_link_dialog = True
if show_link_dialog:
existing = self.scheme.find_links(source_node=source_node,
sink_node=sink_node)
if existing:
# edit_links will populate the view with existing links
initial_links = None
else:
initial_links = [(source, sink)]
try:
rstatus, links_to_add, links_to_remove = self.edit_links(
source_node, sink_node, initial_links
)
except Exception:
log.error("Failed to edit the links",
exc_info=True)
raise
if rstatus == EditLinksDialog.Rejected:
raise UserCanceledError
else:
# links_to_add now needs to be a list of actual SchemeLinks
links_to_add = [
scheme.SchemeLink(source_node, source, sink_node, sink)
]
links_to_add, links_to_remove = \
add_links_plan(self.scheme, links_to_add)
# Remove temp items before creating any new links
self.cleanup()
for link in links_to_remove:
commands.RemoveLinkCommand(self.scheme, link,
parent=self.macro)
for link in links_to_add:
# Check if the new requested link is a duplicate of an
# existing link
duplicate = self.scheme.find_links(
link.source_node, link.source_channel,
link.sink_node, link.sink_channel
)
if not duplicate:
commands.AddLinkCommand(self.scheme, link,
parent=self.macro)
except scheme.IncompatibleChannelTypeError:
log.info("Cannot connect: invalid channel types.")
self.cancel()
except scheme.SchemeTopologyError:
log.info("Cannot connect: connection creates a cycle.")
self.cancel()
except NoPossibleLinksError:
log.info("Cannot connect: no possible links.")
self.cancel()
except UserCanceledError:
log.info("User canceled a new link action.")
self.cancel(UserInteraction.UserCancelReason)
except Exception:
log.error("An error occurred during the creation of a new link.",
exc_info=True)
self.cancel()
def edit_links(
self,
source_node: Node,
sink_node: Node,
initial_links: 'Optional[List[OIPair]]' = None
) -> 'Tuple[int, List[Link], List[Link]]':
"""
Show and execute the `EditLinksDialog`.
Optional `initial_links` list can provide a list of initial
`(source, sink)` channel tuples to show in the view, otherwise
the dialog is populated with existing links in the scheme (passing
an empty list will disable all initial links).
"""
status, links_to_add_spec, links_to_remove_spec = \
edit_links(
self.scheme, source_node, sink_node, initial_links,
parent=self.document
)
if status == EditLinksDialog.Accepted:
links_to_add = [
scheme.SchemeLink(
source_node, source_channel,
sink_node, sink_channel
) for source_channel, sink_channel in links_to_add_spec
]
links_to_remove = list(reduce(
list.__iadd__, (
self.scheme.find_links(
source_node, source_channel,
sink_node, sink_channel
) for source_channel, sink_channel in links_to_remove_spec
),
[]
)) # type: List[Link]
conflicting = [conflicting_single_link(self.scheme, link)
for link in links_to_add]
conflicting = [link for link in conflicting if link is not None]
for link in conflicting:
if link not in links_to_remove:
links_to_remove.append(link)
return status, links_to_add, links_to_remove
else:
return status, [], []
def end(self):
# type: () -> None
self.cleanup()
self.reset_open_anchor()
# Remove the help tip set in mousePressEvent
self.macro = None
helpevent = QuickHelpTipEvent("", "")
QCoreApplication.postEvent(self.document, helpevent)
super().end()
def cancel(self, reason=UserInteraction.OtherReason):
# type: (int) -> None
self.cleanup()
self.reset_open_anchor()
super().cancel(reason)
def cleanup(self):
# type: () -> None
"""
Cleanup all temporary items in the scene that are left.
"""
if self.tmp_link_item:
self.tmp_link_item.setSinkItem(None)
self.tmp_link_item.setSourceItem(None)
if self.tmp_link_item.scene():
self.scene.removeItem(self.tmp_link_item)
self.tmp_link_item = None
if self.current_target_item:
if not self.showing_incompatible_widget:
self.remove_tmp_anchor()
else:
if self.direction == self.FROM_SOURCE:
anchor = self.current_target_item.inputAnchorItem
else:
anchor = self.current_target_item.outputAnchorItem
anchor.setIncompatible(False)
self.current_target_item = None
if self.cursor_anchor_point and self.cursor_anchor_point.scene():
self.scene.removeItem(self.cursor_anchor_point)
self.cursor_anchor_point = None
def reset_open_anchor(self):
"""
This isn't part of cleanup, because it should retain its value
until the link is created.
"""
if self.direction == self.FROM_SOURCE:
anchor = self.from_item.outputAnchorItem
else:
anchor = self.from_item.inputAnchorItem
anchor.setKeepAnchorOpen(None)
def edit_links(
scheme: Scheme,
source_node: Node,
sink_node: Node,
initial_links: 'Optional[List[OIPair]]' = None,
parent: 'Optional[QWidget]' = None
) -> 'Tuple[int, List[OIPair], List[OIPair]]':
"""
Show and execute the `EditLinksDialog`.
Optional `initial_links` list can provide a list of initial
`(source, sink)` channel tuples to show in the view, otherwise
the dialog is populated with existing links in the scheme (passing
an empty list will disable all initial links).
"""
log.info("Constructing a Link Editor dialog.")
dlg = EditLinksDialog(parent, windowTitle="Edit Links")
# all SchemeLinks between the two nodes.
links = scheme.find_links(source_node=source_node, sink_node=sink_node)
existing_links = [(link.source_channel, link.sink_channel)
for link in links]
if initial_links is None:
initial_links = list(existing_links)
dlg.setNodes(source_node, sink_node)
dlg.setLinks(initial_links)
log.info("Executing a Link Editor Dialog.")
rval = dlg.exec()
if rval == EditLinksDialog.Accepted:
edited_links = dlg.links()
# Differences
links_to_add = set(edited_links) - set(existing_links)
links_to_remove = set(existing_links) - set(edited_links)
return rval, list(links_to_add), list(links_to_remove)
else:
return rval, [], []
def add_links_plan(scheme, links, force_replace=False):
# type: (Scheme, Iterable[Link], bool) -> Tuple[List[Link], List[Link]]
"""
Return a plan for adding a list of links to the scheme.
"""
links_to_add = list(links)
links_to_remove = [conflicting_single_link(scheme, link)
for link in links]
links_to_remove = [link for link in links_to_remove if link is not None]
if not force_replace:
links_to_add, links_to_remove = remove_duplicates(links_to_add,
links_to_remove)
return links_to_add, links_to_remove
def conflicting_single_link(scheme, link):
# type: (Scheme, Link) -> Optional[Link]
"""
Find and return an existing link in `scheme` connected to the same
input channel as `link` if the channel has the 'single' flag.
If no such channel exists (or sink channel is not 'single')
return `None`.
"""
if link.sink_channel.single:
existing = scheme.find_links(
sink_node=link.sink_node,
sink_channel=link.sink_channel
)
if existing:
assert len(existing) == 1
return existing[0]
return None
def remove_duplicates(links_to_add, links_to_remove):
# type: (List[Link], List[Link]) -> Tuple[List[Link], List[Link]]
def link_key(link):
# type: (Link) -> Tuple[Node, OutputSignal, Node, InputSignal]
return (link.source_node, link.source_channel,
link.sink_node, link.sink_channel)
add_keys = list(map(link_key, links_to_add))
remove_keys = list(map(link_key, links_to_remove))
duplicate_keys = set(add_keys).intersection(remove_keys)
def not_duplicate(link):
# type: (Link) -> bool
return link_key(link) not in duplicate_keys
links_to_add = list(filter(not_duplicate, links_to_add))
links_to_remove = list(filter(not_duplicate, links_to_remove))
return links_to_add, links_to_remove
class NewNodeAction(UserInteraction):
"""
Present the user with a quick menu for node selection and
create the selected node.
"""
def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.button() == Qt.RightButton:
self.create_new(event.screenPos())
self.end()
return True
def create_new(self, pos, search_text=""):
# type: (QPoint, str) -> Optional[Node]
"""
Create and add new node to the workflow using `QuickMenu` popup at
`pos` (in screen coordinates).
"""
menu = self.document.quickMenu()
menu.setFilterFunc(None)
# compares probability of the user needing the widget as a source
def defaultSort(left, right):
default_suggestions = self.suggestions.get_default_suggestions()
left_frequency = sum(default_suggestions[left].values())
right_frequency = sum(default_suggestions[right].values())
return left_frequency > right_frequency
menu.setSortingFunc(defaultSort)
action = menu.exec(pos, search_text)
if action:
item = action.property("item")
desc = item.data(QtWidgetRegistry.WIDGET_DESC_ROLE)
# Get the scene position
view = self.document.view()
pos = view.mapToScene(view.mapFromGlobal(pos))
statistics = self.document.usageStatistics()
statistics.begin_action(UsageStatistics.QuickMenu)
node = self.document.newNodeHelper(desc,
position=(pos.x(), pos.y()))
self.document.addNode(node)
return node
else:
return None
class RectangleSelectionAction(UserInteraction):
"""
Select items in the scene using a Rectangle selection
"""
def __init__(self, document, *args, **kwargs):
# type: (SchemeEditWidget, Any, Any) -> None
super().__init__(document, *args, **kwargs)
# The initial selection at drag start
self.initial_selection = None # type: Optional[Set[QGraphicsItem]]
# Selection when last updated in a mouseMoveEvent
self.last_selection = None # type: Optional[Set[QGraphicsItem]]
# A selection rect (`QRectF`)
self.selection_rect = None # type: Optional[QRectF]
# Keyboard modifiers
self.modifiers = Qt.NoModifier
self.rect_item = None # type: Optional[QGraphicsRectItem]
def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
pos = event.scenePos()
any_item = self.scene.item_at(pos)
if not any_item and event.button() & Qt.LeftButton:
self.modifiers = event.modifiers()
self.selection_rect = QRectF(pos, QSizeF(0, 0))
self.rect_item = QGraphicsRectItem(
self.selection_rect.normalized()
)
self.rect_item.setPen(
QPen(QBrush(QColor(51, 153, 255, 192)),
0.4, Qt.SolidLine, Qt.RoundCap)
)
self.rect_item.setBrush(
QBrush(QColor(168, 202, 236, 192))
)
self.rect_item.setZValue(-100)
# Clear the focus if necessary.
if not self.scene.stickyFocus():
self.scene.clearFocus()
if not self.modifiers & Qt.ControlModifier:
self.scene.clearSelection()
event.accept()
return True
else:
self.cancel(self.ErrorReason)
return False
def mouseMoveEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if self.rect_item is not None and not self.rect_item.scene():
# Add the rect item to the scene when the mouse moves.
self.scene.addItem(self.rect_item)
self.update_selection(event)
return True
def mouseReleaseEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.button() == Qt.LeftButton:
if self.initial_selection is None:
# A single click.
self.scene.clearSelection()
else:
self.update_selection(event)
self.end()
return True
def update_selection(self, event):
# type: (QGraphicsSceneMouseEvent) -> None
"""
Update the selection rectangle from a QGraphicsSceneMouseEvent
`event` instance.
"""
if self.initial_selection is None:
self.initial_selection = set(self.scene.selectedItems())
self.last_selection = self.initial_selection
assert self.selection_rect is not None
assert self.rect_item is not None
assert self.initial_selection is not None
assert self.last_selection is not None
pos = event.scenePos()
self.selection_rect = QRectF(self.selection_rect.topLeft(), pos)
# Make sure the rect_item does not cause the scene rect to grow.
rect = self._bound_selection_rect(self.selection_rect.normalized())
# Need that 0.5 constant otherwise the sceneRect will still
# grow (anti-aliasing correction by QGraphicsScene?)
pw = self.rect_item.pen().width() + 0.5
self.rect_item.setRect(rect.adjusted(pw, pw, -pw, -pw))
selected = self.scene.items(self.selection_rect.normalized(),
Qt.IntersectsItemShape,
Qt.AscendingOrder)
selected = set([item for item in selected if
item.flags() & QGraphicsItem.ItemIsSelectable])
if self.modifiers & Qt.ControlModifier:
for item in selected | self.last_selection | \
self.initial_selection:
item.setSelected(
(item in selected) ^ (item in self.initial_selection)
)
else:
for item in selected.union(self.last_selection):
item.setSelected(item in selected)
self.last_selection = set(self.scene.selectedItems())
def end(self):
# type: () -> None
self.initial_selection = None
self.last_selection = None
self.modifiers = Qt.NoModifier
if self.rect_item is not None:
self.rect_item.hide()
if self.rect_item.scene() is not None:
self.scene.removeItem(self.rect_item)
super().end()
def viewport_rect(self):
# type: () -> QRectF
"""
Return the bounding rect of the document's viewport on the scene.
"""
view = self.document.view()
vsize = view.viewport().size()
viewportrect = QRect(0, 0, vsize.width(), vsize.height())
return view.mapToScene(viewportrect).boundingRect()
def _bound_selection_rect(self, rect):
# type: (QRectF) -> QRectF
"""
Bound the selection `rect` to a sensible size.
"""
srect = self.scene.sceneRect()
vrect = self.viewport_rect()
maxrect = srect.united(vrect)
return rect.intersected(maxrect)
class EditNodeLinksAction(UserInteraction):
"""
Edit multiple links between two :class:`SchemeNode` instances using
a :class:`EditLinksDialog`
Parameters
----------
document : :class:`SchemeEditWidget`
The editor widget.
source_node : :class:`SchemeNode`
The source (link start) node for the link editor.
sink_node : :class:`SchemeNode`
The sink (link end) node for the link editor.
"""
def __init__(self, document, source_node, sink_node, *args, **kwargs):
# type: (SchemeEditWidget, Node, Node, Any, Any) -> None
super().__init__(document, *args, **kwargs)
self.source_node = source_node
self.sink_node = sink_node
def edit_links(self, initial_links=None):
# type: (Optional[List[OIPair]]) -> None
"""
Show and execute the `EditLinksDialog`.
Optional `initial_links` list can provide a list of initial
`(source, sink)` channel tuples to show in the view, otherwise
the dialog is populated with existing links in the scheme (passing
an empty list will disable all initial links).
"""
log.info("Constructing a Link Editor dialog.")
dlg = EditLinksDialog(self.document, windowTitle="Edit Links")
links = self.scheme.find_links(source_node=self.source_node,
sink_node=self.sink_node)
existing_links = [(link.source_channel, link.sink_channel)
for link in links]
if initial_links is None:
initial_links = list(existing_links)
dlg.setNodes(self.source_node, self.sink_node)
dlg.setLinks(initial_links)
log.info("Executing a Link Editor Dialog.")
rval = dlg.exec()
if rval == EditLinksDialog.Accepted:
links_spec = dlg.links()
links_to_add = set(links_spec) - set(existing_links)
links_to_remove = set(existing_links) - set(links_spec)
stack = self.document.undoStack()
stack.beginMacro("Edit Links")
# First remove links into a 'Single' sink channel,
# but only the ones that do not have self.source_node as
# a source (they will be removed later from links_to_remove)
for _, sink_channel in links_to_add:
if sink_channel.single:
existing = self.scheme.find_links(
sink_node=self.sink_node,
sink_channel=sink_channel
)
existing = [link for link in existing
if link.source_node is not self.source_node]
if existing:
assert len(existing) == 1
self.document.removeLink(existing[0])
for source_channel, sink_channel in links_to_remove:
links = self.scheme.find_links(source_node=self.source_node,
source_channel=source_channel,
sink_node=self.sink_node,
sink_channel=sink_channel)
assert len(links) == 1
self.document.removeLink(links[0])
for source_channel, sink_channel in links_to_add:
link = scheme.SchemeLink(self.source_node, source_channel,
self.sink_node, sink_channel)
self.document.addLink(link)
stack.endMacro()
def point_to_tuple(point):
# type: (QPointF) -> Tuple[float, float]
"""
Convert a QPointF into a (x, y) tuple.
"""
return (point.x(), point.y())
class NewArrowAnnotation(UserInteraction):
"""
Create a new arrow annotation handler.
"""
def __init__(self, document, *args, **kwargs):
# type: (SchemeEditWidget, Any, Any) -> None
super().__init__(document, *args, **kwargs)
self.down_pos = None # type: Optional[QPointF]
self.arrow_item = None # type: Optional[items.ArrowAnnotation]
self.annotation = None # type: Optional[scheme.SchemeArrowAnnotation]
self.color = "red"
self.cancelOnEsc = True
def start(self):
# type: () -> None
self.document.view().setCursor(Qt.CrossCursor)
helpevent = QuickHelpTipEvent(
self.tr("Click and drag to create a new arrow"),
self.tr('<h3>New arrow annotation</h3>'
'<p>Click and drag to create a new arrow annotation</p>'
# '<a href="help://orange-canvas/arrow-annotations>'
# 'More ...</a>'
)
)
QCoreApplication.postEvent(self.document, helpevent)
super().start()
def setColor(self, color):
"""
Set the color for the new arrow.
"""
self.color = color
def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.button() == Qt.LeftButton:
self.down_pos = event.scenePos()
event.accept()
return True
else:
return super().mousePressEvent(event)
def mouseMoveEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.buttons() & Qt.LeftButton:
assert self.down_pos is not None
if self.arrow_item is None and \
(self.down_pos - event.scenePos()).manhattanLength() > \
QApplication.instance().startDragDistance():
annot = scheme.SchemeArrowAnnotation(
point_to_tuple(self.down_pos),
point_to_tuple(event.scenePos())
)
annot.set_color(self.color)
item = self.scene.add_annotation(annot)
self.arrow_item = item
self.annotation = annot
if self.arrow_item is not None:
p1, p2 = map(self.arrow_item.mapFromScene,
(self.down_pos, event.scenePos()))
self.arrow_item.setLine(QLineF(p1, p2))
event.accept()
return True
else:
return super().mouseMoveEvent(event)
def mouseReleaseEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.button() == Qt.LeftButton:
if self.arrow_item is not None:
assert self.down_pos is not None and self.annotation is not None
p1, p2 = self.down_pos, event.scenePos()
# Commit the annotation to the scheme
self.annotation.set_line(point_to_tuple(p1),
point_to_tuple(p2))
self.document.addAnnotation(self.annotation)
p1, p2 = map(self.arrow_item.mapFromScene, (p1, p2))
self.arrow_item.setLine(QLineF(p1, p2))
self.end()
return True
else:
return super().mouseReleaseEvent(event)
def cancel(self, reason=UserInteraction.OtherReason): # type: (int) -> None
if self.arrow_item is not None:
self.scene.removeItem(self.arrow_item)
self.arrow_item = None
super().cancel(reason)
def end(self):
# type: () -> None
self.down_pos = None
self.arrow_item = None
self.annotation = None
self.document.view().setCursor(Qt.ArrowCursor)
# Clear the help tip
helpevent = QuickHelpTipEvent("", "")
QCoreApplication.postEvent(self.document, helpevent)
super().end()
def rect_to_tuple(rect):
# type: (QRectF) -> Tuple[float, float, float, float]
"""
Convert a QRectF into a (x, y, width, height) tuple.
"""
return rect.x(), rect.y(), rect.width(), rect.height()
class NewTextAnnotation(UserInteraction):
"""
A New Text Annotation interaction handler
"""
def __init__(self, document, *args, **kwargs):
# type: (SchemeEditWidget, Any, Any) -> None
super().__init__(document, *args, **kwargs)
self.down_pos = None # type: Optional[QPointF]
self.annotation_item = None # type: Optional[items.TextAnnotation]
self.annotation = None # type: Optional[scheme.SchemeTextAnnotation]
self.control = None # type: Optional[controlpoints.ControlPointRect]
self.font = document.font() # type: QFont
self.cancelOnEsc = True
def setFont(self, font):
# type: (QFont) -> None
self.font = QFont(font)
def start(self):
# type: () -> None
self.document.view().setCursor(Qt.CrossCursor)
helpevent = QuickHelpTipEvent(
self.tr("Click to create a new text annotation"),
self.tr('<h3>New text annotation</h3>'
'<p>Click (and drag to resize) on the canvas to create '
'a new text annotation item.</p>'
# '<a href="help://orange-canvas/text-annotations">'
# 'More ...</a>'
)
)
QCoreApplication.postEvent(self.document, helpevent)
super().start()
def createNewAnnotation(self, rect):
# type: (QRectF) -> None
"""
Create a new TextAnnotation at with `rect` as the geometry.
"""
annot = scheme.SchemeTextAnnotation(rect_to_tuple(rect))
font = {"family": self.font.family(),
"size": self.font.pixelSize()}
annot.set_font(font)
item = self.scene.add_annotation(annot)
item.setTextInteractionFlags(Qt.TextEditorInteraction)
item.setFramePen(QPen(Qt.DashLine))
self.annotation_item = item
self.annotation = annot
self.control = controlpoints.ControlPointRect()
self.control.rectChanged.connect(item.setGeometry)
self.scene.addItem(self.control)
def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.button() == Qt.LeftButton:
self.down_pos = event.scenePos()
return True
return super().mousePressEvent(event)
def mouseMoveEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.buttons() & Qt.LeftButton:
assert self.down_pos is not None
if self.annotation_item is None and \
(self.down_pos - event.scenePos()).manhattanLength() > \
QApplication.instance().startDragDistance():
rect = QRectF(self.down_pos, event.scenePos()).normalized()
self.createNewAnnotation(rect)
if self.annotation_item is not None:
assert self.control is not None
rect = QRectF(self.down_pos, event.scenePos()).normalized()
self.control.setRect(rect)
return True
return super().mouseMoveEvent(event)
def mouseReleaseEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
if event.button() == Qt.LeftButton:
if self.annotation_item is None:
self.createNewAnnotation(QRectF(event.scenePos(),
event.scenePos()))
rect = self.defaultTextGeometry(event.scenePos())
else:
assert self.down_pos is not None
rect = QRectF(self.down_pos, event.scenePos()).normalized()
assert self.annotation_item is not None
assert self.control is not None
assert self.annotation is not None
# Commit the annotation to the scheme.
self.annotation.rect = rect_to_tuple(rect)
self.document.addAnnotation(self.annotation)
self.annotation_item.setGeometry(rect)
self.control.rectChanged.disconnect(
self.annotation_item.setGeometry
)
self.control.hide()
# Move the focus to the editor.
self.annotation_item.setFramePen(QPen(Qt.NoPen))
self.annotation_item.setFocus(Qt.OtherFocusReason)
self.annotation_item.startEdit()
self.end()
return True
return super().mouseMoveEvent(event)
def defaultTextGeometry(self, point):
# type: (QPointF) -> QRectF
"""
Return the default text geometry. Used in case the user single
clicked in the scene.
"""
assert self.annotation_item is not None
font = self.annotation_item.font()
metrics = QFontMetrics(font)
spacing = metrics.lineSpacing()
margin = self.annotation_item.document().documentMargin()
rect = QRectF(QPointF(point.x(), point.y() - spacing - margin),
QSizeF(150, spacing + 2 * margin))
return rect
def cancel(self, reason=UserInteraction.OtherReason): # type: (int) -> None
if self.annotation_item is not None:
self.annotation_item.clearFocus()
self.scene.removeItem(self.annotation_item)
self.annotation_item = None
super().cancel(reason)
def end(self):
# type: () -> None
if self.control is not None:
self.scene.removeItem(self.control)
self.control = None
self.down_pos = None
self.annotation_item = None
self.annotation = None
self.document.view().setCursor(Qt.ArrowCursor)
# Clear the help tip
helpevent = QuickHelpTipEvent("", "")
QCoreApplication.postEvent(self.document, helpevent)
super().end()
class ResizeTextAnnotation(UserInteraction):
"""
Resize a Text Annotation interaction handler.
"""
def __init__(self, document, *args, **kwargs):
# type: (SchemeEditWidget, Any, Any) -> None
super().__init__(document, *args, **kwargs)
self.item = None # type: Optional[items.TextAnnotation]
self.annotation = None # type: Optional[scheme.SchemeTextAnnotation]
self.control = None # type: Optional[controlpoints.ControlPointRect]
self.savedFramePen = None # type: Optional[QPen]
self.savedRect = None # type: Optional[QRectF]
def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
pos = event.scenePos()
if event.button() & Qt.LeftButton and self.item is None:
item = self.scene.item_at(pos, items.TextAnnotation)
if item is not None and not item.hasFocus():
self.editItem(item)
return False
return super().mousePressEvent(event)
def editItem(self, item):
# type: (items.TextAnnotation) -> None
annotation = self.scene.annotation_for_item(item)
rect = item.geometry() # TODO: map to scene if item has a parent.
control = controlpoints.ControlPointRect(rect=rect)
self.scene.addItem(control)
self.savedFramePen = item.framePen()
self.savedRect = rect
control.rectEdited.connect(item.setGeometry)
control.setFocusProxy(item)
item.setFramePen(QPen(Qt.DashDotLine))
item.geometryChanged.connect(self.__on_textGeometryChanged)
self.item = item
self.annotation = annotation
self.control = control
def commit(self):
# type: () -> None
"""
Commit the current item geometry state to the document.
"""
if self.item is None:
return
rect = self.item.geometry()
if self.savedRect != rect:
command = commands.SetAttrCommand(
self.annotation, "rect",
(rect.x(), rect.y(), rect.width(), rect.height()),
name="Edit text geometry"
)
self.document.undoStack().push(command)
self.savedRect = rect
def __on_editingFinished(self):
# type: () -> None
self.commit()
self.end()
def __on_rectEdited(self, rect):
# type: (QRectF) -> None
assert self.item is not None
self.item.setGeometry(rect)
def __on_textGeometryChanged(self):
# type: () -> None
assert self.control is not None and self.item is not None
if not self.control.isControlActive():
rect = self.item.geometry()
self.control.setRect(rect)
def cancel(self, reason=UserInteraction.OtherReason):
# type: (int) -> None
log.debug("ResizeTextAnnotation.cancel(%s)", reason)
if self.item is not None and self.savedRect is not None:
self.item.setGeometry(self.savedRect)
super().cancel(reason)
def end(self):
# type: () -> None
if self.control is not None:
self.scene.removeItem(self.control)
if self.item is not None and self.savedFramePen is not None:
self.item.setFramePen(self.savedFramePen)
self.item = None
self.annotation = None
self.control = None
super().end()
class ResizeArrowAnnotation(UserInteraction):
"""
Resize an Arrow Annotation interaction handler.
"""
def __init__(self, document, *args, **kwargs):
# type: (SchemeEditWidget, Any, Any) -> None
super().__init__(document, *args, **kwargs)
self.item = None # type: Optional[items.ArrowAnnotation]
self.annotation = None # type: Optional[scheme.SchemeArrowAnnotation]
self.control = None # type: Optional[controlpoints.ControlPointLine]
self.savedLine = None # type: Optional[QLineF]
def mousePressEvent(self, event):
# type: (QGraphicsSceneMouseEvent) -> bool
pos = event.scenePos()
if self.item is None:
item = self.scene.item_at(pos, items.ArrowAnnotation)
if item is not None and not item.hasFocus():
self.editItem(item)
return False
return super().mousePressEvent(event)
def editItem(self, item):
# type: (items.ArrowAnnotation) -> None
annotation = self.scene.annotation_for_item(item)
control = controlpoints.ControlPointLine()
self.scene.addItem(control)
line = item.line()
self.savedLine = line
p1, p2 = map(item.mapToScene, (line.p1(), line.p2()))
control.setLine(QLineF(p1, p2))
control.setFocusProxy(item)
control.lineEdited.connect(self.__on_lineEdited)
item.geometryChanged.connect(self.__on_lineGeometryChanged)
self.item = item
self.annotation = annotation
self.control = control
def commit(self):
# type: () -> None
"""Commit the current geometry of the item to the document.
Does nothing if the actual geometry has not changed.
"""
if self.control is None or self.item is None:
return
line = self.control.line()
p1, p2 = line.p1(), line.p2()
if self.item.line() != self.savedLine:
command = commands.SetAttrCommand(
self.annotation,
"geometry",
((p1.x(), p1.y()), (p2.x(), p2.y())),
name="Edit arrow geometry",
)
self.document.undoStack().push(command)
self.savedLine = self.item.line()
def __on_editingFinished(self):
# type: () -> None
self.commit()
self.end()
def __on_lineEdited(self, line):
# type: (QLineF) -> None
if self.item is not None:
p1, p2 = map(self.item.mapFromScene, (line.p1(), line.p2()))
self.item.setLine(QLineF(p1, p2))
def __on_lineGeometryChanged(self):
# type: () -> None
# Possible geometry change from out of our control, for instance
# item move as a part of a selection group.
assert self.control is not None and self.item is not None
if not self.control.isControlActive():
assert self.item is not None
line = self.item.line()
p1, p2 = map(self.item.mapToScene, (line.p1(), line.p2()))
self.control.setLine(QLineF(p1, p2))
def cancel(self, reason=UserInteraction.OtherReason):
# type: (int) -> None
log.debug("ResizeArrowAnnotation.cancel(%s)", reason)
if self.item is not None and self.savedLine is not None:
self.item.setLine(self.savedLine)
super().cancel(reason)
def end(self):
# type: () -> None
if self.control is not None:
self.scene.removeItem(self.control)
if self.item is not None:
self.item.geometryChanged.disconnect(self.__on_lineGeometryChanged)
self.control = None
self.item = None
self.annotation = None
super().end()
[docs]class DropHandler(abc.ABC):
"""
An abstract drop handler.
.. versionadded:: 0.1.20
"""
[docs] @abc.abstractmethod
def accepts(self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent') -> bool:
"""
Returns True if a `document` can accept a drop of the data from `event`.
"""
return False
[docs] @abc.abstractmethod
def doDrop(self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent') -> bool:
"""
Complete the drop of data from `event` onto the `document`.
"""
return False
[docs]class DropHandlerAction(abc.ABC):
[docs] @abc.abstractmethod
def actionFromDropEvent(
self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent'
) -> QAction:
"""
Create and return an QAction representing a drop action.
This action is used to disambiguate between possible drop actions.
The action can have sub menus, however all actions in submenus **must**
have the `DropHandler` instance set as their `QAction.data()`.
The actions **must not** execute the actual drop from their triggered
slot connections. The drop will be dispatched to the `action.data()`
handler's `doDrop()` after that action is triggered and the menu is
closed.
"""
raise NotImplementedError
[docs]class NodeFromMimeDataDropHandler(DropHandler, DropHandlerAction):
"""
Create a new node from dropped mime data.
Subclasses must override `canDropMimeData`, `parametersFromMimeData`,
and `qualifiedName`.
.. versionadded:: 0.1.20
"""
[docs] @abc.abstractmethod
def qualifiedName(self) -> str:
"""
The qualified name for the node created by this handler. The handler
will not be invoked if this name does not appear in the registry
associated with the workflow.
"""
raise NotImplementedError
[docs] @abc.abstractmethod
def canDropMimeData(self, document: 'SchemeEditWidget', data: 'QMimeData') -> bool:
"""
Can the handler create a node from the drop mime data.
Reimplement this in a subclass to check if the `data` has appropriate
format.
"""
raise NotImplementedError
[docs] @abc.abstractmethod
def parametersFromMimeData(self, document: 'SchemeEditWidget', data: 'QMimeData') -> 'Dict[str, Any]':
"""
Return the node parameters based from the drop mime data.
"""
raise NotImplementedError
[docs] def accepts(self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent') -> bool:
"""Reimplemented."""
reg = document.registry()
if not reg.has_widget(self.qualifiedName()):
return False
return self.canDropMimeData(document, event.mimeData())
def nodeFromMimeData(self, document: 'SchemeEditWidget', data: 'QMimeData') -> 'Node':
reg = document.registry()
wd = reg.widget(self.qualifiedName())
node = document.newNodeHelper(wd)
parameters = self.parametersFromMimeData(document, data)
node.properties = parameters
return node
[docs] def doDrop(self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent') -> bool:
"""Reimplemented."""
reg = document.registry()
if not reg.has_widget(self.qualifiedName()):
return False
node = self.nodeFromMimeData(document, event.mimeData())
node.position = (event.scenePos().x(), event.scenePos().y())
activate = self.shouldActivateNode()
wd = document.widgetManager()
if activate and wd is not None:
def activate(node_, widget):
if node_ is node:
try:
self.activateNode(document, node, widget)
finally:
# self-disconnect the slot
wd.widget_for_node_added.disconnect(activate)
wd.widget_for_node_added.connect(activate)
document.addNode(node)
if activate:
QApplication.postEvent(node, WorkflowEvent(WorkflowEvent.NodeActivateRequest))
return True
[docs] def shouldActivateNode(self) -> bool:
"""
Should the new dropped node activate (open GUI controller) immediately.
If this method returns `True` then the `activateNode` method will be
called after the node has been added and the GUI controller created.
The default implementation returns False.
"""
return False
[docs] def activateNode(self, document: 'SchemeEditWidget', node: 'Node', widget: 'QWidget') -> None:
"""
Activate (open) the `node`'s GUI controller `widget` after a
completed drop.
Reimplement this if the node requires further configuration via the
GUI.
The default implementation delegates to the :class:`WidgetManager`
associated with the document.
"""
wd = document.widgetManager()
if wd is not None:
wd.activate_widget_for_node(node, widget)
else:
widget.show()
[docs] def actionFromDropEvent(
self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent'
) -> QAction:
"""Reimplemented."""
reg = document.registry()
ac = QAction(None)
ac.setData(self)
if reg is not None:
desc = reg.widget(self.qualifiedName())
ac.setText(desc.name)
ac.setToolTip(tooltip_helper(desc))
ac.setWhatsThis(whats_this_helper(desc))
else:
ac.setText(f"{self.qualifiedName()}")
ac.setEnabled(False)
ac.setVisible(False)
return ac
def load_entry_point(
ep: EntryPoint, log: logging.Logger = None,
) -> Tuple['EntryPoint', Any]:
if log is None:
log = logging.getLogger(__name__)
try:
value = ep.load()
except (ImportError, AttributeError):
log.exception("Could not load %s", ep)
except Exception: # noqa
log.exception("Unexpected Error; %s will be skipped", ep)
else:
return ep, value
def iter_load_entry_points(
iter: Iterable[EntryPoint], log: logging.Logger = None,
):
if log is None:
log = logging.getLogger(__name__)
for ep in iter:
try:
ep, value = load_entry_point(ep, log)
except Exception:
pass
else:
yield ep, value
[docs]class PluginDropHandler(DropHandler):
"""
Delegate drop event processing to plugin drop handlers.
.. versionadded:: 0.1.20
"""
#: The default entry point group
ENTRY_POINT = "orangecanvas.document.interactions.DropHandler"
def __init__(self, group=ENTRY_POINT, **kwargs):
super().__init__(**kwargs)
self.__group = group
[docs] def iterEntryPoints(self) -> Iterable['EntryPoint']:
"""
Return an iterator over all entry points.
"""
eps = entry_points(group=self.__group)
# Can have duplicated entries here if a distribution is *found* via
# different `sys.meta_path` handlers and/or on different `sys.path`
# entries.
return unique(eps, key=lambda ep: ep.value)
__entryPoints = None
[docs] def entryPoints(self) -> Iterable[Tuple['EntryPoint', 'DropHandler']]:
"""
Return an iterator over entry points and instantiated drop handlers.
"""
eps = []
if self.__entryPoints:
ep_iter = self.__entryPoints
store_eps = lambda ep, value: None
else:
ep_iter = self.iterEntryPoints()
ep_iter = iter_load_entry_points(ep_iter, log)
store_eps = lambda ep, value: eps.append((ep, value))
for ep, value in ep_iter:
if not issubclass(value, DropHandler):
log.error(
f"{ep} yielded {type(value)}, expected a "
f"{DropHandler} subtype"
)
continue
try:
handler = value()
except Exception: # noqa
log.exception("Error in default constructor of %s", value)
else:
yield ep, handler
store_eps(ep, value)
self.__entryPoints = tuple(eps)
__accepts: Sequence[Tuple[EntryPoint, DropHandler]] = ()
[docs] def accepts(self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent') -> bool:
"""
Reimplemented.
Accept the event if any plugin handlers accept the event.
"""
accepts = []
self.__accepts = ()
for ep, handler in self.entryPoints():
if handler.accepts(document, event):
accepts.append((ep, handler))
self.__accepts = tuple(accepts)
return bool(accepts)
[docs] def doDrop(
self, document: 'SchemeEditWidget', event: 'QGraphicsSceneDragDropEvent'
) -> bool:
"""
Reimplemented.
Dispatch the drop to the plugin handler that accepted the event.
In case there are multiple handlers that accepted the event, a menu
is used to select the handler.
"""
handler: Optional[DropHandler] = None
if len(self.__accepts) == 1:
ep, handler = self.__accepts[0]
elif len(self.__accepts) > 1:
class Title(QWidgetAction):
def createWidget(self, parent):
return QLabel("<b>Select a widget</b>", parent, margin=2)
menu = QMenu(event.widget())
menu.addAction(Title(menu))
separator = QAction()
separator.setSeparator(True)
menu.addAction(separator)
for ep_, handler_ in self.__accepts:
ac = action_for_handler(handler_, document, event)
if ac is None:
ac = menu.addAction(ep_.name, )
else:
menu.addAction(ac)
ac.setParent(menu)
if not ac.toolTip():
ac.setToolTip(f"{ep_.name} ({ep_.module_name})")
ac.setData(handler_)
action = menu.exec(event.screenPos())
if action is not None:
handler = action.data()
if handler is None:
return False
return handler.doDrop(document, event)
def action_for_handler(handler: DropHandler, document, event) -> Optional[QAction]:
if isinstance(handler, DropHandlerAction):
return handler.actionFromDropEvent(document, event)
else:
return None
[docs]class DropAction(UserInteraction):
"""
A drop action on the workflow.
"""
def __init__(
self, document, *args, dropHandlers: Sequence[DropHandler] = (),
**kwargs
) -> None:
super().__init__(document, *args, **kwargs)
self.__designatedAction: Optional[DropHandler] = None
self.__dropHandlers = dropHandlers
[docs] def dropHandlers(self) -> Iterable[DropHandler]:
"""Return an iterable over drop handlers."""
return iter(self.__dropHandlers)
[docs] def canHandleDrop(self, event: 'QGraphicsSceneDragDropEvent') -> bool:
"""
Can this interactions handle the drop `event`.
The default implementation checks each `dropHandler` if it
:func:`~DropHandler.accepts` the event. The first such handler that
accepts is selected to be the designated handler and will receive
the drop (:func:`~DropHandler.doDrop`).
"""
for ep in self.dropHandlers():
if ep.accepts(self.document, event):
self.__designatedAction = ep
return True
else:
return False
[docs] def dragEnterEvent(self, event):
if self.canHandleDrop(event):
event.acceptProposedAction()
return True
else:
return False
[docs] def dragMoveEvent(self, event):
if self.canHandleDrop(event):
event.acceptProposedAction()
return True
else:
return False
[docs] def dragLeaveEvent(self, even):
self.__designatedAction = None
self.end()
return False
[docs] def dropEvent(self, event):
if self.__designatedAction is not None:
try:
res = self.__designatedAction.doDrop(self.document, event)
except Exception: # noqa
log.exception("")
res = False
if res:
event.acceptProposedAction()
self.end()
return True
else:
self.end()
return False