關於Hadoop的雜亂無章(續更)

JPS(是jdk的工具):表示查看當前主機有哪些運行的進程
NameNode :表示主節點
DataNode:表示數據節點
SecondaryNameNode :表示次要名稱節點
--節點表示:一臺機器
進程是運行在機器上的,一個軟件能夠有多個進程(分佈式軟件:Hadoop)
HDFS只是Hadoop的一部分,Hadoop還有MR、yarn
HDFS是分佈式軟件系統:將文件自動分佈在三臺機器上(副本/備份)
HDFS的特色:
1.可容錯:表示你把軟件刪除了,還能夠復原
2,低廉硬件(安裝了x86服務器/CPU)
3.高吞吐量(IO):分佈多臺機器同時處理
4.適用於有超大數據集的應用程序(1G以上大小的表格)
--可是不能知足:update(隨機讀寫),可是能夠以流的形式訪問文件,提升了性能
---------
POSIX:可移植操做系統接口(是一個規範)
說白就是一個文件系統的接口,ext四、ext三、FAT3二、NTFS所有都實現了這個接口,可是功能不必定所有實現
-HDFS框架是有中心的架構
主節點是:NameNode
用於對外(client)通訊,經過交換機接收和發送,可是查詢到的數據不通過這裏直接由DataNode交給client
管理+協調,能夠控制其餘節點
完成任務(幹活節點):DataNode
DataNode之間沒有關聯,之間的通訊也是經過局域網
--實際生產環境中是有多個NameNode
MateData(元數據):元數據就是形容數據的數據
在這裏他包含NameSpace:目錄結構+blockdata(塊數據)
HDFS是分塊存儲的,塊表示一個文件存儲在哪臺機器上
------------
client(客戶端)與NameNode之間是元數據交換
client與DataNode之間是數據交換
在同一個機架上通訊快,HDFS會在同一個機架上放兩個數據(在兩臺機器上),在不一樣的機架上保存一個(備份)
128M是一個block(塊)
block就是鍵值對--映射到內存中就是元數據
key:block的id(哪臺機器)-----------value:block的內容地址
--文件越小消耗的內存越大,由於保存相同大小,文件越多分的塊越多,映射到內存中的元數據越多
例如:文件1024G,保存1T大小的這樣的文件,佔用內存8M,磁盤大小是3T
    文件1M,保存1pb大小的這樣的文件,佔用內存1T,磁盤大小是3pb
分塊是客戶端
客戶端將文件分塊,串行依次寫入,分的塊不必定保存在同一臺機器上
1.向文件系統(Hadoop集羣)中上傳文件
hadoop fs -put /abc /
/abc:表示將要上傳的文件
/:表示上傳到集羣中的路徑是:/
----------
Fs文件系統----保存file
數據庫(DB)----保存表格/table-----存入以後須要常常修改
HDFS---file----crd(沒有u不能隨機讀寫,可是能追加),
記住HDFS與數據庫無關,HDFS是處理海量數據的,不能常常修改
---
OLTP:在線事務處理(對數據庫的寫操做)
主要是數據庫操做--web網站
特色:實時(當即有效果),處理的數據量小
OLAP:對數據庫的讀、分析處理
主要是:Hadoop
特色:實時不高,數據量大,HDFS是一次寫入屢次讀取的模式
--元數據:ns(目錄結構)+blockMap(文件所在的地址)
Secondary不是NameNode的備份,由於NameNode與Secondary共存亡
他相似於祕書:將內存中的內容持久化
工做:將最近的一個image(存量)與edit—log(增量)合併--生成出最新的image,並將開始的image刪除,循環操做
這是由於每寫一個文件,都會產生一個edit-log
過程:NameNode將日誌+最新的image發送給--Secondary,Secondary將這兩個和合並,並刪除原來的image,發送給NameNode一個最新的image(這兩個是進程之間通訊經過Http協議)
--Hadoop中的配置文件
permission權限,false:表示任何人均可以訪問,實際生產環境中true
每次格式化,version中的NameSpaceID、clusterID(集羣的id)改變了
集羣在開啓前幾分鐘會開啓安全模式(SafeMode),DataNode向NameNode彙報數據信息,以後自動關閉
50010是集羣之間的通訊端口號,若是是非知名端口號,防火牆會攔截
hadoop fs -checksum /fileName:校驗碼,能夠查看文件是否被改變(是否成爲髒數據)html

