想到之前在黑马头条看到的流式实时更新排行,遂想用在这个项目里。

目标是实时计算每一条博客的评论数,点赞数,还有浏览数收藏数来计算分数,更新在redis上,redis使用zset完成排序。

Todo:其实可以分离,点赞评论这一系列数据可以放在mongodb里面,不用写在sql中,后续改造这部分内容。

HyperLogLog

浏览量想用redis的这个hyperloglog实现,目前学的比较肤浅,这里只是把它当作一个有概率丢失但容量很大并且很高效的set。主要用stringRedisTemplate.opsForHyperLogLog()中的size和add,很简单前者是有多少个,后者是添加

KafkaStream实现单词统计

先从配置说起,hosts与kafka本身是一致的,APPLICATION_ID_CONFIG表示的是stream的唯一标识符,Kafka Streams利用这个ID来创建一个内部的消费组,跟踪处理的偏移量,并生成应用的状态存储。

CLIENT_ID_CONFIG是跟踪消费者的网络状态的客户端组

多个 CLIENT_ID_CONFIG 可以对应一个 APPLICATION_ID_CONFIG,这种配置允许在一个Kafka Streams应用中,细粒度地控制和监控不同的客户端实例或任务。这对于大型、复杂的应用尤其有用,使得在监控和调试中可以精确定位问题和分析性能。

  • APPLICATION_ID_CONFIG 定义了Kafka Streams应用的全局标识,影响整个应用的行为、状态管理和数据分区处理。
  • CLIENT_ID_CONFIG 则用于标识具体的客户端实例或任务,便于监控、日志和调试。
    这种配置方式增强了应用的灵活性和可管理性,使得在复杂的流处理场景中,能够更有效地跟踪和优化每个组件的表现。

这里我们只有很简单的应用,直接就是一对一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@ConfigurationProperties(prefix = "kafka")
@Configuration
@EnableKafkaStreams
@Data
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16*1024*1024;
private String hosts;
private String group;

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(){
Map<String, Object> props = new HashMap<>();
//ip地址和端口号
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
//全局标识
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
//客户端标识
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
//key的序列化和反序列化都为string
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//value,也就是说键值对都得是String的,后面有坑点
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}

测试用例:

随机在这几种颜色里发送给kafka一个消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Test
public void KafkaTest() throws InterruptedException {
String[] field = new String[]{"red","blue","yellow","green","black","white"};
Random random = new Random();
while (true){
try {
Thread.sleep(1000);
String s = field[random.nextInt(5)];
System.out.println(s);
kafkaTemplate.send("itcast-topic-input",s);
}
catch (Exception e){
e.printStackTrace();
}
}

}

流式应用:

由于流式是不具有缓存功能的,也就是说一段时间内得到的结果只能代表这一段时间。不能与历史记录累加,这个时候就要存redis了,把每一段时间的数据都累加。

数据流向:测试线程一直发给itcast-topic-input主题,流式应用监听itcast-topic-input,通过处理得到键为单词,值为数量的键值对,传给itcast-topic-output主题。最后再有一个监听器把这些数据与redis进行处理和持久化。

输入的数据是一个字符串,例如“apple apple tea tea”,对于这个构建的流来说,key为空,value为这个字符串,我们首先需要将这个字符串划分为四个键值对,要保留key的同时value变成各个单词(这里不能用map,map是一对一的,这里涉及拆分,需要一对多),得到了key为null,值为单词的很多键值对,
对这些键值对分组,key为它的值,也就是说所有是一个单词的分在一个组里面,方便后续统计。

随后可以用count直接得出每个组的数量,但是我嫌太简单了,用aggregate,这个方法有三个参数。作为聚合,你需要给这个聚合器一个初始的数据类型和数据,后续再对其进行叠加,第一个函数是这个初始化的动作,第二个函数有三个参数,分别是k,v和这个叠加器的传递值。第三个函数指定序列化类型,这个我也没太搞懂

在这个应用中,迭代器的初始为0,由于aggregate是对每个组进行聚合,因此迭代器只用每次将初始的值+1即可。

最后收集这个数据进行转换,发送到对应主题给redis监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues(((value)->Arrays.asList(value.split(" "))))
.groupBy((key,value)->value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.aggregate(()->"0",(key,value,aggValue)->{
Long i = Long.parseLong(aggValue)+1;
return i+"";
})
.toStream()
.map(new KeyValueMapper<Windowed<String>, String, KeyValue<?, ?>>() {
@Override
public KeyValue<String, String> apply(Windowed<String> key, String value) {
return new KeyValue<>(key.key(),value);
}
})
.to("itcast-topic-output");
return stream;
}

监听器:

先查找redis有没有这个key,后续就是简单的加上

1
2
3
4
5
6
7
8
9
10
11
12
13
@KafkaListener(topics = "itcast-topic-out")
public void handleMessage(ConsumerRecord<String,String> message){
//先在redis里找有没有这个单词
Double score = stringRedisTemplate.opsForZSet().score(REDIS_WORD, message.key());
if (score == null){
stringRedisTemplate.opsForZSet().add(REDIS_WORD,message.key(), Double.parseDouble(message.value()));
}
else {
stringRedisTemplate.opsForZSet().add(REDIS_WORD,message.key(), score+Double.parseDouble(message.value()));
}
System.out.println("总计:"+message.key()+":"
+stringRedisTemplate.opsForZSet().score(REDIS_WORD, message.key()));
}

KafkaStream改造项目

定义两个实体类:

  • 一个是传给kafkastream的消息实体,也就是要告诉stream那一篇文章,有了什么动作,点赞了还是评论了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Data
public class UpdateBlogMess {
/**
* 文章id
*/
private Long id;

/**
* 更新类型
*/
private UpdateType updateType;

/**
* 更新数值,点赞是1,取消点赞是-1
*/
private int add;
public enum UpdateType{
VIEW,LIKED,COMMENT
}
}
  • 另一个是传出kafkastream的消息实体,输出处理后的当前片段内blog的一些统计结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Data
public class BlogVisitedStreamMess {
/**
* 文章id
*/
private Long id;
/**
* UV统计量
*/
private Long view;
/**
* 点赞的
*/
private Long liked;
/**
* 评论数
*/
private Long comment;
}

流式应用:

如上所述,这里的输入是key为空,值为UpdateBlogMess的json字符串,

  1. 首先要把json反序列化,映射到key为id,值为类型和数量,例如liked:1,
  2. 再对key进行分组,很多操作的都是同一篇文章。
  3. 聚合:根据对象的操作类型在对应的未知+1,初始化是一个”VIEW:0;COMMENT:0;LIKED:0”,后续的聚合都在这上面分割和校验。这里只测试了点赞的,对于view这个字段我打算是直接使用HyperLogLog来统计。评论这个功能暂时没做。
  4. UV统计近一个礼拜的文章访问量。
  5. 最后返回一个key为文章id,值为BlogVisitedStreamMess的Json字符串
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
private static final String KAFKA_STREAM_HOT_BLOG = "hotBlog-produce";
private static final String KAFKA_STREAM_HOT_BLOG_CONSUMER = "hotBlog-consumer";
@Resource
private StringRedisTemplate stringRedisTemplate;

@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
KStream<String, String> stream = streamsBuilder.stream(KAFKA_STREAM_HOT_BLOG);
stream.map(new KeyValueMapper<String, String, KeyValue<String , String>>() {
/**
*
* @param key 空
* @param value UpdateBlogMess的Json字符串
* @return key为文章id,value为类型和数量
*/
@Override
public KeyValue<String, String> apply(String key, String value) {
UpdateBlogMess blogMess = JSONUtil.toBean(value, UpdateBlogMess.class);
//key:120312391 value:LIKED:-1

return new KeyValue<>(blogMess.getId()+"",blogMess.getUpdateType().name()+":"+blogMess.getAdd());
}
})
.groupBy((key,value)->key)
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.aggregate(new Initializer<String>() {
@Override
public String apply() {
return "VIEW:0;COMMENT:0;LIKED:0";
}
}, new Aggregator<String, String, String>() {
@Override
public String apply(String key, String value, String aggValue) {
if(StringUtils.isBlank(value)){
return aggValue;
}
long viewNum = 0L, commentNum = 0L, likedNum = 0L;
//处理aggValue
String[] splits = aggValue.split(";");
for (String split : splits) {
String[] type = split.split(":");
switch (UpdateBlogMess.UpdateType.valueOf(type[0])) {
case VIEW:
viewNum = Long.parseLong(type[1]);
break;
case LIKED:
likedNum = Long.parseLong(type[1]);
break;
case COMMENT:
commentNum = Long.parseLong(type[1]);
break;
}
}
//处理value
String[] splitVal = value.split(":");
switch (UpdateBlogMess.UpdateType.valueOf(splitVal[0])) {
case VIEW:
//查询一周以内的UV访问量
viewNum = calculateUV(key);
break;
case LIKED:
//如果是liked:-1就要-1
likedNum += Long.parseLong(splitVal[1]);
break;
case COMMENT:
commentNum += 1;
break;
}
//最后返回agg
return "VIEW:" + viewNum + ";COMMENT:" + commentNum + ";LIKED:" + likedNum;
}
})
.toStream().map((key,value)->{
BlogVisitedStreamMess blogVisitedStreamMess = new BlogVisitedStreamMess();
blogVisitedStreamMess.setId(Long.valueOf(key.key()));
String[] splits = value.split(";");
for (String split:splits){
String[] strings = split.split(":");
switch (UpdateBlogMess.UpdateType.valueOf(strings[0])){
case LIKED:
blogVisitedStreamMess.setLiked(Long.valueOf(strings[1]));
break;
case COMMENT:
blogVisitedStreamMess.setComment(Long.parseLong(strings[1]));
break;
case VIEW:
blogVisitedStreamMess.setView(Long.parseLong(strings[1]));
break;
}
}
return new KeyValue<>(key.key(),JSONUtil.toJsonStr(blogVisitedStreamMess));
})
.to(KAFKA_STREAM_HOT_BLOG_CONSUMER);

return stream;
}
private static final String HyperLogLogPrefix = "HLL:blog:";
/**
* 统计一周以内的UV访问量
* @return
*/
public Long calculateUV(String key){
long UV4Week = 0L;
LocalDate now = LocalDate.now();
//不够优雅
/*String yesterday = now.minusDays(1).format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
String theDayBeforeYesterday = now.minusDays(2).format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));*/
List<LocalDate> lastWeekDates = IntStream.rangeClosed(1, 7)
.mapToObj(now::minusDays)
.collect(Collectors.toList());

//blog的id
long id = Long.parseLong(key);
for (LocalDate localDate:lastWeekDates){
String date = localDate.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
UV4Week += stringRedisTemplate.opsForHyperLogLog().size(HyperLogLogPrefix + ":" + id + date);
}
return UV4Week;
}

后续监听器根据这个消息操作redis,但是由于黑马点评这个项目里查询热点文章是直接走数据库,根据likes的升序查询,所有要改很多部分。今天先介绍到这里。

2026.6.11更新

实现了用xxl-job定期统计UV然后发送给管道.

这里还可以优化为scan命令,如果用key的话会在redis里全局找,开销很大。现在数据量小点还没关系,一旦大起来就效率很低了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 使用 SCAN 命令获取与给定模式匹配的键
*
* @param pattern 模式,例如 "*HLL_PREFIX*"
* @return 匹配的键集合
*/
public Set<String> scanKeys(String pattern) {
return stringRedisTemplate.keys("*" + pattern + "*");
}
/**
* 定时给kafka管道发view的消息,从HyperLogLog里找
*/
@XxlJob("demoJobHandler")
public void viewSchedule(){
//查找所有文章
Set<String> keys = scanKeys(HLL_PREFIX);
for (String key :keys){
Long size = stringRedisTemplate.opsForHyperLogLog().size(key);
UpdateBlogMess mess = new UpdateBlogMess();
mess.setUpdateType(UpdateBlogMess.UpdateType.VIEW);
mess.setAdd(Math.toIntExact(size));
mess.setId(Long.valueOf(key.split(":")[2]));
log.debug(mess.toString());
kafkaTemplate.send(KAFKA_STREAM_HOT_BLOG, JSONUtil.toJsonStr(mess));
}
}