The following is a naive benchmark of several of R's parallelisation functions on a large data frame.
Problem: apply a function to each row of a data frame with more than 10 million rows.
Here I subset the data frame to different numbers of rows to compare the speed of the parallelisation options. Times are measured with the tictoc package (tic()/toc()) and reported in seconds. The runs are not replicated, so no uncertainty is reported (much to my chagrin, but a single run with 10 million rows takes more than 2 hours).
Properties of the seven methods, numbered 1–7 in the order they appear below:
Shared memory: 2, 3, 7
Distributed memory: 1, 4, 5, 6
Progress bars: 1, 2, 3, 5
Splitting the data frame: 3, 4, 5, 6
The functions are as follows. Note the following variable names:
Method 1: the entire data frame is exported to every node. Each task then loops over one 50th of the rows (the number of parts is set by num.splits), and the 50 tasks are shared out among the nodes.
library(pbapply)
# Number of chunks the table is split into; each task handles one chunk.
num.splits <- 50
# Label every row with its chunk id in 1..num.splits. Multiplying before
# dividing keeps the last row exactly at num.splits: the previous form
# i / (.N / num.splits) can round up to num.splits + 1 (e.g. .N = 57), and
# that chunk was never queried because only 1..num.splits are dispatched.
# `:=` adds the column by reference instead of copying the whole table.
coord.table[, query_group := ceiling(seq_len(.N) * num.splits / .N)]
tic()
# Start separate worker processes (distributed memory).
cl <- makeCluster(kNoCores, outfile = outfile.progress)
tryCatch({
  # Seed the workers' RNG streams before any work is dispatched; calling this
  # after the queries had run could not affect them.
  parallel::clusterSetRNGStream(cl, iseed = 0L)
  # Export the data and helper functions; every worker gets a full copy of
  # coord.table.
  clusterExport(cl, c("osrm.url", "int.results.file", "coord.table",
                      "GetSingleTravelInfo", "QueryOSRM"),
                envir = environment())
  # Attach the packages the query functions need on every worker.
  clusterEvalQ(cl, {
    library(httr)
    library(pbapply)
    library(data.table)
    NULL
  })
  # One task per chunk id; pbsapply draws a progress bar while they run.
  travel.queries <- pbsapply(seq_len(num.splits),
                             function(j) QueryOSRM(j, osrm.url),
                             cl = cl)
}, finally = {
  # Always shut the workers down, even if a query fails.
  stopCluster(cl)
})
toc()
Method 2 is the same as above, except that it uses shared memory: the data frame is visible to all worker processes rather than being exported to them.
library(pbapply)
# Number of chunks the table is split into; each task handles one chunk.
num.splits <- 50
# Label every row with its chunk id in 1..num.splits. Multiplying before
# dividing keeps the last row exactly at num.splits; the form
# i / (.N / num.splits) can round up to num.splits + 1 for some row counts,
# leaving that chunk unqueried. `:=` assigns by reference (no table copy).
coord.table[, query_group := ceiling(seq_len(.N) * num.splits / .N)]
tic()
# An integer `cl` runs the tasks in child processes that share coord.table
# with the parent (shared memory) instead of having it exported to them.
travel.queries <- pbsapply(seq_len(num.splits),
                           function(j) QueryOSRM_MC(j, osrm.url, coord.table),
                           cl = as.integer(kNoCores))
