Run C/C++ code on Spark/YARN Cluster


(Maziyar Panahi) #1

Hi @mbredif @lcaraffa

I have successfully tested my simple compiled C/C++ code with Spark on the YARN cluster.

Leaving aside what the C/C++ code itself does, executing an external compiled C/C++ program on the YARN cluster was much easier than I thought:

  1. Put the already compiled C/C++ binary on HDFS so it can be pushed to all executors
  2. Add the compiled file to the executors, either inline in the code (sc.addFile) or with the --files flag of spark-submit

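Sample C code:

#include <stdio.h>

/* Reads lines from stdin (one RDD element per line) and writes a greeting to stdout. */
int main (int argc, char *argv[]) {
  char str[100];  /* a line longer than 99 characters is read in several chunks */

  while (1) {
    if (!fgets(str, 100, stdin)) {
      return 0;  /* end of input: nothing left to read */
    }
    printf("Hello, %s", str);
  }
}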

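Sample Scala code:

// Make the compiled binary on HDFS available on every node
val distScript = "hdfs:///user/maziyar/tmp/simple-c"
sc.addFile(distScript)

val names = sc.parallelize(Seq("Don", "Betty", "Sally"))
// Each element is written as one line to the program's stdin; each stdout line becomes an element
val piped = names.pipe(Seq("./simple-c"))
piped.collect().foreach(println)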

names: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:28
piped: org.apache.spark.rdd.RDD[String] = PipedRDD[21] at pipe at <console>:28
Hello, Don
Hello, Betty
Hello, Sally
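
One caveat with the C sample is its fixed 100-byte buffer: fgets returns a line longer than 99 characters in several chunks, so the greeting would be printed more than once for that line. The following is a minimal C++ sketch of the same filter using std::getline, which accepts lines of any length. The names greet and run are illustrative and are not part of the code that was tested on the cluster.

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <string>

// Build the greeting for one input line (std::getline has already removed the newline).
std::string greet(const std::string& name) {
    // The pipe exchange is line-based, so one element must never span several lines.
    assert(name.find('\n') == std::string::npos);
    return "Hello, " + name;
}

// Read lines of any length from `in` and write one greeting per line to `out`.
// Returns the number of lines processed.
std::size_t run(std::istream& in, std::ostream& out) {
    std::string line;
    std::size_t count = 0;
    while (std::getline(in, line)) {
        out << greet(line) << '\n';
        ++count;
    }
    return count;
}
```

A main function that only calls run(std::cin, std::cout) would turn this into a filter that pipe can call in the same way as simple-c.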

Question: Let’s talk about why you need your C++ code to write its results back to HDFS. Why not collect the results and write them with Spark, or just collect them and continue with your pipeline? (I am asking because a C++ HDFS connection is very painful to compile and tends to be shaky and unstable, especially across different environments.)


(Mathieu Brédif) #2

Thanks for the successful initial tests! That is promising!

Our intended use case (3D Delaunay triangulation of billions or trillions of 3D points) requires the iterative application of a C++ executable (using pipes in a Scala Spark job) that takes as input a work-in-progress triangulation and a set of points to be inserted, and outputs an updated triangulation. The process is iterative in the sense that points are exchanged between the partial triangulations of the partitions until everything settles (repeat { insert received points, emit active points } until all partitions emit no points).
Memory-wise, a triangulation is composed of 2 things: the input point cloud and the output topology (the vertex indices of all computed tetrahedra). Counting an average of 6 tetrahedra per input point and 4 vertices per tetrahedron in 3D, the topology is quite big by itself, with 24 vertex ids per input point (and there are billions or trillions of them!), whereas the input is only 3 float or double coordinates per input point.
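
As a rough illustration of this count, here is a minimal sketch of the arithmetic. The struct and function names are hypothetical, and the byte widths passed in are assumptions rather than figures from our code: 8-byte doubles for coordinates and 8-byte vertex ids, since 4-byte ids cannot address more than about 4.29 billion points.

```cpp
#include <cassert>
#include <cstdint>

// Raw size of the two parts of a triangulation, in bytes.
struct TriangulationSize {
    std::uint64_t pointBytes;     // input point cloud
    std::uint64_t topologyBytes;  // vertex ids of all tetrahedra
};

// Estimate the raw size from the averages quoted above:
// 6 tetrahedra per input point, 4 vertices per tetrahedron, 3 coordinates per point.
TriangulationSize estimateSize(std::uint64_t numPoints,
                               std::uint64_t bytesPerCoordinate,
                               std::uint64_t bytesPerVertexId) {
    assert(bytesPerCoordinate > 0 && bytesPerVertexId > 0);
    const std::uint64_t tetrahedraPerPoint = 6;
    const std::uint64_t verticesPerTetrahedron = 4;
    const std::uint64_t coordinatesPerPoint = 3;
    return TriangulationSize{
        numPoints * coordinatesPerPoint * bytesPerCoordinate,
        numPoints * tetrahedraPerPoint * verticesPerTetrahedron * bytesPerVertexId};
}
```

Under these assumptions, estimateSize(1000000000, 8, 8) gives 24 GB of coordinates and 192 GB of topology for one billion points, before any extra per-tetrahedron information.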

Thus,

  • collecting (= gathering on the master) the final or the intermediate triangulations is not an option.
  • We have tried to keep the intermediate triangulations in memory as Spark RDDs of triangulations serialized as strings, and to write only the final triangulation out to HDFS, but that proved to be very fragile in terms of stability and memory consumption.
  • The alternative is to have the Spark RDDs hold only pointers to the triangulations. We have had success here by mounting the same network directory on all Spark executors and having the Spark RDD provide the local file path.
  • Our feeling is that we would benefit from making this approach more Hadoop-friendly, for example by reading/writing the triangulations from/to HDFS.

We need your input here to design a robust solution and get the most out of Multivac without taking it down ;). Should we rather use Redis for intermediate results? Would it help to wrap our executable in Python and use PySpark instead of Scala Spark? …


(Maziyar Panahi) #3

That is true: collecting that much data on the Driver will bring down the machine that hosts it, regardless of how much memory it has.

It is also true that managing memory and partitions manually will crash the app if it is not optimized.

Yes, writing the data back to HDFS and reading it again will solve the memory issue; however, disk I/O, especially in the Cloud, is going to be slow. (On HDFS in particular, you have to make sure that you turn replication down to 1 so that HDFS doesn’t replicate this data.)

If by executable you mean the Spark app itself, then it is always faster and better optimized in Scala, as Spark runs on the JVM and there is no extra layer to translate to Python or R. But if you mean converting the C++ code into a Spark app because a similar library exists in Python, then managing reads and writes (talking to HDFS) would be much easier; then again, calling a Python script through pipe will surely degrade performance, as C++ is much faster.

Unfortunately, mixing technologies such as MongoDB, Redis, ElasticSearch, etc. with Hadoop is not as efficient when it comes to computations, even though they all have drivers and proper libraries. (Normally these technologies come into play before or after Hadoop: query from ElasticSearch -> Hadoop computations -> write back to ES.)

Question:
So, let’s talk about the C++ code. Imagine one file and one operation: how much data does it generate, in terms of KB/MB? Also, is it Spark that uses the C++ output written to HDFS, or will another C++ executor pick it up? (Sorry, I am sure you explained this in detail already, but I didn’t understand whether a file goes through C++ only once or whether the results also come back to C++.)

Files -> Spark -> C++ -> HDFS -> ?

Update: I forgot to ask: you mentioned that you have already tried some of this. May I know the size of your test data, how long it took, and the size of the Spark cluster (number of executors, memory per executor, and cores per executor)?


(Mathieu Brédif) #4

“sorry I am sure you explained in details already”

No problem :), the pipeline goes as follows (approximately, as I have simplified some steps):

  • point cloud files are uploaded to HDFS (10MB-10GB each). Let us assume for now that the overall point set is tiled in a preprocessing step into files with disjoint geographical extents, and that we can control the size of each file to optimize the performance of the following steps. (Alternatively, for testing/benchmarking, random point sets may be generated at run time.)
  • a Scala/Spark job 0 builds an RDD[(Id,String)] of the point cloud file names as strings, assigning an integer tile Id to each file.
  • the Scala/Spark job 1 takes an RDD[(Id,String)], pipes a C++ program for each input file that reads the point cloud file (from disk? HDFS?), writes a triangulation (to disk? HDFS?) and returns an RDD[(Id,String,List(String))] which gives, for each tile Id (_1), the path of the triangulation file (_2) and a set of points (_3) to be sent to other tiles (grouped by target tile Id and serialized as strings, which should be a few MB at most).
  • Spark is used to do a reduce by key to shuffle the emitted points to their target tiles.
  • the Scala/Spark job 2 takes the RDD[(Id,String,List(String))] and pipes a C++ program that reads the triangulation file (_2), then deserializes the incoming points (_3) and inserts them into the triangulation. Its output is the same as that of job 1 (the path of the written triangulation file and the string-serialized emitted points).
  • job 2 is repeated until all tiles emit no points.
  • the output is then the set of triangulation files written at the last iteration.
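
To make the exchange over the pipe in the steps above more concrete, here is a minimal C++ sketch of how one record could be framed as a text line. The line format (tile Id, triangulation path and serialized points separated by tabs) and the names TileRecord, parseRecord and formatRecord are assumptions made for illustration; the steps above do not fix how the tuples are written to the pipe. Reading the triangulation, inserting the points and writing the file are left out.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical record exchanged with Spark over the pipe, one per line:
//   <tileId> TAB <triangulationPath> TAB <point> TAB <point> ...
struct TileRecord {
    long id = 0;
    std::string triangulationPath;
    std::vector<std::string> points;  // serialized points, kept opaque here
};

// Split one input line into its tab-separated fields.
// Throws std::invalid_argument if the first field is not an integer.
TileRecord parseRecord(const std::string& line) {
    std::istringstream fields(line);
    std::string field;
    TileRecord rec;
    std::getline(fields, field, '\t');
    rec.id = std::stol(field);
    std::getline(fields, rec.triangulationPath, '\t');
    while (std::getline(fields, field, '\t')) {
        if (!field.empty()) rec.points.push_back(field);
    }
    return rec;
}

// Write a record back as a single line (without the trailing newline).
std::string formatRecord(const TileRecord& rec) {
    // The path must not contain the separator, or the line could not be parsed back.
    assert(rec.triangulationPath.find('\t') == std::string::npos);
    std::ostringstream out;
    out << rec.id << '\t' << rec.triangulationPath;
    for (const std::string& p : rec.points) out << '\t' << p;
    return out.str();
}
```

With such a framing, a tile that emits no points is simply a line with two fields, which is what the stopping test of the iteration would look for on the Spark side.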

The size of a triangulation file is roughly 3x the size of the input point file of its tile.


(Mathieu Brédif) #5

I will let @lcaraffa answer this!


(Laurent) #6

Hi!
As for the data size and the time, it really depends on your data set.
But for a real-world application, I would say a file is around 10MB to 200MB (this is the size of a Delaunay triangulation stored with some extra information).
These files are not actually processed by Spark; they are only sent/received as file paths.
However, in some cases we use Spark to process 3D points separately (points that are shared between triangulations).

The triangulation + algorithm can take a lot of memory on the executor: for a 200MB triangulation stored in binary on disk, the C++ app can sometimes take around 1GB, depending on the application. So we generally try to allow 1GB per process.

Consequently, “spark.executor.memory” and “spark.driver.memory” are set low enough to leave room for the C++ executable.

Currently we have 3 computers (72 threads in total, 60GB): 1 master & 3 executors (one computer is both master and executor).
In my Spark call I have: spark.executor.memory=2g -Dspark.driver.memory=2g
I haven’t seriously monitored the time and size, simply because the algorithm wasn’t working after being rewritten from scratch, but I’ll do some benchmarks next week!
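
The memory budgeting described in this post can be sketched as simple arithmetic. The sketch below is only an illustration: it assumes that the 60GB and the 72 threads are split evenly over the 3 computers (20GB and 24 threads each), and it ignores the memory used by the operating system and the JVM overhead beyond the heap. The function name maxConcurrentProcesses is hypothetical.

```cpp
#include <algorithm>
#include <cassert>

// Number of C++ processes that can run at the same time on one machine.
// Memory arguments are in GB; the result is capped by the number of threads.
int maxConcurrentProcesses(int nodeMemoryGb, int jvmHeapGb,
                           int perProcessGb, int nodeThreads) {
    assert(perProcessGb > 0);
    // Memory left for the piped executables once the JVM heap is reserved.
    const int freeGb = std::max(0, nodeMemoryGb - jvmHeapGb);
    return std::min(nodeThreads, freeGb / perProcessGb);
}
```

Under these assumptions, maxConcurrentProcesses(20, 2, 1, 24) evaluates to 18, so memory rather than the thread count would limit how many 1GB processes a machine can run at once.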


(Maziyar Panahi) #7

So if I understand correctly:

0- point cloud files path ->
  1- Spark -> 
     2- c++ (triangulation files) -> 
         3- HDFS ->
           4- Back to stage 1 until it's over

(Maziyar Panahi) #8

Thanks @lcaraffa, I just wanted to see some estimate of the resources. I would appreciate it if you could share a simple benchmark whenever you test it.


(Maziyar Panahi) #9

@mbredif @lcaraffa If you are free next week, we can have a quick working session to see how to compile the C++ code with the HDFS header against an already compiled Hadoop, with all the environment paths set.
It seems that if we can connect the C++ code to HDFS in the Multivac environment, then it is just a matter of testing and optimizing.
We can share a notebook for more productivity; it also helps to develop code faster, as it keeps the Spark session open (no need to start Spark every time).


(Maziyar Panahi) #10

I have tested a few more times, with a few more programs, whether it is possible to run a C/C++ application from Multivac DSL. I can say for sure that it is possible.
Therefore, I am closing this topic in favor of opening another one (how to compile C/C++ code against the Multivac native Hadoop library).

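Sample C code:

#include <stdio.h>

/* Reads lines from stdin (one RDD element per line) and writes a greeting to stdout. */
int main (int argc, char *argv[]) {
  char str[100];  /* a line longer than 99 characters is read in several chunks */

  while (1) {
    if (!fgets(str, 100, stdin)) {
      return 0;  /* end of input: nothing left to read */
    }
    printf("Hello, %s", str);
  }
}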

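Sample Scala code:

// Make the compiled binary on HDFS available on every node
val distScript = "hdfs:///user/maziyar/tmp/simple-c"
sc.addFile(distScript)

val names = sc.parallelize(Seq("Don", "Betty", "Sally"))
// Each element is written as one line to the program's stdin; each stdout line becomes an element
val piped = names.pipe(Seq("./simple-c"))
piped.collect().foreach(println)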

names: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:28
piped: org.apache.spark.rdd.RDD[String] = PipedRDD[21] at pipe at <console>:28
Hello, Don
Hello, Betty
Hello, Sally

(Maziyar Panahi) #11