API Reference

fennel

class fennel.App(name: str, **kwargs)[source]

The app is the main abstraction provided by Fennel. Python functions are decorated via @app.task to enable background processing. All settings are configured on this object.

Parameters
  • name (str) – Used to identify this application, e.g. to set which tasks a worker will execute.

  • kwargs – Any settings found in fennel.settings.Settings

Examples

>>> from fennel import App
...
>>> app = App(
...     name='myapp',
...     redis_url='redis://127.0.0.1',
...     default_retries=3,
...     results_enabled=True,
...     log_level='info',
...     log_format='json',
...     autodiscover='**/tasks.py',
...     interface='sync',
... )
...
>>> @app.task(retries=1)
>>> def foo(x):
...     return x
...
>>> x = foo.delay(7)  # Execute in the background.
>>> x
AsyncResult(uuid=Tjr75jM3QDOHoLTLyrsY1g)
>>> x.get()  # Wait for the result.
7

If your code is running in an asynchronous event loop (e.g. via Starlette, FastAPI, Quart), you will want to use the async interface instead:

>>> import asyncio
...
>>> app = App(name='foo', interface='async')
...
>>> @app.task
>>> async def bar(x)
...     await asyncio.sleep(x)
...     return x
...
>>> x = await bar.delay(5)
>>> await x.status()
SENT
>>> await x.get()
5
task(func: Callable = None, *, name=None, retries=<object object>) → Any[source]

A decorator to register a function with the app to enable background processing via the task queue.

The worker process (see fennel.worker.worker) will need to discover all registered tasks on startup. The means all the modules containing tasks need to be imported. Fennel will import modules found via fennel.settings.Settings.autodiscover, which by default is '**/tasks.py'.

Parameters
  • func (Callable) – The decorated function.

  • name (str) – The representation used to uniquely identify this task.

  • retries (int) – The number of attempts at execution after a task has failed (meaning raised any exception).

Examples

Exposes an interface similar to Celery:

>>> @app.task(retries=1)
>>> def foo(x):
...     return x

Tasks can be enqueued for processing via:

>>> foo.delay(8)
AsyncResult(uuid=q_jb6KaUT-G4tOAoyQ0yaA)

The can also be called normally, bypassing the Fennel system entirely:

>>> foo(3)
3

By default, tasks are ‘fire-and-forget’, meaning we will not wait for their completion. They will be executed by worker process and will be retried automatically on failure (using exponential backoff), so we assume tasks are idempotent.

You can also wait for the result:

>>> x = foo.delay(4)
>>> x.status()
SENT
>>> x.get(timeout=10)
4

If instead you have many tasks and wish to wait for them to complete you can use the waiting primitives provided (you will want to ensure all tasks have retries=0, which you can set by default with an app setting):

>>> from fennel.client import gather, wait
>>> results = [foo.delay(x) for x in range(10)]
>>> gathered = gather(results)  # Or:
>>> done, pending = wait(results, timeout=2)

If your application is running in an event loop you can elect to use the async interface for your fennel app (see fennel.settings.Settings.interface), which uses aioredis under the hood to enqueue items, retrieve results, etc, so you will need to await those coroutines:

>>> app = App(name='foo', interface='async')
>>>
>>> @app.task
>>> async def bar(x)
...     await asyncio.sleep(x)
>>>
>>> x = await bar.delay(1)
>>> await x.status()
SUCCESS

fennel.settings

class fennel.settings.Settings[source]

Settings can be configured via environment variables or keyword arguments for the fennel.App instance (which take priority).

Examples

For environment variables, the prefix is FENNEL_, for instance:

FENNEL_REDIS_URL=redis://127.0.0.1:6379
FENNEL_DEFAULT_RETRIES=3
FENNEL_RESULTS_ENABLED=true

Or via App kwargs:

>>> from fennel import App
...
>>> app = App(
...     name='myapp',
...     redis_url='redis://127.0.0.1',
...     default_retries=3,
...     results_enabled=True,
...     log_level='info',
...     log_format='json',
...     autodiscover='**/tasks.py',
...     interface='sync',
... )
Parameters
  • redis_url (str) – Redis URL. Default 'redis://127.0.0.1:6369'

  • interface (str) – Which client interface should we use – sync or async? Default 'sync'

  • processes (int) – How many executor processes to run in each worker. Default multiprocessing.cpu_count()

  • concurrency (int) – How many concurrent consumers to run (we make at least this many Redis connections) in each executor process. The default, 8, can handle 160 req/s in a single executor process if each task is IO-bound and lasts on average 50ms. If you have long running CPU-bound tasks, you will want to run multiple executor processes (and set heartbeat_timeout to greater than your maximum expected task duration). Default 8

  • default_retries (int) – How many times to retry a task in case it raises an exception during execution. With 10 retries and the default fennel.utils.backoff() function, this will be approximately 30 days of retries. Default 10

  • retry_backoff (Callable) – Which algorithm to use to determine the retry schedule. The default is exponential backoff via fennel.utils.backoff().

  • read_timeout (int) – How many milliseconds to wait for messages in the main task queue. Default 4000

  • prefetch_count (int) – How many messages to read in a single call to XREADGROUP. Default 1

  • heartbeat_timeout (float) – How many seconds before an executor is considered dead if heartbeats are missed. If you have long-running CPU-bound tasks, this value should be greater than your maximum expected task duration. Default 60

  • heartbeat_interval (float) – How many seconds to sleep between heartbeats are stored for each executor process. Default 6

  • schedule_interval (float) – How many seconds to sleep between polling for scheduled tasks. Default 4

  • maintenance_interval (float) – How many seconds to sleep between running the maintenance script. Default 8

  • task_timeout (int) – How long to wait for results to be computed when calling .get(), seconds. Default 10

  • grace_period (int) – How many seconds to wait for in-flight tasks to complete before forcefully exiting. Default: 30

  • restults_enabled (bool) – Whether to store results. Can be disabled if your only use-case is ‘fire-and-forget’. Default True

  • results_ttl (int) – How long before expiring results in seconds. Default 3600 (one hour).

  • log_format (str) – Whether to pretty print a human-readable log (“console”) or JSON (“json”). Default 'console'

  • log_level (str) – The minimum log level to emit. Default 'debug'

  • autodiscover (str) – The pattern for pathlib.Path.glob() to find modules containing task-decorated functions, which the worker must import on startup. Will be called relative to current working directory. Can be set to the empty string to disable. Default '**/tasks.py'

fennel.worker

fennel.worker.worker.start(app, exit='signal')[source]

The main entrypoint for the worker.

The worker will create and monitor N fennel.worker.Executor processes. Each Executor will spawn M coroutines via an asyncio event loop. N and M are controlled by fennel.settings.Settings.processes and fennel.settings.Settings.concurrency respectively.

CPU-bound tasks benefit from multiple processes. IO-bound tasks will benefit from high executor concurrency.

Parameters
  • app (fennel.App) – The application instance for which to start a background worker.

  • exit (str) – The exit strategy. EXIT_SIGNAL is used when the worker should only stop on receipt of a interrupt or termination signal. EXIT_COMPLETE is used in tests to exit when all tasks from the queue have completed.

Notes

signal.SIGINT and signal.SIGTERM are handled by gracefully shutting down, which means giving the executor processes a chance to finish their current tasks.

class fennel.worker.executor.Executor(app)[source]

The Executor is responsible for reading jobs from the Redis queue and executing them.

Heartbeats are sent from the executor periodically (controlled by fennel.settings.Settings.heartbeat_interval). If they are missing for more than fennel.settings.Settings.heartbeat_timeout seconds, the executor will be assumed dead and all of its pending messages will be reinserted to the stream by another worker’s maintenance function.

Parameters

app (fennel.App) – The application instance for which to start an Executor.

start(exit: str = 'signal', queue: multiprocessing.context.BaseContext.Queue = None) → None[source]

Begin the main executor loop.

Parameters
  • exit (str) – The exit strategy. EXIT_SIGNAL is used when the worker should only stop on receipt of a interrupt or termination signal. EXIT_COMPLETE is used in tests to exit when all tasks from the queue have completed.

  • queue (multiprocessing.Queue) – A QueueHandler will be used to send logs to this queue to avoid interleaving from multiple processes.

Notes

Intended to run via fennel.worker.worker.start() which will supervise multiple Executor processes.

signal.SIGINT and signal.SIGTERM are handled by gracefully shutting down, which means giving the executor processes a chance to finish their current tasks.

is_running()[source]

fennel.client

A collection of synchronous classes and functions to interact with the Fennel system.

fennel.client.purge_dead(app, filter=<function <lambda>>, batchsize=100)[source]

Iterate over the dead-letter queue and delete any jobs for which filter(job) evaluates to True. The default is to delete all jobs.

fennel.client.read_dead(app, batchsize=100)[source]

Iterate over the dead-letter queue and return all job data.

fennel.client.replay_dead(app, filter=<function <lambda>>, batchsize=100)[source]

Iterate over the dead-letter queue and replay any jobs for which filter(job) evaluates to True. The default is to replay all jobs.

class fennel.client.AsyncResult(job: fennel.job.Job, app)[source]

