Blog for the Open Source JavaHotel project

Sunday, 30 August 2015

AQL, BigInsights Text Analytics and Java, Map/Reduce

Introduction
BigInsights Text Analytics is an IBM Hadoop add-on, a tool for extracting information from text data by applying a set of rules. The core of BigInsights Text Analytics is AQL, the Annotation Query Language. AQL is similar to SQL and allows text analysis to be developed bottom-up, starting with basic elements such as dictionaries and tokens and building up to more complex statements. The final program is called an "extractor"; it runs over a set of input documents and produces a collection of "views" containing the desired information in structured form. A "view" is like a table (relation) in SQL: a sequence of rows composed of a list of columns.
An extractor is an AQL program consisting of one or more AQL modules. Every module contains one or more source files with AQL statements.
BigInsights 3 provided an Eclipse component for developing, testing and publishing AQL extractors. This component is discontinued in BigInsights 4 and replaced by a Web Tool. Unfortunately, there is a design gap in BigInsights 4: the capabilities of the Web Tool are very limited, and if a developer wants to unleash the full power of Text Analytics AQL beyond the Web Tool, there is no simple way to do so. So I decided to fill this gap.
Java API
The Text Analytics AQL engine can be easily leveraged through its Java API and thus integrated with any other Big Data solution. So I created a simple project that allows compiling and executing an AQL program.
Prerequisites
Install BigInsights 4 Quick Start Edition. Then find systemT.jar (by executing the command: locate systemT.jar).
Test data, an analysis of IBM quarterly business reports, can be created by running the Text Analytics tutorial.
Solution description
The solution can be executed in two different ways: as a standalone program and as a MapReduce task over a Hadoop cluster.
Depending on the needs, it is recommended to prepare two runnable jars: RunAQL.jar (main class com.systemt.runaql.RunAQL) and ExtractMapReduce.jar (main class com.systemt.mapreduce.MapReduceMain).
Parameters
Both methods require three parameters: input directory, output directory and configuration file.
Example

java -cp RunAQL.jar:RunAQL_lib/*:/opt/ibm/bignsight/hadbi4/conf com.ibm.runaql.RunAQL hdfs://oc5802506750.ibm.com:8020/user/sbartkowski/$INPUTDIR  hdfs://oc5802506750.ibm.com:8020/user/sbartkowski/$OUTDIR /home/sbartkowski/work/bigi/ma/testj.properties
/opt/ibm/bignsight/hadbi4/conf points to the Hadoop configuration directory.

  • input directory: the input directory with text data to be annotated by the Text Analytics extractor. For a standalone program, it can be a local directory or an HDFS directory (starting with hdfs:).
  • output directory: the output directory where the Text Analytics result will be stored. In case of a Map/Reduce task, the directory must be removed beforehand. For a standalone program, it can be a local directory or an HDFS directory (starting with hdfs:).
  • configuration file: the configuration file containing information about the AQL modules to be executed.
Configuration file
Example
out.tam.dir=hdfs://big64:9000/user/sb/tamdir
#input.multi=hdfs://big64:8020/user/sbartkowski/tamdir
in.module.property1=/home/sbartkowski//TestSystemT/resource/test1/aql/metricsIndicator_dictionaries
in.module.property2=/home/sbartkowski/TestSystemT/resource/test1/aql/metricsIndicator_features
in.module.property3=/home/sbartkowski/TestSystemT/resource/test1/aql/metricsIndicator_externalTypes
in.module.property4=/home/sbartkowski/TestSystemT/resource/test1/aql/main
list.modules=metricsIndicator_dictionaries,metricsIndicator_features,main,metricsIndicator_externalTypes
ex.dictionary1=metricsIndicator_dictionaries.abbreviations,/home/sbartkowski/TestSystemT/resource/test1/resources/abbreviation.dict
  • out.tam.dir: for the standalone program, leave it empty. For the Map/Reduce program, the HDFS directory where compiled AQL (.tam) files are saved and reused by the Map/Reduce tasks.
  • in.module.propertyN: directory with an AQL source code module; there can be more than one. It is always a local directory (see the sketch after this list).
  • list.modules: a list of module names separated by commas; it should correspond to the in.module.propertyN values.
  • ex.dictionaryN: a pair separated by a comma: external dictionary name and external dictionary file. For the Map/Reduce program it should be an HDFS address.
  • input.multi: Y if the Multilingual tokenizer should be used, N or omitted otherwise. More details below.
  • input.lang: the language of the input documents. List of accepted values. Can be omitted if the default, English, is used.
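For illustration only, here is a minimal sketch of how these numbered keys could be read with java.util.Properties. Only the key names come from the description above; the class name, defaults and handling are assumptions, not the project's actual code.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Sketch only: reads the configuration keys described above.
public class AQLConfigSketch {
  public static void main(String[] args) throws IOException {
    Properties p = new Properties();
    try (FileInputStream in = new FileInputStream(args[0])) {
      p.load(in);
    }
    // in.module.property1, in.module.property2, ...: local directories with AQL module sources
    List<String> moduleDirs = new ArrayList<>();
    for (int i = 1; p.getProperty("in.module.property" + i) != null; i++) {
      moduleDirs.add(p.getProperty("in.module.property" + i));
    }
    // list.modules: comma-separated module names matching the directories above
    List<String> modules = Arrays.asList(p.getProperty("list.modules", "").split(","));
    // out.tam.dir: empty for the standalone run, an HDFS directory for Map/Reduce
    String tamDir = p.getProperty("out.tam.dir", "");
    // input.multi / input.lang: tokenizer selection and document language
    boolean multilingual = "Y".equalsIgnoreCase(p.getProperty("input.multi", "N"));
    String lang = p.getProperty("input.lang", "en");
    System.out.println(moduleDirs + " " + modules + " " + tamDir
        + " multi=" + multilingual + " lang=" + lang);
  }
}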
Standard or Multilingual tokenizer
Although the Multilingual tokenizer supersedes the Standard one, the Standard tokenizer should be used whenever possible because there is a performance penalty attached to the Multilingual tokenizer. The Multilingual tokenizer should be used for non-European languages like Hebrew, Chinese or Japanese, and it requires additional dependencies. More details (point 3).
An example launching sequence for the Multilingual tokenizer:
 OUTDIR=outputh
hadoop fs -rm -r -skipTrash $OUTDIR

ADDJAR=ExtractMapReduce_lib/dlt-3.3.0.jar,ExtractMapReduce_lib/uimaj-core-2.3.1.jar,ExtractMapReduce_lib/an_dlt-3.3.0.jar,ExtractMapReduce_lib/systemT-3.3.0.jar
ADDJAR=$ADDJAR,ExtractMapReduce_lib/icu4j-55.1.jar,ExtractMapReduce_lib/tokenizer-3.3.0.jar
hadoop jar ExtractMapReduce.jar -libjars $ADDJAR Macabbi $OUTDIR testh.properties
testh.properties
out.tam.dir=hdfs://nc9128109010.kraklab.pl.ibm.com:9000/user/sb/hedir 
input.multi=Y 
module.property1=Medical,Util 
list.modules1=aql/Medical 
list.modules2=aql/Util 
input.lang=he
An example launching sequence for the Text Analytics tutorial:
OUTDIR=output2
hadoop fs -rm -r -skipTrash $OUTDIR
hadoop jar ExtractMapReduce.jar -libjars ExtractMapReduce_lib/systemT-3.3.0.jar inputdata $OUTDIR test.properties
test.properties
out.tam.dir=hdfs://nc9128109010.kraklab.pl.ibm.com:9000/user/sb/tamdir
in.module.property1=test1/aql/metricsIndicator_dictionaries
in.module.property2=test1/aql/metricsIndicator_features
in.module.property3=test1/aql/metricsIndicator_externalTypes
in.module.property4=test1/aql/main
list.modules=metricsIndicator_dictionaries,metricsIndicator_features,main,metricsIndicator_externalTypes
ex.dictionary1=metricsIndicator_dictionaries.abbreviations,hdfs://nc9128109010.kraklab.pl.ibm.com:9000/user/sb/extdic/abbreviation.dict
Running Text Analytics as Map/Reduce
Annotating text data with Text Analytics AQL is perfectly suited to the Map/Reduce paradigm. The input data is divided into a collection of text files and a Map task annotates one document. In most cases, Text Analytics runs over a large number of relatively small input documents (for instance Twitter tweets, medical reports etc.). The map output is a pair: view name (key) and view content (value). The Reduce task consolidates the content of a view across all documents. Finally, an OutputFormat creates the result (currently only CSV format is supported), as sketched below.
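For illustration, a minimal sketch of a job with this shape, using only the standard Hadoop API. The annotate() placeholder, the class names and the driver are hypothetical stand-ins, not the project's actual ExtractMapReduce implementation.

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ExtractSketch {

  // Map side: one document in, one (view name, view content) pair out per view.
  public static class AnnotateMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Placeholder for the real extractor call: it would load the compiled .tam
    // modules (e.g. from out.tam.dir) and return CSV content per output view.
    protected Map<String, String> annotate(String documentText) {
      return Collections.emptyMap();
    }

    @Override
    protected void map(LongWritable offset, Text document, Context context)
        throws IOException, InterruptedException {
      // Note: the real project presumably feeds one whole document per map call;
      // the default TextInputFormat used here splits input by line.
      for (Map.Entry<String, String> view : annotate(document.toString()).entrySet()) {
        context.write(new Text(view.getKey()), new Text(view.getValue()));
      }
    }
  }

  // Reduce side: consolidate the content of one view coming from all documents.
  public static class ConsolidateReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text viewName, Iterable<Text> rows, Context context)
        throws IOException, InterruptedException {
      for (Text row : rows) {
        context.write(viewName, row);    // the OutputFormat renders the result (CSV)
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "AQL extract sketch");
    job.setJarByClass(ExtractSketch.class);
    job.setMapperClass(AnnotateMapper.class);
    job.setReducerClass(ConsolidateReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Launching such a job then follows the same pattern as the hadoop jar examples shown earlier.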



Future extensions, known limitations
  • The current version does not support external tables; only external dictionaries are implemented.
  • Only the CSV output format is supported. A first line containing the header would be a desirable feature. Additional output formats such as TSV (tab-separated values), HBase, Hive or BigSQL (an IBM BigInsights add-on) should be added. The main challenge to overcome is how to pass the output view column description to the OutputFormat task (see the sketch after this list).
  • It is not necessary to compile the AQL source files every time. Compiled .tam files could be created once in a separate process, stored, and reused later.
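For completeness, a minimal sketch of what such a CSV OutputFormat could look like; the class name and the single-file layout are illustrative assumptions, not the project's actual code. The comment marks the spot where the missing view column description would be needed for a header line.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: writes consolidated (view name, CSV row) pairs to one file.
public class CsvViewOutputFormat extends FileOutputFormat<Text, Text> {

  @Override
  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    Path file = getDefaultWorkFile(context, ".csv");
    FSDataOutputStream out =
        file.getFileSystem(context.getConfiguration()).create(file, false);
    return new RecordWriter<Text, Text>() {
      @Override
      public void write(Text viewName, Text csvRow) throws IOException {
        // A header line would have to be emitted the first time a view is seen;
        // doing that needs the view's column description, the open challenge above.
        out.writeBytes(viewName + "," + csvRow + "\n");
      }

      @Override
      public void close(TaskAttemptContext ctx) throws IOException {
        out.close();
      }
    };
  }
}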

Wednesday, 19 August 2015

Pro*C to DB2 Embedded SQL migration, structures

Introduction
Embedded SQL (DB2) and Pro*C (Oracle) allow using a C structure as a host variable. Instead of enumerating all columns in an INSERT or SELECT statement, the developer can use the name of the structure variable. The implementation of structures in DB2 and Oracle is similar but slightly different.
Oracle code
Pro*C Oracle sample code

create table personal_data (id integer, name varchar(100));
int addPersonalData(personal_data data) {

  int failed;
  
  EXEC SQL WHENEVER SQLERROR DO sql_error("Insert personal_data",&failed);  
 
  EXEC SQL INSERT INTO personal_data (id,name) 
      VALUES (:data); 
  if (failed) return 0;
  
  EXEC SQL INSERT INTO personal_data (id,name) 
      VALUES (:data.id,NULL); 
      
  if (failed) return 0;
  return 1;       
}  

int calculatePersonalStat(int *all, int *nullno) {

   int failed;
   personal_data data;
   struct
        {
   short ind_id;
   short ind_name;
    } data_info_ind;
  
   EXEC SQL WHENEVER SQLERROR DO sql_error("calculate personal stat",&failed);  

   EXEC SQL DECLARE persdata CURSOR FOR 
          SELECT ID,NAME 
              FROM personal_data;
       
   if (failed) return 0;       
       
   EXEC SQL OPEN persdata;       
   
   if (failed) return 0;
   
   EXEC SQL WHENEVER NOT FOUND DO break;   
   
   *all = *nullno = 0;
   
    for (;;) 
    { 

        EXEC SQL FETCH persdata INTO :data INDICATOR :data_info_ind;
        if (failed) return 0;
        (*all)++;
        if (data_info_ind.ind_name != 0) (*nullno)++;
    } 
    
    EXEC SQL CLOSE persdata;
   
    return 1;
            
}     
In this code we see the usage of a structure to FETCH and INSERT data into the table, and also a structure as the INDICATOR host variable.
DB2 code
Embedded SQL sample code, the equivalent of the Oracle code above:

EXEC SQL BEGIN DECLARE SECTION;

typedef struct { 
  int id;
  char name[100];
} personal_data_struct_db2 ; 

EXEC SQL END DECLARE SECTION;

int addPersonalData(personal_data data) {

    struct sqlca sqlca;
  
   EXEC SQL BEGIN DECLARE SECTION;
      personal_data_struct_db2 rec;
   EXEC SQL END DECLARE SECTION;
     int error = 0;
     
   memcpy(&rec,&data, sizeof(rec));
 
  EXEC SQL INSERT INTO personal_data (id,name) 
      VALUES (:rec); 

      
  EVALRESULT("Insert personal rec");
  if (error) return 0;
  
  EXEC SQL INSERT INTO personal_data (id,name) 
      VALUES (10,NULL); 

  EVALRESULT("Insert personal rec with null");
  if (error) return 0;
      
  return 1;       
}  

int calculatePersonalStat(int *all, int *nullno) {

   struct sqlca sqlca;
  
   EXEC SQL BEGIN DECLARE SECTION;
      personal_data_struct_db2 data;
      short data_info_ind[2];
   EXEC SQL END DECLARE SECTION;
     int error = 0;
    
  
//   EXEC SQL WHENEVER SQLERROR DO sql_error("calculate personal stat",&failed);  

   EXEC SQL DECLARE persdata CURSOR FOR 
          SELECT ID,NAME 
              FROM personal_data;
   EVALRESULT("Declare cursor");
   if (error) return 0;
       
   EXEC SQL OPEN persdata;       
   EVALRESULT("Open cursor");
   if (error) return 0;
  
   
   
   *all = *nullno = 0;
   
    for (;;) 
    { 
        EXEC SQL FETCH persdata INTO :data INDICATOR :data_info_ind;
        EVALRESULT("Fetch next");
        if (error) return 0;
        if (SQLCODE != 0) break;
        (*all)++;
        if (data_info_ind[1] != 0) (*nullno)++;
    } 
    
    EXEC SQL CLOSE persdata;
   
    return 1;
            
}         
The DB2 code is similar but there are differences:
  • The structure declaration must be enclosed between EXEC SQL BEGIN DECLARE SECTION and EXEC SQL END DECLARE SECTION. Unfortunately, this requires duplicating the structure definition.
  • The structure variable must also be declared in a similar way. Pay attention to the memcpy invocation.
  • The implementation of the INDICATOR host variable is also different: instead of a structure, an array of short indicators is used.
Conclusion
Despite these differences, the migration is simple and straightforward. The code has to be modified, but no changes in the application logic are necessary.

Sunday, 9 August 2015

IBM InfoSphere Streams and power of parallelism

Introduction
The basic method to improve performance in IBM InfoSphere Streams is to apply parallelism. Unlike in many other programming frameworks, parallelism in Streams can be achieved using very simple methods, and the result is amazing.
Application
To prove it, I created a very simple application calculating an FFT (Fast Fourier Transform) on a series of numbers. The choice of FFT is arbitrary; it is only an example of a CPU-hungry method, without paying any attention to the validity of input and output values.
The source code of the application is available here. It produces a series of random numbers (SeriesSource.spl), aggregates them into a 512-element buffer, applies the FFT (FFTPerform.spl) and consumes the result (SinkFFT.spl). The performance is gauged by measuring the number of FFT calculation outputs flowing into SinkFFT.
All tests were conducted on two VMware machines (4 virtual CPUs and 4 GB of memory).
First step
The first version does not contain any parallel calculation.

The performance was 23-25 tuples/sec (the number of FFT outputs flowing into SinkFFT).
Second step
It is very easy to detect that the bottleneck is the FFT operator. The simplest way to execute the FFT in parallel is to apply UDP (User-Defined Parallelism) to the FFT operator.

  
                    @parallel(width = 5)
                    (stream<list<float64> l> Out0) as FFT_2 =
    FFT(Custom_1_out0)
   {
    param
     algorithm : DCT ;
     inputTimeSeries : l ;
     resolution : 1024u ;
    output
     Out0 : l = magnitude() ;
   }

So we have 5 FFT operators running in parallel and the performance is 86-86 tuples per second.
Third step
The next step is to distribute the calculation between two nodes. So we have to create two instances of the FFTPerform operator and put them into separate PEs (Processing Elements).
namespace application ;

use com.testperf.series::SeriesSource ;
use com.testperf.fft::FFTPerform ;
use com.testperf.sink::SinkFFT ;

composite Main
{
 graph
  (stream<float64 num> SeriesSource_1_out0) as SeriesSource_1 = SeriesSource()
  {
  }

  (stream<list<float64> l> FFTPerform_3_out0) as FFTPerform_3 =
   FFTPerform(SeriesSource_1_out0)
  {
   config
    placement : partitionColocation("CC") ;
  }

  () as SinkFFT_5 = SinkFFT(FFTPerform_3_out0, FFTPerform_6_out0)
  {
  }

  (stream<list<float64> l> FFTPerform_6_out0) as FFTPerform_6 =
   FFTPerform(SeriesSource_1_out0)
  {
   config
    placement : partitionColocation("BB") ;
  }

 config
  placement : partitionColocation("AB") ;
}

We do not have to do anything with the operator logic, just make a copy of the FFTPerform operator invocation in the main composite and, by using the partitionColocation option, put the two invocations into separate PEs. When the application is deployed, the IBM InfoSphere Streams scheduler will push them onto separate hosts.

Now we have 10 FFT operators running in parallel and distributed evenly between two hosts. The performance is 120-125 tuples per second.
Conclusion

  • The default model for a Streams application is maximum parallelism: every operator in a separate PE (process). But this does not make any sense; having a random number of tens or hundreds of PEs (processes) does not improve performance. So a good starting point is to fuse the whole application into a single PE (no parallelism at all) and later introduce parallelism according to the design.
  • First, we have to decide how to measure performance. In the test application above it was very simple, but in the case of a complex application it requires some effort.
  • The next step is to identify the bottleneck. In the test application it was obvious, but in a real application it could require a lot of testing and searching.
  • Applying the UDP annotation is a very good method to introduce parallelism. But selecting the optimal parallelism width requires some effort; a greater number does not mean better performance. The rule of thumb is that it should not exceed the number of cores in the host machine.
  • Parallelism is not limited to multiplying the number of threads conducting a particular task; it also includes spreading execution between different hosts. Identifying which operator or job should be distributed to a separate host to achieve maximum performance requires some effort.