需求1:与推荐侧数据同步

1.配置kafka

kafka批量消费,设置消费者组,使得每次消费的时候从头消费

消费者组能从头开始消费必须得有消息,也就是说如果消息过期了还是消费不了的,这个过期时间在delete.retention.ms里面配置,当前topic的过期时间是604800000,也就是一个礼拜。

还有一种办法是通过kafka给定的某个脚本参数来指定offset的消费位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
recommendDomainDataListenerContainer(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> map = kafkaProperties.buildConsumerProperties();
//批量消费50个
map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 60 * 1000);
map.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 3秒上报一次心跳
map.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 60 * 1000);
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//消费者组每次都从头开始消费
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(2); // 2个线程
factory.setBatchListener(true);
return factory;
}

2.配置kafka的消费者

由于是批量请求rpc接口,因此需要解决请求失败带来的消息丢失问题,意思是如果请求失败了,这些domain还会被消费,这里的思路是用一个set来保存没有消费的domain,如果在某处出错了,就会重新往kafka里面发消息,实现不漏消息。

2024.12.2 在这上面栽了跟头

想法很美好但是实际上线还是出现了漏消息,成功率只有90%。

首先这个domainSet不能设置成全局变量,因为所有的线程都会更改这个变量,会有复杂的并发安全问题。其次是可能出现异常的时间点:

  • 消费的时候就出现问题,可能性很小
  • 拿着所有50条去请求rpc接口的时候
  • rpc接口失败或者别的状态码
  • es更新出错

由此可见需要在rpc接口请求之前就要初始化domainSet,所以在消费消息的时候就需要同时加入domainSet。但如果rpc接口请求成功了,需要清空domainSet,为什么呢?因为有可能domain是50个,那边只有40个可返回域名,如果再按照50个就会有消息被重复发送,例如虽然消息里面有www.baidu.com,但是rpc接口里面死活没有,这个时候再去重新往kafka里面投重新消费是不合适的,所以在这里需要遍历response,然后保存进清空了的domainSet里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@KafkaListener(
//这里是{},可以配置多个topic
topics = {TOPIC},
groupId = "recommendation-ready-group-bugfix-v1",
containerFactory = "recommendDomainDataListenerContainer"
)
public void listener(List<ConsumerRecord<String, String>> records) {
log.info("RecommendDomainDataListener | size:{}", records.size());
Set<String> domains = new HashSet<>();
Set<String> domainSet = new HashSet<>();
StopWatch sw = new StopWatch();
sw.start();
try {
for(ConsumerRecord<String, String> record :records) {
JSONObject kafkaMsg = JSON.parseObject(record.value());
String domain = kafkaMsg.getString("domain");
domain = DomainUtil.format(domain);
if(StringUtils.isBlank(domain)) {
continue;
}
domains.add(domain);
domainSet.add(domain);
}
batchGetRecommendationDetail(domains,domainSet);
}
catch (Throwable e){
for (String domain : domainSet){
Map<String,String> map = new HashMap<>();
map.put("domain",domain);
kafkaProducer.send(TOPIC, domain, JSON.toJSONString(map));
}
log.error("RecommendDomainDataListener | consumer message error. put in queue {}.", JSON.toJSONString(domains),e);
}
finally {
sw.stop();
log.info("RecommendDomainDataListener | {}, cost: {}", domains.size(), sw.getTime());
}
}

3.GRPC请求

grpc是让远程方法变得像本地方法一样简单,这里用了推荐侧的一个接口,确实只用引用一个jar包就行,背后就是一些复杂的网络请求。这个接口的可以一次传50个domain,状态码主要关注429,这里是触发了限流,需要按照上面的方法进行重新消费,消费多少发送多少相同的域名。

grpc方法的qps是1,触发了限流要降速,Thread.sleep(1000);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void batchGetRecommendationDetail(Set<String> domains,Set<String> domainSet) throws InterruptedException {
Recommendation.GetDomainInfosRequest request = Recommendation.GetDomainInfosRequest.newBuilder()
.addAllDomains(domains)
.setDomainSourceValue(DOMAIN_SOURCE_GRPC_VALUE).build();
Recommendation.GetDomainInfosResponse response = recommendationClient.getDomainInfos(request);
if (response != null) {
if (!response.getSuccess() || response.getCode() != GRPC_SUCCESS_CODE){
//触发grpc限流,该方法降速,重新消费
Thread.sleep(1000);
throw new RuntimeException(String.valueOf(response.getCode()));
}
log.info("getRecommendationDetail. code:{}",response.getCode());
if(!CollectionUtils.isEmpty(response.getDataList())){
List<Recommendation.DomainInfo> domainInfoList = response.getDataList();
for (Recommendation.DomainInfo domainInfo : domainInfoList){
domainSet.add(domainInfo.getDomain());
}
for (Recommendation.DomainInfo domainInfo : domainInfoList){
convert(domainInfo,domainSet);
}
}
}
else {
throw new RuntimeException("Recommendation.GetDomainInfosResponse returns null");
}
}

4.转换对象

这个没什么好说的,就是拷贝属性到当前项目的bean

总结

主要熟悉了测试环境和线上环境,以及kafka的一些配置和机制,对于异常处理的理解加深了。

需求2:定时发送新域名

需要每天的00:00去es里面检查昨天新增的域名,通过kafka发送消息。

  1. 计算昨天00:00到23:59的时间戳:由于字段里面只有updateTime没有createTime,而且也是用时间戳来保存的,要先计算一下时间。
  2. 拼es的查询语句:range来查询时间,synFlag是防止重复发送的,有这个字段的可以认为已经发过了。
  3. scrollSearch循环查找,不了解,但是大家都是这样用的。

es的操作跟数据库很像,而且网易封装的很好,index就相当于数据库表名,DomainDataIndex可以看作mybatis的实现类,AbstractIndex就是BaseMapper那种用来基础的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@XxlJob("newDomainSyncRecommend")
public ReturnT<String> newDomainSyncRecommend (String param) throws ParseException {
//昨天的同一时刻
long yesterday = System.currentTimeMillis() - 1000 * 3600 * 24;
//昨天零点零分零秒的毫秒数
long zero = yesterday / (1000 * 3600 * 24) * (1000 * 3600 * 24) - TimeZone.getDefault().getRawOffset();//今天零点零分零秒的毫秒数
//昨天23点59分59秒的毫秒数
long twelve = zero + 24 * 60 * 60 * 1000 - 1;
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.filter(QueryBuilders.rangeQuery("updateTime")
.gte(zero)
.lte(twelve));
boolQuery.mustNot(QueryBuilders.existsQuery("synFlag"));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(boolQuery) // 查询参数
.fetchSource(new String[]{"domain"},
null)
.size(100);
if (domainDataIndex == null) {
log.error("domainDataIndex is null");
}
domainDataIndex.scrollSearch(sourceBuilder, searchHits -> {
for (SearchHit searchHit : searchHits) {
String domain = searchHit.getId();
try {
//更新同步标识
Map<String,Boolean> updateMap = new HashMap<>();
updateMap.put("synFlag",true);
domainDataIndex.updateByProcessor(domain,updateMap);
log.info("domainDataIndex upsert success ,domain : {}",domain);
}
catch (Throwable e){
log.error("domainDataIndex upsert failed ,domain : {}",domain,e);
continue;
}
//组织kafka消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("domain",domain);
kafkaProducer.send("new_domain_sync_recommend_topic",domain,JSON.toJSONString(jsonObject));
log.info("kafka send success ,domain : {}",domain);
}
return false;
});
return ReturnT.SUCCESS;
}