import numpy
from pandas import DataFrame, Series

d = {'one' : [1., 2., 3., 4.], 'two' : [4., 3., 2., 1.]}
df = DataFrame(d)
   one  two
0  1.0  4.0
1  2.0  3.0
2  3.0  2.0
3  4.0  1.0

In pandas the data is supplied column by column, as series. A Spark DataFrame, on the other hand, is built row by row: each element is one row of the feature matrix, not a whole feature column.
val sqlC = new org.apache.spark.sql.SQLContext(sc)
import sqlC.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val df = Seq((1.0, 4.0), (2.0, 3.0), (3.0, 2.0), (4.0, 1.0)).toDF("one", "two")
df.show
+---+---+
|one|two|
+---+---+
|1.0|4.0|
|2.0|3.0|
|3.0|2.0|
|4.0|1.0|
+---+---+

Of course, there are plenty of ways to create a DataFrame from a file or some other external source. But sometimes it is convenient to build a Spark DataFrame by hand, using the pandas column-wise convention.
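For instance, loading a CSV is a one-liner with the DataFrameReader (a minimal sketch; medals.csv is just a hypothetical file name):

// spark is a SparkSession; other spark.read sources (json, parquet, jdbc, ...) follow the same pattern
val fromFile = spark.read
  .option("header", "true")      // first line holds the column names
  .option("inferSchema", "true") // let Spark guess the column types
  .csv("medals.csv")             // hypothetical path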
So I wrote a simple Scala method that builds a DataFrame from series (columns) rather than from rows.
Zeppelin notebook
import org.apache.spark.sql._
import org.apache.spark.sql.types._

def createDF(spark: SparkSession, names: Seq[String], series: Seq[Any]*): DataFrame = {
  // one name per series (column)
  require(names.length == series.length)

  // Transpose the column-wise series into row-wise Rows.
  val numof: Int = series(0).length
  var rows: Seq[Row] = Nil
  for (i <- 0 until numof) {
    var da: Seq[Any] = Nil
    for (j <- 0 until series.length)
      da = da :+ series(j)(i)
    val r: Row = Row.fromSeq(da)
    rows = rows :+ r
  }
  val rdd = spark.sparkContext.makeRDD(rows)

  // Build the schema, inferring each column's type from the first element of its series.
  val schema: Seq[StructField] =
    for (i <- 0 until names.length) yield
      StructField(names(i),
        series(i)(0) match {
          case t: Int    => IntegerType
          case t: Double => DoubleType
          case _         => StringType
        },
        false
      )

  spark.createDataFrame(rdd, StructType(schema))
}

Usage example
val names2 = Seq("one", "two")
val seriesone = Seq(1.0, 2.0, 3.0, 4.0)
val seriestwo = Seq(4.0, 3.0, 2.0, 1.0)
val da = createDF(spark, names2, seriesone, seriestwo)
da.show

The next example is taken from a Udacity course.
val names1 = Seq("countries", "gold", "silver", "bronze")
val countries = Seq("Russian Fed.", "Norway", "Canada", "United States", "Netherlands",
  "Germany", "Switzerland", "Belarus", "Austria", "France", "Poland", "China", "Korea",
  "Sweden", "Czech Republic", "Slovenia", "Japan", "Finland", "Great Britain", "Ukraine",
  "Slovakia", "Italy", "Latvia", "Australia", "Croatia", "Kazakhstan")
val gold   = Seq(13, 11, 10, 9, 8, 8, 6, 5, 4, 4, 4, 3, 3, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0)
val silver = Seq(11, 5, 10, 7, 7, 6, 3, 0, 8, 4, 1, 4, 3, 7, 4, 2, 4, 3, 1, 0, 0, 2, 2, 2, 1, 0)
val bronze = Seq(9, 10, 5, 12, 9, 5, 2, 1, 5, 7, 1, 2, 2, 6, 2, 4, 3, 1, 2, 1, 0, 6, 2, 1, 0, 1)

val da1 = createDF(spark, names1, countries, gold, silver, bronze)
da1.show(3)
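For comparison, when all the columns share one element type, plain Scala plus the toDF implicits can do the same column-to-row transposition without a helper. A minimal sketch, assuming a SparkSession named spark and two Double columns:

import spark.implicits._

// Column-wise input, same numbers as the first example above.
val columns: Seq[Seq[Double]] = Seq(Seq(1.0, 2.0, 3.0, 4.0), Seq(4.0, 3.0, 2.0, 1.0))

// Seq.transpose turns columns into rows; mapping to tuples lets toDF derive the schema.
val dfAlt = columns.transpose.map { case Seq(a, b) => (a, b) }.toDF("one", "two")
dfAlt.show()

This only covers a fixed number of same-typed columns, though; createDF with its per-column type inference is handier for mixed tables like the medals data (String plus Int columns).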