Java版InfluxDB工具類

InfluxDB工具類java

  1 package com.influxdb.test;
  2  
  3 import java.util.Map;
  4  
  5 import org.influxdb.InfluxDB;
  6 import org.influxdb.InfluxDBFactory;
  7 import org.influxdb.dto.Point;
  8 import org.influxdb.dto.Point.Builder;
  9 import org.influxdb.dto.Query;
 10 import org.influxdb.dto.QueryResult;
 11 import org.springframework.util.StringUtils;
 12  
 13 /**
 14  * 時序數據庫 InfluxDB 鏈接
 15  * @author Dai_LW
 16  *
 17  */
 18 public class InfluxDBConnect {
 19     
 20     private String username;//用戶名
 21     private String password;//密碼
 22     private String openurl;//鏈接地址
 23     private String database;//數據庫
 24     
 25     private InfluxDB influxDB;
 26     
 27     
 28     public InfluxDBConnect(String username, String password, String openurl, String database){
 29         this.username = username;
 30         this.password = password;
 31         this.openurl = openurl;
 32         this.database = database;
 33     }
 34     
 35     /**鏈接時序數據庫;得到InfluxDB**/
 36     public InfluxDB  influxDbBuild(){
 37         if(influxDB == null){
 38             influxDB = InfluxDBFactory.connect(openurl, username, password);
 39             influxDB.createDatabase(database);
 40             
 41         }
 42         return influxDB;
 43     }
 44     
 45     /**
 46      * 設置數據保存策略
 47      * defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1  副本個數爲1/ 結尾DEFAULT 表示 設爲默認的策略
 48      */
 49     public void createRetentionPolicy(){
 50         String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
 51                 "defalut", database, "30d", 1);
 52         this.query(command);
 53     }
 54     
 55     /**
 56      * 查詢
 57      * @param command 查詢語句
 58      * @return
 59      */
 60     public QueryResult query(String command){
 61         return influxDB.query(new Query(command, database));
 62     }
 63     
 64     /**
 65      * 插入
 66      * @param measurement 表
 67      * @param tags 標籤
 68      * @param fields 字段
 69      */
 70     public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields){
 71         Builder builder = Point.measurement(measurement);
 72         builder.tag(tags);
 73         builder.fields(fields);
 74         
 75         influxDB.write(database, "", builder.build());
 76     }
 77     
 78     /**
 79      * 刪除
 80      * @param command 刪除語句
 81      * @return 返回錯誤信息
 82      */
 83     public String deleteMeasurementData(String command){
 84         QueryResult result = influxDB.query(new Query(command, database));
 85         return result.getError();
 86     }
 87     
 88     /**
 89      * 建立數據庫
 90      * @param dbName
 91      */
 92     public void createDB(String dbName){
 93         influxDB.createDatabase(dbName);
 94     }
 95     
 96     /**
 97      * 刪除數據庫
 98      * @param dbName
 99      */
100     public void deleteDB(String dbName){
101         influxDB.deleteDatabase(dbName);
102     }
103     
104     public String getUsername() {
105         return username;
106     }
107  
108     public void setUsername(String username) {
109         this.username = username;
110     }
111  
112     public String getPassword() {
113         return password;
114     }
115  
116     public void setPassword(String password) {
117         this.password = password;
118     }
119  
120     public String getOpenurl() {
121         return openurl;
122     }
123  
124     public void setOpenurl(String openurl) {
125         this.openurl = openurl;
126     }
127  
128     public void setDatabase(String database) {
129         this.database = database;
130     }
131 }

