# simplypipe

Chainable data pipelines for Python iterables. Zero dependencies.

## Installation

```bash
pip install simplypipe
```

## Quick start
```python
from simplypipe import pipe

stats = (
    pipe(range(1000))
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * 3)
    .batch(100)
    .run(sink=print)
)
print(stats.processed, stats.emitted, stats.duration)
```

## API

### `.map(fn)`

Applies `fn` to each item and passes the result downstream. The original item is replaced by the return value of `fn`.
```python
(
    pipe(["hello", "world"])
    .map(str.upper)
    .run(sink=print)
)
# HELLO
# WORLD
```

### `.parallel_map(fn, max_workers=None, buffer=None)`

Like `map`, but runs `fn` concurrently across a thread pool. Order is preserved. Best suited for I/O-bound work (HTTP requests, DB lookups, file operations).

- `max_workers`: number of threads in the pool. `None` lets Python decide (`min(32, os.cpu_count() + 4)`).
- `buffer`: maximum number of futures in flight at once. `None` (the default) submits all items upfront, which is fine for bounded sources. Set `buffer` when the source is large or infinite to avoid memory pressure.
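The library's internals aren't shown here, but the order-preserving, bounded-buffer behavior described above can be sketched with `concurrent.futures` (the function name `parallel_map_sketch` is illustrative, not part of simplypipe):

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def parallel_map_sketch(fn, items, max_workers=None, buffer=None):
    """Order-preserving parallel map with an optional bound on in-flight futures."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = deque()
        for item in items:
            pending.append(pool.submit(fn, item))
            # With a buffer, block on the oldest future before submitting more,
            # so at most `buffer` items are in flight at once.
            if buffer is not None and len(pending) >= buffer:
                yield pending.popleft().result()
        # Drain whatever is still in flight, in submission order.
        while pending:
            yield pending.popleft().result()
```

Yielding futures in submission order (rather than completion order) is what preserves ordering at the cost of head-of-line blocking.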
```python
# I/O-bound enrichment with bounded memory
(
    pipe(read_urls())
    .parallel_map(requests.get, max_workers=10, buffer=30)
    .filter(lambda r: r.status_code == 200)
    .run(sink=results.append)
)
```

### `.flat_map(fn)`

Applies `fn` to each item and flattens the result. Use this when `fn` returns an iterable and you want each element of that iterable to continue downstream as a separate item.
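Conceptually, flattening is just `yield from` over each result; a minimal stand-in (not the library's code):

```python
def flat_map_sketch(fn, items):
    """Apply fn to each item and yield each element of the result separately."""
    for item in items:
        yield from fn(item)
```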
```python
(
    pipe(["hello world", "foo bar"])
    .flat_map(str.split)
    .run(sink=print)
)
# hello
# world
# foo
# bar
```

### `.filter(fn)`

Keeps only items for which `fn` returns a truthy value. Dropped items are counted in `RunStats.dropped`.
```python
(
    pipe(range(10))
    .filter(lambda x: x % 2 == 0)
    .run(sink=print)
)
# 0
# 2
# 4
# 6
# 8
```

### `.tap(fn)`

Calls `fn` for its side effect on each item, then passes the item through unchanged. Useful for logging or debugging mid-pipeline.
```python
(
    pipe(range(3))
    .tap(lambda x: print(f"processing {x}"))
    .map(lambda x: x * 10)
    .run(sink=print)
)
# processing 0
# 0
# processing 1
# 10
# processing 2
# 20
```

### `.batch(size)`

Collects items into lists of up to `size` elements. The last batch may be smaller if the source is exhausted. Each batch counts toward `RunStats.batches`.
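The grouping behavior described above amounts to repeatedly slicing the iterator; a minimal sketch, assuming nothing about the library's actual implementation:

```python
from itertools import islice

def batch_sketch(items, size):
    """Collect an iterable into lists of up to `size` elements."""
    it = iter(items)
    # islice returns an empty list once the source is exhausted,
    # which ends the loop; the final chunk may be short.
    while chunk := list(islice(it, size)):
        yield chunk
```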
```python
(
    pipe(range(7))
    .batch(3)
    .run(sink=print)
)
# [0, 1, 2]
# [3, 4, 5]
# [6]
```

### `.rate_limit(rate, per)`

Throttles throughput to at most `rate` items per `per` seconds by sleeping between items as needed.
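The sleep-between-items approach can be sketched as follows; `rate_limit_sketch` is a hypothetical stand-in, not the library's implementation:

```python
import time

def rate_limit_sketch(items, rate, per=1.0):
    """Yield items no faster than `rate` items per `per` seconds."""
    interval = per / rate  # minimum spacing between consecutive items
    last = None
    for item in items:
        if last is not None:
            wait = interval - (time.monotonic() - last)
            if wait > 0:
                time.sleep(wait)
        last = time.monotonic()
        yield item
```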
```python
# Process at most 5 items per second
(
    pipe(range(20))
    .rate_limit(5, per=1.0)
    .run(sink=print)
)
```

### `.dedupe(key=None, max_size=None)`

Drops duplicate items, keeping only the first occurrence. Use `key` to extract the comparison value from each item. Use `max_size` to bound memory: when the seen-set exceeds `max_size`, the oldest entry is evicted (LRU). Dropped duplicates are counted in `RunStats.dropped`.
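An LRU-bounded seen-set can be sketched with `collections.OrderedDict`; the eviction details here are an assumption, not simplypipe's actual code:

```python
from collections import OrderedDict

def dedupe_sketch(items, key=None, max_size=None):
    """Drop duplicates, keeping first occurrences; bound memory via LRU eviction."""
    seen = OrderedDict()
    for item in items:
        k = key(item) if key else item
        if k in seen:
            seen.move_to_end(k)  # refresh recency on a duplicate hit
            continue
        seen[k] = True
        if max_size is not None and len(seen) > max_size:
            seen.popitem(last=False)  # evict the least recently used key
        yield item
```

Note the consequence of a bounded seen-set: once a key is evicted, a later duplicate of it will pass through again.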
```python
(
    pipe([1, 2, 2, 3, 1, 4])
    .dedupe()
    .run(sink=print)
)
# 1
# 2
# 3
# 4

# With a key function
(
    pipe([{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"id": 2, "v": "c"}])
    .dedupe(key=lambda x: x["id"])
    .run(sink=print)
)
# {'id': 1, 'v': 'a'}
# {'id': 2, 'v': 'c'}
```

### `.retry_map(fn, retries, backoff, exceptions)`

Like `map`, but retries `fn` up to `retries` times if it raises one of the specified exceptions. Backoff between attempts is exponential: `backoff * 2 ** attempt` seconds. If all retries are exhausted, the last exception is re-raised. Each failed attempt increments `RunStats.errors`.
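Per-item retry with exponential backoff can be sketched like this; whether `retries` counts total or additional attempts is an assumption (this sketch allows `retries` extra attempts after the first), and `retry_call` is a hypothetical helper:

```python
import time

def retry_call(fn, item, retries=3, backoff=0.1, exceptions=(Exception,)):
    """Call fn(item), retrying on failure with exponential backoff."""
    for attempt in range(retries + 1):
        try:
            return fn(item)
        except exceptions:
            if attempt == retries:
                raise  # retries exhausted: re-raise the last exception
            # attempt 0 sleeps backoff, attempt 1 sleeps 2*backoff, ...
            time.sleep(backoff * 2 ** attempt)
```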
```python
import random

def flaky(x):
    if random.random() < 0.5:
        raise ValueError("transient error")
    return x * 2

(
    pipe(range(5))
    .retry_map(flaky, retries=3, backoff=0.1)
    .run(sink=print)
)
```

### `.take(n)`

Emits at most `n` items, then stops. Useful for previewing a pipeline, limiting output, or processing only a slice of a large source.
```python
(
    pipe(range(1_000_000))
    .map(expensive_transform)
    .take(10)
    .run(sink=print)
)
```

### `.catch(fn, on_error, exceptions)`

Like `map`, but handles errors per item instead of crashing the pipeline. If `fn` raises one of the specified exceptions, `on_error(item, exc)` is called and the item is dropped. Processing continues with the next item. Each caught error increments `RunStats.errors`.
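The per-item error handling described above reduces to a `try`/`except` around each call; a minimal sketch (not the library's code):

```python
def catch_sketch(fn, items, on_error, exceptions=(Exception,)):
    """Apply fn per item; on failure, call on_error(item, exc) and drop the item."""
    for item in items:
        try:
            yield fn(item)
        except exceptions as exc:
            on_error(item, exc)  # e.g. append to a dead-letter list
```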
```python
dead_letters = []

(
    pipe(records)
    .catch(
        enrich_from_api,
        on_error=lambda item, exc: dead_letters.append(item),
        exceptions=(IOError, TimeoutError),
    )
    .run(sink=write_to_db)
)
```

## RunStats

`.run()` returns a `RunStats` dataclass:
```python
@dataclass
class RunStats:
    processed: int   # items read from the source
    emitted: int     # items delivered to the sink
    dropped: int     # items removed by filter or dedupe
    batches: int     # batches produced by batch()
    errors: int      # exceptions caught by retry_map or catch
    duration: float  # wall-clock time in seconds
```

## Development

```bash
git clone https://github.com/janmarkuslanger/simplypipe.git
cd simplypipe
pip install -e ".[dev]"
```

Run tests:
```bash
pytest
```

Lint and format:
```bash
ruff check .
ruff format .
```

Type-check:
```bash
mypy simplypipe
```

## License

MIT