InfluxDB工具類java
1 package com.influxdb.test; 2 3 import java.util.Map; 4 5 import org.influxdb.InfluxDB; 6 import org.influxdb.InfluxDBFactory; 7 import org.influxdb.dto.Point; 8 import org.influxdb.dto.Point.Builder; 9 import org.influxdb.dto.Query; 10 import org.influxdb.dto.QueryResult; 11 import org.springframework.util.StringUtils; 12 13 /** 14 * 時序數據庫 InfluxDB 鏈接 15 * @author Dai_LW 16 * 17 */ 18 public class InfluxDBConnect { 19 20 private String username;//用戶名 21 private String password;//密碼 22 private String openurl;//鏈接地址 23 private String database;//數據庫 24 25 private InfluxDB influxDB; 26 27 28 public InfluxDBConnect(String username, String password, String openurl, String database){ 29 this.username = username; 30 this.password = password; 31 this.openurl = openurl; 32 this.database = database; 33 } 34 35 /**鏈接時序數據庫;得到InfluxDB**/ 36 public InfluxDB influxDbBuild(){ 37 if(influxDB == null){ 38 influxDB = InfluxDBFactory.connect(openurl, username, password); 39 influxDB.createDatabase(database); 40 41 } 42 return influxDB; 43 } 44 45 /** 46 * 設置數據保存策略 47 * defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1 副本個數爲1/ 結尾DEFAULT 表示 設爲默認的策略 48 */ 49 public void createRetentionPolicy(){ 50 String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", 51 "defalut", database, "30d", 1); 52 this.query(command); 53 } 54 55 /** 56 * 查詢 57 * @param command 查詢語句 58 * @return 59 */ 60 public QueryResult query(String command){ 61 return influxDB.query(new Query(command, database)); 62 } 63 64 /** 65 * 插入 66 * @param measurement 表 67 * @param tags 標籤 68 * @param fields 字段 69 */ 70 public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields){ 71 Builder builder = Point.measurement(measurement); 72 builder.tag(tags); 73 builder.fields(fields); 74 75 influxDB.write(database, "", builder.build()); 76 } 77 78 /** 79 * 刪除 80 * @param command 刪除語句 81 * @return 返回錯誤信息 82 */ 83 public String deleteMeasurementData(String command){ 84 QueryResult result = influxDB.query(new Query(command, database)); 85 return result.getError(); 86 } 87 88 /** 89 * 建立數據庫 90 * @param dbName 91 */ 92 public void createDB(String dbName){ 93 influxDB.createDatabase(dbName); 94 } 95 96 /** 97 * 刪除數據庫 98 * @param dbName 99 */ 100 public void deleteDB(String dbName){ 101 influxDB.deleteDatabase(dbName); 102 } 103 104 public String getUsername() { 105 return username; 106 } 107 108 public void setUsername(String username) { 109 this.username = username; 110 } 111 112 public String getPassword() { 113 return password; 114 } 115 116 public void setPassword(String password) { 117 this.password = password; 118 } 119 120 public String getOpenurl() { 121 return openurl; 122 } 123 124 public void setOpenurl(String openurl) { 125 this.openurl = openurl; 126 } 127 128 public void setDatabase(String database) { 129 this.database = database; 130 } 131 }
測試類spring
1 package com.influxdb.test; 2 3 import java.util.ArrayList; 4 import java.util.Date; 5 import java.util.HashMap; 6 import java.util.List; 7 import java.util.Map; 8 9 import junit.framework.Assert; 10 11 import org.influxdb.dto.QueryResult; 12 import org.influxdb.dto.QueryResult.Result; 13 import org.influxdb.dto.QueryResult.Series; 14 import org.junit.Before; 15 import org.junit.Test; 16 import org.springframework.beans.BeanWrapperImpl; 17 18 import com.influxdb.pojo.CodeInfo; 19 20 public class InfluxDBTest { 21 22 private InfluxDBConnect influxDB; 23 private String username = "admin";//用戶名 24 private String password = "admin";//密碼 25 private String openurl = "http://127.0.0.1:8086";//鏈接地址 26 private String database = "test_db";//數據庫 27 private String measurement = "sys_code"; 28 29 @Before 30 public void setUp(){ 31 //建立 鏈接 32 influxDB = new InfluxDBConnect(username, password, openurl, database); 33 34 influxDB.influxDbBuild(); 35 36 influxDB.createRetentionPolicy(); 37 38 // influxDB.deleteDB(database); 39 // influxDB.createDB(database); 40 } 41 42 @Test 43 public void testInsert(){//測試數據插入 44 Map<String, String> tags = new HashMap<String, String>(); 45 Map<String, Object> fields = new HashMap<String, Object>(); 46 List<CodeInfo> list = new ArrayList<CodeInfo>(); 47 48 CodeInfo info1 = new CodeInfo(); 49 info1.setId(1L); 50 info1.setName("BANKS"); 51 info1.setCode("ABC"); 52 info1.setDescr("中國農業銀行"); 53 info1.setDescrE("ABC"); 54 info1.setCreatedBy("system"); 55 info1.setCreatedAt(new Date().getTime()); 56 57 CodeInfo info2 = new CodeInfo(); 58 info2.setId(2L); 59 info2.setName("BANKS"); 60 info2.setCode("CCB"); 61 info2.setDescr("中國建設銀行"); 62 info2.setDescrE("CCB"); 63 info2.setCreatedBy("system"); 64 info2.setCreatedAt(new Date().getTime()); 65 66 list.add(info1); 67 list.add(info2); 68 69 for(CodeInfo info : list){ 70 71 tags.put("TAG_CODE", info.getCode()); 72 tags.put("TAG_NAME", info.getName()); 73 74 fields.put("ID", info.getId()); 75 fields.put("NAME", info.getName()); 76 fields.put("CODE", info.getCode()); 77 fields.put("DESCR", info.getDescr()); 78 fields.put("DESCR_E", info.getDescrE()); 79 fields.put("CREATED_BY", info.getCreatedBy()); 80 fields.put("CREATED_AT", info.getCreatedAt()); 81 82 influxDB.insert(measurement, tags, fields); 83 } 84 } 85 86 @Test 87 public void testQuery(){//測試數據查詢 88 String command = "select * from sys_code"; 89 QueryResult results = influxDB.query(command); 90 91 if(results.getResults() == null){ 92 return; 93 } 94 List<CodeInfo> lists = new ArrayList<CodeInfo>(); 95 for (Result result : results.getResults()) { 96 97 List<Series> series= result.getSeries(); 98 for (Series serie : series) { 99 // Map<String, String> tags = serie.getTags(); 100 List<List<Object>> values = serie.getValues(); 101 List<String> columns = serie.getColumns(); 102 103 lists.addAll(getQueryData(columns, values)); 104 } 105 } 106 107 Assert.assertTrue((!lists.isEmpty())); 108 Assert.assertEquals(2, lists.size()); 109 } 110 111 @Test 112 public void testQueryWhere(){//tag 列名 區分大小寫 113 String command = "select * from sys_code where TAG_CODE='ABC'"; 114 QueryResult results = influxDB.query(command); 115 116 if(results.getResults() == null){ 117 return; 118 } 119 List<CodeInfo> lists = new ArrayList<CodeInfo>(); 120 for (Result result : results.getResults()) { 121 122 List<Series> series= result.getSeries(); 123 for (Series serie : series) { 124 List<List<Object>> values = serie.getValues(); 125 List<String> columns = serie.getColumns(); 126 127 lists.addAll(getQueryData(columns, values)); 128 } 129 } 130 131 Assert.assertTrue((!lists.isEmpty())); 132 Assert.assertEquals(1, lists.size()); 133 134 CodeInfo info = lists.get(0); 135 136 Assert.assertEquals(info.getCode(), "ABC"); 137 138 } 139 140 @Test 141 public void deletMeasurementData(){ 142 String command = "delete from sys_code where TAG_CODE='ABC'"; 143 String err = influxDB.deleteMeasurementData(command); 144 Assert.assertNull(err); 145 } 146 147 /***整理列名、行數據***/ 148 private List<CodeInfo> getQueryData(List<String> columns, List<List<Object>> values){ 149 List<CodeInfo> lists = new ArrayList<CodeInfo>(); 150 151 for (List<Object> list : values) { 152 CodeInfo info = new CodeInfo(); 153 BeanWrapperImpl bean = new BeanWrapperImpl(info); 154 for(int i=0; i< list.size(); i++){ 155 156 String propertyName = setColumns(columns.get(i));//字段名 157 Object value = list.get(i);//相應字段值 158 bean.setPropertyValue(propertyName, value); 159 } 160 161 lists.add(info); 162 } 163 164 return lists; 165 } 166 167 /***轉義字段***/ 168 private String setColumns(String column){ 169 String[] cols = column.split("_"); 170 StringBuffer sb = new StringBuffer(); 171 for(int i=0; i< cols.length; i++){ 172 String col = cols[i].toLowerCase(); 173 if(i != 0){ 174 String start = col.substring(0, 1).toUpperCase(); 175 String end = col.substring(1).toLowerCase(); 176 col = start + end; 177 } 178 sb.append(col); 179 } 180 return sb.toString(); 181 } 182 }