API

azkaban.project

Project definition module.

class azkaban.project.Project(name, root=None, register=True, version=None)

Bases: object

Azkaban project.

Parameters:
  • name – Name of the project.
  • register – Add project to registry. Setting this to False will make it invisible to the CLI.
  • root – Path to a root file or directory used to enable adding files using relative paths (typically used with root=__file__).
  • version – Project version, currently only used for setting the name of the archive uploaded to Azkaban.

The properties attribute of a project is a dictionary which can be used to pass Azkaban options which will then be available to all jobs in the project. This can be used for example to set project wide defaults.

To avoid undefined behavior, both the name and root attributes should not be altered after instantiation.

add_file(path, archive_path=None, overwrite=False)

Include a file in the project archive.

Parameters:
  • path – Path to file. If no project root exists, only absolute paths are allowed. Otherwise, this path can also be relative to said root.
  • archive_path – Path to file in archive (defaults to same as path).
  • overwrite – Allow overwriting any previously existing file in this archive path.

If the current project has its root parameter specified, this method will allow relative paths (and join those with the project’s root), otherwise it will throw an error. Furthermore, when a project root exists, adding files above it without specifying an archive_path will raise an error. This is done to avoid having files in the archive with lower level destinations than the base root directory.

add_job(name, job, **kwargs)

Include a job in the project.

Parameters:
  • name – Name assigned to job (must be unique).
  • jobJob instance.
  • kwargs – Keyword arguments that will be forwarded to the on_add() handler.

This method triggers the on_add() method on the added job (passing the project and name as arguments, along with any kwargs). The handler will be called right after the job is added.

build(path, overwrite=False)

Create the project archive.

Parameters:
  • path – Destination path.
  • overwrite – Don’t throw an error if a file already exists at path.
files

Returns a list of tuples of files included in the project archive.

The first element of each tuple is the absolute local path to the file, the second the path of the file in the archive.

Note

This property should not be used to add files. Use add_file() instead.

jobs

Returns a dictionary of all jobs in the project, keyed by name.

Note

This property should not be used to add jobs. Use add_job() instead.

classmethod load(path, new=False)

Load Azkaban projects from script.

Parameters:
  • path – Path to python module.
  • new – If set to True, only projects loaded as a consequence of calling this method will be returned.
  • propagate – Propagate any exception raised while importing the module at path.

Returns a dictionary of Project’s keyed by project name. Only registered projects (i.e. instantiated with register=True) can be discovered via this method.

merge_into(project, overwrite=False, unregister=False)

Merge one project with another.

Parameters:
  • project – Target Project to merge into.
  • overwrite – Overwrite any existing files.
  • unregister – Unregister project after merging it.

The current project remains unchanged while the target project gains all the current project’s jobs and files. Note that project properties are not carried over.

versioned_name

Project name, including version if present.

azkaban.job

Job definition module.

class azkaban.job.Job(*options)

Bases: object

Base Azkaban job.

Parameters:options – tuple of dictionaries. The final job options are built from this tuple by keeping the latest definition of each option. Furthermore, by default, any nested dictionary will be flattened (combining keys with '.'). Both these features can be changed by simply overriding the job constructor.

To enable more functionality, subclass and override the on_add() and build() methods. The join_option() and join_prefix() methods are also provided as helpers to write custom jobs.

build(path=None, header=None)

Write job file.

Parameters:
  • path – Path where job file will be created. Any existing file will be overwritten. Writes to stdout if no path is specified.
  • header – Optional comment to be included at the top of the job file.
join_option(option, sep, formatter='%s')

Helper method to join iterable options into a string.

Parameters:
  • key – Option key. If the option doesn’t exist, this method does nothing.
  • sep – Separator used to concatenate the string.
  • formatter – Pattern used to format the option values.

Example usage:

class MyJob(Job):

  def __init__(self, *options):
    super(MyJob, self).__init__(*options)
    self.join_option('dependencies', ',')

