# Copyright (c) 2017 "Shopify inc." All rights reserved.
# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file.
from __future__ import unicode_literals
import abc
import collections
import copy
import datetime
import itertools
import re
import string # pylint: disable=deprecated-module
import uuid
import enum
import typing # pylint: disable=unused-import
import six
import yattag
# Upper bounds (in characters) enforced by validate_xml_name / validate_xml_id.
MAX_NAME_LENGTH = 255
MAX_IDENTIFIER_LENGTH = 50

# An identifier starts with a letter or underscore, followed by up to MAX_IDENTIFIER_LENGTH - 1 letters, digits,
# underscores or dashes. The pattern ends with `\Z` rather than `$`: `$` also matches just before a trailing
# newline, which would let an identifier such as 'abc\n' pass validation.
REGEX_IDENTIFIER = r'^[a-zA-Z_][\-_a-zA-Z0-9]{0,%i}\Z' % (MAX_IDENTIFIER_LENGTH - 1)
COMPILED_REGEX_IDENTIFIER = re.compile(REGEX_IDENTIFIER)

# Printable ASCII: letters, digits, punctuation and the space character.
ALLOWABLE_NAME_CHARS = set(string.ascii_letters + string.punctuation + string.digits + ' ')

# Number of days in one hundred years (passed as a `datetime.timedelta(days=...)` value).
ONE_HUNDRED_YEARS = 100 * 365.24

# Type aliases referenced from the `# type:` comments in this module.
PropertyValuesType = typing.Dict[typing.Text, typing.Any]
JobXmlFilesType = typing.Iterable[typing.Text]
class ExecutionOrder(enum.Enum):
    """Strategies a coordinator can use to order the execution of its jobs."""

    FIFO = 'FIFO'
    LAST_ONLY = 'LAST_ONLY'
    LIFO = 'LIFO'
    NONE = 'NONE'

    def __str__(self):
        # Render the bare member value (e.g. 'FIFO') instead of 'ExecutionOrder.FIFO'.
        return self.value


# Module-level shorthands for the enumeration members.
EXEC_FIFO, EXEC_LAST_ONLY, EXEC_LIFO, EXEC_NONE = (
    ExecutionOrder.FIFO,
    ExecutionOrder.LAST_ONLY,
    ExecutionOrder.LIFO,
    ExecutionOrder.NONE,
)
def validate_xml_name(name):
    # type: (typing.Text) -> typing.Text
    """Check that `name` is acceptable as an XML name and hand it back unchanged.

    Raises AssertionError when the name is longer than MAX_NAME_LENGTH characters or contains a character that
    is not in ALLOWABLE_NAME_CHARS.
    """
    length = len(name)
    assert length <= MAX_NAME_LENGTH, \
        "Name must be less than {max_length} chars long, '{name}' is {length}".format(
            max_length=MAX_NAME_LENGTH,
            name=name,
            length=length)
    # Every character of the name has to belong to the allowed set.
    assert ALLOWABLE_NAME_CHARS.issuperset(name), \
        "Name must be comprised of printable ASCII characters, '{name}' is not".format(name=name)
    return name
def validate_xml_id(identifier):
    # type: (typing.Text) -> typing.Text
    """Check that `identifier` is acceptable as an XML identifier and hand it back unchanged.

    Raises AssertionError when the identifier is longer than MAX_IDENTIFIER_LENGTH characters or does not match
    REGEX_IDENTIFIER.
    """
    length = len(identifier)
    assert length <= MAX_IDENTIFIER_LENGTH, \
        "Identifier must be less than {max_length} chars long, '{identifier}' is {length}".format(
            max_length=MAX_IDENTIFIER_LENGTH,
            identifier=identifier,
            length=length)
    assert COMPILED_REGEX_IDENTIFIER.match(identifier) is not None, \
        "Identifier must match {regex}, '{identifier}' does not".format(
            regex=REGEX_IDENTIFIER,
            identifier=identifier)
    return identifier
