# Source code for pyoozie.client

# Copyright (c) 2017 "Shopify inc." All rights reserved.
# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file.
from __future__ import unicode_literals

import logging
import requests

from pyoozie import xml
from pyoozie import exceptions
from pyoozie import model


class OozieClient(object):
    """Client for the Oozie REST web-services API.

    API calls are made against ``<url>/v2/<endpoint>``; decoded JSON replies
    are wrapped in the model classes registered in ``JOB_TYPES``.
    """

    # Artifact type -> (value of the ``jobtype`` query parameter,
    #                   key that holds the job list in the ``jobs`` reply).
    JOB_TYPE_STRINGS = {
        model.ArtifactType.Coordinator: ('coordinator', 'coordinatorjobs'),
        model.ArtifactType.Workflow: ('wf', 'workflows'),
    }

    # Artifact type -> model class used to wrap the decoded reply for that type.
    JOB_TYPES = {
        model.ArtifactType.Coordinator: model.Coordinator,
        model.ArtifactType.CoordinatorAction: model.CoordinatorAction,
        model.ArtifactType.Workflow: model.Workflow,
        model.ArtifactType.WorkflowAction: model.WorkflowAction,
    }

    # Artifact type -> status class; ``_filter_string`` uses it to tell a single
    # status value apart from a collection of statuses.
    STATUS_TYPES = {
        model.ArtifactType.Coordinator: model.CoordinatorStatus,
        model.ArtifactType.CoordinatorAction: model.CoordinatorActionStatus,
        model.ArtifactType.Workflow: model.WorkflowStatus,
        model.ArtifactType.WorkflowAction: model.WorkflowActionStatus,
    }
