Architecture: lazycogs¶
lazycogs turns a geoparquet STAC item index into a lazy (time, band, y, x) xarray DataArray backed by Cloud-Optimized GeoTIFFs. It requires no GDAL. All raster I/O is done through async-geotiff (Rust-backed), spatial queries go through DuckDB via rustac, and reprojection is pure pyproj + numpy.
Why parquet, not a STAC API URL¶
open() accepts a path to a geoparquet file or directory, not a STAC API endpoint. This is intentional. When dask computes a large DataArray, it may execute hundreds of chunk tasks in parallel. If each task queried a remote STAC API directly, that would fire hundreds of concurrent HTTP requests at the API, which is both impolite and likely to trigger rate limiting or outright bans.
Instead, the expected workflow is to run one rustac.search_to("items.parquet", api_url, ...) call upfront to download the matching items into a local geoparquet file, and then pass that file to open(). Per-chunk spatial filtering is then a fast DuckDB query, with no network traffic and no API involvement.
For large pre-existing STAC archives stored as hive-partitioned parquet directories (e.g. year=2023/month=01/...), pass a DuckdbClient(use_hive_partitioning=True) via the duckdb_client parameter to enable DuckDB partition pruning. All internal queries use duckdb_client.search() regardless of whether a custom client is supplied; when duckdb_client is None, a plain DuckdbClient() is created automatically.
Two-phase execution model¶
Work is split sharply into two phases.
Phase 0 — open time runs at open() / open_async() call time. It does the minimum needed to build a fully-described lazy DataArray: one DuckDB query to discover bands, one DuckDB query to build the time axis, and a grid computation. No pixel I/O happens. No dask task graph is built. Each band becomes a StacBackendArray wrapped in an xarray LazilyIndexedArray.
Phase 1 — compute time runs inside a dask worker when a chunk is actually computed. StacBackendArray.__getitem__ receives the exact (time, y, x) index for the chunk, derives the chunk's spatial footprint, queries DuckDB for only the COGs that overlap that footprint and time step, reads and reprojects them concurrently with asyncio, and mosaics the results.
This split means that open() is nearly instant even for large queries, and the DuckDB spatial filter runs once per chunk rather than over the entire bbox at open time.
Module overview¶
src/lazycogs/
_core.py Entry point. open() / open_async(), band discovery, time-step building.
_backend.py StacBackendArray (per-band) and MultiBandStacBackendArray (4-D wrapper) — xarray BackendArray implementations that bridge xarray indexing to chunk reads.
_chunk_reader.py Async mosaic logic: open COGs, select overviews, read windows, reproject, mosaic.
_executor.py Per-chunk reprojection thread pool configuration. Exposes set_reproject_workers() and get_max_workers(); the actual pool is created per event loop in _backend.py.
_explain.py Dry-run read estimator. Registers the da.lazycogs.explain() xarray accessor.
_grid.py Compute output affine transform and coordinate arrays from bbox + resolution.
_reproject.py Nearest-neighbor reprojection using pyproj Transformer + numpy fancy indexing.
_store.py Resolve cloud HREFs into obstore Store instances (or route through a user-supplied store) with a thread-local cache.
_temporal.py Temporal grouping strategies (day, week, month, year, fixed-day-count).
_mosaic_methods.py Pixel-selection strategies (First, Highest, Lowest, Mean, Median, Stdev, Count).
Phase 0 in detail¶
open_async() in _core.py:
- Resolves
duckdb_client: if not provided, creates a plainDuckdbClient(). Validates thathrefends in.parquet/.geoparquetwhen no client is supplied (a directory path is accepted when a custom client is passed). - Parses
time_periodinto a_TemporalGrouper(see_temporal.py). - Converts
bboxfrom the target CRS to EPSG:4326 usingpyproj.Transformer. - Calls
_discover_bands(): queries the parquet source viaduckdb_client.search(..., max_items=1)to find asset keys. Assets with role"data"or media type"image/tiff"are returned first. - Calls
_build_time_steps(): queries the parquet source for all matching items, extracts their datetimes, buckets them with the_TemporalGrouper, deduplicates, and returns sorted(filter_strings, time_coords)pairs. Only groups with at least one item produce a time step. - Calls
compute_output_grid()to get the output affine transform and x/y coordinate arrays. - For each band, creates a
StacBackendArray(a dataclass) holding all the parameters needed to materialise any chunk later. - Wraps all per-band arrays in a single
MultiBandStacBackendArraywith shape(band, time, y, x), then wraps that in onexarray.core.indexing.LazilyIndexedArray. This avoidsxr.concat(used internally byds.to_array()), which would eagerly loadLazilyIndexedArray-backed objects. - Constructs the
xr.DataArraydirectly from the 4-D variable. Ifchunksis provided, calls.chunk(chunks)to convert to a dask-backed array; otherwise theLazilyIndexedArrayremains in play so narrow slices (e.g. a single pixel) translate to minimal I/O. - Stores
_stac_backends(the list ofStacBackendArrayinstances) and_stac_time_coords(the full time coordinate array) inda.attrsso thatda.lazycogs.explain()can reconstruct the explain plan without re-specifyingopen()parameters.
open() is a thin synchronous wrapper that calls _run_coroutine(open_async(...)), which handles both scripts and Jupyter kernels transparently (see the Jupyter fallback section).
Explain: dry-run read estimator¶
da.lazycogs.explain() (registered in _explain.py as an xarray accessor) provides an EXPLAIN ANALYZE-style dry run. It runs the same DuckDB spatial queries that fire during .compute() — one per (band, time step, spatial chunk) combination — but stops before any pixel I/O.
plan = da.lazycogs.explain() # DuckDB queries only, no pixel reads
plan = da.lazycogs.explain(fetch_headers=True) # also reads COG IFD headers
print(plan.summary())
df = plan.to_dataframe()
The accessor reads _stac_backends and _stac_time_coords from da.attrs and respects the DataArray's current extent and chunk sizes, so explaining a sliced DataArray (da.isel(time=0).lazycogs.explain()) queries only the reads needed for that slice.
ExplainPlan exposes:
- total_chunk_reads — number of (band, time, spatial tile) combinations
- total_cog_reads — total COG files matched across all chunks
- empty_chunk_count — chunks with no matching COG files (useful for diagnosing sparse series)
- summary() — multi-line human-readable report with COG-read distribution histogram
- to_dataframe() — one row per (chunk, COG file) for further analysis in pandas
With fetch_headers=True, each matched COG header is fetched (a small HTTP range request to read the IFD block) and CogRead.overview_level, window_col_off, window_row_off, window_width, and window_height are populated.
Phase 1 in detail¶
MultiBandStacBackendArray.__getitem__ in _backend.py (the 4-D entry point):
- xarray calls
__getitem__with anExplicitIndexer. The call is forwarded throughindexing.explicit_indexing_adapterto_raw_getitemwith a basic(int | slice, int | slice, int | slice, int | slice)key for(band, time, y, x). - The band key is resolved to a list of integer band indices. If it was an integer the band dimension is squeezed in the output.
- For each selected band,
StacBackendArray._raw_getitem((time_key, y_key, x_key))is called. If the per-band results are not band-squeezed they are stacked withnp.stack(..., axis=0).
StacBackendArray._raw_getitem in _backend.py (per-band):
- The time key is resolved to a list of integer positions. Integer keys squeeze the time dimension.
- Integer y or x keys are normalised to size-1 slices; the dimension is squeezed before returning.
- Logical y-indices (ascending, south-to-north) are converted to physical row indices (descending, north-to-south) to match the affine transform origin.
- The chunk's affine transform is derived:
chunk_affine = dst_affine * Affine.translation(x_start, y_start_physical). - The chunk's EPSG:4326 bounding box is computed from the four corners of the chunk using
pyproj.Transformer. - For each time step:
a.
duckdb_client.search(parquet_path, bbox=chunk_bbox_4326, datetime=date)returns only items whose geometry intersects this specific chunk for this date. Empty results short-circuit to nodata immediately. b._run_coroutine(async_mosaic_chunk(...))materialises the chunk. This helper usesasyncio.runnormally, but falls back to aThreadPoolExecutorworker when called from inside a running event loop (e.g. a Jupyter kernel) to avoid the "asyncio.run() cannot be called from a running event loop" error. The same helper is used at open time, soopen()works in Jupyter withoutawait. - The result array is flipped vertically (
result[:, ::-1, :]) to restore ascending y-order before squeezing and returning.
async_mosaic_chunk in _chunk_reader.py:
- Processes items in batches of
max_concurrent_reads(default 32) usingasyncio.gatherwith anasyncio.Semaphoreto bound the number of concurrent reads. This limits peak in-flight memory when a chunk overlaps many COGs. - For each item:
a.
resolve(href, store)returns an(ObjectStore, path)pair. If the caller supplied astore, it is returned unchanged and only the object path is extracted from the HREF; otherwiseobstore.store.from_urlconstructs a store fromscheme://netlocand caches it per thread. b.await GeoTIFF.open(path, store=store)opens the COG header. c._select_overview()picks the coarsest overview whose pixel size is still ≤ the target resolution. d._native_window()computes the pixel window in source space that covers the chunk bbox, clamped to the image extent. e.await reader.read(window=window)fetches the tile data. f.reproject_array()warps the read data to the destination chunk grid. - Each reprojected array is fed to the
MosaicMethodBaseinstance. Ifmosaic_method.is_donebecomes true (e.g.FirstMethodwith no nodata pixels remaining), the current batch finishes and all remaining batches are skipped — so items are never read once the mosaic is complete. - Returns
mosaic_method.dataas a(bands, chunk_height, chunk_width)array.
Grid and coordinate convention¶
compute_output_grid() in _grid.py produces:
- An affine transform with origin at the top-left corner of the top-left pixel and a negative y-scale (standard north-up raster convention).
- x-coordinates and y-coordinates as 1-D arrays of pixel-centre values, with y ascending (south to north) to match xarray label-based slicing conventions.
The ascending-y / top-down-affine duality is resolved in _raw_getitem by converting logical y-indices to physical row indices before computing chunk_affine, and flipping the result array before returning.
Per-chunk read and resample pipeline¶
Each call to _read_item_band() in _chunk_reader.py follows a four-step pipeline to turn a remote COG into a correctly-sized, correctly-projected numpy array for one destination chunk.
1. Overview selection¶
Before fetching any pixels, _select_overview() picks the right level of the COG's built-in pyramid. The target resolution is estimated in the source image's native CRS: if dst_crs differs from the COG's CRS, a 1-pixel offset at the chunk centre is transformed with pyproj to approximate the pixel size in source units. The overview list (ordered finest → coarsest) is walked to find the coarsest overview whose pixel size is still ≤ the target — i.e. the finest source data that avoids upsampling. This preserves as much spatial detail as the output scale warrants without reading unnecessarily fine data. If no overview satisfies the condition (target falls between native and the finest overview), or no overviews exist, full-resolution is used.
This is the primary resampling control: the pyramid level is chosen to match the output resolution, so the reprojection step works on roughly the right amount of data.
2. Source window computation¶
The destination chunk's bounding box (in dst_crs) is reprojected to the source CRS via pyproj. The inverse of the source affine transform maps those four corners to fractional pixel coordinates. floor/ceil and clamping to image bounds gives the Window that covers exactly the source pixels needed. Only that window is fetched from the COG, minimising I/O.
If the chunk bbox falls entirely outside the source image after clamping, _native_window() returns None and the item is skipped.
3. Tile read¶
await reader.read(window=window) fetches the windowed pixel data from the selected overview level (or full-res). The result is a (bands, window_h, window_w) array in the source CRS/grid.
4. Nearest-neighbor reprojection¶
reproject_array() in _reproject.py warps the source tile onto the destination chunk grid without GDAL:
- Build a meshgrid of destination pixel-centre coordinates.
- Transform all coordinates from
dst_crstosrc_crsin one vectorisedTransformer.transform()call. - Apply the inverse source affine transform to get fractional source pixel indices.
np.floorrounds to the nearest-neighbor sample; numpy fancy indexing populates the output array.- Out-of-bounds pixels get the nodata fill value.
Nearest-neighbor is the only supported resampling method.
Concurrency model¶
There are three nested layers of concurrency in a chunk read.
Dask (chunk level). When a dask-backed DataArray is computed, dask dispatches each chunk task to a worker thread. Each worker thread calls _run_coroutine() in _backend.py, which calls asyncio.run() to create a fresh event loop for that chunk. Worker threads are independent — they share no state except the thread-local store cache in _store.py.
asyncio (item level). Inside a chunk's event loop, async_mosaic_chunk processes overlapping items in batches of max_concurrent_reads (configurable via open(), default 32). Each batch fires _read_item_band() concurrently via asyncio.gather with an asyncio.Semaphore enforcing the limit. COG header reads and tile fetches from async-geotiff are all awaitable, so the event loop multiplexes them without blocking. Batching caps peak in-flight memory and enables true early exit: if the mosaic method signals completion within a batch, subsequent batches are never issued.
Thread pool (CPU work per item). reproject_array is synchronous CPU-bound work — there is no way to await it. Calling it directly in the coroutine would block the event loop and stall all other pending tile reads in the same asyncio.gather. Instead, each completed tile read immediately submits its reprojection to the event loop's default executor via loop.run_in_executor(None, ...). The event loop is then free to process the next completed tile read while earlier reprojections run in parallel on other threads.
This means I/O and CPU work overlap: reprojections for items whose tiles arrived first run while tiles for other items are still in flight.
Why threads, not a process pool. pyproj.Transformer.transform() and numpy's fancy-indexing both release the GIL during their heavy inner loops. Threads therefore give real CPU parallelism here — not just interleaving — without the overhead of process spawning and array pickling that a ProcessPoolExecutor would require.
Why reprojection is memory-bandwidth-bound, not compute-bound. compute_warp_map builds two meshgrids the size of the output chunk, transforms all coordinates in one vectorised call, and produces large index arrays. apply_warp_map samples the source array with random-access fancy indexing (out[:, valid] = data[:, row_idx[valid], col_idx[valid]]), which produces near-constant cache misses. Both phases are dominated by memory latency and bandwidth rather than arithmetic. In practice this means CPU utilisation is low (threads stall waiting for memory), and adding more than 4 concurrent reprojection threads provides no throughput benefit — they saturate the memory bus instead.
Bounded per-loop executor. Rather than using Python's default min(32, cpu_count + 4) thread count, _run_coroutine() installs a bounded ThreadPoolExecutor (default min(os.cpu_count(), 4)) as the default executor on each event loop it creates. This caps thread count per loop while preserving per-loop isolation: each dask task has its own independent pool and does not queue behind other tasks. The executor is automatically shut down when asyncio.run() closes the loop, so no threads leak. Call lazycogs.set_reproject_workers(n) to change the per-loop bound (see _executor.py).
Jupyter fallback. Jupyter kernels run a persistent event loop, which prevents re-entrant asyncio.run() calls. _run_coroutine() detects this with asyncio.get_running_loop() and falls back to spawning a single-worker ThreadPoolExecutor, submitting asyncio.run(coro) to that thread so it gets its own loop. The rest of the concurrency model is unchanged. One consequence: credential providers that hold event-loop-bound resources (such as NasaEarthdataAsyncCredentialProvider, which creates an aiohttp session at construction time) fail in this path because the session is bound to Jupyter's loop, not the worker thread's loop. Use the synchronous credential provider equivalents (e.g. NasaEarthdataCredentialProvider) instead.
Chunking strategy and throughput tradeoffs¶
The chunks argument to open() / open_async() controls whether the returned DataArray is backed by dask. Choosing the wrong chunking strategy — particularly adding spatial chunks — can significantly reduce throughput compared to leaving the array unchunked.
Why spatial chunks hurt¶
Without spatial chunks, xarray calls MultiBandStacBackendArray.__getitem__ once per time step for the full spatial extent. That single call fires one asyncio.gather that reads every overlapping COG for that time step concurrently. This is the maximum possible I/O parallelism for a time step: all tile fetches are in flight simultaneously in a single event loop.
With spatial chunks (e.g. chunks={"x": 512, "y": 512}), dask splits the extent into N tasks. Each task:
- Runs a separate
rustac.search_syncDuckDB query to find overlapping items. - Creates a fresh event loop and default
ThreadPoolExecutor. - Fires a smaller
asyncio.gatherover only the COGs that overlap its sub-region.
The total number of COG reads is the same, but they are spread across N smaller gathers rather than one large one. Dask workers do provide some task-level parallelism, but the overhead of N DuckDB queries, N event loop creations, and N executor instantiations typically outweighs the benefit, especially for small chunk sizes. A COG that spans multiple spatial chunks is also opened once per overlapping chunk rather than once per time step.
The async layer already handles spatial I/O concurrency. Dask spatial chunks add overhead without adding concurrency.
Where dask helps¶
The time dimension is where dask pays off. Different time steps are fully independent, and the async layer does nothing to parallelize across them. chunks={"time": 1} lets dask run multiple time steps in parallel across worker threads, each with its own full-spatial-extent gather.
Band dimension chunking does not help. Within a single time step, bands are read sequentially by MultiBandStacBackendArray. Splitting bands into separate dask tasks (chunks={"band": 1}) creates the same per-task overhead as spatial chunks (separate DuckDB queries, event loop creation, executor instantiation) without a meaningful parallelism benefit. Keep all bands in a single chunk.
Memory pressure is the other legitimate reason to add spatial chunks. If the full array does not fit in memory, spatial chunking limits how much is materialised at once even if it costs throughput.
Recommended defaults¶
| Goal | Suggested chunks |
|---|---|
| Maximum throughput, array fits in memory | omit chunks (or chunks={}) |
| Parallelise across time, memory is not a constraint | {"time": 1} |
| Large array that does not fit in memory | {"time": 1, "x": N, "y": N} with N as large as memory allows |
When spatial chunks are necessary for memory reasons, making them as large as possible minimises per-chunk overhead and keeps the asyncio.gather fan-out wide. An alternative to adding spatial chunks is reducing max_concurrent_reads on open(): this limits peak in-flight memory per chunk without the overhead of smaller dask tasks or additional DuckDB queries.
Temporal grouping¶
_temporal.py defines the _TemporalGrouper protocol and five concrete implementations:
| Class | time_period |
Bucket boundary |
|---|---|---|
_DayGrouper |
P1D |
Calendar day |
_WeekGrouper |
P1W |
ISO 8601 calendar week (Monday anchor) |
_MonthGrouper |
P1M |
Calendar month |
_YearGrouper |
P1Y |
Calendar year |
_FixedDayGrouper |
PnD (n>1) or PnW (n>1) |
Fixed-length windows aligned to 2000-01-01 |
Each grouper provides three methods: group_key() maps a datetime string to a sortable label, datetime_filter() converts a label to a rustac-compatible datetime range string, and to_datetime64() converts a label to a numpy.datetime64[D] coordinate value.
Mosaic methods¶
_mosaic_methods.py contains pure-numpy pixel-selection strategies operating on numpy.ma.MaskedArray:
FirstMethod: Returns the first valid pixel seen. Short-circuits when all pixels are filled.HighestMethod/LowestMethod: Tracks the running max / min across items.MeanMethod/MedianMethod/StdevMethod: Accumulates all arrays and reduces at the end.CountMethod: Counts valid (non-masked) observations per pixel.
These are copied from rio-tiler (MIT licence, zero GDAL imports) to avoid pulling in rasterio as a transitive dependency.
Temporal compositing¶
Combining time_period with a mosaic method is the idiomatic way to produce
temporal composites. Setting time_period="P1W" groups every STAC item within
the same ISO calendar week into a single time step. When a chunk is read,
async_mosaic_chunk feeds all items for that week to the mosaic method in
order. With FirstMethod (the default), reading stops as soon as every output
pixel has a valid (non-nodata) value — the remaining items in the week are
never fetched.
Note that mosaic methods operate on nodata masks only; they have no awareness of semantic content like clouds. Pixels that are not masked as nodata in the source COG will be treated as valid regardless of what they represent. If cloud masking is desired, it must be applied to the source data before lazycogs reads it (for example, by using scene-level cloud-mask bands to set nodata at the COG level).
This approach is substantially more efficient than applying temporal reductions
as post-processing steps on a daily array. Operations like ffill(dim="time")
or max(dim="time") over a dask-backed DataArray force xarray to materialise
every time step before reducing, because each step's result depends on the
previous one (for ffill) or all steps at once (for max). A weekly composite
opened with time_period="P1W" reads at most one week's worth of COGs per
output time step, with early exit when the mosaic is complete.
The general rule: push compositing logic into time_period and mosaic_method
at open() time. Reach for post-hoc xarray reductions only for operations that
cannot be expressed as a per-time-step mosaic (for example, a multi-week
mean that requires all weeks to be present before reducing).
Store caching and the store parameter¶
resolve() in _store.py defers to obstore.store.from_url for scheme detection — including the special-case HTTPS routing for amazonaws.com, r2.cloudflarestorage.com, and Azure hosts — rather than maintaining its own list of known object-store domains. The constructed store is cached per thread in a dict[str, ObjectStore] keyed by root URL (scheme://netloc). Because dask tasks run in threads, this avoids repeated connection setup within a single task while remaining safe across concurrent tasks.
Native cloud schemes (s3, s3a, gs) default to skip_signature=True so public buckets work without credentials. HTTPS URLs get no such default: if from_url routes https://bucket.s3.amazonaws.com/... to an S3Store, it will attempt to sign requests normally. For authenticated access or any non-default configuration, the caller is expected to construct an ObjectStore and pass it via the store= parameter to open() / open_async(); resolve() then returns it unchanged and only extracts the object path from each HREF. No introspection is done on a user-supplied store — the caller is responsible for ensuring it is rooted at the same scheme://netloc the HREFs point to.
store_for(href, *, asset=None, **kwargs) is a public convenience factory that automates this construction. It reads one sample item from the geoparquet file, extracts a data asset HREF, and calls from_url with the same skip_signature=True default as resolve(). If the item carries STAC Storage Extension metadata (v1.0.0 flat fields or v2.0.0 storage:schemes/storage:refs), region and requester_pays are also extracted and forwarded. Caller kwargs override all inferred values. The returned store is not cached — the caller owns its lifetime and passes it to open() via store=.
When the store root does not align with the URL structure of the asset HREFs — for example, an Azure Blob Storage store rooted at a container while the HREFs include the container name in the path — the caller can provide a path_from_href callable to open() / open_async(). The callable takes the full HREF string and returns the object path to use with the store. When supplied, it replaces the default urlparse-based extraction in resolve().
Key dependencies¶
| Package | Role |
|---|---|
rustac |
STAC search against local geoparquet files via DuckDB |
async-geotiff |
Async COG header reads and windowed tile reads (Rust, no GDAL) |
obstore |
Cloud object store abstraction layer for async-geotiff |
pyproj |
CRS transforms: bbox reprojection, warp map generation |
xarray |
DataArray / Dataset assembly, BackendArray / LazilyIndexedArray protocol |
dask |
Parallel chunk execution |
affine |
Affine transform arithmetic |
numpy |
Array operations throughout |
Documentation¶
The documentation site is built with mkdocs-material and deployed to GitHub Pages via GitHub Actions (.github/workflows/docs.yml). It includes:
- Introduction: rendered from
README.md(symlinked asdocs/index.md) - Demo: the Jupyter notebook at
docs/demo.ipynb, rendered bymkdocs-jupyter - API Reference: auto-generated from docstrings by
mkdocstrings[python]
To preview locally: uv run mkdocs serve