quick start

下载并安装

下载并且安装 influxDB, 需要 root 权限

  • 01
  • 02
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.7.1.x86_64.rpm yum localinstall influxdb-1.7.1.x86_64.rpm

启动 influxDB

  • 01
systemctl start influxdb

配置文件地址

  • 01
/etc/influxdb/influxdb.conf

默认数据地址

  • 01
  • 02
  • 03
/var/lib/influxdb/data /var/lib/influxdb/wal /var/lib/influxdb/meta

关键描述字段

Conceptually you can think of a measurement as an SQL table, where the primary index is always time. tags and fields are effectively columns in the table. tags are indexed, and fields are not. The difference is that, with InfluxDB, you can have millions of measurements, you don’t have to define schemas up-front, and null values aren’t stored.

database

数据库

measurement

tag

带索引的字段

field

不带索引的字段

创建数据库

如果你已经在本地安装运行了 InfluxDB,你就可以直接使用 influx 命令行,执行 influx 连接到本地的 InfluxDB 实例上。输出就像下面这样:

  • 01
  • 02
  • 03
  • 04
$ influx -precision rfc3339 Connected to http://localhost:8086 version 1.2.x InfluxDB shell 1.2.x >

说明:InfluxDB 的 HTTP 接口默认起在 8086 上,所以 inlux 默认也是连的本地的 8086 端口,你可以通过 influx --help 来看怎么修改默认值。
-precision 参数表明了任何返回的时间戳的格式和精度,在上面的例子里,rfc3339 是让 InfluxDB 返回 RFC339 格式 (YYYY-MM-DDTHH:MM:SS.nnnnnnnnnZ) 的时间戳。
这样这个命令行已经准备好接收 influx 的查询语句了 (简称 InfluxQL),用 exit 可以退出命令行。

第一次安装好 InfluxDB 之后是没有数据库的 (除了系统自带的_internal),因此创建一个数据库是我们首先要做的事,通过 CREATE DATABASE 这样的 InfluxQL 语句来创建,其中就是数据库的名字。数据库的名字可以是被双引号引起来的任意 Unicode 字符。 如果名称只包含 ASCII 字母,数字或下划线,并且不以数字开头,那么也可以不用引起来。
我们来创建一个 mydb 数据库:

  • 01
  • 02
> CREATE DATABASE mydb >

说明:在输入上面的语句之后,并没有看到任何信息,这在 CLI 里,表示语句被执行并且没有错误,如果有错误信息展示,那一定是哪里出问题了,这就是所谓的没有消息就是好消息。
现在数据库 mydb 已经创建好了,我们可以用 SHOW DATABASES 语句来看看已存在的数据库:

  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
> SHOW DATABASES name: databases --------------- name _internal mydb >

说明:_internal 数据库是用来存储 InfluxDB 内部的实时监控数据的。
不像 SHOW DATABASES,大部分 InfluxQL 需要作用在一个特定的数据库上。你当然可以在每一个查询语句上带上你想查的数据库的名字,但是 CLI 提供了一个更为方便的方式 USE ,这会为你后面的所以的请求设置到这个数据库上。例如:

  • 01
  • 02
  • 03
> USE mydb Using database mydb >

以下的操作都作用于 mydb 这个数据库之上。

读写数据

现在我们已经有了一个数据库,那么 InfluxDB 就可以开始接收读写了。

首先对数据存储的格式来个入门介绍。InfluxDB 里存储的数据被称为时间序列数据,其包含一个数值,就像 CPU 的 load 值或是温度值类似的。时序数据有零个或多个数据点,每一个都是一个指标值。数据点包括 time (一个时间戳),measurement (例如 cpu_load),至少一个 k-v 格式的 field (也即指标的数值例如 “value=0.64” 或者 “temperature=21.2”),零个或多个 tag,其一般是对于这个指标值的元数据 (例如 “host=server01”, “region=EMEA”, “dc=Frankfurt)。

在概念上,你可以将 measurement 类比于 SQL 里面的 table,其主键索引总是时间戳。tag 和 field 是在 table 里的其他列,tag 是被索引起来的,field 没有。不同之处在于,在 InfluxDB 里,你可以有几百万的 measurements,你不用事先定义数据的 scheme,并且 null 值不会被存储

