# Copyright (c) 2017 "Shopify inc." All rights reserved.
# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file.
from __future__ import unicode_literals
import collections
import datetime
import re
import sys
import enum
import typing # pylint: disable=unused-import
import untangle
from pyoozie import exceptions
# Coordinator ids end in "-C"; a coordinator *action* id appends "@<n>" with n >= 1.
_COORD_ID_RE = re.compile(r'^(?P<id>.*-C)(?:@(?P<action>[1-9][0-9]*))?$')
# Workflow ids end in "-W"; a workflow *action* id appends "@<action name>". The action name must be
# non-empty (``.+``), mirroring the coordinator pattern, so a dangling "...-W@" is rejected instead of
# being parsed as a workflow id with an empty action name.
_WORKFLOW_ID_RE = re.compile(r'^(?P<id>.*-W)(?:@(?P<action>.+))?$')


def parse_coordinator_id(string):
    """Split a coordinator (action) id into its coordinator id and action number.

    :param string: an id such as ``0000001-...-C`` or ``0000001-...-C@12``; may be None or empty.
    :returns: ``(coord_id, action_number)``, where ``action_number`` is an int or None when there is no
        ``@<n>`` suffix. Returns ``(None, None)`` if ``string`` is empty or not a coordinator id.
    """
    match = _COORD_ID_RE.match(string) if string else None
    if not match:
        return None, None
    action = match.group('action')
    return match.group('id'), (int(action) if action else None)


def parse_workflow_id(string):
    """Split a workflow (action) id into its workflow id and action name.

    :param string: an id such as ``0000002-...-W`` or ``0000002-...-W@my-action``; may be None or empty.
    :returns: ``(wf_id, action_name)``, where ``action_name`` is None when there is no ``@<name>`` suffix.
        Returns ``(None, None)`` if ``string`` is empty or not a workflow id.
    """
    match = _WORKFLOW_ID_RE.match(string) if string else None
    if not match:
        return None, None
    return match.group('id'), match.group('action')
def _parse_coordinator_id(_, job_id):
    """Validate a coordinator job id that carries no ``@<n>`` action suffix.

    :param _: the artifact being built (unused; parsers are called as ``parser(artifact, value)``).
    :param job_id: the raw id, or None/empty.
    :returns: ``job_id`` unchanged when valid, or None when ``job_id`` is empty.
    :raises OozieException: (via ``parse_error``) when ``job_id`` is not a plain coordinator id.
    """
    if not job_id:
        return None
    coord_id, action_number = parse_coordinator_id(job_id)
    if not coord_id or action_number:
        raise exceptions.OozieException.parse_error("Invalid coordinator id: {}".format(job_id))
    return job_id
def _parse_coordinator_action_id(_, job_id):
    """Validate a coordinator action id of the form ``<coordinator id>@<n>``.

    :param _: the artifact being built (unused).
    :param job_id: the raw id, or None/empty.
    :returns: ``job_id`` unchanged when valid, or None when ``job_id`` is empty.
    :raises OozieException: (via ``parse_error``) when ``job_id`` lacks a coordinator id or action number.
    """
    if not job_id:
        return None
    coord_id, action_number = parse_coordinator_id(job_id)
    if not (coord_id and action_number):
        raise exceptions.OozieException.parse_error("Invalid coordinator action id: {}".format(job_id))
    return job_id
def _parse_workflow_id(_, job_id):
    """Validate a workflow job id that carries no ``@<action>`` suffix.

    :param _: the artifact being built (unused).
    :param job_id: the raw id, or None/empty.
    :returns: ``job_id`` unchanged when valid, or None when ``job_id`` is empty.
    :raises OozieException: (via ``parse_error``) when ``job_id`` is not a plain workflow id.
    """
    if not job_id:
        return None
    wf_id, action_name = parse_workflow_id(job_id)
    if not wf_id or action_name:
        raise exceptions.OozieException.parse_error("Invalid workflow id: {}".format(job_id))
    return job_id
