seqParallel {SeqArray} | R Documentation |
Applies a user-defined function in parallel.
seqParallel(cl=seqGetParallel(), gdsfile, FUN,
    split=c("by.variant", "by.sample", "none"), .combine="unlist",
    .selection.flag=FALSE, .initialize=NULL, .finalize=NULL, .initparam=NULL,
    .balancing=FALSE, .bl_size=10000L, .bl_progress=FALSE, ...)
seqParApply(cl=seqGetParallel(), x, FUN, load.balancing=TRUE, ...)
cl |
|
gdsfile |
a |
FUN |
the function to be applied, should be like
|
split |
split the dataset by variant or sample according to multiple
processes, or "none" for no split; |
.combine |
define a function for combining results from different
processes; by default, "unlist" is used |
.selection.flag |
|
.initialize |
a user-defined function for initializing workers; it should take two arguments (process_id, param) |
.finalize |
a user-defined function for finalizing workers; it should take two arguments (process_id, param) |
.initparam |
parameters passed to .initialize and .finalize |
.balancing |
load balancing if TRUE |
.bl_size |
chunk size, the increment for load balancing, 10000 for variants |
.bl_progress |
if |
x |
a vector (atomic or list), passed to FUN |
load.balancing |
if |
... |
optional arguments to FUN |
When cl is TRUE or a numeric value, forking techniques are used to create a new child process as a copy of the current R process, see ?parallel::mcfork. However, forking is not available on Windows, and makeCluster is called to make a cluster which will be deallocated after calling FUN.
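The fork-or-cluster behaviour described above can be illustrated with the parallel package alone. The following is a minimal sketch, not SeqArray's internal code; run_workers is a hypothetical helper introduced only for illustration.

```r
library(parallel)

# Hypothetical helper (not part of SeqArray): apply FUN to each element of x
# using n workers, forking where possible and otherwise using a temporary
# cluster that is released as soon as the work is done.
run_workers <- function(x, FUN, n = 2L) {
  if (.Platform$OS.type == "windows") {
    cl <- makeCluster(n)        # forking is unavailable, so start a cluster
    on.exit(stopCluster(cl))    # deallocate the cluster after FUN has run
    parLapply(cl, x, FUN)
  } else {
    mclapply(x, FUN, mc.cores = n)  # children are forked copies of this R process
  }
}

res <- run_workers(1:4, function(i) i^2)
```

Creating and stopping a cluster on every call is what makes the Windows path comparatively expensive when the function is called repeatedly.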
It is strongly suggested to use seqParallel together with seqParallelSetup. seqParallelSetup could work around the problem of forking on Windows, without allocating clusters frequently.
The user-defined function could use two predefined variables, SeqArray:::process_count and SeqArray:::process_index, to tell the total number of cluster nodes and which cluster node is being used.
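As an illustration of how a worker function might use these two values, the sketch below assigns a share of the work to each node. Plain variables stand in for SeqArray:::process_count and SeqArray:::process_index so that the code runs outside seqParallel; my_items is a hypothetical helper, and the item count of 10 is arbitrary.

```r
library(parallel)

# Stand-in for SeqArray:::process_count (assumed value, for illustration only)
process_count <- 2L
n_items <- 10L

# Divide the item indices into one contiguous chunk per node
chunks <- splitIndices(n_items, process_count)

# Hypothetical helper: the items that the node with this index should handle.
# Inside seqParallel, the index would come from SeqArray:::process_index.
my_items <- function(process_index) chunks[[process_index]]

first_node  <- my_items(1L)
second_node <- my_items(2L)
```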
seqParallel(, gdsfile=NULL, FUN=..., split="none") could be used to set up multiple streams of pseudo-random numbers; see nextRNGStream or nextRNGSubStream in the package parallel.
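A minimal sketch of preparing one random-number stream per worker with the parallel package is given below. It does not call seqParallel; the seed value, the number of streams, and the helper draw are assumptions made for illustration. Inside a function run through seqParallel, each worker could select its own seed with SeqArray:::process_index.

```r
library(parallel)

# nextRNGStream requires the L'Ecuyer-CMRG generator
RNGkind("L'Ecuyer-CMRG")
set.seed(1000)  # arbitrary seed, chosen only for this sketch

# Derive one independent stream seed per worker
n_streams <- 3L
seeds <- vector("list", n_streams)
s <- .Random.seed
for (i in seq_len(n_streams)) {
  seeds[[i]] <- s
  s <- nextRNGStream(s)
}

# Hypothetical helper: draw n uniform numbers from the stream starting at 'seed'
draw <- function(seed, n = 3L) {
  assign(".Random.seed", seed, envir = globalenv())
  runif(n)
}

draws <- lapply(seeds, draw)
```

Because each stream starts from its own seed, rerunning draw with the same seed reproduces that worker's numbers regardless of what the other workers do.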
A vector or list of values.
Xiuwen Zheng
seqSetFilter, seqGetData, seqApply, seqParallelSetup, seqGetParallel
library(parallel)
library(SeqArray)

# choose an appropriate cluster size or number of cores
seqParallelSetup(2)

# the GDS file
(gds.fn <- seqExampleFileName("gds"))

# display
(gdsfile <- seqOpen(gds.fn))

# the uniprocessor version
afreq1 <- seqParallel(, gdsfile, FUN = function(f) {
        seqApply(f, "genotype", as.is="double",
            FUN=function(x) mean(x==0, na.rm=TRUE))
    }, split="by.variant")

length(afreq1)
summary(afreq1)

# run in parallel
afreq2 <- seqParallel(, gdsfile, FUN = function(f) {
        seqApply(f, "genotype", as.is="double",
            FUN=function(x) mean(x==0, na.rm=TRUE))
    }, split="by.variant")

length(afreq2)
summary(afreq2)

# check
length(afreq1)  # 1348
all(afreq1 == afreq2)

################################################################
# check -- variant splits

seqParallel(, gdsfile, FUN = function(f) {
        v <- seqGetFilter(f)
        sum(v$variant.sel)
    }, split="by.variant")
# [1] 674 674

################################################################

seqParallel(, NULL, FUN = function() {
        paste(SeqArray:::process_index, SeqArray:::process_count, sep=" / ")
    }, split="none")

seqParallel(, NULL, FUN = function() {
        SeqArray:::process_index
    }, split="none", .combine=function(i) print(i))

seqParallel(, NULL, FUN = function() {
        SeqArray:::process_index
    }, split="none", .combine="+")

################################################################

# close the GDS file
seqClose(gdsfile)

# clear the parallel cluster
seqParallelSetup(FALSE)