# we can now use lists to define job dependencies
job = MyJob({'type': 'noop', 'dependencies': ['bar', 'foo']})
join_prefix(prefix, sep, formatter)

Helper method to join options starting with a prefix into a string.

Parameters:
  • prefix – Option prefix.
  • sep – Separator used to concatenate the string.
  • formatter – String formatter. It is formatted using the tuple (suffix, value) where suffix is the part of key after prefix.

Example usage:

class MyJob(Job):

  def __init__(self, *options):
    super(MyJob, self).__init__(*options)
    self.join_prefix('jvm.args', ' ', '-D%s=%s')

# we can now define JVM args using nested dictionaries
job = MyJob({'type': 'java', 'jvm.args': {'foo': 48, 'bar': 23}})
on_add(project, name, **kwargs)

Handler called when the job is added to a project.

Parameters:
  • projectProject instance
  • name – name corresponding to this job in the project.
  • kwargs – Keyword arguments. If this method is triggered by add_job(), the latter’s keyword arguments will simply be forwarded. Else if this method is triggered by a merge, kwargs will be a dictionary with single key 'merging' and value the merged project.

The default implementation does nothing.

azkaban.remote

Azkaban remote interaction module.

This contains the Session class which will be used for all interactions with a remote Azkaban server.

class azkaban.remote.Execution(session, exec_id)

Bases: object

Remote workflow execution.

Parameters:
  • sessionSession instance.
  • exec_id – Execution ID.
cancel()

Cancel execution.

job_logs(job, delay=5)

Job log generator.

Parameters:
  • job – job name
  • delay – time in seconds between each server poll

Yields line by line.

logs(delay=5)

Execution log generator.

Parameters:delay – time in seconds between each server poll

Yields line by line.

classmethod start(session, *args, **kwargs)

Convenience method to start a new execution.

Parameters:
status

Execution status.

url

Execution URL.

class azkaban.remote.Session(url=None, alias=None, config=None, attempts=3, verify=True)

Bases: object

Azkaban session.

Parameters:
  • url – HTTP endpoint (including protocol, port and optional user).
  • alias – Alias name.
  • config – Configuration object used to store session IDs.
  • attempts – Maximum number of attempts to refresh session.
  • verify – Whether or not to verify HTTPS requests.

This class contains mostly low-level methods that translate directly into Azkaban API calls. The Execution class should be preferred for interacting with workflow executions.

Note that each session’s ID is lazily updated. In particular, instantiating the Session doesn’t guarantee that its current ID (e.g. loaded from the configuration file) is valid.

cancel_execution(exec_id)

Cancel workflow execution.

Parameters:exec_id – Execution ID.
create_project(name, description)

Create project.

Parameters:
  • name – Project name.
  • description – Project description.
delete_project(name)

Delete a project on Azkaban.

Parameters:name – Project name.
classmethod from_alias(alias, config=None)

Create configured session from an alias.

Parameters:
  • alias – Alias name.
  • config – Azkaban configuration object.
get_execution_logs(exec_id, offset=0, limit=50000)

Get execution logs.

Parameters:
  • exec_id – Execution ID.
  • offset – Log offset.
  • limit – Size of log to download.
get_execution_status(exec_id)

Get status of an execution.

Parameters:exec_id – Execution ID.
get_job_logs(exec_id, job, offset=0, limit=50000)

Get logs from a job execution.

Parameters:
  • exec_id – Execution ID.
  • job – Job name.
  • offset – Log offset.
  • limit – Size of log to download.
get_projects()

Get a list of all projects.

get_running_workflows(project, flow)

Get running executions of a flow.

Parameters:
  • project – Project name.
  • flow – Flow name.

Note that if the project doesn’t exist, the Azkaban server will return a somewhat cryptic error Project 'null' not found., even though the name of the project isn’t null.

get_schedule(name, flow)

