from xml.sax.handler import ContentHandler
from xml.sax import parse as xml_parse
from urllib import parse
from opyenxes.factory.XFactoryRegistry import XFactoryRegistry
from opyenxes.extension.XExtensionManager import XExtensionManager
from opyenxes.classification.XEventAttributeClassifier import XEventAttributeClassifier
from opyenxes.utils.XsDateTimeConversion import parse_date_time
from opyenxes.utils.XTokenHelper import XTokenHelper
from opyenxes.id.XID import XID
from opyenxes.model.XAttributeCollection import XAttributeCollection
class XesXmlParser:
    """Parser for the XES XML serialization.

    :param factory: The XES model factory instance used to build the model
        from the serialization. When omitted (or falsy), the current default
        factory of `XFactoryRegistry` is used.
    :type factory: `XFactory`
    """

    def __init__(self, factory=None):
        if factory:
            self.factory = factory
        else:
            self.factory = XFactoryRegistry().current_default()

    def can_parse(self, file):
        """Checks whether this parser can handle the given file.

        The decision is based on the file name only: it must end with
        ``.xes`` (compared case-insensitively).

        :param file: path of the file to check against parser.
        :type file: str
        :return: Whether this parser can handle the given file.
        :rtype: bool
        """
        return self.ends_with_ignore_case(file, ".xes")

    def parse(self, file):
        """Parses a log from the given input stream, which is supposed to
        deliver an XES log in XML representation.

        The model objects are created with this parser's ``factory``.

        :param file: file generated by the function 'open(path)', which is
            supposed to deliver an XES log in XML representation.
        :type file: _io.TextIOWrapper
        :return: The parsed list of logs (always a single-element list).
        :rtype: list[`XLog`]
        """
        # Hand the configured factory to the handler so that a custom factory
        # passed to the parser is actually used while building the model.
        handler = XesXmlParser.XesXmlHandler(self.factory)
        xml_parse(file, handler)
        return [handler.get_log()]

    @staticmethod
    def ends_with_ignore_case(name, suffix):
        """Returns whether the given file name ends (ignoring the case) with
        the given suffix.

        A name that is equal to the suffix (ignoring the case) counts as
        ending with it.

        :param name: The given file name.
        :type name: str
        :param suffix: The given suffix.
        :type suffix: str
        :return: Whether the given file name ends (ignoring the case) with
            the given suffix.
        :rtype: bool
        """
        return name.lower().endswith(suffix.lower())

    class XesXmlHandler(ContentHandler):
        """SAX handler class for XES in XML representation.

        :param factory: The factory used to create logs, traces, events and
            attributes. When omitted, the current default factory of
            `XFactoryRegistry` is used.
        :type factory: `XFactory`
        """

        # Element names that denote XES attributes; every other element is
        # treated as log structure (log, trace, event, extension, ...).
        _ATTRIBUTE_TAGS = frozenset((
            "string", "date", "int", "float", "boolean", "id", "list",
            "container",
        ))

        def __init__(self, factory=None):
            super().__init__()
            if factory is None:
                factory = XFactoryRegistry().current_default()
            self.__factory = factory
            self.__log = None
            self.__trace = None
            self.__event = None
            # Attributes that are currently open (innermost last).
            self.__attribute_stack = []
            # Open elements that can carry attributes: the log, the trace,
            # the event and any open attribute (innermost last).
            self.__attributable_stack = []
            # Target list while inside a <global> element, otherwise None.
            self.__globals = None
            # Number of open elements belonging to a discarded attribute
            # subtree; 0 means "not skipping".
            self.__skip_depth = 0

        def get_log(self):
            """Retrieves the parsed log.

            :return: The parsed log, or None if no ``log`` element has been
                seen.
            :rtype: `XLog`
            """
            return self.__log

        def startElement(self, element_name, attributes):
            """ Overrides startElement in class ContentHandler

            :param element_name: Contains the raw XML 1.0 name of the element
                type.
            :type element_name: str
            :param attributes: An instance of the Attributes class containing
                the attributes of the element
            :type attributes: xml.sax.xmlreader.AttributesImpl
            """
            if self.__skip_depth:
                # Nested inside an attribute that could not be created:
                # ignore it, but count it so the matching endElement call
                # can be ignored as well.
                self.__skip_depth += 1
                return
            tag_name = element_name.lower()
            if tag_name in self._ATTRIBUTE_TAGS:
                self._start_attribute(tag_name, attributes)
            else:
                self._start_structure(tag_name, attributes)

        def _start_structure(self, tag_name, attributes):
            """Handles the start of a non-attribute element."""
            if tag_name == "event":
                self.__event = self.__factory.create_event()
                self.__attributable_stack.append(self.__event)
            elif tag_name == "trace":
                self.__trace = self.__factory.create_trace()
                self.__attributable_stack.append(self.__trace)
            elif tag_name == "log":
                self.__log = self.__factory.create_log()
                for attr_name, attr_value in attributes.items():
                    if attr_name.startswith(("xes.", "openxes.")):
                        self.__log.set_features(attr_name, attr_value)
                self.__attributable_stack.append(self.__log)
            elif tag_name == "extension":
                extension = None
                prefix = attributes.get("prefix")
                uri = attributes.get("uri")
                if prefix is not None:
                    extension = XExtensionManager().get_by_prefix(prefix)
                elif uri is not None:
                    extension = XExtensionManager().get_by_uri(
                        parse.urlparse(uri))
                if extension is not None:
                    self.__log.get_extensions().add(extension)
                else:
                    # The uri may be absent, so never concatenate it
                    # directly; fall back to the prefix for the message.
                    print("Unknown extension: "
                          + str(uri if uri is not None else prefix))
            elif tag_name == "global":
                # NOTE(review): a <global> element without a "scope"
                # attribute raises AttributeError here, as it did before.
                scope = attributes.get("scope").lower()
                if scope == "trace":
                    self.__globals = \
                        self.__log.get_global_trace_attributes()
                elif scope == "event":
                    self.__globals = \
                        self.__log.get_global_event_attributes()
            elif tag_name == "classifier":
                name = attributes.get("name")
                keys = attributes.get("keys")
                if name and keys:
                    classifier = XEventAttributeClassifier(
                        name, XTokenHelper.extract_tokens(keys))
                    self.__log.get_classifiers().append(classifier)

        def _start_attribute(self, tag_name, attributes):
            """Creates the attribute for an attribute element and opens it."""
            key = attributes.get("key")
            if key is None:
                key = "UNKNOWN"
            value = attributes.get("value")
            if value is None:
                value = ""
            extension = None
            if ":" in key:
                # The part of the key before the first colon is looked up as
                # an extension prefix.
                extension = XExtensionManager().get_by_prefix(
                    key.partition(":")[0])

            factory = self.__factory
            attribute = None
            if tag_name == "string":
                attribute = factory.create_attribute_literal(
                    key, value, extension)
            elif tag_name == "date":
                timestamp = parse_date_time(value)
                if timestamp is not None:
                    attribute = factory.create_attribute_timestamp(
                        key, timestamp, extension)
            elif tag_name == "int":
                attribute = factory.create_attribute_discrete(
                    key, int(value), extension)
            elif tag_name == "float":
                attribute = factory.create_attribute_continuous(
                    key, float(value), extension)
            elif tag_name == "boolean":
                attribute = factory.create_attribute_boolean(
                    key, value.lower() == "true", extension)
            elif tag_name == "id":
                attribute = factory.create_attribute_id(
                    key, XID.parse(value), extension)
            elif tag_name == "list":
                attribute = factory.create_attribute_list(key, extension)
            elif tag_name == "container":
                attribute = factory.create_attribute_container(
                    key, extension)

            if attribute is None:
                # Nothing was pushed, so the matching endElement must not pop
                # anything: discard this element together with its children.
                self.__skip_depth = 1
                return
            self.__attribute_stack.append(attribute)
            self.__attributable_stack.append(attribute)

        def endElement(self, local_name):
            """ Overrides endElement in class ContentHandler

            :param local_name: The name of the element type, just as with the
                startElement event
            :type local_name: str
            """
            if self.__skip_depth:
                # Closing an element of a discarded attribute subtree.
                self.__skip_depth -= 1
                return
            tag_name = local_name.lower()
            if tag_name == "global":
                self.__globals = None
            elif tag_name in self._ATTRIBUTE_TAGS:
                self._end_attribute()
            elif tag_name == "event":
                self.__trace.append(self.__event)
                self.__event = None
                self.__attributable_stack.pop()
            elif tag_name == "trace":
                self.__log.append(self.__trace)
                self.__trace = None
                self.__attributable_stack.pop()
            elif tag_name == "log":
                self.__attributable_stack.pop()

        def _end_attribute(self):
            """Closes the innermost open attribute and stores it."""
            attribute = self.__attribute_stack.pop()
            # The attribute itself is on top of the attributable stack;
            # remove it so that its owner becomes the top element.
            self.__attributable_stack.pop()
            if self.__globals is not None:
                self.__globals.append(attribute)
                return
            owner = self.__attributable_stack[-1]
            owner.get_attributes()[attribute.get_key()] = attribute
            # Keep the emptiness check first: the collection test is only
            # needed when there is an enclosing attribute.
            if self.__attribute_stack and isinstance(
                    self.__attribute_stack[-1], XAttributeCollection):
                self.__attribute_stack[-1].add_to_collection(attribute)