Parallel iteration over an indeterminate number of data chunks
bpiterate iterates over an indeterminate number of data chunks
(e.g., records in a file). Each chunk is processed by parallel workers
in an asynchronous fashion; as each worker finishes it receives a
new chunk. Data are traversed a single time.
bpiterate(ITER, FUN, ..., BPPARAM=bpparam())

## S4 method for signature 'ANY,ANY,missing'
bpiterate(ITER, FUN, ..., BPPARAM=bpparam())

## S4 method for signature 'ANY,ANY,BatchtoolsParam'
bpiterate(
    ITER, FUN, ..., REDUCE, init, reduce.in.order=FALSE, BPPARAM=bpparam()
)
ITER |
A function with no arguments that returns an object to process, generally a chunk of data from a file. When no objects are left (i.e., end of file) it should return NULL and continue to return NULL regardless of the number of times it is invoked after reaching the end of file. This function is run on the master. |
FUN |
A function to process the object returned by ITER. |
BPPARAM |
An optional BiocParallelParam instance determining the parallel back-end to be used during evaluation. |
REDUCE |
Optional function that combines (reduces) output from FUN. |
init |
Optional initial value for REDUCE. |
reduce.in.order |
Logical. When TRUE, REDUCE is applied to the results from the workers in the same order the tasks were sent out. |
... |
Arguments to other methods, and named arguments for FUN. |
Supported for SnowParam, MulticoreParam and BatchtoolsParam.
bpiterate iterates through an unknown number of data chunks,
dispatching chunks to parallel workers as they become available.
In contrast, other bp*apply functions such as bplapply or bpmapply
require the number of data chunks to be specified ahead of time.
This quality makes bpiterate useful for iterating through files of
unknown length.
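As a minimal sketch of the unknown-length use case, the following reads a text file in batches of lines without knowing in advance how many batches there will be. The helper lineIterator(), the temporary file, and the batch size are assumptions made for illustration and are not part of BiocParallel.

```r
library(BiocParallel)

## Write a small text file that stands in for a file of unknown length.
path <- tempfile(fileext = ".txt")
writeLines(sprintf("record %d", 1:25), path)

## Hypothetical ITER factory: each call yields up to 'n' lines; once the
## file is exhausted it returns NULL on this and every later call.
lineIterator <- function(path, n) {
    con <- file(path, open = "r")
    done <- FALSE
    function() {
        if (done)
            return(NULL)
        lines <- readLines(con, n = n)
        if (length(lines) == 0L) {
            close(con)
            done <<- TRUE
            return(NULL)
        }
        lines
    }
}

ITER <- lineIterator(path, n = 10)

## FUN counts the lines in each chunk; the result has one element per chunk.
res <- bpiterate(ITER, function(lines) length(lines),
                 BPPARAM = SnowParam(workers = 2))
sum(unlist(res))   # total number of lines processed

unlink(path)
```

The number of chunks is never passed to bpiterate; it is discovered only when ITER() first returns NULL.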
ITER serves up chunks of data until the end of the file is reached,
at which point it returns NULL. Note that ITER should continue to
return NULL regardless of the number of times it is invoked after
reaching the end of the file. FUN is applied to each object
(data chunk) returned by ITER.
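The iterator contract described above can be sketched in base R alone. The factory chunkIterator() is a hypothetical helper over an in-memory vector; the while loop only mimics, sequentially, how chunks are pulled from ITER and is not how bpiterate is implemented.

```r
## Hypothetical helper: build an ITER-style closure over a vector 'x'.
chunkIterator <- function(x, chunk_size) {
    pos <- 0L
    function() {
        if (pos >= length(x))
            return(NULL)          # exhausted: NULL now and on all later calls
        idx <- seq(pos + 1L, min(pos + chunk_size, length(x)))
        pos <<- max(idx)
        x[idx]
    }
}

ITER <- chunkIterator(1:10, chunk_size = 4)

## Pull chunks until the first NULL, as a sequential stand-in for dispatch.
chunks <- list()
while (!is.null(chunk <- ITER()))
    chunks[[length(chunks) + 1L]] <- chunk

lengths(chunks)    # 4 4 2
is.null(ITER())    # TRUE: calls after exhaustion keep returning NULL
```

Keeping the position in the closure's enclosing environment is what lets a zero-argument function remember where it stopped between calls.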
By default, a list the same length as the number of chunks returned
by ITER(). When REDUCE is used, the return is consistent with
application of the reduction.
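A small sketch of the effect of REDUCE, assuming a toy iterator over three in-memory chunks (the chunks list and the counter i are illustrative only). FUN sums each chunk and REDUCE adds the per-chunk sums as workers return, so under these assumptions the result should equal sum(1:10) rather than being a list with one element per chunk.

```r
library(BiocParallel)

## Hypothetical iterator over three in-memory chunks.
chunks <- list(1:3, 4:6, 7:10)
i <- 0L
ITER <- function() {
    if (i >= length(chunks))
        return(NULL)              # keeps returning NULL once exhausted
    i <<- i + 1L
    chunks[[i]]
}

## FUN = sum is applied to each chunk; REDUCE = `+` combines the results.
total <- bpiterate(ITER, sum, REDUCE = `+`,
                   BPPARAM = SnowParam(workers = 2))
total
```

Because addition is commutative, the order in which workers finish does not matter here; for an order-sensitive reduction, reduce.in.order=TRUE would be the relevant argument.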
Valerie Obenchain mailto:vobencha@fhcrc.org.
bpvec for parallel, vectorized calculations.
bplapply for parallel, lapply-like calculations.
BiocParallelParam for details of BPPARAM.
BatchtoolsParam for details of BatchtoolsParam.
## Not run:
if (require(Rsamtools) && require(RNAseqData.HNRNPC.bam.chr14) &&
    require(GenomicAlignments) && require(ShortRead)) {

  ## ----------------------------------------------------------------------
  ## Iterate through a BAM file
  ## ----------------------------------------------------------------------

  ## Select a single file and set 'yieldSize' in the BamFile object.
  fl <- RNAseqData.HNRNPC.bam.chr14_BAMFILES[[1]]
  bf <- BamFile(fl, yieldSize = 300000)

  ## bamIterator() is initialized with a BAM file and returns a function.
  ## The return function requires no arguments and iterates through the
  ## file returning data chunks the size of yieldSize.
  bamIterator <- function(bf) {
    done <- FALSE
    if (!isOpen(bf))
      open(bf)

    function() {
      if (done)
        return(NULL)
      yld <- readGAlignments(bf)
      if (length(yld) == 0L) {
        close(bf)
        done <<- TRUE
        NULL
      } else yld
    }
  }

  ## FUN counts reads in a region of interest.
  roi <- GRanges("chr14", IRanges(seq(19e6, 107e6, by = 10e6), width = 10e6))
  counter <- function(reads, roi, ...) {
    countOverlaps(query = roi, subject = reads)
  }

  ## Initialize the iterator.
  ITER <- bamIterator(bf)

  ## The number of chunks returned by ITER() determines the result length.
  bpparam <- MulticoreParam(workers = 3)
  ## bpparam <- BatchtoolsParam(workers = 3), see ?BatchtoolsParam
  bpiterate(ITER, counter, roi = roi, BPPARAM = bpparam)

  ## Re-initialize the iterator and combine on the fly with REDUCE:
  ITER <- bamIterator(bf)
  bpparam <- MulticoreParam(workers = 3)
  bpiterate(ITER, counter, REDUCE = sum, roi = roi, BPPARAM = bpparam)

  ## ----------------------------------------------------------------------
  ## Iterate through a FASTQ file
  ## ----------------------------------------------------------------------

  ## Set data chunk size with 'n' in the FastqStreamer object.
  sp <- SolexaPath(system.file('extdata', package = 'ShortRead'))
  fl <- file.path(analysisPath(sp), "s_1_sequence.txt")

  ## Create an iterator that returns data chunks the size of 'n'.
  fastqIterator <- function(fqs) {
    done <- FALSE
    if (!isOpen(fqs))
      open(fqs)

    function() {
      if (done)
        return(NULL)
      yld <- yield(fqs)
      if (length(yld) == 0L) {
        close(fqs)
        done <<- TRUE
        NULL
      } else yld
    }
  }

  ## The process function summarizes the number of times each sequence occurs.
  summary <- function(reads, ...) {
    ShortRead::tables(reads, n = 0)$distribution
  }

  ## Create a param.
  bpparam <- SnowParam(workers = 2)

  ## Initialize the streamer and iterator.
  fqs <- FastqStreamer(fl, n = 100)
  ITER <- fastqIterator(fqs)
  bpiterate(ITER, summary, BPPARAM = bpparam)

  ## Results from the workers are combined on the fly when REDUCE is used.
  ## Collapsing the data in this way can substantially reduce memory
  ## requirements.
  fqs <- FastqStreamer(fl, n = 100)
  ITER <- fastqIterator(fqs)
  bpiterate(ITER, summary, REDUCE = merge, all = TRUE, BPPARAM = bpparam)

}
## End(Not run)