------mapreducejava

hdfs:是分佈式存儲,能夠單獨使用node

mapreduce是分佈式計算,必須按照分佈式存儲才能分佈式計算mysql

map:映射、轉化、分web

reduce:合併、減小算法

MR是計算模型:能夠並行化處理大量數據(提升效率)sql

a.並行(事):一件事分紅多個快,多我的同時作shell

b.併發(人):多我的同時作一件事數據庫

cdh與hdp是兩家最大的hadoop上市公司(在2018年據說要合併)json

之後spark(函數式設計語言)會代替mapreduce,由於mapreduce相對於比較慢、難以維護。

-------實際生產中的集羣需求:

MapReduce集羣搭建

1.yarn環境

在/usr/local/hadoop/etc/hadoop/yarn-env.sh中配置--java安裝路徑

2.MapReduce配置 IP:8088

將mapred-site.xml.tmplate複製成mapred-site.xml

cp mapred-site.xml.tmplate mapred-site.xml

將mapred-site.xml中添加配置

<property>

        <name>mapreduce.framework.name</name>

       <value>yarn</value>

</property>

<property>

       <name>mapreduce.jobhistory.address</name>

        <value>master:10020</value>

</property>

<property>

        <name>mapreduce.jobhistory.webapp.address</name>

        <value>master:19888</value>

</property>

3.yarn配置

在yarn-site.xml中添加配置

<property>

        <name>yarn.resourcemanager.hostname</name>

        <value>master</value>

這裏表示yarn存放在master,這裏value中的值填什麼yarn就在那裏

</property>

<property>

        <name>yarn.nodemanager.aux-services</name>

        <value>mapreduce_shuffle</value>

</property>

--若是不是單機僞分佈集羣則處理一下操做

4.將etc/hadoop複製到其餘節點

5.最終啓動

啓動

執行start-yarn.sh命令(在這以前確保HDFS已經啓動,沒有啓動的先start-dfs.sh)。

執行成功後,經過JPS檢查ResourceManager、NodeManager是否啓動。

若是啓動成功,經過master:8088能夠打開MapReduce「應用」站點:

注意:須要配置本地Windows系統的hosts文件才能在本地使用主機名!

phoenix在測試javaAPI的注意: 

(1)插入值若是是字符串要用單引號引發來,切記不能用雙引號!!! 
(2)表名若是要體現小寫效果,必需要用雙引號!!!


upsert into "person" values (1, 'test', 100);        正確
upsert into "person" values (1, "test", 100);        錯誤


HBase的協處理器

過程:

Java寫協處理器代碼-------打jar包-------上傳到hbase/lib目錄下-----重啓hbase(爲了加載jar包)-------在hbase上建立表----爲表添加自定義的協處理器(alter  ‘tableName’,’coprocessor’ => ‘|classPath|‘)------測試:put數據。

類:將索引放在另外一張表中,表也不放在hbase上,放在sole上。

  1. EndPoint:數據存儲過程(hbase的協處理器—如今已經不經常使用)

存儲在mysql中的多條sql語句的集合,執行多條語句,提升性能

可是最致命的缺點是:遷移性差(寫的越多,遷移的時候要寫的sql語句越多)

 

b. Observer(觸發器):就是在寫入數據以前,爲表建立二級索引

DML:觀察表數據的變化—CURD(增刪改查)-----在Observer中RegionObserver

DDL:觀察表的元數據變化---創表刪表之類-------在Observer中MasterObserver

WAL:寫以前記錄日誌

 

索引:排序以後的文件---有索引,先讀取索引文件,再查詢數據內容。(不用全表掃描)

覆蓋索引(非主鍵的索引):增大索引文件,可是讀取數據快,也會稍微下降寫入數據的速度(以空間換時間)

Hbase自己只有rowKey一個索引。

能夠利用phoenix插件能夠隨意建立二級索引,提升hbase的查詢速度;

也能夠在寫入數據以前對錶添加協處理器。


Hbase RowKey設計:

  1. rowKey的大小:不是很大

長度不宜過長:<=256字節

  1. rowKey與業務整合:

由於hbase只有rowKey一個索引,建立的時候儘可能與業務邏輯整合(就是查詢的條件放在rowKey中);爲了命中索引,提升查詢速度。

  1. rowKey散列:爲了不寫熱點

