Blog for the Open Source JavaHotel project

Friday, 31 August 2018

Pandas DataFrame and Scala Spark DataFrame

In a pandas DataFrame (similar to, but different from, Spark's DataFrame), data is provided column-wise, as series.

from pandas import DataFrame

# every dictionary entry becomes one column (series) of the DataFrame
d = {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]}
df = DataFrame(d)
print(df)
   one  two
0  1.0  4.0
1  2.0  3.0
2  3.0  2.0
3  4.0  1.0
In Spark's DataFrame, data is provided row-wise, as rows of a feature matrix.
// in spark-shell or Zeppelin, sc and spark are already defined
val sqlC = new org.apache.spark.sql.SQLContext(sc)
import sqlC.implicits._

val df = Seq((1.0, 4.0), (2.0, 3.0), (3.0, 2.0), (4.0, 1.1)).toDF("one", "two")
df.show
+---+---+
|one|two|
+---+---+
|1.0|4.0|
|2.0|3.0|
|3.0|2.0|
|4.0|1.1|
+---+---+
Of course, there are plenty of ways to create a DataFrame from a file or any other external source; one such route is sketched below.
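For instance, a minimal sketch of the usual file-based route (the CSV path and options below are only placeholders):

val fromFile = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/some.csv")
fromFile.show(3)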
But sometimes it is convenient to build a Spark DataFrame manually, following the pandas column-wise convention. So I created a simple Scala method which creates a DataFrame from series (columns), not rows (feature vectors).
Zeppelin notebook
import org.apache.spark.sql._
import org.apache.spark.sql.types._

def createDF(spark: SparkSession, names: Seq[String], series: Seq[Any]*): DataFrame = {
    require(names.length == series.length, "one column name per series is expected")
    require(series.forall(_.length == series.head.length), "all series must have the same length")
    // build rows by transposing the column series
    val numof: Int = series.head.length
    var rows: Seq[Row] = Nil
    for (i <- 0 until numof) {
      var da: Seq[Any] = Nil
      for (j <- series.indices)
        da = da :+ series(j)(i)
      rows = rows :+ Row.fromSeq(da)
    }
    val rdd = spark.sparkContext.makeRDD(rows)
    // infer the column type from the first element of each series
    val schema: Seq[StructField] =
      for (i <- names.indices)
        yield StructField(names(i),
          series(i).head match {
            case _: Int    => IntegerType
            case _: Double => DoubleType
            case _         => StringType
          },
          nullable = false
        )
    spark.createDataFrame(rdd, StructType(schema))
  }
Usage example
val names2 = Seq("one", "two")
val seriesone = Seq(1.0, 2.0, 3.0, 4.0)
val seriestwo = Seq(4.0, 3.0, 2.0, 1.0)
val da = createDF(spark, names2, seriesone, seriestwo)
da.show
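If everything is wired up correctly, da.show should print a table equivalent to the pandas frame above:

+---+---+
|one|two|
+---+---+
|1.0|4.0|
|2.0|3.0|
|3.0|2.0|
|4.0|1.0|
+---+---+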
Another example, with Olympic medal data taken from a Udacity course.
val names1 = Seq("countries", "gold", "silver", "bronze")
val countries = Seq("Russian Fed.", "Norway", "Canada", "United States",
  "Netherlands", "Germany", "Switzerland", "Belarus",
  "Austria", "France", "Poland", "China", "Korea",
  "Sweden", "Czech Republic", "Slovenia", "Japan",
  "Finland", "Great Britain", "Ukraine", "Slovakia",
  "Italy", "Latvia", "Australia", "Croatia", "Kazakhstan")

val gold   = Seq(13, 11, 10, 9, 8, 8, 6, 5, 4, 4, 4, 3, 3, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0)
val silver = Seq(11, 5, 10, 7, 7, 6, 3, 0, 8, 4, 1, 4, 3, 7, 4, 2, 4, 3, 1, 0, 0, 2, 2, 2, 1, 0)
val bronze = Seq(9, 10, 5, 12, 9, 5, 2, 1, 5, 7, 1, 2, 2, 6, 2, 4, 3, 1, 2, 1, 0, 6, 2, 1, 0, 1)

val da1 = createDF(spark, names1, countries, gold, silver, bronze)
da1.show(3)
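The DataFrame built this way behaves like any other; for instance, one could rank countries by total medal count with a standard aggregation (a sketch, assuming only the columns defined above):

import org.apache.spark.sql.functions.desc

da1.selectExpr("countries", "gold + silver + bronze AS total")
   .orderBy(desc("total"))
   .show(3)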