@six.add_metaclass(abc.ABCMeta)
class XMLSerializable(object):
    """An abstract object that can be serialized to XML.

    Subclasses implement `_xml` to write their own element(s); `xml` wraps that output in a complete document.

    The metaclass is applied through `six.add_metaclass` because a `__metaclass__` class attribute is ignored on
    Python 3, which would leave `@abc.abstractmethod` unenforced there.
    """

    def __init__(self, xml_tag):
        # type: (typing.Text) -> None
        # Name of the XML element this object serializes to.
        self.xml_tag = xml_tag

    def xml(self, indent=False):
        # type: (bool) -> bytes
        """Serialize this object to a UTF-8 encoded XML document.

        :param indent: when true, re-indent the document with 4 spaces and CRLF line endings.
        :return: the XML declaration followed by whatever `_xml` produced, encoded as UTF-8 bytes.
        """
        doc, tag, text = yattag.Doc().tagtext()
        doc.asis("<?xml version='1.0' encoding='UTF-8'?>")
        xml = self._xml(doc, tag, text).getvalue()
        if indent:
            xml = yattag.indent(xml, indentation=' ' * 4, newline='\r\n')
        return xml.encode('utf-8')

    @abc.abstractmethod
    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write this object's XML into `doc` and return `doc`."""
        raise NotImplementedError()
class _PropertyList(XMLSerializable, dict):
    """
    Dictionary of Oozie workflow/coordinator property names to values that serializes itself to XML.

    Generates XML of the form:
    ...
    <xml_tag>
        <property>
            <name>[PROPERTY-NAME]</name>
            <value>[PROPERTY-VALUE]</value>
        </property>
        ...
    </xml_tag>
    """

    def __init__(
            self,
            xml_tag,  # type: typing.Text
            attributes=None,  # type: typing.Optional[typing.Dict[typing.Text, typing.Text]]
            values=None  # type: typing.Optional[PropertyValuesType]
    ):
        # type: (...) -> None
        super(_PropertyList, self).__init__(xml_tag=xml_tag)
        # XML attributes placed on the enclosing element.
        self.attributes = attributes or {}
        if values:
            self.update(values)

    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write one <property> element per entry, ordered by sorting the (name, value) pairs."""
        with tag(self.xml_tag, **self.attributes):
            for key, raw_value in sorted(self.items()):
                # A value of None is written as an empty <value> element.
                rendered = '' if raw_value is None else '{}'.format(raw_value)
                with tag('property'):
                    with tag('name'):
                        doc.text('{}'.format(key))
                    with tag('value'):
                        doc.text(rendered)
        return doc
class Parameters(_PropertyList):
    """Coordinator/workflow parameters, serialized as a <parameters> element.

    Lets one declare properties that can be reused in actions. "Properties that are a valid Java identifier,
    `[A-Za-z_][0-9A-Za-z_]*` , are available as '${NAME}' variables within the workflow definition."

    "Properties that are not valid Java Identifier, for example 'job.tracker', are available via the
    String wf:conf(String name) function. Valid identifier properties are available via this function as well."
    """

    def __init__(self, values=None):
        # type: (typing.Optional[PropertyValuesType]) -> None
        _PropertyList.__init__(self, xml_tag='parameters', values=values)
class Configuration(_PropertyList):
    """Property set serialized as a <configuration> element.

    Used for coordinator job submission, workflow, and workflow action configuration XML.
    """

    def __init__(self, values=None):
        # type: (typing.Optional[PropertyValuesType]) -> None
        _PropertyList.__init__(self, xml_tag='configuration', values=values)
class Credential(_PropertyList):
    """HCatalog, Hive Metastore, HBase, or Hive Server 2 action credentials.

    Generates XML of the form::

        ...
        <credentials>
            <credential name='my-hcat-creds' type='hcat'>
                <property>
                    <name>hcat.metastore.uri</name>
                    <value>HCAT_URI</value>
                </property>
                ...
            </credential>
        </credentials>
        <action name='pig' cred='my-hcat-creds'>
            <pig>
        ...
    """

    def __init__(
            self,
            values,  # type: PropertyValuesType
            credential_name,  # type: typing.Text
            credential_type  # type: typing.Text
    ):
        # type: (...) -> None
        element_attributes = {
            'name': credential_name,
            'type': credential_type,
        }
        super(Credential, self).__init__(
            xml_tag='credential',
            attributes=element_attributes,
            values=values
        )
        # The credential name must be a valid XML identifier (AssertionError otherwise).
        self.name = validate_xml_id(credential_name)
class Shell(XMLSerializable):
    """Workflow shell action (v0.3)."""

    def __init__(
            self,
            exec_command,  # type: typing.Text
            job_tracker=None,  # type: typing.Optional[typing.Text]
            name_node=None,  # type: typing.Optional[typing.Text]
            prepare=None,  # type: typing.Optional[typing.Sequence]
            job_xml_files=None,  # type: typing.Optional[JobXmlFilesType]
            configuration=None,  # type: typing.Optional[PropertyValuesType]
            arguments=None,  # type: typing.Optional[typing.Iterable[typing.Text]]
            env_vars=None,  # type: typing.Optional[PropertyValuesType]
            files=None,  # type: typing.Optional[typing.Iterable[typing.Text]]
            archives=None,  # type: typing.Optional[typing.Iterable[typing.Text]]
            capture_output=False  # type: bool
    ):
        # type: (...) -> None
        super(Shell, self).__init__(xml_tag='shell')
        self.exec_command = exec_command
        self.job_tracker = job_tracker
        self.name_node = name_node
        # Falsy collection arguments are normalised to empty containers.
        self.prepare = prepare or []
        self.job_xml_files = job_xml_files or []
        self.configuration = Configuration(configuration)
        self.arguments = arguments or []
        self.env_vars = env_vars or {}
        self.files = files or []
        self.archives = archives or []
        self.capture_output = capture_output

    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write the <shell> element into `doc`.

        Raises NotImplementedError if a `prepare` value was supplied, since it is not supported yet.
        """
        with tag(self.xml_tag, xmlns='uri:oozie:shell-action:0.3'):
            # Optional single-valued elements are only written when set.
            for element, content in (('job-tracker', self.job_tracker), ('name-node', self.name_node)):
                if content:
                    with tag(element):
                        doc.text(content)

            if self.prepare:
                raise NotImplementedError("Shell action's prepare has not yet been implemented")

            for job_xml in self.job_xml_files:
                with tag('job-xml'):
                    doc.text(job_xml)

            # An empty configuration is falsy and therefore omitted.
            if self.configuration:
                self.configuration._xml(doc, tag, text)

            with tag('exec'):
                doc.text(self.exec_command)

            for arg in self.arguments:
                with tag('argument'):
                    doc.text(arg)

            for var_name, var_value in self.env_vars.items():
                with tag('env-var'):
                    doc.text('{key}={value}'.format(key=var_name, value=var_value))

            for path in self.files:
                with tag('file'):
                    doc.text(path)

            for path in self.archives:
                with tag('archive'):
                    doc.text(path)

            if self.capture_output:
                doc.stag('capture-output')
        return doc
