go操作kafka

本文使用的是kafka-go这个库,执行下面的命令安装

1
go get github.com/segmentio/kafka-go

下面是常见的使用方式。

1.连接到Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package controller

import (
"context"
"fmt"
"log"
"net"
"strconv"
"time"

"github.com/segmentio/kafka-go"
)

// 连接到leader
func Connection() {
conn, err := kafka.DialLeader(context.Background(), "tcp", addr, topic, partition)
if err != nil {
log.Fatal(err)
}

// 设置发送消息的超时时间
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

// 发送消息
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)

if err != nil {
log.Fatal(err)
}

// 关闭连接
if err := conn.Close(); err != nil {
log.Fatal(err)
}

fmt.Println("successful!")
}

// 通过非leader节点连接
func ConnectionNon() {
conn, err := kafka.Dial("tcp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// 获取控制节点信息
ctr, err := conn.Controller()
if err != nil {
log.Fatal(err)
}

connLeader, err := kafka.Dial("tcp", net.JoinHostPort(ctr.Host, strconv.Itoa(ctr.Port)))
if err != nil {
log.Fatal(err)
}
defer connLeader.Close()

fmt.Println("connect successful!")
}

2.消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package controller

import (
"context"
"fmt"
"log"
"time"

"github.com/segmentio/kafka-go"
)

// 连接至Kafka后接收消息
func Consume() {
conn, err := kafka.DialLeader(context.Background(), "tcp", addr, topic, partition)
if err != nil {
log.Fatal(err)
}

// 设置读取超时时间
conn.SetReadDeadline(time.Now().Add(time.Second * 10))

// 读取一批信息,得到的batch是一系列消息的迭代器
batch := conn.ReadBatch(10e3, 1e6)

// 遍历读取消息
b := make([]byte, 10e3)
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}

// 关闭batch
if err := batch.Close(); err != nil {
log.Fatal(err)
}

// 关闭连接
if err := conn.Close(); err != nil {
log.Fatal(err)
}
}

3.创建topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package controller

import (
"fmt"
"log"
"net"
"strconv"

"github.com/segmentio/kafka-go"
)

// 创建topic
func CreateTopic() {
// 连接至任意的Kafka节点
conn, err := kafka.Dial("tcp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// 获取当前控制节点的信息
ctr, err := conn.Controller()
if err != nil {
log.Fatal(err)
}

// 连接至leader节点
ctrConn, err := kafka.Dial("tcp", net.JoinHostPort(ctr.Host, strconv.Itoa(ctr.Port)))
if err != nil {
log.Fatal(err)
}
defer ctrConn.Close()

topicCfg := []kafka.TopicConfig{
{
Topic: "myTopic",
NumPartitions: 1,
ReplicationFactor: 1,
},
}

// 创建topic
err = ctrConn.CreateTopics(topicCfg...)
if err != nil {
log.Fatal(err)
}

fmt.Println("create topic successful!")
}

// 获取topic列表
func ListTopics() {
conn, err := kafka.Dial("tcp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// 获取所有分区
partitions, err := conn.ReadPartitions()
m := make(map[string]int)

// 遍历所有分区获取topic
for _, v := range partitions {
m[v.Topic]++
}
for k, v := range m {
fmt.Println(k, v)
}
}

5.Reader方式接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package controller

import (
"context"
"fmt"
"log"

"github.com/segmentio/kafka-go"
)

// 通过Reader接收消息
func readByReader() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{addr},
Topic: topic,
// GroupID: groupid, 指定消费组id
Partition: partition,
MinBytes: 10e6,
// CommitInterval: time, 提交偏移量的时间间隔
})

// 设置offset,使用消费组id时,不需要设置偏移。
r.SetOffset(42)

// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Fatal(err)
break
}
fmt.Printf("message at offset %d: %s = %s \n", m.Offset, string(m.Key), string(m.Value))
}

// 程序退出前关闭reader
r.Close()
}

6.writer方式发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package controller

import (
"context"
"log"

"github.com/segmentio/kafka-go"
)

// 通过writer发送消息
func writerByWriter() {
w := kafka.Writer{
Addr: kafka.TCP(addr),
Topic: topic,
// 指定分区的balancer模式为最小字节分布
Balancer: &kafka.LeastBytes{},
// ack模式
RequiredAcks: kafka.RequireAll,
// 异步
Async: true,
AllowAutoTopicCreation: true, // 自动创建topic
}

err := w.WriteMessages(context.Background(), kafka.Message{
Key: []byte("key-01"),
Value: []byte("value-01"),
}, kafka.Message{
Key: []byte("key-02"),
Value: []byte("value-02"),
}, kafka.Message{
Key: []byte("key-03"),
Value: []byte("value-03"),
})

if err != nil {
log.Fatal(err)
}
}