hive中shuffle的優化java
壓縮
壓縮可使磁盤上存儲的數據量變小,經過下降I/O來提升查詢速度。mysql
對hive產生的一系列MR中間過程啓用壓縮linux
set hive.exec.compress.intermediate=true; set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
對最終輸出結果壓縮(寫到hdfs、本地磁盤的文件)git
set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
join優化正則表達式
若是關聯查詢兩張表中有一張小表默認map join,將小表加入內存 hive.mapjoin.smalltable.filesize=25000000 默認大小 hive.auto.convert.join=true 默認開啓 若是沒有開啓使用mapjoin,使用語句制定小表使用mapjoin ```sql select /*+ MAPJOIN(time_dim) */ count(1) from store_sales join time_dim on (ss_sold_time_sk = t_time_sk) ``` 2. smb join Sort-Merge-Bucket join 解決大表與大表join速度慢問題 經過分桶字段的的hash值對桶的個數取餘進行分桶 3. 傾斜鏈接 ```xml <!-- hive.optimize.skewjoin:是否爲鏈接表中的傾斜鍵建立單獨的執行計劃。它基於存儲在元數據中的傾斜鍵。在編譯時,Hive爲傾斜鍵和其餘鍵值生成各自的查詢計 劃。 --> <property> <name>hive.optimize.skewjoin</name> <value>true</value> </property> <property> <!-- hive.skewjoin.key:決定如何肯定鏈接中的傾斜鍵。在鏈接操做中,若是同一鍵值所對應的數據行數超過該參數值,則認爲該鍵是一個傾斜鏈接鍵。 --> <name>hive.skewjoin.key</name> <value>100000</value> </property> <!-- hive.skewjoin.mapjoin.map.tasks:指定傾斜鏈接中,用於Map鏈接做業的任務數。該參數應該與hive.skewjoin.mapjoin.min.split一塊兒使用,執行細粒度的控制。 --> <property> <name>hive.skewjoin.mapjoin.map.tasks</name> <value>10000</value> </property> <!-- hive.skewjoin.mapjoin.min.split:經過指定最小split的大小,肯定Map鏈接做業的任務數。該參數應該與hive.skewjoin.mapjoin.map.tasks一塊兒使用,執行細粒度的控制。 --> <property> <name>hive.skewjoin.mapjoin.min.split</name> <value>33554432</value> </property> ```
本質緣由:key的分佈不均致使的sql
Map 端部分聚合,至關於Combinershell
hive.map.aggr=true
有數據傾斜的時候進行負載均衡數據庫
hive.groupby.skewindata=true
當選項設定爲 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每一個 Reduce 作部分聚合操做,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不一樣的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程能夠保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操做。express
全量導入apache
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \ > --connect jdbc:mysql://linux03.ibf.com:3306/mydb \ > --username root \ > --password 123456 \ > --table user
增量導入
bin/sqoop import \ --connect jdbc:mysql://linux03.ibf.com:3306/mydb \ --username root \ --password 123456 \ --table user \ --fields-terminated-by '\t' \ --target-dir /sqoop/incremental \ -m 1 \ --direct \ --check-column id \ --incremental append \ --last-value 3
hive致使數據傾斜的可能性(哪些操做會致使) -->分桶 join key分佈不均勻 大量空值致使如何解決?
根據key操做到時結果分佈不均均可能致使數據傾斜,如group by key
order by 使用全局排序最終只會在一個reducer上運行全部數據,致使數據傾斜
大量NULL
hive的NULL有時候是必須的:
1)hive中insert語句必須列數匹配,不支持不寫入,沒有值的列必須使用null佔位。
2)hive表的數據文件中按分隔符區分各個列。空列會保存NULL(n)來保留列位置。但外部表加載某些數據時若是列不夠,如表13列,文件數據只有2列,則在表查詢時表中的末尾剩餘列無數據對應,自動顯示爲NULL。
因此,NULL轉化爲空字符串,能夠節省磁盤空間,實現方法有幾種
1)建表時直接指定(兩種方式)
a、用語句 ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’ with serdeproperties('serialization.null.format' = '') 實現,注意二者必須一塊兒使用,如
CREATE TABLE hive_tb (id int,name STRING) PARTITIONED BY ( `day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint) ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’ WITH SERDEPROPERTIES ( ‘field.delim’='/t’, ‘escape.delim’='//’, ‘serialization.null.format'='' ) STORED AS TEXTFILE;
b、或者經過ROW FORMAT DELIMITED NULL DEFINED AS '' 如
CREATE TABLE hive_tb (id int,name STRING) PARTITIONED BY ( `day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint) ROW FORMAT DELIMITED NULL DEFINED AS '' STORED AS TEXTFILE;
2)修改已存在的表
alter table hive_tb set serdeproperties('serialization.null.format' = '');
hive中如何增長一列數據?
新增一列
hive > alter table log_messages add coloumns( app_name string comment 'Application name', session_id long comment 'The current session id' ); -- 增長列的表的最後一個字段以後,在分區字段以前添加。
若是在表中新增一列new_column,則在原表上直插入new_column這一列數據不可行
若是新增一列是分區,則能夠新增數據到該分區下
insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';
有沒有hive處理過json?有哪些函數?
建表時制定jar包處理json數據
ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar; 2. 建表 ``` hive (default)> ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar; Added [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar] to class path Added resources: [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar] hive (default)> create table spark_people_json( > > `name` string, > > `age` int) > > ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' > > STORED AS TEXTFILE; OK Time taken: 4.445 seconds ``` 2. 記錄下若是隻是某個字段爲json,想要獲取裏面的某個值怎麼操做? 1. get_json_object() 只能獲取一個字段 ```sql select get_json_object('{"shop":{"book":[{"price":43.3,"type":"art"},{"price":30,"type":"technology"}],"clothes":{"price":19.951,"type":"shirt"}},"name":"jane","age":"23"}', '$.shop.book[0].type'); ``` 2. json_tuple() 能夠獲取多個字段 ```sql select json_tuple('{"name":"jack","server":"www.qq.com"}','server','name') ``` 3. 自行編寫UDF
sparkstreaming正在運行的程序如何去停止?怎麼安全中止?代碼作了更新,如何讓正在運行的和更新後的代碼作一個交替?
升級應用程序代碼
若是須要使用新的應用程序代碼升級正在運行的Spark Streaming應用程序,則有兩種可能的機制。
使用直接鏈接方式
消息語義有幾種?
sparkstreaming和kafka集成有幾種方式?
初始化 StreamingContext
經過建立輸入DStreams來定義輸入源。
經過將轉換和輸出操做應用於DStream來定義流式計算。
開始接收數據並使用它進行處理streamingContext.start()。
等待處理中止(手動或因爲任何錯誤)使用streamingContext.awaitTermination()。
可使用手動中止處理streamingContext.stop()。
hive中的分析函數?
LEAD 能夠選擇指定要引導的行數。若是未指定要引導的行數,則前導是一行。 噹噹前行的前導超出窗口末尾時返回null。 ``` hive (default)> desc function lead; OK tab_name LEAD (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); The LEAD function is used to return data from the next row. ``` LAG 能夠選擇指定滯後的行數。若是未指定滯後行數,則滯後爲一行。 噹噹前行的延遲在窗口開始以前延伸時,返回null。 ``` hive (default)> desc function lag; OK tab_name LAG (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); The LAG function is used to access data from a previous row. ``` FIRST_VALUE 這最多須要兩個參數。第一個參數是您想要第一個值的列,第二個(可選)參數必須是false默認的布爾值。若是設置爲true,則跳過空值。 LAST_VALUE 這最多須要兩個參數。第一個參數是您想要最後一個值的列,第二個(可選)參數必須是false默認的布爾值。若是設置爲true,則跳過空值。 2. OVER字句 OVER標準聚合: COUNT、SUM、MIN、MAX、AVG 使用帶有任何原始數據類型的一個或多個分區列的PARTITION BY語句。 使用PARTITION BY和ORDER BY 與任何數據類型的一個或多個分區和/或排序列。 帶有窗口的over具體說明。Windows能夠在WINDOW子句中單獨定義。窗口規範支持如下格式: ``` (ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING) (ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING) (ROWS | RANGE) BETWEEN [num] FOLLOWING AND (UNBOUNDED | [num]) FOLLOWING ``` 當指定ORDER BY時缺乏WINDOW子句,WINDOW規範默認爲 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. 當缺乏ORDER BY和WINDOW子句時,WINDOW規範默認爲 ROW BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. OVER子句支持如下函數,但它不支持帶有它們的窗口(參見HIVE-4797): Ranking functions: Rank, NTile, DenseRank, CumeDist, PercentRank. Lead and Lag functions. 3. 分析函數 RANK ROW_NUMBER DENSE_RANK CUME_DIST:CUME_DIST 小於等於當前值的行數/分組內總行數 PERCENT_RANK NTILE ```sql select s_id, NTILE(2) over(partition by c_id order by s_score) from score ``` 4. Hive 2.1.0及更高版本中支持Distinct (參見HIVE-9534) 聚合函數支持Distinct,包括SUM,COUNT和AVG,它們聚合在每一個分區內的不一樣值上。當前實現具備如下限制:出於性能緣由,在分區子句中不能支持ORDER BY或窗口規範。支持的語法以下。 ```sql COUNT(DISTINCT a) OVER (PARTITION BY c) ``` Hive 2.2.0中支持ORDER BY和窗口規範(參見HIVE-13453)。一個例子以下。 ```sql COUNT(DISTINCT a) OVER (PARTITION BY c ORDER BY d ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) ``` 5. Hive 2.1.0及更高版本中OVER子句支持中的聚合函數(參見 HIVE-13475) 添加了對OVER子句中引用聚合函數的支持。例如,目前咱們能夠在OVER子句中使用SUM聚合函數,以下所示。 ```sql SELECT rank() OVER (ORDER BY sum(b)) FROM T GROUP BY a; ```
常見的字符串用哪些函數?
hive (practice)> select concat_ws('|','abc','def','gh'); abc|def|gh
substr(string A, int start),substring(string A, int start)
substr(string A, int start, int len),substring(string A, int start, int len)
語法: regexp_replace(string A, string B, string C)
返回值: string
說明:將字符串A中的符合java正則表達式B的部分替換爲C。注意,在有些狀況下要使用轉義字符,相似oracle中的regexp_replace函數。
語法: regexp_extract(string subject, string pattern, int index)
返回值: string
說明:將字符串subject按照pattern正則表達式的規則拆分,返回index指定的字符。
hive (practice)> select repeat('abc',5); abcabcabcabcabc
hive中如何去統計每週一,每月的第一天的pv?
獲取指定日期月份的第一天、年份的第一天
select trunc('2019-02-24', 'YYYY'); select trunc('2019-02-24', 'MM');
指定日期下週的指定周幾
select next_day('2019-02-24', 'TU');
按指定格式返回指定日期增長几個月後的日期
select add_months('2019-02-28', 1); select add_months('2019-02-24 21:15:16', 2, 'YYYY-MM-dd HH:mm:ss');
select count(guid) from table group by trunc(date, 'MM')
select count(guid) from table group by next_day('2019-06-08', 'MONDAY');
Spark2.x以後,官方已經將 ( DataFrame ) /Dataset (數據集)API的進行了 統一 ,DataFrame僅 是Dataset中每一個元素爲Row類型的時候
不一樣之處在於 Dataset 是 strongly typed (強類型的) ,而dataframe則是 untypedrel (弱類型的)
項目中hive的元數據在哪兒保存?
元數據怎麼保證他的安全性?
修改元數據所用的用戶名和密碼
<property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>123456</value> </property>
在mysql端設置metastore數據庫的訪問權限
sqoop導入導出有幾種方式?增量導出?
導入
全量導入 ``` [hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \ > --connect jdbc:mysql://linux03.ibf.com:3306/test_db \ > --username root \ > --password root \ > --table toHdfs \ > --target-dir /toHdfs \ > --direct \ > --delete-target-dir \ > --fields-terminated-by '\t' \ > -m 1 ``` 增量導入append ```sh bin/sqoop import \ --connect jdbc:mysql://linux03.ibf.com:3306/mydb \ --username root \ --password 123456 \ --table user \ --fields-terminated-by '\t' \ --target-dir /sqoop/incremental \ -m 1 \ --direct \ --check-column id \ --incremental append \ --last-value 3 ``` 增量導入lastmodified 表中必須有一列指示時間 ``` sqoop import \ --connect jdbc:mysql://master:3306/test \ --username hive \ --password 123456 \ --table customertest \ --check-column last_mod \ --incremental lastmodified \ --last-value "2016-12-15 15:47:29" \ -m 1 \ --append ```
導出
插入 默認狀況下,sqoop-export將新行添加到表中;每行輸入記錄都被轉換成一條INSERT語句,將此行記錄添加到目標數據庫表中。若是數據庫中的表具備約束條件(例如,其值必須惟一的主鍵列)而且已有數據存在,則必須注意避免插入違反這些約束條件的記錄。若是INSERT語句失敗,導出過程將失敗。此模式主要用於將記錄導出到能夠接收這些結果的空表中。 更新 若是指定了--update-key參數,則Sqoop將改成修改數據庫中表中現有的數據。每一個輸入記錄都將轉化爲UPDATE語句修改現有數據。語句修改的行取決於--update-key指定的列名,若是數據庫中的表中不存在的數據,那麼也不會插入。 根據目標數據庫,若是要更新已存在於數據庫中的行,或者若是行尚不存在則插入行,則還能夠--update-mode 使用allowinsert模式指定參數
使用窗口函數,指定足夠長的窗口處理數據,總而使數據量足夠大(最好在一個block大小左右),完成後使用foreachRDD將數據寫出到HDFS
hive中的負責數據類型有哪些?
TINYINT
SMALLINT
INT/INTEGER
BIGINT
FLOAT
DOUBLE
DECIMAL
string
varchar
char
TIMESTAMP
DATE
複雜類型
ARRAY<data_type>
create table hive_array_test (name string, stu_id_list array<INT>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' ; -- 'FIELDS TERMINATED BY' :字段與字段之間的分隔符 -- 'COLLECTION ITEMS TERMINATED BY' :一個字段各個 item 的分隔符 [chen@centos01 ~]$ vi hive_array.txt 0601,1:2:3:4 0602,5:6 0603,7:8:9:10 0604,11:12 load data local inpath '/home/chen/hive_array.txt' into table hive_array_test;
hive (default)> select * from hive_array_test; OK hive_array_test.name hive_array_test.stu_id_list 0601 [1,2,3,4] 0602 [5,6] 0603 [7,8,9,10] 0604 [11,12] Time taken: 0.9 seconds, Fetched: 4 row(s)
MAP<primitive_type, data_type>
create table hive_map_test (id int, unit map<string, int>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'; ‘MAP KEYS TERMINATED BY’: key value 分隔符 [chen@centos01 ~]$ vi hive_map.txt 0 Chinese:100,English:80,math:59 1 Chinese:80,English:90 2 Chinese:100,English:100,math:60 load data local inpath '/home/chen/hive_map.txt' into table hive_map_test;
hive (default)> select * from hive_map_test; OK hive_map_test.id hive_map_test.unit 0 {"Chinese":100,"English":80,"math":59} 1 {"Chinese":80,"English":90} 2 {"Chinese":100,"English":100,"math":60} Time taken: 0.204 seconds, Fetched: 3 row(s) hive (default)> select id, unit['math'] from hive_map_test; OK id _c1 0 59 1 NULL 2 60 Time taken: 0.554 seconds, Fetched: 3 row(s)
STRUCT<col_name : data_type [COMMENT col_comment], ...>
create table hive_struct_test(id int, info struct<name:string, age:int, height:float>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':'; [chen@centos01 ~]$ vi hive_struct.txt 0,zhao:18:178 1,qian:30:173 2,sun:20:180 3,li:23:183 load data local inpath '/home/chen/hive_struct.txt' into table hive_struct_test;
hive (default)> select * from hive_struct_test; OK hive_struct_test.id hive_struct_test.info 0 {"name":"zhao","age":18,"height":178.0} 1 {"name":"qian","age":30,"height":173.0} 2 {"name":"sun","age":20,"height":180.0} 3 {"name":"li","age":23,"height":183.0} Time taken: 0.153 seconds, Fetched: 4 row(s) hive (default)> select id, info.name from hive_struct_test; OK id name 0 zhao 1 qian 2 sun 3 li Time taken: 0.133 seconds, Fetched: 4 row(s)
導入:load data [local] inpath '路徑' overwrite into table 表名
導出:insert overwrite [local] directory '/home/hadoop/data' select * from emp_p;
local:加local是從本地加載,不加local是從hdfs加載
RDD的建立方式?
scala> var data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd = sc.parallelize(data) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> sc.textFile("student.log") res0: org.apache.spark.rdd.RDD[String] = student.log MapPartitionsRDD[1] at textFile at <console>:25
hadoop的壓縮格式
bin/hadoop checknative -a
[chen@centos01 hadoop-2.6.0-cdh5.14.2]$ bin/hadoop checknative -a 19/06/05 19:15:45 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native 19/06/05 19:15:45 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library Native library checking: hadoop: true /opt/modules/hadoop-2.6.0-cdh5.14.2/lib/native/libhadoop.so.1.0.0 zlib: true /lib64/libz.so.1 snappy: true /opt/modules/hadoop-2.6.0-cdh5.14.2/lib/native/libsnappy.so.1 lz4: true revision:10301 bzip2: true /lib64/libbz2.so.1 openssl: true /usr/lib64/libcrypto.so
sparksql處理完的dataframe結果要保存在數據庫中,具體應該怎麼作?
spark .read .table("mydb.emp") .write .mode(SaveMode.Ignore) .jdbc("jdbc:mysql://centos01:3306/mydb", "emp", prop)
mapreduce過程當中 shuffle的優化?
shuffle過程:map端:環形緩衝區(到80%) --》 溢寫(分區,排序)--》combiner --》 compress --》 reduce端:--》 merge --》 排序 --》 group combiner可選擇開啓,在map端進行一次小reduce compress可選擇開區,將結果壓縮,減小IO shuffle中分區時採用HashPartitioner,相同的key會進入同一個reduce,key分佈不均會致使數據傾斜,參考數據傾斜優化過程
hive二次排序的問題?
order by:全局有序,最終數據會進入一個reduce中,不推薦使用
sort by:局部有序,每一個reduce中的數據局有序
distribute by
經過distribute by設置分區 ,使用 sort by設置分區內排序 distribute by 常常與 sort by 在一塊兒使用
cluster by:distribute by 和sort by條件一致時 使用cluster by
二次排序在by後面加上字段名a, b, c ...,hive會先按a排序,若a相同按b排序,若b相同按c排序
select * from score order by score.s_id asc, score.s_score desc;
score.s_id score.c_id score.s_score 01 03 99 01 02 90 01 01 80 02 03 80 02 01 70 02 02 60 03 03 80 03 02 80 03 01 80 04 01 50 04 02 30 04 03 20 05 02 87 05 01 76 06 03 34 06 01 31 07 03 98 07 02 89 Time taken: 96.333 seconds, Fetched: 18 row(s)
能夠知道爲何不用order by排序了
總方針:避免由於hbase前置匹配機制致使數據所有進入一個regionserver或某幾個regionserver而產生數據熱點
rowkey:
必須惟一
不建議用隨機數做爲rowkey,要根據實際業務需求設計rowkey
不能設置過大致使存儲空間變大和索引變大
sparkstreaming窗口函數 開窗怎麼開的,開的多長
開啓窗口函數須要制定兩個參數:
窗口長度 - The duration of the window (3 in the figure). 滑動間隔 - The interval at which the window operation is performed (2 in the figure).
// 滑動窗口:兩個窗口之間有重疊部分,滑動時間小於窗口時間 val wc = res.reduceByKeyAndWindow( // 同一個key在同一個窗口不一樣批次數據的聚合操做 (a:Int, b:Int) => a + b, // 窗口寬度,表明一個窗口具體計算的數據量 Seconds(15), // 滑動時間間隔,表明多久時間計算一個窗口的數據 Seconds(10) )
通常窗口長度大於滑動間隔
增長窗口寬度,對一個窗口中的數據操做就能夠作到用sparkstreaming跑批