SpringBoot 2.x 开发案例之整合时序数据库 Influxdb


简介

InfluxDB是一个时间序列数据库,旨在处理较高的写入和查询负载。它是TICK堆栈的组成部分 。InfluxDB旨在用作涉及大量时间戳数据的任何用例的后备存储,包括DevOps监控,应用程序指标,IoT传感器数据和实时分析。。

特点

  • GO语言编写,无其他依赖项
  • 专为时间序列数据编写的定制高性能数据存储。TSM引擎可实现高摄取速度和数据压缩
  • 无结构,列式存储,可动态扩展列
  • 支持一系列的聚合函数
  • 支持HTTP API访问

注意 InfluxDB 2.0 版本新增了org 和 bucket的概念,认证使用 token 而不是用户名密码方式,语法采用 Flux 而不是之前的类 SQL 方式。

安装

## 下载
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.7-linux-amd64.tar.gz
## 解压
tar xvzf influxdb2-2.0.7-linux-amd64.tar.gz
## 复制
cp influxdb2-2.0.7-linux-amd64/{influx,influxd} /usr/local/bin/

安装成功后访问地址:http://ip:8086

科普

  • metric: 度量,相当于关系型数据库中的table。
  • data point: 数据点,相当于关系型数据库中的row。
  • timestamp:时间戳,代表数据点产生的时间。
  • field: 度量下的不同字段。比如位置这个度量具有经度和纬度两个field。一般情况下存放的是会随着时间戳的变化而变化的数据。
  • tag: 标签,或者附加信息。一般存放的是并不随着时间戳变化的属性信息。timestamp加上所有的tags可以认为是table的primary key。

整合

配置文件 pom.xml 引入:

<dependency>
     <groupId>com.influxdb</groupId>
     <artifactId>influxdb-client-java</artifactId>
     <version>3.0.1</version>
</dependency>

数据库配置参数,自行更换 token 和 url:

spring:
  influx:
    bucket: DNC
    org: itstyle
    token: MemXOQoMFfRTnBSGPldTyO8V5w3L7BA==
    url: http://127.0.0.1:8086

初始化配置:

/**
 * 时序数据库配置
 */
@Configuration
public class InfluxdbConfig {

    @Value("${spring.influx.url:''}")
    private String influxDBUrl;
    @Value("${spring.influx.token:''}")
    private String token;

    @Bean
    public InfluxDBClient influxDBClient() {
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxDBUrl, token.toCharArray());
        influxDBClient.setLogLevel(LogLevel.BASIC);
        return influxDBClient;
    }
}

插入查询案例:

@Repository
public class TimeSeriesRepository {

    @Autowired
    InfluxDBClient influxDBClient;

    @Value("${spring.influx.org:''}")
    private String org;

    @Value("${spring.influx.bucket:''}")
    private String bucket;

    /**
     * 保存
     * @param measurement 表名
     * @param fields
     */
    public void save(String measurement, Map<String,Object> fields){
        WriteOptions writeOptions = WriteOptions.builder()
                .batchSize(5000)
                .flushInterval(1000)
                .bufferLimit(10000)
                .jitterInterval(1000)
                .retryInterval(5000)
                .build();
        try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
            String deviceId = fields.get("deviceId").toString();
            fields.remove("deviceId");
            Point point = Point
                    .measurement(measurement)
                    .addTag("deviceId",deviceId)
                    .addFields(fields)
                    .time(Instant.now(), WritePrecision.NS);
            writeApi.writePoint(bucket, org, point);
        }
    }

    /**
     * 查询语法说明

     * https://blog.52itstyle.vip

     * 1、bucket 桶
     * 2、range 指定起始时间段
     *    range有两个参数start,stop,stop不设置默认为当前。
     *    range可以是相对的(使用负持续时间)或绝对(使用时间段)
     * 3、filter 过滤条件查询 _measurement 表  _field 字段
     * 4、yield()函数作为查询结果输出过滤的tables。
     * 更多参考:https://docs.influxdata.com/influxdb/v2.0/query-data/flux/
     * @return
     */
    public List<FluxTable> findAll(){
        String sql = "from(bucket: \"%s\") |> range(start: -1m)";
        sql +="  |> filter(fn: (r) => r._measurement == \"%s\" and";
        sql +="  r._field == \"%s\")";
        sql +="  |> yield()";
        String flux = String.format(sql, bucket,"dnc_humidity","humidity");
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux,org);
        return tables;
    }
}
爪哇笔记

作者: 小柒

出处: https://blog.52itstyle.vip

分享是快乐的,也见证了个人成长历程,文章大多都是工作经验总结以及平时学习积累,基于自身认知不足之处在所难免,也请大家指正,共同进步。

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出, 如有问题, 可邮件(345849402@qq.com)咨询。