Source code for orangecanvas.scheme.link

"""
===========
Scheme Link
===========

"""
import enum
import warnings
import typing
from traceback import format_exception_only
from typing import List, Tuple, Union, Optional, Iterable

from AnyQt.QtCore import QObject, QCoreApplication
from AnyQt.QtCore import pyqtSignal as Signal, pyqtProperty as Property

from ..registry.description import normalize_type_simple
from ..utils import type_lookup
from .errors import IncompatibleChannelTypeError
from .events import LinkEvent

if typing.TYPE_CHECKING:
    from ..registry import OutputSignal as Output, InputSignal as Input
    from . import SchemeNode as Node


def resolve_types(types):
    # type: (Iterable[str]) -> Tuple[Optional[type], ...]
    """
    Resolve the fully qualified names to python types.

    If a name fails to resolve to a type then the corresponding entry in output
    is replaced with a None.

    Parameters
    ----------
    types: Iterable[str]
        Names of types to resolve

    Returns
    -------
    type: Tuple[Optional[type], ...]
        The `type` instances in the same order as input `types` with `None`
        replacing any type that cannot be resolved.

    """
    rt = []  # type: List[Optional[type]]
    for t in types:
        try:
            rt.append(type_lookup(t))
        except (TypeError, ImportError, AttributeError) as err:
            warnings.warn(
                "Failed to resolve name {!r} to a type: {!s}"
                    .format(t, "\n".join(format_exception_only(type(err), err))),
                RuntimeWarning, stacklevel=2
            )
            rt.append(None)
    return tuple(rt)


def resolved_valid_types(types):
    # type: (Iterable[str]) -> Tuple[type, ...]
    """
    Resolve fully qualified names to python types, omiting all types that
    fail to resolve.

    Parameters
    ----------
    types: Iterable[str]

    Returns
    -------
    type: Tuple[type, ...]
    """
    return tuple(filter(None, resolve_types(types)))


def compatible_channels(source_channel, sink_channel):
    # type: (Output, Input) -> bool
    """
    Do the source and sink channels have compatible types, i.e. can they be
    connected based on their specified types.
    """
    strict, dynamic = _classify_connection(source_channel, sink_channel)
    return strict or dynamic


def _classify_connection(source, sink):
    # type: (Output, Input) -> Tuple[bool, bool]
    """
    Classify the source -> sink connection type check.

    Returns
    -------
    rval : Tuple[bool, bool]
        A `(strict, dynamic)` tuple where `strict` is True if connection
        passes a strict type check, and `dynamic` is True if the
        `source.dynamic` is True and at least one of the sink types is
        a subtype of the source types.
    """
    source_types = resolved_valid_types(source.types)
    sink_types = resolved_valid_types(sink.types)
    if not source_types or not sink_types:
        return False, False
    # Are all possible source types subtypes of the sink_types.
    strict = all(issubclass(source_t, sink_types) for source_t in source_types)
    if source.dynamic:
        # Is at least one of the possible sink types a subtype of
        # the source_types.
        dynamic = any(issubclass(sink_t, source_types) for sink_t in sink_types)
    else:
        dynamic = False
    return strict, dynamic


def can_connect(source_node, sink_node):
    # type: (Node, Node) -> bool
    """
    Return True if any output from `source_node` can be connected to
    any input of `sink_node`.
    """
    return bool(possible_links(source_node, sink_node))


def possible_links(source_node, sink_node):
    # type: (Node, Node) -> List[Tuple[Output, Input]]
    """
    Return a list of (OutputSignal, InputSignal) tuples, that
    can connect the two nodes.
    """
    possible = []
    for source in source_node.output_channels():
        for sink in sink_node.input_channels():
            if compatible_channels(source, sink):
                possible.append((source, sink))
    return possible


def _get_first_type(arg, newname):
    # type: (Union[str, type, Tuple[Union[str, type], ...]], str) -> type
    if isinstance(arg, tuple):
        if len(arg) > 1:
            warnings.warn(
                "Multiple types specified, but using only the first. "
                "Use `{newname}` instead.".format(newname=newname),
                RuntimeWarning, stacklevel=3
            )
        if arg:
            arg0 = normalize_type_simple(arg[0])
            return type_lookup(arg0)
        else:
            raise ValueError("no type spec")
    if isinstance(arg, type):
        return arg
    rv = type_lookup(arg)
    if rv is not None:
        return rv
    else:
        raise TypeError("{!r} does not resolve to a type")