package a

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
  * Created by yuduy on 2018/1/11.
  */
class A {

  def sc: SparkContext = {
    val conf = new SparkConf().setMaster("").setAppName("")
    val sc = new SparkContext(conf)
    sc
  }

  //scratchpad of basic RDD transformations and actions
  def aaa: Unit = {
    val conf = new SparkConf().setMaster("").setAppName("")
    val sc = new SparkContext(conf)

    val input: RDD[String] = sc.textFile("")
    val warns: RDD[String] = input.filter(line => line.contains("warn"))
    val errors: RDD[String] = input.filter(line => line.contains("error"))
    val calm: RDD[String] = warns.union(errors)
    calm.persist()
    calm.collect().mkString("")
    val tmp: Array[String] = calm.collect()
    val a: RDD[Int] = input.map(l => 1)
    calm.take(10).foreach(println)
    calm.flatMap(_ + 1)

    //word count
    val words: RDD[String] = input.flatMap(line => line.split(" "))
    val counts = words.map(word => (word, 1)). //reduceByKey((x, y) => x + y)
      reduceByKey { case (x, y) => x + y }
    counts.saveAsTextFile("")
    //words.groupBy()

    //per-key average via combineByKey: the combiner is a (sum, count) pair
    val toCombine = sc.parallelize(List((1, 2)))
    toCombine
      .combineByKey(
        (v) => (v, 1),
        (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
      .map { case (key, value) => (key, value._1 / value._2.toFloat) }
    toCombine.collectAsMap().map(println(_))
    toCombine.partitions.size
    //toCombine.coalesce()
    //toCombine.mapValues((x, y) => x).persist()

    input.map(s => s)
    val pair: RDD[(String, String)] = sc.sequenceFile[String, String]("")
      .partitionBy(new HashPartitioner(100))
      .persist()
    //pair.flatMap()
    //val map = pair.map((x, y) => x).persist()
    //pair.map((x, y) => (x, y)).persist()

    val si = sc.parallelize(List(1, 2, 3))
    si.map(x => (x, 1))
    val du = sc.parallelize(List((1, 2), (3, 4)))
    du.flatMap { case (d1, d2) => d1 + "" } //((d1, d2) => (d1))

    //simple PageRank over an object file of (page, outgoing links)
    val links: RDD[(String, Seq[String])] = sc.objectFile[(String, Seq[String])]("link")
      .partitionBy(new HashPartitioner(100))
      .persist()
    var ranks: RDD[(String, Double)] = links.mapValues(v => 1.0)
    for (i <- 0 to 10) {
      val contributions: RDD[(String, Double)] = links.join(ranks).flatMap {
        case (pageId: String, (links: Iterable[String], rank: Double)) =>
          links.map(dest => (dest, rank / links.size))
      }
      ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
    }
    ranks.saveAsTextFile("ranks")
  }
}
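package a

import org.apache.spark.{SparkConf, SparkContext}

//NOTE: illustrative sketch, not part of the original sources. A minimal driver
//for the combineByKey-based per-key average used in class A above (the combiner
//is a (sum, count) pair). The local master and the sample data are assumptions
//for illustration only.
object CombineByKeyAverageExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("avg-example"))
    val scores = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 4)))
    val avg = scores
      .combineByKey(
        (v: Int) => (v, 1),                                           //createCombiner: start a (sum, count)
        (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        //mergeValue within a partition
        (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) //mergeCombiners across partitions
      .mapValues { case (sum, cnt) => sum / cnt.toFloat }
    avg.collect().foreach(println) //expected: (a,2.0) and (b,4.0)
    sc.stop()
  }
}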
package a

import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
  * Created by yuduy on 2018/1/17.
  */
object Page {

  //shared SparkContext used by last(); page() and improve() create their own local contexts
  lazy val sc: SparkContext = new SparkContext(new SparkConf())

  def page(path: String): Unit = {
    val sc = new SparkContext(new SparkConf())

    //url -> neighbor url
    val lines: RDD[String] = sc.textFile(path)

    //url neighbors
    val links: RDD[(String, Iterable[String])] = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey() //.cache()

    //submit conf: spark.rdd.compress=true
    //links are joined with ranks every iteration
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //url rank
    var ranks: RDD[(String, Double)] = links.mapValues(v => 1.0)

    var lastCheckPointRanks: RDD[(String, Double)] = null
    for (i <- 1 to 10) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        urls.map(url => (url, rank / urls.size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

      //checkpoint (requires sc.setCheckpointDir to be configured beforehand)
      if (i != 10) {
        ranks.persist(StorageLevel.MEMORY_ONLY_SER).setName(s"iter$i: ranks")
        ranks.checkpoint()
        //force action, just to trigger calculation
        ranks.foreach(_ => ())

        if (lastCheckPointRanks != null) {
          //drop the previous iteration's checkpoint files and cached blocks
          lastCheckPointRanks.getCheckpointFile.foreach { ckp =>
            val p = new Path(ckp)
            val fs = p.getFileSystem(sc.hadoopConfiguration)
            fs.delete(p, true)
          }
          lastCheckPointRanks.unpersist(blocking = false)
        }
        lastCheckPointRanks = ranks
      }
    }

    //force action, like ranks.saveAsTextFile()
    ranks.foreach(_ => ())
  }

  def improve(path: String): Unit = {
    val sc = new SparkContext(new SparkConf())

    //url -> neighbor url
    val lines: RDD[String] = sc.textFile(path)
    val links: RDD[(Long, Long)] = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0).toLong, parts(1).toLong)
    }.distinct()
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //count of each url's outs
    val outCnts: RDD[(Long, Int)] = links.mapValues(_ => 1).reduceByKey(_ + _)
    outCnts.persist(StorageLevel.MEMORY_ONLY_SER).setName("out-counts")

    //url rank, one entry per url (mapping over links would give one entry per edge)
    var ranks: RDD[(Long, Double)] = outCnts.mapValues(_ => 1.0)

    val c1: RDD[(Long, (Iterable[Long], Iterable[Int]))] = links.cogroup(outCnts)
    val c2: RDD[(Long, (Iterable[Long], Iterable[Int], Iterable[Double]))] = links.cogroup(outCnts, ranks)

    //RDD[(Long, (Iterable[Long], Iterable[Int], Iterable[Double]))]
    //            [url]           [count]        [rank]
    //url rank
    /*val contribs: RDD[(Long, Double)] = links.cogroup(outCnts, ranks).values.flatMap { pair =>
      for (u <- pair._1.iterator; v <- pair._2.iterator; w <- pair._3.iterator) yield (u, w / v)
    }*/
    //salt the keys to spread skewed urls over several reducers
    val contribs = keyWithRandomInt(links)
      .cogroup(expandKeyWithRandomInt(outCnts), expandKeyWithRandomInt(ranks))
      .values.flatMap { pair =>
        for (u <- pair._1.iterator; v <- pair._2.iterator; w <- pair._3.iterator) yield (u, w / v)
      }
  }

  def keyWithRandomInt[K, V](rdd: RDD[(K, V)]): RDD[((K, Int), V)] = {
    rdd.map(x => ((x._1, Random.nextInt(10)), x._2))
  }

  def expandKeyWithRandomInt[K, V](rdd: RDD[(K, V)]): RDD[((K, Int), V)] = {
    rdd.flatMap { x =>
      //salt values 0..9, matching Random.nextInt(10) in keyWithRandomInt
      for (i <- 0 until 10) yield ((x._1, i), x._2)
    }
  }

  def last(path: String): Unit = {
    val lines: RDD[String] = sc.textFile(path)
    val links: RDD[(Long, Long)] = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0).trim.toLong, parts(1).trim.toLong)
    }.distinct()
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //count of each url's outs
    val outCnts: RDD[(Long, Long)] = links.mapValues(_ => 1L).reduceByKey(_ + _)
      .persist(StorageLevel.MEMORY_ONLY_SER).setName("out-counts")

    //init ranks
    var ranks: RDD[(Long, Double)] = outCnts.mapValues(_ => 1.0)
      .persist(StorageLevel.MEMORY_ONLY_SER).setName("init-ranks")
    //force action, just to trigger calculation
    ranks.foreach(_ => ())

    //urls with a very large out-degree are treated as skewed keys
    val skewedOutCnts: scala.collection.Map[Long, Long] = outCnts.filter(_._2 >= 1000000).collectAsMap()
    val bcSkewedOutCnts: Broadcast[scala.collection.Map[Long, Long]] = sc.broadcast(skewedOutCnts)

    val skewed = links.filter { link =>
      val cnts = bcSkewedOutCnts.value
      cnts.contains(link._1)
    }.persist(StorageLevel.MEMORY_ONLY_SER).setName("skewed-links")
    //force action, just to trigger calculation
    skewed.foreach(_ => ())

    val noSkewed = links.filter { link =>
      val cnts = bcSkewedOutCnts.value
      !cnts.contains(link._1)
    }.groupByKey().persist().setName("no-skewed-links")
    //force action, just to trigger calculation
    noSkewed.foreach(_ => ())

    links.unpersist(blocking = false)
  }
}
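package a

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

//NOTE: illustrative sketch, not part of the original Page object. It shows one
//way the skewed / non-skewed halves prepared in Page.last() could be turned into
//a full PageRank step: hot keys are handled with a broadcast map-side join, the
//long tail with the ordinary shuffle join used in page() and improve(). All names
//here (SkewAwareStep, iterate, bcSkewedRanks, ...) are assumptions for illustration.
object SkewAwareStep {

  def iterate(sc: SparkContext,
              skewed: RDD[(Long, Long)],             //edges whose source is a hot key
              noSkewed: RDD[(Long, Iterable[Long])], //grouped edges for the long tail
              ranks: RDD[(Long, Double)],
              bcSkewedOutCnts: Broadcast[scala.collection.Map[Long, Long]]): RDD[(Long, Double)] = {
    //assumption: hot keys are few, so their current ranks fit in a broadcast map
    val bcSkewedRanks = sc.broadcast(
      ranks.filter(r => bcSkewedOutCnts.value.contains(r._1)).collectAsMap())

    //hot edges: look ranks and out-counts up in the broadcast maps, no shuffle on the hot keys
    val skewedContribs: RDD[(Long, Double)] = skewed.map { case (src, dst) =>
      (dst, bcSkewedRanks.value(src) / bcSkewedOutCnts.value(src))
    }

    //tail edges: ordinary shuffle join, as in page() / improve()
    val tailContribs: RDD[(Long, Double)] = noSkewed.join(ranks).values.flatMap {
      case (dsts, rank) => dsts.map(dst => (dst, rank / dsts.size))
    }

    skewedContribs.union(tailContribs)
      .reduceByKey(_ + _)
      .mapValues(v => 0.15 + 0.85 * v)
  }
}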
package com.pplive.pike.exec.spoutproto;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by yuduy on 2018/5/17.
 */
public class Tps {

    private static class Statistics {
        public final AtomicBoolean busy = new AtomicBoolean(false);
        public final AtomicLong count = new AtomicLong(0);
        public long lastModifyTime;
    }

    private final AtomicLong total;
    private final long tpsLimit;
    private final Statistics[] windows;

    public Tps(int tpsLimit) {
        this.tpsLimit = tpsLimit;
        this.total = new AtomicLong(0);
        this.windows = new Statistics[100];
        for (int i = 0; i < 100; i++) {
            //avoid null
            this.windows[i] = new Statistics();
        }
    }

    private long addAndReturn() {
        for (; ; ) {
            long currentTimeMillis = System.currentTimeMillis();
            //take the modulo on the long first: casting the timestamp to int overflows
            //and could yield a negative slot index
            Statistics stat = windows[(int) (currentTimeMillis % this.windows.length)];

            //uncontended path
            if ((currentTimeMillis - stat.lastModifyTime <= 10) && !stat.busy.get()) {
                //lastModifyTime, total and count may conflict, so also require !stat.busy.get()
                long rtn = total.incrementAndGet();
                stat.count.incrementAndGet();
                return rtn;
            }

            //contended path: the slot is stale or busy, try to take the slot's lock
            if (stat.busy.compareAndSet(false, true)) {
                try {
                    //drop the stale slot's contribution from total and count the current event
                    total.set(total.get() - stat.count.get() + 1);
                    stat.count.set(1);
                    stat.lastModifyTime = currentTimeMillis; //non-volatile, but written before the release barrier
                    return total.get();
                } finally {
                    stat.busy.set(false); //release lock and write barrier
                }
            }
        }
    }

    public boolean overLoad() {
        return addAndReturn() > tpsLimit;
    }
}