简单耍一下-Kafka
趁着毕设初期,还能摸会儿🐟,了解波 Kafka。
Kafka 术语一览
Kafka,分布式消息引擎系统,主要功能是提供一套完备的消息发布与订阅解决方案。Kafka 也是一个分布式的、分区的、多副本的多订阅者,基于 Zookeeper 协调的分布式日志系统,可用于处理 Web 日志和消息服务。
Topic:主题,承载消息(Record)的逻辑容器,每条发布到 Kafka 集群的消息都归属于某一个 Topic,实际应用中,不同的 topic 对应着不同的业务;
Broker:Kafka 服务进程,一个 Kafka 集群由多个 Broker 服务进程组成,虽然多个 Broker 可在同一服务器上进行部署,但为了高可用,会将不同的 Broker 部署在不同的机器上;
Partition:分区,一组有序的消息序列,一个 Topic 可以有多个分区,同一 Topic 下的 Partition 可以分布在不同的 Broker 中。Producer 生产的每一条消息都会被放到一个 Partition 中,每条消息在 Partition 中的位置信息由一个 Offset(偏移量)数据表征。Kafka 通过偏移量(Offset)来保证消息在分区内的顺序性;
Leader:每个 Partition 下可以配置多个 Replica(副本),Replica 由一个 Leader 和多个 Follower 组成,Leader 负责当前 Partition 消息的读写;
Follower:用于同步 Leader 中的数据,数据冗余,Leader 失效时会从 Followers 中选取;
Producer:生产者即数据的发布者。Producer 将消息发送给 Kafka 对应的 Topic 中,Broker 接收到消息后,会将消息存储到 Partition 中;
Consumer:消费者,消费者可以消费多个 Topic 中的消息,一个 Topic 中的消息也可以被多个消费者消费;
Consumer Group:消费者组,每一个消费者都会归属于某一个消费者组,如果未指定,则取默认的 Group;
Consumer Offset:消费者位移,用于表示消费者的消费进度;
与 Kafka 相关的几个问题:
Broker 分布式部署
备份机制(Replacation),把相同的数据拷贝到多台机器上。即 Kafka Replica,Leader Replica 提供数据的读写操作,Follower Replica 负责同步数据。
Partition 机制,一个 Topic 划分为多个 Partition,防止单台 Broker 机器无法容纳太多的数据,Partition 机制与 Replica 机制联系紧密,每个 Partition 可以有多个 Replica(1 Leader + N Followers)。
Zookeeper 可为分布式系统提供分布式配置服务、同步服务和命名注册服务。
从前文可知,Kafka 的消息存储在 Topic 中,一个 Topic 又可以划分为多个 Partition,多 Partition 时,Kafka 只能保证 Partition 内的消息有序(Offset保证有序),如需保证 Topic 消息的有序,那么只能使用单个Partition了。如果仍要使用多个 Partition,消息的分区写入策略应选择按键(Key)保存。
通过 Go 体验一下 Kafka 环境搭建 既然只是玩一下,不如使用 Docker 搭建 Kafka 环境吧,“即用即焚”。
环境:Windows 10 Docker Desktop + WSL
这里通过 Docker-Compose 搭建个单机版的 kafka 集群,编排文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 version: '3.4' services: zoo1: container_name: zookeeper-one image: zookeeper:3.4.9 hostname: zoo1 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_PORT: 2181 ZOO_SERVERS: server.1=zoo1:2888:3888 volumes: - ./zk-single-kafka-single/zoo1/data:/data - ./zk-single-kafka-single/zoo1/datalog:/datalog kafka1: container_name: kafka-one image: confluentinc/cp-kafka:5.3.1 hostname: kafka1 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 volumes: - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data depends_on: - zoo1
该编排文件来自:https://github.com/simplesteph/kafka-stack-docker-compose 的 zk-single-kafka-single.yml。使用 docker-compose up
启动容器。
编排文件中所使用到的镜像 confluentinc/cp-kafka:5.3.1
和 zookeeper:3.4.9
配置参考:
Kafka 和 Zookeeper 容器启动后,配合 IDEA 的两个插件 Kafkalytic
和 Zoolytic
,我们可以很方便的观察集群的情况:
Cluster Management
通过 vscode 插件我们可以方便的对启动的容器进行管理(日志追踪、shell attach等):
vscode docker plugin
通过 Kafka 自带的命令行工具可以查看 Topic:(先连接到 Kafka 容器:docker exec -it kafka-one bash
)
1 2 3 4 5 root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181 Topic:__confluent.support.metrics PartitionCount:1 ReplicationFactor:1 Configs:retention.ms=31536000000 Topic: __confluent.support.metrics Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:s ......
使用 confluent-kafka-go 体验 Kafka Go 中有两个比较有名的 Go Client,即 kafka-go 和 confluent-kafka-go 。我都不熟悉😂,但是前面编排时用到了 confluent 公司的 Kafka 镜像,所以这里选用 confluent-kafka-go
创建 Client。confluent-kafka-go 项目的 example 拿来即用。
1、创建 Go Module
1 2 3 4 mkdir go-kafka-demo cd go-kafka-demo go mod init github.com/yeshan333/go-kafka-demo go get -u github.com/confluentinc/confluent-kafka-go
2、创建 Consumer。这个 Consumer 订阅的 Topic 为 myTopic。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mainimport ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main () { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers" : "localhost" , "group.id" : "myGroup" , "auto.offset.reset" : "earliest" , }) if err != nil { panic (err) } c.SubscribeTopics([]string {"myTopic" , "^aRegex.*[Tt]opic" }, nil ) for { msg, err := c.ReadMessage(-1 ) if err == nil { fmt.Printf("Message on %s: %s\n" , msg.TopicPartition, string (msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n" , err, msg) } } c.Close() }
3、创建 Producer。这个 Producer 向 myTopic Topic 发送了 7 条消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package mainimport ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main () { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers" : "localhost" }) if err != nil { panic (err) } defer p.Close() go func () { for e := range p.Events() { switch ev := e.(type ) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n" , ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n" , ev.TopicPartition) } } } }() topic := "myTopic" for _, word := range []string {"Welcome" , "to" , "the" , "Confluent" , "Kafka" , "Golang" , "client" } { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte (word), }, nil ) } p.Flush(15 * 1000 ) }
4、两个 terminal,先跑 Consumer,再跑 Producer。
1 2 3 4 # terminal 1 go run kafka_consumer.go # ternimal 2 go run kafka_producer.go
run result
收工,其他东西后续慢慢啃。本文源文件:https://github.com/yeshan333/go-kafka-demo
参考