def _parse_workflow_action_id(_, job_id):
    """Validate a workflow action id of the form ``<workflow id>@<action name>``.

    :param _: the artifact being built (unused).
    :param job_id: the raw id, or None/empty.
    :returns: ``job_id`` unchanged when valid, or None when ``job_id`` is empty.
    :raises OozieException: (via ``parse_error``) when ``job_id`` lacks a workflow id or action name.
    """
    if not job_id:
        return None
    wf_id, action_name = parse_workflow_id(job_id)
    if not (wf_id and action_name):
        raise exceptions.OozieException.parse_error("Invalid workflow action id: {}".format(job_id))
    return job_id
def _parse_workflow_parent_id(_, job_id):
    """Validate a workflow's parent id.

    Two forms are accepted: a coordinator action id (``...-C@<n>``) or a plain workflow id (``...-W``).

    :param _: the artifact being built (unused).
    :param job_id: the raw id, or None/empty.
    :returns: ``job_id`` unchanged when it has either accepted form, or None when ``job_id`` is empty.
    :raises OozieException: (via ``parse_error``) for any other id.
    """
    if not job_id:
        return None
    coord_id, coord_action = parse_coordinator_id(job_id)
    wf_id, wf_action = parse_workflow_id(job_id)
    is_coordinator_action = coord_id and coord_action
    is_plain_workflow = wf_id and not wf_action
    if is_coordinator_action or is_plain_workflow:
        return job_id
    raise exceptions.OozieException.parse_error("Invalid workflow parent id: {}".format(job_id))
def _parse_time(_, time_string):
    """Parse an Oozie timestamp such as ``Tue, 07 Feb 2017 10:30:05 GMT``.

    :param _: the artifact being built (unused).
    :param time_string: the raw timestamp, or None/empty.
    :returns: a ``datetime.datetime`` (strptime with ``%Z`` yields a naive value), or None when empty.
    :raises OozieException: (via ``parse_error``) if the string does not match the expected format.
    """
    if not time_string:
        return None
    # %a and %b are locale-dependent day/month abbreviations.
    timestamp_format = '%a, %d %b %Y %H:%M:%S %Z'
    try:
        parsed = datetime.datetime.strptime(time_string, timestamp_format)
    except ValueError as err:
        raise exceptions.OozieException.parse_error("Error parsing time '{}'".format(time_string), err)
    return parsed
def _parse_configuration(_, conf_string):
    """Parse an XML ``<configuration>`` document of ``<property><name/><value/></property>`` entries.

    :param _: the artifact being built (unused).
    :param conf_string: the XML text, or None when the configuration was not supplied.
    :returns: None when ``conf_string`` is None, ``{}`` when it is empty or has no ``<property>``
        elements, otherwise a dict mapping each property's name text to its value text.
    """
    if conf_string is None:
        return None
    if not conf_string:
        return {}
    # On Python 2 the text is encoded to UTF-8 bytes before being handed to untangle.
    xml = conf_string if sys.version_info >= (3, 0) else conf_string.encode('utf-8')
    conf = untangle.parse(xml).configuration
    # untangle raises AttributeError for an absent child element, so a <configuration/> without any
    # <property> children must be special-cased to yield an empty dict instead of crashing.
    properties = getattr(conf, 'property', [])
    return {prop.name.cdata: prop.value.cdata for prop in properties}
def _parse_workflow_actions(artifact, actions_list):
    """Build the WorkflowAction children of *artifact*, keyed by action name.

    :param artifact: the owning workflow; its ``_client`` is shared with each action and it becomes
        each action's parent.
    :param actions_list: iterable of raw action detail dicts, or None.
    :returns: dict of action name -> WorkflowAction (empty when there are no actions).
    """
    by_name = {}
    for raw_action in actions_list or ():
        action = WorkflowAction(artifact._client, raw_action, parent=artifact)
        by_name[action.name] = action
    return by_name
def _parse_coordinator_actions(artifact, actions_list):
    """Build the CoordinatorAction children of *artifact*, keyed by action number.

    :param artifact: the owning coordinator; its ``_client`` is shared with each action and it
        becomes each action's parent.
    :param actions_list: iterable of raw action detail dicts, or None.
    :returns: dict of action number -> CoordinatorAction (empty when there are no actions).
    """
    by_number = {}
    for raw_action in actions_list or ():
        action = CoordinatorAction(artifact._client, raw_action, parent=artifact)
        by_number[action.actionNumber] = action
    return by_number
def _parse_coordinator_status(_, status_string):
    """Map a raw status string to CoordinatorStatus (UNKNOWN when missing or unrecognised)."""
    status_type = CoordinatorStatus
    return status_type.parse(status_string)


def _parse_coordinator_action_status(_, status_string):
    """Map a raw status string to CoordinatorActionStatus (UNKNOWN when missing or unrecognised)."""
    status_type = CoordinatorActionStatus
    return status_type.parse(status_string)


def _parse_workflow_status(_, status_string):
    """Map a raw status string to WorkflowStatus (UNKNOWN when missing or unrecognised)."""
    status_type = WorkflowStatus
    return status_type.parse(status_string)


def _parse_workflow_action_status(_, status_string):
    """Map a raw status string to WorkflowActionStatus (UNKNOWN when missing or unrecognised)."""
    status_type = WorkflowActionStatus
    return status_type.parse(status_string)
class ArtifactType(enum.Enum):
    """The four kinds of Oozie artifact; member names mirror the model classes in this module.

    NOTE(review): not referenced by the visible code in this module -- presumably consumed by callers.
    """

    Coordinator = 1
    CoordinatorAction = 2
    Workflow = 3
    WorkflowAction = 4
# Flag bundle describing one Oozie status; _status() builds these as _StatusEnum member values.
_StatusValue = collections.namedtuple(
    '_StatusValue', 'status_id is_active is_running is_suspendable is_suspended')


def _status(status_id, is_active=False, is_running=False, is_suspendable=False, is_suspended=False):
    """Build a _StatusValue, rejecting the inconsistent "running but not active" combination.

    :param status_id: numeric id of the status; 0 is reserved for UNKNOWN (see _StatusEnum.is_unknown).
    :returns: the corresponding _StatusValue.
    :raises OozieException: (via ``parse_error``) if ``is_running`` is set without ``is_active``.
    """
    running_but_inactive = is_running and not is_active
    if running_but_inactive:
        raise exceptions.OozieException.parse_error("A running status implies active")
    return _StatusValue(
        status_id=status_id,
        is_active=is_active,
        is_running=is_running,
        is_suspendable=is_suspendable,
        is_suspended=is_suspended,
    )
class _StatusEnum(enum.Enum):
    """Base class for the Oozie status enums.

    Member values are expected to be _StatusValue-like tuples exposing ``status_id``, ``is_active``,
    ``is_running``, ``is_suspendable`` and ``is_suspended``. Subclasses must define an ``UNKNOWN``
    member, which parse() uses as its fallback.
    """

    def __str__(self):
        # The bare member name (e.g. "RUNNING"), as matched against Oozie's toString text.
        return self.name

    @classmethod
    def parse(cls, status_string):
        """Return the member named *status_string*, or UNKNOWN when there is none (None included)."""
        unknown = cls['UNKNOWN']
        return cls.__members__.get(status_string, unknown)

    @classmethod
    def active(cls):
        """Members whose ``is_active`` flag is set, in definition order."""
        return list(filter(cls.is_active, cls))

    @classmethod
    def running(cls):
        """Members whose ``is_running`` flag is set, in definition order."""
        return list(filter(cls.is_running, cls))

    @classmethod
    def suspendable(cls):
        """Members whose ``is_suspendable`` flag is set, in definition order."""
        return list(filter(cls.is_suspendable, cls))

    @classmethod
    def suspended(cls):
        """Members whose ``is_suspended`` flag is set, in definition order."""
        return list(filter(cls.is_suspended, cls))

    def is_unknown(self):
        """True for the status whose id is 0 (UNKNOWN)."""
        return self.value.status_id == 0

    def is_active(self):
        return self.value.is_active

    def is_running(self):
        return self.value.is_running

    def is_suspendable(self):
        return self.value.is_suspendable

    def is_suspended(self):
        return self.value.is_suspended
