go操作kafka
本文使用的是 kafka-go 这个库，执行下面的命令安装：
1
| go get github.com/segmentio/kafka-go
|
下面是常见的使用方式（示例代码中的 addr、topic、partition 为包级变量，需根据自己的 Kafka 环境自行定义）。
1.连接到Kafka
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| package controller
import ( "context" "fmt" "log" "net" "strconv" "time"
"github.com/segmentio/kafka-go" )
func Connection() { conn, err := kafka.DialLeader(context.Background(), "tcp", addr, topic, partition) if err != nil { log.Fatal(err) }
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages( kafka.Message{Value: []byte("one!")}, kafka.Message{Value: []byte("two!")}, kafka.Message{Value: []byte("three!")}, )
if err != nil { log.Fatal(err) }
if err := conn.Close(); err != nil { log.Fatal(err) }
fmt.Println("successful!") }
func ConnectionNon() { conn, err := kafka.Dial("tcp", addr) if err != nil { log.Fatal(err) } defer conn.Close()
ctr, err := conn.Controller() if err != nil { log.Fatal(err) }
connLeader, err := kafka.Dial("tcp", net.JoinHostPort(ctr.Host, strconv.Itoa(ctr.Port))) if err != nil { log.Fatal(err) } defer connLeader.Close()
fmt.Println("connect successful!") }
|
2.消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package controller
import ( "context" "fmt" "log" "time"
"github.com/segmentio/kafka-go" )
func Consume() { conn, err := kafka.DialLeader(context.Background(), "tcp", addr, topic, partition) if err != nil { log.Fatal(err) }
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
batch := conn.ReadBatch(10e3, 1e6)
b := make([]byte, 10e3) for { n, err := batch.Read(b) if err != nil { break } fmt.Println(string(b[:n])) }
if err := batch.Close(); err != nil { log.Fatal(err) }
if err := conn.Close(); err != nil { log.Fatal(err) } }
|
3.创建topic与查看topic列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package controller
import ( "fmt" "log" "net" "strconv"
"github.com/segmentio/kafka-go" )
func CreateTopic() { conn, err := kafka.Dial("tcp", addr) if err != nil { log.Fatal(err) } defer conn.Close()
ctr, err := conn.Controller() if err != nil { log.Fatal(err) }
ctrConn, err := kafka.Dial("tcp", net.JoinHostPort(ctr.Host, strconv.Itoa(ctr.Port))) if err != nil { log.Fatal(err) } defer ctrConn.Close()
topicCfg := []kafka.TopicConfig{ { Topic: "myTopic", NumPartitions: 1, ReplicationFactor: 1, }, }
err = ctrConn.CreateTopics(topicCfg...) if err != nil { log.Fatal(err) }
fmt.Println("create topic successful!") }
func ListTopics() { conn, err := kafka.Dial("tcp", addr) if err != nil { log.Fatal(err) } defer conn.Close()
partitions, err := conn.ReadPartitions() m := make(map[string]int)
for _, v := range partitions { m[v.Topic]++ } for k, v := range m { fmt.Println(k, v) } }
|
5.Reader方式接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package controller
import ( "context" "fmt" "log"
"github.com/segmentio/kafka-go" )
func readByReader() { r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{addr}, Topic: topic, Partition: partition, MinBytes: 10e6, })
r.SetOffset(42)
for { m, err := r.ReadMessage(context.Background()) if err != nil { log.Fatal(err) break } fmt.Printf("message at offset %d: %s = %s \n", m.Offset, string(m.Key), string(m.Value)) }
r.Close() }
|
6.writer方式发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package controller
import ( "context" "log"
"github.com/segmentio/kafka-go" )
// writerByWriter publishes three key/value messages to the package-level topic
// through a kafka.Writer and then closes the writer.
//
// The writer is configured with Async: true. Per the kafka-go documentation,
// WriteMessages then returns without waiting for delivery, so Close is called
// afterwards to flush the pending writes; without it the messages could be
// lost when the caller returns or the process exits.
//
// A write or close failure terminates the process through log.Fatal.
func writerByWriter() {
	// A pointer is used because Writer's methods have pointer receivers and
	// the value must not be copied once in use.
	w := &kafka.Writer{
		Addr:                   kafka.TCP(addr),
		Topic:                  topic,
		Balancer:               &kafka.LeastBytes{},
		RequiredAcks:           kafka.RequireAll,
		Async:                  true,
		AllowAutoTopicCreation: true,
	}

	writeErr := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("key-01"),
			Value: []byte("value-01"),
		},
		kafka.Message{
			Key:   []byte("key-02"),
			Value: []byte("value-02"),
		},
		kafka.Message{
			Key:   []byte("key-03"),
			Value: []byte("value-03"),
		},
	)

	// Always close, even after a failed write, so queued messages are flushed
	// and the writer's resources are released before a possible log.Fatal.
	closeErr := w.Close()

	if writeErr != nil {
		log.Fatal(writeErr)
	}
	if closeErr != nil {
		log.Fatal(closeErr)
	}
}
|