kafka基础应用学习笔记

kafka基础应用学习笔记

kafka:一种非常流行的消息中间件
人话:可以用这个框架解决消息在服务间传递的格式化及速度等问题,那么为什么要这么做呢?
答:因为快,而且方便。

解释kafka中的专业术语(个人理解)

1
2
3
4
5
6
7
8
9
Producer:消息的生产者,消息从生产者传递至kafka处理 (做饭的)

Consumer:消息的消费者,把消息从kafka中拿出(吃饭的)

Consumer Group:消费者群组,就是把消费者们进行划分合并(吃饭排一个队的)

Topic:标签,个人将其理解为一个标识,具有相同标识的生产者和消费者之间进行联系,读取特定的数据(打饭的窗口)

Broker: kafka集群中包含的服务器节点(食堂)

搭建kafka

可选择官网tgz包下载或直接docker搭建

https://archive.apache.org/dist/kafka/

1
2
3
4
5
6
7
8
kafka启动需要zookeeper组件
tgz包版:
$root>cd kafka_X.XX-X.X.X
$root>bin/zookeeper-server-start.sh config/zookeeper.properties

然后启动kafkaserver
$root>bin/kafka-server-start.sh config/server.properties

docker版:

1
2
3
4
5
6
7
8
9
$root>docker pull zookeeper 
$root>docker run -it -p2888:2888 -p2181:2181 --name=zookeeper zookeeper

#到这步就是启动了zookeeper
#下面就是使用kafka链接zookeeper,可使用宿主机(前提是有java环境「使用java --version查看」)
$root> curl -O -L https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
$root>tar -zxvf kafka_2.12-2.6.0.tgz
$root>cd kafka_2.12-2.6.0/
$root>bin/kafka-server-start.sh config/server.properties

利用了别人的镜像,避免重复造轮子(当然我喜欢自己造轮子所以我用的tgz )

一些基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 创建topic
>bin/kafka-topics.sh --create --zookeeper 【zookeeper server:port】 --replication-factor 1 --partitions 1 --topic 【topic name】

# 查看本机的topic
>bin/kafka-topics.sh --zookeeper 【zookeeper server:port】 --list

# 使用生产者身份连接kafka,这里会打开一个终端,向kafka中发送消息
>bin/kafka-console-producer.sh --broker-list 【zookeeper server:port】 --topic 【topic name】
>hello world

# 使用消费者身份连接kafka,这里同样会打开一个终端,获取kafka中的信息
>bin/kafka-console-consumer.sh --bootstrap-server 【zookeeper server:port】 --topic 【topic name】 --from-beginning
hello world

# 删除topic(先把kafka和zookeeper停了再操作)
>bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】
#然后删除topic的存储目录(server.properties文件log.dirs配置)“默认为/tmp/kafka-logs的topic相关目录”即可,查询显示还有的话,就重启一下将信息更新即可


注意事项:

1
2
3
4
5
6
7
# PS:说一下几个可能碰到的问题,
# 1.“唉,为啥我在虚拟机里面(docker里面)启动的kafka为啥局域网内拿python连不上啊?”
答:“因为这玩意默认是本地localhost运行的,你要在config/server.properties内修改配置(看下图)“
# 2."唉,为啥我的kafka链接不了zookeeper啊?"
答:“你看下你的zookeeper启动在哪里,然后在config/server.properties内修改配置(同样看下图)”
# 3."md,sb博主,为啥我的topic删除不了?骗子!??"
答:“你需要配置auto.create.topics.enable = false和delete.topic.enable=true

配置1

配置2

贴上我自己测试的python链接脚本,先启动消费者然后启动生产者

Producer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import json
import traceback
from kafka import KafkaProducer
from kafka.errors import kafka_errors

producer = KafkaProducer(
bootstrap_servers=['zookeeper_server:port'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
print (producer.config)
msg_dict = {
"sleep_time": 1,
"db_config": {
"database": "test_1",
"host": "xxxx",
"user": "root",
"password": "root"
},
"table": "msg",
"msg": "Hello World"
}
msg2_dict = "hahahahhahaha"

msg = json.dumps(msg_dict)
futruer = producer.send("kafka_demo", key='count_num', value=msg_dict, partition=None)
try:
futruer.get(timeout=20)
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()

Consumer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import json
from kafka import KafkaConsumer

def consumer_demo():
consumer = KafkaConsumer(
'kafka_demo',
bootstrap_servers='zookeeper_server:port',
group_id='hahaha'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)

consumer_demo()