influxDB 增長

一、pom 文件添加 influxdb-java 依賴

<parent>
<artifactId>ma-base</artifactId>
<groupId>com...</groupId>
<version>5.0.0-SNAPSHOT</version>
</parent>
添加依賴
<!-- InfluxDB Java client; supplies the org.influxdb.* types (InfluxDB, InfluxDBFactory, Point, Query, QueryResult) imported by the classes below. -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.15</version>
</dependency>

二、influxDB工具類
package com.meicloud.mp.mas.core.influx;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Spring component that creates the shared {@link InfluxDBConnect} from the
 * {@code common.mas.influx.*} properties and hands it out through {@link #getConnect()}.
 *
 * <p>The connection is kept in a static field, so every instance of this class
 * shares the same connection.
 */
@Slf4j
@Component
public class InfluxDBUtils implements InitializingBean {

    /**
     * Shared connection. {@code volatile} because {@link #getConnect()} reads it
     * outside the lock before deciding whether initialisation is needed.
     */
    private static volatile InfluxDBConnect influxDB;

    // The field initialisers below only matter when the class is used without
    // Spring injection; the placeholders themselves declare no default.
    @Value("${common.mas.influx.url}")
    private String openurl = "http://127.0.0.1:8086"; // connection URL

    @Value("${common.mas.influx.username}")
    private String username = "root"; // user name

    @Value("${common.mas.influx.password}")
    private String password = "root"; // password

    @Value("${common.mas.influx.database}")
    private String database = "polutondb"; // database name

    // NOTE(review): the unit of this value is decided by
    // InfluxDBConnect#createRetentionPolicy(int) — confirm it matches the config.
    @Value("${common.mas.influx.data.keepTime:1}")
    private int dataKeepTime;

    /**
     * Builds a new connection, applies the retention policy and only then publishes
     * it in the static field, so a failed initialisation never leaves a
     * half-initialised connection behind for {@link #getConnect()} to return.
     *
     * @throws RuntimeException if connecting or creating the retention policy fails
     */
    private void setUp() {
        synchronized (InfluxDBUtils.class) {
            InfluxDBConnect candidate = new InfluxDBConnect(username, password, openurl, database);
            try {
                candidate.influxDbBuild();
                // Assumes InfluxDBConnect offers createRetentionPolicy(int) — TODO confirm.
                candidate.createRetentionPolicy(dataKeepTime);
            } catch (Exception e) {
                throw new RuntimeException("init influxDB failed!", e);
            }
            influxDB = candidate;
            log.info("init influxDB successfully....");
        }
    }

    /**
     * Returns the shared connection, initialising it first if that has not happened yet.
     *
     * @return the shared connection
     * @throws RuntimeException if the connection has to be initialised and that fails
     */
    public InfluxDBConnect getConnect() {
        InfluxDBConnect current = influxDB;
        if (current != null) {
            return current;
        }
        synchronized (InfluxDBUtils.class) {
            if (influxDB == null) {
                setUp();
            }
            return influxDB;
        }
    }

    /** Initialises the connection once Spring has injected the configuration values. */
    @Override
    public void afterPropertiesSet() throws Exception {
        setUp();
    }
}

ps:以上基本配置(common.mas.influx.*)均可通過配置文件設定

三、InfluxDBConnect 連接類
package com.meicloud.mp.mas.core.influx;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import java.util.Map;

/**
* 時序數據庫 InfluxDB 鏈接
*/
public class InfluxDBConnect {

private String username;// 用戶名
private String password;// 密碼
private String openurl;// 鏈接地址
private String database;// 數據庫

private InfluxDB influxDB;

public InfluxDBConnect(String username, String password, String openurl,
String database) {
this.username = username;
this.password = password;
this.openurl = openurl;
this.database = database;
}

/** 鏈接時序數據庫;得到InfluxDB **/
public InfluxDB influxDbBuild() {
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(openurl, username, password);
//influxDB.createDatabase(database);

}
return influxDB;
}

/**
* 設置數據保存策略 defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1 副本個數爲1/ 結尾DEFAULT
* 表示 設爲默認的策略
*/
public void createRetentionPolicy() {
String command = String
.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
"defalut", database, "30d", 1);
this.query(command);
}

/**
* 查詢
*
* @param command
* 查詢語句
* @return
*/
public QueryResult query(String command) {
return influxDB.query(new Query(command, database));
}

/**
* 插入
*
* @param measurement
* 表
* @param tags
* 標籤
* @param fields
* 字段
*/
public void insert(String measurement, Map<String, String> tags,
Map<String, Object> fields) {
Builder builder = Point.measurement(measurement);
builder.tag(tags);
builder.fields(fields);

influxDB.write(database, "", builder.build());
}