toc()
# Divide the rows into kNoCores * cut.factor contiguous groups.
# A cut.factor above 1 creates more groups than cores, so each core loops
# over several groups.
cut.factor <- 1
cuts <- cut(seq_len(nrow(coord.table)), kNoCores * cut.factor)
# Row indices of each group, named by level. Computing them once avoids
# scanning `cuts == l` across every row inside each task.
cut.rows <- split(seq_len(nrow(coord.table)), cuts)
tic()
# Each task receives only its own group's rows; names(cut.rows) equals
# levels(cuts), so the result keeps the same names as before.
travel.queries <- pbsapply(
  names(cut.rows),
  function(l) QueryOSRM_cut(coord.table[cut.rows[[l]], ], osrm.url),
  cl = as.integer(kNoCores)
)
toc()
Here the data frame is split so that each task operates only on its own subset of rows.
library(doParallel)
library(itertools)
tic()
# Start a PSOCK cluster and register it as the %dopar% backend.
cl <- makePSOCKcluster(kNoCores)
registerDoParallel(cl)
tryCatch({
  # isplitRows yields kNoCores row chunks of coord.table, one per task, so
  # each worker receives only its chunk; task results are combined with cbind.
  travel.queries <- foreach(
    m = isplitRows(coord.table, chunks = kNoCores),
    .combine = "cbind",
    .packages = c("httr", "data.table"),
    .export = c("QueryOSRM_dopar", "GetSingleTravelInfo")
  ) %dopar% {
    QueryOSRM_dopar(m, osrm.url, int.results.file)
  }
}, finally = {
  # Stop the workers even if a task errors, and drop the now-dead backend so
  # a later %dopar% does not try to use it.
  stopCluster(cl)
  foreach::registerDoSEQ()
})
toc()
library(doSNOW)
library(itertools)
# Raise chunk.factor if one chunk per core does not fit in a worker's memory;
# each core then works through several smaller chunks.
chunk.factor <- 1
# Number of row chunks (previously used cut.factor from another snippet, so
# chunk.factor had no effect).
chunk.num <- kNoCores * chunk.factor
tic()
# Start a PSOCK cluster and register it as the %dopar% backend.
cl <- makePSOCKcluster(kNoCores)
registerDoSNOW(cl)
# doSNOW calls `progress(n)` as tasks complete, with n counting finished
# tasks, so the bar's maximum must be the number of chunks (it was a fixed
# 100, which the bar never reached).
pb <- txtProgressBar(max = chunk.num, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
tryCatch({
  # One task per row chunk; results are combined with cbind.
  travel.queries <- foreach(
    m = isplitRows(coord.table, chunks = chunk.num),
    .combine = "cbind",
    .packages = c("httr", "data.table"),
    .export = c("QueryOSRM_dopar", "GetSingleTravelInfo"),
    .options.snow = opts
  ) %dopar% {
    QueryOSRM_dopar(m, osrm.url, int.results.file)
  }
}, finally = {
  # Clean up even if a task errors.
  close(pb)
  stopCluster(cl)
  foreach::registerDoSEQ()
})
toc()
library(doSNOW)
library(itertools)
tic()
# Start a PSOCK cluster and register it as the %dopar% backend.
cl <- makePSOCKcluster(kNoCores)
registerDoSNOW(cl)
tryCatch({
  # One task per row chunk (kNoCores chunks); results are combined with cbind.
  travel.queries <- foreach(
    m = isplitRows(coord.table, chunks = kNoCores),
    .combine = "cbind",
    .packages = c("httr", "data.table"),
    .export = c("QueryOSRM_dopar", "GetSingleTravelInfo")
  ) %dopar% {
    QueryOSRM_dopar(m, osrm.url, int.results.file)
  }
}, finally = {
  # Stop the workers even if a task errors, and reset to sequential so a
  # later %dopar% does not target the stopped cluster.
  stopCluster(cl)
  foreach::registerDoSEQ()
})
toc()
library(pbapply)
# Number of chunks the table is split into; each task handles one chunk.
num.splits <- 50
# Label every row with its chunk id in 1..num.splits. Multiplying before
# dividing keeps the last row exactly at num.splits; the form
# i / (.N / num.splits) can round up to num.splits + 1 for some row counts,
# leaving that chunk unqueried. `:=` assigns by reference (no table copy).
coord.table[, query_group := ceiling(seq_len(.N) * num.splits / .N)]
tic()
# Forked child processes share coord.table with the parent instead of
# receiving a copy (mc.cores > 1 is not supported on Windows).
travel.queries <- parallel::mclapply(
  seq_len(num.splits),
  function(j) QueryOSRM_MC(j, osrm.url, coord.table),
  mc.cores = as.integer(kNoCores)
)
toc()
# mclapply returns a "try-error" object for a task that failed rather than
# stopping, so report failed chunks instead of letting them pass silently.
failed <- vapply(travel.queries, inherits, logical(1), what = "try-error")
if (any(failed)) {
  warning(sum(failed), " of ", num.splits, " query groups failed: ",
          toString(which(failed)), call. = FALSE)
}
library(parallel)
library(data.table)
library(tictoc)