type
Post
status
Published
summary
SparkSQL 是 Spark 的一个模块,专为处理结构化数据而设计,提供了对 SQL 查询的支持,使得用户可以通过 SQL 或 DataFrame API 进行高效的数据处理和分析。SparkSQL 允许无缝地在结构化数据(如 JSON、Parquet、Hive 表)和 RDD 之间进行转换,集成了 Catalyst 优化器和 Tungsten 执行引擎,从而实现查询优化和高效执行。此外,SparkSQL 还支持与多种数据源的集成,使其在大数据处理和分析领域具有广泛的应用。
slug
bigdata-spark-SparkSQL
date
Aug 2, 2024
tags
大数据
Spark
SparkSQL
category
大数据
password
icon
URL
Property
Aug 5, 2024 08:28 AM
SparkSQL 是 Spark 的一个模块,专为处理结构化数据而设计,提供了对 SQL 查询的支持,使得用户可以通过 SQL 或 DataFrame API 进行高效的数据处理和分析。SparkSQL 允许无缝地在结构化数据(如 JSON、Parquet、Hive 表)和 RDD 之间进行转换,集成了 Catalyst 优化器和 Tungsten 执行引擎,从而实现查询优化和高效执行。此外,SparkSQL 还支持与多种数据源的集成,使其在大数据处理和分析领域具有广泛的应用。
SparkSQL是非常成熟的海量结构化数据处理框架. SparkSQL支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等;SparkSQL:使用简单、API统一、兼容HIVE、支持 标准化JDBC和ODBC连接
SparkSQL的数据抽象
  • SchemaRDD对象(已废弃)
  • DataSet对象:可用于Java、Scala语言
  • DataFrame对象:可用于Java、Scala、Python 、R
DataFrame和RDD都是:弹性的、分布式的、数据集 只是,DataFrame存储的数据结构“限定”为:二维表结构化数 据 而RDD可以存储的数据则没有任何限制,想处理什么就处理什么
相同点是DataFrame和RDD都是分布式的分区处理的

SparkSession 对象

在RDD阶段,程序的执行入口对象是:SparkContext;在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:
  • 用于SparkSQL编程作为入口对象
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
所以,后续的代码,执行环境入口对象,统一变更为SparkSession对象
from pyspark.sql import SparkSession if __name__ == "__main__": # 创建 SparkSession spark = SparkSession.builder.appName("spark_sql_create").master('local[3]').getOrCreate() # 获取 SparkContext sc = spark.sparkContext # 读取 CSV 文件,不包含 header,因此 header 参数应为 False df = spark.read.csv('file:///home/hadoop/pyspark_project/data/stu_score.txt', sep=',', header=False) # 将 DataFrame 的列重命名 df2 = df.toDF('id', 'name', 'score') # 打印 DataFrame 的 Schema df2.printSchema() # 显示 DataFrame 的数据 df2.show() df2.createTempView("score") spark.sql(""" select * from score where name='语文' limit 5; """).show() df2.where('name == "语文"').limit(5).show() # 停止 SparkSession spark.stop()

DataFrame

DataFrame 介绍

DataFrame是一个二维表结构, 那么表格结构就有无法 绕开的三个点:行、列、表结构描述。
基于这个前提,DataFrame的组成如下:
  • 在结构层面:
    • StructType对象描述整个DataFrame的表结构
    • StructField对象描述一个列的信息
  • 在数据层面
    • Row对象记录一行数据
    • Column对象记录一列数据并包含列的信息
notion image
notion image

DataFrame 创建