/**
* 刪除
*
* @param command
* 刪除語句
* @return 返回錯誤信息
*/
public String deleteMeasurementData(String command) {
QueryResult result = influxDB.query(new Query(command, database));
return result.getError();
}

/**
* 建立數據庫
*
* @param dbName
*/
public void createDB(String dbName) {
influxDB.createDatabase(dbName);
}

/**
* 刪除數據庫
*
* @param dbName
*/
public void deleteDB(String dbName) {
influxDB.deleteDatabase(dbName);
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getOpenurl() {
return openurl;
}

public void setOpenurl(String openurl) {
this.openurl = openurl;
}

public void setDatabase(String database) {
this.database = database;
}
}
四、InfluxCallErrorInfoService 接口類
/**
 * Operations for storing, reading and removing call-error records kept in InfluxDB.
 */
public interface InfluxCallErrorInfoService {

    // ---- writes ----

    /**
     * Stores a batch of call-error records.
     *
     * @param data the records to store
     * @return whether the operation is reported as successful
     */
    boolean insert(List<CallErrorInfo> data);

    /**
     * Stores a single error log entry.
     *
     * @param errorServerLog the entry to store
     * @return whether the operation is reported as successful
     */
    boolean insertOne(ErrorServerLog errorServerLog);

    /**
     * Removes the records of one interface, optionally restricted to a time range.
     *
     * @param alias interface code the records are matched on
     * @param begin lower time bound, or {@code null} for none
     * @param end   upper time bound, or {@code null} for none
     * @return a status code
     */
    int delete(String alias, Long begin, Long end);

    // ---- reads ----

    /**
     * Reads all call-error records without any filter.
     *
     * @return the records found
     */
    List<CallErrorInfo> testQuery();

    /**
     * Reads the records of one interface, optionally restricted to a time range.
     *
     * @param alias interface code the records are matched on
     * @param begin lower time bound, or {@code null} for none
     * @param end   upper time bound, or {@code null} for none
     * @return the records found
     */
    List<CallErrorInfo> query(String alias, Long begin, Long end);

    /**
     * Counts the records of one interface within a range.
     *
     * @param alias interface code the records are matched on
     * @param begin lower bound of the range
     * @param end   upper bound of the range
     * @return the number of matching records
     */
    int count(String alias, Long begin, Long end);
}

五、InfluxCallErrorInfoServiceImpl 實現類
package com.meicloud.mp.mas.core.influx.service.impl;

import com.meicloud.mp.mas.core.influx.InfluxDBConnect;
import com.meicloud.mp.mas.core.influx.InfluxDBUtils;
import com.meicloud.mp.mas.core.influx.bo.CallErrorInfo;
import com.meicloud.mp.mas.core.influx.bo.ErrorServerLog;
import com.meicloud.mp.mas.core.influx.bo.InfluxDbQueryResult;
import com.meicloud.mp.mas.core.influx.service.InfluxCallErrorInfoService;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.influxdb.querybuilder.BuiltQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;

/**
 * InfluxDB-backed implementation of {@link InfluxCallErrorInfoService}.
 *
 * <p>Every operation obtains the shared connection from {@link InfluxDBUtils} and
 * reports failure through its return value when no connection is available.
 */
@Service("influxCallErrorInfoService")
public class InfluxCallErrorInfoServiceImpl implements InfluxCallErrorInfoService {

    private final static Logger logger = LoggerFactory.getLogger(InfluxCallErrorInfoServiceImpl.class);

    /** Pattern of the time literals placed into InfluxQL statements. */
    public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

    /**
     * Formatter for {@link #DEFAULT_DATETIME_FORMAT}, pinned to UTC because the
     * pattern ends in a literal {@code Z}. Immutable, so it is safe to share.
     */
    private static final DateTimeFormatter TIMEZ_FORMATTER =
            DateTimeFormatter.ofPattern(DEFAULT_DATETIME_FORMAT, Locale.ROOT).withZone(ZoneOffset.UTC);

    @Autowired
    InfluxDBUtils influxUtil;