A handle for a task that is being processed by workers via the task queue.

Conceptually similar to the AsyncResult from the mutliprocessing library.

status()[source]

Return the status of the task execution.

Examples

>>> @app.task
>>> def bar(x)
...     time.sleep(x)
...     return x
...
>>> x = bar.delay(5)
>>> x.status()
SENT
>>> x.status()  # After roughly 5 seconds...
SUCCESS
get(timeout: int = <object object>) → Any[source]

Wait for the result to become available and return it.

Raises

Examples

>>> @app.task(retries=0)
>>> def foo(x):
...     return x
...
>>> x = foo.delay(7)
>>> x.get()  # Wait for the result.
7

Warning

You must have results storage enabled (fennel.settings.Settings.results_enabled)

If you have retries enabled, they may be rescheduled many times, so you may prefer to use retries=0 for tasks whose result you intend to wait for.

class fennel.client.Task(name: str, func: Callable, retries: int, app)[source]
delay(*args: Any, **kwargs: Any) → fennel.client.results.AsyncResult[source]

Traditional Celery-like interface to enqueue a task for execution by the workers.

The args and kwargs will be passed through to the task when executed.

Examples

>>> @app.task
>>> def foo(x, bar=None):
...     time.sleep(x)
...     if bar == "mystr":
...         return False
...     return True
...
>>> foo.delay(1)
>>> foo.delay(2, bar="mystr")
__call__(*args: Any, **kwargs: Any) → Any[source]

Call the task-decorated function as a normal Python function. The fennel system will be completed bypassed.

Examples

>>> @app.task
>>> def foo(x):
...     return x
...
>>> foo(7)
7
fennel.client.gather(results: Iterable[fennel.client.results.AsyncResult], task_timeout=10, return_exceptions=True)[source]

Multi-result version of .get() – wait for all tasks to complete and return all of their results in order.

Has the same semantics as asyncio.gather.

fennel.client.wait(results: Iterable[fennel.client.results.AsyncResult], timeout: int, return_when='ALL_COMPLETED')[source]

Wait for all tasks to complete and return two sets of Futures (done, pending).

Has the same semantics as asyncio.wait.

fennel.aio.client

A collection of asynchronous classes and functions, expected to be run in an asyncio-compatible event loop, to interact with the Fennel system.

async fennel.client.aio.purge_dead(app, filter=<function <lambda>>, batchsize=100)[source]

Iterate over the dead-letter queue and delete any jobs for which filter(job) evaluates to True. The default is to delete all jobs.

async fennel.client.aio.read_dead(app, batchsize=100)[source]

Iterate over the dead-letter queue and return all job data.

async fennel.client.aio.replay_dead(app, filter=<function <lambda>>, batchsize=100)[source]

Iterate over the dead-letter queue and replay any jobs for which filter(job) evaluates to True. The default is to replay all jobs.

class fennel.client.aio.AsyncResult(job: fennel.job.Job, app)[source]

A handle for a task that is being processed by workers via the task queue.

Conceptually similar to the AsyncResult from the mutliprocessing library.

async status()[source]

Return the status of the task execution.

Examples

>>> @app.task
>>> async def bar(x)
...     await asyncio.sleep(x)
...     return x
...
>>> x = await bar.delay(5)
>>> await x.status()
SENT
>>> await x.status()  # After roughly 5 seconds...
SUCCESS
async get(timeout: int = <object object>) → Any[source]

Wait for the result to become available and return it.

Raises

Examples

>>> @app.task(retries=0)
>>> def foo(x):
...     return x
...
>>> x = await foo.delay(7)
>>> await x.get()  # Wait for the result.
7

Warning

You must have results storage enabled (fennel.settings.Settings.results_enabled)

If you have retries enabled, they may be rescheduled many times, so you may prefer to use retries=0 for tasks whose result you intend to wait for.

class fennel.client.aio.Task(name: str, func: Callable, retries: int, app)[source]
async delay(*args: Any, **kwargs: Any) → fennel.client.aio.results.AsyncResult[source]

Enqueue a task for execution by the workers.

Similar to asyncio.create_task (but also works with non-async functions and runs on our Redis-backed task queue with distributed workers, automatic retry, and result storage with configurable TTL).

The args and kwargs will be passed through to the task when executed.

Examples

>>> @app.task(retries=1)
>>> async def foo(x, bar=None):
...     asyncio.sleep(x)
...     if bar == "mystr":
...         return False
...     return True
...
>>> await foo.delay(1)
>>> await foo.delay(2, bar="mystr")
__call__(*args: Any, **kwargs: Any) → Any[source]

