import numpy
from pandas import DataFrame, Series
# Column-oriented construction: each key becomes a column name, its list the column values.
d = dict(one=[1.0, 2.0, 3.0, 4.0], two=[4.0, 3.0, 2.0, 1.0])
df = DataFrame(data=d)
   one  two
0  1.0  4.0
1  2.0  3.0
2  3.0  2.0
3  4.0  1.0

In Spark, by contrast, DataFrame data is supplied row by row, i.e. as the rows of a feature matrix.
// Build the same data row by row, the way Spark expects it.
// `new SQLContext(sc)` is deprecated since Spark 2.0; take the SQLContext from the
// SparkSession instead (assumes a session named `spark` is in scope, as the later
// examples in this notebook use). `sqlC` is kept so its implicits import still works.
val sqlC = spark.sqlContext
import sqlC.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// Each tuple is one row (one, two).
// NOTE: the last row holds 1.1, whereas the pandas example has 1.0.
val df = Seq((1.0, 4.0), (2.0, 3.0), (3.0, 2.0), (4.0, 1.1)).toDF("one", "two")
+---+---+
|one|two|
+---+---+
|1.0|4.0|
|2.0|3.0|
|3.0|2.0|
|4.0|1.1|
+---+---+

Of course, there are plenty of ways to create a DataFrame from a file or another external source. But sometimes it is convenient to create a Spark DataFrame manually, using the pandas convention.
So I wrote a simple Scala method that creates a DataFrame from series (columns) instead of from rows of features.
Zeppelin notebook
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/**
 * Creates a Spark DataFrame from column-oriented data (the pandas convention):
 * each element of `series` holds the values of one column, and `names(i)` is the
 * name of the column built from `series(i)`.
 *
 * Column types are inferred from the first non-null value of each series:
 * Int, Long, Double, Float, Short, Byte and Boolean map to the matching Spark type.
 * Anything else, and empty or all-null columns, becomes a StringType column that
 * holds `value.toString`. A column is declared nullable only if it contains a null.
 *
 * @param spark  session used to parallelize the rows
 * @param names  column names, one per series
 * @param series column values; all series must have the same length
 * @return a DataFrame with `names.length` columns and one row per series element
 * @throws IllegalArgumentException if the numbers of names and series differ,
 *                                  or if the series have different lengths
 */
def createDF(spark: SparkSession, names: Seq[String], series: Seq[Any]*): DataFrame = {
  require(names.length == series.length,
    s"expected one series per column name, got ${names.length} names and ${series.length} series")
  val numRows: Int = series.headOption.fold(0)(_.length)
  require(series.forall(_.length == numRows),
    s"all series must have the same length, got lengths ${series.map(_.length).mkString(", ")}")

  // Spark type of a column, taken from its first non-null value (element 0 may be null).
  def sparkTypeOf(column: Seq[Any]): DataType = column.find(_ != null) match {
    case Some(_: Int)     => IntegerType
    case Some(_: Long)    => LongType
    case Some(_: Double)  => DoubleType
    case Some(_: Float)   => FloatType
    case Some(_: Short)   => ShortType
    case Some(_: Byte)    => ByteType
    case Some(_: Boolean) => BooleanType
    case _                => StringType
  }

  val fields: Seq[StructField] = names.zip(series).map { case (name, column) =>
    StructField(name, sparkTypeOf(column), nullable = column.contains(null))
  }

  // StringType is also the fallback for unsupported value types; store their text
  // so every row matches the declared schema (otherwise Spark fails at action time).
  val columns: Seq[Seq[Any]] = series.zip(fields).map { case (column, field) =>
    if (field.dataType == StringType) column.map(v => Option(v).map(_.toString).orNull)
    else column
  }

  // Turn columns into rows in a single pass; appending to a List per cell is quadratic.
  val rows: Seq[Row] = columns.transpose.map(Row.fromSeq)
  val rdd = spark.sparkContext.makeRDD(rows)
  spark.createDataFrame(rdd, StructType(fields))
}
// Usage example: the same two-column data as the pandas example, built from series.
// The second column name was misspelled "tow"; it must be "two" to match `seriestwo`.
val names2 = Seq("one", "two")
val seriesone = Seq(1.0, 2.0, 3.0, 4.0)
val seriestwo = Seq(4.0, 3.0, 2.0, 1.0)
val da = createDF(spark, names2, seriesone, seriestwo)
da.show()
Example taken from a Udacity course.
// Per-country medal counts: one series for the country names, then one per medal type.
// All four series list the same 26 countries in the same order.
val countries = Seq(
  "Russian Fed.", "Norway", "Canada", "United States", "Netherlands",
  "Germany", "Switzerland", "Belarus", "Austria", "France",
  "Poland", "China", "Korea", "Sweden", "Czech Republic",
  "Slovenia", "Japan", "Finland", "Great Britain", "Ukraine",
  "Slovakia", "Italy", "Latvia", "Australia", "Croatia",
  "Kazakhstan"
)
val gold   = Seq(13, 11, 10, 9, 8, 8, 6, 5, 4, 4, 4, 3, 3, 2, 2, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0)
val silver = Seq(11, 5, 10, 7, 7, 6, 3, 0, 8, 4, 1, 4, 3, 7, 4, 2, 4, 3, 1, 0, 0, 2, 2, 2, 1, 0)
val bronze = Seq(9, 10, 5, 12, 9, 5, 2, 1, 5, 7, 1, 2, 2, 6, 2, 4, 3, 1, 2, 1, 0, 6, 2, 1, 0, 1)

// Column names, in the same order as the series passed to createDF below.
val names1 = Seq("countries", "gold", "silver", "bronze")

val da1 = createDF(spark, names1, countries, gold, silver, bronze)
da1.show(3)
Brak komentarzy:
Prześlij komentarz