    /**
     * Writes every record of the list as its own point. A record that cannot be
     * written is logged and skipped so the remaining records are still attempted.
     *
     * @param list the records to write; {@code null} or empty writes nothing
     * @return {@code false} if no connection is available or at least one record
     *         could not be written, {@code true} otherwise
     */
    @Override
    public boolean insert(List<CallErrorInfo> list) {
        InfluxDBConnect influxDBConnect = connection();
        if (influxDBConnect == null) {
            return false;
        }
        if (CollectionUtils.isEmpty(list)) {
            return true;
        }
        boolean allWritten = true;
        for (CallErrorInfo info : list) {
            try {
                Map<String, String> tags = new HashMap<>();
                tags.put(CallErrorInfo.Tag_InterfaceCode, info.getInterfacecode());
                tags.put(CallErrorInfo.Tag_Syscode, info.getSyscode());

                Map<String, Object> fields = new HashMap<>();
                fields.put(CallErrorInfo.Field_Syscode, info.getSyscode());
                fields.put(CallErrorInfo.Field_InterfaceCode, info.getInterfacecode());
                // Store the raw value rather than value + "i": the suffix is line-protocol
                // syntax, and appending it here produced a string field that count()
                // could never match with its numeric comparison.
                // NOTE(review): measurements already holding this field as a string may
                // reject the new type until the data is migrated — confirm before rollout.
                fields.put(CallErrorInfo.Field_FailedTimeMill, info.getFailedtimemill());

                influxDBConnect.insert(CallErrorInfo.measurement, tags, fields);
            } catch (Exception e) {
                allWritten = false;
                logger.error("insert into influxDB failed!", e);
            }
        }
        return allWritten;
    }

    /**
     * Writes one error log entry, mapped to a point through its POJO annotations.
     *
     * @param errorServerLog the entry to write
     * @return {@code true} if the point was written, {@code false} otherwise
     */
    @Override
    public boolean insertOne(ErrorServerLog errorServerLog) {
        if (errorServerLog == null) {
            logger.error("insert into influxDB failed! errorServerLog is null");
            return false;
        }
        InfluxDBConnect influxDBConnect = connection();
        if (influxDBConnect == null) {
            return false;
        }
        try {
            Point point = Point.measurementByPOJO(errorServerLog.getClass())
                    .addFieldsFromPOJO(errorServerLog)
                    .build();
            influxDBConnect.insert(point);
            return true;
        } catch (Exception e) {
            logger.error("insert into influxDB failed!", e);
        }
        return false;
    }

    /**
     * Reads every record of the call-error measurement without any filter.
     *
     * @return the records found; empty if there are none or no connection is available
     */
    @Override
    public List<CallErrorInfo> testQuery() {
        InfluxDBConnect influxDBConnect = connection();
        if (influxDBConnect == null) {
            return new ArrayList<>();
        }
        String command = "select * from " + CallErrorInfo.measurement;
        return toCallErrorInfos(influxDBConnect.query(command));
    }

    /**
     * Reads the records of one interface, optionally restricted to a time range.
     *
     * @param alias interface code matched against the interface-code tag
     * @param begin exclusive lower time bound in epoch milliseconds, or {@code null}
     * @param end   exclusive upper time bound in epoch milliseconds, or {@code null}
     * @return the records found; empty if there are none or no connection is available
     */
    @Override
    public List<CallErrorInfo> query(String alias, Long begin, Long end) {
        InfluxDBConnect influxDBConnect = connection();
        if (influxDBConnect == null) {
            return new ArrayList<>();
        }
        String command = "select * from " + CallErrorInfo.measurement + whereClause(alias, begin, end);
        return toCallErrorInfos(influxDBConnect.query(command));
    }

    /**
     * Counts the records of one interface whose failed-time field lies in
     * {@code [begin, end]}.
     *
     * @param alias interface code matched against the interface-code tag
     * @param begin inclusive lower bound, or {@code null} for no lower bound
     * @param end   inclusive upper bound, or {@code null} for no upper bound
     * @return the count, 0 if nothing matched, or -1 if no connection is available
     */
    @Override
    public int count(String alias, Long begin, Long end) {
        InfluxDBConnect influxDBConnect = connection();
        if (influxDBConnect == null) {
            return -1;
        }
        // A missing bound becomes the widest value instead of a null comparison,
        // matching how query() and delete() treat null bounds.
        long from = begin == null ? 0L : begin;
        long to = end == null ? Long.MAX_VALUE : end;

        // NOTE(review): the database name "mas" is hard-coded here while the other
        // methods use the connection's configured database — confirm they agree.
        Query query = BuiltQuery.QueryBuilder.select().count(CallErrorInfo.Field_InterfaceCode).as("count")
                .from("mas", CallErrorInfo.measurement)
                .where(BuiltQuery.QueryBuilder.eq(CallErrorInfo.Tag_InterfaceCode, alias))
                .and(BuiltQuery.QueryBuilder.gte(CallErrorInfo.Field_FailedTimeMill, from))
                .and(BuiltQuery.QueryBuilder.lte(CallErrorInfo.Field_FailedTimeMill, to));

        QueryResult results = influxDBConnect.query(query);
        if (results == null || CollectionUtils.isEmpty(results.getResults())) {
            return 0;
        }
        QueryResult.Result result = results.getResults().get(0);
        if (result == null || CollectionUtils.isEmpty(result.getSeries())) {
            return 0;
        }
        return readCount(result.getSeries().get(0));
    }

