Cookbook¶
Real-world examples grouped by domain. Each example shows how quent solves a concrete problem, with code you can adapt to your own projects.
Looking for quick copy-paste snippets?
See Recipes for shorter, pattern-focused examples.
Data Processing¶
CSV to JSON Transform¶
Read a CSV file, filter rows, normalize fields, and output JSON.
import csv
import json
from quent import Q
def read_csv(path):
with open(path) as f:
return list(csv.DictReader(f))
def normalize_record(record):
return {
'id': int(record['id']),
'name': record['name'].strip().title(),
'email': record['email'].strip().lower(),
'active': record.get('status', '') == 'active',
}
result = (
Q('users.csv')
.then(read_csv)
.then(lambda records: [r for r in records if r.get('email')]) # drop rows without email
.foreach(normalize_record)
.then(lambda records: json.dumps(records, indent=2))
.run()
)
Aggregation Pipeline¶
Group, aggregate, and sort data in a single pipeline.
from collections import Counter
from quent import Q
def group_by_category(items):
groups = {}
for item in items:
groups.setdefault(item['category'], []).append(item)
return groups
def aggregate(groups):
return {
cat: {'count': len(items), 'total': sum(i['price'] for i in items)}
for cat, items in groups.items()
}
summary = (
Q(fetch_sales_data)
.then(lambda items: [item for item in items if item['price'] > 0])
.then(group_by_category)
.then(aggregate)
.then(lambda agg: sorted(agg.items(), key=lambda x: x[1]['total'], reverse=True))
.run()
)
API Orchestration¶
Multiple Endpoint Calls with Error Recovery¶
Fetch data from several APIs, merge results, and handle partial failures.
from quent import Q
def fetch_user_data(user_id):
return (
Q(user_id)
.gather(
lambda uid: api.get(f'/users/{uid}'),
lambda uid: api.get(f'/users/{uid}/orders'),
lambda uid: api.get(f'/users/{uid}/preferences'),
concurrency=2,
)
.then(lambda r: {
'user': r[0],
'orders': r[1],
'preferences': r[2],
})
.except_(
lambda ei: {'error': str(ei.exc), 'user_id': ei.root_value},
)
.run()
)
# Sync
data = fetch_user_data(42)
# Async (if api.get is async)
data = await fetch_user_data(42)
Request Pipeline with Middleware¶
Build a reusable request pipeline with composable middleware.
from quent import Q
def add_headers(req, headers):
return {**req, 'headers': {**req.get('headers', {}), **headers}}
def add_auth(req, token):
return {
**req,
'headers': {**req.get('headers', {}), 'Authorization': f'Bearer {token}'},
}
result = (
Q({'method': 'GET', 'url': '/api/users'})
.then(add_headers, {'Accept': 'application/json'})
.then(add_auth, auth_token)
.do(lambda req: logger.info('Request: %s %s', req['method'], req['url']))
.then(send_request)
.then(lambda resp: resp.json())
.run()
)
Database Operations¶
Connection Management with with_()¶
Use .with_() to manage database connections as context managers.
from quent import Q
def query_users(db):
return db.execute('SELECT * FROM users WHERE active = 1').fetchall()
def format_users(rows):
return [{'id': r[0], 'name': r[1], 'email': r[2]} for r in rows]
# sqlite3 (sync)
import sqlite3
users = (
Q(lambda: sqlite3.connect('app.db'))
.with_(lambda db: (
Q(db)
.then(query_users)
.then(format_users)
.run()
))
.run()
)
# aiosqlite (async) -- same pipeline structure
import aiosqlite
users = await (
Q(aiosqlite.connect, 'app.db')
.with_(lambda db: (
Q(db)
.then(query_users)
.then(format_users)
.run()
))
.run()
)
File Processing¶
Read, Transform, Write Pipeline¶
Process files in a pipeline with guaranteed cleanup.
from quent import Q
def transform_lines(text):
lines = text.strip().splitlines()
return '\n'.join(
line.upper()
for line in lines
if line.strip() and not line.startswith('#')
)
def process_file(input_path, output_path):
return (
Q(input_path)
.then(open)
.with_(lambda f: f.read())
.then(transform_lines)
.do(lambda content: (
Q(output_path)
.then(lambda p: open(p, 'w'))
.with_(lambda f: f.write(content))
.run()
))
.run()
)
Configuration Pipelines¶
Load, Validate, Merge¶
Load configuration from multiple sources, validate, and merge into a final config.
import json
import os
from quent import Q
def load_file(path):
with open(path) as f:
return json.load(f)
def load_env_overrides(config):
overrides = {}
for key in config:
env_key = f'APP_{key.upper()}'
if env_key in os.environ:
overrides[key] = os.environ[env_key]
return {**config, **overrides}
def validate_config(config):
required = ['database_url', 'secret_key']
missing = [k for k in required if k not in config]
if missing:
return Q.return_({'error': f'Missing keys: {missing}'})
return config
config = (
Q('config.json')
.then(load_file)
.then(load_env_overrides)
.then(validate_config)
.except_(lambda ei: {'error': str(ei.exc)})
.run()
)
Testing Patterns¶
Unit Testing Pipelines with Mocks¶
Q pipelines work naturally with unittest.mock -- swap callables with mocks to test in isolation.
import unittest
from unittest.mock import AsyncMock, MagicMock
from quent import Q
def build_pipeline(fetch, transform, save):
return (
Q()
.then(fetch)
.then(transform)
.do(save)
)
class TestPipeline(unittest.IsolatedAsyncioTestCase):
async def test_sync_pipeline(self):
fetch = MagicMock(return_value={'id': 1, 'raw': 'data'})
transform = MagicMock(return_value={'id': 1, 'processed': True})
save = MagicMock()
pipeline = build_pipeline(fetch, transform, save)
result = pipeline.run(42)
self.assertEqual(result, {'id': 1, 'processed': True})
fetch.assert_called_once_with(42)
save.assert_called_once()
async def test_async_pipeline(self):
fetch = AsyncMock(return_value={'id': 1, 'raw': 'data'})
transform = AsyncMock(return_value={'id': 1, 'processed': True})
save = AsyncMock()
pipeline = build_pipeline(fetch, transform, save)
result = await pipeline.run(42)
self.assertEqual(result, {'id': 1, 'processed': True})
fetch.assert_called_once_with(42)
save.assert_called_once()
async def test_error_handling(self):
fetch = MagicMock(side_effect=ValueError('bad input'))
result = (
Q(1)
.then(fetch)
.except_(lambda ei: {'error': str(ei.exc), 'input': ei.root_value})
.run()
)
self.assertEqual(result, {'error': 'bad input', 'input': 1})
Key points:
- Same pipeline, both mock types.
MagicMockfor sync,AsyncMockfor async -- the pipeline handles either transparently. .do()observers let you inspect intermediate values without altering the pipeline flow.except_()handlers are testable by making mocks raise.