侧边栏壁纸
  • 累计撰写 38 篇文章
  • 累计创建 25 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

influxDB 入门

michloas
2019-02-23 / 0 评论 / 0 点赞 / 326 阅读 / 8156 字 / 正在检测是否收录...

quick start

下载并安装

下载并且安装influxDB,需要root权限

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.7.1.x86_64.rpm
yum localinstall influxdb-1.7.1.x86_64.rpm

启动influxDB

systemctl start influxdb

配置文件地址

/etc/influxdb/influxdb.conf

默认数据地址

/var/lib/influxdb/data
/var/lib/influxdb/wal
/var/lib/influxdb/meta

关键描述字段

Conceptually you can think of a measurement as an SQL table, where the primary index is always time. tags and fields are effectively columns in the table. tags are indexed, and fields are not. The difference is that, with InfluxDB, you can have millions of measurements, you don’t have to define schemas up-front, and null values aren’t stored.

database

数据库

measurement

tag

带索引的字段

field

不带索引的字段

创建数据库

如果你已经在本地安装运行了InfluxDB,你就可以直接使用influx命令行,执行influx连接到本地的InfluxDB实例上。输出就像下面这样:

$ influx -precision rfc3339
Connected to http://localhost:8086 version 1.2.x
InfluxDB shell 1.2.x
>

说明:InfluxDB的HTTP接口默认起在8086上,所以inlux默认也是连的本地的8086端口,你可以通过influx --help来看怎么修改默认值。
-precision参数表明了任何返回的时间戳的格式和精度,在上面的例子里,rfc3339是让InfluxDB返回RFC339格式(YYYY-MM-DDTHH:MM:SS.nnnnnnnnnZ)的时间戳。
这样这个命令行已经准备好接收influx的查询语句了(简称InfluxQL),用exit可以退出命令行。

第一次安装好InfluxDB之后是没有数据库的(除了系统自带的_internal),因此创建一个数据库是我们首先要做的事,通过CREATE DATABASE 这样的InfluxQL语句来创建,其中就是数据库的名字。数据库的名字可以是被双引号引起来的任意Unicode字符。 如果名称只包含ASCII字母,数字或下划线,并且不以数字开头,那么也可以不用引起来。
我们来创建一个mydb数据库:

> CREATE DATABASE mydb
>

说明:在输入上面的语句之后,并没有看到任何信息,这在CLI里,表示语句被执行并且没有错误,如果有错误信息展示,那一定是哪里出问题了,这就是所谓的没有消息就是好消息。
现在数据库mydb已经创建好了,我们可以用SHOW DATABASES语句来看看已存在的数据库:

> SHOW DATABASES
name: databases
---------------
name
_internal
mydb

>

说明:_internal数据库是用来存储InfluxDB内部的实时监控数据的。
不像SHOW DATABASES,大部分InfluxQL需要作用在一个特定的数据库上。你当然可以在每一个查询语句上带上你想查的数据库的名字,但是CLI提供了一个更为方便的方式USE ,这会为你后面的所以的请求设置到这个数据库上。例如:

> USE mydb
Using database mydb
>

以下的操作都作用于mydb这个数据库之上。

读写数据

现在我们已经有了一个数据库,那么InfluxDB就可以开始接收读写了。

首先对数据存储的格式来个入门介绍。InfluxDB里存储的数据被称为时间序列数据,其包含一个数值,就像CPU的load值或是温度值类似的。时序数据有零个或多个数据点,每一个都是一个指标值。数据点包括time(一个时间戳),measurement(例如cpu_load),至少一个k-v格式的field(也即指标的数值例如 “value=0.64”或者“temperature=21.2”),零个或多个tag,其一般是对于这个指标值的元数据(例如“host=server01”, “region=EMEA”, “dc=Frankfurt)。

在概念上,你可以将measurement类比于SQL里面的table,其主键索引总是时间戳。tag和field是在table里的其他列,tag是被索引起来的,field没有。不同之处在于,在InfluxDB里,你可以有几百万的measurements,你不用事先定义数据的scheme,并且null值不会被存储

