16/08/17 18:48:31 INFO DAGScheduler: ResultStage 0 (collect at Map.scala:17) finished in 0.093 s
16/08/17 18:48:31 INFO DAGScheduler: Job 0 finished: collect at Map.scala:17, took 0.268871 s
1,4,9,16
........
16/08/17 18:48:31 INFO DAGScheduler: ResultStage 1 (collect at Map.scala:19) finished in 0.000 s
16/08/17 18:48:31 INFO DAGScheduler: Job 1 finished: collect at Map.scala:19, took 0.018291 s
2,3,4
Now look back at the diagram above; it should be clear what map is doing.
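The map code itself is not shown in this excerpt; as a rough, assumed reconstruction, a call like the following would produce the 1,4,9,16 output above (the variable names are made up for illustration):

val nums = sc.parallelize(List(1, 2, 3, 4))
// map applies the function to every element: one output element per input element
val squares = nums.map(x => x * x)
println(squares.collect().mkString(","))   // 1,4,9,16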
flatMap
Another commonly used transformation is flatMap: feed it a string and it splits out the individual words, flattening the results.
Let's try it out in code:
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(lines => lines.split(" "))
print(words.first())
print("\n")
Execution result:
16/08/17 19:23:24 INFO DAGScheduler: Job 2 finished: first at Map.scala:24, took 0.016987 s
hello
16/08/17 19:23:24 INFO SparkContext: Invoking stop() from shutdown hook
What happens if we change the delimiter:
val words = lines.flatMap(lines => lines.split(","))
What will the result be?
16/08/17 19:33:14 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
hello world
16/08/17 19:33:14 INFO SparkContext: Invoking stop() from shutdown hook
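Since the line contains no comma, split(",") returns the whole string as a single element, so first() prints "hello world". To make the difference between map and flatMap concrete, here is a small sketch (names are illustrative, not from the original):

val lines = sc.parallelize(List("hello world", "hi"))
// map keeps one output per input, so we get an RDD of word arrays
val mapped = lines.map(line => line.split(" "))          // RDD[Array[String]]
// flatMap flattens those arrays into a single RDD of words
val flattened = lines.flatMap(line => line.split(" "))   // RDD[String]
println(mapped.collect().map(_.mkString("[", ",", "]")).mkString(" "))  // [hello,world] [hi]
println(flattened.collect().mkString(","))                              // hello,world,hi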
Next come the pseudo set operations: distinct, union, intersection, and subtract. First build two RDDs to work with:

val rdd1 = sc.parallelize(List("coffee","coffee","panda","monkey","tea"))
val rdd2 = sc.parallelize(List("coffee","monkey","kitty"))
rdd1.distinct().take(100).foreach(println)
Result:
16/08/17 19:52:29 INFO DAGScheduler: ResultStage 4 (take at Map.scala:30) finished in 0.047 s
16/08/17 19:52:29 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/08/17 19:52:29 INFO DAGScheduler: Job 3 finished: take at Map.scala:30, took 0.152405 s
monkey
coffee
panda
tea
16/08/17 19:52:29 INFO SparkContext: Starting job: take at Map.scala:32
union
Code:
rdd1.union(rdd2).take(100).foreach(println)
Result:
16/08/17 19:52:29 INFO DAGScheduler: Job 5 finished: take at Map.scala:32, took 0.011825 s
coffee
coffee
panda
monkey
tea
coffee
monkey
kitty
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:34
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 11 (intersection at Map.scala:34)
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 12 (intersection at Map.scala:34)
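The intersection call itself is missing from this excerpt; judging from the log lines above (intersection at Map.scala:34), it presumably followed the same pattern as the other set operations:

intersection
Code:
rdd1.intersection(rdd2).take(100).foreach(println)
Result: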
16/08/17 19:52:30 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 9) in 31 ms on localhost (1/1)
16/08/17 19:52:30 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
16/08/17 19:52:30 INFO DAGScheduler: ResultStage 9 (take at Map.scala:34) finished in 0.031 s
16/08/17 19:52:30 INFO DAGScheduler: Job 6 finished: take at Map.scala:34, took 0.060785 s
monkey
coffee
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:36
subtract
Code:
rdd1.subtract(rdd2).take(100).foreach(println)
Result: subtract removes every element that also appears in rdd2, so only panda and tea remain.
reduce
val rdd5 = sc.parallelize(List(1,2,3,4))
print("reduce action:" + rdd5.reduce((x, y) => x + y) + "\n")
16/08/18 11:51:16 INFO DAGScheduler: Job 15 finished: reduce at Function.scala:55, took 0.012698 s
reduce action:10
16/08/18 11:51:16 INFO SparkContext: Starting job: aggregate at Function.scala:57
countByValue
print(rdd1.countByValue() + "\n")
16/08/18 11:51:16 INFO DAGScheduler: Job 11 finished: countByValue at Function.scala:48, took 0.031726 s
Map(monkey -> 1, coffee -> 2, panda -> 1, tea -> 1)
16/08/18 11:51:16 INFO SparkContext: Starting job: takeOrdered at Function.scala:50
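For intuition, countByValue behaves roughly like pairing every element with 1, summing per key, and collecting the result to the driver as a Map. A sketch of that equivalence (illustrative, not the actual Spark implementation):

// Roughly what countByValue computes, written with reduceByKey
val counts: Map[String, Long] = rdd1
  .map(value => (value, 1L))   // ("coffee",1), ("coffee",1), ("panda",1), ...
  .reduceByKey(_ + _)          // sum the 1s for each distinct value
  .collect()
  .toMap
println(counts)                // Map(coffee -> 2, panda -> 1, monkey -> 1, tea -> 1)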
takeOrdered
rdd1.takeOrdered(10).take(100).foreach(println)
16/08/18 11:51:16 INFO DAGScheduler: Job 12 finished: takeOrdered at Function.scala:50, took 0.026160 s
coffee
coffee
monkey
panda
tea
16/08/18 11:51:16 INFO SparkContext: Starting job: takeSample at Function.scala:52
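takeOrdered sorts by the element type's implicit Ordering (alphabetical for String, which is why the two coffees come first). You can pass a different Ordering explicitly; for example, an illustrative call not taken from the original:

// take the 3 "largest" strings by reversing the default ordering
rdd1.takeOrdered(3)(Ordering[String].reverse).foreach(println)   // tea, panda, monkey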
aggregate
This one deserves a closer look.
The Spark documentation defines aggregate as follows:
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
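The aggregate call that produced the (10,4) result below is not shown in this excerpt. A typical way to compute a (sum, count) pair over rdd5 = List(1,2,3,4) from the reduce example looks like this (a reconstruction under that assumption, not necessarily the original code):

// zero value: the (sum, count) accumulator starts at (0, 0)
val result = rdd5.aggregate((0, 0))(
  // seqOp: fold one Int from the RDD into the accumulator
  (acc, value) => (acc._1 + value, acc._2 + 1),
  // combOp: merge accumulators coming from different partitions
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
print("aggregate action : " + result + "\n")   // aggregate action : (10,4)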
16/08/18 11:51:16 INFO DAGScheduler: Job 16 finished: aggregate at Function.scala:57, took 0.011686 s
aggregate action : (10,4)
16/08/18 11:51:16 INFO SparkContext: Invoking stop() from shutdown hook