Conductors

Overview

Conductors in TaskFlow provide a mechanism that unifies the various TaskFlow concepts under a single, easy-to-use construct (as plug-and-play as we can make it).

They are responsible for the following:

  • Interacting with jobboards (examining and claiming jobs).
  • Creating engines from the claimed jobs (using factories to reconstruct the contained tasks and flows to be executed).
  • Dispatching the engine using the provided persistence layer and engine configuration.
  • Completing or abandoning the claimed job (depending on dispatching and execution outcome).
  • Rinse and repeat.
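The responsibilities above can be sketched as a single claim/run/finish cycle. The following is a toy model that uses only the standard library: `ToyJob`, `ToyJobBoard`, and `conduct_once` are hypothetical stand-ins, not TaskFlow classes, and a plain callable takes the place of the engine. As a simplification, any exception raised by the callable is treated as an internal failure that leads to abandonment.

```python
import collections


class ToyJob:
    """Hypothetical stand-in for a jobboard job (not a TaskFlow class)."""

    def __init__(self, name, work):
        self.name = name
        self.work = work    # callable standing in for the engine run
        self.owner = None   # name of the conductor holding the claim


class ToyJobBoard:
    """Hypothetical in-memory jobboard holding unclaimed jobs."""

    def __init__(self):
        self.jobs = collections.deque()
        self.consumed = []

    def post(self, job):
        self.jobs.append(job)

    def claim(self, who):
        # Claim the oldest unclaimed job, or return None if there is none.
        if not self.jobs:
            return None
        job = self.jobs.popleft()
        job.owner = who
        return job

    def consume(self, job):
        # The job is finished and is removed for good.
        self.consumed.append(job.name)

    def abandon(self, job):
        # Release the claim so that another conductor can try the job.
        job.owner = None
        self.jobs.append(job)


def conduct_once(name, board):
    """Claim one job, run it, then consume or abandon it."""
    job = board.claim(name)
    if job is None:
        return None
    try:
        job.work()
    except Exception:
        board.abandon(job)
        return "abandoned"
    board.consume(job)
    return "consumed"


board = ToyJobBoard()
board.post(ToyJob("ok", lambda: None))
board.post(ToyJob("broken", lambda: 1 / 0))
outcomes = [conduct_once("conductor-1", board) for _ in range(2)]
```

After the two cycles the first job has been consumed, while the failing job is back on the board waiting for another claim.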

Note

They are inspired by, and have responsibilities similar to those of, railroad conductors or musical conductors.

Considerations

A few considerations should be kept in mind when using a conductor to make sure it’s used in a safe and reliable manner. Eventually we hope to make these non-issues, but for now they are worth mentioning.

Endless cycling

What: Jobs that fail (due to some type of internal error) on one conductor will be abandoned by that conductor; another conductor may then experience those same errors and abandon the job as well (and so on). This creates a job abandonment cycle that continues for as long as the job exists in a claimable state.

Example:

Conductor cycling

Alleviate by:

  1. Forcefully delete jobs that have been failing continuously after a given number of conductor attempts. This can be done manually, automatically via scripts (or other associated monitoring), or via the jobboard’s trash() method.
  2. Resolve the internal error’s cause (storage backend failure, other...).
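The first mitigation can be sketched as a failure counter with a cutoff. This is a toy model, not TaskFlow code: `MAX_ATTEMPTS`, `run_with_attempt_limit`, and the `attempts` and `trashed` containers are hypothetical names, and appending to `trashed` merely stands in for deleting the job (for example via the jobboard’s trash() method).

```python
MAX_ATTEMPTS = 3  # hypothetical threshold; pick one that suits the deployment


def run_with_attempt_limit(job_name, run_job, attempts, trashed):
    """Run a job once, trashing it after MAX_ATTEMPTS failures.

    ``attempts`` maps job names to failure counts and ``trashed``
    collects the names of jobs that were given up on.
    """
    try:
        run_job()
    except Exception:
        attempts[job_name] = attempts.get(job_name, 0) + 1
        if attempts[job_name] >= MAX_ATTEMPTS:
            trashed.append(job_name)  # stand-in for deleting the job
            return "trashed"
        return "abandoned"
    return "consumed"


def always_fails():
    raise RuntimeError("storage backend unavailable")


attempts, trashed = {}, []
history = [
    run_with_attempt_limit("job-1", always_fails, attempts, trashed)
    for _ in range(3)
]
```

In this sketch the counter lives in a local dictionary. With several conductors the count would presumably need to be kept somewhere they can all see, since each conductor only observes its own failures.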

Interfaces

class taskflow.conductors.base.Conductor(name, jobboard, persistence=None, engine=None, engine_options=None)

Bases: object

Base for all conductor implementations.

Conductors act as entities which extract jobs from a jobboard, assign their work to some engine (using some desired configuration) and then wait for that work to complete. If the work fails then they abandon the claimed work (or if the process they are running in crashes or dies this abandonment happens automatically) and then another conductor at a later period of time will finish up the prior failed conductor’s work.

notifier

The conductor actions (or other state changes) notifier.

NOTE(harlowja): different conductor implementations may emit different events + event details at different times, so refer to your conductor documentation to know exactly what can and what cannot be subscribed to.

connect(*args, **kwargs)

Ensures the jobboard is connected (noop if it is already).

close(*args, **kwargs)

Closes the contained jobboard, disallowing further use.

run()

Continuously claims, runs, and consumes jobs (and repeats).

taskflow.conductors.backends.fetch(kind, name, jobboard, namespace='taskflow.conductors', **kwargs)

Fetch a conductor backend with the given options.

This fetch method will look for the entrypoint ‘kind’ in the entrypoint namespace, and then attempt to instantiate that entrypoint using the provided name, jobboard and any board specific kwargs.
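The lookup-then-instantiate behaviour described above can be illustrated with a toy registry. This sketch does not use real entrypoints: the `TOY_ENTRYPOINTS` dictionary, `ToyBlockingConductor`, and `toy_fetch` are hypothetical stand-ins, and the kind name 'blocking' is assumed here purely for illustration.

```python
class ToyBlockingConductor:
    """Hypothetical conductor class used only for this illustration."""

    def __init__(self, name, jobboard, **kwargs):
        self.name = name
        self.jobboard = jobboard
        self.options = kwargs


# Stand-in for the installed entrypoints: namespace -> kind -> class.
TOY_ENTRYPOINTS = {
    "taskflow.conductors": {"blocking": ToyBlockingConductor},
}


def toy_fetch(kind, name, jobboard, namespace="taskflow.conductors", **kwargs):
    """Look up ``kind`` in ``namespace`` and instantiate the match."""
    try:
        factory = TOY_ENTRYPOINTS[namespace][kind]
    except KeyError:
        raise LookupError(
            "no conductor of kind %r in %r" % (kind, namespace)
        ) from None
    # The name, the jobboard and any extra kwargs go to the constructor.
    return factory(name, jobboard, **kwargs)


conductor = toy_fetch(
    "blocking", "conductor-1", jobboard=object(), wait_timeout=0.5
)
```

The design point this shows is that callers select an implementation by a string name, so new conductor kinds can be added without changing the calling code.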

Implementations

Blocking

class taskflow.conductors.backends.impl_blocking.BlockingConductor(name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None)

Bases: taskflow.conductors.base.Conductor

A conductor that runs jobs in its own dispatching loop.

This conductor iterates over jobs in the provided jobboard (waiting for the given timeout if no jobs exist) and attempts to claim them, works on those jobs in its local thread (blocking further work from being claimed and consumed) and then consumes those work units after completion. This process will repeat until the conductor has been stopped or some other critical error occurs.

NOTE(harlowja): consumption occurs even if an engine fails to run due to a task failure. This is only skipped when an execution failure or a storage failure occurs, which are usually correctable by re-running on a different conductor (storage failures and execution failures may be transient issues that can be worked around by later execution). If a job, after completing, cannot be consumed or abandoned, the conductor relies upon the jobboard capabilities to automatically abandon these jobs.

START_FINISH_EVENTS_EMITTED = ('compilation', 'preparation', 'validation', 'running')

Events will be emitted for the start and finish of each engine activity defined above. The actual event names that can be subscribed to are ${event}_start and ${event}_end, where the pseudo-variable ${event} is one of the activities listed above.
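As a small illustration of the naming rule, the full set of event names can be derived from the tuple. The tuple is copied from the attribute above; the `event_names` variable is just a name chosen for this sketch.

```python
START_FINISH_EVENTS_EMITTED = (
    "compilation", "preparation", "validation", "running",
)

# Each activity yields one "<activity>_start" and one "<activity>_end" name.
event_names = [
    "%s_%s" % (event, suffix)
    for event in START_FINISH_EVENTS_EMITTED
    for suffix in ("start", "end")
]
```

This yields eight names in total, from compilation_start through running_end.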

stop(timeout=None)

Requests the conductor to stop dispatching.

This method can be used to request that a conductor stop its consumption & dispatching loop.

The method returns immediately regardless of whether the conductor has been stopped.

Deprecated since version 0.8: The timeout parameter is deprecated and is present for backward compatibility only. In order to wait for the conductor to gracefully shut down, wait() should be used instead.

wait(timeout=None)

Waits for the conductor to gracefully exit.

This method waits for the conductor to gracefully exit. An optional timeout can be provided, which will cause the method to return within the specified timeout. If the timeout is reached, the returned value will be False.

Parameters: timeout – Maximum number of seconds that the wait() method should block for.
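The stop()/wait() contract described above (stop() returns immediately, wait() blocks until exit or timeout and returns False on timeout) can be modelled with two threading events. This is a toy sketch using only the standard library: `ToyConductor` is a hypothetical class, and nothing here is taken from the BlockingConductor implementation.

```python
import threading
import time


class ToyConductor:
    """Hypothetical model of the stop()/wait() contract (not TaskFlow code)."""

    def __init__(self):
        self._stop_requested = threading.Event()
        self._finished = threading.Event()

    def run(self):
        # Stand-in for the dispatching loop: poll until asked to stop.
        try:
            while not self._stop_requested.is_set():
                time.sleep(0.01)
        finally:
            self._finished.set()

    def stop(self):
        # Only records the request; it does not block.
        self._stop_requested.set()

    def wait(self, timeout=None):
        # True if the loop has exited, False if the timeout elapsed first.
        return self._finished.wait(timeout)


conductor = ToyConductor()
worker = threading.Thread(target=conductor.run)
worker.start()

timed_out_result = conductor.wait(timeout=0.05)  # loop is still running
conductor.stop()                                 # request shutdown
stopped_result = conductor.wait(timeout=5)       # loop exits shortly after
worker.join()
```

Separating the request (stop) from the blocking call (wait) lets the caller decide how long to block, which matches the deprecation note on the stop() timeout parameter.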

Hierarchy

Inheritance diagram of taskflow.conductors.base, taskflow.conductors.backends.impl_blocking