需求1:与推荐侧数据同步
1.配置kafka
kafka批量消费,设置消费者组,使得每次消费的时候从头消费
消费者组能从头开始消费必须得有消息,也就是说如果消息过期了还是消费不了的,这个过期时间在delete.retention.ms里面配置,当前topic的过期时间是604800000,也就是一个礼拜。
还有一种办法是通过kafka给定的某个脚本参数来指定offset的消费位置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> recommendDomainDataListenerContainer(KafkaProperties kafkaProperties) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); Map<String, Object> map = kafkaProperties.buildConsumerProperties(); map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 60 * 1000); map.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); map.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 60 * 1000); map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(map); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(2); factory.setBatchListener(true); return factory; }
|
2.配置kafka的消费者
由于是批量请求rpc接口,因此需要解决请求失败带来的消息丢失问题,意思是如果请求失败了,这些domain还会被消费,这里的思路是用一个set来保存没有消费的domain,如果在某处出错了,就会重新往kafka里面发消息,实现不漏消息。
2024.12.2 在这上面栽了跟头
想法很美好但是实际上线还是出现了漏消息,成功率只有90%。
首先这个domainSet不能设置成全局变量,因为所有的线程都会更改这个变量,会有复杂的并发安全问题。其次是可能出现异常的时间点:
- 消费的时候就出现问题,可能性很小
- 拿着所有50条去请求rpc接口的时候
- rpc接口失败或者别的状态码
- es更新出错
由此可见需要在rpc接口请求之前就要初始化domainSet,所以在消费消息的时候就需要同时加入domainSet。但如果rpc接口请求成功了,需要清空domainSet,为什么呢?因为有可能domain是50个,那边只有40个可返回域名,如果再按照50个就会有消息被重复发送,例如虽然消息里面有www.baidu.com,但是rpc接口里面死活没有,这个时候再去重新往kafka里面投重新消费是不合适的,所以在这里需要遍历response,然后保存进清空了的domainSet里面。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @KafkaListener( //这里是{},可以配置多个topic topics = {TOPIC}, groupId = "recommendation-ready-group-bugfix-v1", containerFactory = "recommendDomainDataListenerContainer" ) public void listener(List<ConsumerRecord<String, String>> records) { log.info("RecommendDomainDataListener | size:{}", records.size()); Set<String> domains = new HashSet<>(); Set<String> domainSet = new HashSet<>(); StopWatch sw = new StopWatch(); sw.start(); try { for(ConsumerRecord<String, String> record :records) { JSONObject kafkaMsg = JSON.parseObject(record.value()); String domain = kafkaMsg.getString("domain"); domain = DomainUtil.format(domain); if(StringUtils.isBlank(domain)) { continue; } domains.add(domain); domainSet.add(domain); } batchGetRecommendationDetail(domains,domainSet); } catch (Throwable e){ for (String domain : domainSet){ Map<String,String> map = new HashMap<>(); map.put("domain",domain); kafkaProducer.send(TOPIC, domain, JSON.toJSONString(map)); } log.error("RecommendDomainDataListener | consumer message error. put in queue {}.", JSON.toJSONString(domains),e); } finally { sw.stop(); log.info("RecommendDomainDataListener | {}, cost: {}", domains.size(), sw.getTime()); } }
|
3.GRPC请求
grpc是让远程方法变得像本地方法一样简单,这里用了推荐侧的一个接口,确实只用引用一个jar包就行,背后就是一些复杂的网络请求。这个接口的可以一次传50个domain,状态码主要关注429,这里是触发了限流,需要按照上面的方法进行重新消费,消费多少发送多少相同的域名。
grpc方法的qps是1,触发了限流要降速,Thread.sleep(1000);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public void batchGetRecommendationDetail(Set<String> domains,Set<String> domainSet) throws InterruptedException { Recommendation.GetDomainInfosRequest request = Recommendation.GetDomainInfosRequest.newBuilder() .addAllDomains(domains) .setDomainSourceValue(DOMAIN_SOURCE_GRPC_VALUE).build(); Recommendation.GetDomainInfosResponse response = recommendationClient.getDomainInfos(request); if (response != null) { if (!response.getSuccess() || response.getCode() != GRPC_SUCCESS_CODE){ Thread.sleep(1000); throw new RuntimeException(String.valueOf(response.getCode())); } log.info("getRecommendationDetail. code:{}",response.getCode()); if(!CollectionUtils.isEmpty(response.getDataList())){ List<Recommendation.DomainInfo> domainInfoList = response.getDataList(); for (Recommendation.DomainInfo domainInfo : domainInfoList){ domainSet.add(domainInfo.getDomain()); } for (Recommendation.DomainInfo domainInfo : domainInfoList){ convert(domainInfo,domainSet); } } } else { throw new RuntimeException("Recommendation.GetDomainInfosResponse returns null"); } }
|
4.转换对象
这个没什么好说的,就是拷贝属性到当前项目的bean
总结
主要熟悉了测试环境和线上环境,以及kafka的一些配置和机制,对于异常处理的理解加深了。
需求2:定时发送新域名
需要每天的00:00去es里面检查昨天新增的域名,通过kafka发送消息。
- 计算昨天00:00到23:59的时间戳:由于字段里面只有updateTime没有createTime,而且也是用时间戳来保存的,要先计算一下时间。
- 拼es的查询语句:range来查询时间,synFlag是防止重复发送的,有这个字段的可以认为已经发过了。
- scrollSearch循环查找,不了解,但是大家都是这样用的。
es的操作跟数据库很像,而且网易封装的很好,index就相当于数据库表名,DomainDataIndex可以看作mybatis的实现类,AbstractIndex就是BaseMapper那种用来基础的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @XxlJob("newDomainSyncRecommend") public ReturnT<String> newDomainSyncRecommend (String param) throws ParseException { long yesterday = System.currentTimeMillis() - 1000 * 3600 * 24; long zero = yesterday / (1000 * 3600 * 24) * (1000 * 3600 * 24) - TimeZone.getDefault().getRawOffset(); long twelve = zero + 24 * 60 * 60 * 1000 - 1; BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); boolQuery.filter(QueryBuilders.rangeQuery("updateTime") .gte(zero) .lte(twelve)); boolQuery.mustNot(QueryBuilders.existsQuery("synFlag")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() .query(boolQuery) .fetchSource(new String[]{"domain"}, null) .size(100); if (domainDataIndex == null) { log.error("domainDataIndex is null"); } domainDataIndex.scrollSearch(sourceBuilder, searchHits -> { for (SearchHit searchHit : searchHits) { String domain = searchHit.getId(); try { Map<String,Boolean> updateMap = new HashMap<>(); updateMap.put("synFlag",true); domainDataIndex.updateByProcessor(domain,updateMap); log.info("domainDataIndex upsert success ,domain : {}",domain); } catch (Throwable e){ log.error("domainDataIndex upsert failed ,domain : {}",domain,e); continue; } JSONObject jsonObject = new JSONObject(); jsonObject.put("domain",domain); kafkaProducer.send("new_domain_sync_recommend_topic",domain,JSON.toJSONString(jsonObject)); log.info("kafka send success ,domain : {}",domain); } return false; }); return ReturnT.SUCCESS; }
|