实习2.0-项目总结
实习拷打
项目介绍
网易外贸通是一个为外贸企业打造的一站式外贸营销平台,主要的功能包括数据获客,邮件营销,客户管理和订单追踪等。
我所在的组是做数据获客的,主要业务就是让用户能够通过我们的应用查到他的潜在客户。就比如一个想做服装贸易的就可以通过我们的数据找到对应的国外买家。
这一部分数据获取是多渠道的,我们通过海关每年的hscode,各种类似盘踞这些网站采购,以及一些爬虫技术拿到了大概六千万家公司数据。我们的业务一方面是搜索功能,所以我们数据层没用mysql多用的是es;
另一部分是数据层,多渠道去搜集这些公司作为我们的信息。
数据双向同步链路-落库
- 目的:我们这边是做数据的,另外一边算法组做的是域名行业分类,意思就是他们把域名对应的html通过某种模型预测一个行业标签,我们组展示需要这个tag,但是他们落库不在我们这边,于是就将这部分有tag的域名推送给我们,kafka消息里面传的是这个domain,但是具体的标签信息需要我们去请求grpc获取。
- 解决方案:对于这部分工作我首先是给entity新增了关于域名分类的字段,然后写了一个kafka listener批量消费,grpc接口能够一次传50个域名然后返回50个对应的行业信息。
- 遇到问题:由于grpc限流以及有可能域名查不到,会造成消息丢失。他那边grpc接口设置是1秒一次,因此我要保证他那边报错了消息重新消费。
- 解决问题:两个set,一个全局set一个失败set,错误可能发生在kafka消费的时候、拿着所有50条去请求rpc接口的时候、rpc接口报错或者没有数据的时候、es更新的时候:
(1)kafka消费报错,这里是手动ack的,所以一旦没有消费成功offset也没偏移,一般不会出现这种情况。
(2)拿着50条数据请求rpc接口的时候:在消息消费的时候就组装了这两个set,如果这里出现问题,也能保证这50条不提交
(3)rpc接口报错或者没有数据的时候:例如限流了,此时也可以返回这50条;但如果返回有效需要清空这个失败set,按照实际他返回的数据再去组装,因为有可能这个域名他就是没有数据,允许一部分这种差异。
(4)es落库一条从这个失败set里面移除一条。
最终效果是推送了900w条数据,根据新增的这个recommend字段exist查询已落库数量,成功率接近100%,分析了一些差错case,原因基本上是动态问题,原本推送的某些域名后续调整他把字段删除了,因此rpc接口请求不到。
数据双向同步链路-推送
- 目的:我们数据组每天都会搜集域名,算法组需要这些增量数据,因此需要一个topic每天定时通过kafka发送新增域名
- 解决方案:xxljob设置每天凌晨推送新增domain数据,根据createTime来看,然后深度分页用scrollSearch解决
- 遇到问题:他这边没有createTime,只有updateTime,这就可能导致重复发送了,因此需要一个发送的tag,与mt商量后同意这种做法了
- 解决问题:发送的时候同时给数据新增一个字段flag,查询的时候去看有没有这个字段,有并且又是昨天的就发送。
订阅更新任务
原本逻辑:每周每个账号订阅的公司消息进行处理和监听;
后来逻辑:增加一个watchTime字段,如果用户在这一周查看过这个订阅更新界面就刷新。
改造方法:查询接口crud这个字段,一天watchtime只更新一次。然后xxljob里面的执行器加一个这个过滤即可,然后将这些作为一个消息发送给kafka
kafka消费者拿到这些id去数据库找对应的公司,将订阅公司表项和公司实体项(从es来的,如果前者有companyId就直接查询,如果没有就需要根据名字和地区来查询并保存)作为参数传分别分析facebook信息,facebook提及信息,海关信息,联系人信息,开了一个线程池并行处理这些。
原来这块是顺序的,因为本就是离线任务,触发的频率也不高,后续改成了线程池,用了异步编排,以便后续扩展
1 | private static final ExecutorService executor = new ThreadPoolExecutor(5, |