class SubWorkflow(XMLSerializable):
    """Run another workflow defined in another XML file on HDFS.

    An Oozie sub-workflow is an "action [that] runs a child workflow job [...]. The parent workflow job will wait
    until the child workflow job has completed."
    """

    def __init__(
            self,
            app_path,  # type: str
            propagate_configuration=True,  # type: bool
            configuration=None  # type: typing.Optional[PropertyValuesType]
    ):
        # type: (...) -> None
        super(SubWorkflow, self).__init__(xml_tag='sub-workflow')
        self.configuration = Configuration(configuration)
        self.propagate_configuration = propagate_configuration
        self.app_path = app_path

    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write the <sub-workflow> element into `doc`."""
        child_configuration = self.configuration
        with tag(self.xml_tag):
            with tag('app-path'):
                doc.text(self.app_path)
            if self.propagate_configuration:
                doc.stag('propagate-configuration')
            # An empty configuration is falsy and therefore omitted.
            if child_configuration:
                child_configuration._xml(doc, tag, text)
        return doc
class GlobalConfiguration(XMLSerializable):
    """Global configuration values for all actions in a workflow.

    "Oozie allows a global section to reduce the redundant job-tracker and name-node declarations for each action.
    [...] The global section may contain the job-xml, configuration, job-tracker, or name-node that the user would
    like to set for every action. If a user then redefines one of these in a specific action node, Oozie will
    update [sic] use the specific declaration instead of the global one for that action."

    "The job-xml element, if present, must refer to a Hadoop JobConf job.xml file bundled in the workflow
    application."
    """

    def __init__(
            self,
            job_tracker=None,  # type: typing.Optional[typing.Text]
            name_node=None,  # type: typing.Optional[typing.Text]
            job_xml_files=None,  # type: typing.Optional[JobXmlFilesType]
            configuration=None,  # type: typing.Optional[PropertyValuesType]
    ):
        # type: (...) -> None
        super(GlobalConfiguration, self).__init__(xml_tag='global')
        self.job_tracker = job_tracker
        self.name_node = name_node
        self.job_xml_files = job_xml_files or []
        self.configuration = Configuration(configuration)

    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write the <global> element into `doc`; unset or empty children are left out."""
        optional_elements = (
            ('job-tracker', self.job_tracker),
            ('name-node', self.name_node),
        )
        with tag(self.xml_tag):
            for element, content in optional_elements:
                if content:
                    with tag(element):
                        doc.text(content)
            # Iterating an empty collection writes nothing, so no separate emptiness test is needed.
            for job_xml in self.job_xml_files:
                with tag('job-xml'):
                    doc.text(job_xml)
            if self.configuration:
                self.configuration._xml(doc, tag, text)
        return doc