寫熱點:數據傾斜(數據分佈不均勻)

  1. 加隨機前綴(不建議使用)--由於須要另保存前綴用來查詢數據
  2. 加hash前綴(建議使用)--由hash算法就能夠取前綴值,用來查詢
  3. 倒敘寫入(偷懶式)

好比:1-2018-start--------------àtrats-8102-1

先構造rowKey,再倒序。

       正序可能(數據)遞增,而致使寫熱點。

----------libreoffice

下載:

Linux centos

https://donate.libreoffice.org/zh-CN/dl/rpm-x86_64/6.0.6/zh-CN/LibreOffice_6.0.6_Linux_x86-64_rpm.tar.gz

安裝:

1.解壓文件

2.進入到的RPMS目錄

3.yum localinstall *.rpm

文檔轉換:

word→pdf

libreoffice6.0 --invisible --convert-to pdf some.doc

word→html
libreoffice6.0 --invisible --convert-to html some.doc

或者

soffice --convert-to pdf somedoc

轉換(word-->pdf)出現下面錯誤,請下載插件

yum -y install ibus

[root@master209 local]# libreoffice --invisible --convert-to pdf 1.doc
-bash: libreoffice: command not found
[root@master209 local]# libreoffice6.0 --invisible --convert-to pdf 1.doc
/opt/libreoffice6.0/program/soffice.bin: error while loading shared libraries: libcairo.so.2: cannot open shared object file: No such file or directory

異常處理:
word->pdf時中文亂碼

1.打開c盤下的Windows/Fonts目錄

2.在這以前咱們還須要新建目錄,首先在/usr/shared/fonts目錄下新建一個目錄chinese

3.而後就是將上面的兩個字體上傳至/usr/share/fonts/chinese目錄下便可

chmod -R 755 /usr/share/fonts/chinese

4.刷新內存中的字體緩存,這樣就不用reboot重啓了,輸入:fc-cache

5.最後再次經過fc-list看一下字體列表:(ps可能我添加的比較多)

卸載libreoffice

yum erase libreoffice\*

 

--------------kafka

Kafka 消息隊列、消息系統、消息中間件、消息的推送口

生產者-----à(中介)ß-------消費者

中間經過TCP通訊

中間件的演化:

類---à軟件

線程-à進程

Flum中agent表示一個節點(進程)

Kafka中broker表示一個節點(進程)

--特性:分佈、可分區、可複製(備份)、順序讀寫-速度高

Offset:消費者的(每一個分區的)偏移量

Offset是有消費者來維護

Topic:話題—默認是hash分區

每一個topic的領導者是相對的

一條數據(一個分區)能夠由不一樣組的多個消費者來消費

有兩種模式:排隊、訂閱

消息隊列的種類:

1.ActiceMQ   java

2.zero MQ 

3.Rabbit MQ

4. Rocket MQ   阿里雲開元

5.kafka

以上都是消息隊列,均可以相互替換,通常2,3不經常使用—使用消息隊列就是爲了解耦

一個分區,一個組 能保證有序性

在0.11以前kafka將元數據保存在zookeeper中

以日誌的形式將數據持久化

分區個數 按照kafka集羣個數的10倍關係

 

Kafka中的相關配置

  1. acks

設置爲all:安全,可能數據重複,先複製,再告訴

à-1  提升性能,可是不安全

à1  數據在leader接收以後,當即回覆,若是中介機器宕機,可能丟失

16kb----批量發送的單位

Batch(緩衝區-一次發送的數據)與linger(間隔)知足之1、都會發送

Kafka---à發送的是json字符串

Enable.auto.commit--àtrue    自動提交

消費者取出以後就告訴別人消費過了,消費者尚未消費就告訴,若是中間shutdown了,數據就丟失。

防止數據重複與數據丟失---偏移量問題

冪等(事務控制)+手動

Auto.offset.reset---àearliest(最先)  有--(上次)   無---(從頭)

               -àbest (最近)   數據丟失

               -ànone   測試用

---------------------spark

spark.streaming._    實時處理數據   

localhost:4040

storm與stream的區別:

storm:每次處理一條數據,快,可是處理的數據小(吞吐量小)

stream:每次能夠快速處理一批數據,特徵:high-throughput(高吞吐)、fault-tolerant(可容錯)、scalable(可擴展)