通过 rdd 和 pands DataFrame 创建
import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').getOrCreate() sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: rdd = sc.textFile('file:///home/hadoop/pyspark_project/data/sql/people.txt').\ map(lambda line: line.split(',')).\ map(lambda x: [x[0],int(x[1])]) schema1=['name','age'] schema2 = StructType().add('name', StringType(), nullable=True).\ add('age', IntegerType(), nullable=False) # nullable=False 是否允许为空 df1 = spark.createDataFrame(rdd, schema=schema1) df1.printSchema() # 打印表结构 df1.show(20, truncate=False) # 打印表数据,默认前 20 条,默认truncate=True(内容截断) df2 = spark.createDataFrame(rdd, schema=schema2) df2.printSchema() df2.show(20, truncate=False) df3 = rdd.toDF(schema2) df3.printSchema() df3.show(20, truncate=False) # 通过 pandas DataFrame 创建 pdf = pd.read_csv('pyspark_project/data/sql/people.txt') df4 = spark.createDataFrame(pdf, schema2) df4.printSchema() df4.show() df4.createOrReplaceTempView('people') # 将 df 转换成临时表(视图),就可以通过 SQL 语句查询 spark.sql('select * from people where age > 20').show() finally: spark.stop()
通过统一风格的 API 创建数据
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').getOrCreate() sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: schema_text = StructType().add('data', StringType(), nullable=True) df_text = spark.read.format('text').schema(schema=schema_text).load('file:///home/hadoop/pyspark_project/data/sql/people.txt') df_text.printSchema() df_text.show() df_json = spark.read.format('json').load('file:///home/hadoop/pyspark_project/data/sql/people.json') df_json.printSchema() df_json.show() df_csv = spark.read.format('csv').option('sep',';').option('header',True).schema("name STRING, age INT, job STRING").load('file:///home/hadoop/pyspark_project/data/sql/people.csv') df_csv.printSchema() df_csv.show() df_parquet = spark.read.format('parquet').load('file:///home/hadoop/pyspark_project/data/sql/users.parquet') df_parquet.printSchema() df_parquet.show() finally: spark.stop() ### 语法 # spark.read.format().schema().option().load() # format:指定文件格式;可选:text|csv|json|parquet|orc|avro|jdbc|...... # text:读取的文件只能作为一列放进 DataFrame 中,如果不指定列名,那么默认列名为 value # json:json 文件可以自动识别列名和列类型,一般不需要指定 schema # csv:csv 文件需要通过option 选项指定数据分隔符,数据是否包含表头,数据列名和数据类型等 # parquet: 是Spark中常用的一种列式存储文件格式 和Hive中的ORC差不多, parquet对比普通的文本文件的区别: # ● parquet 内置schema (列名\ 列类型\ 是否为空) # ● 存储是以列作为存储格式 # ● 存储是序列化存储在文件中的(有压缩属性体积小) # schema:指定数据列的相关信息;可以使用的格式:StructType | String # StructType:指定列名、数据类型、是否允许为空 # StructType().add('data', StringType(), nullable=True) # String:指定列名、数据类型,不指定是否允许为空 # schema("name STRING, age INT, job STRING") # load:指定文件路径;支持本地路径和 hdfs 路径 # option:指定一些特殊属性:sep、header、encoding

DataFrame 基操

