前言:我们来学习Spark基础吧!

一、搭建学习环境

1、下载spark

我使用的是spark1.6.2,下载地址 我们直接下载,然后解压。我们看看里面的目录

2、python-shell

我们运行bin/pyspark之后就进入了spark的python shell。我们为了验证是否成功了,可以运行下面的代码

lines = sc.textFile("README.md")

print lines.first()

接下来就会看到打印出一条信息:# Apache Spark。 spark提供的python shell是我们良好的学习平台。我们可以在里面随意的调用spark提供的API。

3、IDE环境

可能有些同学已经习惯了IDE带来的好处(例如我),所以也希望能通过IDE来进行学习和开发。 但是spark并没有提供任何python 模块给我们下载使用, 也就是说,你无法通过pip install的方式下载spark模块。 这一点就不如java和scala了,maven是可以直接集成spark的。 所以我们要做一点额外的事情以让pycharm能够拥有开发spark程序的能力。

在pycharm找到Project Structure 把解压的目录中的python目录加进去 添加run–>Edit configurations。 添加一个运行配置。并配置SPARK_HOME环境变量为解压目录。然后配置PYTHONPATH环境变量为解压目录中的python目录。 然后各位就可以在pycharm上编写spark代码并运行了。 “”“SimpleApp”“” from pyspark import SparkContext logFile = “/Users/sungaofei/Documents/spark/README.md” sc = SparkContext(“local”,“Simple App”) logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: ‘a’ in s).count() numBs = logData.filter(lambda s: ‘b’ in s).count() temp = logData.first() print temp print(“Lines with a: %i, lines with b: %i”%(numAs, numBs))

二、从demo中学习

"""SimpleApp"""

from pyspark import SparkContext

logFile = "/Users/sungaofei/Documents/spark/README.md"

conf = SparkConf().setMaster("local").setAppName("My App")

sc = SparkContext(conf = conf)

logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()

numBs = logData.filter(lambda s: 'b' in s).count()

temp = logData.first()

print temp

print("Lines with a: %i, lines with b: %i"%(numAs, numBs))

在全局的level,spark的应用是由一个驱动程序在集群中发起并行的多个操作。这个驱动程序包含了你的应用的main函数并定义了你的分布式数据集合。在上面的例子中,我们可以理解SparkContext对象就是spark的驱动。它负责连接集群,我们用local模式告诉spark我们使用本地集群模式以方便我们学习和调试。在你有了这个驱动之后,我们就可以随意的创建RDD了,RDD是Spark的分布式数据集合的定义,我们暂时只需要知道它是存储数据的地方,之后会详细说明一下。 sc.textFile()是从一个文本中读取数据并转换为RDD的方法。当我们有了RDD之后,就可以随意调用spark提供给我们的方法。例如上面例子中的filter(熟悉python的朋友一定觉得这个方法很熟悉)以及count方法。 在我们上面的操作中。 驱动程序会在集群中管理一定数量的executor。 例如当我们调用count()方法的时候。集群中不同的机器会各自读取这个文件中的一部分并分别计算各自的行数,当然了现在我们使用的是local模式。所以我们的应用是运行在单机上的。整个过程差不多是下面这个样子的。 在这个demo中,我们是可以看到spark是支持函数式编程的,大部分的方法都要求传递一个函数进去。例如上面的filter方法。这是一个过滤函数,上面的demo中我们分别取包含字母a和b的行。熟悉python的小伙伴一定对lambda表达式不陌生了。

三、RDD基础

上面提到过RDD,它是spark定义的固定不变的分布式数据集合

说它固定不变是因为它一经创建后你就无法改变它的内容了。你只能通过当前的RDD调用一些方法来生成新的RDD,但是你永远都无法真正改变一个RDD的数据。例如刚才的demo,我们调用filter方法过滤掉一些数据,但我们并没有改变原有RDD的数据,你在其他地方调用原RDD的时候仍然是全量的未经过滤的数据。 filter方法返回的是一个新的RDD 说它是分布式数据集合是因为每一个RDD都由多个partitions(分片)组成。上一篇我们讲到HDFS,所以首先数据是分布在不同的机器上的。在spark读取数据的时候会根据一定的规则(可以是默认64M一个partition,也可以指定partition数量)。 这些分布在不同partition也就是数据分片组成了RDD。spark在运行的时候,每个partition都会生成一个task。他们会跑在不同的计算资源上。我们知道java中万物皆对象,在spark中所有数据皆RDD。可以说RDD就是spark的一切,就如MapReduce就是Haddop的一切一样。

我们可以使用两种方式创建RDD

通过sc.textFile()从外部文件中读取。就如我们的demo一样 通过从一个集合中初始化一个RDD。如下: lines = sc.parallelize([“pandas”, “i like pandas”])

1、transformations and actions

RDD支持两种操作,transformation和action。ransformation的操会返回一个新的RDD,就如我们在demo中看到的filter()方法,是一种组织和准备数据的方式。为之后的action执行计算提供数据基础。action的操作是真正产生一个计算操作的过程。例如demo中的count()。 action不会返回一个RDD,它会返回实际的操作结果或者将数据保存到外部文件中。spark提供了很多函数,如果你分不清哪些是transformation哪些是action。 只要看它的返回值就好了。返回一个新RDD的就是transformation,不返回的就是action。 区分一个函数是哪种操作很重要,因为spark处理这两种操作的方式很不一样。

