Skip to content

API Reference

All public exports are available from the top-level package:

from quent import (
  Q, QuentExcInfo, QuentIterator, QuentException,
  __version__,
)

Q

class Q(v=<no value>, /, *args, **kwargs)

A sequential pipeline that transparently bridges synchronous and asynchronous operations. Steps are appended with fluent methods (.then(), .do(), etc.) and the pipeline is executed with .run().

Constructor

Q(v=<no value>, /, *args, **kwargs)

Create a new pipeline with an optional root value or callable.

Parameter Type Description
v Any Root value or callable. Defaults to no value (the internal Null sentinel, distinct from None).
*args Any Positional arguments passed to v when it is callable.
**kwargs Any Keyword arguments passed to v when it is callable.

Returns: Q instance.

Raises: TypeError if v is not callable but args or kwargs are provided.

# Q with a literal root value
Q(5)

# Q with a callable root — called at run time
Q(fetch_data, user_id)

# Q with no root value (value injected at run time)
Q()

Q(None) vs Q()

Q(None) creates a pipeline with root value None. Q() creates a pipeline with no root value -- the two are distinct. When the current value is None, a callable step receives fn(None). When there is no value (internal Null), a callable step receives fn() with zero arguments. See Null Sentinel.


Pipeline Building Methods

All pipeline building methods return self (Q), enabling fluent chaining. These methods record intent at build time -- actual execution happens during .run().

then

q.then(v, /, *args, **kwargs) -> Q

Append a step whose result replaces the current pipeline value. v can be any value -- if callable, it is called according to the calling conventions; if not callable, it is used as-is.

Parameter Type Description
v Any Value or callable to evaluate.
*args Any Positional arguments for v.
**kwargs Any Keyword arguments for v.

Returns: self (Q).

Raises: TypeError if v is not callable but args or kwargs are provided.

Q(5).then(lambda x: x * 2).run()
# 10

# Non-callable: replaces the current value with a literal
Q(5).then('hello').run()
# 'hello'

# With explicit arguments (current value is NOT passed)
Q(5).then(pow, 2, 10).run()
# pow(2, 10) = 1024

Tip

Because then accepts any value, you can use it to inject constants into the pipeline: .then(42) replaces the current value with 42.


do

q.do(fn, /, *args, **kwargs) -> Q

Append a side-effect step. fn must be callable (raises TypeError otherwise). The result of fn is discarded -- the current pipeline value passes through unchanged. If fn returns an awaitable, it is still awaited (to complete the side-effect), but the resolved value is discarded.

Parameter Type Description
fn Callable Callable to execute as a side-effect.
*args Any Positional arguments for fn.
**kwargs Any Keyword arguments for fn.

Returns: self (Q).

Raises: TypeError if fn is not callable.

Q(5).do(print).then(lambda x: x * 2).run()
# prints: 5
# result: 10

Why do() requires a callable

A non-callable .do(42) would evaluate 42 as a literal, discard it, and pass the current value through -- indistinguishable from not having the step at all. Requiring callability catches this mistake at build time.


foreach

q.foreach(fn=None, /, *, concurrency=None, executor=None) -> Q

Apply fn to each element of the current iterable value. Collects results into a list that replaces the current value. When fn is None, elements are collected as-is (identity mode).

Parameter Type Default Description
fn Callable \| None None Callable applied to each element, or None for identity collection.
concurrency int \| None None Maximum concurrent executions. None = sequential. See Concurrent Execution.
executor Executor \| None None Optional executor for sync concurrent execution. When provided, used instead of creating a new ThreadPoolExecutor (caller manages lifecycle).

Returns: self (Q). The current value becomes list[result].

Raises:

  • TypeError if fn is provided and not callable.
  • TypeError if concurrency is not an integer (including bool).
  • ValueError if concurrency is less than 1 (excluding -1 for unbounded).
Q([1, 2, 3]).foreach(lambda x: x ** 2).run()
# [1, 4, 9]

# Identity mode -- collect elements as-is
Q(range(5)).foreach().run()
# [0, 1, 2, 3, 4]

# With concurrency: process up to 4 items concurrently
Q(urls).foreach(fetch, concurrency=4).run()

Supports both sync iterables (__iter__) and async iterables (__aiter__). When both protocols are present, the async protocol is preferred if an async event loop is running (asyncio, trio, or curio). Supports mid-iteration async transition -- if fn returns an awaitable for any element, the operation transitions to async automatically.

Use Q.break_() inside fn to stop iteration early. Without a value, partial results collected so far are returned. With a value, it is appended to the partial results.


foreach_do

q.foreach_do(fn, /, *, concurrency=None, executor=None) -> Q