class Email(XMLSerializable):
    """Email action for use within a workflow."""

    def __init__(
            self,
            to,  # type: typing.Union[typing.Text, typing.Iterable[typing.Text]]
            subject,  # type: typing.Text
            body,  # type: typing.Text
            cc=None,  # type: typing.Optional[typing.Union[typing.Text, typing.Iterable[typing.Text]]]
            bcc=None,  # type: typing.Optional[typing.Union[typing.Text, typing.Iterable[typing.Text]]]
            content_type=None,  # type: typing.Optional[typing.Text]
            attachments=None  # type: typing.Optional[typing.Text]
    ):
        # type: (...) -> None
        super(Email, self).__init__(xml_tag='email')
        # Recipients
        self.to = to
        self.cc = cc
        self.bcc = bcc
        # Message
        self.subject = subject
        self.body = body
        self.content_type = content_type
        self.attachments = attachments

    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write the <email> element into `doc`; optional children are written only when set."""

        def format_list(strings):
            # A plain string (or anything non-iterable) passes through untouched; any other iterable is
            # sorted and comma-joined.
            if isinstance(strings, six.string_types) or not hasattr(strings, '__iter__'):
                return strings
            return ','.join(sorted(strings))

        with tag(self.xml_tag, xmlns='uri:oozie:email-action:0.2'):
            with tag('to'):
                doc.text(format_list(self.to))
            with tag('subject'):
                doc.text(self.subject)
            with tag('body'):
                doc.text(self.body)
            for element, recipients in (('cc', self.cc), ('bcc', self.bcc)):
                if recipients:
                    with tag(element):
                        doc.text(format_list(recipients))
            if self.content_type:
                with tag('content_type'):
                    doc.text(self.content_type)
            if self.attachments:
                with tag('attachment'):
                    doc.text(format_list(self.attachments))
        return doc
class CoordinatorApp(XMLSerializable):
    """Coordinator application that runs a single workflow at a fixed frequency.

    Serializes to a <coordinator-app> element (schema uri:oozie:coordinator:0.4) holding optional parameters,
    optional controls, and one workflow action.
    """

    def __init__(
            self,
            name,  # type: typing.Text
            workflow_app_path,  # type: typing.Text
            frequency,  # type: int
            start,  # type: datetime.datetime
            end=None,  # type: typing.Optional[datetime.datetime]
            timezone=None,  # type: typing.Optional[typing.Text]
            workflow_configuration=None,  # type: typing.Optional[PropertyValuesType]
            timeout=None,  # type: typing.Optional[int]
            concurrency=None,  # type: typing.Optional[int]
            execution_order=None,  # type: typing.Optional[ExecutionOrder]
            throttle=None,  # type: typing.Optional[int]
            parameters=None  # type: typing.Optional[PropertyValuesType]
    ):
        # type: (...) -> None
        """Validate and store the coordinator definition.

        :param name: coordinator name; must pass `validate_xml_name`.
        :param workflow_app_path: written as the workflow's <app-path>.
        :param frequency: frequency in minutes; must be at least 5.
        :param start: start time.
        :param end: end time; defaults to ONE_HUNDRED_YEARS days after `start`.
        :param timezone: defaults to 'UTC'.
        :param workflow_configuration: properties written as the workflow's <configuration>.
        :param timeout: written as <timeout> whenever it is not None (0 included).
        :param concurrency: written as <concurrency> when truthy.
        :param execution_order: written as <execution> when truthy.
        :param throttle: written as <throttle> when truthy.
        :param parameters: properties written as <parameters>.
        :raises AssertionError: if `end` is not after `start`, `frequency` is below 5, or `name` is invalid.
        """
        super(CoordinatorApp, self).__init__(xml_tag='coordinator-app')

        # Compose and validate dates/frequencies
        if end is None:
            end = start + datetime.timedelta(days=ONE_HUNDRED_YEARS)
        assert end > start, "End time ({end}) must be greater than the start time ({start})".format(
            end=CoordinatorApp.__format_datetime(end), start=CoordinatorApp.__format_datetime(start))
        assert frequency >= 5, "Frequency ({frequency} min) must be greater than or equal to 5 min".format(
            frequency=frequency)

        # Coordinator
        self.name = validate_xml_name(name)
        self.frequency = frequency
        self.start = start
        self.end = end
        self.timezone = timezone if timezone else 'UTC'

        # Workflow action
        self.workflow_app_path = workflow_app_path
        self.workflow_configuration = Configuration(workflow_configuration)

        # Controls
        self.timeout = timeout
        self.concurrency = concurrency
        self.execution_order = execution_order
        self.throttle = throttle

        self.parameters = Parameters(parameters)

    @staticmethod
    def __format_datetime(value):  # type: (datetime.datetime) -> typing.Text
        """Format `value` as 'YYYY-MM-DDTHH:MMZ'."""
        return value.strftime('%Y-%m-%dT%H:%MZ')

    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write the <coordinator-app> element into `doc`."""
        # A timeout of 0 is a meaningful Oozie setting (time out immediately), so it is tested against None
        # instead of by truthiness, which used to drop it silently. The other controls keep their truthiness
        # test.
        has_timeout = self.timeout is not None
        with tag(self.xml_tag, xmlns="uri:oozie:coordinator:0.4", name=self.name, frequency=str(self.frequency),
                 start=CoordinatorApp.__format_datetime(self.start), end=CoordinatorApp.__format_datetime(self.end),
                 timezone=self.timezone):

            # An empty parameter set is falsy and therefore omitted.
            if self.parameters:
                self.parameters._xml(doc, tag, text)

            if has_timeout or self.concurrency or self.execution_order or self.throttle:
                with tag('controls'):
                    if has_timeout:
                        with tag('timeout'):
                            text(str(self.timeout))
                    if self.concurrency:
                        with tag('concurrency'):
                            text(str(self.concurrency))
                    if self.execution_order:
                        with tag('execution'):
                            text(str(self.execution_order))
                    if self.throttle:
                        with tag('throttle'):
                            text(str(self.throttle))

            with tag('action'):
                with tag('workflow'):
                    with tag('app-path'):
                        text(self.workflow_app_path)
                    if self.workflow_configuration:
                        self.workflow_configuration._xml(doc, tag, text)

        return doc