Get schedule information.

Parameters:
  • name – Project name.
  • flow – Name of flow in project.
get_sla(schedule_id)

Get SLA information.

Parameters:schedule_id – Schedule Id - obtainable from get_schedule
get_workflow_executions(project, flow, start=0, length=10)

Fetch executions of a flow.

Parameters:
  • project – Project name.
  • flow – Flow name.
  • start – Start index (inclusive) of the returned list.
  • length – Max length of the returned list.
get_workflow_info(name, flow)

Get list of jobs corresponding to a workflow.

Parameters:
  • name – Project name.
  • flow – Name of flow in project.
get_workflows(name)

Get list of workflows corresponding to a project

Parameters:name – Project name
is_valid(response=None)

Check if the current session ID is valid.

Parameters:response – If passed, this reponse will be used to determine the validity of the session. Otherwise a simple test request will be emitted.
run_workflow(name, flow, jobs=None, disabled_jobs=None, concurrent=True, properties=None, on_failure='finish', notify_early=False, emails=None)

Launch a workflow.

Parameters:
  • name – Name of the project.
  • flow – Name of the workflow.
  • jobs – List of names of jobs to run (run entire workflow by default). Mutually exclusive with disabled_jobs parameter.
  • disabled_jobs – List of names of jobs not to run. Mutually exclusive with jobs parameter.
  • concurrent – Run workflow concurrently with any previous executions. Can either be a boolean or a valid concurrency option string. Available string options: 'skip' (do not run flow if it is already running), 'concurrent' (run the flow in parallel with any current execution), 'pipeline:1' (pipeline the flow such that the current execution will not be overrun: block job A until the previous flow job A has completed), 'pipeline:2' (pipeline the flow such that the current execution will not be overrun: block job A until the previous flow job A’s _children_ have completed).
  • properties – Dictionary that will override global properties in this execution of the workflow. This dictionary will be flattened similarly to how Job options are handled.
  • on_failure – Set the execution behavior on job failure. Available options: 'finish' (finish currently running jobs, but do not start any others), 'continue' (continue executing jobs as long as dependencies are met),`’cancel’` (cancel all jobs immediately).
  • notify_early – Send any notification emails when the first job fails rather than when the entire workflow finishes.
  • emails – List of emails or pair of list of emails to be notified when the flow fails. Note that this will override any properties set in the worfklow. If a single list is passed, the emails will be used for both success and failure events. If a pair of lists is passed, the first will receive failure emails, the second success emails.

Note that in order to run a workflow on Azkaban, it must already have been uploaded and the corresponding user must have permissions to run it.

schedule_cron_workflow(name, flow, cron_expression, **kwargs)

Schedule a cron workflow.

Parameters:
  • name – Project name.
  • flow – Name of flow in project.
  • cron_expression – A CRON expression comprising 6 or 7 fields separated by white space that represents a set of times in Quartz Cron Format.
  • **kwargs – See run_workflow() for documentation.
schedule_workflow(name, flow, date, time, period=None, **kwargs)

Schedule a workflow.

Parameters:
  • name – Project name.
  • flow – Name of flow in project.
  • date – Date of the first run (possible values: '08/07/2014', '12/11/2015').
  • time – Time of the schedule (possible values: '9,21,PM,PDT', '10,30,AM,PDT').
  • period – Frequency to repeat. Consists of a number and a unit (possible values: '1s', '2m', '3h', '2M'). If not specified the flow will be run only once.
  • **kwargs – See run_workflow() for documentation.
set_sla(schedule_id, email, settings)

Set SLA for a schedule.

Parameters:
  • schedule_id – Schedule ID.
  • email – Array of emails to receive notifications.
  • settings

    Array of comma delimited strings of SLA settings consisting of:

    • job name - blank for full workflow
    • rule - SUCCESS or FINISH
    • duration - specified in hh:mm
    • email action - bool
    • kill action - bool