class CoordinatorStatus(_StatusEnum):
    """Status of an Oozie coordinator job (parsed into ``Coordinator.status``).

    Each value is a _StatusValue built by _status(); its flags drive the active()/running()/
    suspendable()/suspended() filters inherited from _StatusEnum. Status strings not listed here
    parse to UNKNOWN.
    """

    UNKNOWN = _status(0)
    DONEWITHERROR = _status(1)
    FAILED = _status(2)
    IGNORED = _status(3)
    KILLED = _status(4)
    PAUSED = _status(5, is_active=True)
    PAUSEDWITHERROR = _status(6, is_active=True)
    PREMATER = _status(7, is_active=True)
    PREP = _status(8, is_active=True, is_suspendable=True)
    PREPPAUSED = _status(9, is_active=True)
    PREPSUSPENDED = _status(10, is_active=True, is_suspended=True)
    RUNNING = _status(11, is_active=True, is_running=True, is_suspendable=True)
    RUNNINGWITHERROR = _status(12, is_active=True, is_running=True, is_suspendable=True)
    SUCCEEDED = _status(13)
    # NOTE(review): SUSPENDED* are flagged is_running=True (so they appear in running()) while
    # PREPSUSPENDED is not -- confirm this is intended.
    SUSPENDED = _status(14, is_active=True, is_running=True, is_suspended=True)
    SUSPENDEDWITHERROR = _status(15, is_active=True, is_running=True, is_suspended=True)
class CoordinatorActionStatus(_StatusEnum):
    """Status of a single coordinator action (parsed into ``CoordinatorAction.status``).

    Values are _StatusValue tuples from _status(); unrecognised status strings parse to UNKNOWN.
    Only RUNNING is suspendable; SUSPENDED is flagged both running and suspended.
    """

    UNKNOWN = _status(0)
    FAILED = _status(1)
    IGNORED = _status(2)
    KILLED = _status(3)
    READY = _status(4, is_active=True)
    RUNNING = _status(5, is_active=True, is_running=True, is_suspendable=True)
    SKIPPED = _status(6)
    SUBMITTED = _status(7, is_active=True)
    SUCCEEDED = _status(8)
    SUSPENDED = _status(9, is_active=True, is_running=True, is_suspended=True)
    TIMEDOUT = _status(10)
    WAITING = _status(11, is_active=True)
class WorkflowStatus(_StatusEnum):
    """Status of an Oozie workflow job (parsed into ``Workflow.status``).

    Values are _StatusValue tuples from _status(); unrecognised status strings parse to UNKNOWN.
    """

    UNKNOWN = _status(0)
    FAILED = _status(1)
    KILLED = _status(2)
    PREP = _status(3, is_active=True)
    RUNNING = _status(4, is_active=True, is_running=True, is_suspendable=True)
    SUCCEEDED = _status(5)
    SUSPENDED = _status(6, is_active=True, is_running=True, is_suspended=True)
class WorkflowActionStatus(_StatusEnum):
    """Status of a single workflow action (parsed into ``WorkflowAction.status``).

    Values are _StatusValue tuples from _status(); unrecognised status strings parse to UNKNOWN.
    No member is flagged suspendable or suspended; only PREP, RUNNING and USER_RETRY are active.
    """

    UNKNOWN = _status(0)
    DONE = _status(1)
    END_MANUAL = _status(2)
    END_RETRY = _status(3)
    ERROR = _status(4)
    FAILED = _status(5)
    KILLED = _status(6)
    OK = _status(7)
    PREP = _status(8, is_active=True)
    RUNNING = _status(9, is_active=True, is_running=True)
    START_MANUAL = _status(10)
    START_RETRY = _status(11)
    USER_RETRY = _status(12, is_active=True)
