Blog of the Open Source project JavaHotel

Sunday, 3 January 2016

IBM InfoSphere Streams, PowerBI operator

Introduction
Previously I created a simple Java helper package to access the PowerBI REST API. Now it is time to develop a Streams operator that ships data out to PowerBI. The full source code is available here as a Streams Studio project.
Operator description, parameters
The operator (its Java source code is available here) accepts seven parameters: five mandatory and two optional.
  • oauth_username
  • oauth_password
  • oauth_clientid: These three parameters are required to obtain an access token for PowerBI. More details about the PowerBI REST API and authentication are available here.
  • datasetName
  • tablename: These two parameters define the dataset and the table in the PowerBI namespace.
  • flushsize: optional (default 1). Specifies the buffer size threshold for pushing data to PowerBI. Increasing the value improves performance, but introduces a delay between the moment data is received and the moment it is stored in PowerBI. With the default value of 1, every tuple is sent to PowerBI immediately. A value of 0 has a special meaning: data is buffered and pushed to PowerBI when a punctuation marker is received.
  • cleanfirstly: optional (default false). A boolean parameter; if true, the PowerBI table is truncated when the operator starts.
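To make the defaults and the special meaning of flushsize concrete, here is a minimal plain-Java sketch of the parameter set. It is not the operator's code: the class name PowerBIOperatorParams is hypothetical, the real operator receives its parameters through the Streams operator API, and rejecting a negative flushsize is an assumption.

```java
import java.util.Objects;

/**
 * Hypothetical holder for the seven PowerBI operator parameters.
 * Illustrates mandatory values, defaults and basic validation only.
 */
final class PowerBIOperatorParams {
    // Mandatory: OAuth credentials used to obtain a PowerBI access token.
    final String oauthUsername;
    final String oauthPassword;
    final String oauthClientId;
    // Mandatory: target dataset and table in PowerBI.
    final String datasetName;
    final String tableName;
    // Optional: buffer threshold (default 1); 0 means "flush on punctuation".
    final int flushSize;
    // Optional: truncate the table at startup (default false).
    final boolean cleanFirstly;

    /** Constructor for the five mandatory parameters; optional ones get their defaults. */
    PowerBIOperatorParams(String oauthUsername, String oauthPassword, String oauthClientId,
                          String datasetName, String tableName) {
        this(oauthUsername, oauthPassword, oauthClientId, datasetName, tableName, 1, false);
    }

    PowerBIOperatorParams(String oauthUsername, String oauthPassword, String oauthClientId,
                          String datasetName, String tableName, int flushSize, boolean cleanFirstly) {
        this.oauthUsername = Objects.requireNonNull(oauthUsername, "oauth_username");
        this.oauthPassword = Objects.requireNonNull(oauthPassword, "oauth_password");
        this.oauthClientId = Objects.requireNonNull(oauthClientId, "oauth_clientid");
        this.datasetName = Objects.requireNonNull(datasetName, "datasetName");
        this.tableName = Objects.requireNonNull(tableName, "tablename");
        // Assumption: negative thresholds make no sense and are rejected.
        if (flushSize < 0) {
            throw new IllegalArgumentException("flushsize must be >= 0, got " + flushSize);
        }
        this.flushSize = flushSize;
        this.cleanFirstly = cleanFirstly;
    }

    /** True when tuples are buffered until a punctuation marker arrives. */
    boolean flushOnPunctuationOnly() {
        return flushSize == 0;
    }
}
```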
Operator description, data loading
Pushing data is very simple: create an input stream whose schema reflects the table structure in PowerBI and let the tuples flow. Only primitive data types are allowed (source).

private Map<String, String> createTableSchema(OperatorContext context) throws PowerException {
  // Derive the PowerBI table schema from the schema of the first input port
  StreamSchema sche = context.getStreamingInputs().get(0).getStreamSchema();
  Map<String, String> bischema = new HashMap<>();
  for (String name : sche.getAttributeNames()) {
   Type.MetaType ty = sche.getAttribute(name).getType().getMetaType();
   String biType = null;
   switch (ty) {
   // SPL string and enum types become PowerBI strings
   case RSTRING:
   case USTRING:
   case ENUM:
    biType = PowerBI.STRING_TYPE;
    break;
   // all signed and unsigned integer types are mapped to a 64-bit integer
   // (uint64 values above Long.MAX_VALUE do not fit)
   case INT8:
   case INT16:
   case INT32:
   case INT64:
   case UINT8:
   case UINT16:
   case UINT32:
   case UINT64:
    biType = PowerBI.INT64_TYPE;
    break;
   // decimal and floating-point types become double
   case DECIMAL32:
   case DECIMAL64:
   case DECIMAL128:
   case FLOAT32:
   case FLOAT64:
    biType = PowerBI.DOUBLE_TYPE;
    break;
   case BOOLEAN:
    biType = PowerBI.BOOL_TYPE;
    break;
   case TIMESTAMP:
    biType = PowerBI.DATETIME_TYPE;
    break;
   default:
    // non-primitive types (lists, maps, tuples, blobs, ...) stay unmapped
    break;
   }
   // report unsupported attribute types
   if (biType == null)
    failure("Attribute " + name + " type " + ty.getLanguageType() + " not supported");
   log.log(TraceLevel.DEBUG, "Attribute " + name + " type " + ty.getLanguageType() + " mapped to " + biType);
   bischema.put(name, biType);
  }
  return bischema;
}
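The map returned by createTableSchema still has to be turned into a table definition for PowerBI. The sketch below, assuming the push-dataset JSON shape of the PowerBI REST API (a dataset name plus a tables array whose columns carry name and dataType), shows one way to build that body. DatasetBody is a hypothetical helper, and the type strings used in the example (such as "String" or "Int64") are assumptions, since the values of the PowerBI.*_TYPE constants are not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: turns an attribute-name -> PowerBI-type map into a
 * JSON body describing a dataset with a single table.
 */
final class DatasetBody {
    /** Minimal JSON string quoting: escapes quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Builds {"name":..., "tables":[{"name":..., "columns":[{"name":..., "dataType":...}, ...]}]}. */
    static String build(String datasetName, String tableName, Map<String, String> schema) {
        StringBuilder cols = new StringBuilder();
        for (Map.Entry<String, String> e : schema.entrySet()) {
            if (cols.length() > 0) cols.append(',');
            cols.append("{\"name\":").append(quote(e.getKey()))
                .append(",\"dataType\":").append(quote(e.getValue())).append('}');
        }
        return "{\"name\":" + quote(datasetName)
             + ",\"tables\":[{\"name\":" + quote(tableName)
             + ",\"columns\":[" + cols + "]}]}";
    }
}
```

Passing a LinkedHashMap keeps the columns in input-stream order; the HashMap used in createTableSchema does not guarantee any particular column order.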

Usage example

namespace application.test ;

use com.ibm.streams.powerbi::PowerBI ;

composite Main
{
 graph
  (stream<rstring name, uint32 num, float64 flo, boolean log, timestamp ti>
   Beacon_1_out0 as O) as Beacon_1 = Beacon()
  {
   logic
    state : mutable int32 i = 0 ;
   param
    iterations : 100 ;
    period : 0.1f ;
   output
    O : name = "Name " +(rstring)(i ++), num =(uint32)(i ++), flo = 1.23, log =
     i / 2 == 0 ? true : false, ti = createTimestamp(1000l, 100u) ;
  }

  () as PowerBI_2 = PowerBI(Beacon_1_out0)
  {
   param
    oauth_clientid : "22dfcddc-d5b4-4e8a-8358-1658fabdad0b" ;
    oauth_password : "Szczuja123" ;
    oauth_username : "szczodry@szczerbek.onmicrosoft.com" ;
    datasetName : "InfoS6" ;
    tablename : "table6" ;
    flushsize : 10 ;
  }

  () as Custom_3 = Custom(Beacon_1_out0 as inputStream)
  {
   logic
    onTuple inputStream :
    {
     println(inputStream) ;
    }

  }

}
More examples.
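Each tuple produced by the Beacon in the example ends up as a row in a JSON request body. As a hedged illustration, assuming the PowerBI REST API "add rows" body of the form {"rows":[...]} and timestamps serialized as ISO 8601 strings in UTC, a tuple could be rendered as below. RowsPayload is hypothetical and not part of the operator; an SPL timestamp such as createTimestamp(1000l, 100u) is modelled as java.time.Instant.ofEpochSecond(1000, 100).

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: serializes tuples (attribute -> value) into a {"rows":[...]} body. */
final class RowsPayload {
    /** Minimal JSON string quoting: escapes quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Renders one attribute value; NaN and infinite doubles are not handled here. */
    static String value(Object v) {
        if (v == null) return "null";
        if (v instanceof Instant) return quote(v.toString()); // ISO 8601, UTC
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        return quote(v.toString());
    }

    static String build(List<Map<String, Object>> rows) {
        StringBuilder sb = new StringBuilder("{\"rows\":[");
        for (int i = 0; i < rows.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append('{');
            boolean first = true;
            for (Map.Entry<String, Object> e : rows.get(i).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                sb.append(quote(e.getKey())).append(':').append(value(e.getValue()));
            }
            sb.append('}');
        }
        return sb.append("]}").toString();
    }
}
```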
Loading modes
Three loading modes are supported, selected by the flushsize parameter described above.

  • flushsize = 1 (default). Each incoming tuple is pushed to PowerBI immediately. Data is available at once, but at a performance cost.
  • flushsize > 1. Sending tuples to PowerBI is held off until the threshold is exceeded; then the whole buffer is shipped out in a single operation. This is more efficient, but there is a gap between the time data arrive and the time they become available in PowerBI.
  • flushsize = 0. Tuples are buffered and sent to PowerBI when a punctuation marker is received.
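The three modes boil down to a small buffering policy. The following is a minimal sketch under two assumptions not stated above: a batch is shipped as soon as the buffer holds flushsize tuples (the real operator may wait until the threshold is strictly exceeded), and a punctuation marker also ships any rows still buffered in the flushsize > 1 mode. FlushBuffer is a hypothetical class; the sender callback stands in for the PowerBI REST call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of the flushsize loading modes (1, > 1, and 0). */
final class FlushBuffer<T> {
    private final int flushSize;
    private final Consumer<List<T>> sender; // stands in for the PowerBI REST call
    private final List<T> buffer = new ArrayList<>();

    FlushBuffer(int flushSize, Consumer<List<T>> sender) {
        if (flushSize < 0) throw new IllegalArgumentException("flushsize must be >= 0");
        this.flushSize = flushSize;
        this.sender = sender;
    }

    /** Called for every incoming tuple. */
    void onTuple(T row) {
        buffer.add(row);
        // flushsize == 0: wait for punctuation; otherwise ship once the threshold is reached
        if (flushSize > 0 && buffer.size() >= flushSize) {
            flush();
        }
    }

    /** Called when a punctuation marker arrives: ship whatever is buffered. */
    void onPunctuation() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) return;
        sender.accept(new ArrayList<>(buffer)); // hand over a copy, then reset
        buffer.clear();
    }
}
```

Handing the sender a copy of the buffer before clearing it keeps each shipped batch stable, even if the sender holds on to the list.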
Dependency
In addition to the PowerBI Java package, the operator has further dependencies, specified in the pom.xml file. The dependency jars are stored in the impl/lib directory of the operator structure, which is referenced by the @Libraries annotation in the operator class.

Additional remarks
  • Only one table can be defined per dataset. I cannot tell whether this is a limitation of PowerBI or a defect in the PowerBI REST API.
  • If the table does not exist, it is created during operator initialization. The table schema mirrors the schema of the operator's input stream and cannot be changed later: if the table already exists, changing the input stream schema will cause the operator to crash.