配置信息:

streaming至少兩個線程

new SparkConf().setMaster("local[*]").setAppName("streaming")

時間用於分割數據:Seconds(5)---5秒

new StreamingContext(conf,Seconds(5))

 

數據的來源是經過tcp通訊獲取Datastream

Datastream:表示一系列的rdd

Dstreammap,就是對每一個rddmap

模擬從tcp端口讀取數據

val ds=ssc.socketTextStream("localhost",999)

 

//啓動streaming context,防止沒有數據關閉

    //若是沒有接受導數據,也不會馬上關閉,會嘗試一段時間強制關閉

    ssc.start()

    ssc.awaitTermination()

--------------------------mobaxterm配置

Settings

    Configuration

      Terminal

        勾選 Paste using right-click(左鍵選取,鬆開左鍵複製,右鍵粘貼, 可使用Ctrl+右鍵來使用右鍵功能)

      SSH

        取消勾選 Automatically switch to SSH-browser tab after login(登陸後自動切換到SFTP瀏覽器)

        勾選 SSH keepalive(保持心跳鏈接不斷)

------

 

命令  hadoop dfsadmin -safemode get  查看安全模式狀態

命令  hadoop dfsadmin -safemode enter    進入安全模式狀態

命令   hadoop dfsadmin -safemode leave   離開安全模式

--------------spark中rdd、dataframe、dataset聯繫與區別

 

在spark中,RDD、DataFrame、Dataset是最經常使用的數據類型,本博文給出筆者在使用的過程當中體會到的區別和各自的優點

 

共性:

一、RDD、DataFrame、Dataset全都是spark平臺下的分佈式彈性數據集,爲處理超大型數據提供便利

二、三者都有惰性機制,在進行建立、轉換,如map方法時,不會當即執行,只有在遇到Action如foreach時,三者纔會開始遍歷運算,極端狀況下,若是代碼裏面有建立、轉換,可是後面沒有在Action中使用對應的結果,在執行時會被直接跳過,如

1

2

3

4

5

6

7

8

val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")

val spark = SparkSession.builder().config(sparkconf).getOrCreate()

val rdd=spark.sparkContext.parallelize(Seq(("a"1), ("b"1), ("a"1)))

 

rdd.map{line=>

  println("運行")

  line._1

}

map中的println("運行")並不會運行

三、三者都會根據spark的內存狀況自動緩存運算,這樣即便數據量很大,也不用擔憂會內存溢出

四、三者都有partition的概念,如

1

2

3

4

5

6

7

8