class _OozieArtifact(object):
    """Common base for objects built from an Oozie details dict.

    Subclasses declare the keys they understand:

    * ``REQUIRED_KEYS`` maps key -> parser (or None to keep the raw value). A parser raising
      OozieException, or a parsed value of None (including a missing key), raises
      ``OozieException.required_key_missing``.
    * ``SUPPORTED_KEYS`` maps key -> parser (or None). Missing keys are passed through as None.

    Parsers are called as ``parser(artifact, raw_value)``. Every declared key becomes an attribute
    of the same name; undeclared keys are kept in ``self._details``.
    """

    REQUIRED_KEYS = {}  # type: typing.Dict[unicode, typing.Callable]
    SUPPORTED_KEYS = {'toString': None}  # type: typing.Dict[unicode, typing.Callable]

    def __init__(self, oozie_client, details, parent=None):
        self._client = oozie_client
        self._parent = parent
        # Copy so that popping keys below leaves the caller's dict untouched.
        remaining = dict(details)

        for key, parser in self.REQUIRED_KEYS.items():
            raw = remaining.pop(key, None)
            try:
                parsed = parser(self, raw) if parser else raw
            except exceptions.OozieException as err:
                raise exceptions.OozieException.required_key_missing(key, self, err)
            if parsed is None:
                raise exceptions.OozieException.required_key_missing(key, self)
            setattr(self, key, parsed)

        for key, parser in self.SUPPORTED_KEYS.items():
            raw = remaining.pop(key, None)
            setattr(self, key, parser(self, raw) if parser else raw)

        # Anything left over was not declared by the subclass.
        self._details = remaining
        self._validate_degenerate_fields()

    def __str__(self):
        return self.toString

    def fill_in_details(self):
        """Return an artifact with data missing from the initial details filled in.

        The base implementation has nothing to fetch and returns ``self``.
        """
        return self

    def _validate_degenerate_fields(self):
        """Hook for subclasses to reconcile redundant fields and derive missing ones (no-op here)."""

    def is_coordinator(self):
        return False

    def is_workflow(self):
        return False

    def is_action(self):
        return False
class Coordinator(_OozieArtifact):
    """An Oozie coordinator job built from a coordinator details dict.

    ``coordJobId`` is required and must be a coordinator id without an ``@<n>`` suffix. ``actions``
    is turned into a dict of CoordinatorAction keyed by action number.
    """

    REQUIRED_KEYS = {
        'coordJobId': _parse_coordinator_id,
    }
    SUPPORTED_KEYS = {
        'acl': None,
        'actions': _parse_coordinator_actions,
        'bundleId': None,
        'concurrency': None,
        'conf': _parse_configuration,
        'consoleUrl': None,
        'coordExternalId': None,
        'coordJobName': None,
        'coordJobPath': None,
        'endTime': _parse_time,
        'executionPolicy': None,
        'frequency': None,
        'group': None,
        'lastAction': _parse_time,
        'mat_throttling': None,
        'nextMaterializedTime': _parse_time,
        'pauseTime': None,
        'startTime': _parse_time,
        'status': _parse_coordinator_status,
        'timeOut': None,
        'timeUnit': None,
        'timeZone': None,
        'toString': None,
        'total': None,
        'user': None,
    }

    def __init__(self, *args, **kwargs):
        super(Coordinator, self).__init__(*args, **kwargs)
        self._workflow = None

    def fill_in_details(self):
        """Return this coordinator, or a refetched one from the client when ``conf`` is missing."""
        # A None conf means the details were partial; an empty conf ({}) is acceptable as-is.
        if self.conf is not None:
            return self
        return self._client.job_last_coordinator_info(coordinator_id=self.coordJobId)

    def _validate_degenerate_fields(self):
        """Default ``toString`` when absent, then check it mentions the id and (known) status."""
        if not self.toString:
            self.toString = 'Coordinator application id[{}] status[{}]'.format(
                self.coordJobId,
                self.status)
        if self.coordJobId not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain coordinator ID")
        status_known = not self.status.is_unknown()
        if status_known and str(self.status) not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain status")

    def is_coordinator(self):
        return True

    def coordinator(self):
        return self

    def parent(self):
        # This model does not link a coordinator to any parent.
        return None

    def action(self, number):
        """Return action *number*: from the parsed ``actions`` if present, otherwise via the client."""
        if number not in self.actions:
            return self._client.job_coordinator_action(action_number=number, coordinator=self)
        return self.actions[number]