DataFrame支持两种风格进行编程:DSL风格、SQL风格
DSL语法风格
领域特定语言。 其实就是指DataFrame的特有API DSL风格意思就是以调用API的方式来处理Data 比如:df.where().limit()
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').getOrCreate() sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: df_csv = spark.read.format('csv').schema("id INT, subject STRING, score INT").load('file:///home/hadoop/pyspark_project/data/stu_score.txt') df_csv.printSchema() # df_csv.show() df_csv.select(['id', 'score']).show() df_csv.select('id', 'score').show() df_csv.select(df_csv['id'], df_csv['score']).show() # filter方法与where方法效果相同 df_csv.filter("score < 99").show() df_csv.filter(df_csv['score'] < 99).show() df_csv.where("score < 99").show() df_csv.where(df_csv['score'] < 99).show() # groupBy的返回值是一个GroupedData对象,不是 dataframe;groupBy后一般接聚合函数,常见的聚合函数有:count、sum、avg、max、min;使用聚合函数后返回的数据依然还是 dataframe; df_csv.groupBy('subject').count().show() df_csv.groupBy(df_csv['subject']).count().show() finally: spark.stop()
SQL语法风格
使用SQL语句处理DataFrame的数据 比如:spark.sql(“SELECT * FROM xxx) —— 想使用SQL风格的语法,需要将DataFrame注册成表
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').getOrCreate() sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: df_csv = spark.read.format('csv').schema("id INT, subject STRING, score INT").load('file:///home/hadoop/pyspark_project/data/stu_score.txt') df_csv.printSchema() # df_csv.show() df_csv.createTempView('score1') # 创建一个临时视图 df_csv.createOrReplaceTempView('score2') # 创建一个临时视图,如果存在则替换 df_csv.createGlobalTempView('score3') # 创建全局临时视图 spark.sql('select subject, count(1) cnt from score1 group by subject').show() spark.sql('select subject, count(1) cnt from score2 group by subject').show() spark.sql('select subject, count(1) cnt from global_temp.score3 group by subject').show() finally: spark.stop()
练习示例1:
from pyspark.sql import SparkSession from pyspark.sql import functions as F if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').getOrCreate() sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: rdd = sc.textFile('file:///home/hadoop/pyspark_project/data/words.txt') rdd = rdd.flatMap(lambda line:line.split(' ')).map(lambda word: [word]) df = rdd.toDF(['word']) df.createOrReplaceTempView('words') spark.sql('select word, count(1) as cnt from words group by word order by cnt desc').show() df = spark.read.format('text').load('file:///home/hadoop/pyspark_project/data/words.txt') df2 = df.withColumn('value', F.explode(F.split(df.value, ' '))) df2.groupBy('value').count().withColumnRenamed('count', 'cnt').withColumnRenamed('value', 'word').orderBy(F.desc('cnt')).show() finally: spark.stop()
练习示例2:
1. agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合 2. alias: 它是Column对象的API, 可以针对一个列 进行改名 3. withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用 4. orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False 5. first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象. # Row对象 就是一个数组, 你可以通过row['列名'] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
from pyspark.sql import SparkSession from pyspark.sql import functions as F import time if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').config('spark.sql.shuffle.partitions', 3).getOrCreate() """ spark.sql.shuffle.partitions 参数指的是, 在sql计算中, shuffle算子阶段默认的分区数是200个. 对于集群模式来说, 200个默认也算比较合适 如果在local下运行, 200个很多, 在调度上会带来额外的损耗 所以在local下建议修改比较低 比如2\4\10均可 这个参数和Spark RDD中设置并行度的参数 是相互独立的. """ sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: df = spark.read.format('csv').\ option('sep', '\t').\ option('header', False).\ schema("user_id STRING, movie_id STRING, rank INT, timestamp STRING").\ load("file:///home/hadoop/pyspark_project/data/sql/u.data") # 统计用户打出的平均分 df.groupBy('user_id').\ avg('rank').\ withColumnRenamed('avg(rank)', 'avg_rank').\ withColumn('avg_rank', F.round(F.col('avg_rank'), 2)).\ orderBy('avg_rank', ascending=False).\ show() # 统计电影的平均分 df.createOrReplaceTempView('user_movie') spark.sql("select movie_id, round(avg(rank), 2) as avg_rank from user_movie group by movie_id order by avg_rank desc").show() # 统计大于平均分的电影的数量 print(df.where(df.rank > df.agg(F.avg(df.rank)).collect()[0][0]).count()) # df.agg:允许你在一个或多个列上应用聚合函数,agg 方法可以接受一个字典,其中键是列名,值是要应用的聚合函数,或者直接传递聚合表达式。 # 统计高分电影打分次数最多的用户的平均打分 user_id = df.where(df.rank > 3).\ groupBy('user_id').\ count().\ withColumnRenamed('count', 'cnt').\ orderBy('cnt', ascending=False).\ first()['user_id'] print(user_id) df.filter(df.user_id == user_id).\ select(F.round(F.avg(df.rank), 2)).\ show() # 统计每个用户的平均分、最低分、最高分 df.groupBy('user_id').\ agg( F.round(F.avg(df.rank), 2).alias('avg_rank'), F.min(df.rank).alias('min_rank'), F.max(df.rank).alias('max_rank') ).show() # 统计评分超过 100 次的电影的平均分top10 df.groupBy('movie_id').\ agg( F.count(df.rank).alias('cnt'), F.round(F.avg(df.rank), 2).alias('avg_rank') # ).where('cnt > 100').\ # 两种方法都行 ).where(F.col('cnt') > 100).\ orderBy('avg_rank', ascending=False).\ show(10) # time.sleep(10000) finally: spark.stop()

DataFrame 数据清洗