class _AbstractWorkflowEntity(typing.Iterable):
    """An abstract object representing an Oozie workflow action that can be serialized to XML."""

    # pylint: disable=abstract-method
    __metaclass__ = abc.ABCMeta

    def __init__(
            self,
            xml_tag=None,  # type: typing.Optional[typing.Text]
            name=None,  # type: typing.Optional[typing.Text]
            on_error=None  # type: typing.Optional[_AbstractWorkflowEntity]
    ):
        # type: (...) -> None
        self.__xml_tag = xml_tag
        # Without an explicit name, fall back to the first 8 hex digits of a random UUID.
        self.__name = name or uuid.uuid4().hex[:8]
        # The error handler is deep-copied, so this entity owns its own instance.
        self.__on_error = copy.deepcopy(on_error)
        self.__identifier = self.create_identifier(xml_tag)

    def xml_tag(self):
        # type: () -> typing.Text
        """Return the XML tag given at construction."""
        return self.__xml_tag

    def identifier(self):
        # type: () -> typing.Text
        """Return the identifier computed at construction ('<tag>-<name>')."""
        return self.__identifier

    def create_identifier(self, xml_tag):
        # type: (typing.Text) -> typing.Text
        """Build and validate the identifier '<xml_tag>-<name>' for this entity's name."""
        candidate = '{tag}-{name}'.format(tag=xml_tag, name=self.__name)
        return validate_xml_id(candidate)

    def _xml_and_get_on_error(self, doc, tag, text, on_next, on_error):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text, typing.Text, typing.Text) -> typing.Text
        """Write this entity's own error handler (if any) and return the identifier to go to on error.

        With a handler of its own, that handler's identifier is returned; otherwise `on_error` if truthy, else
        `on_next`.
        """
        handler = self.__on_error
        if handler:
            handler._xml(doc, tag, text, on_next, on_error)
            return handler.identifier()
        return on_error if on_error else on_next

    @abc.abstractmethod
    def _xml(self, doc, tag, text, on_next, on_error):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text, typing.Text, typing.Text) -> yattag.doc.Doc
        """Write this entity into `doc`, given the identifiers to go to on success and on error."""
        raise NotImplementedError()

    def __iter__(self):
        # type: () -> typing.Generator[_AbstractWorkflowEntity, None, None]
        # This entity first, then whatever its error handler yields.
        yield self
        handler = self.__on_error
        if handler:
            for handler_entity in handler:
                yield handler_entity

    def __bool__(self):  # type: () -> bool
        # Truthy when iteration yields at least one entity.
        return bool(list(self))

    def __nonzero__(self):  # type: () -> bool
        # Python 2 spelling of __bool__.
        return self.__bool__()

    def __repr__(self):  # type: () -> str
        return str('<{_class}({identifier})>'.format(_class=type(self).__name__, identifier=self.identifier()))
