有个项目用到了查询设备的实时信息日志,需要对其进行数据分析和提取,用到了influxDB这个时序数据库。
引入依赖
在pom文件中添加下面的依赖即可:
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.15</version>
</dependency>
然后在springboot里面application.yaml文件配置里加入如下配置:
spring.influx.url=Influx地址
spring.influx.user=时序数据库用户名
spring.influx.password=时序数据库密码
在项目里面创建一个配置类InfluxdbConfiguration:
import lombok.AllArgsConstructor;
import org.influxdb.InfluxDB;
import org.influxdb.impl.InfluxDBMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@AllArgsConstructor
@Configuration
public class InfluxdbConfiguration {
private final InfluxDB influxDB;
@Bean
public InfluxDBMapper influxDBMapper() {
return new InfluxDBMapper(influxDB);
}
}
接着创建实体类InfluxdbInfo:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
@Measurement(name="实时数据库表名",database = "时序数据库")
public class ElectromagneticFlowMeterData {
@Column(name = "time")
private String time;
@Column(name = "product_key")
private String productKey;
@ApiModelProperty(value = "设备编号")
@Column(name = "entity_id")
private String entity_id;
}
注意:InfluxDB里,Measurement对应于传统关系型数据库中的table,database是时序数据库。InfluxDB里存储的数据成为时间序列数据,此外注意InfluxDB中的时间戳是以UTC时保存的,在提取过程中需要注意时区的转换。
查询数据(这里不去分耦合了,大概示范一下)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class Query {
private Integer limit;
}
import lombok.AllArgsConstructor;
import org.influxdb.annotation.Measurement;
import org.influxdb.dto.Query;
import org.influxdb.impl.InfluxDBMapper;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Objects;
@AllArgsConstructor
@Repository
public class InfluxdbInfoTest {
private final InfluxDBMapper influxDBMapper;
public List<InfluxdbInfo> selectAll(Query query) {
return influxDBMapper.query(buildQuery(query),InfluxdbInfo.class);
}
public Query buildQuery(Query query){
String measurement = InfluxdbInfo.class.getAnnotation(Measurement.class).name();
String database = InfluxdbInfo.class.getAnnotation(Measurement.class).database();
StringBuilder sql = new StringBuilder("select * from "+measurement);
sql.append(" order by time desc ");
if(Objects.nonNull(query.getLimit())){
sql.append(" limit ").append(query.getLimit());
}
return new Query(sql.toString(),database);
}
}
到这整合查询基本上就完成了。