需求:將前些日子採集的評論存儲到HBase中
思路:
先用fastjson解析評論,然後構造RDD,最後使用Spark與Phoenix交互,把數據存儲到HBase中
部分數據:
1 [ 2 { 3 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 4 "creationTime": "2019-04-08 01:13:42", 5 "content": "此用戶沒有填寫評價內容", 6 "label": [] 7 }, 8 { 9 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 10 "creationTime": "2019-03-29 11:49:36", 11 "content": "不錯", 12 "label": [] 13 }, 14 { 15 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 16 "creationTime": "2019-03-21 21:13:07", 17 "content": "正品沒什麼毛病。信號好像是照別的差一點,可是還能夠,不是特別差。分辨率不是那麼好,可是也不是特別差。通常般。手機不卡。打遊戲很順暢。官方正品沒有翻車。", 18 "label": [ 19 { 20 "labelName": "功能齊全" 21 } 22 ] 23 }, 24 { 25 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 26 "creationTime": "2019-03-22 09:56:22", 27 "content": "不錯是正品", 28 "label": [ 29 { 30 "labelName": "系統流暢" 31 }, 32 { 33 "labelName": "聲音大" 34 }, 35 { 36 "labelName": "作工精緻" 37 }, 38 { 39 "labelName": "待機時間長" 40 } 41 ] 42 }, 43 { 44 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 45 "creationTime": "2019-03-13 07:27:56", 46 "content": "性價比很高的手機,用習慣了ios轉安卓7個月,從新迴歸,只能說,用蘋果省心。蘇寧質量有保障,送貨快,價格優惠,推薦購買!", 47 "label": [ 48 { 49 "labelName": "系統流暢" 50 }, 51 { 52 "labelName": "功能齊全" 53 }, 54 { 55 "labelName": "包裝通常" 56 } 57 ] 58 }, 59 { 60 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待", 61 "creationTime": "2019-02-25 22:03:18", 62 "content": "弟弟就想要一個蘋果手機。原本打算買8p的。而後我,推薦他這款xr,是最新款。價格整體來講性價比,比8p好。買了很快就到了,次日就。屏幕很大,仍是面容id。蘋果x大不少。比***x小一點", 63 "label": [ 64 { 65 "labelName": "外觀漂亮" 66 }, 67 { 68 "labelName": "作工精緻" 69 }, 70 { 71 "labelName": "反應快" 72 }, 73 { 74 "labelName": "系統流暢" 75 } 76 ] 77 }, 78 { 79 "referenceName": "【券後低至5388】Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待", 80 "creationTime": "2019-02-21 12:45:22", 81 "content": "物流很棒,xr價格能夠接受、性能穩定!", 82 "label": [] 83 }, 84 { 85 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 86 "creationTime": "2019-02-22 12:14:55", 87 "content": "很不錯的手機除了有點厚,極致的體驗,黑邊徹底沒有影響", 88 "label": [ 89 { 90 "labelName": "拍照效果好" 91 }, 92 { 93 "labelName": "待機時間長" 94 }, 95 { 96 
"labelName": "電池耐用" 97 }, 98 { 99 "labelName": "性價比高 " 100 } 101 ] 102 }, 103 { 104 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待", 105 "creationTime": "2019-02-13 00:28:03", 106 "content": "很是很是好的商品。很不錯,下次再來", 107 "label": [ 108 { 109 "labelName": "信號穩定" 110 }, 111 { 112 "labelName": "反應快" 113 }, 114 { 115 "labelName": "聲音大" 116 }, 117 { 118 "labelName": "作工精緻" 119 } 120 ] 121 }, 122 { 123 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G 手機", 124 "creationTime": "2019-04-02 17:29:43", 125 "content": "此用戶沒有填寫評價內容", 126 "label": [] 127 } 128 ] 129 [ 130 { 131 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 132 "creationTime": "2019-04-05 18:13:14", 133 "content": "滿意嘻嘻", 134 "label": [ 135 { 136 "labelName": "音質好" 137 }, 138 { 139 "labelName": "拍照效果好" 140 }, 141 { 142 "labelName": "功能齊全" 143 }, 144 { 145 "labelName": "外觀漂亮" 146 } 147 ] 148 }, 149 { 150 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 151 "creationTime": "2019-03-21 00:13:17", 152 "content": "棒棒噠", 153 "label": [] 154 }, 155 { 156 "referenceName": "【雙12爆款】Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待", 157 "creationTime": "2019-01-19 10:23:57", 158 "content": "雙十二買的正好遇上手機壞了就買了xr太貴沒捨得買一直用蘋果的系統用順手了不肯意換", 159 "label": [ 160 { 161 "labelName": "反應快" 162 }, 163 { 164 "labelName": "作工精緻" 165 }, 166 { 167 "labelName": "信號穩定" 168 }, 169 { 170 "labelName": "待機時間長" 171 }, 172 { 173 "labelName": "性價比通常般" 174 } 175 ] 176 }, 177 { 178 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 179 "creationTime": "2019-03-21 12:47:34", 180 "content": "用了幾天感受還能夠,只是信號不是很好,整體上是能夠的", 181 "label": [ 182 { 183 "labelName": "音質好" 184 }, 185 { 186 "labelName": "分辨率高" 187 } 188 ] 189 }, 190 { 191 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 192 "creationTime": "2019-03-09 08:32:15", 193 "content": "蘋果手機作工精細,手感不錯,外觀設計也是頗有時尚感!系統運行十分流暢,操做體驗不錯;功能齊全,屏幕分辨率高,整體來講很滿意!", 194 "label": [ 195 { 196 "labelName": "作工精緻" 197 }, 198 { 199 "labelName": "系統流暢" 200 }, 201 { 202 
"labelName": "功能齊全" 203 } 204 ] 205 }, 206 { 207 "referenceName": "【低至5399】Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待", 208 "creationTime": "2019-01-15 22:44:53", 209 "content": "真心喜歡,一直在徘徊x.仍是xr.我的以爲真不錯,一直信賴蘇寧,新機爲激活,價錢能接受,值得推薦,黑邊什麼的不影響。", 210 "label": [ 211 { 212 "labelName": "拍照效果好" 213 }, 214 { 215 "labelName": "外觀漂亮" 216 }, 217 { 218 "labelName": "屏幕清晰" 219 } 220 ] 221 }, 222 { 223 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 224 "creationTime": "2019-03-08 22:12:18", 225 "content": "手機運行流暢,外觀也漂亮,黑色彰顯檔次,音質也好,又是雙卡雙待手機,性價比不錯,比起XS便宜很多。", 226 "label": [ 227 { 228 "labelName": "外觀漂亮" 229 }, 230 { 231 "labelName": "系統流暢" 232 } 233 ] 234 }, 235 { 236 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待", 237 "creationTime": "2019-03-01 15:54:33", 238 "content": "手機很好,電池很是抗用,價錢也很是美麗,值得購買。", 239 "label": [ 240 { 241 "labelName": "系統流暢" 242 }, 243 { 244 "labelName": "電池耐用" 245 }, 246 { 247 "labelName": "分辨率高" 248 }, 249 { 250 "labelName": "待機時間長" 251 }, 252 { 253 "labelName": "包裝通常" 254 } 255 ] 256 }, 257 { 258 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待", 259 "creationTime": "2019-02-19 09:37:25", 260 "content": "春節期間配送超快,手機沒有任何問題,蘇寧易購確實作到了全網最低價", 261 "label": [] 262 }, 263 { 264 "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待", 265 "creationTime": "2019-02-23 13:27:15", 266 "content": "挺好的懶得拍照了借了幾張黑邊確實大", 267 "label": [ 268 { 269 "labelName": "待機時間長" 270 }, 271 { 272 "labelName": "反應快" 273 }, 274 { 275 "labelName": "作工精緻" 276 } 277 ] 278 } 279 ]
數據是JSON數組,因此採用fastjson進行解析;爲了最終存儲數據更方便,需要構造Comment類
依賴:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
</dependency>