class Kill(_AbstractWorkflowEntity):
    """Workflow graph terminal node(s) to end upon to indicate failure."""

    def __init__(self, message, name=None):
        # type: (typing.Text, typing.Optional[typing.Text]) -> None
        super(Kill, self).__init__(xml_tag='kill', name=name)
        # Text written inside the <message> element.
        self.message = message

    def _xml(self, doc, tag, text, on_next, on_error):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text, typing.Text, typing.Text) -> yattag.doc.Doc
        """Write the <kill> element; `on_next` and `on_error` are not used by this node."""
        node_name = self.identifier()
        with tag(self.xml_tag(), name=node_name):
            with tag('message'):
                doc.text(self.message)
        return doc
# The concrete actions an Action node can carry.
ConcreteAction = typing.Union[Shell, SubWorkflow, Email]


class Action(_AbstractWorkflowEntity):
    """Workflow action nodes carrying concrete actions that perform an action."""

    def __init__(
            self,
            action,  # type: ConcreteAction
            name=None,  # type: typing.Optional[typing.Text]
            credential=None,  # type: typing.Optional[typing.Text]
            retry_max=None,  # type: typing.Optional[int]
            retry_interval=None,  # type: typing.Optional[int]
            on_error=None  # type: typing.Optional[_AbstractWorkflowEntity]
    ):
        # type: (...) -> None
        super(Action, self).__init__(xml_tag='action', name=name, on_error=on_error)

        # XML-document-related values
        self.__action = action
        self.__credential = credential
        self.__retry_max = retry_max
        self.__retry_interval = retry_interval

    def credential(self):
        # type: () -> typing.Optional[typing.Text]
        """Return the credential name given at construction, if any."""
        return self.__credential

    def _xml(self, doc, tag, text, on_next, on_error):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text, typing.Text, typing.Text) -> yattag.doc.Doc
        """Write the <action> element wrapping the concrete action plus its ok/error transitions."""
        error_target = self._xml_and_get_on_error(doc, tag, text, on_next, on_error)

        attributes = {'name': self.identifier()}
        if self.__credential:
            attributes['cred'] = self.__credential
        # Retry settings are written whenever they were supplied, including a value of 0.
        retry_max, retry_interval = self.__retry_max, self.__retry_interval
        if retry_max is not None:
            attributes['retry-max'] = str(retry_max)
        if retry_interval is not None:
            attributes['retry-interval'] = str(retry_interval)

        with tag(self.xml_tag(), **attributes):
            self.__action._xml(doc, tag, text)
            doc.stag('ok', to=on_next)
            doc.stag('error', to=error_target)
        return doc
