简介
InfluxDB是一个时间序列数据库,旨在处理较高的写入和查询负载。它是TICK堆栈的组成部分 。InfluxDB旨在用作涉及大量时间戳数据的任何用例的后备存储,包括DevOps监控,应用程序指标,IoT传感器数据和实时分析。。
特点
- GO语言编写,无其他依赖项
- 专为时间序列数据编写的定制高性能数据存储。TSM引擎可实现高摄取速度和数据压缩
- 无结构,列式存储,可动态扩展列
- 支持一系列的聚合函数
- 支持HTTP API访问
注意 InfluxDB 2.0 版本新增了org 和 bucket的概念,认证使用 token 而不是用户名密码方式,语法采用 Flux 而不是之前的类 SQL 方式。
安装
## 下载
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.7-linux-amd64.tar.gz
## 解压
tar xvzf influxdb2-2.0.7-linux-amd64.tar.gz
## 复制
cp influxdb2-2.0.7-linux-amd64/{influx,influxd} /usr/local/bin/
安装成功后访问地址:http://ip:8086
科普
- metric: 度量,相当于关系型数据库中的table。
- data point: 数据点,相当于关系型数据库中的row。
- timestamp:时间戳,代表数据点产生的时间。
- field: 度量下的不同字段。比如位置这个度量具有经度和纬度两个field。一般情况下存放的是会随着时间戳的变化而变化的数据。
- tag: 标签,或者附加信息。一般存放的是并不随着时间戳变化的属性信息。timestamp加上所有的tags可以认为是table的primary key。
整合
配置文件 pom.xml 引入:
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>3.0.1</version>
</dependency>
数据库配置参数,自行更换 token 和 url:
spring:
influx:
bucket: DNC
org: itstyle
token: MemXOQoMFfRTnBSGPldTyO8V5w3L7BA==
url: http://127.0.0.1:8086
初始化配置:
/**
* 时序数据库配置
*/
@Configuration
public class InfluxdbConfig {
@Value("${spring.influx.url:''}")
private String influxDBUrl;
@Value("${spring.influx.token:''}")
private String token;
@Bean
public InfluxDBClient influxDBClient() {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxDBUrl, token.toCharArray());
influxDBClient.setLogLevel(LogLevel.BASIC);
return influxDBClient;
}
}
插入查询案例:
@Repository
public class TimeSeriesRepository {
@Autowired
InfluxDBClient influxDBClient;
@Value("${spring.influx.org:''}")
private String org;
@Value("${spring.influx.bucket:''}")
private String bucket;
/**
* 保存
* @param measurement 表名
* @param fields
*/
public void save(String measurement, Map<String,Object> fields){
WriteOptions writeOptions = WriteOptions.builder()
.batchSize(5000)
.flushInterval(1000)
.bufferLimit(10000)
.jitterInterval(1000)
.retryInterval(5000)
.build();
try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
String deviceId = fields.get("deviceId").toString();
fields.remove("deviceId");
Point point = Point
.measurement(measurement)
.addTag("deviceId",deviceId)
.addFields(fields)
.time(Instant.now(), WritePrecision.NS);
writeApi.writePoint(bucket, org, point);
}
}
/**
* 查询语法说明
* https://blog.52itstyle.vip
* 1、bucket 桶
* 2、range 指定起始时间段
* range有两个参数start,stop,stop不设置默认为当前。
* range可以是相对的(使用负持续时间)或绝对(使用时间段)
* 3、filter 过滤条件查询 _measurement 表 _field 字段
* 4、yield()函数作为查询结果输出过滤的tables。
* 更多参考:https://docs.influxdata.com/influxdb/v2.0/query-data/flux/
* @return
*/
public List<FluxTable> findAll(){
String sql = "from(bucket: \"%s\") |> range(start: -1m)";
sql +=" |> filter(fn: (r) => r._measurement == \"%s\" and";
sql +=" r._field == \"%s\")";
sql +=" |> yield()";
String flux = String.format(sql, bucket,"dnc_humidity","humidity");
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux,org);
return tables;
}
}