unschedule_workflow(name, flow)

Unschedule a workflow.

Parameters:
  • name – Project name.
  • flow – Name of flow in project.
upload_project(name, path, archive_name=None, callback=None)

Upload project archive.

Parameters:
  • name – Project name.
  • path – Local path to zip archive.
  • archive_name – Filename used for the archive uploaded to Azkaban. Defaults to basename(path).
  • callback – Callback forwarded to the streaming upload.

azkaban.util

Utility module.

class azkaban.util.Adapter(prefix, logger, extra=None)

Bases: logging.LoggerAdapter

Logger adapter that includes a prefix to all messages.

Parameters:
  • prefix – Prefix string.
  • logger – Logger instance where messages will be logged.
  • extra – Dictionary of contextual information, passed to the formatter.
process(msg, kwargs)

Adds a prefix to each message.

Parameters:
  • msg – Original message.
  • kwargs – Keyword arguments that will be forwarded to the formatter.
exception azkaban.util.AzkabanError(message, *args)

Bases: exceptions.Exception

Base error class.

class azkaban.util.Config(path=None)

Bases: object

Configuration class.

Parameters:path – path to configuration file. If no file exists at that location, the configuration parser will be empty. Defaults to ~/.azkabanrc.
get_file_handler(command)

Add and configure file handler.

Parameters:command – Command the options should be looked up for.

The default path can be configured via the default.log option in the command’s corresponding section.

get_option(command, name, default=None)

Get option value for a command.

Parameters:
  • command – Command the option should be looked up for.
  • name – Name of the option.
  • default – Default value to be returned if not found in the configuration file. If not provided, will raise AzkabanError.
save()

Save configuration parser back to file.

class azkaban.util.MultipartForm(files, params=None, callback=None, chunksize=4096)

Bases: object

Form allowing streaming.

Parameters:
  • files – List of filepaths. For more control, each file can also be represented as a dictionary with keys 'path', 'name', and 'type'.
  • params – Optional dictionary of parameters that will be included in the form.
  • callback – Arguments cur_bytes, tot_bytes, index.
  • chunksize – Size of each streamed file chunk.

Usage:

from requests import post

form = MultipartForm(files=['README.rst'])
post('http://your.url', headers=form.headers, data=form)
size

Total size of all the files to be streamed.

Note that this doesn’t include the bytes used for the header and parameters.

azkaban.util.catch(*error_classes)

Returns a decorator that catches errors and prints messages to stderr.

Parameters:
  • error_classes – Error classes.
  • log – Filepath to log file.

Also exits with status 1 if any errors are caught.

azkaban.util.flatten(dct, sep='.')

Flatten a nested dictionary.

Parameters:
  • dct – Dictionary to flatten.
  • sep – Separator used when concatenating keys.
azkaban.util.human_readable(size)

Transform size from bytes to human readable format (kB, MB, …).

Parameters:size – Size in bytes.
azkaban.util.read_properties(*paths)

Read options from a properties file and return them as a dictionary.

Parameters:*paths – Paths to properties file. In the case of multiple definitions of the same option, the latest takes precedence.

Note that not all features of .properties files are guaranteed to be supported.

azkaban.util.stream_file(path, chunksize)

Get iterator over a file’s contents.

Parameters:
  • path – Path to file.
  • chunksize – Bytes per chunk.
azkaban.util.suppress_urllib_warnings()

Capture urllib warnings if possible, else disable them (python 2.6).

azkaban.util.temppath(*args, **kwds)

Create a temporary filepath.

Usage:

with temppath() as path:
  # do stuff

Any file corresponding to the path will be automatically deleted afterwards.

azkaban.util.write_properties(options, path=None, header=None)

Write options to properties file.

Parameters:
  • options – Dictionary of options.
  • path – Path to file. Any existing file will be overwritten. Writes to stdout if no path is specified.
  • header – Optional comment to be included at the top of the file.