kafka 技术的初步说明

kafka简介

Kafka 概述
Apache Kafka 是一个分布式流处理平台,用于构建实时的数据管道和流式的应用.它可以让你发布和订阅流式的记录,可以储存流式的记录,并且有较好的容错性,可以在流式记录产生时就进行处理。

Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 Kafka 的定义:一个分布式发布-订阅消息传递系统。

Kafka 特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作;

  • 可扩展性:kafka集群支持热扩展;

  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;

  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);

  • 高并发:支持数千个客户端同时读写;

  • 支持实时在线处理和离线处理:可以使用Storm这种实时流处理系统对消息进行实时进行处理,同时还可以使用Hadoop这种批处理系统进行离线处理;

Kafka 使用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;

  • 消息系统:解耦和生产者和消费者、缓存消息等;

  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;

  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;

  • 流式处理:比如spark streaming和storm;

  • 事件源;

kafka快速入门

启动kafka自带的zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 &

集群启动1

bin/kafka-server-start.sh config/server.properties

集群启动2

bin/kafka-server-start.sh config/server-1.properties

集群启动3

bin/kafka-server-start.sh config/server-2.properties

创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic zhangyu

查看集群描述

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhangyu

生产者

bin/kafka-console-producer.sh --broker-list 192.168.204.128:9092 --topic zhangyu

消费者

bin/kafka-console-consumer.sh --bootstrap-server 192.168.204.128:9092 --from-beginning --topic zhangyu

查看防火墙状态

firewall-cmd --state

停止防火墙

systemctl stop firewalld.service

kafka quick start

加入pom依赖

<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
		<version>2.2.2.RELEASE</version>
</dependency>

kafka数据发送(生产者)

@Component
public class KafkaDataSender {

    private final static Logger logger = LoggerFactory.getLogger(KafkaDataSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    public void send() {
        Long beginTime = new Date().getTime();
        logger.info("开始时间:" + beginTime);
        for (int i = 0; i <= 200000; i++) {
            JSONObject message = new JSONObject();
            message.put("cid", i);
            message.put("pn", i);
            message.put("tag", "ac");
            message.put("time", "123");
            message.put("value", 123);
            ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("zhangyu1", message.toString());
            result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {

                }

                @Override
                public void onSuccess(@Nullable SendResult<String, String> stringStringSendResult) {
                    Long currentTime = new Date().getTime();
                    logger.info("数据发送成功" + (currentTime - beginTime) / 1000 + "秒");
                }
            });
        }
        Long endTime = new Date().getTime();
        logger.info("结束时间:" + endTime);
        logger.info("耗时:" + (endTime - beginTime) / 1000 + "秒");
    }
}

kafka数据消费(消费者)

@Component
public class KafkaConsumer {

    private final static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "zhangyu1")
    public void listen(ConsumerRecord<String, String> record) throws Exception {
        logger.info("topic is " + record.topic() + " offset is +" + record.offset() + " value is " + record.value());
    }

}

压力测试

本机开启虚拟机占用内存1g,谷歌占用内存0.5g,编译器占用2g

100w数据占用85%内存

数据量耗时
1w2s
10w17s
20w34s
30w46s
50w97s
100w202s

个人账号