实际应用

基本的写入数据方式

  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
package com.michloas.influx.write; import com.michloas.influx.util.InfluxDBCURD; import org.influxdb.BatchOptions; import org.influxdb.InfluxDB; import org.influxdb.dto.Point; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; /** * Created by michloas on 2018/12/13. */ @Service public class InfluxDBBasicWrite { @Autowired private InfluxDBCURD influxDBCURD; /** * 基本的写入操作 */ public void influxWriteBasic() { //连接influx InfluxDB influxDB = influxDBCURD.connectInfluxDB(); String dbName = "aTimeSeries"; influxDB.createDatabase(dbName); influxDB.setDatabase(dbName); String rpName = "aRetentionPolicy"; influxDB.createRetentionPolicy(rpName, dbName, "30d", "30m", 2, true); influxDB.setRetentionPolicy(rpName); influxDB.enableBatch(BatchOptions.DEFAULTS); Point point = Point.measurement("cpu") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .addField("idle", 90L) .addField("user", 9L) .addField("system", 1L) .build(); influxDB.write(point); /* influxDB.write(Point.measurement("disk") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .addField("used", 80L) .addField("free", 1L) .build());*/ //关闭influx influxDBCURD.closeInfluxDB(influxDB); } }

同步数据写入

  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
package com.michloas.influx.write; import com.michloas.influx.util.InfluxDBCURD; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; /** * Created by michloas on 2018/12/13. */ @Service public class InfluxDBSyncWrite { @Autowired private InfluxDBCURD influxDBCURD; /** * 同步写入 */ public void influxWriteSync() { //连接influx数据库 InfluxDB influxDB = influxDBCURD.connectInfluxDB(); String dbName = "aTimeSeries"; influxDB.createDatabase(dbName); String rpName = "aRetentionPolicy"; influxDB.createRetentionPolicy(rpName, dbName, "30d", "30m", 2, true); BatchPoints batchPoints = BatchPoints .database(dbName) .tag("async", "true") .retentionPolicy(rpName) .consistency(InfluxDB.ConsistencyLevel.ALL) .build(); Point point1 = Point.measurement("cpu") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .addField("idle", 800L) .addField("user", 80L) .addField("system", 8L) .build(); Point point2 = Point.measurement("disk") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .addField("used", 800L) .addField("free", 8L) .build(); batchPoints.point(point1); batchPoints.point(point2); influxDB.write(batchPoints); //关闭influx influxDBCURD.closeInfluxDB(influxDB); } }

数据读取

类 sql 方式读取数据

  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
package com.michloas.influx.read; import com.michloas.influx.pojo.Cpu; import com.michloas.influx.pojo.Disk; import com.michloas.influx.util.InfluxDBCURD; import org.influxdb.dto.QueryResult; import org.influxdb.impl.InfluxDBResultMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; /** * Created by michloas on 2018/12/13. */ @Service public class InfluxDBRead { @Autowired private InfluxDBCURD influxDBCURD; /** * 读取某个数据库的数据 * * @param dbName */ public List<Cpu> readData(String dbName) { QueryResult queryResult = influxDBCURD.queryList("query=from(bucket: \"aTimeSeries\")\n" + " |> range(start: dashboardTime)\n" + " |> filter(fn: (r) => r.cid == \"2\")", dbName); InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); List<Cpu> cpuList = resultMapper.toPOJO(queryResult, Cpu.class); System.out.println("查询结果为:" + cpuList); return cpuList; } /** * @param dbName * @return */ public List<Disk> readDiskData(String dbName) { QueryResult queryResult = influxDBCURD.queryList("select * from disk", dbName); InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); List<Disk> diskList = resultMapper.toPOJO(queryResult, Disk.class); System.out.println("查询结果为:" + diskList); return diskList; } }

flux 语法方式读取数据

详细的说明会在 flux 使用时单独写文档

参考地址

http://docs.influxdata.com/flux/v0.7/introduction/installation/