如何利⽤Python编程执⾏Spark
Apache Spark是⼀个对开发者提供完备的库和API的集计算系统,并且⽀持多种语⾔,包括Java,Python,R和Scala。SparkSQL相当于Apache Spark的⼀个模块,在DataFrame API的帮助下可⽤来处理⾮结构化数据。
通过名为PySpark的Spark Python API,Python实现了处理结构化数据的Spark编程模型。
这篇⽂章的⽬标是展⽰如何通过PySpark运⾏Spark并执⾏常⽤函数。
Python编程语⾔要求⼀个安装好的IDE。最简单的⽅式是通过Anaconda使⽤Python,因其安装了⾜够的IDE包,并附带了其他重要的包。
1、下载Anaconda并安装PySpark
通过这个链接,你可以下载Anaconda。你可以在Windows,macOS和Linux操作系统以及64位/32位图形安装程序类型间选择。我们推荐安装Python的最新版本。
下载好合适的Anaconda版本后,点击它来进⾏安装,安装步骤在Anaconda Documentation中有详细的说明。
安装完成时,Anaconda导航主页(Navigator Homepage)会打开。因为只是使⽤Python,仅需点击“Notebook”模块中的“Launch”按钮。
Anaconda导航主页
为了能在Anaconda中使⽤Spark,请遵循以下软件包安装步骤。
第⼀步:从你的电脑打开“Anaconda Prompt”终端。
第⼆步:在Anaconda Prompt终端中输⼊“conda install pyspark”并回车来安装PySpark包。
第三步:在Anaconda Prompt终端中输⼊“conda install pyarrow”并回车来安装PyArrow包。
当PySpark和PyArrow包安装完成后,仅需关闭终端,回到Jupyter Notebook,并在你代码的最顶部导⼊要求的包。
import pandas as pd
from pyspark.sql import SparkSession
t import SparkContext
from pyspark.sql.functions
import *from pes
import *from datetime import date, timedelta, datetime
import time
2、初始化SparkSession
⾸先需要初始化⼀个Spark会话(SparkSession)。通过SparkSession帮助可以创建DataFrame,并以表格的形式注册。其次,可以执⾏SQL表格,缓存表格,可以阅读parquet/json/csv/avro数据格式的⽂档。
sc = SparkSession.builder.appName("PysparkExample")\
.config ("spark.sql.shuffle.partitions", "50")\
.config("spark.driver.maxResultSize","5g")\
成语大全 四字成语.config ("abled", "true")\
.getOrCreate()
想了解SparkSession每个参数的详细解释,请访问pyspark.sql.SparkSession。
3、创建数据框架
⼀个DataFrame可被认为是⼀个每列有标题的分布式列表集合,与关系的⼀个表格类似。在这篇⽂章中,处理数据集时我们将会使⽤在PySpark API中的DataFrame操作。
3.1、从Spark数据源开始
DataFrame可以通过读txt,csv,json和parquet⽂件格式来创建。在本⽂的例⼦中,我们将使⽤.json格式的⽂件,你也可以使⽤如下列举的相关读取函数来寻并读取text,csv,parquet⽂件格式。
#Creates a spark data frame called as raw_data.
#JSON
快乐大本营宫锁沉香剧组dataframe = sc.read.json('dataset/nyt2.json')
#TXT FILES#
dataframe_txt = ('')
#CSV FILES#
dataframe_csv = sc.read.csv('csv_data.csv')
#PARQUET FILES#
dataframe_parquet = sc.read.load('parquet_data.parquet')
4、重复值
表格中的重复值可以使⽤dropDuplicates()函数来消除。
庆党100周年祝福语dataframe = sc.read.json('dataset/nyt2.json')
dataframe.show(10)
使⽤dropDuplicates()函数后,我们可观察到重复值已从数据集中被移除。
dataframe_dropdup = dataframe.dropDuplicates() dataframe_dropdup.show(10)
5、查询
查询操作可被⽤于多种⽬的,⽐如⽤“select”选择列中⼦集,⽤“when”添加条件,⽤“like”筛选列内容。接下来将举例⼀些最常⽤的操作。完整的查询操作列表请看Apache Spark⽂档。
5.1、“Select”操作
可以通过属性(“author”)或索引(dataframe[‘author’])来获取列。
#Show all entries in title column
dataframe.select("author").show(10)
#Show all entries in title, author, rank, price columns
dataframe.select("author", "title", "rank", "price").show(10)
第⼀个结果表格展⽰了“author”列的查询结果,第⼆个结果表格展⽰多列查询。
5.2、“When”操作
在第⼀个例⼦中,“title”列被选中并添加了⼀个“when”条件。
# Show title and assign 0 or 1 depending on title
dataframe.select("title",when(dataframe.title != 'ODD HOURS',
1).otherwise(0)).show(10)
展⽰特定条件下的10⾏数据
在第⼆个例⼦中,应⽤“isin”操作⽽不是“when”,它也可⽤于定义⼀些针对⾏的条件。
# Show rows with specified authors if in the given options
dataframe [dataframe.author.isin("John Sandford",
"Emily Giffin")].show(5)
5⾏特定条件下的结果集
5.3、“Like”操作
在“Like”函数括号中,%操作符⽤来筛选出所有含有单词“THE”的标题。如果我们寻求的这个条件是精确匹配的,则不应使⽤%算符。
# Show author and title is TRUE if title has " THE " word in titles
dataframe.select("author", "title",
dataframe.title.like("% THE %")).show(15)
title列中含有单词“THE”的判断结果集
5.4、“startswith”-“endswith”
StartsWith指定从括号中特定的单词/内容的位置开始扫描。类似的,EndsWith指定了到某处单词/内容结束。两个函数都是区分⼤⼩写的。dataframe.select("author", "title",
dataframe.title.startswith("THE")).show(5)
dataframe.select("author", "title",
dswith("NT")).show(5)
初一数学教学总结对5⾏数据进⾏startsWith操作和endsWith操作的结果。
5.5、“substring”操作
Substring的功能是将具体索引中间的⽂本提取出来。在接下来的例⼦中,⽂本从索引号(1,3),(3,6)和(1,6)间被提取出来。dataframe.select(dataframe.author.substr(1
, 3).alias("title")).show(5)
dataframe.select(dataframe.author.substr(3
, 6).alias("title")).show(5)
dataframe.select(dataframe.author.substr(1
, 6).alias("title")).show(5)
分别显⽰⼦字符串为(1,3),(3,6),(1,6)的结果
6、增加,修改和删除列
在DataFrame API中同样有数据处理函数。接下来,你可以到增加/修改/删除列操作的例⼦。
6.1、增加列
# Lit() is required while we are creating columns with exact
values.
dataframe = dataframe.withColumn('new_column',
F.lit('This is a new column'))
display(dataframe)
在数据集结尾已添加新列
6.2、修改列
对于新版DataFrame API,withColumnRenamed()函数通过两个参数使⽤。
# Update column 'amazon_product_url' with 'URL'
dataframe = dataframe.withColumnRenamed('amazon_product_url', 'URL')
dataframe.show(5)
“Amazon_Product_URL”列名修改为“URL”
6.3、删除列
列的删除可通过两种⽅式实现:在drop()函数中添加⼀个组列名,或在drop函数中指出具体的列。两个例⼦展⽰如下。dataframe_remove = dataframe.drop("publisher",
"published_date").show(5)
dataframe_remove2=dataframe \
.drop(dataframe.publisher).drop(dataframe.published_date).show(5)
“publisher”和“published_date”列⽤两种不同的⽅法移除。
7、数据审阅
存在⼏种类型的函数来进⾏数据审阅。接下来,你可以到⼀些常⽤函数。想了解更多则需访问Apache Spark doc。# Returns dataframe column names and data types
dataframe.dtypes
# Displays the content of dataframe
演员陈数dataframe.show()
# Return first n rows
dataframe.head()
# Returns first row
dataframe.first()
# Return first n rows
dataframe.take(5)
# Computes summary statistics
dataframe.describe().show()
# Returns columns of dataframe
# Counts the number of rows in dataframe
市场营销就业方向# Counts the number of distinct rows in dataframe
dataframe.distinct().count()
# Prints plans including physical and logical
8、“GroupBy”操作
通过GroupBy()函数,将数据列根据指定函数进⾏聚合。
# Group by author, count the books of the authors in the groups
作者被以出版书籍的数量分组
9、“Filter”操作
通过使⽤filter()函数,在函数内添加条件参数应⽤筛选。这个函数区分⼤⼩写。
# Filtering entries of title
# Only keeps records having value 'THE HOST'
dataframe.filter(dataframe["title"] == 'THE HOST').show(5)
标题列经筛选后仅存在有“THE HOST”的内容,并显⽰5个结果。
10、缺失和替换值
对每个数据集,经常需要在数据预处理阶段将已存在的值替换,丢弃不必要的列,并填充缺失值。pyspark.sql.DataFrameNaFunction库帮助我们在这⼀⽅⾯处理数据。举例如下。
# Replacing null values
dataframe.na.fill()
dataFrame.fillna()
dataFrameNaFunctions.fill()
# Returning new dataframe restricting rows with null valuesdataframe.na.drop()
dataFrame.dropna()
dataFrameNaFunctions.drop()
# Return new dataframe replacing one value with another
place(5, 15)
11、重分区
在RDD(弹性分布数据集)中增加或减少现有分区的级别是可⾏的。使⽤repartition(self,numPartitions)可以实现分区增加,这使得新的RDD获得相同/更⾼的分区数。分区缩减可以⽤coalesce(self, numPartitions, shuffle=False)函数进⾏处理,这使得新的RDD有⼀个减少了的分
区数(它是⼀个确定的值)。请访问Apache Spark doc获得更多信息。
# Dataframe with 10 partitions
# Dataframe with 1 partition
12、嵌⼊式运⾏SQL查询
原始SQL查询也可通过在我们SparkSession中的“sql”操作来使⽤,这种SQL查询的运⾏是嵌⼊式的,返回⼀个DataFrame格式的结果集。请访问Apache Spark doc获得更详细的信息。
# Registering a table
sc.sql("select * from df").show(3)
sc.sql("select \
CASE WHEN description LIKE '%love%' THEN 'Love_Theme' \
WHEN description LIKE '%hate%' THEN 'Hate_Theme' \
WHEN description LIKE '%happy%' THEN 'Happiness_Theme' \
WHEN description LIKE '%anger%' THEN 'Anger_Theme' \
WHEN description LIKE '%horror%' THEN 'Horror_Theme' \
WHEN description LIKE '%death%' THEN 'Criminal_Theme' \
WHEN description LIKE '%detective%' THEN 'Mystery_Theme' \
ELSE 'Other_Themes' \ END Themes \
from df").groupBy('Themes').count().show()
发布评论