赞
踩
使用 Java 语言向 InfluxDB 插入数据,主要参考官网示例:
https://github.com/influxdata/influxdb-client-java/tree/master/client#management-api
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.22</version>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.10.0</version>
</dependency>
/**
 * Connection and write-batching settings for InfluxDB.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
public class InfluxDbConfig {
// Base URL of the InfluxDB server; the HTTP helpers in this file append "/api/v2/buckets" to it.
private String dbUrl;
// API token; sent as "Token <value>" in the Authorization header and handed to the client factory.
private char[] token;
// Bucket passed to InfluxDBClientFactory.create(...) when the client is built.
private String bucket;
// Organization passed to InfluxDBClientFactory.create(...) when the client is built.
private String org;
// Value handed to WriteOptions.builder().batchSize(...) when the WriteApi is created.
private int dataBatchSize;
// Value handed to WriteOptions.builder().bufferLimit(...) when the WriteApi is created.
private int bufferLimit;
}
package com.dameng.sql_realtime_parse_tool.domain.echarts.sql; import com.influxdb.annotations.Column; import com.influxdb.annotations.Measurement; import lombok.Data; import java.time.Instant; @Data @Measurement(name = "SqlEntity") public class SqlEntity { @Column(tag = true) private String execSqlType; @Column (tag = true) private String execSqlExecTimeRange; @Column(timestamp = true) private Instant sqlRecordTime; @Column private String sessStr; @Column private String thrdStr; @Column private String userStr; @Column private String trxIdStr; @Column private String stmtStr; @Column private String ipStr; //@Column //(tag = true) //判断是参数类型 还是SQL语句 //private String entityType; @Column //这个ID是编码后的唯一ID private String execSqlId; @Column private String appnameStr; @Column private String execSqlStr; @Column private String paramsId; @Column private String paramsStr; @Column //判断参数状态 private Boolean paramsStatus; @Column//sql影响行数 private int sqlRowCount; @Column private int execSqlExecTime; }
@Service public class InfluxdbApi implements IInfluxApi { @Autowired private InfluxDbConfig influxDbConfig; @Override public List<BucketInfo> getBucketList() { List<BucketInfo> bucketList = new ArrayList<>(); try { String bucketUrl= influxDbConfig.getDbUrl()+"/api/v2/buckets"; String result2 = HttpRequest.get(bucketUrl) .header("Authorization", "Token "+ new String((influxDbConfig.getToken()))) .header("Accept","application/json") .header("Content-Type","application/json") .execute().body(); JSONObject jsonObject = JSONUtil.parseObj(result2); JSONArray buckets = jsonObject.getJSONArray("buckets"); for (int i = 0; i < buckets.size(); i++) { BucketInfo bucketInfo = new BucketInfo(); JSONObject entries = new JSONObject(buckets.get(i)); bucketInfo.setName((String)entries.get("name")); bucketInfo.setOrgID((String)entries.get("orgID")); bucketInfo.setId((String)entries.get("id")); bucketList.add(bucketInfo); } }catch (Exception e){ StaticLog.error(e,"get bucket list error "); } return bucketList; } @Override public BucketInfo getBucketInfoByName(String bucketName) { try { String bucketUrl= influxDbConfig.getDbUrl()+"/api/v2/buckets?name="+bucketName; String result2 = HttpRequest.get(bucketUrl) .header("Authorization", "Token "+ new String((influxDbConfig.getToken()))) .header("Accept","application/json") .header("Content-Type","application/json") .execute().body(); JSONObject jsonObject = JSONUtil.parseObj(result2); JSONArray buckets = jsonObject.getJSONArray("buckets"); for (int i = 0; i < buckets.size(); i++) { BucketInfo bucketInfo = new BucketInfo(); JSONObject entries = new JSONObject(buckets.get(i)); bucketInfo.setName((String)entries.get("name")); bucketInfo.setOrgID((String)entries.get("orgID")); bucketInfo.setId((String)entries.get("id")); return bucketInfo; } }catch (Exception e){ StaticLog.error(e,"get bucket list error "); } return null; } @Override public void createBucketInfoByName(String bucketName) { try { //获取orgid BucketInfo bucketInfoByName = 
getBucketInfoByName("_monitoring"); RetentionRules expire = RetentionRules.builder().type("expire").everySeconds(RetentionTime_30_DAY).shardGroupDuration(0).build(); List<RetentionRules> retentionRules = new ArrayList<>(); retentionRules.add(expire); BucketInfo bucketInfo = BucketInfo.builder().name(bucketName).description("api create bucket").orgID(bucketInfoByName.getOrgID()).retentionRules(retentionRules).build(); String bucketUrl= influxDbConfig.getDbUrl()+"/api/v2/buckets"; String result2 = HttpRequest.post(bucketUrl) .header("Authorization", "Token "+ new String((influxDbConfig.getToken()))) .header("Content-Type","application/json") .body(JSONUtil.toJsonStr(bucketInfo)) .timeout(20000)//超时,毫秒 .execute().body(); JSONObject jsonObject = JSONUtil.parseObj(result2); StaticLog.info("create bucket {} success",bucketName); }catch (Exception e){ StaticLog.error(e,"get bucket list error "); } } }
package com.dameng.sql_realtime_parse_tool.core.engine.influxdb; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.log.StaticLog; import com.dameng.sql_realtime_parse_tool.constant.CommonConstant; import com.dameng.sql_realtime_parse_tool.constant.sql.SqlConstant; import com.dameng.sql_realtime_parse_tool.domain.DateRange; import com.dameng.sql_realtime_parse_tool.domain.TableDataBean; import com.dameng.sql_realtime_parse_tool.domain.TableParamBean; import com.dameng.sql_realtime_parse_tool.domain.dbms.SqlStatsBean; import com.dameng.sql_realtime_parse_tool.domain.influxdb.InfluxDbConfig; import com.dameng.sql_realtime_parse_tool.tool.core.DateTimeUtils; import com.influxdb.client.*; import com.influxdb.client.domain.InfluxQLQuery; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.events.WriteErrorEvent; import com.influxdb.client.write.events.WriteSuccessEvent; import com.influxdb.query.InfluxQLQueryResult; import java.math.BigDecimal; import java.time.Instant; import java.util.ArrayList; import java.util.List; /** * InfluxDB数据库连接操作类 * <p> * https://github.com/influxdata/influxdb-client-java/tree/master/client#asynchronous-non-blocking-api * */ public class InfluxDBConnection { private static String dbUrl; private static char[] dbToken; private static String dbOrg; private static String dbBucket; private static int dataBatchSize; private static int bufferLimit; private static InfluxDBClient influxDBClient; private static WriteApi writeApi; public InfluxDBConnection(String dbUrl, char[] dbToken, String dbOrg, String dbBucket) { this.dbUrl = dbUrl; this.dbToken = dbToken; this.dbOrg = dbOrg; this.dbBucket = dbBucket; } /** * 测试连接是否正常 * * @return true 正常 */ public static boolean ping() { try { return influxDBClient.ping(); } catch (Exception e) { StaticLog.error("influxdb connect error", e.getMessage()); return false; } } public static boolean pingAndCloseClient(InfluxDbConfig influxDbConfig) 
{ InfluxDBClient influxDbClient = null; try { influxDbClient = getInfluxDbClient(influxDbConfig); return influxDbClient.ping(); } catch (Exception e) { StaticLog.error("influxdb connect error", e.getMessage()); return false; } finally { if (influxDbClient != null) { influxDbClient.close(); } } } public static InfluxDBClient getInfluxDbClient(InfluxDbConfig influxDbConfig) { return InfluxDBClientFactory.create(influxDbConfig.getDbUrl(), influxDbConfig.getToken(), influxDbConfig.getOrg(), influxDbConfig.getBucket()); } public static boolean initInfluxDbClient(InfluxDbConfig influxDbConfig) { try { //判断链接是否存活,没有问题的话,直接返回 if (influxDBClient != null && ping()) { return true; } // 此方法仅是创建连接,并不会对连接进行探活 influxDBClient = getInfluxDbClient(influxDbConfig); // 对连接进行探活 if (!ping()) { return false; } dbUrl = influxDbConfig.getDbUrl(); dbToken = influxDbConfig.getToken(); dbOrg = influxDbConfig.getOrg(); dbBucket = influxDbConfig.getBucket(); dataBatchSize = influxDbConfig.getDataBatchSize(); bufferLimit = influxDbConfig.getBufferLimit(); } catch (Exception e) { StaticLog.error(e, "create influxDb conn fail,"); return false; } return true; } /* 获取writeApi对象*/ public static WriteApi getWriteApi() { if (writeApi == null) { //设置属性 WriteOptions build = WriteOptions.builder().batchSize(dataBatchSize).bufferLimit(bufferLimit).build(); writeApi = influxDBClient.makeWriteApi(build); //写入成功后的回调 writeApi.listenEvents(WriteSuccessEvent.class, event -> { // String data = event.getLineProtocol(); StaticLog.info("batch size write influxdb success"); }); //写入失败后的回调 writeApi.listenEvents(WriteErrorEvent.class, event -> { Throwable exception = event.getThrowable(); StaticLog.error(exception, "batch size write influxdb error"); }); return writeApi; } else { return writeApi; } } /** * 异步写入 */ public static void writeDataByAsync(Object SqlEntity) { WriteApi writeApi = getWriteApi(); try { writeApi.writeMeasurement(WritePrecision.NS, SqlEntity); } catch (Exception e) { StaticLog.error(e, "write 
measurement error"); } } /** * 通知刷盘所有数据 */ public static void flush() { //刷新数据,避免关闭连接时还未刷盘的情况 //writeApi flushInterval属性 默认值是1S try { if (writeApi != null) { writeApi.flush(); Thread.sleep(CommonConstant.SLEEP_THREAD_TIME); } } catch (Exception e) { StaticLog.error(e, "influxdb client flush data error"); } } /** * 关闭Client */ public static void closeClient() { try { if (writeApi != null) { flush(); writeApi.close(); writeApi = null; } if (influxDBClient != null) { influxDBClient.close(); influxDBClient = null; } } catch (Exception e) { StaticLog.error(e, "clost influxdb client error"); } } /** * 查询 */ public static List<TableDataBean> queryDataBySql(InfluxDBClient influxDbClient, String sqlStr, String databaseStr) { InfluxQLQueryApi queryApi = influxDbClient.getInfluxQLQueryApi(); // send request InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(sqlStr, databaseStr).setPrecision(InfluxQLQuery.InfluxQLPrecision.MILLISECONDS), (columnName, rawValue, resultIndex, seriesName) -> { // convert columns switch (columnName) { case "time": return Instant.ofEpochMilli(Long.parseLong(rawValue)); case "first": return new BigDecimal(rawValue); case "execSqlStr": return rawValue; case "paramsStr": return new String(rawValue); case "sessStr": return new String(rawValue); case "thrdStr": return new String(rawValue); case "trxIdStr": return new String(rawValue); case "userStr": return new String(rawValue); case "stmtStr": return new String(rawValue); case "ipStr": return new String(rawValue); case "setParamsStr": return new Boolean(rawValue); case "execSqlExecTime": return new String(rawValue); case "appnameStr": return new String(rawValue); case "paramsStatus": return rawValue; case "execSqlId": return rawValue; case "paramsId": return rawValue; case "sqlRowCount": return rawValue; case "DATA_COUNT": return new String(rawValue); default: StaticLog.error("SQL查询未找到字段 {}", rawValue); return null; //throw new IllegalArgumentException("unexpected column " + columnName); } 
}); List<TableDataBean> tableDataBeanList = new ArrayList<>(); if(result==null||result.getResults().isEmpty() || result.getResults().get(0).getSeries().isEmpty()){ return tableDataBeanList; } for (InfluxQLQueryResult.Result resultResult : result.getResults()) { for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { for (InfluxQLQueryResult.Series.Record record : series.getValues()) { TableDataBean tableDataBean = new TableDataBean(); tableDataBean.setExecSqlId((String) record.getValueByKey("execSqlId")); tableDataBean.setParamsId((String) record.getValueByKey("paramsId")); tableDataBean.setSessStr((String) record.getValueByKey("sessStr")); tableDataBean.setThrdStr((String) record.getValueByKey("thrdStr")); tableDataBean.setUserStr((String) record.getValueByKey("userStr")); tableDataBean.setTrxIdStr((String) record.getValueByKey("trxIdStr")); tableDataBean.setExecSqlStr((String) record.getValueByKey("execSqlStr")); tableDataBean.setStmtStr((String) record.getValueByKey("stmtStr")); tableDataBean.setIpStr((String) record.getValueByKey("ipStr")); tableDataBean.setParamsStatus((String) record.getValueByKey("paramsStatus")); tableDataBean.setParamsStr((String) record.getValueByKey("paramsStr")); tableDataBean.setSqlRecordTime(DateTimeUtils.convertToEasternEightZoneString((Instant)record.getValueByKey("time"))); tableDataBean.setExecSqlExecTime(Integer.parseInt((String) record.getValueByKey("execSqlExecTime"))); if(StrUtil.isNotBlank((CharSequence) record.getValueByKey("sqlRowCount"))){ tableDataBean.setSqlRowCount(Integer.parseInt((String) record.getValueByKey("sqlRowCount"))); }; tableDataBeanList.add(tableDataBean); } } } return tableDataBeanList; } public static SqlStatsBean querySqlDetailedList(InfluxDBClient influxDbClient, String sqlStr, TableParamBean tableParamBean) { InfluxQLQueryApi queryApi = influxDbClient.getInfluxQLQueryApi(); SqlStatsBean sqlStatsBean = new SqlStatsBean(); // send request InfluxQLQueryResult result = queryApi.query(new 
InfluxQLQuery(sqlStr, tableParamBean.getExecSqlDbName()).setPrecision(InfluxQLQuery.InfluxQLPrecision.MILLISECONDS), (columnName, rawValue, resultIndex, seriesName) -> { // convert columns switch (columnName) { case "time": return Instant.ofEpochMilli(Long.parseLong(rawValue)); case "SQL_CNT": return new String(rawValue); case "MAX_EXECTIME": return new String(rawValue); case "MIN_EXECTIME": return new String(rawValue); case "MEAN_EXECTIME": return new String(rawValue); default: StaticLog.error("SQL查询未找到字段 {}", rawValue); return null; } }); List<SqlStatsBean> sqlStatsBeanList = new ArrayList<>(); if(result==null||result.getResults().isEmpty() || result.getResults().get(0).getSeries().isEmpty()){ return null; } for (InfluxQLQueryResult.Result resultResult : result.getResults()) { for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { for (InfluxQLQueryResult.Series.Record record : series.getValues()) { sqlStatsBean.setSqlRecordTimeStr(DateTimeUtils.convertToEasternEightZoneString((Instant)record.getValueByKey("time"))); sqlStatsBean.setExecSqlId(tableParamBean.getExecSqlId()); sqlStatsBean.setParamsId(tableParamBean.getParamsId()); sqlStatsBean.setCnt(parseStringToInt((String)record.getValueByKey("SQL_CNT"))); sqlStatsBean.setMaxExectime(parseStringToInt((String)record.getValueByKey("MAX_EXECTIME"))); sqlStatsBean.setMinExectime(parseStringToInt((String)record.getValueByKey("MIN_EXECTIME"))); sqlStatsBean.setAvgExectime(parseStringToInt((String)record.getValueByKey("MEAN_EXECTIME"))); sqlStatsBeanList.add(sqlStatsBean); } } } return sqlStatsBean; } public static int parseStringToInt(String str) { return (int)Float.parseFloat(str); /* int intValue = 0; if (str != null && !str.isEmpty()) { int dotIndex = str.indexOf("."); if (dotIndex != -1) { String intPart = str.substring(0, dotIndex); try { intValue = Integer.parseInt(intPart); } catch (NumberFormatException e) { StaticLog.error(e,"influxdb value convert fail, value is {}", str); return 0; } } } 
return intValue;*/ } public static int queryCountDataBySql(InfluxDBClient influxDbClient, String sqlStr, String databaseStr) { InfluxQLQueryApi queryApi = influxDbClient.getInfluxQLQueryApi(); // send request InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(sqlStr, databaseStr).setPrecision(InfluxQLQuery.InfluxQLPrecision.MILLISECONDS), (columnName, rawValue, resultIndex, seriesName) -> { // convert columns switch (columnName) { case "time": return Instant.ofEpochMilli(Long.parseLong(rawValue)); case "DATA_COUNT": return rawValue; default: throw new IllegalArgumentException("unexpected column " + columnName); } }); List<TableDataBean> tableDataBeanList = new ArrayList<>(); if(result==null||result.getResults().isEmpty() || result.getResults().get(0).getSeries().isEmpty()){ return 0; } for (InfluxQLQueryResult.Result resultResult : result.getResults()) { for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { for (InfluxQLQueryResult.Series.Record record : series.getValues()) { return Integer.parseInt((String) record.getValueByKey("DATA_COUNT")) ; } } } return 0; } public static List<String> queryDbNameDataBySql(InfluxDBClient influxDbClient, String sqlStr, String databaseStr) { InfluxQLQueryApi queryApi = influxDbClient.getInfluxQLQueryApi(); // send request InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(sqlStr, databaseStr).setPrecision(InfluxQLQuery.InfluxQLPrecision.MILLISECONDS), (columnName, rawValue, resultIndex, seriesName) -> { // convert columns switch (columnName) { case "time": return Instant.ofEpochMilli(Long.parseLong(rawValue)); default: return rawValue; } }); List<String> tableNameList = new ArrayList<>(); for (InfluxQLQueryResult.Result resultResult : result.getResults()) { for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { for (InfluxQLQueryResult.Series.Record record : series.getValues()) { if (record.getValues()[0].toString().startsWith(SqlConstant.INFLUXDB_BUCKET_PREFIX)) { 
tableNameList.add(record.getValues()[0].toString()); } } } } return tableNameList; } }
/**
 * Demo: fetches all buckets through the HTTP API and prints their names.
 */
