kafka 技术的初步说明
kafka简介
Kafka 概述
Apache Kafka 是一个分布式流处理平台,用于构建实时的数据管道和流式的应用.它可以让你发布和订阅流式的记录,可以储存流式的记录,并且有较好的容错性,可以在流式记录产生时就进行处理。
Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 Kafka 的定义:一个分布式发布-订阅消息传递系统。
Kafka 特性
-
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作;
-
可扩展性:kafka集群支持热扩展;
-
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
-
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
-
高并发:支持数千个客户端同时读写;
-
支持实时在线处理和离线处理:可以使用Storm这种实时流处理系统对消息进行实时进行处理,同时还可以使用Hadoop这种批处理系统进行离线处理;
Kafka 使用场景
-
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
-
消息系统:解耦和生产者和消费者、缓存消息等;
-
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
-
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
-
流式处理:比如spark streaming和storm;
-
事件源;
kafka快速入门
启动kafka自带的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 &
集群启动1
bin/kafka-server-start.sh config/server.properties
集群启动2
bin/kafka-server-start.sh config/server-1.properties
集群启动3
bin/kafka-server-start.sh config/server-2.properties
创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic zhangyu
查看集群描述
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhangyu
生产者
bin/kafka-console-producer.sh --broker-list 192.168.204.128:9092 --topic zhangyu
消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.204.128:9092 --from-beginning --topic zhangyu
查看防火墙状态
firewall-cmd --state
停止防火墙
systemctl stop firewalld.service
kafka quick start
加入pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
kafka数据发送(生产者)
@Component
public class KafkaDataSender {
private final static Logger logger = LoggerFactory.getLogger(KafkaDataSender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send() {
Long beginTime = new Date().getTime();
logger.info("开始时间:" + beginTime);
for (int i = 0; i <= 200000; i++) {
JSONObject message = new JSONObject();
message.put("cid", i);
message.put("pn", i);
message.put("tag", "ac");
message.put("time", "123");
message.put("value", 123);
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("zhangyu1", message.toString());
result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
}
@Override
public void onSuccess(@Nullable SendResult<String, String> stringStringSendResult) {
Long currentTime = new Date().getTime();
logger.info("数据发送成功" + (currentTime - beginTime) / 1000 + "秒");
}
});
}
Long endTime = new Date().getTime();
logger.info("结束时间:" + endTime);
logger.info("耗时:" + (endTime - beginTime) / 1000 + "秒");
}
}
kafka数据消费(消费者)
@Component
public class KafkaConsumer {
private final static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "zhangyu1")
public void listen(ConsumerRecord<String, String> record) throws Exception {
logger.info("topic is " + record.topic() + " offset is +" + record.offset() + " value is " + record.value());
}
}
压力测试
本机开启虚拟机占用内存1g,谷歌占用内存0.5g,编译器占用2g
100w数据占用85%内存
数据量 | 耗时 |
---|---|
1w | 2s |
10w | 17s |
20w | 34s |
30w | 46s |
50w | 97s |
100w | 202s |