var predata=data.repartition(24).mapPartitions{

      PartLine => {

        PartLine.map{

          line =>

             println(「轉換操做」)

                            }

                         }

這樣對每個分區進行操做時,就跟在操做數組同樣,不但數據量比較小,並且能夠方便的將map中的運算結果拿出來,若是直接用map,map中對外面的操做是無效的,如

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

val rdd=spark.sparkContext.parallelize(Seq(("a"1), ("b"1), ("a"1)))

    var flag=0

    val test=rdd.map{line=>

      println("運行")

      flag+=1

      println(flag)

      line._1

    }

println(test.count)

println(flag)

    /**

    運行

    1

    運行

    2

    運行

    3

    3

    0

   * */

不使用partition時,對map以外的操做沒法對map以外的變量形成影響

五、三者有許多共同的函數,如filter,排序等

六、在對DataFrame和Dataset進行操做許多操做都須要這個包進行支持

1

2

import spark.implicits._

//這裏的spark是SparkSession的變量名

七、DataFrame和Dataset都可使用模式匹配獲取各個字段的值和類型

DataFrame:

1

2

3

4

5

6

7

testDF.map{

      case Row(col1:String,col2:Int)=>

        println(col1);println(col2)

        col1

      case _=>

        ""

    }

爲了提升穩健性,最好後面有一個_通配操做,這裏提供了DataFrame一個解析字段的方法

Dataset:

1

2

3

4

5

6

7

8

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

    testDS.map{

      case Coltest(col1:String,col2:Int)=>

        println(col1);println(col2)

        col1

      case _=>

        ""

    }

  

區別:

RDD:

一、RDD通常和spark mlib同時使用

二、RDD不支持sparksql操做

DataFrame:

一、與RDD和Dataset不一樣,DataFrame每一行的類型固定爲Row,只有經過解析才能獲取各個字段的值,如

1

2

3

4

5

testDF.foreach{

  line =>

    val col1=line.getAs[String]("col1")

    val col2=line.getAs[String]("col2")

}

每一列的值無法直接訪問

二、DataFrame與Dataset通常與spark ml同時使用

三、DataFrame與Dataset均支持sparksql的操做,好比select,groupby之類,還能註冊臨時表/視窗,進行sql語句操做,如

1

2

dataDF.createOrReplaceTempView("tmp")

spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

四、DataFrame與Dataset支持一些特別方便的保存方式,好比保存成csv,能夠帶上表頭,這樣每一列的字段名一目瞭然

1

2

3

4

5

6

//保存

val saveoptions = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://172.xx.xx.xx:9000/test")

datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

//讀取

val options = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://172.xx.xx.xx:9000/test")

val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load()

利用這樣的保存方式,能夠方便的得到字段名和列的對應,並且分隔符(delimiter)能夠自由指定

Dataset:

這裏主要對比Dataset和DataFrame,由於Dataset和DataFrame擁有徹底相同的成員函數,區別只是每一行的數據類型不一樣

DataFrame也能夠叫Dataset[Row],每一行的類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什麼類型都無從得知,只能用上面提到的getAS方法或者共性中的第七條提到的模式匹配拿出特定字段

而Dataset中,每一行是什麼類型是不必定的,在自定義了case class以後能夠很自由的得到每一行的信息

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

/**

      rdd

      ("a", 1)

      ("b", 1)

      ("a", 1)

      * */

val test: Dataset[Coltest]=rdd.map{line=>

      Coltest(line._1,line._2)

    }.toDS

test.map{

      line=>

        println(line.col1)

        println(line.col2)

    }

能夠看出,Dataset在須要訪問列中的某個字段時是很是方便的,然而,若是要寫一些適配性很強的函數時,若是使用Dataset,行的類型又不肯定,多是各類case class,沒法實現適配,這時候用DataFrame即Dataset[Row]就能比較好的解決問題

轉化:

RDD、DataFrame、Dataset三者有許多共性,有各自適用的場景經常須要在三者之間轉換

DataFrame/Dataset轉RDD:

這個轉換很簡單

1

2

val rdd1=testDF.rdd

val rdd2=testDS.rdd

RDD轉DataFrame:

1

2

3

4

import spark.implicits._

val testDF = rdd.map {line=>

      (line._1,line._2)

    }.toDF("col1","col2")

通常用元組把一行的數據寫在一塊兒,而後在toDF中指定字段名

RDD轉Dataset:

1

2

3

4

5

import spark.implicits._

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

val testDS = rdd.map {line=>

      Coltest(line._1,line._2)

    }.toDS

能夠注意到,定義每一行的類型(case class)時,已經給出了字段名和類型,後面只要往case class裏面添加值便可

Dataset轉DataFrame:

這個也很簡單,由於只是把case class封裝成Row

1

2

import spark.implicits._

val testDF = testDS.toDF

DataFrame轉Dataset:

1

2

3

import spark.implicits._

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

val testDS = testDF.as[Coltest]

這種方法就是在給出每一列的類型後,使用as方法,轉成Dataset,這在數據類型是DataFrame又須要針對各個字段處理時極爲方便

特別注意:

在使用一些特殊的操做時,必定要加上 import spark.implicits._ 否則toDF、toDS沒法使用

 

------------hive中的列轉行,與行轉列

Hive
行轉列和列轉行
表1:cityInfo


表2:cityInfoSet

   

表1和表2的結構如上所示。如何在 hive 中使用 Hql 語句對錶1和表2進行互相轉化呢?

行轉列
表1=>表2 可使用 hive 的內置函數 concat_ws() 和 collect_set()進行轉換:

執行代碼以下所示:

select cityname,concat_ws(',',collect_set(regionname)) as address_set from cityInfo group by cityname;
1
列轉行
表2=>表1 可使用 hive 的內置函數 explode()進行轉化。代碼以下:

select cityname, region from cityInfoSet  lateral view explode(split(address_set, ',')) aa as region;

 

------------------spark中的driver與executor

1、看了不少網上的圖,大可能是dirver和executor之間的圖,都不涉及物理機器

 

以下圖,本人以爲這些始終有些抽象

看到這樣的圖,我很想知道driver program在哪裏啊,鬼知道?爲此我本身研究了一下,網友大多都說是對的有不一樣想法的請評論

 

2、如今我有三臺電腦 分別是

 

 
  1. 192.168.10.82 –>bigdata01.hzjs.co

  2. 192.168.10.83 –>bigdata02.hzjs.co

  3. 192.168.10.84 –>bigdata03.hzjs.co

集羣的slaves文件配置以下:

 
  1. bigdata01.hzjs.co

  2. bigdata02.hzjs.co

  3. bigdata03.hzjs.co

那麼這三臺機器都是worker節點,本集羣是一個徹底分佈式的集羣通過測試,我使用# ./start-all.sh ,那麼你在哪臺機器上執行的哪臺機器就是7071 Master主節點進程的位置,我如今在192.168.10.84使用./start-all.sh 

那麼就會這樣

 

3、那麼咱們來看看local模式下 

 

如今假設我在192.168.10.84上執行了 bin]# spark-shell 那麼就會在192.168.10.84產生一個SparkContext,此時84就是driver,其餘woker節點(三臺都是)就是產生executor的機器。如圖 