void getBucketList() {
    String authorization = "Token " + new String((influxDbConfig.getToken()));
    String responseBody = HttpRequest.get(influxDbConfig.getDbUrl() + "/api/v2/buckets")
            .header("Authorization", authorization)
            .header("Accept", "application/json")
            .header("Content-Type", "application/json")
            .execute()
            .body();

    JSONArray bucketArray = JSONUtil.parseObj(responseBody).getJSONArray("buckets");

    // Collect just the "name" member of every bucket entry.
    List<String> bucketNames = new ArrayList<>();
    for (Object rawBucket : bucketArray) {
        JSONObject bucket = new JSONObject(rawBucket);
        bucketNames.add((String) bucket.get("name"));
    }
    System.out.println(bucketNames);
}
void createBucket() { //获取orgid BucketInfo bucketInfoByName = influxdbApi.getBucketInfoByName(influxDbConfig.getBucket()); RetentionRules expire = RetentionRules.builder().type("expire").everySeconds(RetentionTime_30_DAY).shardGroupDuration(0).build(); List<RetentionRules> retentionRules = new ArrayList<>(); retentionRules.add(expire); BucketInfo bucketInfo = BucketInfo.builder().name("sqllog_20240118").description("testsss").orgID(bucketInfoByName.getOrgID()).retentionRules(retentionRules).build(); List<String> stringList = new ArrayList<>(); String bucketUrl= influxDbConfig.getDbUrl()+"/api/v2/buckets"; String result2 = HttpRequest.post(bucketUrl) .header("Authorization", "Token "+ new String((influxDbConfig.getToken()))) // .header("Accept","application/json") .header("Content-Type","application/json") .body(JSONUtil.toJsonStr(bucketInfo)) // .form() .timeout(20000)//超时,毫秒 .execute().body(); JSONObject jsonObject = JSONUtil.parseObj(result2); JSONArray buckets = jsonObject.getJSONArray("buckets"); System.out.println(buckets); }
拼接 SQL 查询语句,注意语句末尾的时区子句(tz)以及分页逻辑(LIMIT / OFFSET):
// Usage: build the data query, then run it against the bucket named in the request.
String fluxSql = handleFluxDbSql(tableParamBean, SQLTYPE_DATASQL);
List<TableDataBean> tableDataBeanList = InfluxDBConnection.queryDataBySql(influxDbClient, fluxSql, tableParamBean.getExecSqlDbName());

/**
 * Builds the InfluxQL statement for the {@code SqlEntity} measurement.
 *
 * <p>Shape of the result, for example:
 * {@code select ... from SqlEntity where time >= '2023-10-18 12:11:30' and time <= '2023-10-18 13:11:30'
 * <extra conditions> LIMIT rows OFFSET (page - 1) * rows tz('Asia/Shanghai')}.
 * LIMIT/OFFSET are only added for the data query; the time-zone clause is always last.
 *
 * @param tableParamBean search window, extra filter inputs, page and page size
 * @param sqlType        {@code SQLTYPE_DATASQL} for the row query or
 *                       {@code SQLTYPE_COUNTSQL} for the count query
 * @return the complete statement
 * @throws IllegalArgumentException for any other {@code sqlType}; previously this
 *                                  silently produced a statement without a SELECT clause
 */
private String handleFluxDbSql(TableParamBean tableParamBean, int sqlType) {
    StringBuilder stringBuilder = new StringBuilder();
    if (SQLTYPE_DATASQL == sqlType) {
        stringBuilder.append("select execSqlStr,execSqlExecTime,sessStr,thrdStr,userStr,trxIdStr,stmtStr,ipStr,paramsStatus,time,execSqlId,paramsId,paramsStr,sqlRowCount from SqlEntity where");
    } else if (SQLTYPE_COUNTSQL == sqlType) {
        stringBuilder.append("select count(execSqlId) AS DATA_COUNT from SqlEntity where");
    } else {
        throw new IllegalArgumentException("unsupported sqlType " + sqlType);
    }

    // Time window of the search.
    String beginSearchTime = DateUtil.formatDateTime(tableParamBean.getBeginSearchTime());
    String endSearchTime = DateUtil.formatDateTime(tableParamBean.getEndSearchTime());
    stringBuilder.append(" time >= '").append(beginSearchTime).append("'");
    stringBuilder.append(" and time <= '").append(endSearchTime).append("'");

    // Remaining filter conditions, appended as produced by the shared helper.
    String whereStr = HandleData.handleCommonParams(tableParamBean);
    if (StrUtil.isNotEmpty(whereStr)) {
        stringBuilder.append(whereStr);
    }

    // Paging: with page number "page" and page size "rows" the clause is
    // LIMIT rows OFFSET (page - 1) * rows.
    if (SQLTYPE_DATASQL == sqlType) {
        stringBuilder.append(" LIMIT ").append(tableParamBean.getLimit());
        stringBuilder.append(" OFFSET ").append((tableParamBean.getPage() - 1) * tableParamBean.getLimit());
    }

    // Time-zone clause goes last.
    stringBuilder.append(" tz('Asia/Shanghai') ");

    String sql = stringBuilder.toString();
    StaticLog.debug("bucket is {},search sql is {}", tableParamBean.getExecSqlDbName(), sql);
    return sql;
}
# SqlEntity为自定义封装实体类
// Build the SqlEntity measurement object from the parsed statement group.
// NOTE(review): doTryParse is not shown here — confirm whether it can return null.
SqlEntity sqlEntity = ParseLineBeanToSQLEntity.doTryParse(stmtGroup, parseLogConfig, filterCond);
// Hand the entity to the shared batching WriteApi (written with nanosecond precision).
InfluxDBConnection.writeDataByAsync(sqlEntity);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。