一、在 pom 文件中添加配置(父工程)
<!-- Parent POM this module inherits from; the groupId appears abbreviated ("com...") in this write-up. -->
<parent>
<artifactId>ma-base</artifactId>
<groupId>com...</groupId>
<version>5.0.0-SNAPSHOT</version>
</parent>
添加依賴
<!-- influxdb-java client library, pinned to version 2.15; it provides the org.influxdb.* classes used below. -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.15</version>
</dependency>
二、influxDB工具類
package com.meicloud.mp.mas.core.influx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Spring component that builds the process-wide {@link InfluxDBConnect} from
 * configuration properties and hands it out through {@link #getConnect()}.
 *
 * <p>The connection is created once when the bean is initialised
 * ({@link #afterPropertiesSet()}) and lazily re-created by {@link #getConnect()}
 * if it is still missing.
 */
@Slf4j
@Component
public class InfluxDBUtils implements InitializingBean {

    /**
     * Shared connection. Declared {@code volatile} because {@link #getConnect()}
     * reads it outside the lock (double-checked locking); without it another thread
     * could observe a partially published instance.
     */
    private static volatile InfluxDBConnect influxDB;

    // Connection URL. The inline initialiser is only a fallback for instances created
    // outside Spring; property injection overwrites it.
    @Value("${common.mas.influx.url}")
    private String openurl = "http://127.0.0.1:8086";

    // User name
    @Value("${common.mas.influx.username}")
    private String username = "root";

    // Password
    @Value("${common.mas.influx.password}")
    private String password = "root";

    // Database name
    @Value("${common.mas.influx.database}")
    private String database = "polutondb";

    // Retention time handed to InfluxDBConnect#createRetentionPolicy(int); defaults to 1
    // when the property is absent. The unit is whatever that method applies.
    @Value("${common.mas.influx.data.keepTime:1}")
    private int dataKeepTime;

    /**
     * Creates the connection, applies the retention policy and only then publishes
     * the instance in the static field, so callers never see a connection whose
     * initialisation failed.
     *
     * @throws RuntimeException if connecting or creating the retention policy fails
     */
    private void setUp() {
        InfluxDBConnect candidate = new InfluxDBConnect(username, password, openurl, database);
        try {
            candidate.influxDbBuild();
            candidate.createRetentionPolicy(dataKeepTime);
        } catch (Exception e) {
            throw new RuntimeException("init influxDB failed!", e);
        }
        influxDB = candidate;
        log.info("init influxDB successfully....");
    }

    /**
     * Returns the shared connection, creating it first if it does not exist yet.
     *
     * @return the shared {@link InfluxDBConnect}
     * @throws RuntimeException if the connection has to be created and that fails
     */
    public InfluxDBConnect getConnect() {
        InfluxDBConnect current = influxDB;
        if (current == null) {
            synchronized (InfluxDBUtils.class) {
                if (influxDB == null) {
                    setUp();
                }
                current = influxDB;
            }
        }
        return current;
    }

    /**
     * Spring lifecycle hook: builds the connection as soon as the properties above
     * have been injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        setUp();
    }
}
ps:基本配置(連接地址、用戶名、密碼、數據庫名、數據保留時間)都可以通過配置文件設定
三、InfluxDBConnect 連接類
package com.meicloud.mp.mas.core.influx;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import java.util.Map;
/**
* 時序數據庫 InfluxDB 鏈接
*/
public class InfluxDBConnect {
private String username;// 用戶名
private String password;// 密碼
private String openurl;// 鏈接地址
private String database;// 數據庫
private InfluxDB influxDB;
public InfluxDBConnect(String username, String password, String openurl,
String database) {
this.username = username;
this.password = password;
this.openurl = openurl;
this.database = database;
}
/** 鏈接時序數據庫;得到InfluxDB **/
public InfluxDB influxDbBuild() {
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(openurl, username, password);
//influxDB.createDatabase(database);
}
return influxDB;
}
/**
* 設置數據保存策略 defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1 副本個數爲1/ 結尾DEFAULT
* 表示 設爲默認的策略
*/
public void createRetentionPolicy() {
String command = String
.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
"defalut", database, "30d", 1);
this.query(command);
}
/**
* 查詢
*
* @param command
* 查詢語句
* @return
*/
public QueryResult query(String command) {
return influxDB.query(new Query(command, database));
}
/**
* 插入
*
* @param measurement
* 表
* @param tags
* 標籤
* @param fields
* 字段
*/
public void insert(String measurement, Map<String, String> tags,
Map<String, Object> fields) {
Builder builder = Point.measurement(measurement);
builder.tag(tags);
builder.fields(fields);
influxDB.write(database, "", builder.build());
}
/**
* 刪除
*
* @param command
* 刪除語句
* @return 返回錯誤信息
*/
public String deleteMeasurementData(String command) {
QueryResult result = influxDB.query(new Query(command, database));
return result.getError();
}
/**
* 建立數據庫
*
* @param dbName
*/
public void createDB(String dbName) {
influxDB.createDatabase(dbName);
}
/**
* 刪除數據庫
*
* @param dbName
*/
public void deleteDB(String dbName) {
influxDB.deleteDatabase(dbName);
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getOpenurl() {
return openurl;
}
public void setOpenurl(String openurl) {
this.openurl = openurl;
}
public void setDatabase(String database) {
this.database = database;
}
}
四、InfluxCallErrorInfoService 接口類
/**
 * Storage operations for call-error records kept in InfluxDB.
 */
public interface InfluxCallErrorInfoService {
/**
 * Stores a batch of call-error records.
 *
 * @param data the records to store
 * @return a success flag; the exact conditions are defined by the implementation
 */
boolean insert(List<CallErrorInfo> data);
/**
 * Stores a single error-log entry.
 *
 * @param errorServerLog the entry to store
 * @return a success flag; the exact conditions are defined by the implementation
 */
boolean insertOne(ErrorServerLog errorServerLog);
/**
 * Query without any filter arguments.
 * NOTE(review): the name suggests this exists for testing/diagnostics; confirm before relying on it.
 *
 * @return the records found
 */
List<CallErrorInfo> testQuery();
/**
 * Looks up records for one alias within a time window.
 *
 * @param alias value the records are filtered by
 * @param begin lower bound of the window (NOTE(review): presumably epoch milliseconds; confirm)
 * @param end   upper bound of the window (same unit as {@code begin})
 * @return the matching records
 */
List<CallErrorInfo> query(String alias, Long begin, Long end);
/**
 * Counts records for one alias within a time window.
 *
 * @param alias value the records are filtered by
 * @param begin lower bound of the window
 * @param end   upper bound of the window
 * @return the count as reported by the implementation
 */
int count(String alias, Long begin, Long end);
/**
 * Deletes records for one alias within a time window.
 *
 * @param alias value the records are filtered by
 * @param begin lower bound of the window
 * @param end   upper bound of the window
 * @return a status code whose meaning is defined by the implementation
 */
int delete(String alias, Long begin, Long end);
}
五、InfluxCallErrorInfoServiceImpl 實現類
package com.meicloud.mp.mas.core.influx.service.impl;

import com.meicloud.mp.mas.core.influx.InfluxDBConnect;
import com.meicloud.mp.mas.core.influx.InfluxDBUtils;
import com.meicloud.mp.mas.core.influx.bo.CallErrorInfo;
import com.meicloud.mp.mas.core.influx.bo.ErrorServerLog;
import com.meicloud.mp.mas.core.influx.bo.InfluxDbQueryResult;
import com.meicloud.mp.mas.core.influx.service.InfluxCallErrorInfoService;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.influxdb.querybuilder.BuiltQuery;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;

/**
 * InfluxDB-backed implementation of {@link InfluxCallErrorInfoService}.
 *
 * <p>Every operation obtains the shared connection from {@link InfluxDBUtils} and
 * reports a failure value (false / empty list / -1) when no connection is available.
 */
@Service("influxCallErrorInfoService")
public class InfluxCallErrorInfoServiceImpl implements InfluxCallErrorInfoService {

    private final static Logger logger = LoggerFactory.getLogger(InfluxCallErrorInfoServiceImpl.class);

    /** Pattern of the time literals placed into InfluxQL statements; the trailing Z is a literal. */
    public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

    /**
     * Formatter for {@link #DEFAULT_DATETIME_FORMAT}, pinned to UTC so that the literal
     * "Z" suffix is actually true regardless of the JVM default time zone.
     */
    private static final DateTimeFormatter TIMEZ_FORMATTER =
            DateTimeFormatter.ofPattern(DEFAULT_DATETIME_FORMAT).withZone(ZoneOffset.UTC);

    @Autowired
    InfluxDBUtils influxUtil;

    /**
     * Writes one point per element of {@code list}. A failing element is logged and
     * skipped so the remaining elements are still written.
     *
     * @param list records to write; {@code null} or empty means there is nothing to do
     * @return {@code false} if no connection is available or at least one element
     *         could not be written, {@code true} otherwise
     */
    @Override
    public boolean insert(List<CallErrorInfo> list) {
        InfluxDBConnect influxDBConnect = connectOrNull();
        if (influxDBConnect == null) {
            return false;
        }
        if (CollectionUtils.isEmpty(list)) {
            return true;
        }
        boolean allWritten = true;
        for (CallErrorInfo info : list) {
            try {
                Map<String, String> tags = new HashMap<>();
                tags.put(CallErrorInfo.Tag_InterfaceCode, info.getInterfacecode());
                tags.put(CallErrorInfo.Tag_Syscode, info.getSyscode());

                Map<String, Object> fields = new HashMap<>();
                fields.put(CallErrorInfo.Field_Syscode, info.getSyscode());
                fields.put(CallErrorInfo.Field_InterfaceCode, info.getInterfacecode());
                // NOTE(review): this concatenation yields a String such as "123i", so the
                // value reaches the client as text rather than as a number. count() compares
                // the same field numerically; confirm the suffix is intended before changing
                // it, since existing data was written this way.
                fields.put(CallErrorInfo.Field_FailedTimeMill, (info.getFailedtimemill() + "i"));

                influxDBConnect.insert(CallErrorInfo.measurement, tags, fields);
            } catch (Exception e) {
                allWritten = false;
                logger.error("insert into influxDB failed!", e);
            }
        }
        return allWritten;
    }

    /**
     * Builds a point from the annotated POJO and writes it.
     *
     * @param errorServerLog entry to write
     * @return {@code true} if the point was written; {@code false} if the argument is
     *         {@code null}, no connection is available, or building/writing failed
     */
    @Override
    public boolean insertOne(ErrorServerLog errorServerLog) {
        if (errorServerLog == null) {
            logger.error("insertOne called with null errorServerLog");
            return false;
        }
        InfluxDBConnect influxDBConnect = connectOrNull();
        if (influxDBConnect == null) {
            return false;
        }
        try {
            // Built inside the try so a mapping failure is reported like a write failure.
            Point point = Point.measurementByPOJO(errorServerLog.getClass())
                    .addFieldsFromPOJO(errorServerLog)
                    .build();
            influxDBConnect.insert(point);
            return true;
        } catch (Exception e) {
            logger.error("insert into influxDB failed!", e);
        }
        return false;
    }

    /**
     * Selects every row of the call-error measurement.
     *
     * @return all rows, or an empty list when no connection is available
     */
    @Override
    public List<CallErrorInfo> testQuery() {
        InfluxDBConnect influxDBConnect = connectOrNull();
        if (influxDBConnect == null) {
            return new ArrayList<>();
        }
        String command = "select * from " + CallErrorInfo.measurement;
        return toCallErrorInfos(influxDBConnect.query(command));
    }

    /**
     * Selects the rows whose interface-code tag equals {@code alias}, optionally
     * restricted to the open interval (begin, end).
     *
     * @param alias interface code to match
     * @param begin exclusive lower bound in epoch milliseconds, or {@code null} for none
     * @param end   exclusive upper bound in epoch milliseconds, or {@code null} for none
     * @return matching rows, or an empty list when no connection is available
     */
    @Override
    public List<CallErrorInfo> query(String alias, Long begin, Long end) {
        InfluxDBConnect influxDBConnect = connectOrNull();
        if (influxDBConnect == null) {
            return new ArrayList<>();
        }
        String command = "select * from " + CallErrorInfo.measurement + whereClause(alias, begin, end);
        return toCallErrorInfos(influxDBConnect.query(command));
    }

    /**
     * Counts the rows for {@code alias} whose failed-time field lies in [begin, end].
     *
     * @param alias interface code to match
     * @param begin inclusive lower bound for the failed-time field
     * @param end   inclusive upper bound for the failed-time field
     * @return the count, 0 when the result is empty, or -1 when no connection is available
     */
    @Override
    public int count(String alias, Long begin, Long end) {
        InfluxDBConnect influxDBConnect = connectOrNull();
        if (influxDBConnect == null) {
            return -1;
        }
        // NOTE(review): the database name "mas" is hard-coded here while the other
        // operations use the database configured on the connection; confirm they match.
        // NOTE(review): unlike query()/delete(), null bounds are passed straight to the
        // query builder here; confirm callers never pass null.
        Query query = BuiltQuery.QueryBuilder.select().count(CallErrorInfo.Field_InterfaceCode).as("count")
                .from("mas", CallErrorInfo.measurement)
                .where(BuiltQuery.QueryBuilder.eq(CallErrorInfo.Tag_InterfaceCode, alias))
                .and(BuiltQuery.QueryBuilder.gte(CallErrorInfo.Field_FailedTimeMill, begin))
                .and(BuiltQuery.QueryBuilder.lte(CallErrorInfo.Field_FailedTimeMill, end));
        QueryResult results = influxDBConnect.query(query);
        if (results == null || CollectionUtils.isEmpty(results.getResults())) {
            return 0;
        }
        QueryResult.Result first = results.getResults().get(0);
        if (first == null || CollectionUtils.isEmpty(first.getSeries())) {
            return 0;
        }
        return extractCount(first.getSeries().get(0));
    }

    /**
     * Deletes the rows whose interface-code tag equals {@code alias}, optionally
     * restricted to the open interval (begin, end).
     *
     * @param alias interface code to match
     * @param begin exclusive lower bound in epoch milliseconds, or {@code null} for none
     * @param end   exclusive upper bound in epoch milliseconds, or {@code null} for none
     * @return 0 on success, -1 when no connection is available or InfluxDB reported an error
     */
    @Override
    public int delete(String alias, Long begin, Long end) {
        InfluxDBConnect influxDBConnect = connectOrNull();
        if (influxDBConnect == null) {
            return -1;
        }
        String command = "delete from " + CallErrorInfo.measurement + whereClause(alias, begin, end);
        String err = influxDBConnect.deleteMeasurementData(command);
        if (err != null && !err.isEmpty()) {
            logger.error("delete from influxDB failed: {}", err);
            return -1;
        }
        return 0;
    }

    /**
     * Converts an epoch-millisecond timestamp into the time-literal text used in the
     * InfluxQL statements above ({@link #DEFAULT_DATETIME_FORMAT}), expressed in UTC.
     *
     * @param timestamp epoch milliseconds
     * @return the formatted UTC time; sub-second precision is dropped by the pattern
     */
    static String timestampToTimez(Long timestamp) {
        return TIMEZ_FORMATTER.format(Instant.ofEpochMilli(timestamp));
    }

    /** Manual smoke check: prints the conversion of a millisecond and a second-scale value. */
    public static void main(String[] argv) {
        Long timestamp = 1561289474000L;
        String sTime = timestampToTimez(timestamp);
        System.out.println(sTime);
        timestamp = 1561289474L;
        sTime = timestampToTimez(timestamp);
        System.out.println(sTime);
    }

    /** Fetches the shared connection, logging when none is available. */
    private InfluxDBConnect connectOrNull() {
        InfluxDBConnect influxDBConnect = influxUtil.getConnect();
        if (influxDBConnect == null) {
            logger.error("influxDBConnect get null, failed");
        }
        return influxDBConnect;
    }

    /** Builds the shared " where tag='alias' [and time>'..'] [and time<'..']" suffix. */
    private static String whereClause(String alias, Long begin, Long end) {
        StringBuilder where = new StringBuilder(" where ")
                .append(CallErrorInfo.Tag_InterfaceCode)
                .append('=')
                .append(quoteLiteral(alias));
        if (begin != null) {
            where.append(" and time>'").append(timestampToTimez(begin)).append("'");
        }
        if (end != null) {
            where.append(" and time<'").append(timestampToTimez(end)).append("'");
        }
        return where.toString();
    }

    /**
     * Renders a value as a single-quoted InfluxQL string literal, escaping backslashes,
     * single quotes and newlines so the value cannot terminate the literal early.
     */
    private static String quoteLiteral(String value) {
        String escaped = String.valueOf(value)
                .replace("\\", "\\\\")
                .replace("'", "\\'")
                .replace("\n", "\\n");
        return "'" + escaped + "'";
    }

    /** Flattens every series of every result into a list of records. */
    private static List<CallErrorInfo> toCallErrorInfos(QueryResult results) {
        List<CallErrorInfo> lists = new ArrayList<>();
        if (results == null || results.getResults() == null) {
            return lists;
        }
        for (QueryResult.Result result : results.getResults()) {
            if (result == null || result.getSeries() == null) {
                continue;
            }
            for (QueryResult.Series serie : result.getSeries()) {
                List<String> columns = serie.getColumns();
                List<List<Object>> values = serie.getValues();
                lists.addAll(CallErrorInfo.getQueryData(columns, values));
            }
        }
        return lists;
    }

    /** Reads the first "count" column value of the series as an int; 0 if it is absent. */
    private static int extractCount(QueryResult.Series series) {
        if (series == null) {
            return 0;
        }
        List<String> columns = series.getColumns();
        List<List<Object>> values = series.getValues();
        if (CollectionUtils.isEmpty(columns) || CollectionUtils.isEmpty(values)) {
            return 0;
        }
        for (List<Object> row : values) {
            for (int i = 0; i < columns.size(); i++) {
                if (!"count".equalsIgnoreCase(columns.get(i))) {
                    continue;
                }
                Object cell = row.get(i);
                if (cell == null) {
                    return 0;
                }
                if (cell instanceof Number) {
                    return ((Number) cell).intValue();
                }
                return (int) Double.parseDouble(cell.toString());
            }
        }
        return 0;
    }
}