class Decision(_AbstractWorkflowEntity):
    """Node specifying a switch/case."""

    def __init__(
            self,
            default,  # type: _AbstractWorkflowEntity
            choices,  # type: typing.Dict[typing.Text, _AbstractWorkflowEntity]
            name=None,  # type: typing.Optional[typing.Text]
            on_error=None,  # type: typing.Optional[_AbstractWorkflowEntity]
    ):
        # type: (...) -> None
        super(Decision, self).__init__(xml_tag='decision', name=name, on_error=on_error)
        assert default, 'A default must be supplied'
        assert choices, 'At least one choice required'
        # Both are deep-copied, so this node owns its own instances.
        self.__default = copy.deepcopy(default)
        self.__choices = copy.deepcopy(choices)

    def _xml(self, doc, tag, text, on_next, on_error):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text, typing.Text, typing.Text) -> yattag.doc.Doc
        """Write the <decision> element, then the default entity and each choice entity."""
        error_target = self._xml_and_get_on_error(doc, tag, text, on_next, on_error)

        # Write switch/case
        with tag(self.xml_tag(), name=self.identifier()):
            with tag('switch'):
                for predicate, target in self.__choices.items():
                    with tag('case', to=target.identifier()):
                        doc.text(predicate)
                doc.stag('default', to=self.__default.identifier())

        # Write contained actions: the default first, then every choice.
        for branch in itertools.chain((self.__default,), self.__choices.values()):
            branch._xml(doc, tag, text, on_next, error_target)
        return doc

    def __iter__(self):
        # type: () -> typing.Generator[_AbstractWorkflowEntity, None, None]
        # Choices, then the default, then whatever the base class yields.
        own_entities = itertools.chain(
            self.__choices.values(),
            (self.__default,),
            super(Decision, self).__iter__(),
        )
        for contained in own_entities:
            yield contained
class Serial(_AbstractWorkflowEntity):
    """Sequence of entities to execute (implemented by chaining entities and 'OK' transitions)"""

    def __init__(self, *entities, **kwargs):
        # type: (*_AbstractWorkflowEntity, **_AbstractWorkflowEntity) -> None
        error_handler = kwargs.get(str('on_error'))
        super(Serial, self).__init__(on_error=error_handler)
        # Entities are deep-copied, so the sequence owns its own instances.
        self.__entities = tuple(copy.deepcopy(entities))  # type: typing.Tuple[_AbstractWorkflowEntity, ...]

    def identifier(self):  # type: () -> typing.Text
        """Return the first entity's identifier, or None when the sequence is empty."""
        if not self.__entities:
            return None
        return self.__entities[0].identifier()

    def _xml(self, doc, tag, text, on_next, on_error):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text, typing.Text, typing.Text) -> yattag.doc.Doc
        """Write each entity, pointing it at its successor; the last entity points at `on_next`."""
        error_target = self._xml_and_get_on_error(doc, tag, text, on_next, on_error)

        successors = [follower.identifier() for follower in self.__entities[1:]]
        successors.append(on_next)
        for current, successor in zip(self.__entities, successors):
            current._xml(doc, tag, text, on_next=successor, on_error=error_target)
        return doc

    def __iter__(self):
        # type: () -> typing.Generator[_AbstractWorkflowEntity, None, None]
        for member in self.__entities:
            yield member
        for inherited in super(Serial, self).__iter__():
            # Don't return self because this collection doesn't result in a node
            if inherited is self:
                continue
            yield inherited
class Parallel(_AbstractWorkflowEntity):
    """Set of entities to execute in parallel (implemented as fork/join tag pair)"""

    def __init__(self, *entities, **kwargs):
        # type: (*_AbstractWorkflowEntity, **typing.Union[_AbstractWorkflowEntity, typing.Text]) -> None
        # Keyword values of an unexpected type are ignored (treated as None).
        name = kwargs.get(str('name'))
        if not isinstance(name, six.string_types):
            name = None
        on_error = kwargs.get(str('on_error'))
        if not isinstance(on_error, _AbstractWorkflowEntity):
            on_error = None

        super(Parallel, self).__init__(xml_tag='fork', name=name, on_error=on_error)

        assert entities, 'At least 1 entity required'
        # Entities are deep-copied, so the fork owns its own instances.
        self.__entities = frozenset(copy.deepcopy(entities))  # type: typing.FrozenSet[_AbstractWorkflowEntity]

    def _xml(self, doc, tag, text, on_next, on_error):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text, typing.Text, typing.Text) -> yattag.doc.Doc
        """Write the <fork> element, every forked entity, and the <join> that continues to `on_next`."""
        error_target = self._xml_and_get_on_error(doc, tag, text, on_next, on_error)
        # Every forked entity continues to the same join node.
        join_identifier = self.create_identifier('join')

        with tag(self.xml_tag(), name=self.identifier()):
            for branch in self.__entities:
                doc.stag('path', start=branch.identifier())

        for branch in self.__entities:
            branch._xml(doc, tag, text, on_next=join_identifier, on_error=error_target)

        doc.stag('join', name=join_identifier, to=on_next)
        return doc

    def __iter__(self):
        # type: () -> typing.Generator[_AbstractWorkflowEntity, None, None]
        # Forked entities first, then whatever the base class yields.
        for branch in self.__entities:
            yield branch
        for inherited in super(Parallel, self).__iter__():
            yield inherited
