Graduation Project, Part 3: Inserting Data via Spark and Phoenix Integration / Parsing JSON Arrays

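Requirement: store the comments collected a few days ago in HBase.

Approach:

First parse the comments with fastjson, then build an RDD from them, and finally use the Spark-Phoenix integration to write the data to HBase.

Sample data: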

[
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-04-08 01:13:42",
    "content": "此用戶沒有填寫評價內容",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-29 11:49:36",
    "content": "不錯",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-21 21:13:07",
    "content": "正品沒什麼毛病。信號好像是照別的差一點,可是還能夠,不是特別差。分辨率不是那麼好,可是也不是特別差。通常般。手機不卡。打遊戲很順暢。官方正品沒有翻車。",
    "label": [
      {
        "labelName": "功能齊全"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-22 09:56:22",
    "content": "不錯是正品",
    "label": [
      {
        "labelName": "系統流暢"
      },
      {
        "labelName": "聲音大"
      },
      {
        "labelName": "作工精緻"
      },
      {
        "labelName": "待機時間長"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-13 07:27:56",
    "content": "性價比很高的手機,用習慣了ios轉安卓7個月,從新迴歸,只能說,用蘋果省心。蘇寧質量有保障,送貨快,價格優惠,推薦購買!",
    "label": [
      {
        "labelName": "系統流暢"
      },
      {
        "labelName": "功能齊全"
      },
      {
        "labelName": "包裝通常"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待",
    "creationTime": "2019-02-25 22:03:18",
    "content": "弟弟就想要一個蘋果手機。原本打算買8p的。而後我,推薦他這款xr,是最新款。價格整體來講性價比,比8p好。買了很快就到了,次日就。屏幕很大,仍是面容id。蘋果x大不少。比***x小一點",
    "label": [
      {
        "labelName": "外觀漂亮"
      },
      {
        "labelName": "作工精緻"
      },
      {
        "labelName": "反應快"
      },
      {
        "labelName": "系統流暢"
      }
    ]
  },
  {
    "referenceName": "【券後低至5388】Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待",
    "creationTime": "2019-02-21 12:45:22",
    "content": "物流很棒,xr價格能夠接受、性能穩定!",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-02-22 12:14:55",
    "content": "很不錯的手機除了有點厚,極致的體驗,黑邊徹底沒有影響",
    "label": [
      {
        "labelName": "拍照效果好"
      },
      {
        "labelName": "待機時間長"
      },
      {
        "labelName": "電池耐用"
      },
      {
        "labelName": "性價比高 "
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待",
    "creationTime": "2019-02-13 00:28:03",
    "content": "很是很是好的商品。很不錯,下次再來",
    "label": [
      {
        "labelName": "信號穩定"
      },
      {
        "labelName": "反應快"
      },
      {
        "labelName": "聲音大"
      },
      {
        "labelName": "作工精緻"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G 手機",
    "creationTime": "2019-04-02 17:29:43",
    "content": "此用戶沒有填寫評價內容",
    "label": []
  }
]
[
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-04-05 18:13:14",
    "content": "滿意嘻嘻",
    "label": [
      {
        "labelName": "音質好"
      },
      {
        "labelName": "拍照效果好"
      },
      {
        "labelName": "功能齊全"
      },
      {
        "labelName": "外觀漂亮"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-21 00:13:17",
    "content": "棒棒噠",
    "label": []
  },
  {
    "referenceName": "【雙12爆款】Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待",
    "creationTime": "2019-01-19 10:23:57",
    "content": "雙十二買的正好遇上手機壞了就買了xr太貴沒捨得買一直用蘋果的系統用順手了不肯意換",
    "label": [
      {
        "labelName": "反應快"
      },
      {
        "labelName": "作工精緻"
      },
      {
        "labelName": "信號穩定"
      },
      {
        "labelName": "待機時間長"
      },
      {
        "labelName": "性價比通常般"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-21 12:47:34",
    "content": "用了幾天感受還能夠,只是信號不是很好,整體上是能夠的",
    "label": [
      {
        "labelName": "音質好"
      },
      {
        "labelName": "分辨率高"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-09 08:32:15",
    "content": "蘋果手機作工精細,手感不錯,外觀設計也是頗有時尚感!系統運行十分流暢,操做體驗不錯;功能齊全,屏幕分辨率高,整體來講很滿意!",
    "label": [
      {
        "labelName": "作工精緻"
      },
      {
        "labelName": "系統流暢"
      },
      {
        "labelName": "功能齊全"
      }
    ]
  },
  {
    "referenceName": "【低至5399】Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待",
    "creationTime": "2019-01-15 22:44:53",
    "content": "真心喜歡,一直在徘徊x.仍是xr.我的以爲真不錯,一直信賴蘇寧,新機爲激活,價錢能接受,值得推薦,黑邊什麼的不影響。",
    "label": [
      {
        "labelName": "拍照效果好"
      },
      {
        "labelName": "外觀漂亮"
      },
      {
        "labelName": "屏幕清晰"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-03-08 22:12:18",
    "content": "手機運行流暢,外觀也漂亮,黑色彰顯檔次,音質也好,又是雙卡雙待手機,性價比不錯,比起XS便宜很多。",
    "label": [
      {
        "labelName": "外觀漂亮"
      },
      {
        "labelName": "系統流暢"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待",
    "creationTime": "2019-03-01 15:54:33",
    "content": "手機很好,電池很是抗用,價錢也很是美麗,值得購買。",
    "label": [
      {
        "labelName": "系統流暢"
      },
      {
        "labelName": "電池耐用"
      },
      {
        "labelName": "分辨率高"
      },
      {
        "labelName": "待機時間長"
      },
      {
        "labelName": "包裝通常"
      }
    ]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G手機 雙卡雙待",
    "creationTime": "2019-02-19 09:37:25",
    "content": "春節期間配送超快,手機沒有任何問題,蘇寧易購確實作到了全網最低價",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移動聯通電信4G全網通手機 雙卡雙待",
    "creationTime": "2019-02-23 13:27:15",
    "content": "挺好的懶得拍照了借了幾張黑邊確實大",
    "label": [
      {
        "labelName": "待機時間長"
      },
      {
        "labelName": "反應快"
      },
      {
        "labelName": "作工精緻"
      }
    ]
  }
]

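The data consists of JSON arrays, so fastjson is used to parse it. To make the final write step easier, a Comment class is defined to hold each record.

Dependencies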

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>

        <!-- hbase -->
        <!-- <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.0.2</version>
        </dependency> -->

        <!-- phoenix -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.0.0-HBase-2.0</version>
        </dependency>

        <!-- phoenix_spark -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
        </dependency>

        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>

Comment.java

package cn.tele.bean;

/**
 * Bean holding one comment; its properties become the columns written to Phoenix.
 *
 * @author Tele
 */
public class Comment {
    private Integer id;
    private String name;
    private String content;
    private String creationtime;
    private String label;
    private String platform;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getCreationtime() {
        return creationtime;
    }

    public void setCreationtime(String creationtime) {
        this.creationtime = creationtime;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public String getPlatform() {
        return platform;
    }

    public void setPlatform(String platform) {
        this.platform = platform;
    }
}
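Comment keeps label as a single String, while the JSON stores it as an array of labelName objects; StoreData flattens the array by appending "#" after each name. The sketch below shows that encoding and how it could be reversed when reading the column back. LabelCodec and its methods are hypothetical helpers, not part of the original project, and the sketch assumes no label name itself contains "#".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original project.
class LabelCodec {
    // Assumed separator: the same "#" that StoreData appends after each labelName.
    private static final String SEPARATOR = "#";

    /** Joins label names into one string with a trailing separator after each name. */
    static String join(List<String> labelNames) {
        StringBuilder sb = new StringBuilder();
        for (String name : labelNames) {
            sb.append(name).append(SEPARATOR);
        }
        return sb.toString();
    }

    /** Splits a stored label string back into names; null or empty input gives an empty list. */
    static List<String> split(String stored) {
        if (stored == null || stored.isEmpty()) {
            return new ArrayList<>();
        }
        // String.split drops trailing empty strings, so the final "#" adds no extra element.
        return new ArrayList<>(Arrays.asList(stored.split(SEPARATOR)));
    }

    public static void main(String[] args) {
        String stored = join(Arrays.asList("系統流暢", "功能齊全"));
        System.out.println(stored);
        System.out.println(split(stored));
    }
}
```

Keeping the labels in one column keeps the Phoenix table flat, at the cost of having to split the string again whenever individual labels are needed.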

StoreData.java

package cn.tele.spark;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.tele.bean.Comment;
import scala.Tuple2;

/**
 * Stores the crawled comments in HBase.
 *
 * @author Tele
 */
public class StoreData {
    // Register Comment with Kryo before the context is created;
    // changes made to a SparkConf after that point are not picked up.
    private static SparkConf conf = new SparkConf().setAppName("storedata").setMaster("local")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(new Class<?>[] { Comment.class });
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    // getOrCreate() reuses the SparkContext created above
    private static SparkSession session = SparkSession.builder().config(conf).getOrCreate();

    // Connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) {
        // Walk the comment directory
        Path path = Paths.get("F:\\comment\\");

        try {
            MyFileVisitor myFileVisitor = new MyFileVisitor();
            Files.walkFileTree(path, myFileVisitor);
            List<Map<String, Object>> list = myFileVisitor.getData();
            JavaRDD<Comment> commentRDD = getCommentRDD(list);
            // Store in HBase
            storeData(commentRDD);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Reading a single directory directly
        /*
         * JavaRDD<String> rdd =
         * jsc.textFile("file:\\F:\\comment\\sn_comment\\iphonexr-2019-04-16-18-27-36\\");
         * List<Comment> commentList = getCommentList(rdd, "iphonexr");
         */

        jsc.close();
    }

    private static void storeData(JavaRDD<Comment> rdd) {
        // An explicit schema is not needed because the DataFrame is built from the Comment bean
        /*
         * DataTypes.createStructType(Arrays.asList(
         * DataTypes.createStructField("id", DataTypes.IntegerType, false),
         * DataTypes.createStructField("name", DataTypes.StringType, false),
         * DataTypes.createStructField("content", DataTypes.StringType, false),
         * DataTypes.createStructField("creationtime", DataTypes.StringType, false),
         * DataTypes.createStructField("label", DataTypes.StringType, true),
         * DataTypes.createStructField("platform", DataTypes.StringType, false)));
         */

        Dataset<Row> ds = session.createDataFrame(rdd, Comment.class);
        ds.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark").option("zkUrl", DB_PHOENIX_URL)
                .option("table", "comment").save();
    }

    @SuppressWarnings("unchecked")
    private static JavaRDD<Comment> getCommentRDD(List<Map<String, Object>> list) {
        JavaRDD<Map<String, Object>> originalRDD = jsc.parallelize(list);
        JavaRDD<List<Comment>> listCommentRDD = originalRDD.map(new Function<Map<String, Object>, List<Comment>>() {

            private static final long serialVersionUID = 1L;

            public List<Comment> call(Map<String, Object> v1) throws Exception {
                // Create a fresh list per call; a list shared across calls would keep growing
                // and the later flatMap would emit the same comments more than once
                List<Comment> dataList = new ArrayList<>();
                for (Entry<String, Object> entry : v1.entrySet()) {
                    // The key has the form platform#referenceName
                    String referenceName = entry.getKey();
                    String platform = referenceName.split("#")[0];
                    List<Comment> commentList = (List<Comment>) entry.getValue();
                    commentList.forEach(cm -> {
                        cm.setPlatform(platform);
                        dataList.add(cm);
                    });
                    println(referenceName + " comment count------------------:" + commentList.size());
                }
                return dataList;
            }
        }).persist(StorageLevel.MEMORY_ONLY());

        JavaRDD<Comment> commentRDD = listCommentRDD.flatMap(new FlatMapFunction<List<Comment>, Comment>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Comment> call(List<Comment> t) throws Exception {
                return t.iterator();
            }
        });

        long totalSize = commentRDD.count();
        println("Total comments:-----------" + totalSize);

        // Assign ids
        JavaRDD<Comment> resultRDD = commentRDD.zipWithIndex().map(new Function<Tuple2<Comment, Long>, Comment>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Comment call(Tuple2<Comment, Long> v1) throws Exception {
                v1._1.setId(v1._2.intValue());
                return v1._1;
            }
        });
        return resultRDD;
    }

    private static List<Comment> getCommentList(JavaRDD<String> rdd, String referenceName) {
        List<Comment> commentList = new ArrayList<Comment>();

        // Join all lines into one string. reduce expects a commutative function and
        // concatenation is not one, so this relies on the partitions being merged in
        // order (single-threaded local mode here).
        String originalStr = rdd.reduce(new Function2<String, String, String>() {
            private static final long serialVersionUID = 1L;

            public String call(String v1, String v2) throws Exception {
                return v1.trim() + v2.trim();
            }
        });
        // Mark the boundary between adjacent arrays ("][") with a random separator
        String uuid = UUID.randomUUID().toString();
        originalStr = originalStr.replace("][", "]" + uuid + "[");

        // Parse the JSON, one array (page) at a time
        String[] pages = originalStr.split(uuid);
        for (String page : pages) {
            try {
                JSONArray jsonArray = JSON.parseArray(page);
                for (Object obj : jsonArray) {
                    JSONObject jsonObject = (JSONObject) obj;
                    // String referenceName = jsonObject.getString("referenceName");
                    String creationTime = jsonObject.getString("creationTime");
                    String content = jsonObject.getString("content");
                    println("referenceName:" + referenceName);
                    println("creationTime:" + creationTime);
                    println("content:" + content);

                    // Wrap in a Comment
                    Comment comment = new Comment();
                    comment.setName(referenceName);
                    comment.setCreationtime(creationTime);
                    comment.setContent(content);

                    JSONArray labelArray = jsonObject.getJSONArray("label");
                    if (labelArray != null) {
                        // Flatten the label array into one "#"-terminated string
                        String label = "";
                        for (Object labelObj : labelArray) {
                            JSONObject labelObject = (JSONObject) labelObj;
                            label += labelObject.getString("labelName") + "#";
                        }
                        comment.setLabel(label);
                        println("label:" + label);
                    }
                    commentList.add(comment);
                }
            } catch (Exception e) {
                // Skip pages that cannot be parsed
                System.err.println("Skipping unparsable page: " + e.getMessage());
            }
        }
        return commentList;
    }

    private static class MyFileVisitor extends SimpleFileVisitor<Path> {
        private List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        String platform = null;

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
            println("Visiting directory------" + dir.toAbsolutePath() /* + dir.getFileName() */);
            String fileName = dir.getFileName().toString();
            // Platform directories look like sn_comment
            if (fileName.contains("_")) {
                platform = fileName.split("_")[0];
            }
            if (platform != null) {
                // Product directories look like iphonexr-2019-04-16-18-27-36
                if (fileName.contains("-")) {
                    String referenceName = fileName.split("-")[0];
                    JavaRDD<String> rdd = jsc.textFile(dir.toAbsolutePath().toString());
                    List<Comment> commentList = getCommentList(rdd, referenceName);
                    Map<String, Object> map = new HashMap<String, Object>();
                    // platform#brand -> comments
                    map.put(platform + "#" + referenceName, commentList);
                    list.add(map);
                }
            }

            return super.preVisitDirectory(dir, attrs);
        }

        public List<Map<String, Object>> getData() {
            return list;
        }
    }

    private static void println(Object obj) {
        System.out.println(obj);
    }
}
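getCommentList in StoreData joins every line of a directory into one string, so pages that were saved as separate JSON arrays end up back to back as "][". The standalone sketch below isolates that splitting step using only the JDK. PageSplitter is a hypothetical name, and the approach assumes that "][" never occurs inside a comment's text; if it did, that page would be cut in the wrong place.

```java
import java.util.UUID;
import java.util.regex.Pattern;

// Hypothetical helper illustrating the splitting step used in getCommentList.
class PageSplitter {

    /** Splits a string of concatenated JSON arrays ("[...][...]") into one string per array. */
    static String[] splitPages(String concatenated) {
        // A random UUID is very unlikely to appear in the data, so it is a safe separator.
        String marker = UUID.randomUUID().toString();
        // "][" marks the boundary between two arrays once surrounding whitespace is trimmed away.
        String marked = concatenated.replace("][", "]" + marker + "[");
        // Quote the marker so it is matched literally rather than as a regular expression.
        return marked.split(Pattern.quote(marker));
    }

    public static void main(String[] args) {
        String twoPages = "[{\"content\":\"a\"}][{\"content\":\"b\"},{\"content\":\"c\"}]";
        for (String page : splitPages(twoPages)) {
            System.out.println(page);
        }
    }
}
```

Each returned element is a complete JSON array again and can be handed to a parser on its own, which is what the loop over pages in getCommentList does.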

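An exception may be reported during the write, but it does not affect the result; it seems to be related to the HBase version I am using.

The stored data can then be checked from the terminal.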

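Notes:

1. StoreData could be optimized further. For example, the RDD could be built directly while parsing the JSON instead of going through a list, and the program could be adapted to run on a cluster. Since my data volume is small, local mode is sufficient.

2. SaveMode must be SaveMode.Overwrite; the Phoenix connector supports no other mode. In practice, however, testing shows that the write behaves as an append, which is consistent with Phoenix writing rows as upserts.

3. The ids are generated with zipWithIndex, but consecutive ids easily cause hotspotting on the cluster, so it is best to salt the table when creating it in Phoenix.

4. Spark's JDBC data source can also be used to interact with Phoenix; see http://www.javashuo.com/article/p-otlcstfw-nq.html

5. Regarding directories: Spark can read all files under a directory into a single RDD, so it is enough to walk down to the parent directory. Walking down to each individual file works as well.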
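To illustrate note 3, here is a rough sketch of what salting does to consecutive ids. Phoenix enables salting through the SALT_BUCKETS table option and then prefixes each row key with a bucket byte derived from a hash of the key. The code below is only an illustration of that idea: SaltSketch is a hypothetical class, and Arrays.hashCode is a stand-in hash chosen for simplicity, not the function Phoenix actually uses.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration only: a stand-in hash, not Phoenix's real salting implementation.
class SaltSketch {

    /** Maps an integer id to a bucket in [0, buckets) based on a hash of its key bytes. */
    static int saltBucket(int id, int buckets) {
        // Encode the id as 4 big-endian bytes, similar to a fixed-width row key.
        byte[] key = ByteBuffer.allocate(4).putInt(id).array();
        int hash = Arrays.hashCode(key);
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, buckets);
    }

    public static void main(String[] args) {
        int buckets = 4; // assumed bucket count for the example
        for (int id = 0; id < 8; id++) {
            System.out.println("id=" + id + " -> bucket " + saltBucket(id, buckets));
        }
    }
}
```

Without the prefix, ids 0, 1, 2, ... sort next to each other and tend to land in the same region; with it, neighbouring ids are spread over several buckets, so writes are distributed more evenly.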

Appendix: test case

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 * Tests the Spark and Phoenix integration.
 *
 * @author Tele
 */
public class SparkPhoneix {
    private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    // getOrCreate() reuses the SparkContext created above
    private static SparkSession session = SparkSession.builder().config(conf).getOrCreate();

    // Connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) {
        // Read the TEST table through the Phoenix data source
        Dataset<Row> ds = session.read().format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST")
                .load();
        ds.createOrReplaceTempView("test");

        ds.show();

        // Insert test. Table columns: _SALT, ID, INFO.ITEM, INFO.CONTENT, INFO.LABEL
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("item", DataTypes.StringType, false),
                DataTypes.createStructField("content", DataTypes.StringType, false),
                DataTypes.createStructField("label", DataTypes.StringType, true)));

        // Create the data
        List<Row> list = new ArrayList<Row>();
        Row row1 = RowFactory.create(3, "iphone", "不錯", null);
        list.add(row1);

        Dataset<Row> dataset = session.createDataFrame(list, schema);
        // Phoenix only accepts Overwrite mode, but in practice the data is appended
        dataset.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST").save();
        ds.show();
        session.stop();
        jsc.close();
    }
}