class CoordinatorAction(_OozieArtifact):
    """A single coordinator action; its ``id`` has the form ``<coordinator id>@<action number>``.

    ``coordJobId`` and ``actionNumber`` are derived from ``id`` when missing and must agree with it.
    """

    REQUIRED_KEYS = {
        'id': _parse_coordinator_action_id,
    }
    SUPPORTED_KEYS = {
        'actionNumber': None,
        'consoleUrl': None,
        'coordJobId': _parse_coordinator_id,
        'createdConf': None,
        'createdTime': _parse_time,
        'errorCode': None,
        'errorMessage': None,
        'externalId': _parse_workflow_id,
        'externalStatus': None,
        'lastModifiedTime': _parse_time,
        'missingDependencies': None,
        'nominalTime': _parse_time,
        'pushMissingDependencies': None,
        'runConf': None,
        'status': _parse_coordinator_action_status,
        'toString': None,
        'trackerUri': None,
        'type': None,
    }

    def __init__(self, *args, **kwargs):
        # Seeded before the base constructor runs; the base constructor then overwrites it from 'status'.
        self.status = CoordinatorActionStatus.UNKNOWN
        super(CoordinatorAction, self).__init__(*args, **kwargs)
        self._workflow = None

    def _validate_degenerate_fields(self):
        """Fill ``coordJobId``/``actionNumber``/``toString`` from ``id`` and check they are consistent."""
        expected_coord_id, expected_number = parse_coordinator_id(self.id)

        if not self.coordJobId:
            self.coordJobId = expected_coord_id
        if self.coordJobId != expected_coord_id:
            raise exceptions.OozieException.parse_error("coordJobId does not match coordinator action ID")

        if not self.actionNumber:
            self.actionNumber = expected_number
        if self.actionNumber != expected_number:
            raise exceptions.OozieException.parse_error("actionNumber does not match coordinator action ID")

        if not self.toString:
            self.toString = 'CoordinatorAction name[{}] status[{}]'.format(self.id, self.status)
        if self.id not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain coordinator action ID")
        status_known = not self.status.is_unknown()
        if status_known and str(self.status) not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain status")

    def is_coordinator(self):
        return True

    def is_action(self):
        return True

    def workflow(self):
        """Return the workflow launched by this action (``externalId``), fetched lazily and cached."""
        # TODO: revisit this to support multiple runs
        # Use .../job/...-C@xx?show=allruns to query
        if self._workflow or not self.externalId:
            return self._workflow
        workflow = self._client.job_workflow_info(workflow_id=self.externalId)
        if workflow:
            workflow._parent = self
            self._workflow = workflow
        return self._workflow

    def coordinator(self):
        """Return the owning coordinator, fetching it by ``coordJobId`` if it was not supplied."""
        if not self._parent:
            self._parent = self._client.job_coordinator_info(coordinator_id=self.coordJobId)
        return self._parent

    def coordinator_action(self):
        return self

    def parent(self):
        return self.coordinator()
class Workflow(_OozieArtifact):
    """An Oozie workflow job built from a workflow details dict.

    ``id`` is required and must be a workflow id without an ``@<action>`` suffix. ``actions`` is
    turned into a dict of WorkflowAction keyed by action name.
    """

    REQUIRED_KEYS = {
        'id': _parse_workflow_id,
    }
    SUPPORTED_KEYS = {
        'acl': None,
        'actions': _parse_workflow_actions,
        'appName': None,
        'appPath': None,
        'conf': _parse_configuration,
        'consoleUrl': None,
        'createdTime': _parse_time,
        'endTime': _parse_time,
        'externalId': None,
        'group': None,
        'lastModTime': _parse_time,
        'parentId': _parse_workflow_parent_id,
        'run': None,
        'startTime': _parse_time,
        'status': _parse_workflow_status,
        'toString': None,
        'user': None,
    }

    def __init__(self, *args, **kwargs):
        super(Workflow, self).__init__(*args, **kwargs)
        self._workflow = None

    def fill_in_details(self):
        """Return this workflow, or a refetched one from the client when ``conf`` is missing."""
        # A None conf means the details were partial; an empty conf ({}) is acceptable as-is.
        if self.conf is not None:
            return self
        return self._client.job_workflow_info(workflow_id=self.id)

    def _validate_degenerate_fields(self):
        """Default ``toString`` when absent, then check it mentions the id and (known) status."""
        if not self.toString:
            self.toString = 'Workflow id[{}] status[{}]'.format(self.id, self.status)
        if self.id not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain workflow ID")
        status_known = not self.status.is_unknown()
        if status_known and str(self.status) not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain status")

    def is_workflow(self):
        return True

    def coordinator(self):
        """Return the coordinator above this workflow, or None when it has no parent."""
        parent = self.parent()
        return parent.coordinator() if parent else None

    def coordinator_action(self):
        """Return the coordinator action above this workflow, or None when it has no parent."""
        parent = self.parent()
        return parent.coordinator_action() if parent else None

    def parent(self):
        """Return the parent artifact, fetching it by ``parentId`` if it was not supplied."""
        # NOTE(review): parentId may be a coordinator action id or a plain workflow id (both are
        # accepted by _parse_workflow_parent_id); confirm job_action_info handles both forms.
        if self._parent or not self.parentId:
            return self._parent
        self._parent = self._client.job_action_info(self.parentId)
        return self._parent

    def action(self, name):
        """Return the action called *name*, or None if this workflow has no such action."""
        return self.actions.get(name)
class WorkflowAction(_OozieArtifact):
    """A single action inside a workflow; its ``id`` has the form ``<workflow id>@<action name>``.

    ``name`` is derived from ``id`` when missing and must agree with it.
    """

    REQUIRED_KEYS = {
        'id': _parse_workflow_action_id,
    }
    SUPPORTED_KEYS = {
        'conf': None,
        'consoleUrl': None,
        'cred': None,
        'data': None,
        'endTime': _parse_time,
        'errorCode': None,
        'errorMessage': None,
        'externalChildIDs': None,
        'externalId': None,
        'externalStatus': None,
        'name': None,
        'retries': None,
        'startTime': _parse_time,
        'stats': None,
        'status': _parse_workflow_action_status,
        'toString': None,
        'trackerUri': None,
        'transition': None,
        'type': None,
        'userRetryCount': None,
        'userRetryInterval': None,
        'userRetryMax': None,
    }

    def __init__(self, *args, **kwargs):
        super(WorkflowAction, self).__init__(*args, **kwargs)
        self._subworkflow = None

    def _validate_degenerate_fields(self):
        """Fill ``name``/``toString`` from ``id`` when missing and check they are consistent."""
        _, action = parse_workflow_id(self.id)
        self.name = self.name or action
        if self.name != action:
            raise exceptions.OozieException.parse_error("name does not match workflow action ID")
        self.toString = self.toString or 'Action name[{}] status[{}]'.format(self.name, self.status)
        if self.name not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain workflow action name")
        if not self.status.is_unknown() and str(self.status) not in self.toString:
            raise exceptions.OozieException.parse_error("toString does not contain status")

    def is_workflow(self):
        return True

    def is_action(self):
        return True

    def subworkflow(self):
        """Return the child workflow of a ``sub-workflow`` action (fetched lazily and cached), else None."""
        if not self._subworkflow and self.type == 'sub-workflow' and self.externalId:
            # For a sub-workflow action, externalId holds the child workflow's id.
            workflow = self._client.job_workflow_info(workflow_id=self.externalId)
            if workflow:
                workflow._parent = self
                self._subworkflow = workflow
        return self._subworkflow

    def coordinator(self):
        """Return the coordinator above this action's workflow, or None."""
        parent = self.parent()
        if parent:
            return parent.coordinator()
        return None

    def coordinator_action(self):
        """Return the coordinator action above this action's workflow, or None."""
        parent = self.parent()
        if parent:
            return parent.coordinator_action()
        return None

    def parent(self):
        """Return the workflow containing this action, fetching it if it was not supplied."""
        if not self._parent:
            # The containing workflow's id is the part of this action's id before '@'. externalId
            # must not be used here: for a sub-workflow action it is the *child* workflow (see
            # subworkflow()), and for other action types it is not a workflow id at all.
            workflow_id, _ = parse_workflow_id(self.id)
            if workflow_id:
                self._parent = self._client.job_workflow_info(workflow_id=workflow_id)
        return self._parent