from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').config('spark.sql.shuffle.partitions', 3).getOrCreate() sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: df = spark.read.format('csv').\ option('sep', ';').\ option('header', True).\ load("file:///home/hadoop/pyspark_project/data/sql/people.csv") df.dropDuplicates().show() df.dropDuplicates(['age','job']).show() df.dropna().show() df.dropna(thresh=3).show() # 至少 3 个有效列才保留 df.dropna(thresh=2, subset=['name','age']).show() # 在['name','age']列中判断,至少 2 个有效列才保留 df.fillna('loss').show() df.fillna('loss', subset=['job']).show() # 只对 job 列填充 df.fillna({'name': 'unknown', 'age': 0, 'job': 'unemployed'}).show() # 对不同的列填充不同的值 finally: spark.stop()

DataFrame 数据写出

from pyspark.sql import SparkSession from pyspark.sql import functions as F if __name__ == '__main__': spark = SparkSession.builder.appName('dataframe_create').master('local[3]').config('spark.sql.shuffle.partitions', 3).getOrCreate() sc = spark.sparkContext #### 开头的s小写,并且没有括号 try: df = spark.read.format('csv').\ option('sep', '\t').\ option('header', False).\ schema("user_id STRING, movie_id STRING, rank INT, timestamp STRING").\ load("file:///home/hadoop/pyspark_project/data/sql/u.data") # Write text 写出, 只能写出一个列的数据, 需要将df转换为单列df df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "timestamp")).\ write.\ mode("overwrite").\ format("text").\ save("file:///home/hadoop/pyspark_project/data/output/text") # Write csv df.write.mode("overwrite").\ format("csv").\ option("sep", ";").\ option("header", True).\ save("file:///home/hadoop/pyspark_project/data/output/csv") # Write json df.write.mode("overwrite").\ format("json").\ save("file:///home/hadoop/pyspark_project/data/output/json") # Write parquet df.write.mode("overwrite").\ format("parquet").\ save("file:///home/hadoop/pyspark_project/data/output/parquet") finally: spark.stop()
通过JDBC读写数据库
from pyspark.sql import SparkSession from pyspark.sql import functions as F if __name__ == '__main__': # 创建 SparkSession 并配置 JAR 文件路径 spark = SparkSession.builder \ .appName('dataframe_create') \ .master('local[3]') \ .config('spark.jars', '/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/mysql-connector-java-8.0.26.jar') \ .config('spark.sql.shuffle.partitions', 3) \ .getOrCreate() sc = spark.sparkContext try: df = spark.read.format('csv') \ .option('sep', '\t') \ .option('header', False) \ .schema("user_id STRING, movie_id STRING, rank INT, timestamp STRING") \ .load("file:///home/hadoop/pyspark_project/data/sql/u.data") # 写入 df.write.mode('overwrite') \ .format('jdbc') \ .option('url', 'jdbc:mysql://anjhon:3306/bigdata?useSSL=false&useUnicode=true') \ .option('dbtable', 'movie_data') \ .option('user', 'root') \ .option('password', '32924ayd') \ .option('driver', 'com.mysql.cj.jdbc.Driver') \ .save() # 读取 df2 = spark.read.format('jdbc') \ .option('url', 'jdbc:mysql://anjhon:3306/bigdata?useSSL=false&useUnicode=true') \ .option('dbtable', 'movie_data') \ .option('user', 'root') \ .option('password', '32924ayd') \ .option('driver', 'com.mysql.cj.jdbc.Driver') \ .load() df2.printSchema() df2.show() finally: spark.stop()
遇到问题
问题提示
(pyspark) hadoop@anjhon:~$ /export/server/miniconda3/envs/pyspark/bin/python /home/hadoop/pyspark_project/02_SQL/12_dataframe_jdbc.py 24/07/28 22:52:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Traceback (most recent call last): File "/home/hadoop/pyspark_project/02_SQL/12_dataframe_jdbc.py", line 15, in <module> df.write.mode('overwrite').\ File "/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 1396, in save self._jwrite.save() File "/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__ return_value = get_return_value( File "/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py", line 169, in deco return f(*a, **kw) File "/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o49.save. : java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46) at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103) at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103) at scala.Option.foreach(Option.scala:407) at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103) at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:246) at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:250) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750)
解决
将MySQL 的数据连接驱动放到正确位置(/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars)并在代码中配置相关路径(.config('spark.jars', '/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/mysql-connector-java-8.0.26.jar'))和 driver(.option('driver', 'com.mysql.cj.jdbc.Driver')) 以方便程序找到驱动进行连接

SparkSQL函数定义

无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中。
SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
回顾Hive中自定义函数有三种类型:
  • 第一种:UDF (User-Defined-Function)函数
    • 一对一的关系,输入一个值经过函数以后输出一个值;
    • 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
  • 第二种:UDAF (User-Defined Aggregation Function) 聚合函数
    • 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;
  • 第三种:UDTF (User-Defined Table-Generating Functions) 函数
    • 一对多的关系,输入一个值输出多个值 (一行变为多行);
    • 用户自定义生成函数,有点像flatMap;

SparkSQL 定义UDF函数

  • UDF 是用户自定义函数,接收一行中的一个或多个列作为输入,并返回一个标量值。通常用于数据转换或自定义计算。如:对单个值进行操作,比如将字符串转换为大写、计算数值的平方等。
  • UDAF 是用户自定义聚合函数,接收一组值作为输入,并返回一个标量值。通常用于聚合操作,如求和、平均值等。如:对一组值进行聚合操作,比如计算列的平均值、最大值等。
  • UDTF 是用户自定义表生成函数,接收一行中的一个或多个列作为输入,并返回多行和多列。通常用于将一行拆分为多行。如:将一行数据转换成多行数据,例如解析嵌套结构或拆分数组。
PySpark 不直接支持定义 UDAF 和 UDTF。但是可以通过 mappartitions 算子来模拟实现 UDAF,通过定义 UDF 函数然后返回 array 类型、dict 类型来模拟实现 UDTF 函数
import string from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.types import IntegerType, ArrayType, StringType, MapType if __name__ == '__main__': # 创建 SparkSession 并配置 JAR 文件路径 spark = SparkSession.builder \ .appName('dataframe_create') \ .master('local[3]') \ .config('spark.jars', '/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/mysql-connector-java-8.0.26.jar') \ .config('spark.sql.shuffle.partitions', 3) \ .getOrCreate() sc = spark.sparkContext def num_ride_10(data): return data*10 def splite_str(data): return data.split(' ') def get_dict(data): return {data: string.ascii_letters[data]} try: rdd = sc.parallelize([1,2,3,4,5,6,7]).map(lambda x: [x]) df = rdd.toDF(['num']) # 创建 UDF 方式一 # 参数一:udf1:udf 函数名称,仅可用于 SQL 风格 # 参数二:num_ride_10:处理函数名称 # 参数三:IntegerType:返回值类型 # 返回值:udf1是一个 UDF 对象,仅可用于 DSL 语法 udf2 = spark.udf.register('udf1', num_ride_10, IntegerType()) df.selectExpr("udf1(num)").show() # 以 SQL 表达式执行,只能使用 udf 函数名称 df.select(udf2(df.num)).show() # 以 DSL 语法执行,只能使用 udf 返回值 # 创建 UDF 方式二 udf3 = F.udf(num_ride_10, IntegerType()) df.select(udf3(df.num)).show() # 只能以以 DSL 语法执行,只能使用 udf 返回值 ### 返回 Array 类型的数据 rdd2 = sc.parallelize([['hadoop hive spark'],['hive flink jave']]) df2 = rdd2.toDF(['lins']) udf4 = spark.udf.register('split_str', splite_str, ArrayType(elementType=StringType())) df2.selectExpr("split_str(lins)").show(truncate=False) df2.select(udf4(df2.lins)).show(truncate=False) udf5 = F.udf(f=splite_str, returnType=ArrayType(elementType=StringType())) df2.select(udf5(df2.lins)).show(truncate=False) ### 返回字典类型的数据 rdd3 = sc.parallelize([[1],[2],[3],[4]]) df3 = rdd3.toDF(['num']) udf6 = spark.udf.register('udf_dict', get_dict, MapType(keyType=IntegerType(), valueType=StringType())) df3.selectExpr("udf_dict(num)").show() df3.select(udf6(df3.num)).show() udf7 = F.udf(f=get_dict, returnType=MapType(keyType=IntegerType(), valueType=StringType())) df3.select(udf7(df3.num)).show() finally: spark.stop()

SparkSQL 定义UDAF函数

from pyspark.sql import SparkSession if __name__ == '__main__': # 创建 SparkSession 并配置 JAR 文件路径 spark = SparkSession.builder \ .appName('dataframe_create') \ .master('local[3]') \ .config('spark.jars', '/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/mysql-connector-java-8.0.26.jar') \ .config('spark.sql.shuffle.partitions', 3) \ .getOrCreate() sc = spark.sparkContext def data_process(data): num = 0 for row in data: num += row['num'] # 每一行都是一个 row 对象,row['num'] 就是 num 列的值 return [num] # 一定要返回一个 list 对象,因为 mappartition 函数要求返回一个 list 对象 try: rdd = sc.parallelize([1,2,3,4,5,6,7], 3).map(lambda x: [x]) df = rdd.toDF(['num']) df.show() single_partition_rdd = df.rdd.repartition(1) # 将df转换为rdd,然后重新分区为单分区 print(single_partition_rdd.collect()) mpp_rdd = single_partition_rdd.mapPartitions(data_process) # 使用 mappartitions 函数,传入自定义的函数,实现 udaf 的聚合效果 print(mpp_rdd.collect()) finally: spark.stop()

SparkSQL 使用窗口函数

from pyspark.sql import SparkSession from pyspark.sql.types import * if __name__ == '__main__': # 创建 SparkSession 并配置 JAR 文件路径 spark = SparkSession.builder \ .appName('dataframe_create') \ .master('local[3]') \ .config('spark.jars', '/export/server/miniconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/mysql-connector-java-8.0.26.jar') \ .config('spark.sql.shuffle.partitions', 3) \ .getOrCreate() sc = spark.sparkContext try: df = spark.read.format('csv').option('sep', '\t').option('header', 'false').schema('name STRING, subject STRING,score INTEGER').load('file:///home/hadoop/score.txt') df.show() df.createOrReplaceTempView('stu') spark.sql('select *, avg(score) over() as avg_over from stu').show() spark.sql(""" select *, row_number() over(order by score desc) as row_num_rank, dense_rank() over(partition by subject order by score desc) as dense_rank, rank() over(order by score desc) as rank from stu """).show() spark.sql('select *, ntile(3) over(order by score desc) as percent_rank from stu').show() finally: spark.stop()

SparkSQL的运行流程

RDD 执行逻辑:代码->DAG调度器逻辑任务-> Task调度器任务分配和管理监控-> Worker干活
RDD的运行会完全按照开发者的代码执行, 如果开发者水平有限,RDD的执行效率也会受到影响。而SparkSQL会对写完的代码,执行“自动优化”, 以提升代码运行效率,避免开发者水平影响到代码执行效率。
为什么SparkSQL可以自动优化 而RDD不可以?
RDD:内含数据类型不限格式和结构 DataFrame:100% 是二维表结构,可以被针对

Catalyst优化器

SparkSQL的自动优化,依赖于:Catalyst优化器
为了解决过多依赖 Hive 的问题,SparkSQL 使用了一个新的 SQL优化器替代 Hive 中的优化器,这个优化器就是 Catalyst, 整个SparkSQL 的架构大致如下:
notion image
1.API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句 2.收到 SQL 语句以后,将其交给 Catalyst, Catalyst 负责解析 SQL,生成执行计划等 3.Catalyst 的输出应该是 RDD 的执行计划 4.最终交由集群运行
Catalyst优化器的具体流程
语法树的阅读顺序是从下往上的
语法树的阅读顺序是从下往上的
notion image
notion image
列值裁剪:在读取到数据之后就判断 select 所需要的数据列,并将不需要的列清理掉。scv、json 格式的数据是先读取在裁剪,但是 parquet 格式的文件是列式存储的,每个列单独存储,所以在读取的时候就能直接筛选。因此 parquet 格式的数据也是比较适合 spark 的数据格式
notion image
notion image
notion image
  1. 提交SparkSQL代码
  1. catalyst优化 a. 生成原始AST语法数 b. 标记AST元数据 c.进行断言下推和列值裁剪 以及其它方面的优化作用在AST上 d. 将最终AST得到,生成执行计划 e. 将执行计划翻译为RDD代码
  1. Driver执行环境入口构建 (SparkSession)
  1. DAG 调度器规划逻辑任务
  1. TASK 调度区分配逻辑任务到具体Executor上工作并监控管理任务
  1. Worker干活.

  • Twikoo
  • Giscus
  • GitTalk