<!-- hbase -->
<!-- <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>2.0.2</version>
</dependency> -->


<!-- phoenix -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>


<!-- phoenix_spark -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>


<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
Comment.java
1 package cn.tele.bean; 2
/**
 * One crawled product comment.
 *
 * <p>A plain JavaBean: Spark infers the DataFrame columns written to Phoenix from its
 * getters. It implements {@link java.io.Serializable} so instances can be shipped through
 * Spark (parallelize / map / persist) with Spark's default Java serializer, not only when
 * Kryo happens to be configured.
 *
 * @author Tele
 */
public class Comment implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    /** Row id; null until the loader assigns one (the loader in this project uses zipWithIndex). */
    private Integer id;
    /** Product reference name, as set by the loader (taken from the product directory name). */
    private String name;
    /** Comment text. */
    private String content;
    /** Creation time kept as the raw JSON string, e.g. "2019-04-08 01:13:42". */
    private String creationtime;
    /**
     * Label names, each followed by '#' (e.g. "a#b#"); "" when the label array is empty,
     * null when it was absent.
     */
    private String label;
    /** Source platform, as set by the loader (prefix of the platform directory name). */
    private String platform;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getCreationtime() {
        return creationtime;
    }

    public void setCreationtime(String creationtime) {
        this.creationtime = creationtime;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public String getPlatform() {
        return platform;
    }

    public void setPlatform(String platform) {
        this.platform = platform;
    }
}
StoreData.java
1 package cn.tele.spark; 2
3 import java.io.IOException; 4 import java.nio.file.FileVisitResult; 5 import java.nio.file.Files; 6 import java.nio.file.Path; 7 import java.nio.file.Paths; 8 import java.nio.file.SimpleFileVisitor; 9 import java.nio.file.attribute.BasicFileAttributes; 10 import java.sql.SQLException; 11 import java.util.ArrayList; 12 import java.util.HashMap; 13 import java.util.Iterator; 14 import java.util.List; 15 import java.util.Map; 16 import java.util.Map.Entry; 17 import java.util.Set; 18 import java.util.UUID; 19 import org.apache.spark.SparkConf; 20 import org.apache.spark.api.java.JavaRDD; 21 import org.apache.spark.api.java.JavaSparkContext; 22 import org.apache.spark.api.java.function.FlatMapFunction; 23 import org.apache.spark.api.java.function.Function; 24 import org.apache.spark.api.java.function.Function2; 25 import org.apache.spark.sql.Dataset; 26 import org.apache.spark.sql.Row; 27 import org.apache.spark.sql.SaveMode; 28 import org.apache.spark.sql.SparkSession; 29 import org.apache.spark.storage.StorageLevel; 30 import com.alibaba.fastjson.JSON; 31 import com.alibaba.fastjson.JSONArray; 32 import com.alibaba.fastjson.JSONObject; 33 import cn.tele.bean.Comment; 34 import scala.Tuple2; 35
36 /**
37 * 存儲爬取的評論到hbase中 38 * 39 * @author Tele 40 * 41 */
42 public class StoreData { 43 private static SparkConf conf = new SparkConf().setAppName("storedata").setMaster("local").set("spark.serializer", 44 "org.apache.spark.serializer.KryoSerializer"); 45 private static JavaSparkContext jsc = new JavaSparkContext(conf); 46 private static SparkSession session = new SparkSession(jsc.sc()); 47 static { 48 // 註冊
49 conf.registerKryoClasses(new Class[] { Comment.class }); 50 } 51 // 連接信息
52 private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004"; 53 /*
54 * private static final String DB_PHOENIX_DRIVER = 55 * "org.apache.phoenix.jdbc.PhoenixDriver"; private static final String 56 * DB_PHOENIX_USER = ""; private static final String DB_PHOENIX_PASS = ""; 57 * private static final String DB_PHOENIX_FETCHSIZE = "10000"; 58 */
59
60 public static void main(String[] args) throws SQLException { 61
62 // 遍歷文件夾
63 Path path = Paths.get("F:\\comment\\"); 64
65 try { 66 MyFileVisitor myFileVisitor = new MyFileVisitor(); 67 Files.walkFileTree(path, myFileVisitor); 68 List<Map<String, Object>> list = myFileVisitor.getData(); 69 JavaRDD<Comment> commentRDD = getCommentRDD(list); 70 // 存儲至hbase
71 storeData(commentRDD); 72
73 } catch (IOException e) { 74 e.printStackTrace(); 75 } 76
77 // 讀取數據
78 /*
79 * JavaRDD<String> rdd = 80 * jsc.textFile("file:\\F:\\comment\\sn_comment\\iphonexr-2019-04-16-18-27-36\\" 81 * ); List<Comment> commentList = getCommentList(rdd,"iphonexr"); 82 */
83
84 jsc.close(); 85
86 } 87
88 private static int storeData(JavaRDD<Comment> rdd) { 89 int successCount = 0; 90 /*
91 * DataTypes.createStructType(Arrays.asList( 92 * DataTypes.createStructField("id",DataTypes.IntegerType,false), 93 * DataTypes.createStructField("name",DataTypes.StringType,false), 94 * DataTypes.createStructField("content",DataTypes.StringType,false), 95 * DataTypes.createStructField("creationtime",DataTypes.StringType,false), 96 * DataTypes.createStructField("label",DataTypes.StringType,true), 97 * DataTypes.createStructField("platform",DataTypes.StringType,false) )); 98 */
99
100 Dataset<Row> ds = session.createDataFrame(rdd, Comment.class); 101 ds.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark").option("zkUrl", DB_PHOENIX_URL) 102 .option("table", "comment").save(); 103 ; 104
105 return successCount; 106 } 107
108 @SuppressWarnings("unchecked") 109 private static <U> JavaRDD<Comment> getCommentRDD(List<Map<String, Object>> list) { 110 JavaRDD<Map<String, Object>> originalRDD = jsc.parallelize(list); 111 JavaRDD<List<Comment>> listCommentRDD = originalRDD.map(new Function<Map<String, Object>, List<Comment>>() { 112
113 private static final long serialVersionUID = 1L; 114
115 List<Comment> dataList = new ArrayList<>(); 116
117 public List<Comment> call(Map<String, Object> v1) throws Exception { 118 Set<Entry<String, Object>> entrySet = v1.entrySet(); 119 Iterator<Entry<String, Object>> iterator = entrySet.iterator(); 120 while (iterator.hasNext()) { 121 Entry<String, Object> entry = iterator.next(); 122 String referenceName = entry.getKey(); 123 String platform = referenceName.split("#")[0]; 124 List<Comment> commentList = (List<Comment>) entry.getValue(); 125 commentList.forEach(cm -> { 126 cm.setPlatform(platform); 127 dataList.add(cm); 128 }); 129 println(referenceName + "評論量------------------:" + commentList.size()); 130 } 131 return dataList; 132 } 133 }).persist(StorageLevel.MEMORY_ONLY()); 134
135 JavaRDD<Comment> commentRDD = listCommentRDD.flatMap(new FlatMapFunction<List<Comment>, Comment>() { 136
137 private static final long serialVersionUID = 1L; 138
139 @Override 140 public Iterator<Comment> call(List<Comment> t) throws Exception { 141 return t.iterator(); 142 } 143 }); 144
145 long totalSize = commentRDD.count(); 146 println("評論總量:-----------" + totalSize); 147
148 // 設置id
149 JavaRDD<Comment> resultRDD = commentRDD.zipWithIndex().map(new Function<Tuple2<Comment, Long>, Comment>() { 150
151 private static final long serialVersionUID = 1L; 152
153 @Override 154 public Comment call(Tuple2<Comment, Long> v1) throws Exception { 155 v1._1.setId(Integer.valueOf(v1._2.toString())); 156 return v1._1; 157 } 158 }); 159 return resultRDD; 160 } 161
162 private static List<Comment> getCommentList(JavaRDD<String> rdd, String referenceName) { 163 List<Comment> commentList = new ArrayList<Comment>(); 164
165 String originalStr = rdd.reduce(new Function2<String, String, String>() { 166 private static final long serialVersionUID = 1L; 167
168 public String call(String v1, String v2) throws Exception { 169 return v1.trim() + v2.trim(); 170 } 171 }); 172 String uuid = UUID.randomUUID().toString(); 173 originalStr = originalStr.replace("][", "]" + uuid + "["); 174
175 // 解析json
176 String[] pages = originalStr.split(uuid); 177 for (String page : pages) { 178 try { 179 JSONArray jsonArray = JSON.parseArray(page); 180 for (Object obj : jsonArray) { 181 JSONObject jsonObject = (JSONObject) obj; 182 // String referenceName = jsonObject.getString("referenceName");
183 String creationTime = jsonObject.getString("creationTime"); 184 String content = jsonObject.getString("content"); 185 println("referenceName:" + referenceName); 186 println("creationTime:" + creationTime); 187 println("content:" + content); 188
189 // 封裝
190 Comment comment = new Comment(); 191 comment.setName(referenceName); 192 comment.setCreationtime(creationTime); 193 comment.setContent(content); 194
195 JSONArray labelArray = jsonObject.getJSONArray("label"); 196 if (labelArray != null) { 197 String label = ""; 198 for (Object labelObj : labelArray) { 199 JSONObject labelObject = (JSONObject) labelObj; 200 label += labelObject.getString("labelName") + "#"; 201 } 202 comment.setLabel(label); 203 println("label:" + label); 204 } 205 commentList.add(comment); 206 } 207 } catch (Exception e) { 208 continue; 209 } 210 } 211 return commentList; 212 } 213
214 private static class MyFileVisitor extends SimpleFileVisitor<Path> { 215 private List<Map<String, Object>> list = new ArrayList<Map<String, Object>>(); 216 String platform = null; 217
218 @Override 219 public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { 220 println("當前訪問的文件夾是------" + dir.toAbsolutePath() /* + dir.getFileName() */); 221 String fileName = dir.getFileName().toString(); 222 if (fileName.contains("_")) { 223 platform = fileName.split("_")[0]; 224 } 225 if (platform != null) { 226 if (fileName.contains("-")) { 227 String referenceName = fileName.split("-")[0]; 228 JavaRDD<String> rdd = jsc.textFile(dir.toAbsolutePath().toString()); 229 List<Comment> commentList = getCommentList(rdd, referenceName); 230 Map<String, Object> map = new HashMap<String, Object>(); 231 // 平臺_品牌--評論
232 map.put(platform + "#" + referenceName, commentList); 233 list.add(map); 234 } 235 } 236
237 return super.preVisitDirectory(dir, attrs); 238 } 239
240 @Override 241 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { 242 return super.visitFile(file, attrs); 243 } 244
245 @Override 246 public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { 247 return super.visitFileFailed(file, exc); 248 } 249
250 @Override 251 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { 252 return super.postVisitDirectory(dir, exc); 253 } 254
255 public List<Map<String, Object>> getData() { 256 return list; 257 } 258
259 } 260
261 private static void println(Object obj) { 262 System.out.println(obj); 263 } 264
265 }
可能會報以下異常,但並不影響,似乎和我使用的HBase版本有關
在終端進行查看:
說明:
1. StoreData還可以再進行優化,比如解析JSON的時候可以直接構造RDD而不是用List,也可以改造成在集群上運行的版本;但由於我的數據量不多,直接用本地模式就足夠了。
2. SaveMode只能是SaveMode.Overwrite,其他模式Phoenix都不支持;但實際測試時發現效果還是append(原有數據不會被清空)。
3. 生成id時用了zipWithIndex,但連續的id容易造成集群熱點問題,使用Phoenix建表時最好加鹽(SALT_BUCKETS)。
4. 與Phoenix交互時也可以用Spark的JDBC,可以參考 http://www.javashuo.com/article/p-otlcstfw-nq.html
5. 關於目錄問題,Spark支持讀取目錄下的多個文件來構造RDD,因此遍歷到父文件夾即可;當然,遍歷到每一個文件也可以。
附測試用例:
1 /**
2 * 3 *@author Tele 4 *測試spark與phoenix集成 5 */
6 public class SparkPhoneix { 7 private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local"); 8 private static JavaSparkContext jsc = new JavaSparkContext(conf); 9 private static SparkSession session = new SparkSession(jsc.sc()); 10
11 //連接信息
12 private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004"; 13 /*private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver"; 14 private static final String DB_PHOENIX_USER = ""; 15 private static final String DB_PHOENIX_PASS = ""; 16 private static final String DB_PHOENIX_FETCHSIZE = "10000";*/
17
18 public static void main(String[] args) { 19 Dataset<Row> ds = session.read().format("org.apache.phoenix.spark") 20 .option("zkUrl", DB_PHOENIX_URL) 21 .option("table", "TEST") 22 .load(); 23 ds.createOrReplaceTempView("test"); 24
25 ds.show(); 26
27
28 //插入數據測試_SALT,ID,INFO.ITEM,INFO.CONTENT,INFO.LABEL
29 StructType schema = DataTypes.createStructType(Arrays.asList( 30 DataTypes.createStructField("id",DataTypes.IntegerType,false), 31 DataTypes.createStructField("item",DataTypes.StringType,false), 32 DataTypes.createStructField("content",DataTypes.StringType,false), 33 DataTypes.createStructField("label",DataTypes.StringType,true) 34 )); 35
36
37 // 建立數據
38 List<Row> list = new ArrayList<Row>(); 39 Row row1 = RowFactory.create(3,"iphone","不錯",null); 40 list.add(row1); 41
42 Dataset<Row> dataset = session.createDataFrame(list,schema); 43 //對於phoenix只能使用overwrite模式,但實際操做時發現是append數據
44 dataset.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark") 45 .option("zkUrl", DB_PHOENIX_URL) 46 .option("table", "TEST").save();; 47 ds.show(); 48 session.stop(); 49 jsc.close(); 50 } 51 }