transformation transformation是一种返回一个新的RDD的方法。它遵循延迟计算的规则。也就是说spark在运行的时候遇到transformation的时候并不会真正的执行它,直到碰到一个action的时候才会真正的执行。我们稍后会专门讨论延迟计算的规则。这里我们知道有这个概念就好。大部分transformation都是按行元素处理,就是说他们同一时间只处理一行数据(有少数transformation不是的)。就像上面说的,spark大部分的函数都是函数式编程,要求我们传递一个函数作为参数。那么所有transformation都是需要传递至少一个函数作为参数的, 这个参数就是我们指定的如何处理数据的逻辑。spark会将数据拆成一行一行的并作为参数调用我们指定的函数。就如demo中的filter,spark会将RDD的每一行作为参数传递给我们自定义的函数。 action 像之前说的RDD可以使用很多的transformation来组织和准备数据,但是光准备数据还是不行得,我们终究要用数据计算一些东西,这时候就需要我们的action,就如我们demo中的count()用来计算数据的行数. 我们还可以使用frist()取出第一条数据,用take(n)来取出前n条数据,saveAsTextFile()用来把数据存储到外部文件。也就是说action是我们真正使用数据来进行计算的方式,真正实现数据的价值的方式。 延迟计算 之前提到过,transformation的操作是延迟计算的。意思是说spark在运行的时候,运行到transformation的时候实际上并不会真正的执行transformations。直到碰到了这个RDD的action的时候,才会一股脑的执行之前所有的操作。也许这对刚接触大数据处理的同学来说有点难以理解,但如果我们仔细的想一想就会发现其实这样的设计相当的合理。 因为我们在实际情况中面对的是非常庞大的数据。如果我们在一开始就执行所有的数据操作并将数据载入内存中那将是一种很大的浪费。例如在demo中,如果我们使用的不是count这种操作全部数据的方式而是使用了first()或者take(n)这种只取了一部分数据的操作。那么事先就执行transformation的操作并将所有数据载入内存的话,那将是极大的浪费。所以取而代之的,spark在每次遇到transformation的时候并不会立刻执行,而是通过一些元数据记录RDD的操作轨迹,在遇到action的时候再推断出最优的解决方案。

四、常见的transformation

map 除了我们在demo中看到的filter()方法来过滤数据,我们还可以使用map()这种MapReduce时代保留下来的函数。看下面的demo nums = sc.parallelize([1, 2, 3, 4]) squared = nums.map(lambda x: x * x).collect() for num in squared: print "%i " % (num)

刚才我们说大部分spark的transformation是单行处理的。所以当我们把lambda定义的匿名函数传递给map的时候。 map()会把数据中的每一行取出来作为参数进行调用。它和filter的区别可以用下图来表示。

flatMap() 与map()很相似的一个方法是flatMap()。map的操作是处理每一行的同时,返回的也是一行数据。 flatMap不一样,它返回的是一个可迭代的对象。也就是说map是一行数据转换成一行数据,flatMap是一行数据转换成多行数据。例如下面的demo lines = sc.parallelize([“hello world”, “hi”]) words = lines.flatMap(lambda line: line.split(" ")) words.first() # returns “hello” words.count() # returns 3

下图可以表示map和flatMap的区别 集合操作 上图表示了4种集合操作。

action reduce 最常用的action操作是我们在MapReduce时期就熟悉reduce操作,此操作是一个聚合方法。demo如下: rdd = sc.parallelize([1,2,3,4,5]) sum = rdd.reduce(lambda x, y: x + y)

reduce接受一个函数当做参数,而这个函数也接受两个参数x和y。 这俩个参数代表着RDD中的两行,reduce是聚合函数。 它会不断的将之前计算出的两行传递给函数进行聚合计算。上面demo中的sum为15.因为reduce做了一个累加的操作。

其他 此外还有我们早就见过的count(),以及一些其他的例如:

collect():返回RDD中所有的数据 countByValue():统计每一个value出现的次数 take(n):取出前N行数据 foreach:循环RDD中的每一行数据并执行一个操作 persist

我们上面说过从性能上考虑RDD是延迟计算的,每遇到一个action都会从头开始执行。这样是不够的,因为有的时候我们需要重复使用一个RDD很多次。如果这个RDD的每一个action都要重新载入那么多的数据,那也是很蛋疼的。 所以spark提供了persist函数来让我们缓存RDD。

lines = sc.parallelize(["hello world", "hi"])

a = lines.flatMap(lambda line: line.split(" ")).persist()

a.count()

a.take(10)

上面我们使用persist函数缓存了RDD。所以再调用count()和take()的时候,spark并没有重新执行一次RDD的transformation。spark有很多缓存的级别。可以参考下面的图表 可以使用persist(storageLevel='MEMORY_AND_DISK'),像这样的方式指定缓存级别。 默认是MEMORY_ONLY。

好文链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。