将kafka里的数据传输给大数据平台,发现有两个时间段的传输数据是一毛一样的,然后就上网各种查资料,记录下来
###原因
原因1:线程挂掉了,导致消费了的数据offset没提交
原因2:已经消费了数据,但是offset没提交
经过日志排查,排除了第一种可能性。在网上查询了相关资料,给出了两种方案。
- 修改offset
我们在使用consumer消费的时候,每个topic会产生一个偏移量,这个偏移量保证我们消费的消息顺序且不重复。Offest是在zookeeper中存储的,我们可以设置consumer实时或定时的注册offset到zookeeper中。我们修改这个offest到我们想重新消费的位置,就可以做到重新消费了。
set('auto.commit.interval.ms', 1000);
- 通过使用不同的group来消费
通过不同的group来重新消费数据方法简单,但我们无法指定我们要重复消费哪些数据,它会从这个groupid在zookeeper注册之后所产生的数据开始消费。这里需要注意的是新的group是重新消费所有数据,但也并非是topic中所有数据,它只会消费它在zookeeper注册过之后产生的数据。我们可以再zookeeper客户端中 /consumer/ 目录下查看我们已经注册过的groupid。我们在使用consumer消费数据时如果指定一个新的groupid,那么当这个consumer被执行的时候会自动注册到zookeeper中。而这个group中的consumer之后消费到注册之后产生的数据。
$conf = new RdKafka\Conf();
$conf->set('group.id', 'BDLogConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers($config['kafka']);