Spark Quick Start

Preface

- Based on Spark 2.1
- This is only a quick start; details are not explored in depth
- The main reference is the official documentation
- All code is taken from the official documentation and is written in Scala
Entering the spark-shell

Typing `spark-shell` in a terminal opens a Scala REPL; typing `pyspark` instead opens a Python one.
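Inside the spark-shell, a `SparkSession` is already constructed for you and bound to the variable `spark` (with a `SparkContext` bound to `sc`), so you can start experimenting immediately:

```scala
// Inside spark-shell, `spark` already exists; no builder call is needed
spark.version          // the running Spark version, e.g. "2.1.0"
spark.range(5).show()  // a quick sanity check: a Dataset of 0 to 4
```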
Creating a SparkSession
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
```
Creating a DataFrame
```scala
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
```
Other file formats can be read here as well, such as plain text and Parquet, via `spark.read.text` and `spark.read.parquet`; a sketch follows.
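A minimal sketch, assuming the sample files shipped with the Spark distribution (`people.txt` and `users.parquet` under `examples/src/main/resources/`) are present:

```scala
// Each line of the text file becomes a Row with a single "value" column
val textDF = spark.read.text("examples/src/main/resources/people.txt")

// Parquet files carry their own schema, so none needs to be specified
val parquetDF = spark.read.parquet("examples/src/main/resources/users.parquet")
```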
Performing some operations
```scala
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
```
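The `$"..."` column syntax comes from the `spark.implicits._` import above; if you prefer not to rely on it, the same queries can be written with `org.apache.spark.sql.functions.col`:

```scala
import org.apache.spark.sql.functions.col

// Equivalent to the $-interpolator versions above
df.select(col("name"), col("age") + 1).show()
df.filter(col("age") > 21).show()
```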
Running some SQL queries
As the previous section shows, these queries can be expressed with the built-in functions; if you are not familiar with those functions, you can run SQL statements instead, as follows:
```scala
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
```
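Note that `spark.sql` itself returns a DataFrame, so the SQL and function-based styles can be mixed freely; a small sketch (the query here is chosen only for illustration):

```scala
// The result of spark.sql is an ordinary DataFrame and accepts
// the same transformations as df above
val adults = spark.sql("SELECT name, age FROM people WHERE age IS NOT NULL")
adults.groupBy("age").count().show()
```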
- The view created here is a temporary view. There is also the global temporary view: the former disappears when the session that created it closes, while the latter survives until the Spark application terminates (see the sketch after this list).
- A view plays the same role as a table in a database.
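Following the official documentation, a global temporary view is registered under the system database `global_temp` and must be qualified with that name:

```scala
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary views live in the system database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

// They remain visible to other sessions of the same application
spark.newSession().sql("SELECT * FROM global_temp.people").show()
```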
Dataset
```scala
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
```
On first contact with Spark, the difference between Dataset and DataFrame can be confusing (in the Scala API, a DataFrame is simply a type alias for Dataset[Row]); the links below may help:
There is also the concept of an RDD, for which see:
-
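To make the distinction concrete, a small sketch reusing `df`, `caseClassDS`, and `Person` from above:

```scala
// A DataFrame holds untyped Rows; a Dataset holds typed objects
val row: org.apache.spark.sql.Row = df.first()
val person: Person = caseClassDS.first()
println(person.name) // "Andy" -- fields are accessed with compile-time checking

// Both sit on top of RDDs; .rdd exposes the underlying one
val rowRDD = df.rdd // org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
```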