spark雜記

1.須要加上轉義字符
java.util.regex.PatternSyntaxException: Unclosed character class near index 0
java.util.regex.PatternSyntaxException: Unexpected internal error near index 1

2.kafka中數據還沒來得及消費,數據就已經丟失或者過時了;就是kafka的topic的offset超過range了,多是maxratePerPartition的值設定小了 [https://blog.csdn.net/yxgxy270187133/article/details/53666760]
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {newsfeed-100-content-docidlog-1=103944288}


3.內存參數過小 --executor-memory 8G \  --driver-memory 8G \
 Application application_1547156777102_0243 failed 2 times due to AM Container for appattempt_1547156777102_0243_000002 exited with exitCode: -104
For more detailed output, check the application tracking page:https://host-10-31-4-246:26001/cluster/app/application_1547156777102_0243 Then click on links to logs of each attempt.
Diagnostics: Container [pid=5064,containerID=container_e62_1547156777102_0243_02_000001] is running beyond physical memory limits. Current usage: 4.6 GB of 4.5 GB physical memory used; 6.3 GB of 22.5 GB virtual memory used. Killing container.


4.方法調用在方法定義以後
forward reference extends over definition of value xxx

*******************************************************************
https://blog.csdn.net/appleyuchi/article/details/81633335
pom中的provided指的是編譯須要,發佈不須要,當咱們經過spark-submit提交時,spark會提供須要的streaming包,而Intellij是經過java提交的,在運行時依然須要streaming的包,因此須要去掉.
1.解決方案:本地運行時註銷掉<scope>provided</scope>,reimport maven projects
java.lang.ClassNotFoundException: org.apache.spark.SparkConf

2.

[ERROR] E:\git3_commit2\Newsfeed\Newsfeed\src\main\scala\com\huawei\rcm\newsfeed
\textcontent\wordToVecDocUser.scala:206: error: No org.json4s.Formats found. Try
 to bring an instance of org.json4s.Formats in scope or use the org.json4s.Defau
ltFormats.
[INFO]     val str = write(map)

添加
implicit val formats: DefaultFormats = DefaultFormats

3.



https://stackoverflow.com/questions/30033043/hadoop-job-fails-resource-manager-doesnt-recognize-attemptid/30391973#30391973

*******************************************************************
如何在IDEA 中使用Git
https://www.cnblogs.com/zbw911/p/6206689.html

*******************************************************************
(1)鏈接zk
cd /opt/FIC70_client/ZooKeeper/zookeeper/bin
./zkCli.sh -timeout 5000 -r -server 172.16.16.159:24002 (中間以,分割)
./zkCli.sh -timeout 5000 -r -server 10.31.7.209:24002


(2)斷點續傳的kafka能力
讀取kafka的topic offset失敗,致使contenanalysis啓動失敗。 或者是讀取的偏移量與實際的不符,故去kafka時獲取offset失敗,或者offset的值錯誤。 須要重建offset。
經過zookeeper查詢對應組id下的topic是存在
zkCli.sh --server  10.73.80.4:24002   zookeeper的地址端口經過配置文件能夠查詢, cd $KAFKA_HOME/conf cat server.propertes | grep "^zookeeper.connect"
經過ls命令查詢對應的grounpid下的topic是不是否存在,或者offset是否存在  ls /consumers/[grounpid]/offsets/   
若是不存在,能夠考慮重建topic 或者手動添加topic偏移量
create /consumers/[grounpid]/offsets/[topic]/[partition]/[offsetvalue]

 (3)ls命令
在ZK上運行ls /consumers/對應的分組/offset/對應的topic,就能夠看到此topic下的全部分區了
ls  /consumers/Newsfeed.Entertainment.ContentAnalysis/offsets/newsfeed-100-contentdistribute-entertainment

(4)刪除zk目錄
rmr  /consumers/Newsfeed.Entertainment.ContentAnalysis/offsets/newsfeed-100-contentdistribute-entertainment
deleteall  /consumers/Newsfeed.Entertainment.ContentAnalysis/offsets/newsfeed-100-contentdistribute-entertainment

(5)get命令
get /consumers/對應的分組/offset/對應的topic/對應的分區號,能夠查詢到該分區上記錄的offset

(6)set命令
set /consumers/對應的分組/offset/對應的topic/對應的分區號 修改後的值(通常爲0),便可完成對offset的修

*******************************************************************
Applicatiion: 應用程序
Driver: 表示main()函數,建立SparkContext
Executor:
Worker: 集羣中能夠運行Application代碼的節點.在Standalone模式中指的是經過slave文件配置的worker節點,在Spark on Yarn模式中指的就是NodeManager節點
Task:在Executor進程中執行任務的工做單元,多個Task組成一個Stage
Job:包含多個Task組成的並行計算,是由Action行爲觸發的

spark運行流程:
    (1)構建Spark Application的運行環境(啓動SparkContext),SparkContext向資源管理器(能夠是Standalone、Mesos或YARN)註冊並申請運行Executor資源;
    (2)資源管理器分配Executor資源並啓動StandaloneExecutorBackend,Executor運行狀況將隨着心跳發送到資源管理器上;
    (3)SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task
    (4)Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor
    (5)Task在Executor上運行,運行完畢釋放全部資源

*******************************************************************
1.Spark和Hadoop
    Hadoop的兩個核心模塊:分佈式存儲模塊HDFS,分佈式計算模塊Mapreduce
    Spark主要是在計算模塊取代了Mapreduce,存儲模塊仍是基於hadoop的HDFS

2. RDD(Resilient Distributed Dataset)彈性分佈式數據集
    Spark中的RDD是一個不可變的分佈式對象集合,有五大特性:
①有一個分片列表。就是能被切分,和hadoop同樣的,可以切分的數據才能並行計算。
②有一個函數計算每個分片,這裏指的是下面會提到的compute函數。
③對其餘的RDD的依賴列表,依賴還具體分爲寬依賴和窄依賴,但並非全部的RDD都有依賴。
④可選:key-value型的RDD是根據哈希來分區的,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce。
⑤可選:每個分片的優先計算位置(preferred locations),好比HDFS的block的所在位置應該是優先計算的位置。(存儲的是一個表,能夠將處理的分區「本地化」)

3.RDD兩種建立方式:
    ①(經常使用)讀取外部數據集:
        val rdd1=sc.textFile("/path/to/readme.md")
    ②在驅動程序中對另外一個集合並行化:
        val rdd2=sc.parallelize(List("apple","banana","orange"))
        注:通常在開發原型或測試時才使用

4.Spark程序或shell會話工做流程
    ①從外部數據建立出輸入RDD;
    ②使用諸如filter()等這樣的轉化操做對RDD進行轉化,以定義新的RDD;
    ③告訴Spark對須要被重用的中間結果RDD執行persist()操做;
    ④ 使用諸如first()等這樣的行動操做來觸發一次並行計算,Spark會對計算進行優化後再執行。

5.RDD操做:
    3.1獲取RDD
    ①從共享的文件系統獲取(如:HDFS)
    ②經過已存在的RDD轉換 ③將已存在scala集合(只要是Seq對象)並行化,經過調用SparkContext的parallelize方法實現    ④改變現有RDD的之久性;RDD是懶散,短暫的。(RDD的固化:cache緩存至內錯;save保存到分佈式文件系統)
    

    3.2.轉化操做(返回一個新的RDD)
    ① map(func)
       返回一個新的分佈式數據集,由每一個原元素通過func函數轉換後組成
    ②filter(func)
    返回一個新的數據集,由通過func函數後返回值爲true的原元素組成
    ③flatMap(func) 相似於map,可是每個輸入元素,會被映射爲0到多個輸出元素(所以,func函數的返回值是一個Seq,而不是單一元素)
    ④sample(withReplacement,  frac, seed)
    根據給定的隨機種子seed,隨機抽樣出數量爲frac的數據
    ⑤union(other)
    返回一個新的數據集,由原數據集和參數聯合而成
    ⑥groupByKey([numTasks]) 在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:默認狀況下,使用8個並行任務進行分組,你能夠傳入numTask可選參數,根據數據量設置不一樣數目的Task
    ⑦reduceByKey(func,  [numTasks])    在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一塊兒。和groupbykey相似,任務的個數是能夠經過第二個可選參數來配置的。
    ⑧join(otherDataset,  [numTasks]) 在類型爲(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每一個key中的全部元素都在一塊兒的數據集
    ⑨groupWith(otherDataset,  [numTasks])
    在類型爲(K,V)和(K,W)類型的數據集上調用,返回一個數據集,組成元素爲(K, Seq[V], Seq[W]) Tuples。這個操做在其它框架,稱爲CoGroup
    ⑩cartesian(otherDataset)
    笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,全部元素交互進行笛卡爾積。

    3.3.行動操做(向驅動器程序返回結果或把結果寫入外部系統的操做)
    ①reduce(func)     經過函數func彙集數據集中的全部元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確保能夠被正確的併發執行
    ②collect()     在Driver的程序中,以數組的形式,返回數據集的全部元素。這一般會在使用filter或者其它操做後,返回一個足夠小的數據子集再使用,直接將整個RDD集Collect返回,極可能會讓Driver程序OOM
    ③count()     返回數據集的元素個數
    ④take(n)     返回一個數組,由數據集的前n個元素組成。注意,這個操做目前並不是在多個節點上,並行執行,而是Driver程序所在機器,單機計算全部的元素(Gateway的內存壓力會增大,須要謹慎使用)
    ⑤first()     返回數據集的第一個元素(相似於take(1)
    ⑥saveAsTextFile(path)     將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每一個元素的toString方法,並將它轉換爲文件中的一行文本
    ⑦saveAsSequenceFile(path)     將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,並都實現了Hadoop的Writable接口,或隱式能夠轉換爲Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等)
    ⑧foreach(func)     在數據集的每個元素上,運行函數func。這一般用於更新一個累加器變量,或者和外部存儲系統作交互

[注:惰性求值:RDD的轉化操做是惰性求值的,即在被調用行動操做以前Spark不會開始計算]













經常使用術語:
①Application:用戶編寫的Spark應用程序,其中包括一個Driver功能的代碼和分佈在集羣中多個節點上運行的Executor代碼
②Driver: 運行Application的main函數並建立SparkContext,建立SparkContext的目的是爲了準備Spark應用程序的運行環境,在Spark中有SparkContext負責與ClusterManager通訊,進行資源申請、任務的分配和監控等,當Executor部分運行完畢後,Driver同時負責將SparkContext關閉,一般用SparkContext表明Driver
③Executor:
④Cluter Manager:
⑤Worker:
⑥Task:
⑦Job:
⑧Stage:
⑨DAGScheduler





html

相關文章
相關標籤/搜索