測試類spring

  1 package com.influxdb.test;
  2  
  3 import java.util.ArrayList;
  4 import java.util.Date;
  5 import java.util.HashMap;
  6 import java.util.List;
  7 import java.util.Map;
  8  
  9 import junit.framework.Assert;
 10  
 11 import org.influxdb.dto.QueryResult;
 12 import org.influxdb.dto.QueryResult.Result;
 13 import org.influxdb.dto.QueryResult.Series;
 14 import org.junit.Before;
 15 import org.junit.Test;
 16 import org.springframework.beans.BeanWrapperImpl;
 17  
 18 import com.influxdb.pojo.CodeInfo;
 19  
 20 public class InfluxDBTest {
 21     
 22     private InfluxDBConnect influxDB;
 23     private String username = "admin";//用戶名
 24     private String password = "admin";//密碼
 25     private String openurl = "http://127.0.0.1:8086";//鏈接地址
 26     private String database = "test_db";//數據庫
 27     private String measurement = "sys_code";
 28     
 29     @Before
 30     public void setUp(){
 31         //建立 鏈接
 32         influxDB = new InfluxDBConnect(username, password, openurl, database);
 33         
 34         influxDB.influxDbBuild();
 35         
 36         influxDB.createRetentionPolicy();
 37         
 38 //        influxDB.deleteDB(database);
 39 //        influxDB.createDB(database);
 40     }
 41  
 42     @Test
 43     public void testInsert(){//測試數據插入
 44         Map<String, String> tags = new HashMap<String, String>();
 45         Map<String, Object> fields = new HashMap<String, Object>();
 46         List<CodeInfo> list = new ArrayList<CodeInfo>();
 47         
 48         CodeInfo info1 = new CodeInfo();
 49         info1.setId(1L);
 50         info1.setName("BANKS");
 51         info1.setCode("ABC");
 52         info1.setDescr("中國農業銀行");
 53         info1.setDescrE("ABC");
 54         info1.setCreatedBy("system");
 55         info1.setCreatedAt(new Date().getTime());
 56         
 57         CodeInfo info2 = new CodeInfo();
 58         info2.setId(2L);
 59         info2.setName("BANKS");
 60         info2.setCode("CCB");
 61         info2.setDescr("中國建設銀行");
 62         info2.setDescrE("CCB");
 63         info2.setCreatedBy("system");
 64         info2.setCreatedAt(new Date().getTime());
 65         
 66         list.add(info1);
 67         list.add(info2);
 68         
 69         for(CodeInfo info : list){
 70             
 71             tags.put("TAG_CODE", info.getCode());
 72             tags.put("TAG_NAME", info.getName());
 73             
 74             fields.put("ID", info.getId());
 75             fields.put("NAME", info.getName());
 76             fields.put("CODE", info.getCode());
 77             fields.put("DESCR", info.getDescr());
 78             fields.put("DESCR_E", info.getDescrE());
 79             fields.put("CREATED_BY", info.getCreatedBy());
 80             fields.put("CREATED_AT", info.getCreatedAt());
 81             
 82             influxDB.insert(measurement, tags, fields);
 83         }
 84     }
 85     
 86     @Test
 87     public void testQuery(){//測試數據查詢
 88         String command = "select * from sys_code";
 89         QueryResult results = influxDB.query(command);
 90         
 91         if(results.getResults() == null){
 92             return;
 93         }
 94         List<CodeInfo> lists = new ArrayList<CodeInfo>();
 95         for (Result result : results.getResults()) {
 96             
 97             List<Series> series= result.getSeries();
 98             for (Series serie : series) {
 99 //                Map<String, String> tags = serie.getTags();
100                 List<List<Object>>  values = serie.getValues();
101                 List<String> columns = serie.getColumns();
102                 
103                 lists.addAll(getQueryData(columns, values));
104             }
105         }
106         
107         Assert.assertTrue((!lists.isEmpty()));
108         Assert.assertEquals(2, lists.size());
109     }
110     
111     @Test
112     public void testQueryWhere(){//tag 列名 區分大小寫
113         String command = "select * from sys_code where TAG_CODE='ABC'";
114         QueryResult results = influxDB.query(command);
115         
116         if(results.getResults() == null){
117             return;
118         }
119         List<CodeInfo> lists = new ArrayList<CodeInfo>();
120         for (Result result : results.getResults()) {
121             
122             List<Series> series= result.getSeries();
123             for (Series serie : series) {
124                 List<List<Object>>  values = serie.getValues();
125                 List<String> columns = serie.getColumns();
126                 
127                 lists.addAll(getQueryData(columns, values));
128             }
129         }
130         
131         Assert.assertTrue((!lists.isEmpty()));
132         Assert.assertEquals(1, lists.size());
133         
134         CodeInfo info = lists.get(0);
135         
136         Assert.assertEquals(info.getCode(), "ABC");
137         
138     }
139     
140     @Test
141     public void deletMeasurementData(){
142         String command = "delete from sys_code where TAG_CODE='ABC'";
143         String err = influxDB.deleteMeasurementData(command);
144         Assert.assertNull(err);
145     }
146     
147     /***整理列名、行數據***/
148     private List<CodeInfo> getQueryData(List<String> columns, List<List<Object>>  values){
149         List<CodeInfo> lists = new ArrayList<CodeInfo>();
150         
151         for (List<Object> list : values) {
152             CodeInfo info = new CodeInfo();
153             BeanWrapperImpl bean = new BeanWrapperImpl(info);
154             for(int i=0; i< list.size(); i++){
155                 
156                 String propertyName = setColumns(columns.get(i));//字段名
157                 Object value = list.get(i);//相應字段值
158                 bean.setPropertyValue(propertyName, value);
159             }
160             
161             lists.add(info);
162         }
163         
164         return lists;
165     }
166     
167     /***轉義字段***/
168     private String setColumns(String column){
169         String[] cols = column.split("_");
170         StringBuffer sb = new StringBuffer();
171         for(int i=0; i< cols.length; i++){
172             String col = cols[i].toLowerCase();
173             if(i != 0){
174                 String start = col.substring(0, 1).toUpperCase();
175                 String end = col.substring(1).toLowerCase();
176                 col = start + end;
177             }
178             sb.append(col);
179         }
180         return sb.toString();
181     }
182 }
相關文章
相關標籤/搜索