    /**
     * Deletes the records of one interface, optionally restricted to a time range.
     *
     * @param alias interface code matched against the interface-code tag
     * @param begin exclusive lower time bound in epoch milliseconds, or {@code null}
     * @param end   exclusive upper time bound in epoch milliseconds, or {@code null}
     * @return 0 on success, -1 if no connection is available or the result carries an error
     */
    @Override
    public int delete(String alias, Long begin, Long end) {
        InfluxDBConnect influxDBConnect = connection();
        if (influxDBConnect == null) {
            return -1;
        }
        String command = "delete from " + CallErrorInfo.measurement + whereClause(alias, begin, end);
        String err = influxDBConnect.deleteMeasurementData(command);
        if (err != null) {
            logger.error("delete failed: {}", err);
            return -1;
        }
        logger.info("delete :{}", err);
        return 0;
    }

    /**
     * Converts an epoch-millisecond timestamp into the UTC time literal used in
     * InfluxQL statements. Sub-second precision is dropped by the pattern.
     *
     * @param timestamp epoch milliseconds; must not be {@code null}
     * @return the timestamp formatted with {@link #DEFAULT_DATETIME_FORMAT} in UTC
     */
    static String timestampToTimez(Long timestamp) {
        return TIMEZ_FORMATTER.format(Instant.ofEpochMilli(timestamp));
    }

    /** Fetches the shared connection, logging an error when none is available. */
    private InfluxDBConnect connection() {
        InfluxDBConnect influxDBConnect = influxUtil.getConnect();
        if (influxDBConnect == null) {
            logger.error("influxDBConnect get null, failed");
        }
        return influxDBConnect;
    }

    /** Builds the shared {@code where} clause: tag match plus optional time bounds. */
    private static String whereClause(String alias, Long begin, Long end) {
        StringBuilder where = new StringBuilder(" where ")
                .append(CallErrorInfo.Tag_InterfaceCode)
                .append("='").append(escapeLiteral(alias)).append("'");
        if (begin != null) {
            where.append(" and time>'").append(timestampToTimez(begin)).append("'");
        }
        if (end != null) {
            where.append(" and time<'").append(timestampToTimez(end)).append("'");
        }
        return where.toString();
    }

    /**
     * Escapes backslashes and single quotes so a caller-supplied value cannot end
     * the surrounding single-quoted InfluxQL string literal.
     */
    private static String escapeLiteral(String value) {
        return String.valueOf(value).replace("\\", "\\\\").replace("'", "\\'");
    }

    /** Flattens every series of every result into call-error records. */
    private static List<CallErrorInfo> toCallErrorInfos(QueryResult results) {
        List<CallErrorInfo> rows = new ArrayList<>();
        if (results == null || results.getResults() == null) {
            return rows;
        }
        for (QueryResult.Result result : results.getResults()) {
            List<QueryResult.Series> seriesList = result.getSeries();
            if (seriesList == null) {
                continue;
            }
            for (QueryResult.Series series : seriesList) {
                rows.addAll(CallErrorInfo.getQueryData(series.getColumns(), series.getValues()));
            }
        }
        return rows;
    }

    /** Reads the value of the {@code count} column from the first row of a series, or 0. */
    private static int readCount(QueryResult.Series series) {
        if (series == null) {
            return 0;
        }
        List<String> columns = series.getColumns();
        List<List<Object>> values = series.getValues();
        if (CollectionUtils.isEmpty(columns) || CollectionUtils.isEmpty(values)) {
            return 0;
        }
        int countIndex = -1;
        for (int i = 0; i < columns.size(); i++) {
            if ("count".equalsIgnoreCase(columns.get(i))) {
                countIndex = i;
                break;
            }
        }
        List<Object> firstRow = values.get(0);
        if (countIndex < 0 || firstRow == null || countIndex >= firstRow.size()) {
            return 0;
        }
        Object cell = firstRow.get(countIndex);
        if (cell == null) {
            return 0;
        }
        if (cell instanceof Number) {
            return ((Number) cell).intValue();
        }
        return (int) Double.parseDouble(cell.toString());
    }

    public static void main(String[] argv) {
        Long timestamp = 1561289474000L;
        String sTime = timestampToTimez(timestamp);
        System.out.println(sTime);
        timestamp = 1561289474L;
        sTime = timestampToTimez(timestamp);
        System.out.println(sTime);
    }
}
相關文章
相關標籤/搜索