
package a

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
  * Created by yuduy on 2018/1/11.
  */
class A {

  //only one SparkContext may exist per JVM, so build it lazily once instead of on every call
  lazy val sc: SparkContext = {
    val conf = new SparkConf().setMaster("").setAppName("")
    new SparkContext(conf)
  }

  def aaa: Unit = {
    val conf = new SparkConf().setMaster("").setAppName("")
    val sc = new SparkContext(conf)
    val input: RDD[String] = sc.textFile("")

    val warns: RDD[String] = input.filter(line => line.contains("warn"))
    val errors: RDD[String] = input.filter(line => line.contains("error"))

    val calm: RDD[String] = warns.union(errors)

    //cache the union so the two actions below do not recompute it
    calm.persist()

    calm.collect().mkString("")

    val tmp: Array[String] = calm.collect()

    val a: RDD[Int] = input.map(l => 1)

    calm.take(10).foreach(println)

    //String + Int concatenates, so this appends "1" to each line and flattens the strings into characters (an RDD[Char]); kept only as a flatMap example
    calm.flatMap(_ + 1)
    val words: RDD[String] = input.flatMap(line => line.split(" "))

    //equivalent: reduceByKey((x, y) => x + y)
    val counts = words.map(word => (word, 1)).reduceByKey {
      case (x, y) => x + y
    }

    counts.saveAsTextFile("")

    //words.groupBy()

    //combineByKey computing a per-key average: carry (sum, count) and divide at the end
    val to_combinue = sc.parallelize(List((1, 2)))
    to_combinue
      .combineByKey(
        (v) => (v, 1),
        (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
      .map { case (key, value) => (key, value._1 / value._2.toFloat) }

    to_combinue.collectAsMap().map(println(_))

    to_combinue.partitions.size

    //to_combinue.coalesce()
    //to_combinue.mapValues((x, y) => x).persist()
    input.map(s => s)
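    //hash-partitioning the pairs up front (and persisting the result) lets later joins reuse the partitioning instead of re-shuffling this RDD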
    val pair: RDD[(String, String)] = sc.sequenceFile[String, String]("").partitionBy(new HashPartitioner(100)).persist()
    //pair.flatMap()
    //val map = pair.map((x, y) => x).persist()
    //pair.map((x, y) => (x, y)).persist()
    val si = sc.parallelize(List(1,2,3))
    si.map(x => (x, 1))
    val du = sc.parallelize(List((1, 2), (3, 4)))
    du.flatMap {
      case (d1, d2) => d1 + ""
    }//((d1, d2) => (d1))
    val links: RDD[(String, Seq[String])] = sc.objectFile[(String, Seq[String])]("link").partitionBy(new HashPartitioner(100)).persist()
    var ranks: RDD[(String, Double)] = links.mapValues(v => 1.0)
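    //each page starts with rank 1.0; every iteration below splits a page's rank evenly over its
    //out-links and recombines the contributions with a 0.85 damping factor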

    for (i <- 0 to 10) {
      val contributions: RDD[(String, Double)] = links.join(ranks).flatMap {
        case (pageId: String, (links: Iterable[String], rank: Double)) =>
          links.map(dest => (dest, rank / links.size))
      }
      ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
    }

    ranks.saveAsTextFile("ranks")

  }


}

 

package a

import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
  * Created by yuduy on 2018/1/17.
  */
object Page {

  //an object cannot declare an abstract val, so give sc a concrete lazy definition (used by last())
  lazy val sc: SparkContext = new SparkContext(new SparkConf())

  def page(path: String): Unit = {
    val sc = new SparkContext(new SparkConf())
    //url -> neighbor url
    val lines: RDD[String] = sc.textFile(path)
    //url neighbors
    val links: RDD[(String, Iterable[String])] = lines.map {
      s =>
        val parts = s.split("\\s+")
        (parts(0), parts(1))
    }.distinct().groupByKey() //.cache()

    //submit conf: spark.rdd.compress=true
    //persist links: the join inside the loop re-uses it on every iteration
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //url rank
    var ranks: RDD[(String, Double)] = links.mapValues(v => 1.0)

    //keep a handle on the previous checkpoint's ranks so its files can be removed once the next checkpoint is written
    var lastCheckPointRanks: RDD[(String, Double)] = null

    for (i <- 1 to 10) {
      val contribs = links.join(ranks).values.flatMap {
        case (urls, rank) =>
          urls.map(url => (url, rank / urls.size))
      }
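      //standard PageRank update: 0.15 teleport probability plus 0.85 of the summed contributions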
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

      //checkpoint
      if (i != 10) {
        ranks.persist(StorageLevel.MEMORY_ONLY_SER).setName(s"iter$i: ranks")
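        //checkpoint() requires sc.setCheckpointDir(...) to have been set beforehand (assumed configured elsewhere)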
        ranks.checkpoint()
        //force an action just to trigger the computation
        ranks.foreach(_ => ())

        if (lastCheckPointRanks != null) {
          lastCheckPointRanks.getCheckpointFile.foreach {
            ckp =>
              val p = new Path(ckp)
              val fs = p.getFileSystem(sc.hadoopConfiguration)
              fs.delete(p, true)
          }
          lastCheckPointRanks.unpersist(blocking = false)
        }
        lastCheckPointRanks = ranks
      }
    }

    //force an action (in real code this would be something like ranks.saveAsTextFile(...))
    ranks.foreach(_ => ())
  }

  def improve(path: String): Unit = {
    val sc = new SparkContext(new SparkConf())
    //url -> neighbor url
    val lines: RDD[String] = sc.textFile(path)
    val links: RDD[(Long, Long)] = lines.map {
      s =>
        val parts = s.split("\\s+")
        (parts(0).toLong, parts(1).toLong)
    }.distinct()
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //count of each url's outs
    val outCnts: RDD[(Long, Int)] = links.mapValues(_ => 1).reduceByKey(_ + _)
    outCnts.persist(StorageLevel.MEMORY_ONLY_SER).setName("out-counts")

    //url rank: one entry per url (deriving it from links would duplicate keys for urls with several out-links)
    var ranks: RDD[(Long, Double)] = outCnts.mapValues(_ => 1.0)

    val c1: RDD[(Long, (Iterable[Long], Iterable[Int]))] = links.cogroup(outCnts)
    val c2: RDD[(Long, (Iterable[Long], Iterable[Int], Iterable[Double]))] = links.cogroup(outCnts, ranks)

    //RDD[(Long, (Iterable[Long], Iterable[Int], Iterable[Double]))]
    //                     [url]         [count]       [rank]
    //                   url  rank
    /*val contribs: RDD[(Long, Double)] = links.cogroup(outCnts, ranks).values.flatMap {
      pair =>
        for (u <- pair._1.iterator;
             v <- pair._2.iterator;
             w <- pair._3.iterator)
          yield (u, w / v)
    }*/
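    //skew mitigation: salt the keys of the large links RDD with a random value in [0, 10) and
    //replicate outCnts and ranks under every salt, so a single hot url is spread across 10 keys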
    val contribs = keyWithRandomInt(links)
      .cogroup(expandKeyWithRandomInt(outCnts), expandKeyWithRandomInt(ranks))
      .values
      .flatMap {
        pair =>
          for (u <- pair._1.iterator;
               v <- pair._2.iterator;
               w <- pair._3.iterator)
            yield (u, w / v)
      }


  }

  //salt each key with a random value in [0, 10), spreading a hot key over up to 10 distinct keys
  def keyWithRandomInt[K, V](rdd: RDD[(K, V)]): RDD[((K, Int), V)] = {
    rdd.map(x => ((x._1, Random.nextInt(10)), x._2))
  }

  //replicate every record under all 10 salt values so it can cogroup with the salted RDD
  def expandKeyWithRandomInt[K, V](rdd: RDD[(K, V)]): RDD[((K, Int), V)] = {
    rdd.flatMap {
      x =>
        for (i <- 0 until 10)
          yield ((x._1, i), x._2)
    }
  }

  def last(path: String): Unit = {
    val lines: RDD[String] = sc.textFile(path)
    val links: RDD[(Long, Long)] = lines.map {
      s =>
        val parts = s.split("\\s+")
        (parts(0).trim.toLong, parts(1).trim.toLong)
    }.distinct()
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //count of each url's outs
    val outCnts: RDD[(Long, Long)] = links.mapValues(_ => 1L).reduceByKey(_ + _)
      .persist(StorageLevel.MEMORY_ONLY_SER).setName("out-counts")

    //init ranks
    var ranks: RDD[(Long, Double)] = outCnts.mapValues(_ => 1.0)
      .persist(StorageLevel.MEMORY_ONLY_SER).setName("init-ranks")
    //force an action just to trigger the computation
    ranks.foreach(_ => ())

    val skewedOutCnts: scala.collection.Map[Long, Long] = outCnts.filter(_._2 >= 1000000).collectAsMap()
    val bcSkewedOutCnts: Broadcast[scala.collection.Map[Long, Long]] = sc.broadcast(skewedOutCnts)
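    //split links into a skewed part (hot source urls) and the rest, using the broadcast set of hot keys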

    val skewed = links.filter {
      link =>
        val cnts = bcSkewedOutCnts.value
        cnts.contains(link._1)
    }.persist(StorageLevel.MEMORY_ONLY_SER).setName("skewed-links")
    //force an action just to trigger the computation
    skewed.foreach(_ => ())

    val noSkewed = links.filter{
      link =>
        val cnts = bcSkewedOutCnts.value
        !cnts.contains(link._1)
    }.groupByKey().persist().setName("no-skewed-links")
    //force an action just to trigger the computation
    noSkewed.foreach(_ => ())

    links.unpersist(blocking = false)
  }

}

 

package com.pplive.pike.exec.spoutproto;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by yuduy on 2018/5/17.
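 * Approximate sliding-window TPS limiter: each request is tallied into one of 100 buckets
 * keyed by currentTimeMillis % 100; when a bucket is reused after going stale, its old count
 * is subtracted from the running total. Callers are expected to call overLoad() once per
 * request and shed load when it returns true.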
 */
public class Tps {

    private static class Statistics {

        public final AtomicBoolean busy = new AtomicBoolean(false);
        public final AtomicLong count = new AtomicLong(0);
        public long lastModifyTime;

    }

    private final AtomicLong total;
    private final long tpsLimit;
    private final Statistics[] windows;

    public Tps(int tpsLimit) {
        this.tpsLimit = tpsLimit;
        this.total = new AtomicLong(0);
        this.windows = new Statistics[100];
        for (int i = 0; i < 100; i++) {
            //avoid null
            this.windows[i] = new Statistics();
        }
    }

    //record one event and return the running total: the fast path increments in place,
    //the slow path (stale or contended bucket) resets the bucket under its busy flag
    private long addAndReturn() {
        for (; ; ) {
            long currentTimeMillis = System.currentTimeMillis();
            Statistics stat = windows[(int) (currentTimeMillis % this.windows.length)];//take the modulo before casting, otherwise the truncated millis may index negatively
            //not contended: the bucket was touched recently and nobody holds its busy flag
            if ((currentTimeMillis - stat.lastModifyTime <= 10) && !stat.busy.get()) {//lastModifyTime, total and count may conflict, hence the extra !stat.busy.get() check
                long rtn = total.incrementAndGet();
                stat.count.incrementAndGet();
                return rtn;
            }
            //contended or stale: reset the bucket under its busy flag
            if (stat.busy.compareAndSet(false, true)) {
                try {
                    total.set(total.get() - stat.count.get());
                    stat.count.set(1);
                    stat.lastModifyTime = currentTimeMillis;//non-volatile, but the busy flag release below acts as the write barrier
                    return total.get();
                } finally {
                    stat.busy.set(false);//release lock and write barrier
                }
            }
        }
    }

    public boolean overLoad() {
        return addAndReturn() > tpsLimit;
    }

}