Error Handling¶
quent provides two mechanisms for dealing with errors in pipelines: .except_() for catching exceptions and .finally_() for cleanup. Each can be registered at most once per pipeline -- a second registration raises QuentException.
Both work transparently with sync and async callables. For per-step error handling, use nested pipelines -- each nested pipeline gets its own except/finally pair.
Design: One Handler Per Pipeline¶
Pipelines support exactly one exception handler and one cleanup handler each. This constraint keeps the execution model simple and predictable: you always know which handler runs, in what order, and what happens to the exception.
For per-step error handling, compose nested pipelines:
from quent import Q
# Each nested pipeline gets its own except_
step_with_fallback = (
Q()
.then(risky_operation)
.except_(lambda ei: 'fallback')
)
pipeline = (
Q()
.then(step_with_fallback) # has its own error handling
.then(next_step)
.except_(handle_pipeline_error) # separate handler for the outer pipeline
)
except_ -- Exception Handling¶
Signature¶
Register an exception handler for the pipeline. When an exception occurs during pipeline execution, the handler is called with the exception and (optionally) additional arguments.
Basic Usage¶
from quent import Q, QuentExcInfo
def handle_error(ei: QuentExcInfo):
print(f"Error: {ei.exc}")
return "fallback value"
result = (
Q(data)
.then(process)
.then(save)
.except_(handle_error)
.run()
)
# If process() or save() raises, handle_error receives QuentExcInfo(exc, root_value).
# The handler's return value ("fallback value") becomes the pipeline's result.
What except_ Catches¶
By default, .except_() catches Exception -- not BaseException. This follows Python convention: system signals like KeyboardInterrupt and SystemExit propagate without interference.
# Default: catches Exception and its subclasses
.except_(handler)
# Catch specific exception types
.except_(handler, exceptions=ValueError)
# Catch multiple exception types
.except_(handler, exceptions=(ConnectionError, TimeoutError))
The exceptions parameter accepts a single exception type or an iterable of types.
Validation at registration time:
- An empty iterable raises
QuentException. - Non-
BaseExceptionsubclasses raiseTypeError. - String values raise
TypeError(common mistake: passing"ValueError"instead ofValueError).
Warning
If you configure except_() to catch a BaseException subclass that is not an Exception subclass (e.g., KeyboardInterrupt), quent emits a RuntimeWarning advising you to consider Exception instead. Catching system signals can suppress critical shutdown behavior.
Except Handler Calling Convention¶
Important
The except_() handler follows the same 2-rule calling convention as all other pipeline contexts. The only difference is what constitutes the "current value": for except handlers, it is QuentExcInfo(exc, root_value) rather than the pipeline value.
How It Works¶
The handler receives a QuentExcInfo NamedTuple as its current value. The standard 2-rule dispatch then applies:
| Registration | Handler invocation |
|---|---|
except_(handler) |
handler(QuentExcInfo(exc, root_value)) |
except_(handler, arg1, arg2) |
handler(arg1, arg2) -- QuentExcInfo NOT passed |
except_(handler, key=val) |
handler(key=val) -- QuentExcInfo NOT passed |
Access the exception via .exc and the root value via .root_value on the QuentExcInfo NamedTuple. The root_value is normalized to None if the pipeline was created with no root value.
from quent import Q, QuentExcInfo
def handle_error(ei: QuentExcInfo):
print(f"Failed on input {ei.root_value}: {ei.exc}")
return "default"
.except_(handle_error)
# calls: handle_error(QuentExcInfo(exc, root_value))
Consumption vs Re-raise¶
The reraise parameter controls what happens after the handler runs:
reraise=False (default) -- Consume the Exception¶
The handler's return value replaces the pipeline's result. The exception is consumed -- it does not propagate. The pipeline is considered to have succeeded.
from quent import Q
result = (
Q(42)
.then(lambda x: x / 0)
.except_(lambda ei: f"failed on {ei.root_value}: {ei.exc}")
.run()
)
# No exception propagates
# result = the handler's return value
reraise=True -- Re-raise After Handler¶
The handler runs for side-effects only (e.g., logging, alerting). After the handler completes, the original exception is re-raised. The handler's return value is ignored.
from quent import Q
def notify_admin(ei):
send_alert(f"Pipeline failed: {ei.exc}")
result = (
Q(data)
.then(process)
.except_(notify_admin, reraise=True)
.run()
)
# notify_admin is called with QuentExcInfo, then the original exception is re-raised.
Handler Failure¶
With reraise=True¶
When the handler itself raises while reraise=True:
Exceptionsubclass: The handler's exception is discarded. ARuntimeWarningis emitted, a note is attached to the original exception (Python 3.11+), and the original exception is re-raised. The caller always sees the original failure.BaseExceptionsubclass (e.g.,KeyboardInterrupt): The handler's exception propagates naturally -- system signals are never suppressed.
With reraise=False¶
When the handler raises while reraise=False, the handler's exception propagates. The original pipeline exception is set as __cause__ (via raise handler_exc from original_exc).
finally_ -- Cleanup¶
Signature¶
Register a cleanup handler that always runs, whether the pipeline succeeds or fails.
Key Behaviors¶
- Always runs -- on both success and failure paths, matching Python's
try/finallysemantics. - Receives the root value -- the pipeline's original input, not the current intermediate value. Normalized to
Noneif no root value exists. - Return value is always discarded -- the finally handler cannot alter the pipeline's result.
- Follows the standard calling convention -- not the except handler's special convention.
from quent import Q
def cleanup(original_input):
print(f"Cleaning up for input: {original_input}")
release_resources()
result = (
Q(resource_id)
.then(acquire_resource)
.then(process_resource)
.finally_(cleanup)
.run()
)
# cleanup always runs, receiving resource_id (the root value).
# Its return value is discarded.
Calling Conventions¶
The finally handler follows the standard calling conventions (2 rules), with the root value (normalized to None if absent) as the current value:
# handler(root_value) -- default
.finally_(cleanup)
# handler(arg1) -- explicit args override root value
.finally_(cleanup, some_resource)
Finally Handler Failure¶
- Exception already active: The finally handler's exception replaces the original exception (matching Python's
try/finallybehavior). The original is preserved as__context__. - Success path: The finally handler's exception propagates as the pipeline's error.
Async Finally in Sync Pipelines¶
When a sync pipeline's finally handler returns a coroutine, the engine performs an async transition: run() returns a coroutine instead of a plain value. When the caller awaits this coroutine, the finally handler's coroutine is awaited first, and then the pipeline's result is returned (success path) or the active exception is re-raised (failure path).
See Async Handling for details on the sync/async transition model.
Control Flow Restrictions¶
Using Q.return_() or Q.break_() inside an except_() or finally_() handler raises QuentException:
from quent import Q
def bad_handler(ei):
return Q.return_("value") # raises QuentException
Q(data).then(process).except_(bad_handler).run()
# QuentException: Using control flow signals inside except handlers is not allowed.
Control flow signals are not allowed in error or cleanup handlers -- they must be used in the main pipeline.
Execution Order¶
The full error handling flow:
pipeline executes
|
+--> step raises exception:
| |
| +--> exception matches `exceptions`?
| | |
| | +--> YES: except_ handler runs
| | | |
| | | +--> reraise=False: handler result = pipeline result
| | | | finally_ runs in success context
| | | |
| | | +--> reraise=True: handler runs for side-effects
| | | original exception re-raised
| | | finally_ runs in failure context
| | |
| | +--> NO: exception propagates
| | finally_ runs in failure context
| |
+--> success:
|
+--> finally_ runs in success context
The order of registration (.except_() vs .finally_()) does not matter -- they always execute in the order above.
ExceptionGroup from Concurrent Operations¶
When concurrent operations (gather(), or foreach()/foreach_do() with concurrency) encounter multiple failures, the exceptions are wrapped in an ExceptionGroup:
try:
await Q(data).gather(validate, enrich, score).run()
except ExceptionGroup as eg:
for exc in eg.exceptions:
print(exc)
- Python 3.11+: Uses the builtin
ExceptionGroup. - Python 3.10: Uses a polyfill that implements
.exceptions,.subgroup(),.split(), and.derive().
When one function/worker fails, the exception propagates directly (not wrapped in an ExceptionGroup).
The ExceptionGroup message indicates the source and count:
"gather() encountered 3 exceptions""foreach() encountered 2 exceptions"
Enhanced Tracebacks¶
When an exception occurs inside a pipeline, quent automatically injects a visualization into the traceback showing the full pipeline and marking the failing step.
Example Output¶
Traceback (most recent call last):
File "example.py", line 28, in <module>
.run()
^^^^^
File "<quent>", line 1, in
Q(fetch_data, 42)
.then(validate) <----
.then(transform)
.then(save)
File "example.py", line 11, in validate
raise ValueError("Value too large")
ValueError: Value too large
What quent Does¶
- Pipeline visualization: A synthetic
<quent>frame is injected showing every step. The<----marker identifies the failing step. - Frame cleaning: Internal quent frames are removed. You see only your code and the
<quent>visualization. - Exception notes (Python 3.11+): A one-line note is attached via
add_note(), surviving traceback reformatting. - Chained exceptions: Frames are cleaned from
__cause__,__context__, andExceptionGroupsub-exceptions.
Nested Pipeline Visualization¶
Nested pipelines are rendered with indentation:
First-Write-Wins Error Marker¶
The <---- marker always points to the innermost (deepest) failing step. When an exception propagates through nested pipelines, only the original failure point is recorded.
Controlling Tracebacks¶
QUENT_NO_TRACEBACK=1¶
Disables all traceback modifications. Must be set before importing quent:
QUENT_TRACEBACK_VALUES=0¶
Suppresses argument values in visualizations while preserving step names and structure. Useful in production to prevent sensitive data from leaking into logs:
Nested Pipelines for Per-Step Error Handling¶
Since each pipeline allows only one except_(), use nested pipelines for granular error handling:
from quent import Q
# Step with its own error recovery
fetch_with_fallback = (
Q()
.then(fetch_from_primary)
.except_(lambda ei: fetch_from_backup())
)
# Step with its own error recovery
save_with_retry = (
Q()
.then(save_to_database)
.except_(lambda ei: retry_save(ei.root_value), reraise=True)
)
pipeline = (
Q()
.then(fetch_with_fallback)
.then(validate)
.then(save_with_retry)
.except_(handle_pipeline_error) # catches anything not handled above
.finally_(cleanup)
)
Each nested pipeline's handlers apply only to that pipeline's execution. Unhandled exceptions propagate to the outer pipeline.
Practical Patterns¶
Logging and Re-raising¶
from quent import Q
pipeline = (
Q()
.then(fetch)
.then(process)
.except_(lambda ei: logger.error(f"Pipeline failed: {ei.exc}"), reraise=True)
.finally_(lambda rv: logger.info(f"Pipeline completed for: {rv}"))
)
Fallback Values¶
from quent import Q
pipeline = (
Q()
.then(fetch_from_cache)
.except_(lambda ei: fetch_from_database())
)
Selective Exception Handling¶
from quent import Q
pipeline = (
Q()
.then(fetch)
.then(process)
.except_(
lambda ei: {'error': str(ei.exc), 'input': ei.root_value},
exceptions=(ConnectionError, TimeoutError)
)
.finally_(cleanup)
)
# Only ConnectionError and TimeoutError are caught.
# All other exceptions propagate directly.
Summary¶
| Feature | Limit | Default catches | Handler receives | Return value |
|---|---|---|---|---|
.except_() |
1 per pipeline | Exception |
QuentExcInfo(exc, root_value) as current value, standard 2-rule calling convention |
Replaces pipeline result (or re-raises with reraise=True) |
.finally_() |
1 per pipeline | N/A (always runs) | Root value (standard calling convention) | Always discarded |
Next Steps¶
- Pipelines -- pipeline building, context managers, conditionals, and control flow
- Async Handling -- sync/async bridging, async transition patterns
- Reuse and Patterns -- cloning, nesting, decorators, and composition