class WorkflowApp(XMLSerializable):
    """Workflow application, serialized as a <workflow-app> element (schema uri:oozie:workflow:0.5)."""

    def __init__(
            self,
            name,  # type: typing.Text
            parameters=None,  # type: typing.Optional[PropertyValuesType]
            configuration=None,  # type: typing.Optional[PropertyValuesType]
            credentials=None,  # type: typing.Optional[typing.Iterable[Credential]]
            job_tracker=None,  # type: typing.Optional[typing.Text]
            name_node=None,  # type: typing.Optional[typing.Text]
            job_xml_files=None,  # type: typing.Optional[JobXmlFilesType]
            entities=None,  # type: typing.Optional[_AbstractWorkflowEntity]
    ):
        # type: (...) -> None
        super(WorkflowApp, self).__init__(xml_tag='workflow-app')

        # XML-document-related values
        self.__name = validate_xml_name(name)
        self.__parameters = Parameters(parameters)
        self.__global_configuration = GlobalConfiguration(
            job_tracker=job_tracker,
            name_node=name_node,
            job_xml_files=job_xml_files,
            configuration=configuration
        )
        # Credentials and entities are deep-copied; falsy values fall back to an empty list / empty Serial.
        self.__credentials = copy.deepcopy(credentials) or []
        self.__entities = copy.deepcopy(entities) or Serial()
        self.__validate()

    def __validate(self):  # type: () -> None
        """Assert that every referenced credential is defined and that no identifier is used twice.

        Only the entities produced by iterating the root entity are examined.
        """
        entity_identifiers = []
        credentials_needed = set()

        # Collect identifiers and referenced credential names
        if self.__entities:
            for entity in set(self.__entities):
                # Each entity's identifier should be unique
                entity_identifiers.append(entity.identifier())
                # If the entity refers to a credential by name, it should be defined upon instantiation
                if not hasattr(entity, 'credential'):
                    continue
                referenced = entity.credential()
                if referenced:
                    credentials_needed.add(referenced)

        # Verify that all needed credentials are defined
        credentials_provided = frozenset(provided.name for provided in self.__credentials)
        assert credentials_needed <= credentials_provided, (
            'Missing credentials: %s' % ', '.join(credentials_needed - credentials_provided)
        )

        # Verify that no duplicate identifiers are used
        occurrences = collections.Counter(entity_identifiers)
        duplicate_identifiers = tuple(item for item, count in occurrences.items() if count > 1)
        assert not duplicate_identifiers, 'Name(s) reused: %s' % ', '.join(sorted(duplicate_identifiers))

    def _xml(self, doc, tag, text):
        # type: (yattag.doc.Doc, yattag.doc.Doc.tag, yattag.doc.Doc.text) -> yattag.doc.Doc
        """Write the <workflow-app> element: preamble, <start>, the entities, then <end>."""
        root = self.__entities
        with tag(self.xml_tag, name=self.__name, xmlns="uri:oozie:workflow:0.5"):

            # Preamble
            if self.__parameters:
                self.__parameters._xml(doc, tag, text)
            # NOTE(review): GlobalConfiguration defines neither __bool__ nor __len__ in this file, so this
            # test looks always true -- confirm that an empty <global> element is intended.
            if self.__global_configuration:
                self.__global_configuration._xml(doc, tag, text)
            if self.__credentials:
                with tag('credentials'):
                    for credential in self.__credentials:
                        credential._xml(doc, tag, text)

            # A workflow without entities starts directly at the end node.
            start_target = root.identifier() if root else 'end'
            doc.stag('start', to=start_target)
            root._xml(doc, tag, text, on_next='end', on_error=None)
            doc.stag('end', name='end')

        return doc