Call the task-decorated function as a normal Python function. The fennel system will be completed bypassed.

Examples

>>> @app.task
>>> def foo(x):
...     return x
...
>>> foo(7)
7
async fennel.client.aio.gather(results: Iterable[fennel.client.aio.results.AsyncResult], task_timeout=10, return_exceptions=True)[source]

Multi-result version of .get() – wait for all tasks to complete and return all of their results in order.

Has the same semantics as asyncio.gather.

async fennel.client.aio.wait(results: Iterable[fennel.client.aio.results.AsyncResult], timeout: int, return_when='ALL_COMPLETED')[source]

Wait for all tasks to complete and return two sets of Futures (done, pending).

Has the same semantics as asyncio.wait.

fennel.status

Jobs have a number of statuses through their lifecycle. This module contains the constants. If you have enqueued a task for execution, then you can obtain its status as follows:

>>> x = mytask.delay()
>>> x.status()
EXECUTING
fennel.status.UNKNOWN = 'UNKNOWN'

The job’s status is not stored in Redis. Presumably no action has been taken on the job.

fennel.status.SENT = 'SENT'

The job has been sent to Redis, but execution has not yet started.

fennel.status.EXECUTING = 'EXECUTING'

A worker has received the job from the queue and has begun executing it.

fennel.status.SUCCESS = 'SUCCESS'

Execution was successful and the job’s result is ready (if results storage is enabled).

fennel.status.RETRY = 'RETRY'

Execution was not successful (an exception was raised) and a retry is scheduled to occur in the future.

fennel.status.DEAD = 'DEAD'

Execution was not successful (an exception was raised) and retries have been exhausted, so the job is now in the dead-letter queue where it will remain until manual intervention (via the CLI or client code).

fennel.exceptions

exception fennel.exceptions.FennelException[source]
exception fennel.exceptions.TaskFailed(original_type: str, original_args: List)[source]

This exception is returned by worker processes which experienced an exception when executing a task.

Parameters
  • original_type (str) – The name of the original exception, e.g. 'ValueError'.

  • original_args (List) – The arguments given to the original exception, e.g. ['Not found']

Examples

>>> @app.task(retries=0)
>>> async def foo(n):
...     raise Exception("baz")
...
>>> x = await foo.delay(3)
>>> try:
...     result = await x.get()
>>> except TaskFailed as e:
...     assert e.original_type == "Exception"
...     assert e.original_args == ["baz"]
exception fennel.exceptions.ResultsDisabled[source]

Raised when results_enabled=False and code attempts to access a tasks result via .get().

exception fennel.exceptions.UnknownTask[source]

Raised by a worker process if it is unable to find a Python function corresponding to the task it has read from the queue.

exception fennel.exceptions.Timeout[source]

Raised by client code when a given timeout is exceeded when waiting for results to arrive.

exception fennel.exceptions.JobNotFound[source]

Raised by client code when attempting to retrieve job information that cannot be found in Redis.

exception fennel.exceptions.Chaos[source]

Used in tests to ensure failures are handled properly.

exception fennel.exceptions.Completed[source]

Used internally to shutdown an Executor if the exit condition is completing all tasks.

fennel.utils

fennel.utils.backoff(retries: int, jitter: bool = True) → int[source]

Compute duration (seconds) to wait before retrying using exponential backoff with jitter based on the number of retries a message has already experienced.

The minimum returned value is 1s The maximum returned value is 604800s (7 days)

With max_retries=9, you will have roughly 30 days to fix and redeploy the the task code.

Parameters
  • retries (int) – How many retries have already been attemped.

  • jitter (bool) – Whether to add random noise to the return value (recommended).

Notes

https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

fennel.job

class fennel.job.Job(task: str, args: List, kwargs: Dict, tries: int = 0, max_retries: int = 9, exception: Dict = <factory>, return_value: Any = None, status: str = 'UNKNOWN', uuid: str = <factory>)[source]

The internal representation of a job.

Parameters
  • task (str) – The name of the task. By default will use f"{func.__module__}.{func.__qualname__}", where func is the Python callable.

  • args (List) – The job’s args.

  • kwargs (Dict) – The job’s kwargs.

  • tries (int) – The number of attempted executions.

  • max_retries (int) – The maximum number of retries to attempt after failure.

  • exception (Dict) – Exception information for the latest failure, contains ‘original_type’ (str, e.g. ‘ValueError’) and ‘original_args’ (List, e.g. [‘Not found’]).

  • return_value (Any) – The return value of the Python callable when execution succeeds.

  • status (str) – One of fennel.status, the current lifecycle stage.

  • uuid (str) – Base64-encoded unique identifier.