{"id":37361,"date":"2025-05-15T22:27:36","date_gmt":"2025-05-15T14:27:36","guid":{"rendered":"https:\/\/www.wsisp.com\/helps\/37361.html"},"modified":"2025-05-15T22:27:36","modified_gmt":"2025-05-15T14:27:36","slug":"kafka-go%e5%ae%a2%e6%88%b7%e7%ab%af-sarama","status":"publish","type":"post","link":"https:\/\/www.wsisp.com\/helps\/37361.html","title":{"rendered":"Kafka Go\u5ba2\u6237\u7aef--Sarama"},"content":{"rendered":"<h3>Kafka Go\u5ba2\u6237\u7aef<\/h3>\n<p>\u5728Go\u4e2d\u91cc\u9762\u6709\u4e09\u4e2a\u6bd4\u8f83\u6709\u540d\u6c14\u7684Go\u5ba2\u6237\u7aef\u3002<\/p>\n<ul>\n<li>Sarama:\u7528\u6237\u6570\u91cf\u6700\u591a&#xff0c;\u65e9\u671f\u8fd9\u4e2a\u9879\u76ee\u662f\u5728Shopify\u4e0b\u9762&#xff0c;\u73b0\u5728\u632a\u5230\u4e86IBM\u4e0b\u3002<\/li>\n<li>segmentio\/kafka-go:\u6ca1\u5565\u5927\u7684\u7f3a\u70b9\u3002<\/li>\n<li>confluent-kafka-go&#xff1a;\u9700\u8981\u542f\u7528cgo,\u8de8\u5e73\u53f0\u95ee\u9898\u6bd4\u8f83\u591a&#xff0c;\u4ea4\u53c9\u7f16\u8bd1\u4e5f\u4e0d\u652f\u6301\u3002<\/li>\n<\/ul>\n<h4>Sarama \u4f7f\u7528\u5165\u95e8&#xff1a;tools<\/h4>\n<p>IBM\/sarama: Sarama is a Go library for Apache Kafka.<\/p>\n<p>\u5728 Sarama \u91cc\u9762\u63d0\u4f9b\u4e86\u4e00\u4e9b\u7b80\u5355\u7684\u547d\u4ee4\u884c\u5de5\u5177,\u53ef\u4ee5\u770b\u505a\u662f Shell\u811a\u672c\u63d0\u4f9b\u7684\u529f\u80fd\u4e00\u4e2a\u5b50\u96c6\u3002<\/p>\n<p>Consumer\u548c producer\u4e2d\u7684\u7528\u5f97\u6bd4\u8f83\u591a<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/www.wsisp.com\/helps\/wp-content\/uploads\/2025\/05\/20250515142734-6825f9d62cd5d.png\" alt=\"\u5728\u8fd9\u91cc\u63d2\u5165\u56fe\u7247\u63cf\u8ff0\" \/><\/p>\n<p>1.\u8bbe\u7f6e Go \u4ee3\u7406&#xff08;\u5982\u679c\u5185\u7f51\u65e0\u6cd5\u76f4\u8fde proxy.golang.org&#xff09;<\/p>\n<p>export GOPROXY&#061;https:\/\/goproxy.cn,direct<br \/>\nexport GOSUMDB&#061;sum.golang.google.cn<\/p>\n<p>2.\u5728\u865a\u62df\u673a\u4e0a\u6267\u884c\u5b89\u88c5\u547d\u4ee4&#xff1a;<\/p>\n<ul>\n<li>\u200b go install github.com\/IBM\/sar ama\/tools\/kafka-console-consumer&#064;latest<\/li>\n<li>\u200b go install github.com\/lBM\/sarama\/tools\/kafka-console-producer&#064;latest<\/li>\n<\/ul>\n<p>3.\u628a\u53ef\u6267\u884c\u6587\u4ef6\u6240\u5728\u76ee\u5f55\u52a0\u5230 PATH&#xff08;\u5982\u679c\u8fd8\u6ca1\u52a0&#xff09;<\/p>\n<p>export PATH&#061;$PATH:$(go env GOBIN)<\/p>\n<p>4.\u786e\u8ba4\u53ef\u6267\u884c\u6587\u4ef6\u5728\u54ea\u91cc<\/p>\n<p># \u67e5\u770b GOBIN&#xff0c;\u5982\u679c\u4f60\u6ca1\u663e\u5f0f\u8bbe\u7f6e&#xff0c;\u5c31\u4f1a\u662f\u7a7a<br \/>\ngo env GOBIN<\/p>\n<p># \u67e5\u770b GOPATH&#xff0c;\u9ed8\u8ba4\u662f $HOME\/go&#xff08;\u5bf9\u4e8e root \u7528\u6237\u5c31\u662f \/root\/go&#xff09;<br \/>\ngo env GOPATH<\/p>\n<p>#\u6211\u7684\u662f\/home\/cxz\/go\/lib:\/home\/cxz\/go\/work<\/p>\n<p>5.\u67e5\u770b\u5b89\u88c5\u7ed3\u679c<\/p>\n<p>ls \/home\/cxz\/go\/lib\/bin<br \/>\n#\u5e94\u8be5\u80fd\u591f\u770b\u5230kafka-console-consumer  kafka-console-producer<\/p>\n<p>6.\u4e34\u65f6\u751f\u6548<\/p>\n<p>export PATH&#061;$PATH:\/home\/cxz\/go\/lib\/bin<\/p>\n<p># \u7136\u540e\u9a8c\u8bc1<br \/>\nwhich kafka-console-consumer<br \/>\n# \u5e94\u8be5\u8f93\u51fa \/home\/cxz\/go\/lib\/bin\/kafka-console-consumer<\/p>\n<p>7.\u6c38\u4e45\u751f\u6548<\/p>\n<p>echo &#039;export PATH&#061;$PATH:\/home\/cxz\/go\/lib\/bin&#039; &gt;&gt; ~\/.bashrc<br \/>\n# \u6216\u8005&#xff0c;\u5982\u679c\u4f60\u7528\u7684\u662f zsh&#xff1a;<br \/>\n# echo &#039;export PATH&#061;$PATH:\/home\/cxz\/go\/lib\/bin&#039; &gt;&gt; ~\/.zshrc<\/p>\n<p># \u7136\u540e\u91cd\u65b0\u52a0\u8f7d\u914d\u7f6e<br \/>\nsource ~\/.bashrc<\/p>\n<h4>Sarama \u4f7f\u7528\u5165\u95e8&#xff1a;\u53d1\u9001\u6d88\u606f<\/h4>\n<p>\u865a\u62df\u673a\u4e0a\u6267\u884c<\/p>\n<p>kafka-console-consumer -topic&#061;test_topic -brokers&#061;192.168.24.101:9094<\/p>\n<p>Goland\u4e0a\u6267\u884c<\/p>\n<p><span class=\"token keyword\">package<\/span> main<\/p>\n<p><span class=\"token keyword\">import<\/span> <span class=\"token punctuation\">(<\/span><br \/>\n<span class=\"token string\">&#034;github.com\/IBM\/sarama&#034;<\/span><br \/>\n<span class=\"token string\">&#034;github.com\/stretchr\/testify\/assert&#034;<\/span><br \/>\n<span class=\"token string\">&#034;testing&#034;<\/span><br \/>\n<span class=\"token punctuation\">)<\/span><\/p>\n<p><span class=\"token keyword\">var<\/span> addrs <span class=\"token operator\">&#061;<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token builtin\">string<\/span><span class=\"token punctuation\">{<\/span><span class=\"token string\">&#034;192.168.24.101:9094&#034;<\/span><span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token function\">TestSyncProducer<\/span><span class=\"token punctuation\">(<\/span>t <span class=\"token operator\">*<\/span>testing<span class=\"token punctuation\">.<\/span>T<span class=\"token punctuation\">)<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n    <span class=\"token comment\">\/\/\u521b\u5efa\u4e00\u4e2a Sarama \u7684\u914d\u7f6e\u5bf9\u8c61\u3002<\/span><br \/>\ncfg <span class=\"token operator\">:&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewConfig<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n    <span class=\"token comment\">\/\/\u8868\u793a\u751f\u4ea7\u8005\u8981\u7b49\u5f85 Kafka \u786e\u8ba4\u6d88\u606f\u6210\u529f\u5199\u5165\u540e\u518d\u8fd4\u56de&#xff08;\u540c\u6b65\u6a21\u5f0f&#xff09;\u3002\u5982\u679c\u4e0d\u8bbe\u7f6e\u8fd9\u4e2a&#xff0c;SyncProducer.SendMessage \u4f1a\u4e00\u76f4\u5931\u8d25\u3002<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Return<span class=\"token punctuation\">.<\/span>Successes <span class=\"token operator\">&#061;<\/span> <span class=\"token boolean\">true<\/span> <span class=\"token comment\">\/\/\u540c\u6b65\u7684Producer\u4e00\u5b9a\u8981\u8bbe\u7f6e<\/span><br \/>\n    <span class=\"token comment\">\/\/\u521b\u5efa\u4e00\u4e2a\u540c\u6b65\u7684\u751f\u4ea7\u8005\u5b9e\u4f8b<\/span><br \/>\nproducer<span class=\"token punctuation\">,<\/span> err <span class=\"token operator\">:&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewSyncProducer<\/span><span class=\"token punctuation\">(<\/span>addrs<span class=\"token punctuation\">,<\/span> cfg<span class=\"token punctuation\">)<\/span><br \/>\nassert<span class=\"token punctuation\">.<\/span><span class=\"token function\">NoError<\/span><span class=\"token punctuation\">(<\/span>t<span class=\"token punctuation\">,<\/span> err<span class=\"token punctuation\">)<\/span><br \/>\n    <span class=\"token comment\">\/\/\u6784\u5efa\u6d88\u606f\u5e76\u53d1\u9001<\/span><br \/>\n<span class=\"token boolean\">_<\/span><span class=\"token punctuation\">,<\/span> <span class=\"token boolean\">_<\/span><span class=\"token punctuation\">,<\/span> err <span class=\"token operator\">&#061;<\/span> producer<span class=\"token punctuation\">.<\/span><span class=\"token function\">SendMessage<\/span><span class=\"token punctuation\">(<\/span><span class=\"token operator\">&amp;<\/span>sarama<span class=\"token punctuation\">.<\/span>ProducerMessage<span class=\"token punctuation\">{<\/span><br \/>\nTopic<span class=\"token punctuation\">:<\/span> <span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/\u6d88\u606f\u6570\u636e\u672c\u4f53<\/span><br \/>\nValue<span class=\"token punctuation\">:<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">StringEncoder<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;hello world ,\u8fd9\u662f\u4e00\u6761\u4f7f\u7528kafka\u7684\u6d88\u606f&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/\u4f1a\u5728\u751f\u4ea7\u8005\u548c\u6d88\u8d39\u8005\u4e4b\u95f4\u4f20\u9012&#xff0c;\u6d88\u606f\u5934&#xff0c;\u53ef\u4f20\u9012\u81ea\u5b9a\u4e49\u952e\u503c\u5bf9&#xff0c;\u6bd4\u5982 trace_id \u7528\u4e8e\u94fe\u8def\u8ffd\u8e2a\u3002<\/span><br \/>\nHeaders<span class=\"token punctuation\">:<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span>sarama<span class=\"token punctuation\">.<\/span>RecordHeader<span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token punctuation\">{<\/span><br \/>\nKey<span class=\"token punctuation\">:<\/span>   <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token function\">byte<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;trace_id&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span><br \/>\nValue<span class=\"token punctuation\">:<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token function\">byte<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;123456&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/\u53ea\u4f5c\u7528\u4e8e\u53d1\u9001\u8fc7\u7a0b\u3002\u5143\u4fe1\u606f&#xff0c;\u5728\u53d1\u9001\u8fc7\u7a0b\u4e2d\u4f7f\u7528&#xff0c;\u53ef\u4ee5\u7528\u6765\u4f20\u9012\u989d\u5916\u4fe1\u606f&#xff0c;\u53d1\u9001\u5b8c\u6210\u540e\u4f1a\u539f\u6837\u8fd4\u56de&#xff08;\u4e0d\u4f1a\u4f20\u7ed9\u6d88\u8d39\u8005&#xff09;\u3002<\/span><br \/>\nMetadata<span class=\"token punctuation\">:<\/span> <span class=\"token string\">&#034;\u8fd9\u662fmetadata&#034;<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">)<\/span><br \/>\nassert<span class=\"token punctuation\">.<\/span><span class=\"token function\">NoError<\/span><span class=\"token punctuation\">(<\/span>t<span class=\"token punctuation\">,<\/span> err<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p>10.\u6267\u884c\u7ed3\u679c<\/p>\n<p>Partition:0<br \/>\nOffset:0<br \/>\nKey:<br \/>\nValue:hello world ,\u8fd9\u662f\u4e00\u6761\u4f7f\u7528kafka\u7684\u6d88\u606f<\/p>\n<p>\u4f7f\u7528\u63a7\u5236\u53f0\u5de5\u5177\u8fde\u63a5Kafka<\/p>\n<h4>Sarama \u4f7f\u7528\u5165\u95e8&#xff1a;\u6307\u5b9a\u5206\u533a<\/h4>\n<p>\u53ef\u4ee5\u6ce8\u610f\u5230,\u524d\u9762\u6240\u6709\u7684\u6d88\u606f\u90fd\u88ab\u53d1\u9001\u5230\u4e86 Partition 0 \u4e0a\u9762\u3002<\/p>\n<p>\u6b63\u5e38\u6765\u8bf4,\u5728 Sarama \u91cc\u9762,\u53ef\u4ee5\u901a\u8fc7\u6307\u5b9a config \u4e2d\u7684Partitioner\u6765\u6307\u5b9a\u6700\u7ec8\u7684\u76ee\u6807\u5206\u533a\u3002<\/p>\n<p>\u5e38\u89c1\u7684\u65b9\u6cd5:<\/p>\n<ul>\n<li>\u200b Random:\u968f\u673a\u6311\u4e00\u4e2a\u3002<\/li>\n<li>\u200b RoundRobin:\u8f6e\u8be2\u3002<\/li>\n<li>\u200b Hash(\u9ed8\u8ba4):\u6839\u636e key \u7684\u54c8\u5e0c\u503c\u6765\u7b5b\u9009\u4e00\u4e2a\u3002<\/li>\n<li>\u200b Manual: \u6839\u636e Message \u4e2d\u7684 partition \u5b57\u6bb5\u6765\u9009\u62e9\u3002<\/li>\n<li>\u200b ConsistentCRC:\u4e00\u81f4\u6027\u54c8\u5e0c&#xff0c;\u7528\u7684\u662f CRC32 \u7b97\u6cd5\u3002<\/li>\n<li>\u200b Custom:\u5b9e\u9645\u4e0a\u4e0d Custom,\u800c\u662f\u81ea\u5b9a\u4e49\u4e00\u90e8\u5206Hash \u7684\u53c2\u6570,\u672c\u8d28\u4e0a\u662f\u4e00\u4e2a Hash \u7684\u5b9e\u73b0\u3002<\/li>\n<\/ul>\n<p><span class=\"token comment\">\/\/\u9ed8\u8ba4HashPartitioner  \u9002\u5408&#xff1a; \u6309\u7528\u6237 ID\u3001\u8ba2\u5355 ID \u7b49\u5b57\u6bb5\u5206\u533a\u573a\u666f<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Partitioner <span class=\"token operator\">&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span>NewHashPartitioner<br \/>\n<span class=\"token comment\">\/\/\u4f7f\u7528 CRC32 \u7b97\u6cd5 \u8ba1\u7b97 Key \u7684\u54c8\u5e0c\u3002 \u9002\u5408&#xff1a; \u9700\u8981\u9ad8\u4e00\u81f4\u6027\u5206\u5e03\u7684\u4e1a\u52a1&#xff0c;\u4f8b\u5982\u65e5\u5fd7\u6536\u96c6\u7cfb\u7edf<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Partitioner <span class=\"token operator\">&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span>NewConsistentCRCHashPartitioner<br \/>\n<span class=\"token comment\">\/\/\u5ffd\u7565 Key&#xff0c;\u6bcf\u6761\u6d88\u606f\u968f\u673a\u5206\u914d partition\u3002  \u9002\u5408&#xff1a; \u666e\u901a\u6d88\u606f\u961f\u5217\u3001\u5e7f\u64ad\u7c7b\u573a\u666f\u3002<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Partitioner <span class=\"token operator\">&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span>NewRandomPartitioner<br \/>\n<span class=\"token comment\">\/\/\u9700\u8981\u624b\u52a8\u6307\u5b9a partition&#xff08;ProducerMessage.Partition \u5b57\u6bb5&#xff09;\u3002\u9002\u5408&#xff1a; \u660e\u786e\u77e5\u9053\u8981\u5199\u54ea\u4e2a partition&#xff0c;\u4f8b\u5982\u505a\u6570\u636e\u5206\u6d41<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Partitioner <span class=\"token operator\">&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span>NewManualPartitioner<br \/>\n<span class=\"token comment\">\/\/\u7528\u4e8e\u5b9e\u73b0\u4f60\u81ea\u5df1\u7684 Partitioner  \u4e00\u822c\u4e0d\u63a8\u8350\u4f7f\u7528\u8fd9\u4e2a\u7a7a\u53c2\u51fd\u6570&#xff08;\u5b83\u4f1a panic&#xff09;&#xff0c;\u5e94\u5b9e\u73b0\u5b8c\u6574\u63a5\u53e3\u3002<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Partitioner <span class=\"token operator\">&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewCustomPartitioner<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u5141\u8bb8\u4f60\u4f7f\u7528\u81ea\u5b9a\u4e49\u54c8\u5e0c\u51fd\u6570\u6765\u505a key \u5206\u533a\u3002  \u9002\u5408&#xff1a; \u6709\u7279\u5b9a\u54c8\u5e0c\u7b56\u7565\u9700\u6c42\u65f6&#xff0c;\u4f8b\u5982\u5206\u5e03\u8981\u5c3d\u53ef\u80fd\u5747\u5300\u3002<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Partitioner <span class=\"token operator\">&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewCustomHashPartitioner<\/span><span class=\"token punctuation\">(<\/span><span class=\"token keyword\">func<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span> hash<span class=\"token punctuation\">.<\/span>Hash32 <span class=\"token punctuation\">{<\/span><\/p>\n<p><span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">)<\/span><\/p>\n<p>Topic<span class=\"token punctuation\">:<\/span> <span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/\u5206\u533a\u4f9d\u636e<\/span><br \/>\nKey<span class=\"token punctuation\">:<\/span>   sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">StringEncoder<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;user_123&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span> <span class=\"token comment\">\/\/ &#x1f511; \u8fd9\u91cc\u662f\u5206\u533a\u4f9d\u636e<\/span><br \/>\n<span class=\"token comment\">\/\/\u6d88\u606f\u6570\u636e\u672c\u4f53<\/span><br \/>\nValue<span class=\"token punctuation\">:<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">StringEncoder<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;hello world ,\u8fd9\u662f\u4e00\u6761\u4f7f\u7528kafka\u7684\u6d88\u606f&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span><\/p>\n<p>\u6700\u5178\u578b\u7684\u573a\u666f&#xff0c;\u5c31\u662f\u5229\u7528Partitioner\u6765\u4fdd\u8bc1\u540c\u4e00\u4e2a\u4e1a\u52a1\u7684\u6d88\u606f\u4e00\u5b9a\u53d1\u9001\u5230\u540c\u4e00\u4e2a\u5206\u533a\u4e0a&#xff0c;\u4ece\u800c\u4fdd\u8bc1\u4e1a \u6709\u5e8f\u3002<\/p>\n<h4>Sarama \u4f7f\u7528\u5165\u95e8&#xff1a;\u5f02\u6b65\u53d1\u9001<\/h4>\n<p>Sarama\u6709\u4e00\u4e2a\u5f02\u6b65\u53d1\u9001\u7684producer&#xff0c;\u5b83\u7684\u7528\u6cd5\u7a0d\u5fae\u590d\u6742\u4e00\u70b9\u3002<\/p>\n<ul>\n<li>\u200b \u628aReturn.Success\u548c Errors\u90fd\u8bbe\u7f6e\u4e3atrue&#xff0c;\u8fd9\u662f\u4e3a\u4e86\u540e\u9762\u80fd\u591f\u62ff\u5230\u53d1\u9001\u7ed3\u679c\u3002<\/li>\n<li>\u200b \u521d\u59cb\u5316\u5f02\u6b65producer\u3002<\/li>\n<li>\u200b \u4eceproducer\u91cc\u9762\u62ff\u5230Input\u7684channel,\u5e76\u4e14\u53d1\u9001 \u4e00\u6761\u6d88\u606f\u3002<\/li>\n<li>\u200b \u5229\u7528select case&#xff0c;\u540c\u65f6**\u76d1\u542cSuccess\u548cError\u4e24\u4e2achannel,**\u6765\u83b7\u5f97\u53d1\u9001\u6210\u529f\u4e0e\u5426\u7684\u4fe1\u606f\u3002<\/li>\n<\/ul>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token function\">TestAsyncProducer<\/span><span class=\"token punctuation\">(<\/span>t <span class=\"token operator\">*<\/span>testing<span class=\"token punctuation\">.<\/span>T<span class=\"token punctuation\">)<\/span> <span class=\"token punctuation\">{<\/span><br \/>\ncfg <span class=\"token operator\">:&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewConfig<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u600e\u4e48\u77e5\u9053\u53d1\u9001\u662f\u5426\u6210\u529f<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Return<span class=\"token punctuation\">.<\/span>Errors <span class=\"token operator\">&#061;<\/span> <span class=\"token boolean\">true<\/span><br \/>\ncfg<span class=\"token punctuation\">.<\/span>Producer<span class=\"token punctuation\">.<\/span>Return<span class=\"token punctuation\">.<\/span>Successes <span class=\"token operator\">&#061;<\/span> <span class=\"token boolean\">true<\/span><br \/>\nproducer<span class=\"token punctuation\">,<\/span> err <span class=\"token operator\">:&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewAsyncProducer<\/span><span class=\"token punctuation\">(<\/span>addrs<span class=\"token punctuation\">,<\/span> cfg<span class=\"token punctuation\">)<\/span><br \/>\nrequire<span class=\"token punctuation\">.<\/span><span class=\"token function\">NoError<\/span><span class=\"token punctuation\">(<\/span>t<span class=\"token punctuation\">,<\/span> err<span class=\"token punctuation\">)<\/span><br \/>\nmessages <span class=\"token operator\">:&#061;<\/span> producer<span class=\"token punctuation\">.<\/span><span class=\"token function\">Input<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">go<\/span> <span class=\"token keyword\">func<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token keyword\">for<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nmessages <span class=\"token operator\">&lt;-<\/span> <span class=\"token operator\">&amp;<\/span>sarama<span class=\"token punctuation\">.<\/span>ProducerMessage<span class=\"token punctuation\">{<\/span><br \/>\nTopic<span class=\"token punctuation\">:<\/span> <span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/\u5206\u533a\u4f9d\u636e<\/span><br \/>\nKey<span class=\"token punctuation\">:<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">StringEncoder<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;user_123&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span> <span class=\"token comment\">\/\/ &#x1f511; \u8fd9\u91cc\u662f\u5206\u533a\u4f9d\u636e<\/span><br \/>\n<span class=\"token comment\">\/\/\u6d88\u606f\u6570\u636e\u672c\u4f53<\/span><br \/>\nValue<span class=\"token punctuation\">:<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">StringEncoder<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;hello world ,\u8fd9\u662f\u4e00\u6761\u4f7f\u7528kafka\u7684\u6d88\u606f&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/\u4f1a\u5728\u751f\u4ea7\u8005\u548c\u6d88\u8d39\u8005\u4e4b\u95f4\u4f20\u9012<\/span><br \/>\nHeaders<span class=\"token punctuation\">:<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span>sarama<span class=\"token punctuation\">.<\/span>RecordHeader<span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token punctuation\">{<\/span><br \/>\nKey<span class=\"token punctuation\">:<\/span>   <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token function\">byte<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;trace_id&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span><br \/>\nValue<span class=\"token punctuation\">:<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token function\">byte<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;123456&#034;<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/\u53ea\u4f5c\u7528\u4e8e\u53d1\u9001\u8fc7\u7a0b<\/span><br \/>\nMetadata<span class=\"token punctuation\">:<\/span> <span class=\"token string\">&#034;\u8fd9\u662fmetadata&#034;<\/span><span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><\/p>\n<p>errCh <span class=\"token operator\">:&#061;<\/span> producer<span class=\"token punctuation\">.<\/span><span class=\"token function\">Errors<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\nsuccCh <span class=\"token operator\">:&#061;<\/span> producer<span class=\"token punctuation\">.<\/span><span class=\"token function\">Successes<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">for<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token comment\">\/\/\u4e24\u4e2a\u90fd\u4e0d\u6ee1\u8db3\u5c31\u4f1a\u963b\u585e<\/span><br \/>\n<span class=\"token keyword\">select<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token keyword\">case<\/span> err <span class=\"token operator\">:&#061;<\/span> <span class=\"token operator\">&lt;-<\/span>errCh<span class=\"token punctuation\">:<\/span><br \/>\nt<span class=\"token punctuation\">.<\/span><span class=\"token function\">Log<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;\u53d1\u9001\u51fa\u4e86\u95ee\u9898&#034;<\/span><span class=\"token punctuation\">,<\/span> err<span class=\"token punctuation\">.<\/span>Err<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">case<\/span> <span class=\"token operator\">&lt;-<\/span>succCh<span class=\"token punctuation\">:<\/span><br \/>\nt<span class=\"token punctuation\">.<\/span><span class=\"token function\">Log<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;\u53d1\u9001\u6210\u529f&#034;<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<h4>Sarama \u4f7f\u7528\u5165\u95e8&#xff1a;acks<\/h4>\n<p>\u5728Kafka\u91cc\u9762&#xff0c;\u751f\u4ea7\u8005\u5728\u53d1\u9001\u6570\u636e\u7684\u65f6\u5019&#xff0c;\u6709\u4e00\u4e2a\u5f88\u5173\u952e\u7684\u53c2\u6570,\u5c31\u662f acks\u3002 \u6709\u4e09\u4e2a\u53d6\u503c&#xff1a;<\/p>\n<ul>\n<li>\u200b 0&#xff1a;\u5ba2\u6237\u7aef\u53d1\u4e00\u6b21&#xff0c;\u4e0d\u9700\u8981\u670d\u52a1\u7aef\u7684\u786e\u8ba4\u3002<\/li>\n<li>\u200b 1:\u5ba2\u6237\u7aef\u53d1\u9001&#xff0c;\u5e76\u4e14\u9700\u8981\u670d\u52a1\u7aef\u5199\u5165\u5230\u4e3b\u5206\u533a\u3002<\/li>\n<li>\u200b -1&#xff1a;\u5ba2\u6237\u7aef\u53d1\u9001&#xff0c;\u5e76\u4e14\u9700\u8981\u670d\u52a1\u7aef\u540c\u6b65\u5230\u6240\u6709\u7684ISR \u4e0a\u3002<\/li>\n<\/ul>\n<p>\u4ece\u4e0a\u5230\u4e0b&#xff0c;\u6027\u80fd\u53d8\u5dee&#xff0c;\u4f46\u662f\u6570\u636e\u53ef\u9760\u6027\u4e0a\u5347\u3002\u9700\u8981\u6027\u80fd&#xff0c;\u9009 0&#xff0c;\u9700\u8981\u6d88\u606f\u4e0d\u4e22\u5931&#xff0c;\u9009-1\u3002<\/p>\n<p>\u7406\u89e3acks\u4f60\u5c31\u8981\u6293\u4f4f\u6838\u5fc3\u70b9&#xff0c;\u8c01ack\u624d\u7b97\u6570&#xff1f;<\/p>\n<ul>\n<li>0&#xff1a;TCP\u534f\u8bae\u8fd4\u56de\u4e86ack\u5c31\u53ef\u4ee5\u3002<\/li>\n<li>1&#xff1a;\u4e3b\u5206\u533a\u786e\u8ba4\u5199\u5165\u4e86\u5c31\u53ef\u4ee5\u3002<\/li>\n<li>-1&#xff1a;\u6240\u6709\u7684ISR\u90fd\u786e\u8ba4\u4e86\u5c31\u53ef\u4ee5\u3002<\/li>\n<\/ul>\n<p><img decoding=\"async\" src=\"https:\/\/www.wsisp.com\/helps\/wp-content\/uploads\/2025\/05\/20250515142734-6825f9d6951e8.png\" alt=\"\u5728\u8fd9\u91cc\u63d2\u5165\u56fe\u7247\u63cf\u8ff0\" \/><\/p>\n<p>ISR &#xff08;In Sync Replicas&#xff09;&#xff0c;\u7528\u901a\u4fd7\u6613\u61c2\u7684\u8bdd\u6765\u8bf4,\u5c31\u662f\u8ddf\u4e0a\u4e86\u8282\u594f\u7684\u4ece\u5206\u533a\u3002<\/p>\n<p>\u4ec0\u4e48\u53eb\u505a\u8ddf\u4e0a\u4e86\u8282\u594f&#xff1f;\u5c31\u662f\u5b83\u548c\u4e3b\u5206\u533a\u4fdd\u6301\u4e86\u6570\u636e\u540c\u6b65\u3002<\/p>\n<p>\u6240\u4ee5&#xff0c;\u5f53\u6d88\u606f\u88ab\u540c\u6b65\u5230\u4ece\u5206\u533a\u4e4b\u540e&#xff0c;\u5982\u679c\u4e3b\u5206\u533a\u5d29\u6e83\u4e86\u90a3\u4e48\u4f9d\u65e7\u53ef\u4ee5\u4fdd\u8bc1\u5728\u4ece\u5206\u533a\u4e0a\u8fd8\u6709\u6570\u636e\u3002<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/www.wsisp.com\/helps\/wp-content\/uploads\/2025\/05\/20250515142735-6825f9d722950.png\" alt=\"\u5728\u8fd9\u91cc\u63d2\u5165\u56fe\u7247\u63cf\u8ff0\" \/><\/p>\n<h4>sarama \u4f7f\u7528\u5165\u95e8&#xff1a;\u542f\u52a8\u6d88\u8d39\u8005<\/h4>\n<p>Sarama\u7684\u6d88\u8d39\u8005\u8bbe\u8ba1\u4e0d\u662f\u5f88\u76f4\u89c2&#xff0c;\u7a0d\u5fae\u6709\u70b9\u590d\u6742\u3002<\/p>\n<ul>\n<li>\u200b \u9996\u5148\u8981\u521d\u59cb\u5316\u4e00\u4e2aConsumerGroup\u3002<\/li>\n<li>\u200b \u8c03\u7528ConsumerGroup\u4e0a\u7684Consume\u65b9\u6cd5\u3002<\/li>\n<li>\u200b \u4e3a Consume \u65b9\u6cd5\u4f20\u5165\u4e00\u4e2a ConsumerGroupHandler\u7684\u8f85\u52a9\u65b9\u6cd5\u3002<\/li>\n<\/ul>\n<p><span class=\"token keyword\">package<\/span> main<\/p>\n<p><span class=\"token keyword\">import<\/span> <span class=\"token punctuation\">(<\/span><br \/>\n<span class=\"token string\">&#034;context&#034;<\/span><br \/>\n<span class=\"token string\">&#034;github.com\/IBM\/sarama&#034;<\/span><br \/>\n<span class=\"token string\">&#034;github.com\/stretchr\/testify\/assert&#034;<\/span><br \/>\n<span class=\"token string\">&#034;log&#034;<\/span><br \/>\n<span class=\"token string\">&#034;testing&#034;<\/span><br \/>\n<span class=\"token punctuation\">)<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token function\">TestConsumer<\/span><span class=\"token punctuation\">(<\/span>t <span class=\"token operator\">*<\/span>testing<span class=\"token punctuation\">.<\/span>T<span class=\"token punctuation\">)<\/span> <span class=\"token punctuation\">{<\/span><br \/>\ncfg <span class=\"token operator\">:&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewConfig<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u6b63\u5e38\u6765\u8bf4&#xff0c;\u4e00\u4e2a\u6d88\u8d39\u8005\u90fd\u662f\u5f52\u5c5e\u4e00\u4e2a\u6d88\u8d39\u8005\u7ec4\u7684<\/span><br \/>\n<span class=\"token comment\">\/\/\u6d88\u8d39\u8005\u5c31\u662f\u4f60\u7684\u4e1a\u52a1<\/span><br \/>\nconsumerGroup<span class=\"token punctuation\">,<\/span> err <span class=\"token operator\">:&#061;<\/span> sarama<span class=\"token punctuation\">.<\/span><span class=\"token function\">NewConsumerGroup<\/span><span class=\"token punctuation\">(<\/span>addrs<span class=\"token punctuation\">,<\/span> <span class=\"token string\">&#034;test_group&#034;<\/span><span class=\"token punctuation\">,<\/span> cfg<span class=\"token punctuation\">)<\/span><\/p>\n<p>assert<span class=\"token punctuation\">.<\/span><span class=\"token function\">NoError<\/span><span class=\"token punctuation\">(<\/span>t<span class=\"token punctuation\">,<\/span> err<span class=\"token punctuation\">)<\/span><br \/>\nerr <span class=\"token operator\">&#061;<\/span> consumerGroup<span class=\"token punctuation\">.<\/span><span class=\"token function\">Consume<\/span><span class=\"token punctuation\">(<\/span>context<span class=\"token punctuation\">.<\/span><span class=\"token function\">Background<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token builtin\">string<\/span><span class=\"token punctuation\">{<\/span><span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">,<\/span> testConsumerGroupHandler<span class=\"token punctuation\">{<\/span><span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u4f60\u6d88\u8d39\u7ed3\u675f&#xff0c;\u5c31\u4f1a\u5230\u8fd9\u91cc<\/span><br \/>\nt<span class=\"token punctuation\">.<\/span><span class=\"token function\">Log<\/span><span class=\"token punctuation\">(<\/span>err<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">type<\/span> testConsumerGroupHandler <span class=\"token keyword\">struct<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">Setup<\/span><span class=\"token punctuation\">(<\/span>session sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;Setup session:&#034;<\/span><span class=\"token punctuation\">,<\/span> session<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">Cleanup<\/span><span class=\"token punctuation\">(<\/span>session sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;Cleanup session:&#034;<\/span><span class=\"token punctuation\">,<\/span> session<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">ConsumeClaim<\/span><span class=\"token punctuation\">(<\/span><br \/>\n<span class=\"token comment\">\/\/\u4ee3\u8868\u7684\u662f\u4f60\u548cKafka\u7684\u4f1a\u8bdd&#xff08;\u4ece\u5efa\u7acb\u8fde\u63a5\u5230\u8fde\u63a5\u5f7b\u5e95\u65ad\u6389\u7684\u90a3\u4e00\u6bb5\u65f6\u95f4&#xff09;<\/span><br \/>\nsession sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">,<\/span><br \/>\nclaim sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupClaim<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nmsgs <span class=\"token operator\">:&#061;<\/span> claim<span class=\"token punctuation\">.<\/span><span class=\"token function\">Messages<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">for<\/span> msg <span class=\"token operator\">:&#061;<\/span> <span class=\"token keyword\">range<\/span> msgs <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token comment\">\/\/var bizMsg MyBizMsg<\/span><br \/>\n<span class=\"token comment\">\/\/err :&#061; json.Unmarshal(msg.Value, &amp;bizMsg)<\/span><br \/>\n<span class=\"token comment\">\/\/if err !&#061; nil {<\/span><br \/>\n<span class=\"token comment\">\/\/\/\/\u8fd9\u5c31\u662f\u6d88\u8d39\u6d88\u606f\u51fa\u9519<\/span><br \/>\n<span class=\"token comment\">\/\/\/\/\u5927\u591a\u6570\u65f6\u5019\u5c31\u662f\u91cd\u8bd5<\/span><br \/>\n<span class=\"token comment\">\/\/\/\/\u8bb0\u5f55\u65e5\u5fd7<\/span><br \/>\n<span class=\"token comment\">\/\/continue<\/span><br \/>\n<span class=\"token comment\">\/\/}<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token function\">string<\/span><span class=\"token punctuation\">(<\/span>msg<span class=\"token punctuation\">.<\/span>Value<span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">)<\/span><br \/>\nsession<span class=\"token punctuation\">.<\/span><span class=\"token function\">MarkMessage<\/span><span class=\"token punctuation\">(<\/span>msg<span class=\"token punctuation\">,<\/span> <span class=\"token string\">&#034;&#034;<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token comment\">\/\/\u4ec0\u4e48\u60c5\u51b5\u4e0b\u4f1a\u5230\u8fd9\u91cc<\/span><br \/>\n<span class=\"token comment\">\/\/msg\u88ab\u4eba\u5173\u4e86&#xff0c;\u4e5f\u5c31\u662f\u8981\u9000\u51fa\u6d88\u8d39\u903b\u8f91<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">type<\/span> MyBizMsg <span class=\"token keyword\">struct<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nName <span class=\"token builtin\">string<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<h4>sarama \u4f7f\u7528\u5165\u95e8&#xff1a;ConsumerGroupHandler<\/h4>\n<p>\u4e0b\u9762\u7684\u4ee3\u7801\u5c31\u662f\u5bf9ConsumerGroupHandler\u7684\u5b9e\u73b0&#xff0c;\u5173\u952e\u5c31\u662f\u5728\u6d88\u8d39\u4e86msg\u4e4b\u540e&#xff0c;\u5982\u679c\u6d88\u8d39\u6210\u529f\u4e86&#xff0c;\u8981\u8bb0\u5f97\u63d0\u4ea4\u3002<\/p>\n<p>\u4e5f\u5c31\u662f\u8c03\u7528MarkMessage\u65b9\u6cd5\u3002<\/p>\n<p>\u81f3\u4e8e Setup \u548c Cleanup \u65b9\u6cd5\u53cd\u800c\u7528\u5f97\u4e0d\u591a\u3002<\/p>\n<p><span class=\"token keyword\">type<\/span> testConsumerGroupHandler <span class=\"token keyword\">struct<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">Setup<\/span><span class=\"token punctuation\">(<\/span>session sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;Setup session:&#034;<\/span><span class=\"token punctuation\">,<\/span> session<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">Cleanup<\/span><span class=\"token punctuation\">(<\/span>session sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;Cleanup session:&#034;<\/span><span class=\"token punctuation\">,<\/span> session<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">ConsumeClaim<\/span><span class=\"token punctuation\">(<\/span><br \/>\n<span class=\"token comment\">\/\/\u4ee3\u8868\u7684\u662f\u4f60\u548cKafka\u7684\u4f1a\u8bdd&#xff08;\u4ece\u5efa\u7acb\u8fde\u63a5\u5230\u8fde\u63a5\u5f7b\u5e95\u65ad\u6389\u7684\u90a3\u4e00\u6bb5\u65f6\u95f4&#xff09;<\/span><br \/>\nsession sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">,<\/span><br \/>\nclaim sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupClaim<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nmsgs <span class=\"token operator\">:&#061;<\/span> claim<span class=\"token punctuation\">.<\/span><span class=\"token function\">Messages<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">for<\/span> msg <span class=\"token operator\">:&#061;<\/span> <span class=\"token keyword\">range<\/span> msgs <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token comment\">\/\/var bizMsg MyBizMsg<\/span><br \/>\n<span class=\"token comment\">\/\/err :&#061; json.Unmarshal(msg.Value, &amp;bizMsg)<\/span><br \/>\n<span class=\"token comment\">\/\/if err !&#061; nil {<\/span><br \/>\n<span class=\"token comment\">\/\/\/\/\u8fd9\u5c31\u662f\u6d88\u8d39\u6d88\u606f\u51fa\u9519<\/span><br \/>\n<span class=\"token comment\">\/\/\/\/\u5927\u591a\u6570\u65f6\u5019\u5c31\u662f\u91cd\u8bd5<\/span><br \/>\n<span class=\"token comment\">\/\/\/\/\u8bb0\u5f55\u65e5\u5fd7<\/span><br \/>\n<span class=\"token comment\">\/\/continue<\/span><br \/>\n<span class=\"token comment\">\/\/}<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token function\">string<\/span><span class=\"token punctuation\">(<\/span>msg<span class=\"token punctuation\">.<\/span>Value<span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">)<\/span><br \/>\nsession<span class=\"token punctuation\">.<\/span><span class=\"token function\">MarkMessage<\/span><span class=\"token punctuation\">(<\/span>msg<span class=\"token punctuation\">,<\/span> <span class=\"token string\">&#034;&#034;<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token comment\">\/\/\u4ec0\u4e48\u60c5\u51b5\u4e0b\u4f1a\u5230\u8fd9\u91cc<\/span><br \/>\n<span class=\"token comment\">\/\/msg\u88ab\u4eba\u5173\u4e86&#xff0c;\u4e5f\u5c31\u662f\u8981\u9000\u51fa\u6d88\u8d39\u903b\u8f91<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<h4>sarama \u4f7f\u7528\u5165\u95e8&#xff1a;\u5229\u7528context\u6765\u63a7\u5236\u6d88\u8d39\u8005\u9000\u51fa<\/h4>\n<p>\u53ef\u4ee5\u5229\u7528\u521d\u59cb\u5316ConsumerGroup \u65f6\u5019\u4f20\u5165\u7684ctx\u6765\u63a7\u5236\u6d88\u8d39\u8005\u7ec4\u9000\u51fa\u6d88\u606f\u3002<\/p>\n<p>\u4e0b\u56fe\u4e2d&#xff0c;\u6211\u4f20\u5165\u4e86\u4e00\u4e2a\u8d85\u65f6\u7684context,\u90a3\u4e48&#xff1a;<\/p>\n<p>start <span class=\"token operator\">:&#061;<\/span> time<span class=\"token punctuation\">.<\/span><span class=\"token function\">Now<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u8fd9\u91cc\u662f\u6d4b\u8bd5&#xff0c;\u6211\u4eec\u5c31\u63a7\u5236\u6d88\u8d3910s<\/span><br \/>\nctx<span class=\"token punctuation\">,<\/span> cancel <span class=\"token operator\">:&#061;<\/span> context<span class=\"token punctuation\">.<\/span><span class=\"token function\">WithTimeout<\/span><span class=\"token punctuation\">(<\/span>context<span class=\"token punctuation\">.<\/span><span class=\"token function\">Background<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span> <span class=\"token number\">10<\/span><span class=\"token operator\">*<\/span>time<span class=\"token punctuation\">.<\/span>Second<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">defer<\/span> <span class=\"token function\">cancel<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u5f00\u59cb\u6d88\u8d39&#xff0c;\u4f1a\u5728\u8fd9\u91cc\u963b\u585e\u4f4f<\/span><br \/>\nerr <span class=\"token operator\">&#061;<\/span> consumerGroup<span class=\"token punctuation\">.<\/span><span class=\"token function\">Consume<\/span><span class=\"token punctuation\">(<\/span>ctx<span class=\"token punctuation\">,<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token builtin\">string<\/span><span class=\"token punctuation\">{<\/span><span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">,<\/span> testConsumerGroupHandler<span class=\"token punctuation\">{<\/span><span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u4f60\u6d88\u8d39\u7ed3\u675f&#xff0c;\u5c31\u4f1a\u5230\u8fd9\u91cc<\/span><br \/>\nt<span class=\"token punctuation\">.<\/span><span class=\"token function\">Log<\/span><span class=\"token punctuation\">(<\/span>err<span class=\"token punctuation\">,<\/span> time<span class=\"token punctuation\">.<\/span><span class=\"token function\">Since<\/span><span class=\"token punctuation\">(<\/span>start<span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">.<\/span><span class=\"token function\">String<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">)<\/span><\/p>\n<p>\u4e0b\u56fe\u4e2d&#xff0c;\u6211\u4e3b\u52a8\u8c03\u7528\u4e86cancel,\u90a3\u4e48&#xff1a;<\/p>\n<p>start <span class=\"token operator\">:&#061;<\/span> time<span class=\"token punctuation\">.<\/span><span class=\"token function\">Now<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u8fd9\u91cc\u662f\u6d4b\u8bd5&#xff0c;\u6211\u4eec\u5c31\u63a7\u5236\u6d88\u8d395s<\/span><br \/>\nctx<span class=\"token punctuation\">,<\/span> cancel <span class=\"token operator\">:&#061;<\/span> context<span class=\"token punctuation\">.<\/span><span class=\"token function\">WithCancel<\/span><span class=\"token punctuation\">(<\/span>context<span class=\"token punctuation\">.<\/span><span class=\"token function\">Background<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">)<\/span><br \/>\ntime<span class=\"token punctuation\">.<\/span><span class=\"token function\">AfterFunc<\/span><span class=\"token punctuation\">(<\/span>time<span class=\"token punctuation\">.<\/span>Second<span class=\"token operator\">*<\/span><span class=\"token number\">5<\/span><span class=\"token punctuation\">,<\/span> <span class=\"token keyword\">func<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token function\">cancel<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u5f00\u59cb\u6d88\u8d39&#xff0c;\u4f1a\u5728\u8fd9\u91cc\u963b\u585e\u4f4f<\/span><br \/>\nerr <span class=\"token operator\">&#061;<\/span> consumerGroup<span class=\"token punctuation\">.<\/span><span class=\"token function\">Consume<\/span><span class=\"token punctuation\">(<\/span>ctx<span class=\"token punctuation\">,<\/span> <span class=\"token punctuation\">[<\/span><span class=\"token punctuation\">]<\/span><span class=\"token builtin\">string<\/span><span class=\"token punctuation\">{<\/span><span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">,<\/span> testConsumerGroupHandler<span class=\"token punctuation\">{<\/span><span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u4f60\u6d88\u8d39\u7ed3\u675f&#xff0c;\u5c31\u4f1a\u5230\u8fd9\u91cc<\/span><br \/>\nt<span class=\"token punctuation\">.<\/span><span class=\"token function\">Log<\/span><span class=\"token punctuation\">(<\/span>err<span class=\"token punctuation\">,<\/span> time<span class=\"token punctuation\">.<\/span><span class=\"token function\">Since<\/span><span class=\"token punctuation\">(<\/span>start<span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">.<\/span><span class=\"token function\">String<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">)<\/span><\/p>\n<ul>\n<li>\u5982\u679c\u8d85\u65f6\u4e86<\/li>\n<li>\u5982\u679c\u6211\u4e3b\u52a8\u8c03\u7528\u4e86cancel<\/li>\n<\/ul>\n<p>\u4ee5\u4e0a\u4e24\u79cd\u60c5\u51b5&#xff0c;\u4efb\u4f55\u4e00\u79cd\u60c5\u51b5\u51fa\u73b0\u4e86&#xff0c;\u90fd\u4f1a\u8ba9\u6d88\u8d39\u8005\u9000\u51fa\u6d88\u606f\u3002<\/p>\n<h4>sarama \u4f7f\u7528\u5165\u95e8&#xff1a;\u6307\u5b9a\u504f\u79fb\u91cf\u6d88\u8d39<\/h4>\n<p>\u5728\u90e8\u5206\u573a\u666f\u4e0b&#xff0c;\u6211\u4eec\u4f1a\u5e0c\u671b\u6d88\u8d39\u5386\u53f2\u6d88\u606f&#xff0c;\u6216\u8005\u4ece\u67d0\u4e2a\u6d88\u606f\u5f00\u59cb\u6d88\u8d39&#xff0c;\u90a3\u4e48\u53ef\u4ee5\u8003\u8651\u5728Setup\u91cc\u9762\u8bbe\u7f6e\u504f\u79fb\u91cf\u3002<\/p>\n<p>\u5173\u952e\u8c03\u7528\u662f ResetOffset\u3002<\/p>\n<p>\u4e0d\u8fc7\u4e00\u822c\u5efa\u8bae\u8d70\u79bb\u7ebf\u6e20\u9053&#xff0c;\u64cd\u4f5cKafka\u96c6\u7fa4\u53bb\u91cd\u7f6e\u5bf9\u5e94\u7684\u504f\u79fb\u91cf\u3002<\/p>\n<p>\u6838\u5fc3\u5728\u4e8e&#xff0c;\u4f60\u5e76\u4e0d\u662f\u6bcf\u6b21\u91cd\u65b0\u90e8\u7f72&#xff0c;\u91cd\u65b0\u542f\u52a8\u90fd\u662f\u8981\u91cd\u7f6e\u8fd9\u4e2a\u504f\u79fb\u91cf\u7684\u3002<\/p>\n<p>\u53ea\u8981\u4f60\u7684\u6d88\u8d39\u8005\u7ec4\u5728\u8fd9\u4e2a\u5206\u533a\u4e0a\u6709\u8fc7\u201c\u5df2\u63d0\u4ea4\u7684 offset\u201d&#xff0c;Kafka \u5c31\u4f1a\u4f18\u5148\u4f7f\u7528\u8fd9\u4e2a\u63d0\u4ea4\u7684 offset&#xff0c;\u800c\u5ffd\u7565\u4f60\u5728 Setup() \u4e2d\u8bbe\u7f6e\u7684 offset\u3002<\/p>\n<p><span class=\"token comment\">\/\/ \u5728\u6bcf\u6b21 rebalance \u6216\u521d\u6b21\u8fde\u63a5 Kafka \u540e\u8c03\u7528&#xff0c;\u7528\u4e8e\u521d\u59cb\u5316\u3002<\/span><br \/>\n<span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">Setup<\/span><span class=\"token punctuation\">(<\/span>session sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token comment\">\/\/\u6267\u884c\u4e00\u4e9b\u521d\u59cb\u5316\u7684\u4e8b\u60c5<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;Setup&#034;<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u5047\u8bbe\u8981\u91cd\u7f6e\u52300<\/span><br \/>\n<span class=\"token keyword\">var<\/span> offset <span class=\"token builtin\">int64<\/span> <span class=\"token operator\">&#061;<\/span> <span class=\"token number\">0<\/span><br \/>\n<span class=\"token comment\">\/\/\u904d\u5386\u6240\u6709\u7684\u5206\u533a<\/span><br \/>\npartitions <span class=\"token operator\">:&#061;<\/span> session<span class=\"token punctuation\">.<\/span><span class=\"token function\">Claims<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">[<\/span><span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">]<\/span><br \/>\n<span class=\"token keyword\">for<\/span> <span class=\"token boolean\">_<\/span><span class=\"token punctuation\">,<\/span> p <span class=\"token operator\">:&#061;<\/span> <span class=\"token keyword\">range<\/span> partitions <span class=\"token punctuation\">{<\/span><br \/>\nsession<span class=\"token punctuation\">.<\/span><span class=\"token function\">ResetOffset<\/span><span class=\"token punctuation\">(<\/span><span class=\"token string\">&#034;test_topic&#034;<\/span><span class=\"token punctuation\">,<\/span> p<span class=\"token punctuation\">,<\/span> offset<span class=\"token punctuation\">,<\/span> <span class=\"token string\">&#034;&#034;<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/session.ResetOffset(&#034;test_topic&#034;, p, sarama.OffsetNewest, &#034;&#034;)<\/span><br \/>\n<span class=\"token comment\">\/\/session.ResetOffset(&#034;test_topic&#034;, p, sarama.OffsetOldest, &#034;&#034;)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<h4>sarama\u4f7f\u7528\u5165\u95e8&#xff1a;\u5f02\u6b65\u6d88\u8d39&#xff0c;\u6279\u91cf\u63d0\u4ea4<\/h4>\n<p>\u6b63\u5e38\u6765\u8bf4&#xff0c;\u4e3a\u4e86\u5728\u5f02\u6b65\u6d88\u8d39\u5931\u8d25\u4e4b\u540e\u8fd8\u80fd\u7ee7\u7eed\u91cd\u8bd5&#xff0c;\u53ef\u4ee5\u8003\u8651\u5f02\u6b65\u6d88\u8d39\u4e00\u6279&#xff0c;\u63d0\u4ea4\u4e00\u6279\u3002<\/p>\n<p>\u4e0b\u56fe\u4e2d&#xff0c;ctx.Done\u5206\u652f\u7528\u6765\u63a7\u5236\u51d1\u591f\u4e00\u6279\u7684\u8d85\u65f6\u673a\u5236&#xff0c;\u9632\u6b62\u751f\u4ea7\u8005\u7684\u901f\u7387\u5f88\u4f4e&#xff0c;\u4e00\u76f4\u51d1\u4e0d\u591f\u4e00\u6279\u3002<\/p>\n<p><span class=\"token keyword\">func<\/span> <span class=\"token punctuation\">(<\/span>t testConsumerGroupHandler<span class=\"token punctuation\">)<\/span> <span class=\"token function\">ConsumeClaim<\/span><span class=\"token punctuation\">(<\/span><br \/>\n<span class=\"token comment\">\/\/\u4ee3\u8868\u7684\u662f\u4f60\u548cKafka\u7684\u4f1a\u8bdd&#xff08;\u4ece\u5efa\u7acb\u8fde\u63a5\u5230\u8fde\u63a5\u5f7b\u5e95\u65ad\u6389\u7684\u90a3\u4e00\u6bb5\u65f6\u95f4&#xff09;<\/span><br \/>\n<span class=\"token comment\">\/\/\u53ef\u4ee5\u901a\u8fc7 session \u63a7\u5236 offset \u63d0\u4ea4&#xff0c;\u83b7\u53d6\u6d88\u8d39\u8005\u4fe1\u606f&#xff0c;\u5e76\u611f\u77e5\u9000\u51fa\u65f6\u673a\u3002<\/span><br \/>\nsession sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupSession<span class=\"token punctuation\">,<\/span><br \/>\n<span class=\"token comment\">\/\/claim \u662f\u4f60\u83b7\u53d6\u6d88\u606f\u7684\u5165\u53e3<\/span><br \/>\nclaim sarama<span class=\"token punctuation\">.<\/span>ConsumerGroupClaim<span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nmsgs <span class=\"token operator\">:&#061;<\/span> claim<span class=\"token punctuation\">.<\/span><span class=\"token function\">Messages<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><\/p>\n<p><span class=\"token comment\">\/\/\u8bbe\u7f6e\u6279\u91cf\u5904\u7406\u7684\u6761\u6570<\/span><br \/>\n<span class=\"token keyword\">const<\/span> batchSize <span class=\"token operator\">&#061;<\/span> <span class=\"token number\">10<\/span><br \/>\n<span class=\"token keyword\">for<\/span> <span class=\"token punctuation\">{<\/span><br \/>\nctx<span class=\"token punctuation\">,<\/span> cancel <span class=\"token operator\">:&#061;<\/span> context<span class=\"token punctuation\">.<\/span><span class=\"token function\">WithTimeout<\/span><span class=\"token punctuation\">(<\/span>context<span class=\"token punctuation\">.<\/span><span class=\"token function\">Background<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">,<\/span> time<span class=\"token punctuation\">.<\/span>Second<span class=\"token operator\">*<\/span><span class=\"token number\">1<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">var<\/span> eg errgroup<span class=\"token punctuation\">.<\/span>Group<br \/>\n<span class=\"token keyword\">var<\/span> last <span class=\"token operator\">*<\/span>sarama<span class=\"token punctuation\">.<\/span>ConsumerMessage<br \/>\n<span class=\"token keyword\">for<\/span> i <span class=\"token operator\">:&#061;<\/span> <span class=\"token number\">0<\/span><span class=\"token punctuation\">;<\/span> i <span class=\"token operator\">&lt;<\/span> batchSize<span class=\"token punctuation\">;<\/span> i<span class=\"token operator\">&#043;&#043;<\/span> <span class=\"token punctuation\">{<\/span><br \/>\ndone <span class=\"token operator\">:&#061;<\/span> <span class=\"token boolean\">false<\/span><br \/>\n<span class=\"token keyword\">select<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token keyword\">case<\/span> <span class=\"token operator\">&lt;-<\/span>ctx<span class=\"token punctuation\">.<\/span><span class=\"token function\">Done<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">:<\/span><br \/>\n<span class=\"token comment\">\/\/\u8fd9\u8fb9\u8868\u793a\u8d85\u65f6\u4e86<\/span><br \/>\ndone <span class=\"token operator\">&#061;<\/span> <span class=\"token boolean\">true<\/span><br \/>\n<span class=\"token keyword\">case<\/span> msg<span class=\"token punctuation\">,<\/span> ok <span class=\"token operator\">:&#061;<\/span> <span class=\"token operator\">&lt;-<\/span>msgs<span class=\"token punctuation\">:<\/span><br \/>\n<span class=\"token keyword\">if<\/span> <span class=\"token operator\">!<\/span>ok <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token function\">cancel<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\nlast <span class=\"token operator\">&#061;<\/span> msg<br \/>\nmsg1 <span class=\"token operator\">:&#061;<\/span> msg<br \/>\neg<span class=\"token punctuation\">.<\/span><span class=\"token function\">Go<\/span><span class=\"token punctuation\">(<\/span><span class=\"token keyword\">func<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span> <span class=\"token builtin\">error<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token comment\">\/\/\u6211\u5c31\u5728\u8fd9\u91cc\u6d88\u8d39<\/span><br \/>\ntime<span class=\"token punctuation\">.<\/span><span class=\"token function\">Sleep<\/span><span class=\"token punctuation\">(<\/span>time<span class=\"token punctuation\">.<\/span>Second<span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token comment\">\/\/\u4f60\u5728\u8fd9\u91cc\u91cd\u8bd5<\/span><br \/>\nlog<span class=\"token punctuation\">.<\/span><span class=\"token function\">Println<\/span><span class=\"token punctuation\">(<\/span><span class=\"token function\">string<\/span><span class=\"token punctuation\">(<\/span>msg1<span class=\"token punctuation\">.<\/span>Value<span class=\"token punctuation\">)<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token keyword\">if<\/span> done <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token keyword\">break<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token function\">cancel<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\nerr <span class=\"token operator\">:&#061;<\/span> eg<span class=\"token punctuation\">.<\/span><span class=\"token function\">Wait<\/span><span class=\"token punctuation\">(<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token keyword\">if<\/span> err <span class=\"token operator\">!&#061;<\/span> <span class=\"token boolean\">nil<\/span> <span class=\"token punctuation\">{<\/span><br \/>\n<span class=\"token comment\">\/\/\u8fd9\u8fb9\u80fd\u600e\u4e48\u529e&#xff1f;<\/span><br \/>\n<span class=\"token comment\">\/\/\u8bb0\u5f55\u65e5\u5fd7<\/span><br \/>\n<span class=\"token keyword\">continue<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token comment\">\/\/\u5c31\u8fd9\u6837<\/span><br \/>\nsession<span class=\"token punctuation\">.<\/span><span class=\"token function\">MarkMessage<\/span><span class=\"token punctuation\">(<\/span>last<span class=\"token punctuation\">,<\/span> <span class=\"token string\">&#034;&#034;<\/span><span class=\"token punctuation\">)<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><br \/>\n<span class=\"token keyword\">return<\/span> <span class=\"token boolean\">nil<\/span><br \/>\n<span class=\"token punctuation\">}<\/span><\/p>\n<p>\u53e6\u5916\u4e00\u4e2a\u5206\u652f\u5c31\u662f\u8bfb\u53d6\u6d88\u606f&#xff0c;\u5e76\u4e14\u63d0\u4ea4\u5230errgroup\u91cc\u9762\u6267\u884c\u3002<\/p>\n<p>Sleep\u662f\u6a21\u62df\u957f\u65f6\u95f4\u4e1a\u52a1\u6267\u884c\u3002<\/p>\n","protected":false},"excerpt":{"rendered":"<p>\u6587\u7ae0\u6d4f\u89c8\u9605\u8bfb1.1k\u6b21\uff0c\u70b9\u8d5e19\u6b21\uff0c\u6536\u85cf7\u6b21\u3002\u5728Go\u8bed\u8a00\u4e2d\uff0c\u6709\u591a\u4e2aKafka\u5ba2\u6237\u7aef\u53ef\u4f9b\u9009\u62e9\uff0c\u5176\u4e2dSarama\u3001segmentio\/kafka-go\u548cconfluent-kafka-go\u662f\u8f83\u4e3a\u77e5\u540d\u7684\u4e09\u4e2a\u3002Sarama\u662f\u76ee\u524d\u7528\u6237\u6570\u91cf\u6700\u591a\u7684\u5ba2\u6237\u7aef\uff0c\u6700\u521d\u7531Shopify\u5f00\u53d1\uff0c\u73b0\u7531IBM\u7ef4\u62a4\u3002segmentio\/kafka-go\u867d\u7136\u6ca1\u6709\u663e\u8457\u7f3a\u70b9\uff0c\u4f46\u529f\u80fd\u76f8\u5bf9\u57fa\u7840\u3002confluent-kafka-go\u5219\u9700\u8981\u542f\u7528cgo\uff0c\u8de8\u5e73\u53f0\u652f\u6301\u8f83\u5dee\uff0c\u4e14\u4e0d\u652f\u6301\u4ea4\u53c9\u7f16\u8bd1\u3002 Sarama\u63d0\u4f9b\u4e86\u547d\u4ee4\u884c\u5de5\u5177\uff0c\u53ef\u4ee5\u7528\u4e8e\u7b80\u5355\u7684\u6d88\u8d39\u8005\u548c\u751f\u4ea7\u8005\u64cd\u4f5c\u3002\u5b89\u88c5Sarama\u5de5\u5177\u65f6\uff0c\u53ef\u4ee5\u901a\u8fc7\u8bbe\u7f6eGo\u4ee3\u7406\u6765\u52a0\u901f<\/p>\n","protected":false},"author":2,"featured_media":37358,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[746,1114,3231,668],"topic":[],"class_list":["post-37361","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-server","tag-golang","tag-kafka","tag-linq","tag-668"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v20.3 - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>Kafka Go\u5ba2\u6237\u7aef-Sarama - \u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3<\/title>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.wsisp.com\/helps\/37361.html\" \/>\n<meta property=\"og:locale\" content=\"zh_CN\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Kafka Go\u5ba2\u6237\u7aef-Sarama - \u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3\" \/>\n<meta property=\"og:description\" content=\"\u6587\u7ae0\u6d4f\u89c8\u9605\u8bfb1.1k\u6b21\uff0c\u70b9\u8d5e19\u6b21\uff0c\u6536\u85cf7\u6b21\u3002\u5728Go\u8bed\u8a00\u4e2d\uff0c\u6709\u591a\u4e2aKafka\u5ba2\u6237\u7aef\u53ef\u4f9b\u9009\u62e9\uff0c\u5176\u4e2dSarama\u3001segmentio\/kafka-go\u548cconfluent-kafka-go\u662f\u8f83\u4e3a\u77e5\u540d\u7684\u4e09\u4e2a\u3002Sarama\u662f\u76ee\u524d\u7528\u6237\u6570\u91cf\u6700\u591a\u7684\u5ba2\u6237\u7aef\uff0c\u6700\u521d\u7531Shopify\u5f00\u53d1\uff0c\u73b0\u7531IBM\u7ef4\u62a4\u3002segmentio\/kafka-go\u867d\u7136\u6ca1\u6709\u663e\u8457\u7f3a\u70b9\uff0c\u4f46\u529f\u80fd\u76f8\u5bf9\u57fa\u7840\u3002confluent-kafka-go\u5219\u9700\u8981\u542f\u7528cgo\uff0c\u8de8\u5e73\u53f0\u652f\u6301\u8f83\u5dee\uff0c\u4e14\u4e0d\u652f\u6301\u4ea4\u53c9\u7f16\u8bd1\u3002 Sarama\u63d0\u4f9b\u4e86\u547d\u4ee4\u884c\u5de5\u5177\uff0c\u53ef\u4ee5\u7528\u4e8e\u7b80\u5355\u7684\u6d88\u8d39\u8005\u548c\u751f\u4ea7\u8005\u64cd\u4f5c\u3002\u5b89\u88c5Sarama\u5de5\u5177\u65f6\uff0c\u53ef\u4ee5\u901a\u8fc7\u8bbe\u7f6eGo\u4ee3\u7406\u6765\u52a0\u901f\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.wsisp.com\/helps\/37361.html\" \/>\n<meta property=\"og:site_name\" content=\"\u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3\" \/>\n<meta property=\"article:published_time\" content=\"2025-05-15T14:27:36+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.wsisp.com\/helps\/wp-content\/uploads\/2025\/05\/20250515142734-6825f9d62cd5d.png\" \/>\n<meta name=\"author\" content=\"admin\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:label1\" content=\"\u4f5c\u8005\" \/>\n\t<meta name=\"twitter:data1\" content=\"admin\" \/>\n\t<meta name=\"twitter:label2\" content=\"\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4\" \/>\n\t<meta name=\"twitter:data2\" content=\"6 \u5206\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"WebPage\",\"@id\":\"https:\/\/www.wsisp.com\/helps\/37361.html\",\"url\":\"https:\/\/www.wsisp.com\/helps\/37361.html\",\"name\":\"Kafka Go\u5ba2\u6237\u7aef-Sarama - \u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3\",\"isPartOf\":{\"@id\":\"https:\/\/www.wsisp.com\/helps\/#website\"},\"datePublished\":\"2025-05-15T14:27:36+00:00\",\"dateModified\":\"2025-05-15T14:27:36+00:00\",\"author\":{\"@id\":\"https:\/\/www.wsisp.com\/helps\/#\/schema\/person\/358e386c577a3ab51c4493330a20ad41\"},\"breadcrumb\":{\"@id\":\"https:\/\/www.wsisp.com\/helps\/37361.html#breadcrumb\"},\"inLanguage\":\"zh-Hans\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/www.wsisp.com\/helps\/37361.html\"]}]},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/www.wsisp.com\/helps\/37361.html#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"\u9996\u9875\",\"item\":\"https:\/\/www.wsisp.com\/helps\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Kafka Go\u5ba2\u6237\u7aef--Sarama\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/www.wsisp.com\/helps\/#website\",\"url\":\"https:\/\/www.wsisp.com\/helps\/\",\"name\":\"\u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3\",\"description\":\"\u9999\u6e2f\u670d\u52a1\u5668_\u9999\u6e2f\u4e91\u670d\u52a1\u5668\u8d44\u8baf_\u670d\u52a1\u5668\u5e2e\u52a9\u6587\u6863_\u670d\u52a1\u5668\u6559\u7a0b\",\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\/\/www.wsisp.com\/helps\/?s={search_term_string}\"},\"query-input\":\"required name=search_term_string\"}],\"inLanguage\":\"zh-Hans\"},{\"@type\":\"Person\",\"@id\":\"https:\/\/www.wsisp.com\/helps\/#\/schema\/person\/358e386c577a3ab51c4493330a20ad41\",\"name\":\"admin\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"zh-Hans\",\"@id\":\"https:\/\/www.wsisp.com\/helps\/#\/schema\/person\/image\/\",\"url\":\"https:\/\/gravatar.wp-china-yes.net\/avatar\/?s=96&d=mystery\",\"contentUrl\":\"https:\/\/gravatar.wp-china-yes.net\/avatar\/?s=96&d=mystery\",\"caption\":\"admin\"},\"sameAs\":[\"http:\/\/wp.wsisp.com\"],\"url\":\"https:\/\/www.wsisp.com\/helps\/author\/admin\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Kafka Go\u5ba2\u6237\u7aef-Sarama - \u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.wsisp.com\/helps\/37361.html","og_locale":"zh_CN","og_type":"article","og_title":"Kafka Go\u5ba2\u6237\u7aef-Sarama - \u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3","og_description":"\u6587\u7ae0\u6d4f\u89c8\u9605\u8bfb1.1k\u6b21\uff0c\u70b9\u8d5e19\u6b21\uff0c\u6536\u85cf7\u6b21\u3002\u5728Go\u8bed\u8a00\u4e2d\uff0c\u6709\u591a\u4e2aKafka\u5ba2\u6237\u7aef\u53ef\u4f9b\u9009\u62e9\uff0c\u5176\u4e2dSarama\u3001segmentio\/kafka-go\u548cconfluent-kafka-go\u662f\u8f83\u4e3a\u77e5\u540d\u7684\u4e09\u4e2a\u3002Sarama\u662f\u76ee\u524d\u7528\u6237\u6570\u91cf\u6700\u591a\u7684\u5ba2\u6237\u7aef\uff0c\u6700\u521d\u7531Shopify\u5f00\u53d1\uff0c\u73b0\u7531IBM\u7ef4\u62a4\u3002segmentio\/kafka-go\u867d\u7136\u6ca1\u6709\u663e\u8457\u7f3a\u70b9\uff0c\u4f46\u529f\u80fd\u76f8\u5bf9\u57fa\u7840\u3002confluent-kafka-go\u5219\u9700\u8981\u542f\u7528cgo\uff0c\u8de8\u5e73\u53f0\u652f\u6301\u8f83\u5dee\uff0c\u4e14\u4e0d\u652f\u6301\u4ea4\u53c9\u7f16\u8bd1\u3002 Sarama\u63d0\u4f9b\u4e86\u547d\u4ee4\u884c\u5de5\u5177\uff0c\u53ef\u4ee5\u7528\u4e8e\u7b80\u5355\u7684\u6d88\u8d39\u8005\u548c\u751f\u4ea7\u8005\u64cd\u4f5c\u3002\u5b89\u88c5Sarama\u5de5\u5177\u65f6\uff0c\u53ef\u4ee5\u901a\u8fc7\u8bbe\u7f6eGo\u4ee3\u7406\u6765\u52a0\u901f","og_url":"https:\/\/www.wsisp.com\/helps\/37361.html","og_site_name":"\u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3","article_published_time":"2025-05-15T14:27:36+00:00","og_image":[{"url":"https:\/\/www.wsisp.com\/helps\/wp-content\/uploads\/2025\/05\/20250515142734-6825f9d62cd5d.png"}],"author":"admin","twitter_card":"summary_large_image","twitter_misc":{"\u4f5c\u8005":"admin","\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4":"6 \u5206"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"WebPage","@id":"https:\/\/www.wsisp.com\/helps\/37361.html","url":"https:\/\/www.wsisp.com\/helps\/37361.html","name":"Kafka Go\u5ba2\u6237\u7aef-Sarama - \u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3","isPartOf":{"@id":"https:\/\/www.wsisp.com\/helps\/#website"},"datePublished":"2025-05-15T14:27:36+00:00","dateModified":"2025-05-15T14:27:36+00:00","author":{"@id":"https:\/\/www.wsisp.com\/helps\/#\/schema\/person\/358e386c577a3ab51c4493330a20ad41"},"breadcrumb":{"@id":"https:\/\/www.wsisp.com\/helps\/37361.html#breadcrumb"},"inLanguage":"zh-Hans","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.wsisp.com\/helps\/37361.html"]}]},{"@type":"BreadcrumbList","@id":"https:\/\/www.wsisp.com\/helps\/37361.html#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"\u9996\u9875","item":"https:\/\/www.wsisp.com\/helps"},{"@type":"ListItem","position":2,"name":"Kafka Go\u5ba2\u6237\u7aef--Sarama"}]},{"@type":"WebSite","@id":"https:\/\/www.wsisp.com\/helps\/#website","url":"https:\/\/www.wsisp.com\/helps\/","name":"\u7f51\u7855\u4e92\u8054\u5e2e\u52a9\u4e2d\u5fc3","description":"\u9999\u6e2f\u670d\u52a1\u5668_\u9999\u6e2f\u4e91\u670d\u52a1\u5668\u8d44\u8baf_\u670d\u52a1\u5668\u5e2e\u52a9\u6587\u6863_\u670d\u52a1\u5668\u6559\u7a0b","potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/www.wsisp.com\/helps\/?s={search_term_string}"},"query-input":"required name=search_term_string"}],"inLanguage":"zh-Hans"},{"@type":"Person","@id":"https:\/\/www.wsisp.com\/helps\/#\/schema\/person\/358e386c577a3ab51c4493330a20ad41","name":"admin","image":{"@type":"ImageObject","inLanguage":"zh-Hans","@id":"https:\/\/www.wsisp.com\/helps\/#\/schema\/person\/image\/","url":"https:\/\/gravatar.wp-china-yes.net\/avatar\/?s=96&d=mystery","contentUrl":"https:\/\/gravatar.wp-china-yes.net\/avatar\/?s=96&d=mystery","caption":"admin"},"sameAs":["http:\/\/wp.wsisp.com"],"url":"https:\/\/www.wsisp.com\/helps\/author\/admin"}]}},"_links":{"self":[{"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/posts\/37361","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/comments?post=37361"}],"version-history":[{"count":0,"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/posts\/37361\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/media\/37358"}],"wp:attachment":[{"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/media?parent=37361"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/categories?post=37361"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/tags?post=37361"},{"taxonomy":"topic","embeddable":true,"href":"https:\/\/www.wsisp.com\/helps\/wp-json\/wp\/v2\/topic?post=37361"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}