CohortPipeline builds analytic cohorts as a tree of named branches with
full exclusion provenance. Each branch derives from a parent branch by
applying a sequence of named exclusion rules. Every exclusion is
recorded – its reason, the predicate that produced it, the number of
subjects affected – so the resulting object can drive a CONSORT diagram
and serve as the auditable record of how the analytic dataset was
constructed.
Cohort construction is kept strictly upstream of analysis: the class
produces analytic data tables that downstream code can consume. See
vignette("cohort", package = "cohort") for a worked example.
A CohortPipeline stores a single shared base data table and, for each
branch, a small per-row integer status vector identifying which rows are
included and which step excluded them. Branching therefore costs O(n)
time and memory in the number of rows of the base table and never copies
the data values, so each extra branch in a deep cohort tree adds only one
integer vector rather than another copy of the data.
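The storage model described above can be pictured with a short base-R sketch. It is not the package's implementation: the encoding (0 for included, k for "excluded by step k") and all variable names are assumptions made for illustration.

```r
# Toy table standing in for the shared base data table.
base <- data.frame(id = 1:6, age = c(17, 22, 35, NA, 41, 28))

# One integer per row: 0L = included, k = excluded by step k (assumed encoding).
root_status <- integer(nrow(base))
root_status[is.na(base$age)] <- 1L                                       # step 1: missing age
root_status[root_status == 0L & !is.na(base$age) & base$age < 18] <- 2L  # step 2: under 18

# Branching copies only the status vector; `base` itself is shared.
child_status <- root_status

# Included rows are materialised on demand from the shared table.
included <- base[child_status == 0L, ]
```

In this sketch a new branch costs one integer vector of length `nrow(base)`, whatever the number of columns in the table.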
A cohort becomes frozen the first time either:
another cohort branches from it (via $new_cohort(from = X)), or
an artifact is set on it (via $set_artifact(from = X)).
After freezing, calling $exclude_and_track() on that cohort raises an error. The rule
guarantees that a cohort's name maps to exactly one definition forever:
once children depend on it, its exclusion list is fixed, and any
cached artifact stays consistent with the included rows that produced
it. The practical workflow is "apply all exclusions on a cohort, then
branch from it or attach artifacts." Multi-way forks are unaffected:
you can branch a frozen cohort as many times as you like.
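The freezing rule can be mimicked in a few lines of base R. This is only a toy model of the behaviour described above; `new_cohort()` and `exclude()` here are hypothetical stand-ins, not the package's methods.

```r
# A cohort is an environment holding a frozen flag and its own exclusion steps.
new_cohort <- function(parent = NULL) {
  co <- new.env()
  co$frozen <- FALSE
  co$steps <- character()
  if (!is.null(parent)) parent$frozen <- TRUE  # branching freezes the parent
  co
}

exclude <- function(co, reason) {
  if (co$frozen) stop("cohort is frozen; cannot add exclusion: ", reason)
  co$steps <- c(co$steps, reason)
  invisible(co)
}

root <- new_cohort()
exclude(root, "Missing age")          # allowed: root is not frozen yet
child_a <- new_cohort(parent = root)  # first branch freezes root
child_b <- new_cohort(parent = root)  # further forks from a frozen cohort are fine

# A late exclusion on the frozen parent fails; capture the message for inspection.
late <- tryCatch(exclude(root, "Under 18"),
                 error = function(e) conditionMessage(e))
```

After the failed call, `root$steps` still holds only the exclusion applied before branching, which is the point of the rule.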
CohortPipeline$new(dt) makes a defensive copy of dt once. The
user's data table is never modified.
$get_included(cohort) returns an independent copy. The caller may
mutate it freely without affecting any other cohort or the shared
base table.
The data table passed to a $set_artifact() callback is always an
independent copy. Callbacks may mutate it freely.
$get_everyone(cohort) returns an independent copy with a
.cohort_status column reconstructed from the branch's status
vector.
CohortPipeline$new(dt, cache_file, label) – construct a pipeline
with a shared base table installed as the root cohort. With
cache_file, restore from a prior run if the file exists.
$new_cohort(name, from, label) – branch from an existing cohort.
$exclude_and_track(branch, reason, expr_str) – apply a string-form
predicate and log the exclusion.
$set_artifact(name, from, fn, argset) – cache a derived object on a
cohort. fn may be function(dt, sib) or function(dt, sib, argset).
$get_included(cohort) – included rows of a cohort.
$get_everyone(cohort) – full-cohort view with a reconstructed
.cohort_status column.
$get_artifact(cohort, name) – retrieve a cached artifact.
$n_included(cohort), $n_total() – row counts.
$list_cohorts(), $list_artifacts(cohort), $list_schemas() – listings of
cohorts, of the artifacts on a cohort, and of declared schemas.
$declare_schema(branch, schema, from), $validate() – column
contracts.
$consort() – long-form exclusion log across all branches.
$draw_consort_panels(panels, file) – render CONSORT diagrams.
$plot(cohorts, file) – plot CONSORT diagrams with automatic layout.
$save(file), $invalidate(cohort, artifact) – incremental cache
persistence and manual cache invalidation.
$print() – concise text summary of the cohort tree.
Exclusion predicates are passed as strings (expr_str) and parsed with
parse(text = expr_str). The string is evaluated against the included
subset of the base table, so predicates may safely assume that earlier
exclusions have already removed invalid rows. NA predicate results
are treated as FALSE (rows are kept). The original string is stored
verbatim in the exclusion log, which keeps cohort definitions
serializable and auditable.
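A minimal base-R sketch of this evaluation rule follows. `apply_exclusion()` is a hypothetical helper written for illustration and assumes a logical inclusion mask; the package's internals may differ.

```r
base <- data.frame(id = 1:5, age = c(17, 22, NA, 41, 16))

# Evaluate a string predicate against the currently included rows only.
apply_exclusion <- function(base, included, expr_str) {
  expr <- parse(text = expr_str)[[1]]
  hit <- eval(expr, envir = base[included, , drop = FALSE], enclos = baseenv())
  hit <- !is.na(hit) & as.logical(hit)     # NA counts as FALSE: the row is kept
  included[which(included)[hit]] <- FALSE  # map hits back to base-table rows
  included
}

# Applied alone, "age < 18" yields NA for row 3, so that row stays included.
kept_na <- apply_exclusion(base, rep(TRUE, nrow(base)), "age < 18")

# Applied in sequence, the second predicate never sees the NA row.
included <- rep(TRUE, nrow(base))
included <- apply_exclusion(base, included, "is.na(age)")
included <- apply_exclusion(base, included, "age < 18")
remaining_ids <- base$id[included]
```

Ordering the missing-value exclusion first, as in the second half of the sketch, is what lets later predicates assume complete data.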
CohortPipeline$new()
Create a new CohortPipeline. If cache_file is set and the file
exists, the pipeline is restored from that snapshot and dt is
used only as a sanity check (its dimensions and column names must
match the cached base table). Otherwise dt is installed as the
root cohort.
CohortPipeline$new(
  dt = NULL,
  cache_file = NULL,
  label = NULL,
  auto_validate = FALSE
)
dt – A data.table to install as the root cohort. Required
on cold construction; optional on warm cache load.
cache_file – Optional character path. When supplied, an
incremental cache is enabled. If the file exists, the pipeline
is restored from it and subsequent operations replay the
recorded log on cache hits, recomputing only divergent steps.
If the file does not exist, fresh state is built and $save()
writes to this path. Recommended idiom for scripts:
on.exit(cp$save(), add = TRUE) near the top.
label – Optional character. Display label for the root cohort
(used in CONSORT diagrams and $list_cohorts()). Defaults to
"Cohort participants". Refreshed silently on warm cache load
so changing the label between runs is allowed.
auto_validate – Logical. When TRUE, $validate() is invoked
automatically after every $new_cohort() and $set_artifact()
call so schema mismatches stop at the failure site rather than
accumulating until the next manual $validate(). Defaults to
FALSE.
CohortPipeline$declare_schema()
Declare a column-type / level / NA contract for a branch. Validation
runs only when $validate() is called (or automatically when the
pipeline was constructed with auto_validate = TRUE).
branch – Character. Branch name to attach the schema to.
schema – Named list. Each element describes one column with fields:
type: one of "integer", "numeric", "factor", "logical",
"Date", "character".
levels (factor only): expected levels() vector.
na: if FALSE, the column must contain no NAs.
from – Optional character. If supplied, the new schema starts
as a copy of the schema attached to from and the entries in
schema are merged on top.
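A schema of the shape described above is an ordinary nested list, as the sketch below shows. Its column names are invented for illustration, and `modifyList()` is used only as a stand-in for "merged on top"; whether the package merges whole column entries or individual fields is not stated here. The commented calls follow the documented signature and assume an existing pipeline `cp`.

```r
# Contract for the root cohort: one entry per column.
root_schema <- list(
  age = list(type = "numeric", na = FALSE),
  sex = list(type = "factor", levels = c("F", "M"), na = FALSE)
)

# Extra entry for a child branch (hypothetical column `enrolled`).
child_extra <- list(
  enrolled = list(type = "Date", na = FALSE)
)

# Stand-in for inheriting via `from`: start from the parent schema and
# lay the child's entries on top of it.
child_schema <- modifyList(root_schema, child_extra)

# With a pipeline `cp` this might be declared as:
# cp$declare_schema("root", root_schema)
# cp$declare_schema("adults_female", child_extra, from = "root")
# cp$validate()
```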
CohortPipeline$validate()
Validate every declared schema against the included rows of its branch. Throws an error listing every mismatch found.
CohortPipeline$new_cohort()
Create a new cohort branched from an existing cohort. The new cohort starts identical to its parent at the moment of branching. Branching freezes the parent, so its exclusion list can no longer change, and exclusions applied to the child do not affect the parent.
CohortPipeline$exclude_and_track()
Apply an exclusion predicate to a cohort and record the result on
the exclusion log. The predicate is evaluated against the included
subset of the base table; rows for which the predicate evaluates
to TRUE are excluded with the supplied reason. NA predicate
results are treated as FALSE.
CohortPipeline$set_artifact()
Compute and cache a derived artifact on a cohort.
fn may have either the legacy 2-argument signature
function(dt, sib) or the 3-argument signature
function(dt, sib, argset). The 3-argument form pairs with the
argset parameter to make the cache contract explicit: the cache
key is (name, from, body(fn), argset), so the artifact is
recomputed only when one of those changes. With the 2-argument
form, fn is invoked normally but argset is not used in the
cache key (suitable for one-off scripts; not recommended when
relying on cache_file).
Note that the cache key uses body(fn) literally; if fn calls
a helper that you change, the cache cannot detect that. Either
include the helper's output / a version tag in argset, or call
$invalidate() to force recompute.
name – Character. Artifact name (must be unique on the cohort).
from – Character. Cohort to attach the artifact to.
fn – Function with signature function(dt, sib) or
function(dt, sib, argset). The return value becomes the
artifact.
argset – Optional named list. Explicit data dependencies of
fn (e.g. list(outcomes = cfg$outcomes)); participates in the
cache key. Use the 3-argument fn signature to read these out.
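The caveat about body(fn) is easy to reproduce in plain R. The sketch below uses a hypothetical `helper()` and a hypothetical `helper_version` tag; it shows only why the function body cannot reveal a changed helper, not how the package builds its key.

```r
helper <- function(x) mean(x)

# 3-argument callback: reads its explicit dependencies from `argset`.
fn <- function(dt, sib, argset) helper(dt$age) * argset$scale

body_before <- body(fn)

# The helper changes, so the result of fn changes too ...
helper <- function(x) median(x)

# ... but the body of fn is textually the same as before.
body_after <- body(fn)
same_body <- identical(body_before, body_after)

# A version tag in argset makes the change visible to anything keyed on argset.
argset_v1 <- list(scale = 1, helper_version = "1")
argset_v2 <- list(scale = 1, helper_version = "2")
same_argset <- identical(argset_v1, argset_v2)
```

Here `same_body` is TRUE while `same_argset` is FALSE, which is why bumping a tag in argset (or calling $invalidate()) is needed after editing a helper.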
CohortPipeline$get_included()
Return an independent copy of the included rows of a cohort. The
returned data.table may be mutated freely without affecting the
shared base table or any other cohort.
CohortPipeline$get_everyone()
Return a copy of the full base table with a .cohort_status column
reconstructed from this branch's exclusion history. Included rows
are labeled "included"; excluded rows carry the reason of the
first exclusion that caught them.
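As an illustration of the reconstructed column, the following base-R sketch turns an integer status vector into labels. The encoding (0 for included, k for the k-th exclusion step) is an assumption carried over for the example and may not match the package's internal representation.

```r
base <- data.frame(id = 1:5, age = c(17, 22, NA, 41, 16))

# Reasons in the order the exclusion steps were applied.
reasons <- c("Missing age", "Under 18")

# Assumed status vector: 0L = included, k = first step that excluded the row.
status <- c(2L, 0L, 1L, 0L, 2L)

# Work on a copy so the shared table is left untouched.
view <- base
view$.cohort_status <- c("included", reasons)[status + 1L]
```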
CohortPipeline$list_cohorts()
Tabulate every cohort with its parent, its sizes, and the number of its own exclusion steps and artifacts.
CohortPipeline$consort()
Long-form table of exclusion log entries across all cohorts. Each cohort contributes only its own exclusion steps (steps inherited from the parent at branch time are reported under the parent, not duplicated).
CohortPipeline$draw_consort_panels()
Render one or more CONSORT panels for cohort flows. Each panel walks a sequence of cohort names, lumping the named cohorts' exclusion steps into bullet blocks.
Most users want $plot() instead, which auto-discovers every
root-to-leaf path in the tree and lays them out automatically.
$draw_consort_panels() is the manual escape hatch for custom
layouts and labels.
CohortPipeline$draw_consort_panels(
  panels,
  file = NULL,
  ncol = NULL,
  width = NULL,
  height = NULL,
  text_width = 40,
  title_fontsize = 14
)
panels – A named list. Each element is either a character
vector of cohort names (interpreted as the panel's main flow)
or a list with components flow (character) and optional
side_branches (named character vector of identity-only branches
that merge into the spine).
file – Optional character path. If supplied, the rendered
plot is written to a .pdf or .png file. Otherwise the
plot is drawn on the active device.
ncol – Optional integer. Number of panels per row.
width, height – Optional numeric (inches). File dimensions.
text_width – Integer. Wrap width for box text.
title_fontsize – Numeric. Title fontsize for each panel.
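The two accepted shapes of a panels element can be written out as below. The cohort names reuse those from the example on this page, the panel names are invented, and it is an assumption that list names are used as panel titles. side_branches is omitted because its exact format is not spelled out here. The commented call assumes an existing pipeline `cp`.

```r
panels <- list(
  # Short form: a character vector is taken as the panel's main flow.
  "Female adults" = c("root", "adults_female"),
  # Long form: a list with an explicit `flow` component.
  "Female adults (list form)" = list(flow = c("root", "adults_female"))
)

# With a pipeline `cp` this might be rendered as:
# cp$draw_consort_panels(panels, file = "consort.pdf", ncol = 2)
```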
CohortPipeline$plot()
Plot a CONSORT diagram of the cohort tree.
With no arguments, plots one panel per cohort. Each panel walks the root-to-cohort path automatically and uses cohort names as box labels. With one or more cohort names, plots only those.
This is the default convenience entry point. Use
$draw_consort_panels() for custom labels or layouts.
CohortPipeline$plot(
  cohorts = NULL,
  file = NULL,
  ncol = NULL,
  width = NULL,
  height = NULL,
  text_width = 40,
  title_fontsize = 14
)
cohorts – Optional character vector of cohort names. If omitted, every cohort is plotted.
file – Optional .pdf/.png path. If supplied, the plot is
written to that file. Otherwise it is drawn on the active device.
ncol, width, height, text_width, title_fontsize – Optional layout
overrides; see $draw_consort_panels().
CohortPipeline$print()
Concise text summary of the cohort tree, exclusion counts, and attached artifacts.
CohortPipeline$save()
Persist the pipeline to its cache_file (set at construction).
On the next CohortPipeline$new(dt, cache_file = ...) with the same
file, the saved state is restored and re-issued operations replay
from the cache; only divergent operations recompute. Apart from
writing the file, the call is idempotent, so it is safe to call
repeatedly.
CohortPipeline$invalidate()
Manually invalidate a cached cohort (drops the cohort and every
descendant) or a single artifact. Use when a helper function called
from inside a $set_artifact() callback has changed – the cache key
(body(fn) + argset) cannot detect that automatically.
if (requireNamespace("data.table", quietly = TRUE)) {
  library(data.table)
  d <- data.table(
    id = 1:10,
    age = c(17, 22, 35, NA, 41, 28, 19, 16, 67, 50),
    sex = c("F", "M", "F", "F", NA, "M", "M", "F", "F", "M")
  )
  cp <- CohortPipeline$new(d)
  # Root-level exclusions on the shared base
  cp$exclude_and_track("root", "Missing sex", "is.na(sex)")
  cp$exclude_and_track("root", "Missing age", "is.na(age)")
  cp$exclude_and_track("root", "Under 18", "age < 18")
  # Branch into an "adults_female" cohort
  cp$new_cohort("adults_female", from = "root")
  cp$exclude_and_track("adults_female", "Not female", "sex != 'F'")
  # Cache a derived artifact on the cohort
  cp$set_artifact("mean_age", from = "adults_female",
                  fn = function(dt, sib) mean(dt$age))
  cp$list_cohorts()
  cp$consort()
  cp$get_artifact("adults_female", "mean_age")
}
#>
#> Attaching package: ‘data.table’
#> The following object is masked from ‘package:base’:
#>
#> %notin%
#> [1] 51