Apply fn as a side-effect to each element. The original elements are collected into a list (fn's return values are discarded).

Parameter Type Default Description
fn Callable (required) Side-effect callable applied to each element.
concurrency int \| None None Maximum concurrent executions. None = sequential. See Concurrent Execution.
executor Executor \| None None Optional executor for sync concurrent execution. When provided, used instead of creating a new ThreadPoolExecutor (caller manages lifecycle).

Returns: self (Q). The current value becomes a list of the original elements.

Raises: Same as foreach().

Q([1, 2, 3]).foreach_do(print).run()
# prints: 1, 2, 3
# result: [1, 2, 3]

All execution mechanics (concurrent, error, break behavior) are identical to foreach().


gather

q.gather(*fns, concurrency=-1, executor=None) -> Q

Run multiple functions on the current pipeline value concurrently. Returns a tuple of results in the same order as fns.

Parameter Type Default Description
*fns Callable (required) One or more callables. Each receives the current value.
concurrency int -1 Maximum simultaneous executions. -1 = unbounded (all run concurrently). See Concurrent Execution.
executor Executor \| None None Optional executor for sync concurrent execution. When provided, used instead of creating a new ThreadPoolExecutor (caller manages lifecycle).

Returns: self (Q). The current value becomes a tuple of results.

Raises:

  • QuentException if zero functions are provided.
  • TypeError if any fn is not callable.
  • TypeError if concurrency is not an integer (including bool).
  • ValueError if concurrency is less than 1 (excluding -1 for unbounded).
Q(10).gather(
  lambda x: x + 1,
  lambda x: x * 2,
  lambda x: x ** 2,
).run()
# (11, 20, 100)

Gather is always concurrent

Unlike foreach()/foreach_do() which default to sequential, gather() is always concurrent. In sync mode it uses ThreadPoolExecutor; in async mode it uses TaskGroup (3.11+) or asyncio.gather (3.10). There is no sequential fallback.

ExceptionGroup

When multiple gathered functions raise concurrently, the exceptions are wrapped in an ExceptionGroup. A single failure propagates directly (no wrapping).

Control flow signals inside gather() workers

  • Q.return_() inside a worker returns from that worker -- the value becomes that gather position's tuple element. Sibling workers continue. (Changed in 7.0.0; pre-7.0 it exited the entire pipeline -- use Q.exit_() for that.)
  • Q.break_() inside a worker raises QuentException -- gather is concurrent fan-out, not an iteration scope.
  • Q.exit_() inside a worker propagates outward; sibling tasks are cancelled per asyncio/threadpool semantics; absorbed at the outermost run().

with_

q.with_(fn, /, *args, **kwargs) -> Q

Enter the current pipeline value as a context manager, call fn with the context value (the result of __enter__ or __aenter__). fn's result replaces the current pipeline value. Works with both sync and async context managers.

Parameter Type Description
fn Callable Callable to evaluate with the context value. Must be callable.
*args Any Positional arguments for fn.
**kwargs Any Keyword arguments for fn.

Returns: self (Q).

Raises: TypeError if fn is not callable.

Q(open('data.txt')).with_(lambda f: f.read()).run()
# reads file contents; file is properly closed

Exception suppression: If fn raises and __exit__ returns a truthy value, the pipeline continues with None as the current value.

Dual-protocol objects: When the current value supports both sync and async context manager protocols and an async event loop is running (asyncio, trio, or curio), the async protocol is preferred.

Control flow signals: If fn raises return_(), break_(), or exit_(), __exit__ is called with no exception info (clean exit), and the signal propagates per §7.1/§7.2/§7.5.


with_do

q.with_do(fn, /, *args, **kwargs) -> Q

Same as with_() but fn's result is discarded. The pipeline value remains the original value (the context manager object itself, not the __enter__ result).

Parameter Type Description
fn Callable Callable to execute as a side-effect inside the context.
*args Any Positional arguments for fn.
**kwargs Any Keyword arguments for fn.

Returns: self (Q).

Raises: TypeError if fn is not callable.

If an exception is suppressed by __exit__, the original pipeline value passes through (not None -- unlike with_).


if_

q.if_(predicate=None, /, *args, **kwargs) -> Q

Begin a conditional branch. Must be followed by .then() or .do(), which registers the truthy branch. When predicate is None, the truthiness of the current pipeline value is used. When predicate is callable, it is invoked per the standard 2-rule calling convention. When predicate is a non-callable value, its truthiness is used directly.

Parameter Type Description
predicate Any \| None Callable, literal value, or None (uses current value truthiness).
*args Any Positional arguments forwarded to the predicate callable.
**kwargs Any Keyword arguments forwarded to the predicate callable.

Returns: self (Q). The next .then() or .do() becomes the truthy branch.

Raises: QuentException if if_() is called while another if_() is already pending.

Call .then(fn, *args, **kwargs) or .do(fn, *args, **kwargs) after if_() to specify the truthy branch:

Q(data).if_(lambda x: len(x) > 0).then(process).run()

# with arguments passed to the branch function
Q(data).if_(predicate).then(save, 'backup', compress=True).run()

# else branch
Q(-5).if_(lambda x: x > 0).then(str).else_(abs).run()  # 5

Predicate semantics

When predicate is None and the pipeline has no current value (Null), the predicate evaluates to falsy.


else_

q.else_(v, /, *args, **kwargs) -> Q

Register an else branch for the immediately preceding if_(). Evaluated when if_()'s predicate was falsy.

Parameter Type Description
v Any Value or callable for the else branch.
*args Any Positional arguments for v.
**kwargs Any Keyword arguments for v.

Returns: self (Q).

Raises: QuentException if not immediately preceded by if_(), or if a second else_() is registered on the same if_().

Q(value).if_(
  lambda x: x > 0
).then(
  process_positive
).else_(
  process_negative
).run()

Warning

else_() must be called immediately after .then()/.do() on an if_() with no other operations in between. Calling else_() while if_() is still pending (no .then()/.do() registered) raises QuentException.


name

q.name(label, /) -> Q

Assign a user-provided label for traceback identification. The label appears in pipeline visualizations (Q[label](root)), exception notes, and repr(q). No effect on execution semantics -- purely for debuggability.

Parameter Type Description
label str A short descriptive string identifying this pipeline.

Returns: self (Q).

Q(fetch).name('auth_pipeline').then(validate).run()

while_

q.while_(predicate=None, /, *args, **kwargs) -> Q

Begin a loop. Must be followed by .then() or .do(), which registers the loop body. The loop repeatedly evaluates the body while the predicate is truthy.

Parameter Type Description
predicate Any \| None Callable, literal value, or None (uses current value truthiness).
*args Any Positional arguments forwarded to the predicate callable.
**kwargs Any Keyword arguments forwarded to the predicate callable.

Returns: self (Q). The next .then() or .do() becomes the loop body.

Raises:

  • QuentException if while_() is called while an if_() is already pending.
  • QuentException if while_() is called while another while_() is already pending.
  • QuentException if any method other than .then() or .do() is called while while_() is pending.
  • TypeError if predicate is a non-callable, non-None literal and args/kwargs are provided.

Body modes:

  • .then(fn): fn's result feeds back as the loop value each iteration. When the loop terminates, the final loop value replaces the current pipeline value.
  • .do(fn): fn runs for side effects; its return value is discarded. The loop value is unchanged.
# Decrement until zero — default predicate tests truthiness
Q(10).while_().then(lambda x: x - 1).run()  # 0

# Predicate callable — halve while value exceeds 1
Q(100).while_(lambda x: x > 1).then(lambda x: x // 2).run()  # 1

# break_() to exit early with a value
Q(1).while_(True).then(lambda x: Q.break_(x) if x >= 100 else x * 2).run()  # 128

# Nested pipeline for combining while with if
Q(data).while_(has_more).then(
  Q().if_(is_valid).then(process).else_(skip)
).run()

Infinite loops with .do()

When using .do() with while_(), the loop value never changes. If the predicate tests the loop value (including the default None predicate), this creates an infinite loop. Use break_() to exit, or use .then() to transform the loop value.

break_() semantics in while_

break_() in while_ preserves the current loop value (or uses the break value if provided). This differs from foreach, where break_() preserves partial results collected so far.

else_() not supported

else_() / else_do() after while_().then() or while_().do() raises QuentException — else branches are only valid after if_(), not after while_().


drive_gen

q.drive_gen(fn, /) -> Q

Drive a sync or async generator bidirectionally using Python's generator send protocol. The step function fn processes each yielded value; its return is sent back into the generator. When the generator stops, the last fn result becomes the pipeline value.

Parameter Type Description
fn Callable Step function. Called as fn(yielded_value) for each value the generator yields.

Returns: self (Q).

Raises:

  • TypeError if fn is not callable.
  • TypeError if the current pipeline value at execution time is not a sync generator, async generator, or callable that produces one.

Prerequisite: The current pipeline value must be a generator (sync or async) or a callable that produces one.

Behavior:

  1. If the current value is callable (not already a generator), invoke it to obtain the generator.
  2. Get the first yielded value (next(gen) or await gen.__anext__()).
  3. Call result = fn(yielded_value). If result is awaitable, await it.
  4. Send result back (gen.send(result) or await gen.asend(result)).
  5. Repeat from step 3 until StopIteration / StopAsyncIteration.
  6. The last fn result becomes the current pipeline value.

If the generator yields nothing (immediate stop), the pipeline value is None.

def gen():
  x = yield 1
  x = yield x + 1
  x = yield x + 1

Q(gen()).drive_gen(lambda x: x * 2).run()
# Flow: yield 1 → fn(1)=2 → send 2 → yield 3 → fn(3)=6 → send 6 → yield 7 → fn(7)=14
# returns: 14

# Async generator
result = await Q(async_gen()).drive_gen(process_request).run()

# Callable that produces a generator
Q(lambda: gen()).drive_gen(step_fn).run()

# Compose with error handling
Q(gen()).drive_gen(step_fn).except_(handle_error).finally_(cleanup).run()

Calling convention

Unlike other pipeline steps, drive_gen's step function does not follow the standard 2-rule calling convention. The yielded value is always passed directly as fn(yielded_value) — no args/kwargs dispatch.

Error semantics

Exceptions from fn or gen.send() propagate out of drive_gen. The generator is always closed in cleanup (via gen.close() or await gen.aclose()). Exceptions are NOT injected into the generator (no gen.throw()).

Control flow signals inside fn

  • Q.return_() inside fn returns from fn -- value becomes the pipeline CV (drive_gen's normal "last fn result → CV"); subsequent steps run. Generator is closed in cleanup. (Changed in 7.0.0; pre-7.0 it exited the entire pipeline -- use Q.exit_() for that.)
  • Q.break_() inside fn propagates outward through drive_gen toward the nearest enclosing iteration scope. Generator is closed first via close()/aclose() in the cleanup finally:, then the signal propagates.
  • Q.exit_() inside fn propagates outward; absorbed at the outermost run(). Generator is closed in cleanup.

Concurrent Execution

The concurrency parameter is available on .foreach(), .foreach_do(), and .gather().

concurrency Sync mode Async mode
None (default) Sequential (foreach/foreach_do). All concurrent (gather). Sequential (foreach/foreach_do). All concurrent (gather).
-1 (unbounded) ThreadPoolExecutor with one worker per item/fn All tasks launched concurrently (no semaphore)
Positive integer ThreadPoolExecutor(max_workers=concurrency) asyncio.Semaphore(concurrency) to limit concurrent tasks

Validation: Must be a positive integer (>= 1), -1 (unbounded), or None. Booleans are rejected (TypeError). Values less than 1 (excluding -1) raise ValueError.

Sync/async detection: The first item/function is probed. If it returns an awaitable, async path is used. If not, sync path. Mixed sync/async within a single concurrent operation raises TypeError.

Concurrent iteration materializes eagerly: The entire input iterable is converted to a list before processing begins. Do not use with infinite or very large iterables.

Unbounded concurrency

Use concurrency=-1 for unbounded concurrent execution. The effective concurrency equals the number of items at runtime.

gather() concurrency default

None applies only to foreach()/foreach_do(). gather() does not accept None; its default is -1 (unbounded).


Error Handling Methods

except_

q.except_(fn, /, *args, exceptions=None, reraise=False, **kwargs) -> Q

Register an exception handler. Only one except_ per pipeline.

Parameter Type Default Description
fn Callable (required) Handler callable.
*args Any Positional arguments for fn.
exceptions type \| Iterable[type] \| None None Exception types to catch. Defaults to (Exception,) when None.
reraise bool False When True, re-raise the original exception after running the handler.
**kwargs Any Keyword arguments for fn.

Returns: self (Q).

Raises:

  • TypeError if fn is not callable.
  • QuentException if except_ is already registered.
  • QuentException if exceptions is an empty iterable.
  • TypeError if any value in exceptions is not a BaseException subclass.
  • TypeError if a string is passed as exceptions (common mistake: "ValueError" instead of ValueError).

Handler calling convention (uses the standard 2-rule convention):

The handler receives a QuentExcInfo(exc, root_value) as its current value. The standard 2-rule dispatch applies:

Registration Handler invocation
except_(handler) handler(QuentExcInfo(exc, root_value))
except_(handler, arg1, arg2) handler(arg1, arg2) -- QuentExcInfo NOT passed
except_(handler, key=val) handler(key=val) -- QuentExcInfo NOT passed

Access the exception via .exc and the root value via .root_value on the QuentExcInfo NamedTuple.

The root_value is normalized to None when the pipeline has no root value.

reraise=False (default): Handler's return value replaces the pipeline's result. Exception is consumed.

reraise=True: Handler runs for side-effects only, then the original exception is re-raised. Handler's return value is discarded.

Handler failure with reraise=True: If the handler raises an Exception, the handler's exception is discarded (with a RuntimeWarning), and the original exception is re-raised. If the handler raises a BaseException (e.g., KeyboardInterrupt), it propagates naturally.

Handler failure with reraise=False: The handler's exception propagates. The original pipeline exception is set as __cause__ via raise handler_exc from original_exc.

# Swallow exception, return fallback
Q(fetch_data).except_(lambda ei: 'fallback').run()

# Log and re-raise -- ei is QuentExcInfo(exc, root_value)
Q(fetch_data).except_(
  lambda ei: logger.error('Failed on %s: %s', ei.root_value, ei.exc),
  reraise=True,
).run()

# Catch specific exception types
Q(fetch_data).except_(
  handle_error,
  exceptions=ConnectionError,
).run()

BaseException subclasses

A RuntimeWarning is emitted if you configure except_() to catch BaseException subclasses that are not Exception subclasses (e.g., KeyboardInterrupt, SystemExit).

Control flow in except handlers

Using Q.return_() or Q.break_() inside an except handler raises QuentException (they would skip the handler's invariants). Q.exit_() is allowed -- it propagates outward unconditionally.

Exception: when a handler is registered on a nested Q, that nested Q absorbs its own Q.return_() -- the handler "returns" with the signal's value, and the outer pipeline continues. The QuentException trap applies to direct invocation in a handler position only.


finally_

q.finally_(fn, /, *args, **kwargs) -> Q

Register a cleanup handler. Only one finally_ per pipeline. Always runs regardless of success or failure.

Parameter Type Description
fn Callable Cleanup callable.
*args Any Positional arguments for fn.
**kwargs Any Keyword arguments for fn.

Returns: self (Q).

Raises:

  • TypeError if fn is not callable.
  • QuentException if finally_ is already registered.

Finally handler semantics

  • Receives the pipeline's root value (normalized to None if absent), not the current pipeline value.
  • Follows the standard calling conventions (not the except handler convention).
  • Return value is always discarded.
  • If the handler raises while an exception is active, the handler's exception replaces the original (preserved as __context__). If the in-flight item is a control-flow signal (Q.return_()/Q.break_()), the signal is preserved as __context__ on the handler's exception.
  • Q.return_()/Q.break_() inside the handler raises QuentException (they would skip finally's "always runs" guarantee). Q.exit_() is allowed -- it propagates outward unconditionally.
Q(acquire_resource).then(process).finally_(release_resource).run()

Async finally in sync pipelines: When a sync pipeline's finally handler returns a coroutine, the engine performs an async transition: run() returns a coroutine instead of a plain value. When the caller awaits this coroutine, the finally handler's coroutine is awaited first, and then the pipeline's result is returned (or the active exception is re-raised).


Execution Order

The full error handling flow:

  1. Pipeline steps execute sequentially.
  2. If a step raises matching exceptions:
    • Except handler runs (if registered).
    • reraise=False: handler's return value becomes the result; finally runs in success context.
    • reraise=True: original exception re-raised; finally runs in failure context.
  3. If a step raises a non-matching exception: exception propagates; finally runs in failure context.
  4. On success: finally runs in success context.
  5. Finally always runs last.

Execution Methods

run

q.run(v=<no value>, /, *args, **kwargs) -> Any

Execute the pipeline and return the final result.

Parameter Type Description
v Any Optional value injected into the pipeline. Overrides the root value if both are present.
*args Any Positional arguments for v if callable.
**kwargs Any Keyword arguments for v if callable.

Returns: The final pipeline value. Returns a coroutine if any step returned an awaitable (caller must await it). Returns None if no value was produced.

Raises:

  • TypeError if v is not callable but args/kwargs are provided.
  • QuentException if a control flow signal escapes the pipeline.
# Execute with the root value
Q(5).then(lambda x: x * 2).run()
# 10

# Inject a value at run time (overrides root)
q = Q().then(lambda x: x * 2)
q.run(5)   # 10
q.run(10)  # 20

Run value vs root value: When both exist, the run value replaces the root entirely. Q(A).then(B).run(C) is equivalent to Q(C).then(B).run().

Root value capture: The evaluated root/run value is captured as the "root value" for error handlers. except_() and finally_() receive this root value, not the current pipeline value at the point of failure.


__call__

q(v=<no value>, /, *args, **kwargs) -> Any

Alias for run(). A Q instance is directly callable:

q = Q(5).then(lambda x: x * 2)
q()  # 10

debug

q.debug(v=<no value>, /, *args, **kwargs) -> DebugResult | Coroutine[Any, Any, DebugResult]

Execute the pipeline with step-level instrumentation and return a DebugResult capturing the execution trace. The original pipeline is not modified — debug() clones the pipeline internally.

Parameter Type Description
v Any Optional value injected into the pipeline (same as run()).
*args Any Positional arguments for v if callable.
**kwargs Any Keyword arguments for v if callable.

Returns: A DebugResult (sync pipelines) or a coroutine resolving to a DebugResult (async pipelines).

Raises: QuentException if if_() or while_() is pending.

DebugResult fields:

Field / Property Type Description
value Any The pipeline's final result (same as run() would return).
steps list[StepRecord] Ordered list of step records.
elapsed_ns int Total wall-clock nanoseconds.
succeeded bool True if all steps completed without error.
failed bool True if any step raised an exception.
print_trace(file=None) method Print a formatted execution trace table.

StepRecord fields (frozen dataclass):

Field / Property Type Description
step_name str Name of the step.
input_value Any Current value passed to the step.
result Any Value produced by the step.
elapsed_ns int Wall-clock nanoseconds for this step.
exception BaseException \| None Exception raised, or None.
ok bool True if the step succeeded.
result = Q(5).then(lambda x: x * 2).then(str).debug()
print(result.value)       # '10'
print(result.elapsed_ns)  # total nanoseconds
result.print_trace()      # formatted table to stderr

# Async pipeline
result = await Q(fetch).then(parse).debug(url)

DebugResult and StepRecord

These types are not exported in __all__. They are accessible from the quent._debug module or via the return value of debug().

Exception behavior

If the pipeline raises during debug(), the exception propagates normally — debug() does not suppress errors.


Iteration Methods

iterate

q.iterate(fn=None) -> QuentIterator

Return a dual sync/async iterator over the pipeline's output. The pipeline is executed when iteration begins (not when iterate() is called). Each element of the iterable result is yielded.

Parameter Type Description
fn Callable \| None Optional transform applied to each element. None = yield as-is.

Returns: QuentIterator -- supports both __iter__ (for for loops) and __aiter__ (for async for loops).

# Sync iteration
for item in q.iterate():
  process(item)

# Async iteration
async for item in q.iterate():
  await process(item)

# With transform
for name in q.iterate(lambda item: item['name']):
  print(name)

Error behavior:

  • If fn returns an awaitable during sync iteration (for), a TypeError is raised directing the user to use async for.
  • Exceptions from fn propagate directly to the caller at the iteration point -- they are NOT covered by the pipeline's except_() handlers.

Control flow in iteration:

  • return_(v): Yields the value (if provided) as a final item before stopping. Previously yielded items are preserved.
  • break_(v): Yields the value (if provided) before stopping. Without a value, stops immediately.

Callable reuse

The returned iterator is callable. Calling it with arguments creates a new iterator with those arguments as the run-time parameters:

it = q.iterate(fn)
for item in it:          # runs pipeline with no arguments
  ...
for item in it(value):   # runs pipeline with `value` as run value
  ...

iterate_do

q.iterate_do(fn=None) -> QuentIterator

Like iterate() but fn's return values are discarded. The original elements are yielded.

Parameter Type Description
fn Callable \| None Optional side-effect callable.

Returns: QuentIterator.

for item in q.iterate_do(print):
  process(item)

flat_iterate

q.flat_iterate(fn=None, *, flush=None) -> QuentIterator

Return a dual sync/async flatmap iterator over the pipeline's output. Each element of the pipeline's iterable result is either iterated directly (when fn is None, flattening one level of nesting) or transformed by fn into a sub-iterable whose items are individually yielded.

Parameter Type Default Description
fn Callable \| None None Optional callable that receives each element and returns an iterable. Each item from the returned iterable is yielded individually. When None, each source element is iterated directly (flattening one level).
flush Callable \| None None Optional zero-argument callable invoked once after the source iterable is fully consumed. Must return an iterable; each item is yielded into the stream. Intended for emitting buffered or remaining items after the source ends (e.g., flushing a codec buffer).

Returns: QuentIterator -- supports both __iter__ (for for loops) and __aiter__ (for async for loops).

# Flatten one level of nesting (fn=None)
for item in Q([[1, 2], [3, 4]]).flat_iterate():
  print(item)  # 1, 2, 3, 4

# Transform each element into a sub-iterable
for word in Q(['hello world', 'foo bar']).flat_iterate(str.split):
  print(word)  # hello, world, foo, bar

# With flush -- emit remaining buffered items after source ends
buffer = []
def chunk(item):
  buffer.append(item)
  if len(buffer) >= 3:
    result, buffer[:] = buffer[:], []
    return result
  return []

def flush_buffer():
  return buffer

for chunk in Q(range(7)).flat_iterate(chunk, flush=flush_buffer):
  print(chunk)

All iteration behavior -- sync/async support, error handling, deferred finally_(), control flow, iterator reuse -- matches iterate().

Error behavior:

  • Exceptions from fn propagate to the caller at the iteration point, as with iterate().
  • If flush() raises, the exception propagates at the iteration point.
  • If fn or flush returns an awaitable during sync iteration (for), a TypeError is raised directing the user to use async for.

flat_iterate_do

q.flat_iterate_do(fn=None, *, flush=None) -> QuentIterator

Like flat_iterate(), but fn runs as a side-effect -- its returned iterable is fully consumed (driving side-effects) but not yielded. The original source elements are yielded instead.

Parameter Type Default Description
fn Callable \| None None Optional side-effect callable. Its returned iterable is consumed but discarded.
flush Callable \| None None Optional zero-argument callable. Output is yielded normally (not discarded).

Returns: QuentIterator.

for item in Q([[1, 2], [3]]).flat_iterate_do(lambda sub: [print(x) for x in sub]):
  print('source:', item)
  # prints each sub-item via fn, then yields the original source element

When fn is None, behaves identically to flat_iterate() with no fn (flattens one level). The flush output is always yielded (the "do" discard semantic applies only to fn's results).


buffer

q.buffer(n, /) -> Q

Attach a backpressure-aware bounded buffer for use with iteration terminals (iterate(), iterate_do(), flat_iterate(), flat_iterate_do()). The buffer decouples producer and consumer, enabling the producer to run ahead up to n items while the consumer processes them.

Parameter Type Description
n int Maximum number of items the buffer can hold. Must be a positive integer.

Returns: self (Q). Does not add a pipeline step — it is a pipeline-level modifier.

Raises:

  • ValueError if n is zero or negative.
  • TypeError if n is not an integer (booleans are rejected even though bool is a subclass of int).
  • QuentException if run() is called instead of an iteration terminal.
  • QuentException if called while an if_() or while_() is pending.

Behavior:

  • Sync (for): the producer runs in a background daemon thread using queue.Queue(maxsize=n).
  • Async (async for): the producer runs as a background asyncio.Task using asyncio.Queue(maxsize=n).
  • Items are delivered in order (FIFO).
  • When the buffer is full, the producer blocks (backpressure). When empty, the consumer blocks.
  • If the producer raises, the exception is propagated to the consumer at the next read.
  • If the consumer exits early (break, GeneratorExit), the producer is signaled to stop.
# Producer runs ahead up to 10 items
for item in Q(produce).buffer(10).iterate():
  process(item)

# With transformation
for item in Q(produce).buffer(5).iterate(transform):
  consume(item)

# Async usage
async for item in Q(async_produce).buffer(10).iterate():
  await process(item)

Interaction with other features

The buffer size is preserved across clone() and iterator reuse. When both buffer() and a deferred with_ are active, the buffer wraps the iterable after the context manager is entered.


Deferred with_ in Iteration

When .with_(fn) or .with_do(fn) is the last pipeline step before an iteration terminal (.iterate(), .iterate_do(), .flat_iterate(), .flat_iterate_do()), context manager entry is deferred to iteration time. The context manager remains open for the entire duration of iteration and is exited when iteration ends.

Only .with_(fn) is supported

Bare .with_() (no argument) is prohibited. Only .with_(fn) with an explicit callable triggers deferred context manager wrapping.

Why deferral? Without deferral, .with_() would enter the CM during the pipeline's run() phase and exit it before iteration begins -- the resource would be closed before any items are consumed. Deferral keeps the CM open throughout iteration, matching the natural lifetime of with blocks in Python.

Lifecycle:

  1. The pipeline runs normally, producing a value that must be a context manager.
  2. At iteration start, the CM is entered via __enter__() (or __aenter__()).
  3. If .with_(fn) was used: fn is invoked with the context value per the standard calling convention. The result becomes the iterable for iteration.
  4. If .with_do(fn) was used: fn runs as a side-effect (result discarded); the CM object itself becomes the iterable (it must be iterable).
  5. Iteration proceeds with the CM open.
  6. The CM is exited in the generator's finally: block, guaranteeing cleanup on all exit paths (normal exhaustion, break, exceptions, generator .close()).

CM exit semantics:

  • Normal completion / source exhausted: __exit__(None, None, None).
  • break, return_(), break_() (control flow): __exit__(None, None, None) -- control flow signals are not errors.
  • Generator .close() / GeneratorExit: __exit__(None, None, None).
  • Exception during iteration: __exit__(*sys.exc_info()) -- the CM receives the exception. If __exit__ returns truthy, the exception is suppressed.

Ordering with deferred finally_(): When both a deferred with_ and a deferred finally_() are active, the CM exits first, then the deferred finally_() runs. The deferred finally runs even if __exit__ raises.

# File stays open for entire iteration, then closes
for line in Q(open, 'data.txt').with_(lambda f: f).iterate(str.strip):
  process(line)

# Async context manager with deferred cleanup
async for row in Q(db.connect).with_(lambda conn: conn.cursor()).iterate():
  process(row)

Class Methods

from_steps

Q.from_steps(*steps) -> Q

Construct a pipeline from a sequence of steps, each appended via .then().

Parameter Type Description
*steps Any Variadic positional arguments. Each becomes a .then() step. If a single argument is passed and it is a list or tuple, it is unpacked as the step sequence.

Returns: A new Q instance with no root value.

Equivalence: Q.from_steps(a, b, c) is equivalent to Q().then(a).then(b).then(c).

Steps can be callables, literal values, or nested Q pipelines -- anything .then() accepts. Q.from_steps() with no arguments returns an empty pipeline (equivalent to Q()).

# Variadic form
pipeline = Q.from_steps(validate, normalize, str.upper)
pipeline.run('  hello  ')

# List form -- useful for dynamic pipeline construction
steps = [validate, normalize, str.upper]
pipeline = Q.from_steps(steps)
pipeline.run('  hello  ')

# Dynamic pipeline from a plugin registry
plugins = load_plugins()
pipeline = Q.from_steps([p.transform for p in plugins])

Reuse Methods

clone

q.clone() -> Q

Create an independent copy of this pipeline.

Returns: A new Q of the same type (subclass-safe).

What is copied:

  • Pipeline structure (all step nodes) -- deep-copied. The clone has its own independent linked list.
  • Nested pipelines within steps are recursively cloned.
  • Conditional operations (if_/else_) are deep-copied.
  • Error handler step nodes are cloned. Handler callables that are Q instances are recursively cloned; non-pipeline callables are shared by reference.
  • Keyword argument dictionaries are shallow-copied (mutable). Positional argument tuples are shared (immutable).

What is shared by reference:

  • All callables (except Q instances, which are always recursively cloned).
  • Values and argument objects.
  • Exception type tuples for except_().
base = Q(fetch).then(validate)
branch_a = base.clone().then(transform_a)
branch_b = base.clone().then(transform_b)

Clones always behave as top-level pipelines, regardless of whether the original was nested.


decorator

q.as_decorator() -> Callable[..., Callable[..., Any]]

Wrap the pipeline as a function decorator. The decorated function's return value becomes the pipeline's input. The pipeline is cloned internally.

Returns: A decorator function.

@Q().then(lambda x: x.strip()).then(str.upper).as_decorator()
def get_name():
  return '  alice  '

get_name()  # 'ALICE'

The decorated function preserves its original signature via functools.wraps. Control flow signals that escape the decorated pipeline are caught and wrapped in QuentException.


Control Flow (Class Methods)

The three signals map onto Python control-flow analogies:

Signal Python analogy Scope
Q.return_() return exits the current Q only
Q.break_() labeled break to nearest loop propagates out to the nearest enclosing iteration scope
Q.exit_() sys.exit() propagates through everything; absorbed only at the outermost run()

Must use return

All three signals raise an internal exception. Always write return Q.return_(...) / return Q.break_(...) / return Q.exit_(...) so linters don't flag subsequent code as unreachable. (The mechanism does not require return -- the call raises -- but the convention satisfies type checkers and readers.)

return_

Q.return_(v=<no value>, /, *args, **kwargs) -> NoReturn

Return from the current Q (Python-return-style). The optional value becomes that Q's result. If the current Q is nested as a step in an outer Q, the outer pipeline continues with the returned value as the nested step's result. If the current Q is the outermost run(), the value becomes run()'s return.

Parameter Type Description
v Any Return value. When Null, the result is None.
*args Any Positional arguments for v if callable.
**kwargs Any Keyword arguments for v if callable.

Value semantics:

Form Result
Q.return_() None
Q.return_(42) 42 (non-callable as-is)
Q.return_(fn) fn() lazily
Q.return_(fn, *args, **kwargs) fn(*args, **kwargs) lazily

If the lazy form's fn raises a control-flow signal, it is wrapped as QuentException (signals inside lazy values are misuse).

Nested pipeline propagation: Each Q boundary absorbs its own Q.return_(). To exit the entire top-level pipeline from a nested Q, use Q.exit_().

Changed in 7.0.0

Pre-7.0, Q.return_() propagated to the outermost run(). It now exits the current Q only. See Migrating to 7.0.

Carve-outs:

  • Inside except_/finally_ handler: raises QuentException (would skip handler invariants). Exception: when the handler is registered on a nested Q, that nested Q absorbs the signal locally.
  • Inside a gather() worker: returns from the worker -- value becomes that gather position's tuple element.
  • Inside a drive_gen fn: returns from fn -- value becomes the pipeline CV; subsequent steps run.

break_

Q.break_(v=<no value>, /, *args, **kwargs) -> NoReturn

Stop the nearest enclosing iteration scope (labeled-break-style). Iteration scopes are: foreach, foreach_do, iterate, iterate_do, flat_iterate, flat_iterate_do, while_. Q.break_() propagates outward through Q boundaries, if_/else_*, with_/with_do, drive_gen, and other non-iteration scopes until iteration catches it.

Parameter Type Description
v Any Break value.
*args Any Positional arguments for v if callable.
**kwargs Any Keyword arguments for v if callable.

Value semantics:

Form Behavior
Q.break_() Stops iteration. foreach/foreach_do: returns results collected so far. iterate*: generator completes without further yields. while_: result is the current loop value.
Q.break_(value) As above, plus: foreach/foreach_do -- value appended to results; iterate* -- value yielded as one final item before stopping; while_ -- value replaces the loop value (not append/yield).
Q.break_(fn, ...) fn(...) lazy; resulting value handled per the row above. Awaitable result triggers async transition; awaited before append/yield/replace.
Q([1, 2, 3, 4, 5]).foreach(
  lambda x: Q.break_() if x == 3 else x * 2
).run()
# [2, 4]  -- partial results up to the break

Q([1, 2, 3, 4, 5]).foreach(
  lambda x: Q.break_(x * 10) if x == 3 else x * 2
).run()
# [2, 4, 30]  -- break value appended to partial results

Concurrent iteration break: The break from the earliest input index wins. Results from later indices are discarded.

Priority in concurrent operations: return_() > break_() > regular exceptions.

Escape: If Q.break_() reaches the outermost run() without an enclosing iteration scope, it is wrapped as QuentException.

Carve-outs: Trapped (wraps as QuentException) inside except_/finally_ handlers and gather() workers. Everywhere else propagates per the rules above.

Changed in 7.0.0

Q.break_() now propagates through if_() predicates, with_/with_do bodies, drive_gen fn, and nested-Q step boundaries (CM __exit__ still runs cleanly, generators still close). Pre-7.0 these positions raised QuentException.


exit_

Q.exit_(v=<no value>, /, *args, **kwargs) -> NoReturn

New in 7.0.0. Exit the entire top-level pipeline (sys.exit()-style). Propagates through every Q boundary, every signal carve-out (except_/finally_/gather/drive_gen), and every level of nesting. Absorbed only at the outermost run(), whose return value becomes the signal's value.

Parameter Type Description
v Any Exit value. When Null, the pipeline returns None.
*args Any Positional arguments for v if callable.
**kwargs Any Keyword arguments for v if callable.

Value semantics: same forms as Q.return_() (Q.exit_(), Q.exit_(42), Q.exit_(fn), Q.exit_(fn, *args, **kwargs)). Lazy fn is evaluated at the outermost run()'s catch frame.

Cleanup respects Python try/finally semantics: as _Exit propagates, every finally_() runs, every CM's __exit__ runs, every drive_gen generator closes, every concurrent task cancels and awaits. Resources release. Only after all cleanup unwinds does _Exit reach the outermost run().

If the lazy form's fn raises a control-flow signal, it is wrapped as QuentException (signals inside lazy values are misuse).

When to prefer Q.exit_() over Q.return_()/Q.break_(): when the intent is "exit the entire pipeline from arbitrary depth, regardless of nesting or handler scope". If returning from the current Q suffices, use Q.return_(). If exiting the nearest iteration suffices, use Q.break_().

inner = Q().foreach(lambda x: Q.exit_('done') if x == 5 else x)
outer = Q(range(10)).then(inner).then(lambda r: ['POST', r])
outer.run()
# 'done'  -- exit_() bypasses inner's run(), bypasses outer's then(), absorbed at outermost

Instrumentation

on_step

Q.on_step: ClassVar[Callable[[Q, str, Any, Any, int, BaseException | None], None] | None] = None

Class-level callback for pipeline execution instrumentation. Called after each pipeline step completes (or fails).

Argument Type Description
q Q The Q instance being executed.
step_name str Method name: 'root', 'then', 'do', 'foreach', 'foreach_do', 'gather', 'with_', 'with_do', 'if_', 'while_', 'drive_gen', 'except_', or 'finally_'.
input_value Any The input value the step received.
result Any The value produced by the step. None on failure.
elapsed_ns int Wall-clock nanoseconds via time.perf_counter_ns().
exception BaseException \| None The exception raised by the step, or None on success. Fires before the pipeline's except_ handler runs.

Zero overhead when disabled: When on_step is None (default), no timing or callback dispatch occurs. The code path is short-circuited entirely.

Not called for control flow signals: on_step does not fire for control flow signals (return_(), break_(), exit_()).

Error handling: If the callback raises, it is logged at WARNING level and pipeline execution continues uninterrupted.

Thread safety: on_step is class-level. Set it before concurrent pipeline execution begins. Subclass overrides are respected.

Q.on_step = lambda q, step, inp, result, ns, exc: print(f'{step}: {ns/1e6:.1f}ms{" FAIL" if exc else ""}')
Q(5).then(lambda x: x * 2).run()
Q.on_step = None  # disable

Context API

Pipeline steps are positional -- each step receives only the current value from the immediately preceding step. When non-adjacent steps need to share data (e.g., an early step produces a value that a later step needs, but intermediate transformations change the current value), the context API provides named storage scoped to the execution context, accessible from any step without altering the pipeline's value flow.

Storage is backed by a ContextVar-based dictionary. Each set() creates a new dict (copy-on-write), ensuring concurrent workers (via foreach/gather with concurrency) are properly isolated -- a worker's set() does not affect the parent or sibling workers.

set (instance -- pipeline step)

q.set(key: str) -> Q
q.set(key: str, value: Any) -> Q

Append a pipeline step that stores a value under key in the execution context. The current pipeline value is not changed (like .do()).

Form Behavior
q.set(key) Stores the current pipeline value under key.
q.set(key, value) Stores the explicit value under key.

Returns: self (Q).

result = (
  Q(fetch_user)
  .set('user')                        # store current value (the user) in context
  .set('source', 'api')              # store explicit value 'api' under 'source'
  .then(validate_permissions)         # transform continues with original user
  .get('user')                        # retrieve original user
  .then(format_response)
  .run(user_id)
)

set (class -- immediate)

Q.set(key: str, value: Any) -> None

Store a value in the execution context immediately. This is not a pipeline step -- it takes effect at the call site, not during run().

Q.set('config', load_config())    # pre-populate context before running pipelines

get (instance -- pipeline step)

q.get(key: str) -> Q
q.get(key: str, default: Any) -> Q

Append a pipeline step that retrieves the value stored under key from the execution context. The retrieved value replaces the current value (like .then()).

Scenario Behavior
Key found Stored value becomes the new current value.
Key not found, no default Raises KeyError at execution time.
Key not found, default provided Default becomes the new current value.

Returns: self (Q).

result = (
  Q(fetch_user)
  .set('user')                        # store user in context
  .then(transform)                    # current value changes
  .get('user')                        # retrieve original user -- becomes current value
  .then(format_response)
  .run(user_id)
)

get (class -- immediate)

Q.get(key: str, default: Any = <missing>) -> Any

Retrieve a value from the execution context immediately. This is not a pipeline step.

  • If key is found: returns the stored value.
  • If key is not found and no default: raises KeyError.
  • If key is not found and default provided: returns default.
Q.set('config', load_config())
result = (
  Q(fetch_data)
  .then(lambda data: process(data, Q.get('config')))
  .run()
)

Dual dispatch

Both set and get are Python descriptors that dispatch differently based on instance vs. class access. Instance access (q.set(...) / q.get(...)) appends a pipeline step. Class access (Q.set(...) / Q.get(...)) operates on context immediately.

if_() constraint

.set() and .get() do not consume a pending if_(). Calling them while if_() is pending raises QuentException.


Dunder Methods

__bool__

q.__bool__() -> bool

Always returns True. A Q instance is always truthy.


__repr__

q.__repr__() -> str

Returns the pipeline visualization (multiline, indented) — the same format used in traceback injection, but without the <---- error marker. When .name(label) has been called, renders as Q[label](root). Respects QUENT_TRACEBACK_VALUES=0.

repr(Q(fetch_data).then(validate).do(log))
# Q(fetch_data)
#     .then(validate)
#     .do(log)

Copying

copy.copy() and copy.deepcopy() are blocked on Q instances (TypeError). A shallow or deep copy would produce a broken object with shared linked-list structure. Use .clone() to produce a correct independent copy.

Pickling is not blocked — most pipeline contents (lambdas, closures, bound methods) will naturally fail to pickle, but quent does not enforce this.


QuentIterator

from quent import QuentIterator

The iterator object returned by q.iterate() and q.iterate_do(). Supports both __iter__ (sync) and __aiter__ (async). Is callable -- calling with arguments creates a new iterator with those arguments as run-time parameters.


QuentExcInfo

from quent import QuentExcInfo

A NamedTuple with two fields passed to except_() handlers as their current value:

Field Type Description
exc BaseException The caught exception.
root_value Any The pipeline's evaluated root value, normalized to None when absent.
from quent import Q, QuentExcInfo

def handle_error(ei: QuentExcInfo):
  print(f'Failed on {ei.root_value}: {ei.exc}')
  return 'fallback'

Q(42).then(lambda x: 1/0).except_(handle_error).run()
# prints: Failed on 42: division by zero
# returns: 'fallback'

Null Sentinel (Internal)

Null is an internal implementation detail — it is not part of __all__ and should not be imported directly. It is used internally to distinguish "no value was provided" from None. It is never exposed to user code -- run() returns None (not Null) when no value is produced, and handlers always receive None when no root value exists. You may see <Null> mentioned in tracebacks or debug output; it indicates the absence of a value rather than a None value.


QuentException

from quent import QuentException

Subclass of Exception raised for quent API misuse. Never raised for errors in user code.

Error Cause
Duplicate handler Second except_() or finally_() on same pipeline
Escaped signal _Break escaped past the outermost run() with no enclosing iteration scope
break_() outside iteration Used outside any iteration scope (foreach, foreach_do, iterate, iterate_do, flat_iterate, flat_iterate_do, while_)
else_() without if_() No immediately preceding if_()
Pending if_() run() called while if_() is pending (no .then()/.do() consumed it)
Control flow in handlers return_()/break_() in except_/finally_ (exit_() is allowed)
Signal in lazy value Lazy fn passed to Q.return_(fn)/Q.break_(fn)/Q.exit_(fn) raised a control-flow signal
break_() in gather() worker gather() is concurrent fan-out, not an iteration scope
Visualization depth truncated Nested pipeline visualization truncated at depth 50 (visualization-only, not an execution limit)

Calling Conventions

Standard Calling Conventions (2 Rules)

Applied in priority order. First match wins.

Priority Rule Trigger Invocation
1 Explicit Args Args/kwargs provided at registration fn(*args, **kwargs) -- current value NOT passed
2 Default None of the above Callable + value: fn(cv). Callable + no value: fn(). Non-callable: returned as-is

Nested pipelines are detected via duck-typing (_quent_is_chain) and execute with the current value as input.

Constraints:

  • Providing args/kwargs to a non-callable raises TypeError at build time.

Except Handler Calling Convention

Uses the standard 2-rule convention. The handler's current value is a QuentExcInfo(exc, root_value) NamedTuple:

Registration Handler invocation
except_(handler) handler(QuentExcInfo(exc, root_value))
except_(handler, arg1, arg2) handler(arg1, arg2) -- QuentExcInfo NOT passed
except_(handler, key=val) handler(key=val) -- QuentExcInfo NOT passed

The root_value is normalized to None when the pipeline has no root value.

Finally Handler Calling Convention

Follows the standard calling conventions (2 rules). Receives the root value (normalized to None if absent) as its current value. Return value is always discarded.


Enhanced Tracebacks

Pipeline Visualization

When an exception occurs during pipeline execution, a synthetic <quent> frame is injected into the traceback:

Traceback (most recent call last):
  File "<quent>", line 1, in
    Q(fetch_data)
    .then(validate)
    .then(transform) <----
    .do(log)
ValueError: invalid data

The <---- marker points to the step that raised the exception. Nested pipelines are rendered with increasing indentation.

Internal Frame Cleaning

All quent-internal frames are removed from tracebacks. Only user code and the synthetic <quent> frame are shown. Cleaning applies recursively to __cause__, __context__, and ExceptionGroup sub-exceptions.

Exception Notes (Python 3.11+)

A concise note is attached: quent: exception at .then(validate) in Q(fetch_data). Attached once per exception (idempotent).

Hook Patching

Two patches installed at import time:

  • sys.excepthook -- for uncaught exceptions.
  • traceback.TracebackException.__init__ -- for logging, traceback.print_exc(), etc.

Visualization Limits

  • Nesting depth: 50
  • Links per level: 100
  • Total length: 10,000 characters
  • Total recursive calls: 500 per level

Repr Sanitization (CWE-117)

All repr() output in visualizations is sanitized: ANSI escape sequences stripped, Unicode control characters stripped, length truncated to 200 characters.


Alternative Event Loops (Trio & Curio)

quent supports asyncio, trio, and curio event loops. Async pipelines work transparently under any of these runtimes -- no configuration or adapter code is required.

How It Works

Event loop detection uses sys.modules lookups to check whether a runtime's loop is active. This adds zero overhead when a library is not loaded (~50ns dict lookup returning None). Detection order:

  1. asyncio -- checked first via the C-level asyncio._get_running_loop() for performance.
  2. trio -- detected via trio.lowlevel.current_trio_token() (only probed if trio.lowlevel is in sys.modules).
  3. curio -- detected via curio.meta.curio_running() (only probed if curio.meta is in sys.modules).

Dual-Protocol Preference

When a pipeline value supports both sync and async protocols -- context managers (__enter__/__exit__ vs __aenter__/__aexit__) or iterables (__iter__ vs __aiter__) -- and any async event loop is running, the async protocol is preferred. This applies uniformly across asyncio, trio, and curio.

Usage

No special API is needed. Use the same Q API under any runtime:

import trio
from quent import Q

async def async_double(x):
  return x * 2

async def main():
  result = await Q(5).then(async_double).run()
  print(result)  # 10

trio.run(main)
import curio
from quent import Q

async def main():
  result = await Q(10).then(lambda x: x + 1).run()
  print(result)  # 11

curio.run(main)

Dual-protocol context managers and iterables automatically use the async protocol under trio and curio, just as they do under asyncio:

import trio
from quent import Q

# Dual-protocol CM uses __aenter__/__aexit__ under trio
async def main():
  result = await Q(dual_protocol_cm).with_(lambda ctx: ctx).run()

trio.run(main)

Differences from asyncio

  • Concurrency operations (gather, foreach with concurrency): The async concurrent path uses asyncio.Semaphore and asyncio.TaskGroup (or asyncio.gather on 3.10). Under trio or curio, async steps still work correctly for sequential execution, but the concurrent async path relies on asyncio primitives. For concurrent async pipelines under trio or curio, ensure the steps are compatible with asyncio task scheduling.
  • Event loop detection is lightweight and non-invasive -- it never imports trio or curio, only checks sys.modules for already-loaded modules.

Environment Variables

Variable Values Effect
QUENT_NO_TRACEBACK 1, true, yes (case-insensitive) Disable all traceback modifications. Must be set before import.
QUENT_TRACEBACK_VALUES 0, false, no (case-insensitive) Suppress argument values in visualizations (show type placeholders instead).

Debug Logging

The 'quent' logger emits debug-level messages at key execution points:

  • Pipeline start/completion
  • Step completion with result
  • Async transition
  • Step failure

Gated by _log.isEnabledFor(DEBUG) -- zero overhead when not at DEBUG level. Respects QUENT_TRACEBACK_VALUES=0 for value suppression.


Version

from quent import __version__

Package version string via importlib.metadata.version('quent').


Complete Signatures

# Constructor
Q(v=<no value>, /, *args, **kwargs)

# Pipeline building
q.then(v, /, *args, **kwargs) -> Q
q.do(fn, /, *args, **kwargs) -> Q
q.foreach(fn=None, /, *, concurrency=None, executor=None) -> Q
q.foreach_do(fn, /, *, concurrency=None, executor=None) -> Q
q.gather(*fns, concurrency=-1, executor=None) -> Q
q.with_(fn, /, *args, **kwargs) -> Q
q.with_do(fn, /, *args, **kwargs) -> Q
q.if_(predicate=None, /, *args, **kwargs) -> Q  # follow with .then()/.do()
q.else_(v, /, *args, **kwargs) -> Q
q.else_do(fn, /, *args, **kwargs) -> Q
q.name(label, /) -> Q

# Error handling
q.except_(fn, /, *args, exceptions=None, reraise=False, **kwargs) -> Q
q.finally_(fn, /, *args, **kwargs) -> Q

# Execution
q.run(v=<no value>, /, *args, **kwargs) -> Any
q(v=<no value>, /, *args, **kwargs) -> Any  # alias for run()

# Iteration
q.iterate(fn=None) -> QuentIterator
q.iterate_do(fn=None) -> QuentIterator
q.flat_iterate(fn=None, *, flush=None) -> QuentIterator
q.flat_iterate_do(fn=None, *, flush=None) -> QuentIterator

# Context API (instance -- pipeline steps)
q.set(key: str) -> Q
q.set(key: str, value: Any) -> Q
q.get(key: str) -> Q
q.get(key: str, default: Any) -> Q

# Context API (class -- immediate)
Q.set(key: str, value: Any) -> None
Q.get(key: str, default: Any = <missing>) -> Any

# Reuse
q.clone() -> Q
q.as_decorator() -> Callable
Q.from_steps(*steps) -> Q

# Control flow (class methods)
Q.return_(v=<no value>, /, *args, **kwargs) -> NoReturn  # exits current Q
Q.break_(v=<no value>, /, *args, **kwargs) -> NoReturn   # exits nearest iteration scope
Q.exit_(v=<no value>, /, *args, **kwargs) -> NoReturn    # exits entire top-level pipeline (new in 7.0.0)

# Instrumentation (class attribute)
Q.on_step: Callable[[Q, str, Any, Any, int, BaseException | None], None] | None = None