Authors:
Madhusudanan Kandasamy and Josiah Samuel
The computing industry has continued to evolve in its quest for ever more performance, first through systems with higher and higher CPU frequencies and then through parallelism with multi-core and multi-threaded processor systems.
However, ever-increasing workload demands require more processing power than one large system can provide. So the industry has shifted to scale-out solutions that use cluster computing software to divide, distribute and synchronize execution across many smaller nodes. This increases power consumption considerably, and network overheads continue to rise in proportion to the number of nodes in the system. An alternative is to leverage special-purpose hardware accelerators to provide additional computational power to individual cluster nodes. This requires fewer cluster nodes, which translates to less deployment complexity and less power consumption in aggregate.
For scaled-out, distributed computing, Apache Spark™ is the most popular software framework used to develop clustered applications for real-time big data analytics. On the special-purpose hardware acceleration side, NVIDIA’s CUDA GPUs are the most popular hardware accelerators for HPC and machine learning.
In this article, we'll discuss how we're enabling these two technologies, Apache Spark and NVIDIA GPUs, to work together, while also making it easier for the programmer to exploit GPUs under Spark. This article assumes that readers are familiar with Spark and NVIDIA CUDA.
Off-loading specific tasks to the GPU involves multiple steps:
The first step in bringing GPUs into the Spark computational framework is to solve the problem of getting the data into a format that the GPU can consume. The fundamental programming abstraction in Spark is the RDD (Resilient Distributed Dataset), which Spark uses to partition data for parallel execution. By design, Spark processes the data of an individual RDD partition row by row for every transformation. To offload the computation to the GPU and enable the thousands of GPU threads to operate on the RDD partition data in parallel, we need all the data available to the GPU simultaneously instead of row by row. Hence, we must convert the row-formatted RDD data to a columnar format for the GPU, and convert the results back to row format for the CPU. The data must also be made available in off-heap memory, as the GPU cannot access Java objects directly. For example, a GPU kernel needs to accept array pointers as input and output parameters, over which the GPU threads perform operations in parallel on different indexes of the array.
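The row-to-columnar conversion described above can be illustrated with a minimal sketch in plain Scala. This is not GPUEnabler's internal code: the `Point` row type and the `toColumn`/`toRows` helpers are hypothetical names introduced only for this example. It assumes that a direct `java.nio.ByteBuffer` is an acceptable stand-in for the off-heap memory that a GPU copy would read from.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object ColumnarSketch {
  // Hypothetical row type, used only for this illustration.
  final case class Point(x: Int, y: Int)

  private val IntBytes = java.lang.Integer.BYTES

  // Copy one field of every row into a contiguous off-heap (direct) buffer,
  // so that element i of the column sits at byte offset i * 4.
  def toColumn(rows: Array[Point], field: Point => Int): ByteBuffer = {
    val buf = ByteBuffer
      .allocateDirect(rows.length * IntBytes)
      .order(ByteOrder.nativeOrder())
    rows.foreach(r => buf.putInt(field(r)))
    buf.flip() // switch the buffer from writing to reading
    buf
  }

  // Reverse direction: rebuild row objects from two columns of length n.
  def toRows(xs: ByteBuffer, ys: ByteBuffer, n: Int): Array[Point] =
    Array.tabulate(n)(i => Point(xs.getInt(i * IntBytes), ys.getInt(i * IntBytes)))
}
```

Each column is a flat array of primitives, which is the shape a kernel taking array pointers expects; the row objects themselves never leave the JVM heap.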
The GPU must first read the data it will compute on from system memory into its local memory, and then likewise write the result of its computation from its local memory back to system memory. (On POWER8 systems with NVLINK 1.0 support, these data movements can occur with extremely high bandwidth compared to PCI Express. On future POWER9 systems with NVLINK 2.0, the GPU will have full coherent access to all of system memory, further expanding what's possible with GPUs.) With Spark, the converted columnar RDD data must be transferred from the CPU to the GPU and back for each computation. In some cases, it would be good to cache the data in GPU memory itself, similar to Spark’s block manager cache. This helps with iterative processing algorithms like those found in machine learning.
The Spark application also needs to work in a heterogeneous environment where Java virtual machines (worker nodes) with and without GPU support coexist. If a GPU is attached to a worker node, the executor should be capable of initializing the GPU, loading the native GPU kernel provided by the user and then copying only the relevant partitioned RDD data into the GPU memory. If that succeeds, the executor should perform the GPU computation and copy the results back from the GPU memory to the CPU memory. If the executor does not have a GPU, it should run the fallback lambda expression provided by the user and yield the result. We will discuss this in detail in the later sections.
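The choice between the GPU path and the fallback lambda can be sketched as follows. This is a simplified illustration in plain Scala, not the GPUEnabler implementation: `runPartition` and its `gpuKernel` parameter are hypothetical, and the optional function simply stands in for "initialize the GPU, load the kernel, copy the data in, compute, copy the results back".

```scala
import scala.reflect.ClassTag

object FallbackSketch {
  // gpuKernel is Some(...) on executors that have a GPU and None otherwise.
  // It is a hypothetical placeholder for the whole GPU code path.
  def runPartition[A, B: ClassTag](
      data: Array[A],
      gpuKernel: Option[Array[A] => Array[B]],
      fallback: A => B): Array[B] =
    gpuKernel match {
      case Some(kernel) => kernel(data)       // whole partition processed at once
      case None         => data.map(fallback) // row by row on the CPU
    }
}
```

For the result to be the same on every executor, the fallback lambda has to compute the same function as the native kernel.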
Normally, GPUs are attached to the machine in PCIe card slots, which have much lower bandwidth than CPU memory. The data transfer time should never outweigh the computational gain we get from executing on the GPU. If data is transferred back and forth between CPU memory and GPU memory too often, the transfer cost dominates and defeats the purpose of using the GPU accelerator. This is again where NVLINK connections to the host CPU memory on POWER systems will help expand the problem sets that can be tackled with GPUs.
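The trade-off above can be written as a simple break-even check. The sketch below is a rough cost model, not something taken from GPUEnabler: the function name and parameters are hypothetical, and it assumes the same number of bytes travels to the GPU and back, which will not hold for every workload.

```scala
object OffloadCostSketch {
  // Offloading pays off only if transfer time plus GPU compute time
  // is smaller than the CPU compute time for the same work.
  def worthOffloading(
      cpuSeconds: Double,
      gpuSeconds: Double,
      bytes: Long,
      bytesPerSecond: Double): Boolean = {
    // Assumption: the data is copied to the GPU and an equal amount is copied back.
    val transferSeconds = 2.0 * bytes / bytesPerSecond
    transferSeconds + gpuSeconds < cpuSeconds
  }
}
```

A higher-bandwidth link shrinks the transfer term, which is why a faster interconnect widens the set of problems worth offloading.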
To study the performance impact, we compared the CPU and GPU execution times on a POWER8 machine (Firestone) with an NVIDIA K80 card by running "Logistic Regression" with the following parameters:
It's evident that the performance gain with respect to computational time is ~2.5X, including the data conversion time (time spent converting the row format to columnar format and then moving the data to GPU memory). The gain with respect to CPU cores after off-loading the task to the GPU is far larger. In our case, we used 160 cores to get maximum application performance on the CPU, whereas just 4 cores were used when the task was off-loaded to the GPU. So the gain in CPU cores is around 40X (160 / 4).
The GPUEnabler package (available here: https://github.com/IBMSparkGPU/GPUEnabler) provides these capabilities as a user plug-in to the Spark core. User applications can offload workloads to the GPU transparently just by adding this package to their dependency list as follows:
Using SBT: libraryDependencies += "com.ibm" % "gpu-enabler_2.10" % "1.0.0"
Using Maven:
<dependency>
  <groupId>com.ibm</groupId>
  <artifactId>gpu-enabler_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
To try this package, here are some prerequisites:
This package adds two "Transformer APIs" (mapExtFunc and CacheGpu) and one "Action API" (reduceExtFunc) to the RDD interface. Running tasks on the GPU involves writing native code and this package provides a way to map the native symbol/functions to Scala objects using CUDAFunction.
Let's see how to write a GPU CUDA kernel for the GPUEnabler package. The input/output arguments of the GPU kernel functions should adhere to the following guidelines:
[...] (int *numelements)
[...] MapGPUPartitionsRDD as inputFreeVariables. This set can be empty or take arguments.
[...] CUDAFunction as constArgs. This set can be empty or take arguments.
The CUDA program needs the nvcc compiler provided by NVIDIA to get the kernel ready. Once the kernel is ready, you can register the native symbols as follows:
// Import needed for the Spark GPU method to be added
import com.ibm.gpuenabler.CUDARDDImplicits._
import com.ibm.gpuenabler.CUDAFunction

// Load a kernel function from the GPU kernel binary
val ptxURL = SparkGPULR.getClass.getResource("/GpuEnablerExamples.ptx")

val mapFunction = new CUDAFunction(
  "multiplyBy2",   // Native GPU function to multiply a given number by 2 and return the result
  Array("this"),   // Input arguments
  Array("this"),   // Output arguments
  ptxURL)

val reduceFunction = new CUDAFunction(
  "sum",           // Native GPU function to sum the input argument and return the result
  Array("this"),   // Input arguments
  Array("this"),   // Output arguments
  ptxURL)
Once we have the CUDAFunction objects, we can pass them as arguments to the "mapExtFunc" or "reduceExtFunc" APIs. The native function mapped to each CUDAFunction object is then executed on the GPU, using the data partitions of the RDD concerned.
// 1. Apply a transformation. (Multiply all the values of the RDD by 2.)
//    (Note: Conversion of row based formatting to columnar format for consumption
//    by the GPU is done internally.)
// 2. Trigger a reduction action (sum up all the values and return the result)
val output = sc.parallelize(1 to n, 1)
  .mapExtFunc((x: Int) => 2 * x, mapFunction)
  .reduceExtFunc((x: Int, y: Int) => x + y, reduceFunction)
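As a sanity check, the two fallback lambdas from the pipeline above can be run on a local Scala collection, with no Spark or GPU involved. This is only a CPU reference for what the pipeline is meant to compute; `CpuReferenceSketch` and `doubledSum` are hypothetical names introduced for this illustration.

```scala
object CpuReferenceSketch {
  // Same lambdas as the fallback arguments of mapExtFunc and reduceExtFunc,
  // applied to a plain local range instead of an RDD.
  def doubledSum(n: Int): Int =
    (1 to n)
      .map((x: Int) => 2 * x)
      .reduce((x: Int, y: Int) => x + y)
}
```

Doubling and summing 1 to n gives n * (n + 1), so for a small n the GPU result is easy to check by hand against this reference.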
This library can also be added to Spark jobs launched through spark-shell or spark-submit by using the --packages command line option. For example, to include it when starting the Spark shell:
$ bin/spark-shell --packages com.ibm:gpu-enabler_2.10:1.0.0
Unlike using --jars, using --packages ensures that this library and its dependencies will be added to the classpath. The --packages argument can also be used with bin/spark-submit.
The complete program using GPUEnabler can be found under:
The CUDA program used by the above application can be found under:
Follow these steps to run this sample program:
./bin/run-example GpuEnablerExample
The current support for GPU Enabler package is as follows:
To learn more about compiling this package from source and to try out the examples, please follow the README.md file that comes with the package at https://github.com/IBMSparkGPU/GPUEnabler.
How it all works will remain a mystery until you read the source code, which is also published at the GitHub URL above.
We are continuing to improve and enhance the GPUEnabler, and would welcome your feedback and contributions. We'd also like to thank Kazuaki Ishizaki and Randy Swanberg for their contributions to this project.
Here are some of the future enhancements we are working on: