想到之前在黑马头条看到的流式实时更新排行,遂想用在这个项目里。
目标是实时计算每一条博客的评论数,点赞数,还有浏览数收藏数来计算分数,更新在redis上,redis使用zset完成排序。
Todo:其实可以分离,点赞评论这一系列数据可以放在mongodb里面,不用写在sql中,后续改造这部分内容。
HyperLogLog 浏览量想用redis的这个hyperloglog实现,目前学的比较肤浅,这里只是把它当作一个有概率丢失但容量很大并且很高效的set。主要用stringRedisTemplate.opsForHyperLogLog()中的size和add,很简单前者是有多少个,后者是添加
KafkaStream实现单词统计 先从配置说起,hosts与kafka本身是一致的,APPLICATION_ID_CONFIG表示的是stream的唯一标识符,Kafka Streams利用这个ID来创建一个内部的消费组,跟踪处理的偏移量,并生成应用的状态存储。
CLIENT_ID_CONFIG是跟踪消费者的网络状态的客户端组
多个 CLIENT_ID_CONFIG 可以对应一个 APPLICATION_ID_CONFIG,这种配置允许在一个Kafka Streams应用中,细粒度地控制和监控不同的客户端实例或任务。这对于大型、复杂的应用尤其有用,使得在监控和调试中可以精确定位问题和分析性能。
APPLICATION_ID_CONFIG 定义了Kafka Streams应用的全局标识,影响整个应用的行为、状态管理和数据分区处理。
CLIENT_ID_CONFIG 则用于标识具体的客户端实例或任务,便于监控、日志和调试。 这种配置方式增强了应用的灵活性和可管理性,使得在复杂的流处理场景中,能够更有效地跟踪和优化每个组件的表现。
这里我们只有很简单的应用,直接就是一对一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @ConfigurationProperties(prefix = "kafka") @Configuration @EnableKafkaStreams @Data public class KafkaStreamConfig { private static final int MAX_MESSAGE_SIZE = 16 *1024 *1024 ; private String hosts; private String group; @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration defaultKafkaStreamsConfig () { Map<String, Object> props = new HashMap <>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(StreamsConfig.APPLICATION_ID_CONFIG, this .getGroup()+"_stream_aid" ); props.put(StreamsConfig.CLIENT_ID_CONFIG, this .getGroup()+"_stream_cid" ); props.put(StreamsConfig.RETRIES_CONFIG, 10 ); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return new KafkaStreamsConfiguration (props); } }
测试用例:
随机在这几种颜色里发送给kafka一个消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Autowired private KafkaTemplate<String,String> kafkaTemplate;@Test public void KafkaTest () throws InterruptedException { String[] field = new String []{"red" ,"blue" ,"yellow" ,"green" ,"black" ,"white" }; Random random = new Random (); while (true ){ try { Thread.sleep(1000 ); String s = field[random.nextInt(5 )]; System.out.println(s); kafkaTemplate.send("itcast-topic-input" ,s); } catch (Exception e){ e.printStackTrace(); } } }
流式应用:
由于流式是不具有缓存功能的,也就是说一段时间内得到的结果只能代表这一段时间。不能与历史记录累加,这个时候就要存redis了,把每一段时间的数据都累加。
数据流向:测试线程一直发给itcast-topic-input主题,流式应用监听itcast-topic-input,通过处理得到键为单词,值为数量的键值对,传给itcast-topic-output主题。最后再有一个监听器把这些数据与redis进行处理和持久化。
输入的数据是一个字符串,例如“apple apple tea tea”,对于这个构建的流来说,key为空,value为这个字符串,我们首先需要将这个字符串划分为四个键值对,要保留key的同时value变成各个单词(这里不能用map,map是一对一的,这里涉及拆分,需要一对多),得到了key为null,值为单词的很多键值对, 对这些键值对分组,key为它的值,也就是说所有是一个单词的分在一个组里面,方便后续统计。
随后可以用count直接得出每个组的数量,但是我嫌太简单了,用aggregate,这个方法有三个参数。作为聚合,你需要给这个聚合器一个初始的数据类型和数据,后续再对其进行叠加,第一个函数是这个初始化的动作,第二个函数有三个参数,分别是k,v和这个叠加器的传递值。第三个函数指定序列化类型,这个我也没太搞懂
在这个应用中,迭代器的初始为0,由于aggregate是对每个组进行聚合,因此迭代器只用每次将初始的值+1即可。
最后收集这个数据进行转换,发送到对应主题给redis监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Bean public KStream<String,String> kStream (StreamsBuilder streamsBuilder) { KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input" ); stream.flatMapValues(((value)->Arrays.asList(value.split(" " )))) .groupBy((key,value)->value) .windowedBy(TimeWindows.of(Duration.ofSeconds(10 ))) .aggregate(()->"0" ,(key,value,aggValue)->{ Long i = Long.parseLong(aggValue)+1 ; return i+"" ; }) .toStream() .map(new KeyValueMapper <Windowed<String>, String, KeyValue<?, ?>>() { @Override public KeyValue<String, String> apply (Windowed<String> key, String value) { return new KeyValue <>(key.key(),value); } }) .to("itcast-topic-output" ); return stream; }
监听器:
先查找redis有没有这个key,后续就是简单的加上
1 2 3 4 5 6 7 8 9 10 11 12 13 @KafkaListener(topics = "itcast-topic-out") public void handleMessage (ConsumerRecord<String,String> message) { Double score = stringRedisTemplate.opsForZSet().score(REDIS_WORD, message.key()); if (score == null ){ stringRedisTemplate.opsForZSet().add(REDIS_WORD,message.key(), Double.parseDouble(message.value())); } else { stringRedisTemplate.opsForZSet().add(REDIS_WORD,message.key(), score+Double.parseDouble(message.value())); } System.out.println("总计:" +message.key()+":" +stringRedisTemplate.opsForZSet().score(REDIS_WORD, message.key())); }
KafkaStream改造项目 定义两个实体类:
一个是传给kafkastream的消息实体,也就是要告诉stream那一篇文章,有了什么动作,点赞了还是评论了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Data public class UpdateBlogMess { private Long id; private UpdateType updateType; private int add; public enum UpdateType { VIEW,LIKED,COMMENT } }
另一个是传出kafkastream的消息实体,输出处理后的当前片段内blog的一些统计结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Data public class BlogVisitedStreamMess { private Long id; private Long view; private Long liked; private Long comment; }
流式应用:
如上所述,这里的输入是key为空,值为UpdateBlogMess的json字符串,
首先要把json反序列化,映射到key为id,值为类型和数量,例如liked:1,
再对key进行分组,很多操作的都是同一篇文章。
聚合:根据对象的操作类型在对应的未知+1,初始化是一个”VIEW:0;COMMENT:0;LIKED:0”,后续的聚合都在这上面分割和校验。这里只测试了点赞的,对于view这个字段我打算是直接使用HyperLogLog来统计。评论这个功能暂时没做。
UV统计近一个礼拜的文章访问量。
最后返回一个key为文章id,值为BlogVisitedStreamMess的Json字符串
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 private static final String KAFKA_STREAM_HOT_BLOG = "hotBlog-produce" ;private static final String KAFKA_STREAM_HOT_BLOG_CONSUMER = "hotBlog-consumer" ;@Resource private StringRedisTemplate stringRedisTemplate;@Bean public KStream<String,String> kStream (StreamsBuilder streamsBuilder) { KStream<String, String> stream = streamsBuilder.stream(KAFKA_STREAM_HOT_BLOG); stream.map(new KeyValueMapper <String, String, KeyValue<String , String>>() { @Override public KeyValue<String, String> apply (String key, String value) { UpdateBlogMess blogMess = JSONUtil.toBean(value, UpdateBlogMess.class); return new KeyValue <>(blogMess.getId()+"" ,blogMess.getUpdateType().name()+":" +blogMess.getAdd()); } }) .groupBy((key,value)->key) .windowedBy(TimeWindows.of(Duration.ofSeconds(10 ))) .aggregate(new Initializer <String>() { @Override public String apply () { return "VIEW:0;COMMENT:0;LIKED:0" ; } }, new Aggregator <String, String, String>() { @Override public String apply (String key, String value, String aggValue) { if (StringUtils.isBlank(value)){ return aggValue; } long viewNum = 0L , commentNum = 0L , likedNum = 0L ; String[] splits = aggValue.split(";" ); for (String split : splits) { String[] type = split.split(":" ); switch (UpdateBlogMess.UpdateType.valueOf(type[0 ])) { case VIEW: viewNum = Long.parseLong(type[1 ]); break ; case LIKED: likedNum = Long.parseLong(type[1 ]); break ; case COMMENT: commentNum = Long.parseLong(type[1 ]); break ; } } String[] splitVal = value.split(":" ); switch (UpdateBlogMess.UpdateType.valueOf(splitVal[0 ])) { case VIEW: viewNum = calculateUV(key); break ; case LIKED: likedNum += Long.parseLong(splitVal[1 ]); break ; case COMMENT: commentNum += 1 ; break ; } return "VIEW:" + viewNum + ";COMMENT:" + commentNum + ";LIKED:" + likedNum; } }) .toStream().map((key,value)->{ BlogVisitedStreamMess blogVisitedStreamMess = new BlogVisitedStreamMess (); blogVisitedStreamMess.setId(Long.valueOf(key.key())); String[] splits = value.split(";" ); for (String split:splits){ String[] strings = split.split(":" ); switch (UpdateBlogMess.UpdateType.valueOf(strings[0 ])){ case LIKED: blogVisitedStreamMess.setLiked(Long.valueOf(strings[1 ])); break ; case COMMENT: blogVisitedStreamMess.setComment(Long.parseLong(strings[1 ])); break ; case VIEW: blogVisitedStreamMess.setView(Long.parseLong(strings[1 ])); break ; } } return new KeyValue <>(key.key(),JSONUtil.toJsonStr(blogVisitedStreamMess)); }) .to(KAFKA_STREAM_HOT_BLOG_CONSUMER); return stream; } private static final String HyperLogLogPrefix = "HLL:blog:" ;public Long calculateUV (String key) { long UV4Week = 0L ; LocalDate now = LocalDate.now(); List<LocalDate> lastWeekDates = IntStream.rangeClosed(1 , 7 ) .mapToObj(now::minusDays) .collect(Collectors.toList()); long id = Long.parseLong(key); for (LocalDate localDate:lastWeekDates){ String date = localDate.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); UV4Week += stringRedisTemplate.opsForHyperLogLog().size(HyperLogLogPrefix + ":" + id + date); } return UV4Week; }
后续监听器根据这个消息操作redis,但是由于黑马点评这个项目里查询热点文章是直接走数据库,根据likes的升序查询,所有要改很多部分。今天先介绍到这里。
2026.6.11更新
实现了用xxl-job定期统计UV然后发送给管道.
这里还可以优化为scan命令,如果用key的话会在redis里全局找,开销很大。现在数据量小点还没关系,一旦大起来就效率很低了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public Set<String> scanKeys (String pattern) { return stringRedisTemplate.keys("*" + pattern + "*" ); } @XxlJob("demoJobHandler") public void viewSchedule () { Set<String> keys = scanKeys(HLL_PREFIX); for (String key :keys){ Long size = stringRedisTemplate.opsForHyperLogLog().size(key); UpdateBlogMess mess = new UpdateBlogMess (); mess.setUpdateType(UpdateBlogMess.UpdateType.VIEW); mess.setAdd(Math.toIntExact(size)); mess.setId(Long.valueOf(key.split(":" )[2 ])); log.debug(mess.toString()); kafkaTemplate.send(KAFKA_STREAM_HOT_BLOG, JSONUtil.toJsonStr(mess)); } }