Spark Row
可认为是spark预设好的一个case class,当不想为Dataset[T]定义case class时,就是Dataset[Row],即DataFrame。
定义
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
是一个接口 & 伴生对象。代表一行,信息包括:
- size/length:这一行包含了几个元素;
- schema:各个元素对应的schema;
get(index): Any
:根据index获取元素;
这个get是个很重要的方法,因为其他的getXX都是基于这个get实现的,比如:
getAs[T](Int)
:def getAs[T](i: Int): T = get(i).asInstanceOf[T]
;getAs[T](String)
:def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName))
;getBoolean()
等:这一类是基于getAs[T]
实现的,所以也是基于get;
获取字段有两种getAs,一种使用index,另一种使用name。比如getAs[Int]("age")
。它其实是先通过name在schema中的位置获取index,再根据index获取字段的值。
实现
数据
Row的最基础的实现类GenericRow
,就是内部保存了一个Array[Any]
,然后实现了get(index)
方法:override def get(i: Int): Any = values(i)
。
所以,可以理解为Row的核心就是一个能保存Any类型的数组。
这个实现类没有实现fieldIndex方法,Row使用它获取name在schema中的index,借以实现
getAs[T](String)
。所以GenericRow
应该不是Row的常用类,因为它没有schema,没法实现和schema相关的功能,比如fieldIndex。
schema
GenericRowWithSchema
是一个带schema的实现,它继承了GenericRow
,多加了一个schema,是StructType
。
GenericRowWithSchema
就实现了fieldIndex方法:override def fieldIndex(name: String): Int = schema.fieldIndex(name)
。因为它有schema啊。
StructType
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType
StructType本质上也是一个数组:Array[StructField]
。这个数组保存在它的fields
属性中。
另外StructType也是DataType的子类,而StructField的类型就是DataType,所以StructType可以嵌套:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val innerStruct =
StructType(
StructField("f1", IntegerType, true) ::
StructField("f2", LongType, false) ::
StructField("f3", BooleanType, false) :: Nil)
val struct = StructType(
StructField("a", innerStruct, true) :: Nil)
// Create a Row with the schema defined by struct
val row = Row(Row(1, 2, true))
上面定义了一个名为struct的结构体,它的a是一个新的struct,所以a就是结构体的嵌套。对应为数据的话,就是Row的嵌套。所以上述定义的row就是Row套Row。
- a对应Row;
- Row中,f1对应1;
- f2对应2;
- f3对应true;
StructField
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField
是一个结构体,包含了:
- name;名称,比如bid;
- dataType;
DataType
类型,是spark sql的基本数据类型,比如String等(也可以是StructType,所以StructType就可以嵌套了:StructType中的一个StructField可以是一个StructType); - nullable:是否可以是null;
- metadata:
Metadata
类型,本质上是一个Map的wrapper(嗯,它就是一个map,很多k-v组成了metadata吧);
获取数据和schema
所以:
- 获取一个Row的schema:
row.schema
; - 想知道schema的名称,比如bid、click、age等:
row.schema.fields.map(field => field.name)
;
一个以zeppelin table格式输出Dataset的样例:
1
2
3
4
5
6
val show_zp_table = (dataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]) => {
val title = dataset.schema.fields.map(x => x.name).mkString("\t")
val content = dataset.collect().map(x => x.mkString("\t")).mkString("\n")
print("%table\n" + title + "\n" + content)
}
- 获取所有的字段名称,tab分隔:
- 主要用了Array的mkString,将array使用separator连接为String;
- 获取每一行,字段之间tab分隔,行与行之间换行符分隔:
- 这里要知道Row是有mkString方法的,和Array的mkString类似(实际上mkString的实现就是获取所有元素,构造一个Array,然后调用Array的mkString);