实际应用

基本的写入数据方式

package com.michloas.influx.write;

import com.michloas.influx.util.InfluxDBCURD;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * Created by michloas on 2018/12/13.
 */
@Service
public class InfluxDBBasicWrite {

    @Autowired
    private InfluxDBCURD influxDBCURD;

    /**
     * 基本的写入操作
     */
    public void influxWriteBasic() {
        //连接influx
        InfluxDB influxDB = influxDBCURD.connectInfluxDB();
        String dbName = "aTimeSeries";
        influxDB.createDatabase(dbName);
        influxDB.setDatabase(dbName);
        String rpName = "aRetentionPolicy";
        influxDB.createRetentionPolicy(rpName, dbName, "30d", "30m", 2, true);
        influxDB.setRetentionPolicy(rpName);
        influxDB.enableBatch(BatchOptions.DEFAULTS);
        Point point = Point.measurement("cpu")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .addField("idle", 90L)
                .addField("user", 9L)
                .addField("system", 1L)
                .build();
        influxDB.write(point);
       /* influxDB.write(Point.measurement("disk")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .addField("used", 80L)
                .addField("free", 1L)
                .build());*/
        //关闭influx
        influxDBCURD.closeInfluxDB(influxDB);
    }
}

同步数据写入

package com.michloas.influx.write;

import com.michloas.influx.util.InfluxDBCURD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * Created by michloas on 2018/12/13.
 */
@Service
public class InfluxDBSyncWrite {

    @Autowired
    private InfluxDBCURD influxDBCURD;


    /**
     * 同步写入
     */
    public void influxWriteSync() {
        //连接influx数据库
        InfluxDB influxDB = influxDBCURD.connectInfluxDB();
        String dbName = "aTimeSeries";
        influxDB.createDatabase(dbName);
        String rpName = "aRetentionPolicy";
        influxDB.createRetentionPolicy(rpName, dbName, "30d", "30m", 2, true);
        BatchPoints batchPoints = BatchPoints
                .database(dbName)
                .tag("async", "true")
                .retentionPolicy(rpName)
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
        Point point1 = Point.measurement("cpu")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .addField("idle", 800L)
                .addField("user", 80L)
                .addField("system", 8L)
                .build();
        Point point2 = Point.measurement("disk")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .addField("used", 800L)
                .addField("free", 8L)
                .build();
        batchPoints.point(point1);
        batchPoints.point(point2);
        influxDB.write(batchPoints);
        //关闭influx
        influxDBCURD.closeInfluxDB(influxDB);
    }


}

数据读取

类sql方式读取数据

package com.michloas.influx.read;

import com.michloas.influx.pojo.Cpu;
import com.michloas.influx.pojo.Disk;
import com.michloas.influx.util.InfluxDBCURD;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Created by michloas on 2018/12/13.
 */
@Service
public class InfluxDBRead {

    @Autowired
    private InfluxDBCURD influxDBCURD;

    /**
     * 读取某个数据库的数据
     *
     * @param dbName
     */
    public List<Cpu> readData(String dbName) {
        QueryResult queryResult = influxDBCURD.queryList("query=from(bucket: \"aTimeSeries\")\n" +
                "  |> range(start: dashboardTime)\n" +
                "  |> filter(fn: (r) => r.cid == \"2\")", dbName);
        InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
        List<Cpu> cpuList = resultMapper.toPOJO(queryResult, Cpu.class);
        System.out.println("查询结果为:" + cpuList);
        return cpuList;
    }


    /**
     * @param dbName
     * @return
     */
    public List<Disk> readDiskData(String dbName) {
        QueryResult queryResult = influxDBCURD.queryList("select * from disk", dbName);
        InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
        List<Disk> diskList = resultMapper.toPOJO(queryResult, Disk.class);
        System.out.println("查询结果为:" + diskList);
        return diskList;
    }


}

flux语法方式读取数据

详细的说明会在flux使用时单独写文档

参考地址

http://docs.influxdata.com/flux/v0.7/introduction/installation/

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区