文章

Spark Row

可认为是spark预设好的一个case class,当不想为Dataset[T]定义case class时,就是Dataset[Row],即DataFrame。

  1. 定义
  2. 实现
    1. 数据
    2. schema
      1. StructType
      2. StructField
  3. 获取数据和schema

定义

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row

是一个接口 & 伴生对象。代表一行,信息包括:

  • size/length:这一行包含了几个元素;
  • schema:各个元素对应的schema;
  • get(index): Any:根据index获取元素;

这个get是个很重要的方法,因为其他的getXX都是基于这个get实现的,比如:

  • getAs[T](Int): def getAs[T](i: Int): T = get(i).asInstanceOf[T];
  • getAs[T](String): def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName));
  • getBoolean()等:这一类是基于getAs[T]实现的,所以也是基于get;

获取字段有两种getAs,一种使用index,另一种使用name。比如getAs[Int]("age")。它其实是先通过name在schema中的位置获取index,再根据index获取字段的值。

实现

数据

Row的最基础的实现类GenericRow,就是内部保存了一个Array[Any],然后实现了get(index)方法:override def get(i: Int): Any = values(i)

所以,可以理解为Row的核心就是一个能保存Any类型的数组。

这个实现类没有实现fieldIndex方法,Row使用它获取name在schema中的index,借以实现getAs[T](String)。所以GenericRow应该不是Row的常用类,因为它没有schema,没法实现和schema相关的功能,比如fieldIndex。

schema

GenericRowWithSchema是一个带schema的实现,它继承了GenericRow,多加了一个schema,是StructType

GenericRowWithSchema就实现了fieldIndex方法:override def fieldIndex(name: String): Int = schema.fieldIndex(name)。因为它有schema啊。

StructType

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType

StructType本质上也是一个数组:Array[StructField]。这个数组保存在它的fields属性中。

另外StructType也是DataType的子类,而StructField的类型就是DataType,所以StructType可以嵌套:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val innerStruct =
  StructType(
    StructField("f1", IntegerType, true) ::
    StructField("f2", LongType, false) ::
    StructField("f3", BooleanType, false) :: Nil)

val struct = StructType(
  StructField("a", innerStruct, true) :: Nil)

// Create a Row with the schema defined by struct
val row = Row(Row(1, 2, true))

上面定义了一个名为struct的结构体,它的a是一个新的struct,所以a就是结构体的嵌套对应为数据的话,就是Row的嵌套。所以上述定义的row就是Row套Row。

  • a对应Row;
    • Row中,f1对应1;
    • f2对应2;
    • f3对应true;

StructField

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField

是一个结构体,包含了:

  • name;名称,比如bid;
  • dataType;DataType类型,是spark sql的基本数据类型,比如String等(也可以是StructType,所以StructType就可以嵌套了:StructType中的一个StructField可以是一个StructType);
  • nullable:是否可以是null;
  • metadata:Metadata类型,本质上是一个Map的wrapper(嗯,它就是一个map,很多k-v组成了metadata吧);

获取数据和schema

所以:

  • 获取一个Row的schema:row.schema
  • 想知道schema的名称,比如bid、click、age等:row.schema.fields.map(field => field.name)

一个以zeppelin table格式输出Dataset的样例:

1
2
3
4
5
6
val show_zp_table = (dataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]) => {
    val title = dataset.schema.fields.map(x => x.name).mkString("\t")
    val content = dataset.collect().map(x => x.mkString("\t")).mkString("\n")

    print("%table\n" + title + "\n" + content)
}
  1. 获取所有的字段名称,tab分隔:
    1. 主要用了Array的mkString,将array使用separator连接为String;
  2. 获取每一行,字段之间tab分隔,行与行之间换行符分隔:
    1. 这里要知道Row是有mkString方法的,和Array的mkString类似(实际上mkString的实现就是获取所有元素,构造一个Array,然后调用Array的mkString);
本文由作者按照 CC BY 4.0 进行授权