Programming with RDDs
This chapter introduces Spark's core abstraction for working with data, the resilient distributed dataset (RDD). An RDD is simply a distributed collection of elements. In Spark, all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result. RDDs are the core concept in Spark.
RDD Basics
An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.
1. Creating an RDD
An RDD can be created in two ways: by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in the driver program. We have already seen loading a text file as an RDD of strings using SparkContext.textFile().
lines = sc.textFile("README.md")
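As a side note on partitioning, the sketch below is my own illustration, not from the book; it assumes your Spark version provides rdd.getNumPartitions(), and the partition count of 4 is arbitrary:
print lines.getNumPartitions()         # how many partitions Spark chose for the file (e.g., one per HDFS block)
nums = sc.parallelize(range(1000), 4)  # parallelize() also lets you request a partition count explicitly
print nums.getNumPartitions()          # 4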
2. Two types of operations
RDDs offer two types of operations:
- transformations
- actions
Transformations construct a new RDD from a previous one, for example:
pythonLines = lines.filter(lambda line: "Python" in line)
Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system, for example:
pythonLines.first()
Transformations and actions are different because of the way Spark computes RDDs. Specifically, you can define an RDD at any time, but Spark computes RDDs lazily: a transformation does not actually scan or compute the RDD; the RDD is only really computed when you call an action on it. For example, lines = sc.textFile(...) above does not read the file into memory right away; the data is only scanned when the action lines.first() is used, and even then not in full: Spark reads the data piece by piece and stops as soon as it finds the first result that satisfies the condition.
3. Finally, Spark's RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). This is essentially an optimization that lets you reuse computed results. If you do not persist, Spark discards the computed results by default, which is reasonable in big-data scenarios because it saves the cluster's precious memory.
To summarize, every Spark program and shell session will work as follows:
1. Create some input RDDs from external data.
2. Transform them to define new RDDs using transformations like filter().
3. Ask Spark to persist() any intermediate RDDs that will need to be reused.
4. Launch actions such as count() and first() to kick off a parallel computation,
which is then optimized and executed by Spark.
Tip
cache() is the same as calling persist() with the default storage level.
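Putting the four steps together, a minimal Python sketch might look like the following (log.txt and the "error" filter are placeholder assumptions of mine, echoing the log example used later in the chapter):
inputRDD = sc.textFile("log.txt")                    # 1. create an input RDD from external data
errorsRDD = inputRDD.filter(lambda x: "error" in x)  # 2. transform it into a new RDD
errorsRDD.persist()                                  # 3. ask Spark to keep it around, since it is reused below
print errorsRDD.count()                              # 4. actions trigger the actual computation...
print errorsRDD.first()                              #    ...and the second action reuses the persisted data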
====================================================
The sections that follow expand on the operations involved in the four steps above.
1. Creating RDDs
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.
(1) The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext's parallelize() method. Keep in mind, however, that outside of prototyping and testing, this is not widely used since it requires that you have your entire dataset in memory on one machine.
Examples:
lines = sc.parallelize(["pandas", "i like pandas"])  # Python
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas")); // Java
(2) A more common way is to load data from external storage:
sc.textFile("/path/to/README.md");
RDD Operations
As we've discussed, RDDs support two types of operations: transformations and actions. Transformations are operations on RDDs that return a new RDD, such as map() and filter(). Actions are operations that return a result to the driver program or write it to storage, and kick off a computation, such as count() and first(). Spark treats transformations and actions very differently, so understanding which type of operation you are performing will be important. If you are ever confused whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs, whereas actions return some other data type.
(1) Transformations
Transformations are operations on RDDs that return a new RDD. As discussed in 「Lazy Evaluation」, transformed RDDs are computed lazily, only when you use them in an action. Many transformations are element-wise; that is, they work on one element at a time; but this is not true for all transformations.
filter() transformation in Python
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
filter() transformation in Java
JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
    new Function<String, Boolean>() {
        public Boolean call(String x) { return x.contains("error"); }
    });
Note that the filter() operation does not mutate the existing inputRDD. Instead, it returns a pointer to an entirely new RDD.
Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. (This graph is a bit like a class inheritance diagram: when an error occurs or data is lost, Spark can use it to recover the lost data.)
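For example, the badLinesRDD used in the next section can be derived from the same input along two branches and then combined; these derivations are exactly the dependencies the lineage graph records (a sketch based on the book's log example, with warningsRDD assumed here):
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)  # lineage: inputRDD -> errorsRDD / warningsRDD -> badLinesRDD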
(2) Actions
Actions are the second type of RDD operation. They are the operations that return a final value to the driver program or write data to an external storage system.
# Python error count using actions
print "Input had " + str(badLinesRDD.count()) + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line
// Java error count using actions
System.out.println("Input had " + badLinesRDD.count() + " concerning lines");
System.out.println("Here are 10 examples:");
for (String line : badLinesRDD.take(10)) {
    System.out.println(line);
}
Note: take() vs. collect()
RDDs also have a collect() function, related to take(), which retrieves the entire RDD. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn't be used on large datasets.
In most cases RDDs can’t just be collect()ed to the driver because they are too large. In these cases, it’s common to write data out to a distributed storage system such as HDFS or Amazon S3. You can save the contents of an RDD using the saveAsTextFile() action, saveAsSequenceFile(), or any of a number of actions for various built-in formats. We will cover the different options for exporting data in Chapter 5.
It is important to note that each time we call a new action, the entire RDD must be computed 「from scratch.」 To avoid this inefficiency, users can persist intermediate results, as we will cover in 「Persistence (Caching)」.
(3) Lazy Evaluation
Rather than thinking of
an RDD as containing specific data, it is best to think of each RDD as consisting of
instructions on how to compute the data that we build up through transformations.
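One simple way to see this (my own sketch; the file path is deliberately nonexistent) is that building up transformations never touches the data, so even a bad path is only reported once an action runs:
# No error here, even though the file does not exist: Spark only records the instructions
upper = sc.textFile("no/such/file.txt").map(lambda line: line.upper())
# Only an action forces evaluation; this is the point where the missing file would cause a failure
# upper.count()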
Passing Functions to Spark
As elsewhere, this comes in all three language flavors; here we mainly look at the Python and Java versions. The basic way of passing functions has in fact already been shown earlier.
Here is one issue to be aware of when passing functions in Python:
One issue to watch out for when passing functions is inadvertently serializing the object containing the function. When you pass a function that is the member of an object, or contains references to fields in an object (e.g., self.field), Spark sends the entire object to worker nodes, which can be much larger than the bit of information you need (see Example 3-19). Sometimes this can also cause your program to fail, if your class contains objects that Python can't figure out how to pickle.
Example 3-19. Passing a function with field references (don't do this!)
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
    def isMatch(self, s):
        return self.query in s
    def getMatchesFunctionReference(self, rdd):
        # Problem: references all of "self" in "self.isMatch"
        return rdd.filter(self.isMatch)
    def getMatchesMemberReference(self, rdd):
        # Problem: references all of "self" in "self.query"
        return rdd.filter(lambda x: self.query in x)
The problem the author describes is exactly what the comments above point out: when passing a function, and especially when passing a field of an object, first extract the field's value, save it in a local variable, and then pass that local variable instead, like this:
class WordFunctions(object):
    ...
    def getMatchesNoReference(self, rdd):
        # Safe: extract only the field we need into a local variable
        query = self.query
        return rdd.filter(lambda x: query in x)
Java
In Java, functions are specified as objects that implement one of Spark’s function interfaces from the org.apache.spark.api.java.function package. There are a number of different interfaces based on the return type of the function. We show the most basic function interfaces in Table 3-1, and cover a number of other function interfaces for when we need to return special types of data, like key/value data, in 「Java」.
Table 3-1. Standard Java function interfaces
(1) Function<T, R>: implement R call(T). Takes in one input and returns one output; used with operations like map() and filter().
(2) Function2<T1, T2, R>: implement R call(T1, T2). Takes in two inputs and returns one output; used with operations like aggregate() or fold().
(3) FlatMapFunction<T, R>: implement Iterable<R> call(T). Takes in one input and returns zero or more outputs; used with operations like flatMap().
We can either define our function classes inline as anonymous inner classes (Example 3-
22), or create a named class (Example 3-23).
Example 3-22. Java function passing with anonymous inner class
RDD<String> errors = lines.filter(new Function<String, Boolean>() {
    public Boolean call(String x) { return x.contains("error"); }
});

Example 3-23. Java function passing with named class
class ContainsError implements Function<String, Boolean> {
    public Boolean call(String x) { return x.contains("error"); }
}
RDD<String> errors = lines.filter(new ContainsError());
The style to choose is a personal preference, but we find that top-level named functions
are often cleaner for organizing large programs. One other benefit of top-level functions is
that you can give them constructor parameters, as shown in Example 3-24.
Example 3-24. Java function class with parameters
class Contains implements Function<String, Boolean> {
    private String query;
    public Contains(String query) { this.query = query; }
    public Boolean call(String x) { return x.contains(query); }
}
RDD<String> errors = lines.filter(new Contains("error"));
In Java 8, you can also use lambda expressions to concisely implement the function
interfaces. Since Java 8 is still relatively new as of this writing, our examples use the more
verbose syntax for defining classes in previous versions of Java. However, with lambda
expressions, our search example would look like Example 3-25.
Example 3-25. Java function passing with lambda expression in Java 8
RDD<String> errors = lines.filter(s -> s.contains("error"));
If you are interested in using Java 8’s lambda expression, refer to Oracle’s documentation
and the Databricks blog post on how to use lambdas with Spark.
Tip
Both anonymous inner classes and lambda expressions can reference any final variables
in the method enclosing them, so you can pass these variables to Spark just as in Python
and Scala.
Next, let's take a closer look at some commonly used transformations and actions.
map(): applies a function you supply to every element of the RDD and returns a new RDD of the results; the core is the function passed to map() that does the actual work. For example:
# Python squaring the values in an RDD
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)
// Java squaring the values in an RDD
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer x) { return x * x; }
});
System.out.println(StringUtils.join(result.collect(), ","));
filter(): already used several times above, e.g. inputRDD.filter(lambda x: "error" in x).
flatMap(): used when a single input element should produce multiple output elements, for example:
flatMap() in Python, splitting lines into words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()  # returns "hello"
flatMap() in Java, splitting lines into multiple words
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
    }
});
words.first(); // returns "hello"
In plain terms, the difference between map() and flatMap():
- map() applies the given operation to each input element and returns exactly one object for each input;
- flatMap() combines two steps, mapping first and then flattening:
Step 1: as with map(), apply the given operation to each input element, returning one object (here, a list) per input;
Step 2: then merge all of the returned objects into one flat collection.
Concretely, using the example above:
lines = sc.parallelize(["hello world", "hi lilei"])
wordsMap = lines.map(lambda line: line.split(" "))
wordsMap.first()       # ['hello', 'world']
wordsFlatMap = lines.flatMap(lambda line: line.split(" "))
wordsFlatMap.first()   # 'hello'
wordsMap:     [['hello', 'world'], ['hi', 'lilei']]
wordsFlatMap: ['hello', 'world', 'hi', 'lilei']
RDDs also support some pseudo set operations, including distinct(), union(), intersection(), and subtract() (set difference), as well as cartesian(), which computes the Cartesian product of two RDDs.
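A small Python sketch of these pseudo set operations (the two input lists are made up for illustration, and result order may vary across partitions):
rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])
rdd1.distinct().collect()          # ['coffee', 'panda', 'monkey', 'tea']
rdd1.union(rdd2).collect()         # keeps duplicates: 8 elements in total
rdd1.intersection(rdd2).collect()  # ['coffee', 'monkey'] -- removes duplicates, requires a shuffle
rdd1.subtract(rdd2).collect()      # ['panda', 'tea']
rdd1.cartesian(rdd2).collect()     # all (x, y) pairs, 5 * 3 = 15 of them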
===== A summary of the common transformations =====
1. On a single RDD:
map(): operates on every element; the number of elements returned equals the number that went in
flatMap(): operates on every element and then breaks each result down into its smallest, indivisible elements
filter(), distinct()
2. On two RDDs:
union(), intersection(), subtract(), cartesian()
Actions
reduce(): conveniently implements sums, counts of the RDD's elements, and similar aggregations; its form is reduce(binary_function).
reduce() passes the first two elements of the RDD to the input function, producing a new return value; that return value is then paired with the next element of the RDD (the third) and passed to the input function again, and so on until only a single value is left.
Example in Python:
sum = rdd.reduce(lambda x, y: x + y)
Example in Java:
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
aggregate(): aggregates the elements of each partition using the supplied function and the initial value (zeroValue), and then merges the per-partition results with the combine function. Unlike reduce(), the type of the final result does not need to match the type of the RDD's elements. To be honest, the Python example of aggregate() below is not the easiest to follow, but the Java version is clearer.
aggregate() in Python:
sumCount = nums.aggregate((0, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
return sumCount[0] / float(sumCount[1])
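To make the Python version concrete: assuming nums = sc.parallelize([1, 2, 3, 4]) as in the earlier map() example, the first lambda (the per-partition sequence function) folds each element into a running (sum, count) pair, and the second lambda (the combine function) merges the per-partition pairs; the final pair is (10, 4), so the returned average is 10 / 4.0 = 2.5.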
aggregate() in Java:
class AvgCount implements Serializable {
    public AvgCount(int total, int num) {
        this.total = total;
        this.num = num;
    }
    public int total;
    public int num;
    public double avg() {
        return total / (double) num;
    }
}

Function2<AvgCount, Integer, AvgCount> addAndCount =
    new Function2<AvgCount, Integer, AvgCount>() {
        public AvgCount call(AvgCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
        }
    };
Function2<AvgCount, AvgCount, AvgCount> combine =
    new Function2<AvgCount, AvgCount, AvgCount>() {
        public AvgCount call(AvgCount a, AvgCount b) {
            a.total += b.total;
            a.num += b.num;
            return a;
        }
    };
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
collect() returns the entire RDD's contents. It is commonly used in unit tests where the entire contents of the RDD are expected to fit in memory, as that makes it easy to compare the value of our RDD with our expected result.
take(n) returns n elements from the RDD and attempts to minimize the number of
partitions it accesses, so it may represent a biased collection. It’s important to note that
these operations do not return the elements in the order you might expect.
top() If there is an ordering defined on our data, we can also extract the top elements from an RDD using top(). top() will use the default ordering on the data, but we can supply our own comparison function to extract the top elements.
takeSample(withReplacement, num, seed) function allows us to take a sample of our
data either with or without replacement.
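A quick Python sketch of these actions (the input list is made up, and the exact sample returned by takeSample() depends on the random seed):
rdd = sc.parallelize([5, 3, 1, 4, 2])
rdd.collect()             # [5, 3, 1, 4, 2] -- the whole RDD, so it must fit on the driver
rdd.take(3)               # 3 elements, not necessarily in the order you might expect
rdd.top(2)                # [5, 4] -- the largest elements under the default ordering
rdd.takeSample(False, 3)  # 3 elements sampled without replacement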
There are of course many more actions, which we won't list one by one here; see the summary table on page 69 of the book.
Converting Between RDD Types
In Spark, some functions are only available on RDDs of numbers (numeric RDDs), and others only on RDDs of key/value pairs. Note that in Scala and Java these functions are not defined on the standard RDD class, so you have to get the correct specialized class to access them.
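In Python there is no separate class to convert to: the extra methods live on the base RDD and simply fail at runtime if the elements have the wrong type. A minimal sketch of the numeric-only methods:
nums = sc.parallelize([1.0, 2.0, 3.0, 4.0])
print nums.mean()   # 2.5 -- available directly on the Python RDD
print nums.stats()  # (count: 4, mean: 2.5, stdev: ..., max: 4.0, min: 1.0)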
Persistence (Caching)
This section is about Spark's caching mechanism: because some data is needed multiple times, the corresponding RDDs can be cached on the machines that computed them.
The caching behavior differs slightly across the three languages: Java and Scala behave the same way, caching the RDD's data as unserialized objects in the JVM heap, while Python always serializes (pickles) the data it persists.
There are several persistence levels (see Table 3-6 on page 72 for details), such as MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, and DISK_ONLY.
The table also compares the CPU time and memory usage of each storage level. Below is a Scala example:
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
Notice that we called persist() on the RDD before the first action. The persist() call
on its own doesn’t force evaluation.
If you attempt to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. For the memory-only storage levels, it will recompute these partitions the next time they are accessed, while for the memory-and-disk ones, it will write them out to disk. In either case, this means that you don’t have to worry about your job breaking if you ask Spark to cache too much data.However, caching unnecessary data can lead to eviction of useful data and more recomputation time.
Finally, RDDs come with a method called unpersist() that lets you manually remove
them from the cache.
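For reference, a rough Python equivalent of the Scala snippet above (assuming pyspark's StorageLevel and the same hypothetical input RDD):
from pyspark import StorageLevel
result = input.map(lambda x: x * x)
result.persist(StorageLevel.DISK_ONLY)            # marks the RDD for caching; nothing is computed yet
print result.count()                              # first action: computes the RDD and writes it out
print ",".join(str(x) for x in result.collect())  # second action: reads back the persisted data
result.unpersist()                                # manually remove it from the cache when no longer needed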
That concludes Chapter 3. In closing, the author writes: The ability to always recompute an RDD is actually why RDDs are called "resilient." When a machine holding RDD data fails, Spark uses this ability to recompute the missing partitions, transparent to the user.