Source code for orangecanvas.scheme.readwrite

Scheme save/load routines.

import numbers
import sys
import types
import warnings
import base64
import binascii
import itertools
import math

from xml.etree.ElementTree import TreeBuilder, Element, ElementTree, parse

from collections import defaultdict
from itertools import chain, count

import pickle
import json
import pprint

import ast
from ast import literal_eval

import logging

from typing import (
    NamedTuple, Dict, Tuple, List, Union, Any, Optional, AnyStr, IO

from . import SchemeNode, SchemeLink
from .annotations import SchemeTextAnnotation, SchemeArrowAnnotation
from .errors import IncompatibleChannelTypeError

from ..registry import global_registry, WidgetRegistry
from ..registry import WidgetDescription, InputSignal, OutputSignal
from ..utils import findf

log = logging.getLogger(__name__)

# protocol v4 is supported since Python 3.4, protocol v5 since Python 3.8

[docs]class UnknownWidgetDefinition(Exception): pass
def _ast_parse_expr(source): # type: (str) -> ast.Expression node = ast.parse(source, "<source>", mode="eval") assert isinstance(node, ast.Expression) return node
[docs]def string_eval(source): # type: (str) -> str """ Evaluate a python string literal `source`. Raise ValueError if `source` is not a string literal. >>> string_eval("'a string'") a string """ node = _ast_parse_expr(source) if not isinstance(node.body, ast.Str): raise ValueError("%r is not a string literal" % source) return node.body.s
[docs]def tuple_eval(source): # type: (str) -> tuple """ Evaluate a python tuple literal `source` where the elements are constrained to be int, float or string. Raise ValueError if not a tuple literal. >>> tuple_eval("(1, 2, '3')") (1, 2, '3') """ node = _ast_parse_expr(source) if not isinstance(node.body, ast.Tuple): raise ValueError("%r is not a tuple literal" % source) if not all(isinstance(el, (ast.Str, ast.Num)) or # allow signed number literals in Python3 (i.e. -1|+1|-1.0) (isinstance(el, ast.UnaryOp) and isinstance(el.op, (ast.UAdd, ast.USub)) and isinstance(el.operand, ast.Num)) for el in node.body.elts): raise ValueError("Can only contain numbers or strings") return literal_eval(source)
[docs]def terminal_eval(source): # type: (str) -> Union[str, bytes, int, float, complex, bool, None] """ Evaluate a python 'constant' (string, number, None, True, False) `source`. Raise ValueError is not a terminal literal. >>> terminal_eval("True") True """ node = _ast_parse_expr(source) return _terminal_value(node.body)
def _terminal_value(node): # type: (ast.AST) -> Union[str, bytes, int, float, complex, None] if isinstance(node, ast.Str): return node.s elif isinstance(node, ast.Bytes): return node.s elif isinstance(node, ast.Num): return node.n elif isinstance(node, ast.NameConstant): return node.value raise ValueError("Not a terminal") # Intermediate scheme representation _scheme = NamedTuple( "_scheme", [ ("title", str), ("version", str), ("description", str), ("nodes", List['_node']), ("links", List['_link']), ("annotations", List['_annotation']), ("session_state", '_session_data') ] ) _node = NamedTuple( "_node", [ ("id", str), ("title", str), ("name", str), ("position", Tuple[float, float]), ("project_name", str), ("qualified_name", str), ("version", str), ("data", Optional['_data']) ] ) _data = NamedTuple( "_data", [ ("format", str), ("data", Union[bytes, str]) ] ) _link = NamedTuple( "_link", [ ("id", str), ("source_node_id", str), ("sink_node_id", str), ("source_channel", str), ("source_channel_id", str), ("sink_channel", str), ("sink_channel_id", str), ("enabled", bool), ] ) _annotation = NamedTuple( "_annotation", [ ("id", str), ("type", str), ("params", Union['_text_params', '_arrow_params']), ] ) _text_params = NamedTuple( "_text_params", [ ("geometry", Tuple[float, float, float, float]), ("text", str), ("font", Dict[str, Any]), ("content_type", str), ] ) _arrow_params = NamedTuple( "_arrow_params", [ ("geometry", Tuple[Tuple[float, float], Tuple[float, float]]), ("color", str), ]) _window_group = NamedTuple( "_window_group", [ ("name", str), ("default", bool), ("state", List[Tuple[str, bytes]]) ] ) _session_data = NamedTuple( "_session_data", [ ("groups", List[_window_group]) ] )
[docs]def parse_ows_etree_v_2_0(tree): # type: (ElementTree) -> _scheme """ Parset an xml.etree.ElementTree struct into a intermediate workflow representation. """ scheme = tree.getroot() version = scheme.get("version") nodes, links, annotations = [], [], [] # First collect all properties properties = {} # type: Dict[str, _data] for property in tree.findall("node_properties/properties"): node_id = property.get("node_id") # type: str format = property.get("format") if version == "2.0" and "data" in property.attrib: data_str = property.get("data", default="") else: data_str = property.text or "" properties[node_id] = _data(format, data_str) # Collect all nodes for node in tree.findall("nodes/node"): node_id = node.get("id") _px, _py = tuple_eval(node.get("position")) nodes.append( _node( # type: ignore id=node_id, title=node.get("title"), name=node.get("name"), position=(_px, _py), project_name=node.get("project_name"), qualified_name=node.get("qualified_name"), version=node.get("version", ""), data=properties.get(node_id, None) ) ) for link in tree.findall("links/link"): params = _link( id=link.get("id"), source_node_id=link.get("source_node_id"), sink_node_id=link.get("sink_node_id"), source_channel=link.get("source_channel"), source_channel_id=link.get("source_channel_id", ""), sink_channel=link.get("sink_channel"), sink_channel_id=link.get("sink_channel_id", ""), enabled=link.get("enabled") == "true", ) links.append(params) for annot in tree.findall("annotations/*"): if annot.tag == "text": rect = tuple_eval(annot.get("rect", "(0.0, 0.0, 20.0, 20.0)")) font_family = annot.get("font-family", "").strip() font_size = annot.get("font-size", "").strip() font = {} # type: Dict[str, Any] if font_family: font["family"] = font_family if font_size: font["size"] = int(font_size) content_type = annot.get("type", "text/plain") annotation = _annotation( id=annot.get("id"), type="text", params=_text_params( # type: ignore rect, annot.text or "", font, content_type), ) elif annot.tag == "arrow": start = tuple_eval(annot.get("start", "(0, 0)")) end = tuple_eval(annot.get("end", "(0, 0)")) color = annot.get("fill", "red") annotation = _annotation( id=annot.get("id"), type="arrow", params=_arrow_params((start, end), color) # type: ignore ) else: log.warning("Unknown annotation '%s'. Skipping.", annot.tag) continue annotations.append(annotation) window_presets = [] for window_group in tree.findall("session_state/window_groups/group"): name = window_group.get("name") # type: str default = window_group.get("default", "false") == "true" state = [] for state_ in window_group.findall("window_state"): node_id = state_.get("node_id") text_ = state_.text if text_ is not None: try: data = base64.decodebytes(text_.encode("ascii")) except (binascii.Error, UnicodeDecodeError): data = b'' else: data = b'' state.append((node_id, data)) window_presets.append(_window_group(name, default, state)) session_state = _session_data(window_presets) return _scheme( version=version, title=scheme.get("title", ""), description=scheme.get("description"), nodes=nodes, links=links, annotations=annotations, session_state=session_state, )
[docs]class InvalidFormatError(ValueError): pass
[docs]class UnsupportedFormatVersionError(ValueError): pass
def parse_ows_stream(stream): # type: (Union[AnyStr, IO]) -> _scheme doc = parse(stream) scheme_el = doc.getroot() if scheme_el.tag != "scheme": raise InvalidFormatError( "Invalid Orange Workflow Scheme file" ) version = scheme_el.get("version", None) if version is None: # Check for "widgets" tag - old Orange<2.7 format if scheme_el.find("widgets") is not None: raise UnsupportedFormatVersionError( "Cannot open Orange Workflow Scheme v1.0. This format is no " "longer supported" ) else: raise InvalidFormatError( "Invalid Orange Workflow Scheme file (missing version)." ) if version in {"2.0", "2.1"}: return parse_ows_etree_v_2_0(doc) else: raise UnsupportedFormatVersionError( f"Unsupported format version {version}") def resolve_replaced(scheme_desc: _scheme, registry: WidgetRegistry) -> _scheme: widgets = registry.widgets() nodes_by_id = {} # type: Dict[str, _node] replacements = {} replacements_channels = {} # type: Dict[str, Tuple[dict, dict]] # collect all the replacement mappings for desc in widgets: # type: WidgetDescription if desc.replaces: for repl_qname in desc.replaces: replacements[repl_qname] = desc.qualified_name input_repl = {} for idesc in desc.inputs or []: # type: InputSignal for repl_qname in idesc.replaces or []: # type: str input_repl[repl_qname] = output_repl = {} for odesc in desc.outputs: # type: OutputSignal for repl_qname in odesc.replaces or []: # type: str output_repl[repl_qname] = replacements_channels[desc.qualified_name] = (input_repl, output_repl) # replace the nodes nodes = scheme_desc.nodes for i, node in list(enumerate(nodes)): if not registry.has_widget(node.qualified_name) and \ node.qualified_name in replacements: qname = replacements[node.qualified_name] desc = registry.widget(qname) nodes[i] = node._replace(qualified_name=desc.qualified_name, project_name=desc.project_name) nodes_by_id[] = nodes[i] # replace links links = scheme_desc.links for i, link in list(enumerate(links)): nsource = nodes_by_id[link.source_node_id] nsink = nodes_by_id[link.sink_node_id] _, source_rep = replacements_channels.get( nsource.qualified_name, ({}, {})) sink_rep, _ = replacements_channels.get( nsink.qualified_name, ({}, {})) if link.source_channel in source_rep: link = link._replace( source_channel=source_rep[link.source_channel]) if link.sink_channel in sink_rep: link = link._replace( sink_channel=sink_rep[link.sink_channel]) links[i] = link return scheme_desc._replace(nodes=nodes, links=links) def scheme_load(scheme, stream, registry=None, error_handler=None): desc = parse_ows_stream(stream) # type: _scheme if registry is None: registry = global_registry() if error_handler is None: def error_handler(exc): raise exc desc = resolve_replaced(desc, registry) nodes_not_found = [] nodes = [] nodes_by_id = {} links = [] annotations = [] scheme.title = desc.title scheme.description = desc.description for node_d in desc.nodes: try: w_desc = registry.widget(node_d.qualified_name) except KeyError as ex: error_handler(UnknownWidgetDefinition(*ex.args)) nodes_not_found.append( else: node = SchemeNode( w_desc, title=node_d.title, position=node_d.position) data = if data: try: properties = loads(, data.format) except Exception: log.error("Could not load properties for %r.", node.title, exc_info=True) else: = properties nodes.append(node) nodes_by_id[] = node for link_d in desc.links: source_id = link_d.source_node_id sink_id = link_d.sink_node_id if source_id in nodes_not_found or sink_id in nodes_not_found: continue source = nodes_by_id[source_id] sink = nodes_by_id[sink_id] try: source_channel = _find_source_channel(source, link_d) sink_channel = _find_sink_channel(sink, link_d) link = SchemeLink(source, source_channel, sink, sink_channel, enabled=link_d.enabled) except (ValueError, IncompatibleChannelTypeError) as ex: error_handler(ex) else: links.append(link) for annot_d in desc.annotations: params = annot_d.params if annot_d.type == "text": annot = SchemeTextAnnotation( params.geometry, params.text, params.content_type, params.font ) elif annot_d.type == "arrow": start, end = params.geometry annot = SchemeArrowAnnotation(start, end, params.color) else: log.warning("Ignoring unknown annotation type: %r", annot_d.type) continue annotations.append(annot) for node in nodes: scheme.add_node(node) for link in links: scheme.add_link(link) for annot in annotations: scheme.add_annotation(annot) if desc.session_state.groups: groups = [] for g in desc.session_state.groups: # type: _window_group # resolve node_id -> node state = [(nodes_by_id[node_id], data) for node_id, data in g.state if node_id in nodes_by_id] groups.append(Scheme.WindowGroup(, g.default, state)) scheme.set_window_group_presets(groups) return scheme def _find_source_channel(node: SchemeNode, link: _link) -> OutputSignal: source_channel: Optional[OutputSignal] = None if link.source_channel_id: source_channel = findf( node.output_channels(), lambda c: == link.source_channel_id, ) if source_channel is not None: return source_channel source_channel = findf( node.output_channels(), lambda c: == link.source_channel, ) if source_channel is not None: return source_channel raise ValueError( f"{link.source_channel!r} is not a valid output channel " f"for {!r}." ) def _find_sink_channel(node: SchemeNode, link: _link) -> InputSignal: sink_channel: Optional[InputSignal] = None if link.sink_channel_id: sink_channel = findf( node.input_channels(), lambda c: == link.sink_channel_id, ) if sink_channel is not None: return sink_channel sink_channel = findf( node.input_channels(), lambda c: == link.sink_channel, ) if sink_channel is not None: return sink_channel raise ValueError( f"{link.sink_channel!r} is not a valid input channel " f"for {!r}." )
[docs]def scheme_to_etree(scheme, data_format="literal", pickle_fallback=False): """ Return an `xml.etree.ElementTree` representation of the `scheme`. """ builder = TreeBuilder(element_factory=Element) builder.start("scheme", {"version": "2.0", "title": scheme.title or "", "description": scheme.description or ""}) # Nodes node_ids = defaultdict(lambda c=itertools.count(): next(c)) builder.start("nodes", {}) for node in scheme.nodes: # type: SchemeNode desc = node.description attrs = {"id": str(node_ids[node]), "name":, "qualified_name": desc.qualified_name, "project_name": desc.project_name or "", "version": desc.version or "", "title": node.title, } if node.position is not None: attrs["position"] = str(node.position) if type(node) is not SchemeNode: attrs["scheme_node_type"] = "%s.%s" % (type(node).__name__, type(node).__module__) builder.start("node", attrs) builder.end("node") builder.end("nodes") # Links link_ids = defaultdict(lambda c=itertools.count(): next(c)) builder.start("links", {}) for link in scheme.links: source = link.source_node sink = link.sink_node source_id = node_ids[source] sink_id = node_ids[sink] attrs = {"id": str(link_ids[link]), "source_node_id": str(source_id), "sink_node_id": str(sink_id), "source_channel":, "sink_channel":, "enabled": "true" if link.enabled else "false", } if attrs["source_channel_id"] = if attrs["sink_channel_id"] = builder.start("link", attrs) builder.end("link") builder.end("links") # Annotations annotation_ids = defaultdict(lambda c=itertools.count(): next(c)) builder.start("annotations", {}) for annotation in scheme.annotations: annot_id = annotation_ids[annotation] attrs = {"id": str(annot_id)} data = None if isinstance(annotation, SchemeTextAnnotation): tag = "text" attrs.update({"type": annotation.content_type}) attrs.update({"rect": repr(annotation.rect)}) # Save the font attributes font = annotation.font attrs.update({"font-family": font.get("family", None), "font-size": font.get("size", None)}) attrs = [(key, value) for key, value in attrs.items() if value is not None] attrs = dict((key, str(value)) for key, value in attrs) data = annotation.content elif isinstance(annotation, SchemeArrowAnnotation): tag = "arrow" attrs.update({"start": repr(annotation.start_pos), "end": repr(annotation.end_pos), "fill": annotation.color}) data = None else: log.warning("Can't save %r", annotation) continue builder.start(tag, attrs) if data is not None: builder.end(tag) builder.end("annotations") builder.start("thumbnail", {}) builder.end("thumbnail") # Node properties/settings builder.start("node_properties", {}) for node in scheme.nodes: data = None if try: data, format = dumps(, format=data_format, pickle_fallback=pickle_fallback) except Exception: log.error("Error serializing properties for node %r", node.title, exc_info=True) if data is not None: builder.start("properties", {"node_id": str(node_ids[node]), "format": format}) builder.end("properties") builder.end("node_properties") builder.start("session_state", {}) builder.start("window_groups", {}) for g in scheme.window_group_presets(): builder.start( "group", {"name":, "default": str(g.default).lower()} ) for node, data in g.state: if node not in node_ids: continue builder.start("window_state", {"node_id": str(node_ids[node])})"ascii")) builder.end("window_state") builder.end("group") builder.end("window_group") builder.end("session_state") builder.end("scheme") root = builder.close() tree = ElementTree(root) return tree
[docs]def scheme_to_ows_stream(scheme, stream, pretty=False, pickle_fallback=False): """ Write scheme to a a stream in Orange Scheme .ows (v 2.0) format. Parameters ---------- scheme : :class:`.Scheme` A :class:`.Scheme` instance to serialize. stream : file-like object A file-like object opened for writing. pretty : bool, optional If `True` the output xml will be pretty printed (indented). pickle_fallback : bool, optional If `True` allow scheme node properties to be saves using pickle protocol if properties cannot be saved using the default notation. """ tree = scheme_to_etree(scheme, data_format="literal", pickle_fallback=pickle_fallback) if pretty: indent(tree.getroot(), 0) tree.write(stream, encoding="utf-8", xml_declaration=True)
[docs]def indent(element, level=0, indent="\t"): """ Indent an instance of a :class:`Element`. Based on ( """ def empty(text): return not text or not text.strip() def indent_(element, level, last): child_count = len(element) if child_count: if empty(element.text): element.text = "\n" + indent * (level + 1) if empty(element.tail): element.tail = "\n" + indent * (level + (-1 if last else 0)) for i, child in enumerate(element): indent_(child, level + 1, i == child_count - 1) else: if empty(element.tail): element.tail = "\n" + indent * (level + (-1 if last else 0)) return indent_(element, level, True)
[docs]def dumps(obj, format="literal", prettyprint=False, pickle_fallback=False): """ Serialize `obj` using `format` ('json' or 'literal') and return its string representation and the used serialization format ('literal', 'json' or 'pickle'). If `pickle_fallback` is True and the serialization with `format` fails object's pickle representation will be returned """ if format == "literal": try: return (literal_dumps(obj, indent=1 if prettyprint else None), "literal") except (ValueError, TypeError) as ex: if not pickle_fallback: raise log.warning("Could not serialize to a literal string", exc_info=True) elif format == "json": try: return (json.dumps(obj, indent=1 if prettyprint else None), "json") except (ValueError, TypeError): if not pickle_fallback: raise log.warning("Could not serialize to a json string", exc_info=True) elif format == "pickle": return base64.encodebytes(pickle.dumps(obj, protocol=PICKLE_PROTOCOL)). \ decode('ascii'), "pickle" else: raise ValueError("Unsupported format %r" % format) if pickle_fallback: log.warning("Using pickle fallback") return base64.encodebytes(pickle.dumps(obj, protocol=PICKLE_PROTOCOL)). \ decode('ascii'), "pickle" else: raise Exception("Something strange happened.")
def loads(string, format): if format == "literal": return literal_eval(string) elif format == "json": return json.loads(string) elif format == "pickle": return pickle.loads(base64.decodebytes(string.encode('ascii'))) else: raise ValueError("Unknown format") # This is a subset of PyON serialization.
[docs]def literal_dumps(obj, indent=None, relaxed_types=True): """ Write obj into a string as a python literal. Note ---- :class:`set` objects are not supported as the empty set is not representable as a literal. Parameters ---------- obj : Any indent : Optional[int] If not None then it is the indent for the pretty printer. relaxed_types : bool Relaxed type checking. In addition to exact builtin numeric types, the numbers.Integer, numbers.Real are checked and allowed if their repr matches that of the builtin. .. warning:: The exact type of the values will be lost. Returns ------- repr : str String representation of `obj` See Also -------- ast.literal_eval Raises ------ TypeError If obj contains non builtin types that cannot be represented as a literal value. ValueError If obj is a recursive structure. """ memo = {} # non compounds builtins = {int, float, bool, type(None), str, bytes} # sequences builtins_seq = {list, tuple} # mappings builtins_mapping = {dict} def check(obj): if type(obj) == float and not math.isfinite(obj): raise TypeError("Non-finite values can not be " "serialized as a python literal") if type(obj) in builtins: return True if id(obj) in memo: raise ValueError("{0} is a recursive structure".format(obj)) memo[id(obj)] = obj if type(obj) in builtins_seq: return all(map(check, obj)) elif type(obj) in builtins_mapping: return all(map(check, chain(obj.keys(), obj.values()))) else: raise TypeError("{0} can not be serialized as a python " "literal".format(type(obj))) def check_relaxed(obj): if isinstance(obj, numbers.Real) and not math.isfinite(obj): raise TypeError("Non-finite values can not be " "serialized as a python literal") if type(obj) in builtins: return True if id(obj) in memo: raise ValueError("{0} is a recursive structure".format(obj)) memo[id(obj)] = obj if type(obj) in builtins_seq: return all(map(check_relaxed, obj)) elif type(obj) in builtins_mapping: return all(map(check_relaxed, chain(obj.keys(), obj.values()))) #, uint, ... elif isinstance(obj, numbers.Integral): if repr(obj) == repr(int(obj)): return True # numpy.float, ... elif isinstance(obj, numbers.Real): if repr(obj) == repr(float(obj)): return True raise TypeError("{0} can not be serialized as a python " "literal".format(type(obj))) if relaxed_types: check_relaxed(obj) else: check(obj) if indent is not None: return pprint.pformat(obj, width=80 * 2, indent=indent, compact=True) else: return repr(obj)
literal_loads = literal_eval from .scheme import Scheme # pylint: disable=all