如今假設我在192.168.10.83上執行了 bin]# spark-shell 那麼就會在192.168.10.83產生一個SparkContext,此時83就是driver,其餘woker節點(三臺都是)就是產生executor的機器。如圖

總結:在local模式下  驅動程序driver就是執行了一個Spark Application的main函數和建立Spark Context的進程,它包含了這個application的所有代碼。(在那臺機器運行了應用的所有代碼建立了sparkContext就是drive,也能夠說是你提交代碼運行的那臺機器)

 

4、那麼看看cluster模式下

 

如今假設我在192.168.10.83上執行了 bin]# spark-shell 192.168.10.84:7077 那麼就會在192.168.10.84產生一個SparkContext,此時84就是driver,其餘woker節點(三臺都是)就是產生executor的機器。這裏直接指定了主節點driver是哪臺機器:如圖

5、若是driver有多個,那麼按照上面的規則,去判斷具體在哪裏

 

Driver: 使用Driver這一律唸的分佈式框架有不少,好比hive,Spark中的Driver即運行Application的main()函數,而且建立SparkContext,建立SparkContext的目的是爲了準備Spark應用程序的運行環境,在Spark中由SparkContext負責與ClusterManager通信,進行資源的申請,任務的分配和監控等。當Executor部分運行完畢後,Driver同時負責將SaprkContext關閉,一般SparkContext表明Driver.

上面紅色框框都屬於Driver,運行在Driver端,中間沒有框住的部分屬於Executor,運行的每一個ExecutorBackend進程中。println(pcase.count())collect方法是Spark中Action操做,負責job的觸發,由於這裏有個sc.runJob()方法

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

hbaseRDD.map()屬於Transformation操做。

總結:Spark Application的main方法(於SparkContext相關的代碼)運行在Driver上,當用於計算的RDD觸發Action動做以後,會提交Job,那麼RDD就會向前追溯每個transformation操做,直到初始的RDD開始,這之間的代碼運行在Executor。

 

---------------------------top K

堆排解法
用堆排來解決Top K的思路很直接。

前面已經說過,堆排利用的大(小)頂堆全部子節點元素都比父節點小(大)的性質來實現的,這裏故技重施:既然一個大頂堆的頂是最大的元素,那咱們要找最小的K個元素,是否是能夠先創建一個包含K個元素的堆,而後遍歷集合,若是集合的元素比堆頂元素小(說明它目前應該在K個最小之列),那就用該元素來替換堆頂元素,同時維護該堆的性質,那在遍歷結束的時候,堆中包含的K個元素是否是就是咱們要找的最小的K個元素?

實現: 
在堆排的基礎上,稍做了修改,buildHeap和heapify函數都是同樣的實現,不難理解。

速記口訣:最小的K個用最大堆,最大的K個用最小堆。

