A stagerunner describes a linear sequence of execution: import data,
perform this munging step, then that munging, then do some modeling, etc.
However, it is structured hierarchically as a nested list for ease of use. This function will create a nested list with the exact same structure as the stagerunner, except that each terminal node is either TRUE or FALSE.
Specifically, given a tree structure with exactly one TRUE value in the terminal nodes, all successors of that node will be marked as TRUE as well. Conversely, if forward = FALSE, then all predecessors of that node will be marked as TRUE.
For example, imagine we have a stagerunner with the following stages:
If el, the first argument to the boolean_fill function, refers to the “impute variable 1” stage, it will be represented as el = list(F, list(T, F), F).
Then calling the boolean_fill function with forward = TRUE will signify that we want to select the stages occurring afterwards.
The return value will be list(F, list(T, T), T). If instead we want the stages before “impute variable 1”, the return value will be list(T, list(T, F), F).
#' Fill a nested logical list with TRUEs before or after the first TRUE
#'
#' This is a helper function to implement the \code{to} parameter
#' in the \code{run} method on a stageRunner object.
#'
#' @seealso \code{\link{run}}
#' @name boolean_fill
#' @param el list. A nested list of logicals with exactly one entry \code{TRUE}.
#' @param forward logical. \code{FALSE} for backwards, and \code{TRUE} for forwards.
#' The default is \code{TRUE}.
#' @return the filled list
boolean_fill <- function(el, forward = TRUE) {
We now dig into the actual code. The !is.finite condition below works due to the behavior of which. If no element of the (possibly nested) list el contains a TRUE, it will return a zero-length vector. When we subset to [1], we get NA, which fails is.finite.
ix <- which(vapply(el, contains_true, logical(1)))[1]
if (!is.finite(ix)) stop("boolean_fill called but no TRUEs found")
We could have checked that there is a TRUE somewhere in el more elegantly, but we need the precise location, ix.
if (isTRUE(forward)) {
fills <- seq_len(length(el) - ix) + ix
} else {
fills <- seq_len(ix - 1)
}
el[fills] <- TRUE
Once the slots before or after the index that contains a TRUE (according to the value of forward) have been flattened to TRUE, the only remaining filling has to occur recursively inside the element at that index, if it is itself a list.
if (!is.atomic(el[[ix]])) {
el[[ix]] <- boolean_fill(el[[ix]], forward = forward)
}
el
}
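As a quick check of the example described earlier, the sketch below re-creates boolean_fill outside the package and runs it on list(F, list(T, F), F). The contains_true helper is not shown in this section, so the short version here is an assumed stand-in rather than the package's own definition.

```r
# Assumed stand-in for the package's internal helper: does a (possibly
# nested) list of logicals contain at least one TRUE?
contains_true <- function(x) {
  if (is.list(x)) any(vapply(x, contains_true, logical(1))) else any(x)
}

# Same logic as the function above, condensed.
boolean_fill <- function(el, forward = TRUE) {
  ix <- which(vapply(el, contains_true, logical(1)))[1]
  if (!is.finite(ix)) stop("boolean_fill called but no TRUEs found")
  fills <- if (isTRUE(forward)) seq_len(length(el) - ix) + ix else seq_len(ix - 1)
  el[fills] <- TRUE
  if (!is.atomic(el[[ix]])) el[[ix]] <- boolean_fill(el[[ix]], forward = forward)
  el
}

el <- list(FALSE, list(TRUE, FALSE), FALSE)  # "impute variable 1" selected
after  <- boolean_fill(el, forward = TRUE)   # list(FALSE, list(TRUE, TRUE), TRUE)
before <- boolean_fill(el, forward = FALSE)  # list(TRUE, list(TRUE, FALSE), FALSE)
str(after)
str(before)
```

Calling boolean_fill on a list with no TRUE at all, such as list(FALSE, FALSE), hits the stop() branch because which(...)[1] evaluates to NA.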
A stagerunner is simply a linear sequence (usually of functions) that is packaged as a tree structure to make it easier to reference related groups of operations.
Since stagerunners are intended to be run sequentially, that is, from earlier stages to later stages rather than the other way around, it is important to be able to identify when a runner is accidentally invoked in the reverse order. For example, if we have three stages and a runner is called in the wrong order with runner$run(2, 1), we still expect stage 1 to execute before stage 2.
The point of compare_stage_keys is to determine whether a later stage has been called prior to an earlier stage. If that is the case, this function will return FALSE (it returns TRUE when the first key already runs before the second), and later in the internals of the stagerunner object we will be able to flip the keys.
#' Compare two stage keys to see which one has a stage run first
#'
#' This is a helper function to implement the \code{to} parameter
#' in the \code{run} method on a stageRunner object.
#'
#' @seealso \code{\link{run}}
#' @name compare_stage_keys
#' @param key1 list
#' @param key2 list
#' @return logical. Whether or not key1 runs a stage before key2.
compare_stage_keys <- function(key1, key2) {
if (length(key1) == 0 || isTRUE(key1)) return(TRUE)
index_of_true <- function(el) which(vapply(el, contains_true, logical(1)))[1]
ix will be a vector of the locations in the list structure of key1 and key2 where the first (and, if they are correctly formed, the only) TRUE occurs.
ix <- vapply(list(key1, key2), index_of_true, numeric(1))
If no element in the first structure has a TRUE, the first key definitely cannot run a stage first (since no stages have been marked for running!).
if (!is.finite(ix[1])) {
return(FALSE)
Otherwise, if the latter has no TRUEs, the first key definitely runs first.
} else if (!is.finite(ix[2])) { return(TRUE) }
if (ix[1] == ix[2]) {
If the keys specify the exact same stage in the stagerunner, there is a tie and we may as well return TRUE.
if (is.atomic(key1) && is.atomic(key2)) {
TRUE
} else {
Otherwise, one of the two keys must be another list, so we can recursively determine which key runs first.
compare_stage_keys(key1[[ix[1]]], key2[[ix[2]]])
}
We now come across a special case. Imagine we have a stagerunner as before. If we call runner$run("clean data/2", "clean data") this will signify to “run from 'remove outliers from variable 1' to the end of the 'clean data' stage” (i.e., until “discretize variable 2”).
However, note in this case that the two keys would be represented by list(F, list(F, T, T), F) and list(F, T, F). If we are not careful, the compare_stage_keys function will indicate that the latter occurs before the former, the keys will be flipped, and we will end up executing “impute variable 1” instead!
We solve this problem by returning TRUE if key2 consists purely of TRUEs (i.e., if it signifies “run until the end of this stage”).
} else if (all(vapply(key2, isTRUE, logical(1)))) {
TRUE
} else {
Finally, if the keys are directly comparable, the first key should be run earlier if and only if the first key contains a TRUE earlier than the second key.
ix[1] < ix[2]
}
}
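To see the three outcomes side by side (correct order, reversed order, and the “run until the end of this stage” special case), here is a condensed re-creation of compare_stage_keys. As before, contains_true is an assumed stand-in for a package helper that this section does not show.

```r
# Assumed stand-in for the package's internal contains_true helper.
contains_true <- function(x) {
  if (is.list(x)) any(vapply(x, contains_true, logical(1))) else any(x)
}

# Same branching as the function above, condensed.
compare_stage_keys <- function(key1, key2) {
  if (length(key1) == 0 || isTRUE(key1)) return(TRUE)
  index_of_true <- function(el) which(vapply(el, contains_true, logical(1)))[1]
  ix <- vapply(list(key1, key2), index_of_true, numeric(1))
  if (!is.finite(ix[1])) return(FALSE)
  if (!is.finite(ix[2])) return(TRUE)
  if (ix[1] == ix[2]) {
    if (is.atomic(key1) && is.atomic(key2)) TRUE
    else compare_stage_keys(key1[[ix[1]]], key2[[ix[2]]])
  } else if (all(vapply(key2, isTRUE, logical(1)))) {
    TRUE
  } else {
    ix[1] < ix[2]
  }
}

stage1 <- list(TRUE, FALSE, FALSE)
stage2 <- list(FALSE, TRUE, FALSE)
in_order <- compare_stage_keys(stage1, stage2)  # stage 1 before stage 2
reversed <- compare_stage_keys(stage2, stage1)  # wrong order, keys need flipping

# The special case from the text: "clean data/2" up to the end of "clean data".
special <- compare_stage_keys(list(FALSE, list(FALSE, TRUE, TRUE), FALSE),
                              list(FALSE, TRUE, FALSE))
print(c(in_order = in_order, reversed = reversed, special = special))
```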
#' Copy one environment into another recursively.
#'
#' @param to environment. The new environment.
#' @param from environment. The old environment.
#' @note Both \code{to} and \code{from} must be pre-existing environments
#' or this function will error.
copy_env <- function(to, from) {
stopifnot(is.environment(to) && is.environment(from))
rm(list = ls(to, all.names = TRUE), envir = to)
for (name in ls(from, all.names = TRUE)) {
if (is.environment(from[[name]])) {
# Copy a sub-environment in full.
assign(name, new.env(parent = parent.env(from[[name]])), envir = to)
copy_env(to[[name]], from[[name]])
} else assign(name, from[[name]], envir = to)
}
}
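The following sketch exercises copy_env on two throwaway environments to illustrate what “recursively” means here: sub-environments are duplicated rather than shared, and anything previously stored in the target is removed first. The variable names are made up for the illustration.

```r
copy_env <- function(to, from) {
  stopifnot(is.environment(to) && is.environment(from))
  rm(list = ls(to, all.names = TRUE), envir = to)
  for (name in ls(from, all.names = TRUE)) {
    if (is.environment(from[[name]])) {
      # Copy a sub-environment in full.
      assign(name, new.env(parent = parent.env(from[[name]])), envir = to)
      copy_env(to[[name]], from[[name]])
    } else assign(name, from[[name]], envir = to)
  }
}

from <- new.env()
from$x <- 1
from$sub <- new.env()
from$sub$y <- 2

to <- new.env()
to$stale <- "removed by copy_env"

copy_env(to, from)
from$sub$y <- 99  # a later change to the source ...

print(ls(to))     # the stale binding is gone
print(to$sub$y)   # ... does not leak into the copy
```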
The only way to turn an object into a stagerunner is if it can be interpreted as a hierarchical sequence of execution: run some stuff in group A, then run some stuff in group B, and so on, with each group potentially containing more subgroups.
In other words, judging by the checks in the code below, the things which can be turned into stagerunners are functions, existing stagerunners, and (possibly nested) lists whose elements are functions, stagerunners, NULL, or further such lists.
The purpose of the is_pre_stagerunner function is to determine whether an object satisfies these restrictions.
#' Whether or not an object can be transformed into a stageRunner.
#'
#' @param x ANY. An R object for which it will be determined whether or not
#' it admits a conversion to a stageRunner.
#' @return TRUE or FALSE according to whether the object can be transformed
#' to a stageRunner. In general, only a function or list of functions
#' can be turned into a stageRunner.
#' @export
#' @examples
#' stopifnot(is_pre_stagerunner(function(e) { e$x <- 1 }))
#' stopifnot(is_pre_stagerunner(list(function(e) { e$x <- 1 }, function(e) { e$y <- 2 })))
#' stopifnot(is_pre_stagerunner(
#' list(a = function(e) { e$x <- 1 },
#' list(b = function(e) { e$y <- 2 }, c = function(e) { e$z <- 3 }))))
#'
#' stopifnot(!is_pre_stagerunner(NULL))
#' stopifnot(!is_pre_stagerunner(5))
#' stopifnot(!is_pre_stagerunner(iris))
is_pre_stagerunner <- function(x) {
if (is.function(x) || is.stagerunner(x)) { return(TRUE) }
if (!is.recursive(x) || is.environment(x)) { return(FALSE) }
Using a for loop is a tiny bit faster than an apply-family operation because we can exit the function early.
for (i in seq_along(x)) {
if (!(is.function(x[[i]]) || is.stagerunner(x[[i]]) || is.null(x[[i]]) ||
We use the base function Recall for its recursive effect.
(is.recursive(x[[i]]) && Recall(x[[i]])))) {
return(FALSE)
}
}
TRUE
}
To determine what stages of a stagerunner to execute, we will use a nested list format that is equivalent in structure to the runner. For example, imagine we have a stagerunner with the following stages:
We would like to be able to execute swaths of this runner at will: runner$run("clean"), runner$run("clean/1", "clean/2") and runner$run(2) should all execute the data cleaning sub-stages.
The normalize_stage_keys function will convert human-readable descriptions of what to execute, like "clean" or 2, to a nested list format that will be easier to use later during stage execution.
For example, "clean/1" will be converted to list(F, list(T, F), F) and mimic the structure of the stagerunner.
#' Normalize a reference to stage keys
#'
#' For example, \code{list('data/one', 2)} would be converted to
#' \code{list('data', list('one')), 2)}.
#'
#' @name normalize_stage_keys
#' @param keys a list. The keys to normalize.
#' @param stages a list. The stages we're normalizing with respect to.
#' @param parent_key character. A helper for sane recursive error handling.
#' For example, if we try to reference key \code{foo/bar}, but a recursive
#' call to \code{normalize_stage_keys} errors when \code{bar} isn't found,
#' we would still like the error to display the full name (\code{foo/bar}).
#' @param to an indexing parameter. If \code{keys} refers to a single stage,
#' attempt to find all stages from that stage to this stage (or, if this one
#' comes first, this stage to that stage). For example, if we have
#' \code{stages = list(a = list(b = 1, c = 2), d = 3, e = list(f = 4, g = 5))}
#' where the numbers are some functions, and we call \code{normalize_stage_keys}
#' with \code{keys = 'a/c'} and \code{to = 'e/f'}, then we would obtain a nested
#' list of logicals referencing \code{"a/c", "d", "e/f"}.
#' @return a list. The format is nested logicals. For example, if \code{stages} is
#' \code{list(one = stageRunner$new(new.env(), list(subone = function(cx) 1)),
#' two = function(cx) 1)}
#' then
#' \code{normalize_stage_keys('one/subone')}
#' would return
#' \code{list(one = list(subone = TRUE), two = FALSE)}.
#' @seealso stageRunner__run
#' @examples
#' \dontrun{
#' stopifnot(identical(normalize_stage_keys("foo/bar",
#' list(foo = list(bar = NULL, baz = NULL))),
#' list(list(TRUE, FALSE))))
#' }
normalize_stage_keys <- function(keys, stages, to = NULL, parent_key = "") {
if (is.null(to)) {
normalize_stage_keys_unidirectional(keys, stages, parent_key)
} else {
normalize_stage_keys_bidirectional(keys, to, stages)
}
}
normalize_stage_keys_unidirectional <- function(keys, stages, parent_key) {
if (is.null(keys) || length(keys) == 0 || identical(keys, "")) {
By default, no key provided means to execute everything in this stage. For single stages, that means TRUE. For multiple stages, a list of TRUEs equal to the number of stages.
return(if (!is.list(stages) || length(stages) == 1) TRUE
else rep(list(TRUE), length(stages)))
}
Stagerunners are set up recursively, so we need to extract the list of stages out of the stagerunner object. For example, a runner with a nested “clean data” stage actually consists of two stagerunners, one for the whole list and one for the “clean data” stage.
if (is.stagerunner(stages)) stages <- stages$stages
The output of normalize_stage_keys is a (possibly nested) list whose terminal nodes are all logical. If keys is already of this format, we are done.
if (all_logical(keys)) return(keys) # Already normalized
We performed checks for special cases, so now we call a function that assumes all those cases have been taken care of.
normalize_stage_keys_unidirectional_(keys, stages, parent_key)
}
Our strategy to determine which stages to run, and thus translate keys from a form like “munge/impute variable 5” to a nested list, will be to start with a list consisting entirely of FALSE and fill in the sub-stages the user requested in the keys with TRUEs.
normalize_stage_keys_unidirectional_ <- function(keys, stages, parent_key) {
if (is.numeric(keys) && any(keys < 0)) {
as.list(!is.element(seq_len(stage_length(stages)), -keys))
Negative indexing, like -c(2:3), is easy: set everything except those keys to TRUE.
} else {
key_length <- if (is.list(stages)) length(stages) else 1
normalized_keys <- rep(list(FALSE), key_length)
Each element of the provided keys has a chance to modify normalized_keys. We achieve this using Reduce.
Reduce(function(normalized_keys, index) {
normalize_stage_keys_by_index(keys, stages, parent_key, index, normalized_keys)
}, seq_along(keys), normalized_keys)
}
}
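Both branches of the function above can be tried in isolation. The sketch below assumes a hypothetical runner with four top-level stages, and mark_stage is a deliberately simplified stand-in for normalize_stage_keys_by_index: it only flips one slot to TRUE and does no name matching or recursion.

```r
n_stages <- 4  # hypothetical runner with four top-level stages

# Negative keys: select everything except stages 2 and 3.
keys <- -c(2:3)
negative <- as.list(!is.element(seq_len(n_stages), -keys))
str(negative)

# Positive keys: start from all FALSE and let each key flip its slot.
# mark_stage is a hypothetical, much simpler stand-in for
# normalize_stage_keys_by_index.
mark_stage <- function(normalized_keys, index) {
  normalized_keys[[index]] <- TRUE
  normalized_keys
}
positive <- Reduce(mark_stage, c(1, 4), rep(list(FALSE), n_stages))
str(positive)
```

Both calls end up selecting stages 1 and 4. Reduce threads the accumulated list through each call, which is why every key gets a chance to modify it. Note that the real function iterates over seq_along(keys) and looks each key up by position, whereas this sketch passes the key values directly.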
normalize_stage_keys_by_index <- function(keys, stages, parent_key,
key_index, normalized_keys) {
key <- keys[[key_index]]
rest_keys <- key[-1]
key <- key[[1]]
normalize_stage_key(key = key, keys = rest_keys, stages = stages,
parent_key = parent_key, key_index = key_index,
normalized_keys = normalized_keys)
}
normalize_stage_key <- function(...) {
UseMethod("normalize_stage_key")
}
normalize_stage_key.logical <- function(key, key_index, normalized_keys, ...) {
#stop("!")
normalized_keys[[key_index]] <- key
normalized_keys
}
normalize_stage_key.numeric <- function(key, keys, stages, parent_key,
normalized_keys, ...) {
stopifnot(length(key) == 1)
if (key > stage_length(stages)) {
stop(sprintf(
"Cannot reference sub-stage %s of stage %s because it only has %d stages",
sQuote(key), dQuote(parent_key), stage_length(stages)
))
}
normalized_keys[[as.integer(key)]] <-
if (length(keys) == 0) TRUE
else normalize_stage_keys(keys, stages[[as.integer(key)]],
parent_key = paste0(parent_key, key, '/'))
normalized_keys
}
normalize_stage_key.character <- function(key, keys, stages, parent_key,
normalized_keys, ...) {
# The hard part! Allow things like one/subone/subsubone/etc
# to reference arbitrarily nested stages.
if (length(key) == 0) stop("Stage key of length zero")
key <- strsplit(key, '/')[[1]]
if (is.stageRunnerNode(stages)) {
stop("No stage with key '", paste0(parent_key, key[[1]]), "' found")
}
key_index <- tolower(key[[1]]) == tolower(names(stages))
if (!any(key_index)) {
key_index <- grepl(tolower(key[[1]]), tolower(names(stages)))
}
if (is.finite(suppressWarnings(tmp <- as.numeric(key[[1]]))) &&
tmp > 0 && tmp <= length(stages)) {
key_index <- tmp
} else if (length(key_index) == 0 || sum(key_index) == 0) {
stop("No stage with key '", paste0(parent_key, key[[1]]), "' found")
} else if (sum(key_index) > 1) {
stop("Multiple stages with key '", paste0(parent_key, key[[1]]),
"', found: ", paste0(parent_key, names(stages)[key_index], collapse = ', '))
} else key_index <- which(key_index) # now an integer of length 1
normalized_keys[[key_index]] <- special_or_lists(
normalized_keys[[key_index]],
normalize_stage_keys(append(paste0(key[-1], collapse = '/'), keys),
stages[[key_index]], parent_key = paste0(parent_key, key[[1]], '/'))
)
normalized_keys
}
normalize_stage_key.default <- function(...) {
stop("Invalid stage key")
}
normalize_stage_keys_bidirectional <- function(from, to, stages) {
First, we turn our human-readable keys like “clean/impute variable 1” into a more convenient list structure like list(F, list(T, F, F), F).
from <- normalize_stage_keys(from, stages)
to <- normalize_stage_keys(to, stages)
Recall our helper compare_stage_keys, which returns FALSE if the first argument occurs after the second. In this situation, we need to swap the keys.
if (!compare_stage_keys(from, to)) {
A convenient swapping mechanism without introducing temporary variables. In R, the list2env utility can funnel named values in a list directly into an environment. Try it yourself:
x <- 1
y <- 2
list2env(list(x = y, y = x), environment())
cat(x, ",", y)
list2env(list(from = to, to = from), environment())
}
And finally the magic trick that pulls it all together. See the more thorough explanation below beside the special_and_lists helper.
special_and_lists(
boolean_fill(from, forward = TRUE),
boolean_fill(to, forward = FALSE)
)
}
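The final expression of normalize_stage_keys_bidirectional can be traced by hand on a small case. The sketch below assumes a hypothetical runner with an import stage, a clean stage with three sub-stages, and a train stage. The helpers are condensed re-creations, contains_true is an assumed stand-in for a package helper not shown in this section, and the error checks of special_and_lists are omitted for brevity.

```r
# Assumed stand-in for the package's internal contains_true helper.
contains_true <- function(x) {
  if (is.list(x)) any(vapply(x, contains_true, logical(1))) else any(x)
}

boolean_fill <- function(el, forward = TRUE) {
  ix <- which(vapply(el, contains_true, logical(1)))[1]
  if (!is.finite(ix)) stop("boolean_fill called but no TRUEs found")
  fills <- if (isTRUE(forward)) seq_len(length(el) - ix) + ix else seq_len(ix - 1)
  el[fills] <- TRUE
  if (!is.atomic(el[[ix]])) el[[ix]] <- boolean_fill(el[[ix]], forward = forward)
  el
}

# Condensed: the length and type checks of the real helper are left out.
special_and_lists <- function(list1, list2) {
  if (identical(list1, FALSE) || identical(list2, FALSE)) FALSE
  else if (identical(list1, TRUE)) list2
  else if (identical(list2, TRUE)) list1
  else Map(special_and_lists, list1, list2)
}

from <- list(FALSE, list(FALSE, TRUE, FALSE), FALSE)  # second clean sub-stage
to   <- list(FALSE, FALSE, TRUE)                      # the train stage

from_onwards <- boolean_fill(from, forward = TRUE)  # everything from `from` on
up_to        <- boolean_fill(to, forward = FALSE)   # everything up to `to`
selected     <- special_and_lists(from_onwards, up_to)
str(selected)
```

The intersection keeps exactly the stages that lie both at or after the starting key and at or before the ending key.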
Terminal stages in a stagerunner are stageRunnerNode objects, so we treat those as stages of length 1.
stage_length <- function(obj) {
if (is.list(obj)) length(obj)
else 1
}
Consider our example stagerunner from before:
Our goal is to display progress when executing the stagerunner:
#' Show a progress message when executing a stagerunner.
#'
#' @name show_message
#' @param stage_names character.
#' @param stage_index integer.
#' @param begin logical. Whether we are showing the begin or end message.
#' @param nested logical. Whether or not this is a nested stage (i.e.
#' contains another stageRunner).
#' @param depth integer. How many tabs to space by (for nested stages).
#' @return Nothing, but print the message to standard output.
#' @examples
#' \dontrun{
#' show_message(c('one', 'two'), 2) # Will print "Running 2. two..."
#' }
show_message <- function(stage_names, stage_index, begin = TRUE,
nested = FALSE, depth = 1) {
stage_name <- stage_names[stage_index]
If the stage was not named (i.e., only a function was given), we “impute” the name with an ordinal: “fifth”, “twelfth”, “21st”, etc. (depending on the index of the stage).
if (is.null(stage_name) || identical(stage_name, "") || identical(stage_name, NA_character_)) {
stage_name <- as.ordinal(stage_index)
}
if (begin) {
stage_name <- crayon::green(stage_name)
} else {
stage_name <- crayon::blue(stage_name)
}
We indent by depth double-spaces to show nested stages clearly.
prefix <- paste(rep(" ", depth - 1), collapse = '')
We turn “import data” into “1. import data”.
stage_name <- paste0(stage_index, ". ", stage_name)
Non-terminal stages (i.e., those with more sub-stages) have a beginning and an ending, so we show “Beginning 2. clean data stage” and “Ending 2. clean data stage”.
if (nested) {
cat(paste0(prefix, if (begin) "Beginn" else "End", "ing ",
stage_name, " stage...\n"))
} else if (begin) {
Whereas terminal stages (i.e., those without sub-stages) just run, so we show “Running 1. import data stage”.
cat(paste0(prefix, "Running ", stage_name, "...\n"))
}
}
Imagine we want to run from “create validation set” to “create derived variable.” The syntax for this is runner$run("val", "munge/derived") (amongst other ways; substrings are matched to stage names by the normalize_stage_keys helper).
To translate this into code, stagerunner builds the following two trees, shown here by their terminal nodes in order:
FALSE, TRUE, TRUE, TRUE, TRUE, TRUE
and
TRUE, TRUE, TRUE, TRUE, FALSE, FALSE
and then intersects them:
FALSE, TRUE, TRUE, TRUE, FALSE, FALSE
The point of special_and_lists is to perform this intersection operation.
#' AND two lists together with some regards for nesting
#'
#' The structure of the lists should be the same. That is,
#' as a tree, the two lists should be isomorphic. For example,
#' \code{special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#' list(a = FALSE, b = list(b = FALSE, c = TRUE)))}
#' yields
#' \code{list(a = FALSE, b = list(b = FALSE, c = FALSE))}
#' and
#' \code{special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#' list(a = list(b = FALSE, c = TRUE), b = FALSE))}
#' yields
#' \code{list(a = FALSE, b = FALSE)}
#'
#' Note that lists get ANDed based on *order*, not on key names (as this could
#' be ambiguous), so make sure the two lists have the same comparable key orders.
#' For example, \code{special_and_lists(list(a = TRUE, b = FALSE), list(b = FALSE, a = TRUE))}
#' would mistakenly return \code{list(a = FALSE, b = FALSE)}.
#'
#' @name special_and_lists
#' @param list1 a list.
#' @param list2 a list.
#' @seealso \code{\link{special_or_lists}}
#' @return the and'ed list.
#' @examples \dontrun{
#' stopifnot(identical(
#' special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#' list(a = FALSE, b = list(b = FALSE, c = TRUE))),
#' list(a = FALSE, b = list(b = FALSE, c = FALSE))
#' ))
#'
#' stopifnot(identical(
#' special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#' list(a = list(b = FALSE, c = TRUE), b = FALSE)),
#' list(a = FALSE, b = FALSE)
#' ))
#' }
special_and_lists <- function(list1, list2) {
if (identical(list1, FALSE) || identical(list2, FALSE)) {
FALSE
If one of the two lists is TRUE, an “AND” operation is simply equivalent to choosing the other list.
} else if (identical(list1, TRUE)) {
list2
} else if (identical(list2, TRUE)) {
list1
} else if (!(is.list(list1) && is.list(list2))) {
stop("special_and_lists only accepts lists or atomic logicals of length 1")
} else if (length(list1) != length(list2)) {
stop("special_and_lists only accepts lists of the same length")
} else {
This function should only ever be used on lists coming from the same hierarchy of stages, so give a warning if this is not the case.
if (!identical(names(list1), names(list2))) {
warning("special_and_lists matches lists by order, not name, ",
"but the names of the two lists do not match!")
}
We use Map to recursively apply the operation to the remaining elements.
Map(special_and_lists, list1, list2)
}
}
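To make the branch order concrete, the snippet below repeats the function and applies it to a flat pair of trees and to two nested, named pairs. The expected values in the comments follow directly from the branches: a FALSE on either side always wins, and a TRUE defers to the other side.

```r
special_and_lists <- function(list1, list2) {
  if (identical(list1, FALSE) || identical(list2, FALSE)) {
    FALSE
  } else if (identical(list1, TRUE)) {
    list2
  } else if (identical(list2, TRUE)) {
    list1
  } else if (!(is.list(list1) && is.list(list2))) {
    stop("special_and_lists only accepts lists or atomic logicals of length 1")
  } else if (length(list1) != length(list2)) {
    stop("special_and_lists only accepts lists of the same length")
  } else {
    if (!identical(names(list1), names(list2))) {
      warning("special_and_lists matches lists by order, not name, ",
              "but the names of the two lists do not match!")
    }
    Map(special_and_lists, list1, list2)
  }
}

# Flat trees: an element-wise AND.
flat <- special_and_lists(
  list(FALSE, TRUE, TRUE, TRUE, TRUE, TRUE),
  list(TRUE, TRUE, TRUE, TRUE, FALSE, FALSE)
)  # list(FALSE, TRUE, TRUE, TRUE, FALSE, FALSE)

# Nested trees with the same shape: the AND recurses into `b`.
nested <- special_and_lists(
  list(a = FALSE, b = list(b = TRUE, c = FALSE)),
  list(a = FALSE, b = list(b = FALSE, c = TRUE))
)  # list(a = FALSE, b = list(b = FALSE, c = FALSE))

# Mixed shapes: a FALSE on either side collapses the whole branch.
mixed <- special_and_lists(
  list(a = FALSE, b = list(b = TRUE, c = FALSE)),
  list(a = list(b = FALSE, c = TRUE), b = FALSE)
)  # list(a = FALSE, b = FALSE)
str(list(flat = flat, nested = nested, mixed = mixed))
```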
This function is equivalent to special_and_lists but instead we apply “OR” to each pair of logical values. Listing terminal nodes in order, the trees
FALSE, TRUE, TRUE, TRUE, FALSE, FALSE
and
FALSE, FALSE, FALSE, TRUE, TRUE, FALSE
would become
FALSE, TRUE, TRUE, TRUE, TRUE, FALSE
#' OR two lists together with some regards for nesting
#'
#' The structure of the lists should be the same. That is,
#' as a tree, the two lists should be isomorphic. For example,
#' \code{special_or_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#' list(a = FALSE, b = list(b = FALSE, c = TRUE)))}
#' yields
#' \code{list(a = FALSE, b = list(b = TRUE, c = TRUE))}
#' and
#' \code{special_or_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#' list(a = list(b = FALSE, c = TRUE), b = FALSE))}
#' yields
#' \code{list(a = list(b = FALSE, c = TRUE), b = list(b = TRUE, c = FALSE))}
#'
#' Note that lists get ORed based on *order*, not on key names (as this could
#' be ambiguous), so make sure the two lists have the same comparable key orders.
#' For example, \code{special_or_lists(list(a = TRUE, b = FALSE), list(b = FALSE, a = TRUE))}
#' would mistakenly return \code{list(a = TRUE, b = TRUE)}.
#'
#' @name special_or_lists
#' @param list1 a list.
#' @param list2 a list.
#' @seealso \code{\link{special_and_lists}}
#' @return the or'ed list.
special_or_lists <- function(list1, list2) {
if (identical(list1, TRUE) || identical(list2, TRUE)) {
TRUE
} else if (identical(list1, FALSE)) {
list2
} else if (identical(list2, FALSE)) {
list1
} else if (!(is.list(list1) && is.list(list2))) {
stop("special_or_lists only accepts lists or atomic logicals of length 1")
} else if (length(list1) != length(list2)) {
stop("special_or_lists only accepts lists of the same length")
} else {
if (!identical(names(list1), names(list2))) {
warning("special_or_lists matches lists by order, not name, ",
"but the names of the two lists do not match!")
}
Map(special_or_lists, list1, list2)
}
}
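The same kind of check works for the OR variant. This sketch repeats the function in condensed form (the name-mismatch warning is left out for brevity) and applies it to the flat trees from the text and to the two nested examples from the documentation above.

```r
# Condensed copy of the function above, without the name-mismatch warning.
special_or_lists <- function(list1, list2) {
  if (identical(list1, TRUE) || identical(list2, TRUE)) {
    TRUE
  } else if (identical(list1, FALSE)) {
    list2
  } else if (identical(list2, FALSE)) {
    list1
  } else if (!(is.list(list1) && is.list(list2))) {
    stop("special_or_lists only accepts lists or atomic logicals of length 1")
  } else if (length(list1) != length(list2)) {
    stop("special_or_lists only accepts lists of the same length")
  } else {
    Map(special_or_lists, list1, list2)
  }
}

flat <- special_or_lists(
  list(FALSE, TRUE, TRUE, TRUE, FALSE, FALSE),
  list(FALSE, FALSE, FALSE, TRUE, TRUE, FALSE)
)  # list(FALSE, TRUE, TRUE, TRUE, TRUE, FALSE)

nested <- special_or_lists(
  list(a = FALSE, b = list(b = TRUE, c = FALSE)),
  list(a = FALSE, b = list(b = FALSE, c = TRUE))
)  # list(a = FALSE, b = list(b = TRUE, c = TRUE))

# A FALSE defers to the other side, so differently shaped branches survive.
mixed <- special_or_lists(
  list(a = FALSE, b = list(b = TRUE, c = FALSE)),
  list(a = list(b = FALSE, c = TRUE), b = FALSE)
)  # list(a = list(b = FALSE, c = TRUE), b = list(b = TRUE, c = FALSE))
str(list(flat = flat, nested = nested, mixed = mixed))
```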
Imagine we have two stagerunners. It is a natural operation to concatenate or append these stagerunners into a single runner. We can do this using the $append method.
However, append will create one final stage at the end instead of juxtaposing the stages, to make it clear which runner was appended.
#' Append one stageRunner to the end of another.
#'
#' @name stageRunner_append
#' @param other_runner stageRunner. Another stageRunner to append to the current one.
#' @param label character. The label for the new stages (this will be the name of the
#' newly appended list element).
stageRunner_append <- function(other_runner, label = NULL) {
stopifnot(is.stagerunner(other_runner))
new_stage <- structure(list(other_runner), names = label)
Appending a stagerunner is simply concatenating its stages.
self$stages <- c(self$stages, new_stage)
self
}
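The difference between nesting and juxtaposing can be seen with plain lists. In the sketch below, two lists of placeholder functions stand in for the stages of two runners (no stageRunner objects are created), and the stage names and label are made up for the illustration.

```r
# Plain lists of placeholder functions stand in for the stages of two runners.
first_stages  <- list("import data" = function(e) NULL,
                      "clean data"  = function(e) NULL)
second_stages <- list("train model"  = function(e) NULL,
                      "export model" = function(e) NULL)

# Mirrors the body of stageRunner_append: wrap the other runner in a
# one-element list named by the label, then concatenate.
label <- "modeling"  # hypothetical label
new_stage <- structure(list(second_stages), names = label)
combined <- c(first_stages, new_stage)

print(names(combined))           # three top-level stages, not four
print(names(combined$modeling))  # the appended stages live one level down
```

Had the stages been juxtaposed with c(first_stages, second_stages) instead, the result would have four top-level stages and no record of where the second runner began.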
The around method on a stagerunner serves as a kind of setup and teardown hook for arbitrary stages.
For example, imagine we have a stagerunner that looks like the following.
Imagine we want to write test functions that ensure the correct behavior is happening during the data cleaning. We can write a stagerunner with an identical tree structure that performs additional testing to ensure our work is correct:
new_runner <- stageRunner$new(new.env(), list(
"import data" = function(e) { yield(); stopifnot(!is.null(e$data)) },
"clean data" = list(
"impute variable 1" = function(e) {
yield()
stopifnot(!any(is.na(e$data$variable1)))
}, "discretize variable 2" = function(e) {
yield()
stopifnot(is.factor(e$data$variable2))
})
))
The keyword yield is injected into a stagerunner that is used with the around method, and means “execute the stage of the stagerunner that is being wrapped that would normally occur at this point.” Code before and after the yield keyword can be used to perform additional assertions about what happened during the execution of the stage.
runner$around(new_runner)
runner$run()
If any of the above assertions fail, we will now get an error.
#' Wrap a function around a stageRunner's terminal nodes
#'
#' If we want to execute some behavior just before and just after executing
#' terminal nodes in a stageRunner, a solution without this method would be
#' to overlay two runners -- one before and one after. However, this is messy,
#' so this function is intended to replace this approach with just one function.
#'
#' Consider the runner
#' \code{sr1 <- stageRunner$new(some_env, list(a = function(e) print('2')))}
#' If we run
#' \code{sr2 <- stageRunner$new(some_env, list(a = function(e) {
#' print('1'); yield(); print('3') }))
#' sr1$around(sr2)
#' sr1$run()
#' }
#' then we will see 1, 2, and 3 printed in succession. The \code{yield()}
#' keyword is used to specify when to execute the terminal node that
#' is sandwiched in the "around" runner.
#'
#' @name stageRunner_around
#' @param other_runner stageRunner. Another stageRunner from which to create
#' an around procedure. Alternatively, we could give a function or a list
#' of functions.
stageRunner_around <- function(other_runner) {
if (is.null(other_runner)) return(self)
if (!is.stagerunner(other_runner)) {
# Create a new stagerunner if `other_runner` is not already a runner.
other_runner <- stageRunner$new(self$.context, other_runner)
}
If no names are given, names(other_runner$stages) may be NULL. We would like a character vector of empty strings instead so that we may obtain the correct names in the loop below.
stagenames <- names(other_runner$stages) %||% rep("", length(other_runner$stages))
lapply(seq_along(other_runner$stages), function(stage_index) {
name <- stagenames[stage_index]
We assume each named stage has a unique name.
# TODO: (RK) It may be possible to avoid this assumption by counting
# duplicately named stages.
this_index <-
if (identical(name, "")) stage_index
else if (is.element(name, names(self$stages))) name
else return()
If both this stage and the corresponding stage on the other runner are stagerunners, we recursively use the around method. Otherwise, we use the stageRunnerNode$around method.
if (is.stagerunner(self$stages[[this_index]]) &&
is.stagerunner(other_runner$stages[[stage_index]])) {
self$stages[[this_index]]$around(other_runner$stages[[stage_index]])
} else if (is.stageRunnerNode(self$stages[[this_index]]) &&
is.stageRunnerNode(other_runner$stages[[stage_index]])) {
self$stages[[this_index]]$around(other_runner$stages[[stage_index]])
} else {
warning("Cannot apply stageRunner$around because ",
this_index, " is not a terminal node.")
}
})
self
}
Coalescing two stagerunners is a critical operation for modifying a runner that is “in flight” and is currently being executed.
Imagine we have our usual example runner:
It has remember = TRUE, which means it is keeping a copy of the current context in each stage. As in the example, we have executed it to the imputation substage. We now realize there is a mistake in the imputation code. We can create a fresh stagerunner with the same structure, but it will not have the history of context changes!
Instead, we must coalesce the old runner onto the new runner, so that it carries over the environment changes. That way, when we continue execution from the imputation substage in our fixed runner, it will resume as before without having to re-import the data.
This can be inefficient for large datasets, but using the objectdiff package we can avoid the memory problems that may arise. For even larger datasets, we may need database-backed storage, but this is beyond the scope of stagerunners for now.
#' Coalescing a stageRunner object is taking another stageRunner object
#' with similar stage names and replacing the former's cached environments
#' with the latter's.
#'
#' @name stageRunner_coalesce
#' @param other_runner stageRunner. Another stageRunner from which to coalesce.
#' @note coalescing is ill-defined for stageRunner with unnamed stages,
#' since it is impossible to tell when a stage has changed.
stageRunner_coalesce <- function(other_runner) {
# TODO: Should we care about insertion of new stages causing cache wipes?
# For now it seems like this would just be an annoyance.
# stopifnot(remember)
if (!isTRUE(self$remember)) return()
We must handle these cases: (1) integration with objectdiff, and (2) vanilla R environment objects. Both are tricky.
if (self$with_tracked_environment()) {
if (!other_runner$with_tracked_environment()) {
stop("Cannot coalesce stageRunners using tracked_environments with ",
"those using vanilla environments", call. = FALSE)
}
compare_head <- function(x, y) {
m <- seq_len(min(length(x), length(y)))
x[m] != y[m]
}
common <- sum(cumsum(compare_head(self$stage_names(), other_runner$stage_names())) == 0)
# Warning: Coalescing stageRunners with tracked_environments does not
# duplicate the tracked_environment, so the other_runner becomes invalidated,
# and this is a destructive action.
# TODO: (RK) What if the tracked_environment given initially to the stageRunner
# already has some commits?
commits <- package_function("objectdiff", "commits")
`.context<-` <- function(obj, value) {
if (is.stagerunner(obj)) {
obj$.context <- value
for (stage in obj$stages) { Recall(stage, value) }
} else if (is.stageRunnerNode(obj)) {
obj$.context <- value
if (is.stagerunner(obj$callable)) { Recall(obj$callable, value) }
}
}
self$.context <- other_runner$.context
for (stage in self$stages) { .context(stage) <- other_runner$.context }
Mark common executed stages.
self_iterator <- treeSkeleton(self)$root()$first_leaf()
other_iterator <- treeSkeleton(other_runner)$root()$first_leaf()
for (i in seq_len(common)) {
self_iterator$object$executed <- other_iterator$object$executed
self_iterator <- self_iterator$successor()
other_iterator <- other_iterator$successor()
}
Coalescing is a destructive action since the other runner will no longer be able to perform its function after the environments are moved.
other_runner$.context <- new.env(parent = emptyenv())
commit_count <- length(commits(self$.context))
mismatch_count <- commit_count - (common + 1)
if (mismatch_count > 0) {
package_function("objectdiff", "force_push")(self$.context, commit_count)
package_function("objectdiff", "rollback") (self$.context, mismatch_count)
}
} else {
if (other_runner$with_tracked_environment()) {
stop("Cannot coalesce stageRunners using vanilla environments with ",
"those using tracked_environments", call. = FALSE)
}
stagenames <- names(other_runner$stages) %||% character(length(other_runner$stages))
lapply(seq_along(other_runner$stages), function(stage_index) {
# TODO: Match by name *OR* index
if (stagenames[[stage_index]] %in% names(self$stages)) {
# If both are stageRunners, try to coalesce our sub-stages.
if (is.stagerunner(self$stages[[names(self$stages)[stage_index]]]) &&
is.stagerunner(other_runner$stages[[stage_index]])) {
self$stages[[names(self$stages)[stage_index]]]$coalesce(
other_runner$stages[[stage_index]])
# If both are not stageRunners, copy the cached_env if and only if
# the stored function and its environment are identical
} else if (!is.stagerunner(self$stages[[names(self$stages)[stage_index]]]) &&
!is.stagerunner(other_runner$stages[[stage_index]]) &&
!is.null(other_runner$stages[[stage_index]]$.cached_env) #&&
#identical(deparse(stages[[names(stages)[stage_index]]]$fn),
# deparse(other_runner$stages[[stage_index]]$fn)) # &&
# This is way too tricky and far beyond my abilities..
#identical(stagerunner:::as.list.environment(environment(stages[[names(stages)[stage_index]]]$fn)),
# stagerunner:::as.list.environment(environment(other_runner$stages[[stage_index]]$fn)))
) {
self$stages[[names(self$stages)[stage_index]]]$.cached_env <-
new.env(parent = parent.env(self$.context))
if (is.environment(other_runner$stages[[stage_index]]$.cached_env) &&
is.environment(self$stages[[names(self$stages)[stage_index]]]$.cached_env)) {
copy_env(self$stages[[names(self$stages)[stage_index]]]$.cached_env,
other_runner$stages[[stage_index]]$.cached_env)
self$stages[[names(self$stages)[stage_index]]]$executed <-
other_runner$stages[[stage_index]]$executed
}
}
}
})
self$.set_parents()
}
self
}
#' This allows us to get the furthest executed stage.
#'
#' @name stageRunner_current_stage
#' @return a character stage key giving the latest executed stage.
#' If the stageRunner does not have caching enabled, this will
#' always return the first stage key (`'1'`).
stageRunner_current_stage <- function() {
for (stage_index in rev(seq_along(self$stages))) {
We use the stageRunnerNode$was_executed helper to determine if this stage has been executed yet.
is_executed_terminal_node <- is.stageRunnerNode(self$stages[[stage_index]]) &&
self$stages[[stage_index]]$was_executed()
if (is_executed_terminal_node) return(as.character(stage_index))
We can recursively use current_stage if the current stage is another stagerunner rather than a terminal node.
has_executed_terminal_node <- is.stagerunner(self$stages[[stage_index]]) &&
is.character(tmp <- self$stages[[stage_index]]$current_stage())
if (has_executed_terminal_node) return(paste(c(stage_index, tmp), collapse = '/'))
}
FALSE
}
#' Whether or not the stageRunner has a key matching this input.
#'
#' @param key ANY. The potential key.
#' @return \code{TRUE} or \code{FALSE} accordingly.
stageRunner_has_key <- function(key) {
We turn the key, like “data/foo” or c(1,2), into a nested list of logicals in the usual format, or FALSE if the key cannot be parsed.
has <- tryCatch(normalize_stage_keys(key, self$stages),
error = function(.) FALSE)
If any substage is TRUE, the stagerunner contains this key. Note that keys may refer to several different substages! This method will tell us whether it is possible to execute anything using the provided key.
any(c(has, recursive = TRUE))
}
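The flattening step can be seen in isolation. Below is a minimal sketch, assuming a key has already been normalized into a nested list of logicals; the normalized value and the simulated parsing failure are made up for illustration and do not call normalize_stage_keys itself.

```r
# A hypothetical normalized key: only the first substage of stage 2 is selected.
normalized <- list(FALSE, list(TRUE, FALSE), FALSE)

# c(..., recursive = TRUE) flattens the nested list into a plain logical vector.
flat <- c(normalized, recursive = TRUE)
has_any <- any(flat)

# A parsing failure is mapped to FALSE, mirroring the tryCatch above.
failed <- tryCatch(stop("cannot parse key"), error = function(.) FALSE)
has_none <- any(c(failed, recursive = TRUE))
```

Because the error handler returns a bare FALSE, the same any(c(..., recursive = TRUE)) expression covers both the parsed and the unparsed case.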
#' Initialize a stageRunner object.
#'
#' stageRunner objects are used for executing a linear sequence of
#' actions on a context (an environment). For example, if we have an
#' environment \code{e} containing \code{x = 1, y = 2}, then using
#' \code{stages = list(function(e) e$x <- e$x + 1, function(e) e$y <- e$y - e$x)}
#' will cause \code{x = 2, y = 0} after running the stages.
#'
#' @name stageRunner_initialize
#' @param context environment. The initial environment that is getting
#' modified during the execution of the stages.
#' @param stages list. The functions to execute on the \code{context}.
#' @param remember logical. Whether to keep a copy of the context and its
#' contents throughout each stage for debugging purposes--this makes it
#' easy to go back and investigate a stage.
#'
#' The default is \code{FALSE}. When set to \code{TRUE}, the return value
#' of the \code{run} method will be a list of two environments: one of what
#' the context looked like before the \code{run} call, and another
#' of the aftermath.
When a stagerunner object is initialized, it needs to convert a pre-stagerunner, like
list(first = some_function, second = list(
sub1 = another_function, sub2 = a_third_function
))
into a stagerunner object. This class constructor will turn the above into a hierarchy of stagerunners to make it easier to recursively re-use functionality.
#' @param mode character. Controls the default behavior of calling the
#' \code{run} method for this stageRunner. The two supported options are
#' "head" and "next". The former gives a stageRunner which always begins
#' from the first stage if the \code{from} parameter to the \code{run}
#' method is blank. The latter will instead begin from the next unexecuted
#' stage. The default is "head". This argument has no effect if
#' \code{remember = FALSE}.
stagerunner_initialize <- function(context, stages, remember = FALSE,
mode = getOption("stagerunner.mode") %||% "head") {
As a convenient shortcut, if a stagerunner is initialized without a second argument but with a first argument that can be turned into stages, we create a new environment for the context.
if (missing(stages) && !missing(context) && is_pre_stagerunner(context)) {
stages <- context
The only parent environment that makes sense is the calling environment.
context <- new.env(parent = parent.frame())
}
if (identical(remember, FALSE) && is(context, "tracked_environment")) {
stop("Can not use tracked environments with stagerunners that have caching ",
"disabled (remember = FALSE)")
}
The enforce_type helper in utils.R will print a nice and colorful error message if we have initialized our stagerunner with the wrong argument types.
enforce_type(context, "environment", "stagerunner", "context")
enforce_type(remember, "logical", "stagerunner", "remember")
enforce_type(mode, "character", "stagerunner", "mode")
match.arg is a convenient base R helper that will error unless one of a given set of options is chosen.
match.arg(mode, c("head", "next"))
stopifnot(length(remember) == 1)
self$.parent <- NULL
# The .finished flag is used for certain features when printing a stagerunner.
self$.finished <- FALSE
self$.context <- context
self$.mode <- tolower(mode)
self$remember <- remember
A stagerunner will recursively be represented using more stagerunners. This way, we can re-use methods defined on a stagerunner on local subsections.
self$stages <- initialize_stages(stages, context, remember)
We wrap up with some messy initialization in case our stagerunner intends to remember progress.
if (isTRUE(self$remember)) {
initialize_remembrance(self)
}
}
initialize_stages <- function(stages, context, remember) {
if (length(stages) == 0) {
warning("stagerunners with zero stages may cause problems.")
}
if (!is_pre_stagerunner(stages)) {
stop("Can only turn a function or list of functions into a stagerunner.")
}
if (is.function(stages)) {
stages <- list(stages)
}
A loop is slightly faster than an lapply here.
for (i in seq_along(stages)) {
if (is.list(stages[[i]])) {
stages[[i]] <- stagerunner(context, stages[[i]], remember = remember)
} else if (is.function(stages[[i]]) || is.null(stages[[i]])) {
stages[[i]] <- stageRunnerNode(stages[[i]], context)
}
}
We will be using the / character in a special way for running stages. For example, if we had a runner with a stage “clean data” containing a substage “impute variable 1”, we would run that substage using runner$run("clean data/impute variable 1"). To avoid complications, we prevent the use of slashes in the stage names.
prevent_stage_name_violators(stages)
stages
}
prevent_stage_name_violators <- function(stages) {
if (any(violators <- grepl("/", names(stages), fixed = TRUE))) {
stop(paste0("Stage names may not have a '/' character. The following do not ",
"satisfy this constraint: '",
paste0(names(stages)[violators], collapse = "', '"), "'"))
}
}
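The check above can be tried on its own. This is a small sketch using made-up stage names; it only exercises base R's grepl and strsplit and is not part of the package.

```r
# Hypothetical stage names; the second one contains a slash.
stage_names <- c("import", "clean/data", "train model")

# fixed = TRUE treats "/" as a literal string rather than a regular expression.
violators <- grepl("/", stage_names, fixed = TRUE)
bad_names <- stage_names[violators]

# Splitting a key on "/" is what would make a slash inside a name ambiguous.
key_parts <- strsplit("clean data/impute variable 1", "/", fixed = TRUE)[[1]]
```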
initialize_remembrance <- function(stagerunner) {
stagerunner$.clear_cache()
We set up some meta-data that will be used to track the changes occurring in the stagerunner. See the treeSkeleton class later for more details.
stagerunner$.set_parents()
if (stagerunner$with_tracked_environment()) {
stagerunner$.set_prefixes()
} else if (length(stagerunner$stages) > 0) {
The very first stage should remember what the context looked like upon initialization. After all, if a user messes with the context and later re-runs the stagerunner from scratch, it should restore what the context looked like at the time of initialization.
first_env <- treeSkeleton$new(stagerunner$stages[[1]])$first_leaf()$object
first_env$.cached_env <- new.env(parent = parent.env(stagerunner$.context))
copy_env(first_env$.cached_env, stagerunner$.context)
}
}
This file contains some messy internal methods that are necessary for correct interoperation with the treeSkeleton class and the objectdiff package.
#' Clear all caches in this stageRunner, and recursively.
#' @name stageRunner_.clear_cache
stageRunner_.clear_cache <- function() {
for (i in seq_along(self$stages)) {
The stagerunner context just prior to stage execution is stored in an environment cache. We clear this cache recursively.
if (is.stagerunner(self$stages[[i]])) self$stages[[i]]$.clear_cache()
else self$stages[[i]]$.cached_env <- NULL
}
TRUE
}
The treeSkeleton requires a recursive structure to be annotated with “parent metadata” so it can be traversed like a tree structure. This is what allows us to go from stage “2/2” to stage “3”, for example: we are finding the successor node in the tree structure and “running” it.
#' Set all parents for this stageRunner, and recursively
#' @name stageRunner_.set_parents
stageRunner_.set_parents <- function() {
for (i in seq_along(self$stages)) {
# Set convenience helper attribute "child_index" to ensure that treeSkeleton
# can find this stage.
The metadata required by the treeSkeleton class.
attr(self$stages[[i]], 'child_index') <<- i
attr(self$stages[[i]], 'parent') <<- self
}
self$.parent <- NULL
}
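To make the “2/2” to “3” step concrete, here is a rough sketch of successor lookup on a plain nested list. It deliberately swaps in a simpler technique: instead of following parent pointers as treeSkeleton does, it flattens the tree into an ordered vector of leaf keys. The names leaf_keys and successor_key are hypothetical.

```r
# Hypothetical nested stage structure: terminal nodes are placeholders.
stages <- list(1, list(1, 1), 1)

# Collect the "/"-separated key of every terminal node, in execution order.
leaf_keys <- function(x, prefix = character(0)) {
  keys <- character(0)
  for (i in seq_along(x)) {
    path <- c(prefix, i)
    if (is.list(x[[i]])) {
      keys <- c(keys, leaf_keys(x[[i]], path))
    } else {
      keys <- c(keys, paste(path, collapse = "/"))
    }
  }
  keys
}

# The successor of a leaf is the next leaf in that ordering (NULL at the end).
successor_key <- function(x, key) {
  keys <- leaf_keys(x)
  position <- match(key, keys)
  if (is.na(position) || position == length(keys)) NULL else keys[[position + 1]]
}
```

Flattening costs a full traversal per lookup, which is one reason a parent-pointer representation is attractive for larger trees.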
#' Get an environment representing the context directly before executing a given stage.
#'
#' @note If there is a lot of data in the remembered environment, this function
#' may be computationally expensive as it has to create a new environment
#' with a copy of all the relevant data.
#' @param stage_index integer. The substage for which to grab the before
#' environment.
#' @return a fresh new environment representing what would have been in
#' the context as of right before the execution of that substage.
stageRunner_.before_env <- function(stage_index) {
cannot_run_error <- function() {
stop("Cannot run this stage yet because some previous stages have ",
"not been executed.")
}
if (self$with_tracked_environment()) {
# We are using the objectdiff package and its tracked_environment,
# so we have to "roll back" to a previous commit.
current_commit <- paste0(self$.prefix, stage_index)
if (!current_commit %in% names(package_function("objectdiff", "commits")(self$.context))) {
if (`first_commit?`(current_commit)) {
# TODO: (RK) Do this more robustly. This will fail if there is a
# first sub-stageRunner with an empty list as its stages.
package_function("objectdiff", "commit")(self$.context, current_commit)
} else {
cannot_run_error()
}
} else {
package_function("objectdiff", "force_push")(self$.context, current_commit)
}
env <- new.env(parent = package_function("objectdiff", "parent.env.tracked_environment")(self$.context))
copy_env(env, package_function("objectdiff", "environment")(self$.context))
env
} else {
env <- self$stages[[stage_index]]$.cached_env
if (is.null(env)) { cannot_run_error() }
# Restart execution from cache, so set context to the cached environment.
copy_env(self$.context, env)
env
}
}
#' Mark a given stage as being finished.
#'
#' @param stage_index integer. The index of the substage in this stageRunner.
stageRunner_.mark_finished <- function(stage_index) {
node <- treeSkeleton$new(self$stages[[stage_index]])$successor()
if (!is.null(node)) { # Prepare a cache for the future!
if (self$with_tracked_environment()) {
# We assume the head for the tracked_environment is set correctly.
package_function("objectdiff", "commit")(self$.context, node$object$index())
} else {
node$object$.cached_env <- new.env(parent = parent.env(self$.context))
copy_env(node$object$.cached_env, self$.context)
}
} else {
# TODO: Remove this hack used for printing
root <- self$.root()
root$.finished <- TRUE
}
}
#' Determine the root of the stageRunner.
#'
#' @name stageRunner_.root
#' @return the root of the stageRunner
stageRunner_.root <- function() {
treeSkeleton$new(self)$root()$object
}
Stagerunners allow for the option to use a special mode called "next". In this mode, instead of executing by default from the beginning of the stagerunner, execution will commence from the first stage that has not yet been executed. This allows us to repeatedly call runner$run() until it has finished executing, if errors occur during the process and we repeatedly fix them.
#' For stageRunners with caching, find the next unexecuted stage.
#'
#' @name stageRunner_next_stage
#' @return a character stage key giving the next unexecuted stage.
#' If all stages have been executed, this returns \code{FALSE}.
#' If the stageRunner does not have caching enabled, this will
#' always return the first stage key (`'1'`).
stageRunner_next_stage <- function() {
for (stage_index in seq_along(self$stages)) {
We use the stageRunnerNode$was_executed helper to determine if this stage has been executed yet.
is_unexecuted_terminal_node <- is.stageRunnerNode(self$stages[[stage_index]]) &&
!self$stages[[stage_index]]$was_executed()
if (is_unexecuted_terminal_node) return(as.character(stage_index))
We can recursively use next_stage if the current stage is another stagerunner rather than a terminal node.
has_unexecuted_terminal_node <- is.stagerunner(self$stages[[stage_index]]) &&
is.character(tmp <- self$stages[[stage_index]]$next_stage())
if (has_unexecuted_terminal_node) return(paste(c(stage_index, tmp), collapse = '/'))
}
FALSE
}
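The same recursion can be sketched without any R6 objects. The example below assumes the execution state is given as a nested list of logicals (TRUE meaning “executed”); next_key is a hypothetical stand-in for the method above, not the package's implementation.

```r
# Hypothetical execution record: TRUE means the terminal node was executed.
executed <- list(TRUE, list(TRUE, FALSE), FALSE)

# Return the key of the first unexecuted leaf, or FALSE if there is none.
next_key <- function(x) {
  for (i in seq_along(x)) {
    if (is.list(x[[i]])) {
      sub <- next_key(x[[i]])
      if (is.character(sub)) return(paste(c(i, sub), collapse = "/"))
    } else if (!isTRUE(x[[i]])) {
      return(as.character(i))
    }
  }
  FALSE
}
```

Iterating over rev(seq_along(x)) and testing isTRUE instead would give the mirror-image lookup for the furthest executed stage.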
Stagerunners remember the full history of their execution. If you have fifty data preparation steps recorded in a stagerunner and the remember flag is set to TRUE, a full copy of the dataset will be made after each step. This is highly inefficient.
We attempt to solve this problem with a space-time tradeoff: the objectdiff package computes the difference between the environment before and after executing a given stage. By incorporating this package into a stagerunner, we can take slightly more time (by computing differences between environments during the execution of each stage) but save a lot of memory (by only storing patches that record what has changed during each step, rather than a full copy of the data set).
This advanced feature allows stagerunners to remain in-memory, retaining the fast, interactive, iterative model-building process. The downside is either slightly more space or slightly more time usage, depending on the configuration of the objectdiff package.
We can avoid the problem entirely by doing all of our processing in batches or in-database, but this is outside of the scope of this package. For interactive model development on data sets with fewer than 1M rows, performance is usually not prohibitive.
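The idea of storing patches rather than full copies can be illustrated with a toy example. This is only a sketch of the general technique under simplifying assumptions (whole variables are compared with identical); it is not how objectdiff computes its patches, and env_patch is a hypothetical name.

```r
# Record only the variables that differ between two environments.
# This illustrates patch-based storage; it is not objectdiff's algorithm.
env_patch <- function(before, after) {
  after_list <- as.list(after, all.names = TRUE)
  changed <- Filter(function(name) {
    !exists(name, envir = before, inherits = FALSE) ||
      !identical(get(name, envir = before, inherits = FALSE), after_list[[name]])
  }, names(after_list))
  removed <- setdiff(ls(before, all.names = TRUE), names(after_list))
  list(changed = after_list[changed], removed = removed)
}

before <- new.env()
before$data <- iris
before$tmp <- 1

after <- new.env()
after$data <- iris                    # unchanged, so not stored in the patch
after$dep_var <- iris[[1]] > 5        # new variable, stored in the patch

patch <- env_patch(before, after)
```

Here the patch holds one logical vector and one removed name instead of a second copy of the data frame; the extra cost is the comparison performed at each step.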
# Some features of stageRunner, specifically its interaction with the
# objectdiff package, require some additional setup. We attempt to record
# all of these dependencies in this file.
#' Set all prefixes for child stageRunners.
#'
#' When a stageRunner is used in conjunction with an
#' \code{objectdiff::tracked_environment}, we need to remember
#' the full nested tree structure. This function sets up the
#' \code{prefix} member of each sub-stageRunner recursively to enable
#' correct remembering functionality.
#'
#' @param prefix character. The prefix to assign to this stageRunner.
#' @name stageRunner_.set_prefixes
stageRunner_.set_prefixes <- function(prefix = '') {
self$.prefix <- prefix
for (i in seq_along(self$stages)) {
if (is.stageRunner(self$stages[[i]])) {
self$stages[[i]]$.set_prefixes(paste0(prefix, i, '/'))
}
}
}
`first_commit?` <- function(commit) {
all(strsplit(commit, "/", fixed = TRUE)[[1]] == '1')
}
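As a quick illustration of how prefixes and stage indices combine into commit names, the sketch below repeats the predicate so that it is self-contained. The commit_name helper is hypothetical; in the code above the same string is built inline with paste0(self$.prefix, stage_index).

```r
# Same predicate as above, repeated so the sketch is self-contained.
`first_commit?` <- function(commit) {
  all(strsplit(commit, "/", fixed = TRUE)[[1]] == '1')
}

# Commit names are the prefix of the enclosing runner plus the stage index.
commit_name <- function(prefix, stage_index) paste0(prefix, stage_index)

first  <- commit_name("1/1/", 1)   # first leaf of a doubly nested runner
second <- commit_name("1/", 2)     # second stage of the first sub-runner
```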
Overlaying a stagerunner means replacing the terminal nodes with terminal nodes that do some extra behavior, and can themselves be full stagerunners. This is subtly different from the around method, which transforms the terminal node function itself, rather than turning it from a function to a stagerunner.
For example, given a stagerunner whose terminal nodes are plain functions, we may wish to replace each function with a “hidden” mini-runner that runs some tests after each stage.
We can achieve this by passing the stagerunner with the same tree structure but containing tests in the terminal nodes as the argument to the main stagerunner's overlay method.
#' Overlaying a stageRunner object is taking another stageRunner object
#' with similar stage names and adding the latter's stages as terminal stages
#' to the former (for example, to support tests).
#'
#' @name stageRunner_overlay
#' @param other_runner stageRunner. Another stageRunner from which to overlay.
#' @param label character. The label for the overlaid stageRunner. This refers
#' to the name the former will get wrapped with when appended to the
#' stages of the current stageRunner. For example, if \code{label = 'test'},
#' and a current terminal node is unnamed, it will become
#' \code{list(current_node, test = other_runner_node)}.
#' @param flat logical. Whether to use the \code{stageRunner$append} method to
#' overlay, or simply overwrite the given \code{label}. If \code{flat = TRUE},
#' you must supply a \code{label}. The default is \code{flat = FALSE}.
stageRunner_overlay <- function(other_runner, label = NULL, flat = FALSE) {
stopifnot(is.stagerunner(other_runner))
for (stage_index in seq_along(other_runner$stages)) {
name <- names(other_runner$stages)[[stage_index]]
index <-
if (identical(name, '') || identical(name, NULL)) stage_index
else if (name %in% names(self$stages)) name
else stop('Cannot overlay because keys do not match')
self$stages[[index]]$overlay(other_runner$stages[[stage_index]], label, flat)
}
TRUE
}
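The effect of an overlay can be approximated on plain nested lists. The following is a rough sketch only: overlay_sketch is a hypothetical function that pairs each terminal function with a same-named function from another list, whereas the real method delegates to stageRunnerNode objects and also matches unnamed stages by index.

```r
# Pair each terminal function with the matching function from `other`,
# stored under `label`. Nested lists are overlaid recursively.
overlay_sketch <- function(stages, other, label = "test") {
  for (name in names(other)) {
    if (!name %in% names(stages)) stop("Cannot overlay because keys do not match")
    if (is.list(stages[[name]])) {
      stages[[name]] <- overlay_sketch(stages[[name]], other[[name]], label)
    } else {
      stages[[name]] <- c(list(stages[[name]]), setNames(list(other[[name]]), label))
    }
  }
  stages
}

stages <- list(
  import = function(e) e$x <- 1,
  munge  = list(double = function(e) e$x <- 2 * e$x)
)
tests <- list(
  import = function(e) stopifnot(e$x == 1),
  munge  = list(double = function(e) stopifnot(e$x == 2))
)

overlaid <- overlay_sketch(stages, tests)
```

Each former terminal node is now a two-element list: the original function followed by the test, which is the shape list(current_node, test = other_runner_node) described in the documentation above.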
#' stagerunner: in-memory reproducible data preparation and modeling
#'
#' stagerunner is an attempt to define a notion of data munging that includes
#' \emph{history}. By writing your code as a stagerunner instead of a
#' collection of functions, three key advantages should become clear:
#'
#' \itemize{
#' \item Clarity will emerge in code that is intended to execute a sequence
#' of operations that aims to produce a final result.
#' \item Reproducibility of interactive munging steps is possible without
#' re-executing your analysis from scratch.
#' \item Modularity and extensibility become free of charge: methods like
#' \code{around} and \code{transform} allow you to apply the same operation
#' to your entire modeling procedure, simplifying progress monitoring and
#' debugging.
#' }
#'
#' Although originally intended for clarifying the modeling process,
#' stagerunners have much more general applicability. To learn more,
#' begin with the vignettes: \code{browseVignettes(package = "stagerunner")}.
#'
#' @docType package
#' @name stagerunner
#' @import crayon R6
#' @author Robert Krzyzanowski <\url{http://syberia.io}>
#' @seealso The core function in this package: \code{\link{stagerunner}}. It
#' defines the constructor creating stagerunner objects that allow you to
#' wrap a complicated modeling procedure into an organized hierarchy.
#' @references Full documentation and demos: \url{http://robertzk.github.io/stagerunner/};
#' FAQs: \url{http://robertzk.github.io/stagerunner/faq/}
NULL
Since self is used all over the place in R6 method definitions, R CMD check will yell at us if we do not include the line below.
globalVariables('self')
The heart of a stagerunner object is its run method, depicted on the right. A stagerunner consists of two things:
a context: This is an environment object that allows the user to persistently store information between stages. The usual way to build a data pipeline is to provide functions with various inputs and hook them up to functions with various outputs.
This is nice because it is clear what the inputs and outputs will be. However, the disadvantage is that hooking up all the functions can become pretty messy.
In this approach, we let the user set their own conventions for what to place in the context. The advantage is that all stages have the same form, a function taking one argument (the context), and so they become easy to manipulate.
stages: A list of functions or, recursively, other stagerunners. Each function should take precisely one argument: the context described above. If you have some familiarity with pure mathematics, you will know the original inspiration for stagerunners: a stagerunner is a sequence of actions on an environment.
Running a portion of a stagerunner means to execute some of its stages on its context. For example, suppose we start with an empty environment context = new.env() and the following stages:
context <- new.env()
runner <- stagerunner(context, list(
"Set x" = function(e) { e$x <- 1 },
"Double x" = function(e) { e$x <- 2 * e$x }
))
If we write runner$run("Set x"), then context$x will become 1. If we write runner$run(2) (a syntactical shortcut), then context$x becomes 2. If we write runner$run(2) again, it will become 4.
The real advantage of this approach becomes clear when we enable the remember flag:
context <- new.env()
runner <- stagerunner(remember = TRUE, context, list(
"Import data" = function(e) e$data <- iris,
"Create dependent variable" = function(e) e$dep_var <- e$data[[1]] > 5,
"Create derived variable" = function(e) e$diff <- e$data[[1]] - e$data[[2]]
))
Now, the stagerunner holds a copy of the full environment in each stage: this means we can re-run previous stages at will.
runner$run() # Run all stages
context$data <- NULL # Clear the data in the context
runner$run(2) # Re-run just the second stage.
In this scenario, the data gets restored from a cached environment (what the context looked like after the first stage finished) and we have a dep_var variable (although no diff variable, since the third stage was now “rolled back”).
This kind of approach also allows us to debug what happens during execution:
envs <- runner$run(2)
ls(envs$before) # Only data
ls(envs$after) # data *and* dep_var
When a stagerunner is set to remember its progress, the output of the run function consists of a list with keys before and after representing two environments: what the stagerunner's context looked like before and after executing that stage.
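The before/after pair can be mimicked with plain environments. This is a minimal sketch, assuming a shallow copy of the context is an adequate snapshot; snapshot_env is a hypothetical helper standing in for the package's internal environment copying.

```r
# Hypothetical helper: shallow-copy every binding of `from` into a new environment.
snapshot_env <- function(from) {
  list2env(as.list(from, all.names = TRUE),
           envir = new.env(parent = parent.env(from)))
}

context <- new.env()
context$data <- iris

before <- snapshot_env(context)               # what the context looked like before
context$dep_var <- context$data[[1]] > 5      # the "stage" mutates the context
envs <- list(before = before, after = context)

# Variables introduced by the stage.
added <- setdiff(ls(envs$after), ls(envs$before))
```

Comparing the two environments in this way is what makes it possible to inspect exactly what a stage changed.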
#' Run the stages in a stageRunner object.
#'
#' @param from an indexing parameter. Many forms are accepted, but the
#' easiest is the name of the stage. For example, if we have
#' \code{stageRunner$new(context, list(stage_one = some_fn, stage_two = some_other_fn))}
#' then using \code{run('stage_one')} will execute \code{some_fn}.
#' Additional indexing forms are logical (which stages to execute),
#' numeric (which stages to execute by indices), negative (all but the
#' given stages), character (as above), and nested forms of these.
#' The latter refers to instances of the following:
#' \code{stageRunner$new(context, list(stage_one =
#' stageRunner$new(context, list(substage_one = some_fn, substage_two = other_fn)),
#' stage_two = another_fn))}.
#' Here, the following all execute only substage_two:
#' \code{run(list(list(FALSE, TRUE), FALSE))},
#' \code{run(list(list(1, 2)))},
#' \code{run('stage_one/substage_two')},
#' \code{run('one/two')}.
#' Notice that substrings are allowed for characters.
#' The default is \code{NULL}, which runs the whole sequence of stages.
#' @param to an indexing parameter. If \code{from} refers to a single stage,
#' attempt to run from that stage to this stage (or, if this one comes first,
#' this stage to that stage). For example, if we have
#' \code{stages = list(a = list(b = 1, c = 2), d = 3, e = list(f = 4, g = 5))}
#' where the numbers are some functions, and we call \code{run} with
#' \code{from = 'a/c'} and \code{to = 'e/f'}, then we would execute
#' stages \code{"a/c", "d", "e/f"}.
#' @param verbose logical. Whether or not to display pretty colored text
#' informing about stage progress.
#' @param remember_flag logical. An internal argument used by \code{run}
#' recursively if the \code{stageRunner} object has the \code{remember}
#' field set to \code{TRUE}. If \code{remember_flag} is FALSE, \code{run}
#' will not attempt to restore the context from cache (e.g., if we are
#' executing five stages simultaneously with \code{remember = TRUE},
#' the first stage's context should be restored from cache but none
#' of the remaining stages should).
#' @param mode character. If \code{mode = 'head'}, then by default the
#' \code{from} parameter will be used to execute that stage and that
#' stage only. If \code{mode = 'next'}, then the \code{from} parameter
#' will be used to run (by default, if \code{to} is left missing)
#' from the last successfully executed stage to the stage given by
#' \code{from}. If \code{from} occurs before the last successfully
#' executed stage (say S), the stages will be run from \code{from} to S.
#' @param normalized logical. A convenience recursion performance helper. If
#' \code{TRUE}, stageRunner will assume the \code{from} argument is a
#' nested list of logicals.
Do not worry about this parameter, .depth. It is used internally to keep track of how “deep” the current stage execution is.
#' @param .depth integer. Internal parameter for keeping track of nested
#' execution depth.
#' @param ... Any additional arguments to delegate to the \code{stageRunnerNode}
#' object that will execute its own \code{run} method.
#' (See \code{stageRunnerNode$run})
#' @return TRUE or FALSE according to whether running the stages specified by the
#' \code{from} and \code{to} keys succeeded or failed. If
#' \code{remember = TRUE}, this will instead be a list of the environment
#' before and after executing the aforementioned stages. (This allows
#' comparing what changes were made to the \code{context} during the
#' execution of the stageRunner.)
#' @examples
#' env <- new.env()
#' some_fn <- function(e) e$x <- 1
#' other_fn <- function(e) e$y <- 1
#' another_fn <- function(e) e$z <- 1
#' sr <- stagerunner(env, list(stage_one =
#' stagerunner(env, list(substage_one = some_fn, substage_two = other_fn)),
#' stage_two = another_fn))
#'
#' # Here, the following all execute only substage_two:
#'
#' sr$run(list(list(FALSE, TRUE), FALSE))
#' sr$run(list(list(1, 2)))
#' sr$run('stage_one/substage_two')
#' sr$run('one/two')
#' stopifnot(is.null(env$z), is.null(env$x), identical(env$y, 1))
#'
#' # This will execute all but "stage_one" (i.e., only "stage_two")
#' sr$run(-1)
#' stopifnot(identical(env$z, 1))
run <- function(from = NULL, to = NULL, verbose = FALSE,
remember_flag = getOption("stagerunner.remember", TRUE),
mode = self$.mode, normalized = FALSE, .depth = 1, ...) {
The parameter normalized refers to whether the input (that is, the from and to parameters) is in the canonical nested list format. For example, if we have a runner with stages “Import”, “Data/impute”, and “Data/discretize”, the canonical representation for the first substage of the second stage would be list(FALSE, list(TRUE, FALSE)). This allows the stagerunner package to easily tell what is being executed.
If the from and to parameters are not in normal form, or the from parameter is missing and the to parameter is present (so that we are asking to run from the beginning to the stage denoted by to), we must first normalize the keys to use this nested list format.
We will use the stage_key local variable to track what substages to execute during this run call.
if (identical(normalized, FALSE)) {
if (missing(from) && identical(self$remember, TRUE) && identical(mode, 'next')) {
from <- self$next_stage()
if (missing(to)) to <- TRUE
}
stage_key <- normalize_stage_keys(from, self$stages, to = to)
} else {
stage_key <- from
}
Now that we have determined which stages to run, we cycle through them all. It is up to the user to determine that context changes make sense. We also implicitly sort the stages to ensure linearity is preserved. Stagerunner enforces the linearity and directionality set in the stage definitions.
If we are remembering changes, we must recall what the environment looked like before we ran anything.
before_env <- NULL
for (stage_index in seq_along(stage_key)) {
nested_run <- TRUE
In a stagerunner, recursively nested stages (i.e., stages with substages) are themselves represented as stagerunners, while final stages (i.e., the functions to execute) are represented as R6 objects called stageRunnerNodes. In each scenario, a different recursive call to $run will be necessary, so we compute a closure that gives the correct call for later use.
run_stage <- determine_run_stage(stage_key, stage_index,
self$stages, verbose, .depth)
We keep track of whether this is a nested run so that the verbose display knows whether to say “Beginning stage X” or “Running stage X”.
if (isTRUE(stage_key[[stage_index]]) &&
!is.stagerunner(self$stages[[stage_index]])) {
nested_run <- FALSE
}
The above helper run_stage will return an object of class next_stage if we should skip this stage (i.e., because stage_key[[stage_index]] is FALSE).
if (is(run_stage, "next_stage")) next
Display a nice message telling us which stage we are currently executing.
display_message <- isTRUE(verbose) && contains_true(stage_key[[stage_index]])
if (display_message) {
show_message(names(self$stages), stage_index, begin = TRUE,
nested = nested_run, depth = .depth)
}
If remember = TRUE, we have to cache the progress along each stage.
if (self$remember && isTRUE(remember_flag) && is.null(before_env)) {
If we have not determined what the environment on the stagerunner was like prior to running any stages, we do so now. This will eventually be returned by this function, so that the user can inspect what happened before and after all the desired stages were executed.
if (nested_run) {
If this is a nested stage, we grab the “initial environment” recursively.
before_env <- run_stage(..., remember_flag = TRUE)$before
} else {
Otherwise, if it is a terminal node, we just make a copy of the current context.
before_env <- self$.before_env(stage_index)
}
If the current stage is a terminal node, execute the stage (if it was nested, it's already been executed in order to recursively fetch the initial environment, before_env).
if (!nested_run) { run_stage(...) }
}
else if (self$remember) { run_stage(..., remember_flag = remember_flag) }
else { run_stage(...) }
When we're done running a stage (i.e., processing a terminal node), set the cache on the successor node to be the current context, since that node will execute starting with what's in the context now. This also ensures that running that node with a separate call to $run will not bump into a “you haven't executed this stage yet” error.
if (self$remember && isTRUE(remember_flag) && !nested_run) {
self$.mark_finished(stage_index)
}
Finally, display our progress by indicating we are ending this stage.
if (display_message) {
show_message(names(self$stages), stage_index, begin = FALSE,
nested = nested_run, depth = .depth)
}
}
If the stagerunner is a remembering stagerunner, i.e., the field remember = TRUE, we will return a list with keys before and after indicating what the stagerunner's context looked like before and after executing the stages indicated by the from and to parameters. This allows the user to perform their own analysis about what happened. Otherwise, we simply return TRUE (invisibly).
if (self$remember && isTRUE(remember_flag)) {
list(before = before_env, after = self$.context)
} else {
invisible(TRUE)
}
}
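The core of the loop above, stripped of caching and messaging, is a walk over a normalized key. The sketch below assumes stages are a plain nested list of functions and the key is already a nested list of logicals; run_sketch is a hypothetical simplification, not the run method itself.

```r
# Execute only the terminal functions whose position in `key` is TRUE.
run_sketch <- function(stages, key, context) {
  for (i in seq_along(key)) {
    if (is.list(key[[i]])) {
      run_sketch(stages[[i]], key[[i]], context)   # recurse into substages
    } else if (isTRUE(key[[i]])) {
      stages[[i]](context)                          # run a terminal node
    }
  }
  invisible(TRUE)
}

context <- new.env()
stages <- list(
  import = function(e) e$log <- c(e$log, "import"),
  data = list(
    impute     = function(e) e$log <- c(e$log, "impute"),
    discretize = function(e) e$log <- c(e$log, "discretize")
  )
)

# Canonical key for "the first substage of the second stage".
run_sketch(stages, list(FALSE, list(TRUE, FALSE)), context)
```

Because the key mirrors the shape of the stages, no name lookup is needed once normalization has happened, which is why recursive calls can pass normalized = TRUE.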
This is a helper function to call $run correctly: if we are recursively executing substages, we call the nested stagerunner's $run method; for a terminal node, we call the stageRunnerNode$run method directly.
determine_run_stage <- function(stage_key, stage_index, stages, verbose, .depth) {
if (isTRUE(stage_key[[stage_index]])) {
stage <- stages[[stage_index]]
if (is.stagerunner(stage)) {
function(...) { stage$run(verbose = verbose, .depth = .depth + 1, ...) }
} else {
nested_run <- FALSE
Intercept the remember_flag argument to calls to the stageRunnerNode
(since it doesn't know how to use it).
function(..., remember_flag = TRUE) { stage$run(...) }
}
} else if (is.list(stage_key[[stage_index]])) {
function(...) {
stages[[stage_index]]$run(stage_key[[stage_index]], normalized = TRUE,
verbose = verbose, .depth = .depth + 1, ...)
}
} else {
structure(list(), class = "next_stage")
}
}
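The remember_flag interception above relies on a general property of R argument matching: a named formal that follows ... captures its argument by exact name, so that argument never reaches the inner call. A minimal sketch, where node_run and intercepting_run are hypothetical stand-ins and not stagerunner functions:

```r
# Hypothetical stand-in for stageRunnerNode$run: it simply reports
# the names of the arguments it received.
node_run <- function(...) names(list(...))

# remember_flag is matched by name and silently dropped;
# everything else is forwarded through `...`.
intercepting_run <- function(..., remember_flag = TRUE) node_run(...)

received <- intercepting_run(verbose = TRUE, remember_flag = FALSE)
print(received)
```

Only verbose is forwarded, which is why the terminal node never needs to know about remember_flag.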
Printing a stagerunner should show information about:
We choose the following notation:
A caching stageRunner with 4 stages:
+ import
* data
+ impute variable
* discretize variable
- train model
Context <environment: 0x101726640>
The + indicates the stage has been executed successfully; * indicates
it is currently being executed; and - means the stage has not yet
been executed. If remember = FALSE, this information is not available,
so we only use this prefix notation for “caching stagerunners” (those
with remember = TRUE).
#' Generic for printing stageRunner objects.
#'
#' @name stageRunner_show
#' @param indent integer. Internal parameter for keeping track of nested
#' indentation level.
stageRunner_show <- function(indent = 0) {
if (missing(indent)) {
sum_stages <- function(x) sum(vapply(x,
function(x) if (is.stagerunner(x)) sum_stages(x$stages) else 1L, integer(1)))
caching <- if (self$remember) " caching"
cat("A", caching, " stageRunner with ",
sum_stages(self$stages), " stages:\n", sep = '')
}
stage_names <- names(self$stages) %||% rep("", length(self$stages))
for (index in seq_along(stage_names)) {
prefix <- paste0(rep(' ', (if (is.numeric(indent)) indent else 0) + 1), collapse = '')
currently_executing_this_stage <- self$remember && began_stage(self$stages[[index]])
if (currently_executing_this_stage) {
next_stage <- treeSkeleton$new(self$stages[[index]])$last_leaf()$successor()$object
if (( is.null(next_stage) && !self$.root()$.finished) ||
(!is.null(next_stage) && !began_stage(next_stage)))
marker <- '*' # Use a * if this is the next stage to be executed
# TODO: Fix the bug where we are unable to tell if the last stage
# finished without a .finished internal field.
# We need to look at and set predecessors, not successors.
else {
marker <- '+' # Otherwise use a + for a completely executed stage
}
} else {
marker <- '-'
}
prefix <- gsub('.$', marker, prefix)
if (is.na(stage_names[[index]]) || stage_names[[index]] == "") {
stage_name <- paste0("< Unnamed (stage ", index, ") >")
} else {
stage_name <- stage_names[[index]]
}
cat(prefix, stage_name, "\n")
if (is.stagerunner(self$stages[[index]])) {
self$stages[[index]]$show(indent = indent + 1)
}
}
if (missing(indent)) {
cat('Context ')
print(self$.context)
}
NULL
}
# A helper function for determining if a stage has been run yet.
began_stage <- function(stage) {
if (is.stagerunner(stage)) {
any(vapply(stage$stages, began_stage, logical(1)))
} else if (is.stageRunnerNode(stage)) {
node <- treeSkeleton(stage)$predecessor()$object
is.null(node) || node$executed
}
}
#' @export
print.stageRunner <- function(x, ...) {
x$show(...)
}
#' @export
print.stageRunnerNode <- function(x, ...) {
x$show(...)
}
#' Retrieve a flattened list of canonical stage names for a stageRunner object
#'
#' For example, if we have stages
#' \code{stages = list(a = list(b = 1, c = 2), d = 3, e = list(f = 4, g = 5))}
#' then this method would return
#' \code{list('a/b', 'a/c', 'd', 'e/f', 'e/g')}
#'
#' @name stageRunner_stage_names
#' @return a list of canonical stage names.
# # @examples
# # f <- function() {}
# # sr <- stageRunner$new(new.env(),
# # list(a = stageRunner$new(new.env(), list(b = f, c = f)), d = f,
# # e = stageRunner$new(new.env(), list(f = f, g = f))))
# # sr$stage_names()
stageRunner_stage_names <- function() {
nested_stages <- function(x) {
if (is.stagerunner(x)) {
lapply(x$stages, nested_stages)
} else {
x
}
}
nested_names(lapply(self$stages, nested_stages))
}
#' Delimited names of a nested list.
#'
#' Unnamed values will use index number instead.
#'
#' @name nested_names
#' @param el list.
#' @param delim character. The delimiter with which to separate nested names.
#' @param prefix character. A prefix to every name.
#' @return a list of nested names
#' @examples
#' stagerunner:::nested_names(list(a = list(b = 1, c = list(d = 2, e = 3)), f = 4, 5))
#' # c('a/b', 'a/c/d', 'a/c/e', 'f', '3')
#' stagerunner:::nested_names(list(a = list(b = 1, c = 2), d = 2), delim = ' ', prefix = '#')
#' # c('#a b', '#a c', '#d')
nested_names <- function(el, delim = '/', prefix = '') {
list_names <- names(el) %||% rep("", length(el))
Reduce(c, lapply(seq_along(el), function(index) {
name <- if (list_names[[index]] == "") as.character(index)
else list_names[[index]]
paste0(prefix,
if (is.list(el[[index]])) {
paste0(name, delim, nested_names(el[[index]], delim = delim, prefix = ''))
} else name)
}))
}
Straightforwardly, apply some function (transformation)
to every terminal node in the stagerunner. This is useful for
simple debugging and monitoring. For example, if we wish to
print the variables currently in the context of the stagerunner
prior to executing each stage, we can call
runner$transform(function(fn) {
function(context, ...) {
print(ls(context))
fn(context, ...)
}
})
#' Transform the callables of the terminal nodes of a stageRunner.
#'
#' Every terminal node in a stageRunner is of type stageRunnerNode.
#' These each have a callable, and this method transforms those
#' callables in the way given by the first argument.
#'
#' @name stageRunner_transform
#' @param transformation function. The function which transforms one callable
#' into another.
stageRunner_transform <- function(transformation) {
for (stage_index in seq_along(self$stages)) {
self$stages[[stage_index]]$transform(transformation)
}
self
}
#' @include stagerunner-initialize.R stagerunner-run.R stagerunner-around.R
#' stagerunner-coalesce.R stagerunner-overlay.R stagerunner-transform.R
#' stagerunner-append.R stagerunner-stage_names.R stagerunner-current_stage.R
#' stagerunner-next_stage.R stagerunner-show.R stagerunner-has_key.R
#' stagerunner-internal.R
#' stageRunnerNode.R
NULL
We use R6 instead of the built-in reference classes for several reasons.
A stagerunner is clearly represented as a reference object, rather than an S3 or S4 class, as it is by nature highly mutable: every stage execution triggers updates of the corresponding stage caches.
A stagerunner is primarily defined by its context and its stages. The former is an environment (or, when used in conjunction with objectdiff, a \code{\link[objectdiff]{tracked_environment}}) that holds the current state of the stagerunner.
A stagerunner's stages are a nested list of either stageRunnerNodes
(wrappers for functions) or more stagerunners, the latter if we wish to
group together logically bound collections of functions (like a data
preparation procedure or a sequence of modeling steps).
#' Stagerunners are parametrized sequences of linear execution.
#'
#' @name stageRunner
#' @format NULL
#' @docType class
stageRunner_ <- R6::R6Class('stageRunner',
active = list(context = function() self$.context),
public = list(
.context = NULL,
stages = list(),
remember = FALSE,
.mode = "head",
.parent = NULL,
.finished = FALSE,
.prefix = "",
initialize = stagerunner_initialize,
run = run,
around = stageRunner_around,
coalesce = stageRunner_coalesce,
overlay = stageRunner_overlay,
transform = stageRunner_transform,
append = stageRunner_append,
stage_names = stageRunner_stage_names,
parent = function() { self$.parent },
children = function() { self$stages },
current_stage = stageRunner_current_stage,
next_stage = stageRunner_next_stage,
show = stageRunner_show,
has_key = stageRunner_has_key,
mode = function() { self$.mode },  # return the .mode field, not this method itself
.set_parents = stageRunner_.set_parents,
.clear_cache = stageRunner_.clear_cache,
.root = stageRunner_.root,
# objectdiff intertwined functionality
.set_prefixes = stageRunner_.set_prefixes,
.before_env = stageRunner_.before_env,
.mark_finished = stageRunner_.mark_finished,
with_tracked_environment = function() {
out <- is(self$context, 'tracked_environment')
if (out) { requireNamespace("objectdiff", quietly = TRUE) }
out
}
)
)
A little trick to ensure that a stagerunner can be constructed both as
stagerunner(...) and stagerunner$new(...).
#' @rdname stageRunner
#' @param ... Arguments to pass to stagerunner initialization.
#' @export
stageRunner <- structure(
function(...) { stageRunner_$new(...) },
class = "stageRunner_"
)
#' @export
#' @rdname stageRunner
stagerunner <- stageRunner
To make the above trick work, we need to prevent access to everything except new.
#' @export
`$.stageRunner_` <- function(...) {
stopifnot(identical(..2, "new"))
..1
}
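The trick can be tried in isolation without R6. The sketch below assumes a hypothetical generator counter_ (a plain list with a new function) in place of the real R6 class; only the wrapping function and the `$` method mirror the code above.

```r
# Hypothetical stand-in for an R6 generator: a list exposing `new`.
counter_ <- list(new = function(start = 0) {
  structure(list(value = start), class = "counter")
})

# A classed function, so that it can be called directly *and*
# can have its own `$` method.
counter <- structure(
  function(...) counter_$new(...),
  class = "counter_"
)

# `counter$new` returns the function itself; any other name is an error.
`$.counter_` <- function(...) {
  stopifnot(identical(..2, "new"))
  ..1
}

a <- counter(5)
b <- counter$new(5)
print(identical(a, b))
```

Since `counter$new` evaluates to counter itself, both spellings end up calling the same constructor.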
#' Check whether an R object is a stageRunner object
#'
#' @export
#' @param obj any object.
#' @return \code{TRUE} if the object is of class
#' \code{stageRunner}, \code{FALSE} otherwise.
is.stagerunner <- function(obj) inherits(obj, 'stageRunner')
#' @rdname is.stagerunner
#' @export
is.stageRunner <- is.stagerunner
#' Wrap a stageRunnerNode callable with another callable.
#'
#' @param other_node stagerunner or stageRunnerNode.
#' @return \code{TRUE} or \code{FALSE} according as the wrapping was
#' successful.
#' @examples \dontrun{
#' node1 <- stageRunnerNode(function(e) print(2))
#' node2 <- stageRunnerNode(function(e) { print(1); yield(); print(3); })
#' node1$around(node2)
#' node1$run() # Will print 1 2 3
#' # Notice the provided "yield" keyword, which allows calling the
#' # node that is being wrapped.
#' }
stageRunnerNode_around <- function(other_node) {
if (is.stageRunnerNode(other_node)) {
other_node <- other_node$callable
}
if (is.null(other_node)) {
return(FALSE)
}
if (!is.function(other_node)) {
warning("Cannot apply stageRunner$around in a terminal ",
"node except with a function. Instead, I got a ",
class(other_node)[1])
return(FALSE)
}
new_callable <- other_node
We inject the yield keyword.
environment(new_callable) <- list2env(parent = environment(new_callable), list(
.parent_context = self,
yield = around_yield(self$callable)
))
self$callable <- new_callable
TRUE
}
around_yield <- function(callable) {
Constructing the yield keyword is a little bit messy. We want to pass
the exact same parameters as the call to the original callable, so
we can grab ... from two frames up. However, since we must also
provide the function we are invoking with yield (i.e., the callable),
we have in effect two different kinds of injections.
yield <- function() {
# ... lives up two frames, but the run function lives up 1,
# so we have to do something ugly
run <- eval.parent(quote(.parent_context$run))
args <- append(eval.parent(quote(list(...)), n = 2),
list(.callable = callable))
do.call(run, args, envir = parent.frame())
}
We don't need anything except the base environment for the body of
the yield keyword itself.
environment(yield) <- list2env(list(callable = callable), parent = baseenv())
yield
}
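The core of the injection can be sketched in a few lines of base R. This is a deliberately simplified version, not the package's implementation: it ignores argument forwarding through ... and the .parent_context lookup, and wrap_around, inner, and outer are hypothetical names.

```r
# Give `outer` access to a `yield` function by placing a small
# environment between it and its original enclosing environment.
wrap_around <- function(inner, outer) {
  force(inner)
  environment(outer) <- list2env(
    list(yield = function() inner()),
    parent = environment(outer)
  )
  outer
}

trace_log <- character(0)
inner <- function() trace_log <<- c(trace_log, "inner")
outer <- function() {
  trace_log <<- c(trace_log, "before")
  yield()  # resolved in the injected environment
  trace_log <<- c(trace_log, "after")
}

wrapped <- wrap_around(inner, outer)
wrapped()
print(trace_log)
```

Because the injected environment keeps the original one as its parent, outer still sees everything it could see before, plus yield.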
Consider the following runner.
Imagine we wish to add some assertions at the end of each stage, like ensuring that data was in fact imported and that munging performed some necessary operations.
We can replace each function in the above four stages with a
stageRunner consisting of the original function and a new “assertion”
function. This is precisely the job of the overlay method on
stageRunnerNodes.
#' Append one stageRunnerNode around another.
#'
#' @param other_node stagerunner or stageRunnerNode.
#' @param label character. Under the hood, this will be the "stage name"
#' for the stage represented by the \code{other_node} in the
#' automatically generated new stageRunner used as this node's
#' callable (assuming \code{flat} is \code{FALSE}).
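#' @param flat logical. If \code{TRUE}, \code{other_node} is stored directly
#' under \code{label} in the callable's stages (so \code{label} must be a
#' character) rather than being appended. The default is \code{FALSE}.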
#' @return \code{TRUE} or \code{FALSE} according as the wrapping was
#' successful.
#' @examples \dontrun{
#' node1 <- stageRunnerNode(function(e) print(1))
#' node2 <- stageRunnerNode(function(e) print(2))
#' node1$overlay(node2)
#' node1$run() # Will print 1 2
#' }
stageRunnerNode_overlay <- function(other_node, label = NULL, flat = FALSE) {
if (is.stageRunnerNode(other_node)) {
other_node <- other_node$callable
}
if (is.null(other_node)) {
return(FALSE)
}
if (!is.stagerunner(other_node)) {
other_node <- stageRunner$new(self$.context, other_node)
}
# Coerce the current callable object to a stageRunner so that
# we can append the other_node's stageRunner.
if (!is.stagerunner(self$callable)) {
self$callable <- stageRunner$new(self$.context, self$callable)
}
# TODO: Fancier merging here
if (isTRUE(flat)) {
if (!is.character(label)) stop("flat coalescing needs a label")
self$callable$stages[[label]] <- other_node
} else {
self$callable$append(other_node, label)
}
}
#' Execute the callable of a stageRunnerNode.
#'
#' @param ... additional arguments to the \code{callable}. This allows
#' stagerunner stages to be uniformly parametrized (for example,
#' if all stages should have a \code{verbose} parameter).
#' @param .cached_env An internal helper that passes the cached environment
#' to be used for storing the results of this execution.
#' @param .callable Another internal helper used for some recursive
#' metaprogramming.
#' @return \code{TRUE} if the execution was successful, or an error otherwise.
stageRunnerNode_run <- function(..., .cached_env = NULL, .callable = self$callable) {
# TODO: Clean this up by using environment injection utility fn
correct_cache <- .cached_env %||% self$.cached_env
if (is.null(.callable)) {
FALSE
} else if (is.stagerunner(.callable)) {
.callable$run(..., .cached_env = correct_cache)
} else {
If we are executing a function, we inject the \code{cached_env} into the environment for use by, e.g., testing functions. Ideally, the callable should be able to determine what the state of the runner looked like before execution.
environment(.callable) <- list2env(
list(cached_env = correct_cache), parent = environment(.callable)
)
But once this function finishes executing, restore the environment of
the callable to its former glory (i.e., remove the cached_env).
on.exit(environment(.callable) <- parent.env(environment(.callable)))
.callable(self$.context, ...)
}
self$executed <- TRUE
}
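To see why an injected cached_env is handy for testing functions, consider this small base-R sketch. The helper run_with_cache and the function delta are hypothetical; only the list2env injection pattern is taken from the code above.

```r
# Run `fn` on `context` with `cached_env` visible as a free variable.
# `fn` is a local copy here, so the caller's function is left untouched.
run_with_cache <- function(fn, context, cached_env) {
  environment(fn) <- list2env(
    list(cached_env = cached_env), parent = environment(fn)
  )
  fn(context)
}

before <- new.env(); before$x <- 1    # state prior to the stage
context <- new.env(); context$x <- 2  # state after the stage

# A "testing function" comparing the current context with the cached one.
delta <- function(context) context$x - cached_env$x

result <- run_with_cache(delta, context, before)
print(result)
```

The function can thus compare the current context against the state of the runner before execution without taking an extra parameter.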
This helper method is useful when we want to apply some transformation to all terminal nodes of a stageRunner. You can think of it as \code{\link{rapply}} for stageRunners.
#' Transform a stageRunnerNode according to a functional.
#'
#' @param transformation function. An arity-1 function which takes the
#' \code{callable} of a \code{stageRunnerNode} and transforms it
#' into another callable (i.e. a function or a stagerunner). If the
#' original \code{callable} is a stagerunner, its terminal nodes in
#' turn will be transformed recursively.
#' @return The transformed callable.
#' @examples \dontrun{
#' increment <- 1
#' adder <- function(x) x + increment
#' node <- stageRunnerNode$new(function(e) print(adder(1)))
#' node$transform(function(fn) {
#' environment(fn)$increment <- environment(fn)$increment + 1; fn
#' })
#' node$run() # Prints 3, rather than 2
#' }
stageRunnerNode_transform <- function(transformation) {
if (is.stagerunner(self$callable)) {
self$callable$transform(transformation)
} else {
self$callable <- transformation(self$callable)
}
}
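The recursion can be illustrated on a plain nested list of functions, without any stagerunner objects. In this sketch transform_leaves is a hypothetical helper, and the logging transformation is modelled on the runner$transform example shown earlier.

```r
# Apply `transformation` to every function in a nested list,
# descending into sub-lists just as transform descends into stagerunners.
transform_leaves <- function(stages, transformation) {
  lapply(stages, function(stage) {
    if (is.list(stage)) transform_leaves(stage, transformation)
    else transformation(stage)
  })
}

seen <- character(0)
stages <- list(
  import = function(ctx) ctx$x <- 1,
  munge  = list(double = function(ctx) ctx$x <- ctx$x * 2)
)

# Record the variables present in the context before each stage runs.
logged <- transform_leaves(stages, function(fn) {
  force(fn)
  function(ctx) {
    seen <<- c(seen, paste(ls(ctx), collapse = ","))
    fn(ctx)
  }
})

ctx <- new.env()
logged$import(ctx)
logged$munge$double(ctx)
print(ctx$x)
```

The first stage sees an empty context and the second sees x, so seen ends up recording what each stage started with.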
In order to give us more flexibility on the terminal nodes of a
stagerunner (the actual functions that will be executed on the
stagerunner's context), we wrap them in an R6 class called a
[stageRunnerNode]. This will be extremely useful if we wish to
dynamically allow our stagerunners to be extended or wrapped with
functionality.
For example, if we have a runner such as
we might want to run tests for each stage. To do so, we can replace
each terminal node, a function, with a stagerunner consisting of
two functions. We can do this with the overlay helper method:
runner$overlay(test_runner)
Here, test_runner is another stagerunner with the exact same
structure as our main runner, but with testing functions in its
terminal nodes.
#' Stagerunner nodes are environment wrappers around individual stages
#' (i.e. functions) in order to track meta-data (e.g., for caching).
#'
#' @name stageRunnerNode
#' @format NULL
#' @docType class
stageRunnerNode_ <- R6::R6Class('stageRunnerNode',
public = list(
callable = NULL,
.cached_env = NULL,
.context = NULL,
.parent = NULL,
executed = FALSE,
initialize = function(.callable, .context = NULL) {
stopifnot(is_any(.callable, c('stageRunner', 'function', 'NULL')))
self$callable <- .callable
self$.context <- .context
self$executed <- FALSE
},
run = stageRunnerNode_run,
around = stageRunnerNode_around,
overlay = stageRunnerNode_overlay,
transform = stageRunnerNode_transform,
was_executed = function() { self$executed },
parent = function() { attr(self, "parent") },
children = function() list(),
show = function() { cat("A stageRunner node containing: \n"); print(self$callable) },
# Functions which intertwine with the objectdiff package
index = function() {
ix <- which(vapply(attr(self, "parent")$stages,
function(x) identical(self, x), logical(1)))
paste0(attr(self, "parent")$.prefix, ix)
}
)
)
#' @export
stageRunnerNode <- structure(
function(...) { stageRunnerNode_$new(...) },
class = "stageRunnerNode_"
)
#' @export
`$.stageRunnerNode_` <- function(...) {
stopifnot(identical(..2, "new"))
..1
}
#' @param obj ANY. An object to test for class \code{stageRunnerNode}.
#' @export
#' @rdname stageRunnerNode
is.stageRunnerNode <- function(obj) inherits(obj, 'stageRunnerNode')
#' Initialize a treeSkeleton object.
#'
#' treeSkeleton objects allow you to traverse a reference class object
#' as if it had a tree structure, merely by knowing how to call parent
#' or child nodes.
#'
#' @name treeSkeleton__initialize
#' @param object ANY. If a reference class object, then \code{parent_caller}
#' and \code{children_caller} will refer to reference class methods.
#' If an attribute on the object with names of \code{children_caller} and
#' \code{parent_caller} exists, those will be used. Otherwise, the
#' generic methods will be used.
#' @param parent_caller character. The name of the reference class method
#' that returns the parent object, if the object was a node in a tree
#' structure.
#' @param children_caller character. The name of the reference class method
#' that returns the child objects, if the object was a node in a tree
#' structure.
#' @return a treeSkeleton object.
treeSkeleton__initialize <- function(object, parent_caller = 'parent',
children_caller = 'children') {
stopifnot(!is.null(object))
self$object <- object
self$.parent <- uninitialized_field()
self$.children <- uninitialized_field()
# Make sure parent_caller and children_caller are methods of object
if (inherits(object, "R6")) {
stopifnot(all(c(parent_caller, children_caller) %in% ls(object)))
}
self$parent_caller <- parent_caller
self$children_caller <- children_caller
NULL
}
#' Find the index of the current object in the children of its parent.
#' @name treeSkeleton__.parent_index
treeSkeleton__.parent_index <- function() {
if (!is.null(ci <- attr(self$object, 'child_index'))) ci
# Hack for accessing attribute modifications on a reference class object
# See: http://stackoverflow.com/questions/22752021/why-is-r-capricious-in-its-use-of-attributes-on-reference-class-objects
else if (inherits(self$object, 'refClass') && !inherits(self$object, 'R6') &&
!is.null(ci <- attr(attr(self$object, '.xData')$.self, 'child_index'))) ci
else # look through the parent's children and compare to .self
# Danger Will Robinson! This will lead to strange bugs if our tree
# has several nodes with duplicate objects
which(vapply(
self$parent()$children(),
function(node) identical(node$object, self$object), logical(1)))[1]
}
#' Attempt to find the predecessor of the current node.
#'
#' @name treeSkeleton__predecessor
#' @param index integer. If specified, this is the index of the current node
#' in the children of its parent. (Sometimes, this cannot be computed
#' automatically, and should usually be provided.)
#' @return predecessor for the wrapped object.
treeSkeleton__predecessor <- function(index = NULL) {
We define the predecessor of the root node as NULL.
if (is.null(p <- self$parent())) return(NULL)
parent_index <- if (is.null(index)) self$.parent_index() else index
stopifnot(is.finite(parent_index))
If we are the first leaf in the list of our parent's children, our predecessor is our parent's predecessor.
if (parent_index == 1) {
p$predecessor()
} else {
Otherwise, the predecessor is the last leaf of the previous child.
p$children()[[parent_index - 1]]$last_leaf()
}
}
#' Attempt to find the successor of the current node.
#'
#' @name treeSkeleton__successor
#' @param index integer. If specified, this is the index of the current node
#' in the children of its parent. (Sometimes, this cannot be computed
#' automatically, and should usually be provided.)
#' @return successor for the wrapped object.
treeSkeleton__successor <- function(index = NULL) {
We define the successor of the root node as NULL.
if (is.null(p <- self$parent())) return(NULL) # no successor of root node
parent_index <- if (is.null(index)) self$.parent_index() else index
stopifnot(is.finite(parent_index))
If we are the last leaf in the list of our parent's children, our successor is our parent's successor
if (parent_index == length(p$children())) {
p$successor()
} else {
Otherwise, the successor is the first leaf of the next child node.
p$children()[[parent_index + 1]]$first_leaf()
}
}
Stagerunners are tree structures and come with a natural set of operations, like taking the predecessor, successor, and root of a node. However, these are not entirely simple to implement in a manner that is implementation-independent.
Specifically, we recognize that the notion of a node successor and predecessor is implementation agnostic as long as we have access to class methods that provide access to a node's parent and children. In this case, we can write an implementation-agnostic version that works regardless of whether the object is an S3, S4, or R6 object.
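The successor rule only needs a node's parent and children. The following sketch uses bare environments as nodes; make_node, index_in_parent, and the stage names are hypothetical, and no treeSkeleton is involved.

```r
# A node is an environment with a name, a parent, and a list of children.
make_node <- function(name, children = list()) {
  node <- new.env()
  node$name <- name
  node$children <- children
  for (child in children) child$parent <- node
  node
}

first_leaf <- function(node) {
  if (length(node$children) == 0) node
  else first_leaf(node$children[[1]])
}

index_in_parent <- function(node) {
  which(vapply(node$parent$children, identical, logical(1), node))[1]
}

successor <- function(node) {
  if (is.null(node$parent)) return(NULL)  # the root has no successor
  siblings <- node$parent$children
  i <- index_in_parent(node)
  # Last child: defer to the parent; otherwise descend into the next sibling.
  if (i == length(siblings)) successor(node$parent)
  else first_leaf(siblings[[i + 1]])
}

import     <- make_node("import data")
impute     <- make_node("impute variable")
discretize <- make_node("discretize variable")
train      <- make_node("train model")
munging    <- make_node("munging", list(impute, discretize))
root       <- make_node("root", list(import, munging, train))

print(successor(import)$name)
print(successor(discretize)$name)
```

The same two cases, last child versus any other child, are all the treeSkeleton successor method needs once parent and children are available.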
#' @include treeSkeleton-initialize.R treeSkeleton-predecessor.R
#' treeSkeleton-successor.R treeSkeleton-parent_index.R
NULL
#' Find the root node of the tree (the only one with no parent).
#'
#' @name treeSkeleton__root
#' @return The root node of the tree or NULL if empty tree.
treeSkeleton__root <- function() {
if (is.null(self$parent())) self
else self$parent()$root()  # keep climbing until a node without a parent is reached
}
#' Find the first leaf in a tree.
#'
#' @name treeSkeleton__first_leaf
#' @return The first leaf, that is, the first terminal child node.
treeSkeleton__first_leaf <- function() {
if (length(self$children()) == 0) self
else self$children()[[1]]$first_leaf()
}
#' Find the last leaf in a tree.
#'
#' @name treeSkeleton__last_leaf
#' @return The last leaf, that is, the last terminal child node.
treeSkeleton__last_leaf <- function() {
if (length(childs <- self$children()) == 0) self
else childs[[length(childs)]]$last_leaf()
}
#' Find the parent of the current object wrapped in a treeSkeleton.
#' @name treeSkeleton__parent
treeSkeleton__parent <- function() {
if (!is.unitialized_field(self$.parent)) return(self$.parent)
self$.parent <-
if (is.null(obj <- OOP_type_independent_method(self$object, self$parent_caller))) NULL
else treeSkeleton$new(obj, parent_caller = self$parent_caller,
children_caller = self$children_caller)
}
#' Find the children of the current object wrapped in treeSkeletons.
#' @name treeSkeleton__children
treeSkeleton__children <- function() {
if (!is.unitialized_field(self$.children)) return(self$.children)
prechildren <- OOP_type_independent_method(self$object, self$children_caller)
self$.children <- lapply(prechildren, treeSkeleton$new,
parent_caller = self$parent_caller, children_caller = self$children_caller)
}
#' Find the key with the given index using the names of the lists
#' that parametrize each node's children.
#'
#' For example, if our tree structure is given by
#' \code{list(a = list(b = 1, c = 2))}
#' then calling \code{find('a/b')} on the root node will return \code{1}.
#'
#' @name treeSkeleton__find
#' @param key character. The key to find in the given tree structure,
#' where nodes are named by their name in the \code{children()}
#' list. Numeric indices can be used to refer to unnamed nodes.
#' For example, if key is \code{a/2/b}, this method would try to find
#' the current node's child \code{a}'s second child's \code{b} child.
#' (Just look at the examples).
#' @return the subtree or terminal node with the given key.
#' @examples
#' \dontrun{
#' sr <- stageRunner$new(new.env(), list(a = list(force, list(b = function(x) x + 1))))
#' stagerunner:::treeSkeleton$new(sr)$find('a/2/b') # function(x) x + 1
#' }
treeSkeleton__find <- function(key) {
Currently out of service! Will be back shortly.
# stopifnot(is.character(key))
# if (length(key) == 0 || identical(key, '')) return(self$object)
# # Extract "foo" from "foo/bar/baz"
# subkey <- regmatches(key, regexec('^[^/]+', key))[[1]]
# key_remainder <- substr(key, nchar(subkey) + 2, nchar(key))
# if (grepl('^[0-9]+', subkey)) {
# subkey <- as.integer(subkey)
# key_falls_within_children <- length(self$children()) >= subkey
# stopifnot(key_falls_within_children)
# } else {
# matches <- grepl(subkey, names(self$children()))
# stopifnot(length(matches) == 1)
# key <- which(matches)
# }
# self$children()[[key]]$find(key_remainder)
}
#' This class implements iterators for a tree-based structure
#' without an actual underlying tree.
#'
#' In other dynamic languages, this kind of behavior would be called
#' duck typing. Imagine we have an object \code{x} that is of some
#' reference class. This object has a tree structure, and each node
#' in the tree has a parent and children. However, the methods to
#' fetch a node's parent or its children may have arbitrary names.
#' These names are stored in \code{treeSkeleton}'s \code{parent_caller}
#' and \code{children_caller} fields. Thus, if \code{x$methods()}
#' refers to \code{x}'s children and \code{x$parent_method()} refers
#' to \code{x}'s parent, we could define a \code{treeSkeleton} for
#' \code{x} by writing \code{treeSkeleton$new(x, 'parent_method', 'methods')}.
#'
#' The iterators on a \code{treeSkeleton} use the standard definition of
#' successor, predecessor, ancestor, etc.
#'
#' @name treeSkeleton
#' @docType class
#' @format NULL
treeSkeleton_ <- R6::R6Class('treeSkeleton',
public = list(
object = 'ANY',
As long as we know how to get an object's parent and children, we will be able to determine all the nice derived methods below.
parent_caller = 'character',
children_caller = 'character',
.children = 'ANY',
.parent = 'ANY',
initialize = treeSkeleton__initialize,
successor = treeSkeleton__successor,
predecessor = treeSkeleton__predecessor,
parent = treeSkeleton__parent,
children = treeSkeleton__children,
root = treeSkeleton__root,
first_leaf = treeSkeleton__first_leaf,
last_leaf = treeSkeleton__last_leaf,
find = treeSkeleton__find,
.parent_index = treeSkeleton__.parent_index,
show = function() { cat("treeSkeleton wrapping:\n"); print(self$object) }
)
)
Some fancy tricks to make treeSkeleton(...) and treeSkeleton$new(...)
have the same effect, just like in traditional reference classes.
#' @export
treeSkeleton <- structure(
function(...) { treeSkeleton_$new(...) },
class = "treeSkeleton_"
)
#' @export
`$.treeSkeleton_` <- function(...) {
stopifnot(identical(..2, "new"))
..1
}
uninitialized_field <- function() {
# structure(NULL, ...) is deprecated because NULL cannot carry attributes.
structure(list(), class = "uninitialized_field")
}
is.unitialized_field <- function(x) {
is(x, "uninitialized_field")
}
A handy little trick from Hadley: this will return the second argument
if the first is NULL.
`%||%` <- function(x, y) if (is.null(x)) y else x
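A short usage sketch, repeating the one-line definition so that it runs on its own; the example lists are made up for illustration.

```r
`%||%` <- function(x, y) if (is.null(x)) y else x

unnamed <- list(1, 2)
named   <- list(a = 1, b = 2)

# names() returns NULL for a fully unnamed list, so the fallback is used.
unnamed_names <- names(unnamed) %||% rep("", length(unnamed))
named_names   <- names(named) %||% rep("", length(named))

print(unnamed_names)
print(named_names)
```

This is the same pattern the printing and nested_names code uses to treat unnamed stages uniformly.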
contains_true <- function(x) {
if (is.list(x)) any(vapply(x, contains_true, logical(1)))
else any(x)
}
all_logical <- function(x) {
is.logical(x) || all(vapply(x,
function(y) if (is.atomic(y)) is.logical(y) else all_logical(y),
logical(1)))
}
A helper function for printing stagerunner execution progress.
as.ordinal <- function(number) {
ordinals <- list('first', 'second', 'third', 'fourth', 'fifth',
'sixth', 'seventh', 'eighth', 'ninth', 'tenth', 'eleventh',
'twelfth', 'thirteenth', 'fourteenth', 'fifteenth',
'sixteenth', 'seventeenth', 'eighteenth', 'nineteenth',
'twentieth')
ext <- c("th", "st", "nd", "rd", rep("th", 6))
ordinals[number][[1]] %||%
paste0(number, ext[[(number %% 10) + 1]])
}
Print some nice messages that tell you what type the stagerunner constructor expects.
enforce_type <- function(value, expected, klass, name = deparse(substitute(value))) {
if (missing(value)) {
stop(sprintf(
"Please provide %s%s.",
articleize(sQuote(crayon::red(name))),
if (missing(klass)) "" else paste( " to a", klass)
))
}
check <- utils::getFromNamespace(paste0("is.", expected), "base")
if (!check(value)) {
stop(sprintf(
"Please pass %s as the %s%s; instead I got a %s.",
articleize(sQuote(crayon::yellow(expected))), dQuote(name),
if (missing(klass)) "" else paste(" for a", klass),
crayon::red(sclass(value))
))
}
}
sclass <- function(obj) { class(obj)[1L] }
articleize <- function(word) {
sprintf("a%s %s", if (is_vowel(first_letter(word))) "n" else "", word)
}
is_vowel <- function(char) {
is.element(char, c("a", "e", "i", "o", "u", "A", "E", "I", "O", "U"))
}
first_letter <- function(word) {
substring(gsub("[^a-zA-Z]|\\[3[0-9]m", "", word), 1, 1)
}
# Whether obj is of any of the given types.
is_any <- function(obj, klasses) {
any(vapply(klasses, inherits, logical(1), x = obj))
}
package_function <- function(pkg, fn) { # for when using :: breaks R CMD check
get(fn, envir = getNamespace(pkg))
}
Used in conjunction with treeSkeleton so that it works for S3, S4, RC,
and R6 classes.
#' Call a method on an object regardless of its OOP type.
#'
#' @name OOP_type_independent_method
#' @param object any. An R object of variable OOP type (S3, S4, RC, R6).
#' @param method character. The method to call on the \code{object}. If the
#' latter is a reference class, it will use the \code{$} operator to access the method.
#' (For example, \code{object$some_method}). If it has an attribute with the name
#' \code{method}, it will use that attribute as the method to call. Otherwise,
#' it will try to fetch a generic with the name \code{method} using \code{get}.
OOP_type_independent_method <- function(object, method) {
if (method %in% names(attributes(object))) {
attr(object, method)
} else if (is.environment(object) && method %in% ls(object)) {
object[[method]]()
} else {
get(method)(object)
}
}
as.list.environment <- function(env) {
out <- base::as.list.environment(env)
lapply(out, function(x) if (is.environment(x)) as.list(x) else x)
}