Blog for the Open Source JavaHotel project

Sunday, August 9, 2015

IBM InfoSphere Streams and the power of parallelism

The basic way to improve performance in IBM InfoSphere Streams is to apply parallelism. Streams makes this unusually simple compared with most programming frameworks, and the results are impressive.
To prove it, I created a very simple application that calculates an FFT (Fast Fourier Transform) on a series of numbers. The choice of FFT is arbitrary: it is only an example of a CPU-intensive method, and no attention is paid to the validity of the input and output values.
Source code of the application is available here. It produces a series of random numbers (SeriesSource.spl), aggregates them into buffers of 512 values, applies the FFT (FFTPerform.spl) and consumes the result (SinkFFT.spl). Performance is gauged by counting the FFT output tuples that reach SinkFFT per second.
All tests were conducted on two VMware machines (4 virtual CPUs and 4 GB of memory).
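The metric used throughout is simply a rate: tuples that reach the sink, divided by elapsed time. The sketch below shows that arithmetic in Python, purely as an illustration. It is not part of the Streams application, and `ThroughputMeter`, `record` and `rate` are hypothetical names. How the counts were actually collected in the tests is not stated in this post.

```python
import time


class ThroughputMeter:
    """Counts events and reports the average rate in events per second."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock        # injectable clock, convenient for testing
        self._start = clock()      # moment the measurement began
        self._count = 0

    def record(self, n=1):
        """Register n tuples arriving at the sink."""
        self._count += n

    def rate(self):
        """Average tuples per second since the meter was created."""
        elapsed = self._clock() - self._start
        return self._count / elapsed if elapsed > 0 else 0.0
```

For example, 240 tuples recorded over 10 seconds give a rate of 24 tuples per second, which is the order of magnitude seen in the first step below.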
First step
The first version does not contain any parallel calculation.

The performance was 23-25 tuples per second (the number of FFT outputs arriving at SinkFFT).
Second step
It is easy to see that the bottleneck is the FFT operator. The simplest way to execute the FFT in parallel is to apply UDP (User-Defined Parallelism) to it.

    @parallel(width = 5)
    (stream<list<float64> l> Out0) as FFT_2 = FFT(...)   // input stream not shown in the listing
    {
      param
        algorithm : DCT ;
        inputTimeSeries : l ;
        resolution : 1024u ;
      output
        Out0 : l = magnitude() ;
    }

So we have 5 FFT operators running in parallel, and the performance is about 86 tuples per second.
Third step
The next step is to distribute the calculation between two nodes. To do that, we create two instances of the FFTPerform operator and put them into separate PEs (Processing Elements).
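namespace application ;

use com.testperf.series::SeriesSource ;
use com.testperf.fft::FFTPerform ;
use com.testperf.sink::SinkFFT ;

composite Main
{
  graph
    (stream<float64 num> SeriesSource_1_out0) as SeriesSource_1 = SeriesSource()
    { }

    // First FFTPerform instance, placed in PE "CC" (input stream not shown in the listing)
    (stream<list<float64> l> FFTPerform_3_out0) as FFTPerform_3 = FFTPerform(...)
    {
      config
        placement : partitionColocation("CC") ;
    }

    () as SinkFFT_5 = SinkFFT(FFTPerform_3_out0, FFTPerform_6_out0)
    { }

    // Second FFTPerform instance, placed in a different PE, "BB"
    (stream<list<float64> l> FFTPerform_6_out0) as FFTPerform_6 = FFTPerform(...)
    {
      config
        placement : partitionColocation("BB") ;
    }

  // Composite-level default: operators without their own placement share PE "AB"
  config
    placement : partitionColocation("AB") ;
}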

We do not have to change anything in the operator logic: we just make a copy of the FFTPerform operator in the main composite and use the partitionColocation option to put the two copies into separate PEs. When the application is deployed, the IBM InfoSphere Streams scheduler places them on separate hosts.

Now we have 10 FFT operators running in parallel, distributed evenly between the two hosts. The performance is 120-125 tuples per second.
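To put the three measurements side by side, here is a small back-of-the-envelope calculation of speedup and per-operator efficiency. It uses only the figures reported above, takes the midpoint of each reported range, and assumes 86 tuples per second for the second step. Python serves only as a calculator here, and `midpoint` and `scaling` are hypothetical helper names.

```python
def midpoint(low, high):
    """Middle of a reported measurement range."""
    return (low + high) / 2.0


def scaling(baseline_rate, rate, workers):
    """Return (speedup over baseline, efficiency per parallel worker)."""
    speedup = rate / baseline_rate
    return speedup, speedup / workers


baseline = midpoint(23, 25)    # first step: no parallelism
step2 = 86.0                   # second step: 5 FFT operators on one host
step3 = midpoint(120, 125)     # third step: 10 FFT operators on two hosts

for label, rate, workers in [("step 2", step2, 5), ("step 3", step3, 10)]:
    s, e = scaling(baseline, rate, workers)
    print(f"{label}: speedup {s:.2f}x, efficiency {e:.0%} per FFT operator")
```

This prints a speedup of 3.58x (72% per operator) for the second step and 5.10x (51% per operator) for the third. Adding the second host raises throughput but does not come close to doubling it.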

  • The default model for a Streams application is maximum parallelism: every operator runs in a separate PE (process). This does not make much sense, because tens or hundreds of arbitrarily created PEs (processes) do not improve performance. A good starting point is to fuse the whole application into a single PE (no parallelism at all) and then introduce parallelism deliberately, according to the design.
  • First, we have to decide how to measure performance. In the test application above this was very simple, but in a complex application it takes some effort.
  • The next step is to identify the bottleneck. In the test application it was obvious, but in a real application it can take a lot of testing and searching.
  • Applying the UDP annotation is a very good way to introduce parallelism. Choosing the optimal parallel width takes some effort, though: a greater number does not mean better performance. The rule of thumb is that the width should not exceed the number of cores in the host machine.
  • Parallelism is not limited to multiplying the number of threads performing a particular task; it also means spreading execution across different hosts. Identifying which operator or job should be moved to a separate host to achieve maximum performance takes some effort.
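The rule of thumb for the parallel width can be written down as a one-line clamp. The Python sketch below is illustrative only and is not part of the Streams application. `capped_width` is a hypothetical helper, and the example call assumes that the 4 virtual CPUs mentioned earlier are per machine.

```python
import os


def capped_width(requested, cores=None):
    """Clamp a requested parallel width to the number of available cores."""
    if requested < 1:
        raise ValueError("parallel width must be at least 1")
    if cores is None:
        cores = os.cpu_count() or 1   # os.cpu_count() may return None
    return max(1, min(requested, cores))


# Width used in the second step, on a machine assumed to have 4 virtual CPUs
print(capped_width(5, cores=4))
```

Under that assumption, the width of 5 used in the test is slightly above what the rule suggests, so 4 would be a natural value to try when tuning.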