[docs] class Stats(object): def __init__(self): self.reset()
[docs] def reset(self): self._requests = 0 self._errors = 0 self._bytes_received = 0 self._elapsed = 0
[docs] def update(self, response): self._requests += 1 if response is not None: if not response: self._errors += 1 self._bytes_received += len(response.text) self._elapsed += response.elapsed.microseconds else: self._errors += 1
@property def requests(self): return self._requests @property def errors(self): return self._errors @property def bytes_received(self): return self._bytes_received @property def elapsed(self): return self._elapsed
def __init__(self, url=None, user=None, timeout=None, verbose=True, **_): self.logger = logging.getLogger('pyoozie.OozieClient') oozie_url = (url or 'http://localhost').rstrip('/') if not oozie_url.endswith('/oozie'): oozie_url += '/oozie' self._url = oozie_url self._user = user self._timeout = timeout or 30 self._verbose = verbose # Note: change default for verbose! self._stats = OozieClient.Stats() self._test_connection() def _test_connection(self): response = None try: response = requests.get('{}/versions'.format(self._url), timeout=self._timeout) response.raise_for_status() self._stats.update(response) except requests.RequestException as err: self._stats.update(response) if self._verbose and response is not None: self.logger.error(response.headers) message = "Unable to contact Oozie server at {}".format(self._url) raise exceptions.OozieException.communication_error(message, err) try: versions = response.json() except ValueError as err: message = "Invalid response from Oozie server at {} ".format(self._url) raise exceptions.OozieException.communication_error(message, err) if 2 not in versions: message = "Oozie server at {} does not support API version 2 (supported: {})".format(self._url, versions) raise exceptions.OozieException.communication_error(message) def _headers(self, content_type=None): headers = {} if content_type: headers['Content-Type'] = content_type return headers def _request(self, method, endpoint, content_type, content=None): response = None url = '{}/v2/{}'.format(self._url, endpoint) if self._verbose: if content: self.logger.info("Request: %s %s content bytes: %s", method, url, len(content)) else: self.logger.info("Request: %s %s", method, url) try: response = requests.request(method, url, data=content, timeout=self._timeout, headers=self._headers(content_type)) response.raise_for_status() except requests.RequestException as err: self._stats.update(response) if self._verbose and response is not None: self.logger.error("Reply: status=%s 
reason=%s elapsed=%sms", response.status_code, response.reason, response.elapsed.microseconds / 1000.0) raise exceptions.OozieException.communication_error(caused_by=err) self._stats.update(response) if self._verbose: self.logger.info("Reply: status=%s bytes=%s elapsed=%sms", response.status_code, len(response.text), response.elapsed.microseconds / 1000.0) try: return response.json() if len(response.content) else None except ValueError as err: message = "Invalid response from Oozie server at {} ".format(self._url) raise exceptions.OozieException.communication_error(message, caused_by=err) def _get(self, endpoint, content_type=None): return self._request('GET', endpoint, content_type) def _put(self, endpoint, content=None, content_type='application/xml'): return self._request('PUT', endpoint, content_type, content) def _post(self, endpoint, content, content_type='application/xml'): return self._request('POST', endpoint, content_type, content)
[docs] def report_stats(self, to_logger=None): if not to_logger: to_logger = self.logger to_logger.info( "OozieClient Stats: requests=%s errors=%s bytes=%s elapsed=%sms", self._stats.requests, self._stats.errors, self._stats.bytes_received, self._stats.elapsed / 1000)
[docs] def reset_stats(self): self._stats.reset()
# =========================================================================== # Admin API # =========================================================================== def _admin_query(self, endpoint): return self._get('admin/' + endpoint)
[docs] def admin_status(self): return self._admin_query('status')
[docs] def admin_os_env(self): return self._admin_query('os-env')
[docs] def admin_java_properties(self): return self._admin_query('java-sys-properties')
[docs] def admin_configuration(self): return self._admin_query('configuration')
[docs] def admin_instrumentation(self): return self._admin_query('instrumentation')
[docs] def admin_metrics(self): return self._admin_query('metrics')
[docs] def admin_build_version(self): return self._admin_query('build-version')
[docs] def admin_available_timezones(self): return self._admin_query('available-timezones')
[docs] def admin_queue_dump(self): return self._admin_query('queue-dump')
[docs] def admin_available_oozie_servers(self): return self._admin_query('available-oozie-servers')
[docs] def admin_list_sharelib(self): return [lib['name'] for lib in self._admin_query('list_sharelib')['sharelib']]
[docs] def admin_list_all_sharelib(self): all_libs = dict() for lib in self.admin_list_sharelib(): files = self._admin_query('list_sharelib?lib={}'.format(lib))['sharelib'][0]['files'] all_libs[lib] = files return all_libs
# =========================================================================== # Jobs API - query coordinators and workflows # =========================================================================== def _filter_string(self, type_enum, user=None, name=None, status=None): status_type = self.STATUS_TYPES[type_enum] filters = [] if user: filters.append('user={}'.format(user)) if name: filters.append('name={}'.format(name)) if status: if isinstance(status, status_type): filters.append('status={}'.format(status)) else: filters.extend(sorted(['status={}'.format(s) for s in status])) filters = '&filter=' + ';'.join(filters) if filters else '' return filters def _jobs_query(self, type_enum, user=None, name=None, status=None, limit=0, details=True): job_type, result_type = self.JOB_TYPE_STRINGS[type_enum] filters = self._filter_string(type_enum, user=user, name=name, status=status) offset = 1 chunk = limit if limit else 500 jobs = [] while True: result = self._get('jobs?jobtype={}{}&offset={}&len={}'.format(job_type, filters, offset, chunk)) jobs.extend(result[result_type]) offset += chunk if (offset > result['total']) or (limit and offset > limit): break if details: return [self.JOB_TYPES[type_enum](self, job).fill_in_details() for job in jobs] else: return [self.JOB_TYPES[type_enum](self, job) for job in jobs]
[docs] def jobs_all_workflows(self, name=None, user=None, limit=0): return self._jobs_query(model.ArtifactType.Workflow, name=name, user=user, limit=limit)
[docs] def jobs_all_active_workflows(self, user=None): return self._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.active(), user=user)
[docs] def jobs_all_running_workflows(self, user=None): return self._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.running(), user=user)
[docs] def jobs_running_workflows(self, name, user=None): return self._jobs_query( model.ArtifactType.Workflow, name=name, status=model.WorkflowStatus.running(), user=user)
[docs] def jobs_last_workflow(self, name, user=None): jobs = self._jobs_query(model.ArtifactType.Workflow, name=name, user=user, limit=1) if jobs: return jobs[-1] else: raise exceptions.OozieException.workflow_not_found(name)
[docs] def jobs_workflow_names(self, user=None): jobs = self._jobs_query(model.ArtifactType.Workflow, user=user, details=False) return set([job.appName for job in jobs])
[docs] def jobs_all_coordinators(self, name=None, user=None, limit=0): return self._jobs_query(model.ArtifactType.Coordinator, name=name, user=user, limit=limit)
[docs] def jobs_all_active_coordinators(self, user=None): return self._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.active(), user=user)
[docs] def jobs_all_running_coordinators(self, user=None): return self._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.running(), user=user)
[docs] def jobs_all_suspended_coordinators(self, user=None): return self._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.suspended(), user=user)
[docs] def jobs_running_coordinators(self, name, user=None): return self._jobs_query( model.ArtifactType.Coordinator, name=name, status=model.CoordinatorStatus.running(), user=user)
[docs] def jobs_last_coordinator(self, name, user=None): jobs = self._jobs_query(model.ArtifactType.Coordinator, name=name, user=user, limit=1) if jobs: return jobs[-1] else: raise exceptions.OozieException.coordinator_not_found(name)
[docs] def jobs_coordinator_names(self, user=None): coords = self._jobs_query(model.ArtifactType.Coordinator, user=user, details=False) return set([coord.coordJobName for coord in coords])
# =========================================================================== # Job API - query coordinator details and actions # =========================================================================== def _coordinator_query(self, job_id, status=None, start=0, limit=0): coord_id, action = model.parse_coordinator_id(job_id) if not coord_id: raise ValueError("Unrecognized job ID: '{}'".format(job_id)) else: if action: if start or limit: raise ValueError("Cannot supply both coordinator action ID and start / limit") if status: raise ValueError("Cannot supply both coordinator action ID and status") start = int(action) limit = 1 def wrapped_get(uri): try: return self._get(uri) except exceptions.OozieException as err: raise exceptions.OozieException.coordinator_not_found(job_id, err) filters = self._filter_string(model.ArtifactType.CoordinatorAction, status=status) if start == 0 and limit: # Fetch the most recent `limit` actions length = limit result = wrapped_get('job/{}?order=desc&offset=1&len={}{}'.format(coord_id, length, filters)) elif limit: # Fetch the specified range of actions offset = start length = limit result = wrapped_get('job/{}?offset={}&len={}{}'.format(coord_id, offset, length, filters)) else: # Fetch all actions from `start` onward # Ask for 1 first to get the total offset = start or 1 result = wrapped_get('job/{}?offset={}&len=1{}'.format(coord_id, offset, filters)) total = result['total'] if total > 0: length = total - offset + 1 if length != 1: # Don't re-ask if we have the answer! 
result = wrapped_get('job/{}?offset={}&len={}{}'.format(coord_id, offset, length, filters)) coord = self.JOB_TYPES[model.ArtifactType.Coordinator](self, result) if action and coord: # There's no guarantee that the Nth job is action N # Ensure the one requested is loaded coord.action(action) return coord def _coordinator_action_query(self, coordinator_id, action, coordinator=None): try: result = self._get('job/{}@{}'.format(coordinator_id, action)) except exceptions.OozieException as err: raise exceptions.OozieException.coordinator_action_not_found(coordinator_id, action, err) coord_action = self.JOB_TYPES[model.ArtifactType.CoordinatorAction](self, result, parent=coordinator) if coordinator: coordinator.actions[action] = coord_action return coord_action def _decode_coord_id(self, coordinator_id=None, name=None, user=None, coordinator=None): if coordinator: if coordinator_id or name: raise ValueError("Supply either a coordinator object or one of coordinator_id or name") if user: raise ValueError("User parameter not supported with coordinator object") result = coordinator.coordJobId else: if bool(coordinator_id) == bool(name): raise ValueError("Supply exactly one of coordinator_id or name") result = coordinator_id if name: coord = self.jobs_last_coordinator(name=name, user=user) if coord: result = coord.coordJobId else: raise exceptions.OozieException.coordinator_not_found(name) elif user: raise ValueError("User parameter not supported with coordinator_id") return result
[docs] def job_coordinator_info(self, coordinator_id=None, name=None, user=None, limit=0): coord_id = self._decode_coord_id(coordinator_id, name, user) return self._coordinator_query(coord_id, limit=limit)
[docs] def job_last_coordinator_info(self, coordinator_id=None, name=None, user=None): coord_id = self._decode_coord_id(coordinator_id, name, user) return self._coordinator_query(coord_id, limit=1)
[docs] def job_coordinator_action(self, coordinator_id=None, name=None, user=None, action_number=0, coordinator=None): coord_id = self._decode_coord_id(coordinator_id, name, user, coordinator) if coordinator_id: coord_id, action = model.parse_coordinator_id(coordinator_id) if bool(action) == bool(action_number): raise ValueError("Supply exactly one of coordinator_id or action_number") action_number = action or action_number else: if action_number == 0: raise ValueError("No action_number supplied") return self._coordinator_action_query(coord_id, action_number, coordinator=coordinator)
[docs] def job_coordinator_all_active_actions(self, coordinator_id=None, name=None, user=None, coordinator=None): coord_id = self._decode_coord_id(coordinator_id, name, user, coordinator) coord = self._coordinator_query(coord_id, status=model.CoordinatorActionStatus.active()) if coordinator: # Copy over any actions to the existing object coordinator.actions = coordinator.actions or {} for number, action in coord.actions.items(): action._parent = coordinator coordinator.actions[number] = action coord = coordinator return [action for action in coord.actions.values() if action.status.is_active()]
# =========================================================================== # Job API - query workflow details and actions # =========================================================================== def _workflow_query(self, job_id): wf_id, _ = model.parse_workflow_id(job_id) if not wf_id: raise ValueError("Unrecognized job ID: '{}'".format(job_id)) try: result = self._get('job/' + wf_id) workflow = self.JOB_TYPES[model.ArtifactType.Workflow](self, result) return workflow except exceptions.OozieException as err: raise exceptions.OozieException.workflow_not_found(job_id, err) def _decode_wf_id(self, workflow_id=None, name=None, user=None): if bool(workflow_id) == bool(name): raise ValueError("Supply exactly one of workflow_id or name") result = workflow_id if name: workflow = self.jobs_last_workflow(name=name, user=user) if workflow: result = workflow.id else: raise exceptions.OozieException.workflow_not_found(name) elif user: raise ValueError("User parameter not supported with workflow_id") return result
[docs] def job_workflow_info(self, workflow_id=None, name=None, user=None): wf_id = self._decode_wf_id(workflow_id, name, user) return self._workflow_query(wf_id)
# =========================================================================== # Job API - query generic job details and actions # ===========================================================================
[docs] def job_info(self, job_id): coord_id, _ = model.parse_coordinator_id(job_id) if coord_id: return self.job_coordinator_info(coordinator_id=job_id) wf_id, _ = model.parse_workflow_id(job_id) if wf_id: return self.job_workflow_info(workflow_id=job_id) raise exceptions.OozieException.job_not_found(job_id)
[docs] def job_action_info(self, job_id): coord_id, action = model.parse_coordinator_id(job_id) if coord_id: coord = self.job_coordinator_info(coordinator_id=job_id) return coord.action(action) if coord and action else coord wf_id, action = model.parse_workflow_id(job_id) if wf_id: workflow = self.job_workflow_info(workflow_id=job_id) return workflow.action(action) if workflow and action else workflow raise exceptions.OozieException.job_not_found(job_id)
# =========================================================================== # Job API - manage coordinator # =========================================================================== def _coordinator_perform_simple_action(self, coord, action, **kwargs): suffix = '&' + '&'.join('='.join(t) for t in kwargs.items()) if kwargs else '' if coord.is_action(): self._put('job/{}?action={}&type=action&scope={}{}'.format(coord.coordJobId, action, coord.actionNumber, suffix)) else: self._put('job/{}?action={}{}'.format(coord.coordJobId, action, suffix)) def _fetch_coordinator_or_action(self, coordinator_id=None, name=None, user=None): coord_id = self._decode_coord_id(coordinator_id, name, user) coord = self.job_action_info(coord_id) return coord
[docs] def job_coordinator_suspend(self, coordinator_id=None, name=None, user=None): coord = self._fetch_coordinator_or_action(coordinator_id, name, user) if coord.status.is_suspendable(): self._coordinator_perform_simple_action(coord, 'suspend') return True return False
[docs] def job_coordinator_resume(self, coordinator_id=None, name=None, user=None): coord = self._fetch_coordinator_or_action(coordinator_id, name, user) if coord.status.is_suspended(): self._coordinator_perform_simple_action(coord, 'resume') return True return False
[docs] def job_coordinator_kill(self, coordinator_id=None, name=None, user=None): coord = self._fetch_coordinator_or_action(coordinator_id, name, user) if coord.status.is_active(): self._coordinator_perform_simple_action(coord, 'kill') return True return False
[docs] def job_coordinator_rerun(self, coordinator_id): action = self._fetch_coordinator_or_action(coordinator_id) if not action.is_action(): raise ValueError('Rerun only supports coordinator action IDs') if action.coordinator().status.is_active() and not action.status.is_active(): self.logger.info('Rerunning coordinator action %s', coordinator_id) self._coordinator_perform_simple_action(action, 'coord-rerun', refresh='true') return True return False
[docs] def job_coordinator_update(self, coordinator_id, xml_path, configuration=None): user = self._user or 'oozie' coord = self._fetch_coordinator_or_action(coordinator_id) if coord.status.is_active(): conf = xml._coordinator_submission_xml(user, xml_path, configuration=configuration) if self._verbose: self.logger.info('Preparing to update coordinator %s:\n%s', xml_path, conf) reply = self._put('job/{}?action=update'.format(coordinator_id), conf) if not reply or 'update' not in reply: raise exceptions.OozieException.operation_failed('update coordinator') if self._verbose: self.logger.info('Coordinator %s updated with diff %s', coordinator_id, reply['update']['diff']) return self.job_coordinator_info(coordinator_id=coordinator_id) else: raise exceptions.OozieException.operation_failed('coordinator status must be active in order to update')
# =========================================================================== # Job API - manage workflow # ===========================================================================
[docs] def job_workflow_suspend(self, workflow_id=None, name=None, user=None): workflow = self.job_workflow_info(workflow_id, name, user) if workflow.status.is_suspendable(): self._put('job/{}?action=suspend'.format(workflow.id)) return True return False
[docs] def job_workflow_resume(self, workflow_id=None, name=None, user=None): workflow = self.job_workflow_info(workflow_id, name, user) if workflow.status.is_suspended(): self._put('job/{}?action=resume'.format(workflow.id)) return True return False
[docs] def job_workflow_start(self, workflow_id=None, name=None, user=None): workflow = self.job_workflow_info(workflow_id, name, user) if workflow.status == model.WorkflowStatus.PREP: self._put('job/{}?action=start'.format(workflow.id)) return True return False
[docs] def job_workflow_kill(self, workflow_id=None, name=None, user=None): workflow = self.job_workflow_info(workflow_id, name, user) if workflow.status.is_active(): self._put('job/{}?action=kill'.format(workflow.id)) return True return False
# =========================================================================== # Jobs API - submit and update jobs # ===========================================================================
[docs] def jobs_submit_coordinator(self, xml_path, configuration=None): user = self._user or 'oozie' conf = xml._coordinator_submission_xml(user, xml_path, configuration=configuration) if self._verbose: self.logger.info('Preparing to submit coordinator %s:\n%s', xml_path, conf) reply = self._post('jobs', conf) if reply and 'id' in reply: if self._verbose: self.logger.info('New coordinator: %s', reply['id']) coord = self.job_coordinator_info(coordinator_id=reply['id']) return coord raise exceptions.OozieException.operation_failed('submit coordinator')
[docs] def jobs_submit_workflow(self, xml_path, configuration=None, start=False): user = self._user or 'oozie' conf = xml._workflow_submission_xml(user, xml_path, configuration=configuration) if self._verbose: self.logger.info('Preparing to submit workflow %s:\n%s', xml_path, conf) endpoint = 'jobs?action=start' if start else 'jobs' reply = self._post(endpoint, conf) if reply and 'id' in reply: if self._verbose: self.logger.info('New workflow: %s', reply['id']) workflow = self.job_workflow_info(workflow_id=reply['id']) return workflow raise exceptions.OozieException.operation_failed('submit workflow')