blob: da686ab154f474c190a6eadc9ca50de7fb1bff3b [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
### This Source Code Form is subject to the terms of the Mozilla Public
### License, v. 2.0. If a copy of the MPL was not distributed with this
### file, You can obtain one at http://mozilla.org/MPL/2.0/.
### Copyright 2014-2015 (c) TU-Dresden (Author: Chris Iatrou)
### Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
### Copyright 2016-2017 (c) Stefan Profanter, fortiss GmbH
import sys
import logging
from datatypes import *
__all__ = ['Reference', 'RefOrAlias', 'Node', 'ReferenceTypeNode',
'ObjectNode', 'VariableNode', 'VariableTypeNode',
'MethodNode', 'ObjectTypeNode', 'DataTypeNode', 'ViewNode']
logger = logging.getLogger(__name__)
if sys.version_info[0] >= 3:
# strings are already parsed to unicode
def unicode(s):
return s
class Reference(object):
# all either nodeids or strings with an alias
def __init__(self, source, referenceType, target, isForward):
self.source = source
self.referenceType = referenceType
self.target = target
self.isForward = isForward
def __str__(self):
retval = str(self.source)
if not self.isForward:
retval = retval + "<"
retval = retval + "--[" + str(self.referenceType) + "]--"
if self.isForward:
retval = retval + ">"
return retval + str(self.target)
def __repr__(self):
return str(self)
def __eq__(self, other):
return str(self) == str(other)
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(str(self))
def RefOrAlias(s):
try:
return NodeId(s)
except Exception:
return s
class Node(object):
def __init__(self):
self.id = None
self.browseName = None
self.displayName = None
self.description = None
self.symbolicName = None
self.writeMask = None
self.userWriteMask = None
self.references = set()
self.hidden = False
self.modelUri = None
self.parent = None
self.parentReference = None
def __str__(self):
return self.__class__.__name__ + "(" + str(self.id) + ")"
def __repr__(self):
return str(self)
def sanitize(self):
pass
def parseXML(self, xmlelement):
for idname in ['NodeId', 'NodeID', 'nodeid']:
if xmlelement.hasAttribute(idname):
self.id = RefOrAlias(xmlelement.getAttribute(idname))
for (at, av) in xmlelement.attributes.items():
if at == "BrowseName":
self.browseName = QualifiedName(av)
elif at == "DisplayName":
self.displayName = LocalizedText(av)
elif at == "Description":
self.description = LocalizedText(av)
elif at == "WriteMask":
self.writeMask = int(av)
elif at == "UserWriteMask":
self.userWriteMask = int(av)
elif at == "EventNotifier":
self.eventNotifier = int(av)
elif at == "SymbolicName":
self.symbolicName = String(av)
for x in xmlelement.childNodes:
if x.nodeType != x.ELEMENT_NODE:
continue
if x.firstChild:
if x.localName == "BrowseName":
self.browseName = QualifiedName(x.firstChild.data)
elif x.localName == "DisplayName":
self.displayName = LocalizedText(x.firstChild.data)
elif x.localName == "Description":
self.description = LocalizedText(x.firstChild.data)
elif x.localName == "WriteMask":
self.writeMask = int(unicode(x.firstChild.data))
elif x.localName == "UserWriteMask":
self.userWriteMask = int(unicode(x.firstChild.data))
if x.localName == "References":
self.parseXMLReferences(x)
def parseXMLReferences(self, xmlelement):
for ref in xmlelement.childNodes:
if ref.nodeType != ref.ELEMENT_NODE:
continue
source = RefOrAlias(str(self.id)) # deep-copy of the nodeid
target = RefOrAlias(ref.firstChild.data)
reftype = None
forward = True
for (at, av) in ref.attributes.items():
if at == "ReferenceType":
reftype = RefOrAlias(av)
elif at == "IsForward":
forward = not "false" in av.lower()
self.references.add(Reference(source, reftype, target, forward))
def getParentReference(self, parentreftypes):
# HasSubtype has precedence
for ref in self.references:
if ref.referenceType == NodeId("ns=0;i=45") and not ref.isForward:
return ref
for ref in self.references:
if ref.referenceType in parentreftypes and not ref.isForward:
return ref
return None
def popTypeDef(self):
for ref in self.references:
if ref.referenceType.i == 40 and ref.isForward:
self.references.remove(ref)
return ref
return Reference(NodeId(), NodeId(), NodeId(), False)
def replaceAliases(self, aliases):
if str(self.id) in aliases:
self.id = NodeId(aliases[self.id])
if isinstance(self, VariableNode) or isinstance(self, VariableTypeNode):
if str(self.dataType) in aliases:
self.dataType = NodeId(aliases[self.dataType])
new_refs = set()
for ref in self.references:
if str(ref.source) in aliases:
ref.source = NodeId(aliases[ref.source])
if str(ref.target) in aliases:
ref.target = NodeId(aliases[ref.target])
if str(ref.referenceType) in aliases:
ref.referenceType = NodeId(aliases[ref.referenceType])
new_refs.add(ref)
self.references = new_refs
def replaceNamespaces(self, nsMapping):
self.id.ns = nsMapping[self.id.ns]
self.browseName.ns = nsMapping[self.browseName.ns]
if hasattr(self, 'dataType') and isinstance(self.dataType, NodeId):
self.dataType.ns = nsMapping[self.dataType.ns]
new_refs = set()
for ref in self.references:
ref.source.ns = nsMapping[ref.source.ns]
ref.target.ns = nsMapping[ref.target.ns]
ref.referenceType.ns = nsMapping[ref.referenceType.ns]
new_refs.add(ref)
self.references = new_refs
class ReferenceTypeNode(Node):
def __init__(self, xmlelement=None):
Node.__init__(self)
self.isAbstract = False
self.symmetric = False
self.inverseName = ""
if xmlelement:
ReferenceTypeNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
Node.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "Symmetric":
self.symmetric = "false" not in av.lower()
elif at == "InverseName":
self.inverseName = str(av)
elif at == "IsAbstract":
self.isAbstract = "false" not in av.lower()
for x in xmlelement.childNodes:
if x.nodeType == x.ELEMENT_NODE:
if x.localName == "InverseName" and x.firstChild:
self.inverseName = str(unicode(x.firstChild.data))
class ObjectNode(Node):
def __init__(self, xmlelement=None):
Node.__init__(self)
self.eventNotifier = 0
if xmlelement:
ObjectNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
Node.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "EventNotifier":
self.eventNotifier = int(av)
class VariableNode(Node):
def __init__(self, xmlelement=None):
Node.__init__(self)
self.dataType = None
self.valueRank = None
self.arrayDimensions = []
# Set access levels to read by default
self.accessLevel = 1
self.userAccessLevel = 1
self.minimumSamplingInterval = 0.0
self.historizing = False
self.value = None
self.xmlValueDef = None
if xmlelement:
VariableNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
Node.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "ValueRank":
self.valueRank = int(av)
elif at == "AccessLevel":
self.accessLevel = int(av)
elif at == "UserAccessLevel":
self.userAccessLevel = int(av)
elif at == "MinimumSamplingInterval":
self.minimumSamplingInterval = float(av)
elif at == "DataType":
self.dataType = RefOrAlias(av)
elif at == "ArrayDimensions":
self.arrayDimensions = av.split(",")
for x in xmlelement.childNodes:
if x.nodeType != x.ELEMENT_NODE:
continue
if x.localName == "Value":
self.xmlValueDef = x
elif x.localName == "DataType":
self.dataType = RefOrAlias(av)
elif x.localName == "ValueRank":
self.valueRank = int(unicode(x.firstChild.data))
elif x.localName == "ArrayDimensions" and len(self.arrayDimensions) == 0:
elements = x.getElementsByTagName("ListOfUInt32");
if len(elements):
for idx, v in enumerate(elements[0].getElementsByTagName("UInt32")):
self.arrayDimensions.append(v.firstChild.data)
elif x.localName == "AccessLevel":
self.accessLevel = int(unicode(x.firstChild.data))
elif x.localName == "UserAccessLevel":
self.userAccessLevel = int(unicode(x.firstChild.data))
elif x.localName == "MinimumSamplingInterval":
self.minimumSamplingInterval = float(unicode(x.firstChild.data))
elif x.localName == "Historizing":
self.historizing = "false" not in x.lower()
def allocateValue(self, nodeset):
dataTypeNode = nodeset.getDataTypeNode(self.dataType)
if dataTypeNode is None:
return False
# FIXME: Don't build at all or allocate "defaults"? I'm for not building at all.
if self.xmlValueDef is None:
#logger.warn("Variable " + self.browseName() + "/" + str(self.id()) + " is not initialized. No memory will be allocated.")
return False
self.value = Value()
self.value.parseXMLEncoding(self.xmlValueDef, dataTypeNode, self)
return True
class VariableTypeNode(VariableNode):
def __init__(self, xmlelement=None):
VariableNode.__init__(self)
self.isAbstract = False
if xmlelement:
VariableTypeNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
VariableNode.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "IsAbstract":
self.isAbstract = "false" not in av.lower()
for x in xmlelement.childNodes:
if x.nodeType != x.ELEMENT_NODE:
continue
if x.localName == "IsAbstract":
self.isAbstract = "false" not in av.lower()
class MethodNode(Node):
def __init__(self, xmlelement=None):
Node.__init__(self)
self.executable = True
self.userExecutable = True
self.methodDecalaration = None
if xmlelement:
MethodNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
Node.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "Executable":
self.executable = "false" not in av.lower()
if at == "UserExecutable":
self.userExecutable = "false" not in av.lower()
if at == "MethodDeclarationId":
self.methodDeclaration = str(av)
class ObjectTypeNode(Node):
def __init__(self, xmlelement=None):
Node.__init__(self)
self.isAbstract = False
if xmlelement:
ObjectTypeNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
Node.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "IsAbstract":
self.isAbstract = "false" not in av.lower()
class DataTypeNode(Node):
""" DataTypeNode is a subtype of Node describing DataType nodes.
DataType contain definitions and structure information usable for Variables.
The format of this structure is determined by buildEncoding()
Two definition styles are distinguished in XML:
1) A DataType can be a structure of fields, each field having a name and a type.
The type must be either an encodable builtin node (ex. UInt32) or point to
another DataType node that inherits its encoding from a builtin type using
a inverse "hasSubtype" (hasSuperType) reference.
2) A DataType may be an enumeration, in which each field has a name and a numeric
value.
The definition is stored as an ordered list of tuples. Depending on which
definition style was used, the __definition__ will hold
1) A list of ("Fieldname", Node) tuples.
2) A list of ("Fieldname", int) tuples.
A DataType (and in consequence all Variables using it) shall be deemed not
encodable if any of its fields cannot be traced to an encodable builtin type.
A DataType shall be further deemed not encodable if it contains mixed structure/
enumaration definitions.
If encodable, the encoding can be retrieved using getEncoding().
"""
def __init__(self, xmlelement=None):
Node.__init__(self)
self.isAbstract = False
self.__xmlDefinition__ = None
self.__baseTypeEncoding__ = []
self.__encodable__ = None
self.__definition__ = []
self.__isEnum__ = False
self.__isOptionSet__ = False
if xmlelement:
DataTypeNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
Node.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "IsAbstract":
self.isAbstract = "false" not in av.lower()
for x in xmlelement.childNodes:
if x.nodeType == x.ELEMENT_NODE:
if x.localName == "Definition":
self.__xmlDefinition__ = x
def isEncodable(self):
""" Will return True if buildEncoding() was able to determine which builtin
type corresponds to all fields of this DataType.
If no encoding has been build yet an exception will be thrown.
Make sure to call buildEncoding() first.
"""
if self.__encodable__ is None:
raise Exception("Encoding needs to be built first using buildEncoding()")
return self.__encodable__
def getEncoding(self):
""" If the dataType is encodable, getEncoding() returns a nested list
containing the encoding the structure definition for this type.
If no encoding has been build yet an exception will be thrown.
Make sure to call buildEncoding() first.
If buildEncoding() has failed, an empty list will be returned.
"""
if self.__encodable__ is None:
raise Exception("Encoding needs to be built first using buildEncoding()")
if not self.__encodable__:
return []
else:
return self.__baseTypeEncoding__
def buildEncoding(self, nodeset, indent=0, force=False, namespaceMapping=None):
""" buildEncoding() determines the structure and aliases used for variables
of this DataType.
The function will parse the XML <Definition> of the dataType and extract
"Name"-"Type" tuples. If successful, buildEncoding will return a nested
list of the following format:
[['Alias1', ['Alias2', ['BuiltinType']]], [Alias2, ['BuiltinType']], ...]
Aliases are fieldnames defined by this DataType or DataTypes referenced. A
list such as ['DataPoint', ['Int32']] indicates that a value will encode
an Int32 with the alias 'DataPoint' such as <DataPoint>12827</DataPoint>.
Only the first Alias of a nested list is considered valid for the BuiltinType.
Single-Elemented lists are always BuiltinTypes. Every nested list must
converge in a builtin type to be encodable. buildEncoding will follow
the first type inheritance reference (hasSupertype) of the dataType if
necessary;
If instead to "DataType" a numeric "Value" attribute is encountered,
the DataType will be considered an enumeration and all Variables using
it will be encoded as Int32.
DataTypes can be either structures or enumeration - mixed definitions will
be unencodable.
Calls to getEncoding() will be iterative. buildEncoding() can be called
only once per dataType, with all following calls returning the predetermined
value. Use of the 'force=True' parameter will force the Definition to be
reparsed.
After parsing, __definition__ holds the field definition as a list. Note
that this might deviate from the encoding, especially if inheritance was
used.
"""
prefix = " " + "|" * indent + "+"
if force==True:
self.__encodable__ = None
if self.__encodable__ is not None and self.__encodable__:
if self.isEncodable():
logger.debug(prefix + str(self.__baseTypeEncoding__) + " (already analyzed)")
else:
logger.debug( prefix + str(self.__baseTypeEncoding__) + "(already analyzed, not encodable!)")
return self.__baseTypeEncoding__
self.__encodable__ = True
if indent==0:
logger.debug("Parsing DataType " + str(self.browseName) + " (" + str(self.id) + ")")
if valueIsInternalType(self.browseName.name):
self.__baseTypeEncoding__ = [self.browseName.name]
self.__encodable__ = True
logger.debug( prefix + str(self.browseName) + "*")
logger.debug("Encodable as: " + str(self.__baseTypeEncoding__))
logger.debug("")
return self.__baseTypeEncoding__
# Check if there is a supertype available
parentType = None
for ref in self.references:
if ref.isForward:
continue
# hasSubtype
if ref.referenceType.i == 45:
targetNode = nodeset.nodes[ref.target]
if targetNode is not None and isinstance(targetNode, DataTypeNode):
parentType = targetNode
break
if self.__xmlDefinition__ is None:
if parentType is not None:
logger.debug( prefix + "Attempting definition using supertype " + str(targetNode.browseName) + " for DataType " + " " + str(self.browseName))
subenc = targetNode.buildEncoding(nodeset=nodeset, indent=indent+1,
namespaceMapping=namespaceMapping)
if not targetNode.isEncodable():
self.__encodable__ = False
else:
self.__baseTypeEncoding__ = self.__baseTypeEncoding__ + [self.browseName.name, subenc, None]
if len(self.__baseTypeEncoding__) == 0:
logger.debug(prefix + "No viable definition for " + str(self.browseName) + " " + str(self.id) + " found.")
self.__encodable__ = False
if indent==0:
if not self.__encodable__:
logger.debug("Not encodable (partial): " + str(self.__baseTypeEncoding__))
else:
logger.debug("Encodable as: " + str(self.__baseTypeEncoding__))
logger.debug( "")
return self.__baseTypeEncoding__
isEnum = False
# An option set is at the same time also an enum, at least for the encoding below
isOptionSet = parentType is not None and parentType.id.ns == 0 and parentType.id.i==12755
# We need to store the definition as ordered data, but can't use orderedDict
# for backward compatibility with Python 2.6 and 3.4
enumDict = []
typeDict = []
# An XML Definition is provided and will be parsed... now
for x in self.__xmlDefinition__.childNodes:
if x.nodeType == x.ELEMENT_NODE:
fname = ""
fdtype = ""
enumVal = ""
valueRank = None
#symbolicName = None
for at,av in x.attributes.items():
if at == "DataType":
fdtype = str(av)
if fdtype in nodeset.aliases:
fdtype = nodeset.aliases[fdtype]
elif at == "Name":
fname = str(av)
elif at == "SymbolicName":
# ignore
continue
# symbolicName = str(av)
elif at == "Value":
enumVal = int(av)
isEnum = True
elif at == "ValueRank":
valueRank = int(av)
else:
logger.warn("Unknown Field Attribute " + str(at))
# This can either be an enumeration OR a structure, not both.
# Figure out which of the dictionaries gets the newly read value pair
if isEnum:
# This is an enumeration
enumDict.append((fname, enumVal))
continue
else:
if fdtype == "":
# If no datatype given use base datatype
fdtype = "i=24"
# This might be a subtype... follow the node defined as datatype to find out
# what encoding to use
fdTypeNodeId = NodeId(fdtype)
if namespaceMapping != None:
fdTypeNodeId.ns = namespaceMapping[fdTypeNodeId.ns]
if not fdTypeNodeId in nodeset.nodes:
raise Exception("Node {} not found in nodeset".format(fdTypeNodeId))
dtnode = nodeset.nodes[fdTypeNodeId]
# The node in the datatype element was found. we inherit its encoding,
# but must still ensure that the dtnode is itself validly encodable
typeDict.append([fname, dtnode])
fdtype = str(dtnode.browseName.name)
logger.debug( prefix + fname + " : " + fdtype + " -> " + str(dtnode.id))
subenc = dtnode.buildEncoding(nodeset=nodeset, indent=indent+1,
namespaceMapping=namespaceMapping)
self.__baseTypeEncoding__ = self.__baseTypeEncoding__ + [[fname, subenc, valueRank]]
if not dtnode.isEncodable():
# If we inherit an encoding from an unencodable node, this node is
# also not encodable
self.__encodable__ = False
break
# If we used inheritance to determine an encoding without alias, there is a
# the possibility that lists got double-nested despite of only one element
# being encoded, such as [['Int32']] or [['alias',['int32']]]. Remove that
# enclosing list.
while len(self.__baseTypeEncoding__) == 1 and isinstance(self.__baseTypeEncoding__[0], list):
self.__baseTypeEncoding__ = self.__baseTypeEncoding__[0]
if isOptionSet == True:
self.__isOptionSet__ = True
subenc = parentType.buildEncoding(nodeset=nodeset, namespaceMapping=namespaceMapping)
if not parentType.isEncodable():
self.__encodable__ = False
else:
self.__baseTypeEncoding__ = self.__baseTypeEncoding__ + [self.browseName.name, subenc, None]
self.__definition__ = enumDict
return self.__baseTypeEncoding__
if isEnum == True:
self.__baseTypeEncoding__ = self.__baseTypeEncoding__ + ['Int32']
self.__definition__ = enumDict
self.__isEnum__ = True
logger.debug( prefix+"Int32* -> enumeration with dictionary " + str(enumDict) + " encodable " + str(self.__encodable__))
return self.__baseTypeEncoding__
if indent==0:
if not self.__encodable__:
logger.debug( "Not encodable (partial): " + str(self.__baseTypeEncoding__))
else:
logger.debug( "Encodable as: " + str(self.__baseTypeEncoding__))
self.__isEnum__ = False
self.__definition__ = typeDict
logger.debug( "")
return self.__baseTypeEncoding__
class ViewNode(Node):
def __init__(self, xmlelement=None):
Node.__init__(self)
self.containsNoLoops = False
self.eventNotifier = False
if xmlelement:
ViewNode.parseXML(self, xmlelement)
def parseXML(self, xmlelement):
Node.parseXML(self, xmlelement)
for (at, av) in xmlelement.attributes.items():
if at == "ContainsNoLoops":
self.containsNoLoops = "false" not in av.lower()
if at == "EventNotifier":
self.eventNotifier = "false" not in av.lower()