public class TopK {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        int[] a = { 1, 17, 3, 4, 5, 6, 7, 16, 9, 10, 11, 12, 13, 14, 15, 8 };
        int[] b = topK(a, 4);
        for (int i = 0; i < b.length; i++) {
            System.out.print(b[i] + ", ");
        }
    }

    public static void heapify(int[] array, int index, int length) {
        int left = index * 2 + 1;
        int right = index * 2 + 2;
        int largest = index;
        if (left < length && array[left] > array[index]) {
            largest = left;
        }
        if (right < length && array[right] > array[largest]) {
            largest = right;
        }
        if (index != largest) {
            swap(array, largest, index);
            heapify(array, largest, length);
        }
    }

    public static void swap(int[] array, int a, int b) {
        int temp = array[a];
        array[a] = array[b];
        array[b] = temp;
    }

    public static void buildHeap(int[] array) {
        int length = array.length;
        for (int i = length / 2 - 1; i >= 0; i--) {
            heapify(array, i, length);
        }
    }

    public static void setTop(int[] array, int top) {
        array[0] = top;
        heapify(array, 0, array.length);
    }

    public static int[] topK(int[] array, int k) {
        int[] top = new int[k];
        for (int i = 0; i < k; i++) {
            top[i] = array[i];
        }
        //先建堆,而後依次比較剩餘元素與堆頂元素的大小,比堆頂小的, 說明它應該在堆中出現,則用它來替換掉堆頂元素,而後沉降。
        buildHeap(top);
        for (int j = k; j < array.length; j++) {
            int temp = top[0];
            if (array[j] < temp) {
                setTop(top, array[j]);
            }
        }
        return top;
    }
}


時間複雜度 
n*logK

速記:堆排的時間複雜度是n*logn,這裏至關於只對前Top K個元素建堆排序,想法不必定對,但必定有助於記憶。

適用場景 
實現的過程當中,咱們先用前K個數創建了一個堆,而後遍歷數組來維護這個堆。這種作法帶來了三個好處:(1)不會改變數據的輸入順序(按順序讀的);(2)不會佔用太多的內存空間(事實上,一次只讀入一個數,內存只要求能容納前K個數便可);(3)因爲(2),決定了它特別適合處理海量數據。

這三點,也決定了它最優的適用場景。

快排解法
用快排的思想來解Top K問題,必然要運用到」分治」。

與快排相比,二者惟一的不一樣是在對」分治」結果的使用上。咱們知道,分治函數會返回一個position,在position左邊的數都比第position個數小,在position右邊的數都比第position大。咱們不妨不斷調用分治函數,直到它輸出的position = K-1,此時position前面的K個數(0到K-1)就是要找的前K個數。

實現: 
「分治」仍是原來的那個分治,關鍵是getTopK的邏輯,務必要結合註釋理解透徹,自動動手寫寫。

public class TopK {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        int[] array = { 9, 3, 1, 10, 5, 7, 6, 2, 8, 0 };
        getTopK(array, 4);
        for (int i = 0; i < array.length; i++) {
            System.out.print(array[i] + ", ");
        }
    }

    // 分治
    public static int partition(int[] array, int low, int high) {
        if (array != null && low < high) {
            int flag = array[low];
            while (low < high) {
                while (low < high && array[high] >= flag) {
                    high--;
                }
                array[low] = array[high];
                while (low < high && array[low] <= flag) {
                    low++;
                }
                array[high] = array[low];
            }
            array[low] = flag;
            return low;
        }
        return 0;
    }

    public static void getTopK(int[] array, int k) {
        if (array != null && array.length > 0) {
            int low = 0;
            int high = array.length - 1;
            int index = partition(array, low, high);
            //不斷調整分治的位置,直到position = k-1
            while (index != k - 1) {
                //大了,往前調整
                if (index > k - 1) {
                    high = index - 1;
                    index = partition(array, low, high);
                }
                //小了,日後調整
                if (index < k - 1) {
                    low = index + 1;
                    index = partition(array, low, high);
                }
            }
        }
    }
}

時間複雜度 
n

速記:記住就行,基於partition函數的時間複雜度比較難證實,歷來沒考過。

適用場景 
對照着堆排的解法來看,partition函數會不斷地交換元素的位置,因此它確定會改變數據輸入的順序;既然要交換元素的位置,那麼全部元素必需要讀到內存空間中,因此它會佔用比較大的空間,至少能容納整個數組;數據越多,佔用的空間必然越大,海量數據處理起來相對吃力。

可是,它的時間複雜度很低,意味着數據量不大時,效率極高。

好了,兩種解法寫完了,趕忙實現一下吧。

                                   <未完>

相關文章
相關標籤/搜索