项目背景
老规矩,先讲讲项目背景。可跳过。
小工具类的微系统。
我们会有一些文本语义描述的事件。譬如某小区两户人家因为宠物发生了争吵,比如某人拨打12345热线反映小区深夜还在跳广场舞等等。这些统称事件。
小学语文老师告诉我们描述事件的叙述文三要素,时间地点人物。
所以我们需要通过NLP从事件里提取出来这3个关键要素。
我们今天主要讨论人物,由事件到人。哪些具体的人(必须是非常精准的人,他的户籍常住身份手机车牌,而不是某个姓名符号)出现在这个事件。
上面提到的事件描述里会有人的若干相关信息描述,包括姓名身份证号码电话车牌等。这些信息都是零散的语义描述。
比如2023年5月20日,张三,女,3343身份证号码1000001,反映住其对面的李四(手机号:1593432322)长期把垃圾丢到过道里,影响楼道卫生案例。
通过NLP或者简单正则,我们能提取出来两个姓名,1个身份证号码,1个手机号。
那么身份证号码或者手机号到底属于张三还是李四一个人还是两个人呢?
简单的NLP是没办法做到的,最近大火的chatGPT倒是可以,虽然准确率不能达到百分之百。
比如输入上面一段事件描述,再提问“里面出现了哪些人,并将他们的身份证号码和手机号分别输出”是可以达到我们想要的效果的。
但是毕竟不可能在内网线上生产环境使用。
所以我们只能根据常口库来进行关联。这里面有相对精确的人口数据(也有滞后,换了手机号码车牌什么的)。
身份证肯定能关联出唯一的一个人。
手机号和车牌则不一定。
而且常口库只是某小部份,有很多手机或车牌根本关联不到常口库。
如果手机号关联不到常口库也把他加入常口库作为一个人口信息。
你能想象吗?
常口库里就两个字段ID和手机号有值,其它的比如,姓名,身份证,车牌,户籍地,常住地,性别,年龄,籍贯,学历,家族成员,政治面貌等等等等全部一片空白。
如果后期协p调y到了新的常口数据,比如进来户籍人,通过手机号把上面那条奇葩数据的手机号关联上了,两条数据再做合并。
世上的技术千篇一律,奇葩的需求花枝招展
上面文字不全是为了吐槽,而是说明,解析一条记录过后,需要做很多条件判断,IO读写。比较耗时。
且为什么会有由事到人过后再有由人到事。
技术背景
项目背景简单说,有一批文本语义描述,提取其中的的人员要素(身份证,手机,车牌等)将真人进行关联,再将真人与文本事件进行关联。这其中有两个方向,一是从文本事件关联到人,二是从人关联到文本事件。
处理流程:数据入到系统,调用算法解析,基础数据入库,然后写入消息队列,异步处理由事到人的关联。
数据流量:以区县为基本单位部署,售前和产品了解到年事件量几十万级别。平均日上报量甚至不到1000。这个数量级可以说是非常少了。前面说到,每个事件的关联处理相对比较耗时,在秒级。
满打满算,按日均1000条数据算,单个线程处理也在20分钟以内完成,完全是可以接受的。历史数据是部署前就直接跑完,所以只考虑到新增数据即可。
如果客户不按常理出牌,几天导入一次,也可以在apollo配置里面通过spring.kafka.listener.concurrency
参数来增加kafka消费端线程数,并发加速处理。
简化流程后的伪代码:
// 1.根据事件解析要素查询 是否关联常口表
List<TPersonInfo> personInfos = personInfoMapper.selectList(eventItem);
List<TWarningRecordDTO> warningRecords = new ArrayList<>();
for (TPersonInfo person : personInfos){
String personId = null;
// 2.是否重点人
if(person.getIsKeyPerson().intValue() == 1){
personId = person.getId();
} else {
personId = SnowflakeIdUtil.snowflakeId();
}
// 3.是否存在于新增人员表
TRiskPerson rperson = riskPersonMapper.selectOne();
// 4.upsert新增人员表
if (rperson != null) {
// 5.update by personId
} else {
// 6.insert by personId
}
}
// 7.写入事件-人员关系记录表
但是如果spring.kafka.listener.concurrency>1
变成了多线程,这第4个步骤upsert
新增人员表就有线程安全问题。
如果两个personId同时在做update不同的手机号,那么最终的可能是最后只保留了一个手机号。
这里不多做解释,应该是显而易见的。
因为是单机应用,所以添加同步块即可。
同步锁
public synchronized void upsert(){
// 3.是否存在于新增人员表
TRiskPerson rperson = riskPersonMapper.selectOne();
// 4.upsert新增人员表
if (rperson != null) {
// 5.update by personId
} else {
// 6.insert by personId
}
}
因为其它地方也使用到这段代码,将这段代码单独提出来,加上同步锁,就能解决上面的线程安全问题。
这里解释一下在业务上关于锁的粒度问题。
可以只锁step 5
里的update
操作吗?
在具体的业务里还真是不可以。
在step 3
中,假设两个线程同时执行,同时返回null
,表示数据库没有此人,那么它就会执行两次insert
操作。将会抛出异常:Duplicate entry 'x' for key 'PRIMARY'
因为人员ID和身份证是唯一索引。
说到这个异常,多说两句,这里分为update 和 insert两步操作,没有使用insert into on duplicate key update
,因为这里只有一条数据,横竖只做1次IO操作。
如果是多条数据,最好也别用,因为这个语法可能会造成死锁,以及它有严重的性能问题,后者特别是多条记录同时操作且唯一键冲突比较严重的时候,这里不做展开。
同时复习一下synchronized本身锁粒度问题。
- 这里synchronized加到方法上,因为是非static方法,所以锁对象为当前类的实例对象。等同于:
public void upsert(){
synchronized(this){
}
}
如果是static方法,因为静态方法属于类,所以锁对象为类对象。等同于:
public void upsert(){
synchronized(Demo.class){
}
}
2.如果synchronized同步代码块,参考上面。
3.注意锁对象的安全问题。
业务上的锁粒度
然后上线过后,万万没想到,客户可不是按照每天或每几天导入几条百数据这样的常规操作来,而是半月甚至一个月想起来,导入一次数据。
这样,一次导入的可是几万条数据。按单条数据秒计算,消费端开10个并发线程,最终耗时也是按10小时为单位计。
因为使用的是全局悲观锁,参数过大,锁竞争会越大,所以spring.kafka.listener.concurrency
参数也不是越大越好。
客户觉得太慢了,完全不能接受,想尽快看到数据导入的效果,怎么办?
尝试着分析一下锁的粒度。看能不能再降低一些。
首先,锁的粒度当然越低越好,但通过前面的分析,同步方法在代码上已经属于最小粒度。
但是在业务上呢?
实际上线程安全问题只是针对同一个人。对吧?同一个人才会有写入新增的线程安全问题,不同人之间其实是互不干扰的。
但是同步方法针对的是所有人。所有线程执行到这一步的时候都被阻塞,等待锁。
那么把锁对象降低到人员ID呢?
public void upsert(){
synchronized (personId){
// 3.是否存在于新增人员表
TRiskPerson rperson = riskPersonMapper.selectOne();
// 4.upsert新增人员表
if (rperson != null) {
// 5.update by personId
} else {
// 6.insert by personId
}
}
}
因为经常关联事件的人以万计,所以可想而知,这样的粒度降低肯定会带来较大的性能提升。
经过真实数据测试,万数据可降到小时以内。
乐观锁?
有的读者可能已经看出来了,根据业务场景分析,我们知道,若干个事件关联若干个人,它线程冲突到具体到个人,几率还是比较小的,这是一个典型的适用乐观锁的场景。
java提供的lock
默认是全局锁,因为在业务上的最小粒度已经是个人了,所以我们在这里使用lock
的话得自己构建一个分段锁。
提到分段锁,javaer首先想到的应该就是concurrentHashMap
?
它是怎么实现锁的呢?众所周知,hashmap
由数组+链表组成。hashtable
直接使用synchronized
锁定整个数组,而concurrentHashMap
呢,它通过segment
只锁住数组里面的部份元素。
这样一来,不同segment
的操作不存在竞态条件,而只存在于同一segment
,这时候才需要加锁。从而降低了锁的粒度。
假设数组长度为16,我给0-15每个元素都创建一个锁对象【或者按段来,0-3 4-7 8-11 12-15每个段创建一个锁对象】,当操作不同下标的元素是不会产生竞争和锁等待。
同样的,我们有若干个人,想要把锁的粒度降到最小,就得给每个人都创建一个锁对象。
伪代码:
HashMap<String, ReentrantLock> locks = new HashMap<>();
/**
* 通过人员唯一标识来获取锁,如果不存在则新创建一把锁
* @param personId
* @return
*/
public ReentrantLock getLock(String personId){
ReentrantLock lock = locks.get(personId);
if (lock != null){
return lock;
}
lock = new ReentrantLock();
locks.put(personId, lock);
return lock;
}
public void upsert(){
ReentrantLock lock = getLock(personId);
try {
lock.tryLock(60, TimeUnit.SECONDS);
// 3.是否存在于新增人员表
TRiskPerson rperson = riskPersonMapper.selectOne();
// 4.upsert新增人员表
if (rperson != null) {
// 5.update by personId
} else {
// 6.add by personId
}
} catch (Exception exception) {
//
} finally {
lock.unlock();
}
}
但最终在生产环境不会采用这种方式
总之怎么都是得不偿失的。
上面第1点也可以从ConcurrentHashmap
的源码看出。
1.7的源码
1.8的源码
1.7使用的lock
,到1.8直接换成了synchronized
,因为此时的锁粒度已经降低到了Node(Key-value entry)
级别。这时候锁竞争显著减小,synchronized
比lock
更具优势。
分布式锁
单机应用是怎么用上分布式锁的呢?
你不会是为了技术而技术吧,简称搞事?
这就要怪搞事的**了,之前的逻辑是从事关联到人。
现在要从人关联到事了。
常口库某个人手机号变了,车牌换了个,这个时候他原来手机号关联的事件可能就没了,新添加的手机号可能关联了新的事件。
对吧?
从事到人,从人到事都涉及到人员信息的更新。那么这两块业务逻辑代码不一样,但涉及到同一记录的查询和修改,所以两边都加同一把锁。
所以这里虽然是单机应用,但是就得用到分布式锁了。
因为环境中本来就有redis
,所以顺理成章的使用redission
来实现一个分布式锁,实现起来也比较简单方便。原理这里就不展开了。
后记
很多网友是不是有过这样的疑惑:
我在一个很low的平台,我在一个很low的项目里做一个CRUD boy。
我没有高并发大流量分布式的实际场景,怎么学习这些技术呢?
背了忘忘了背,没有实战经历,面试前背了不少八股文,一问深点就露馅。
屏幕前的你以为我在骂你?
其实我在自嘲而已。
我所从事的项目对于搞大数据来说,平台数据量还是真蛮大,但是对于web开发来说,真的不太友好。需求一天3变,最重要的是一个平台真没什么流量,就内部人员使用,如果有个几百上千人使用,说明系统真的做得很优秀。
在这样的一些个业务场景下,怎样不荒废web经历,最大限度提升技术能力呢?
以上只不过是我的一点点小小的努力罢了。
比如,前文小小的骗了一下大家。事实上,这其实只是一个半边缘化的产品。甲方客户也根本没怎么催我。
比这更悲伤的是,客户根据没怎么使用。
前面说到的功能,只要能实现功能就行。客户根据没有精力来关心效率问题。
后面的优化都只是我个人行为。
虽然小小的骗了一下大家,但优化是真的在做优化。
我自己当过面试官,更是求职者。我自己的感受,有的时候
八股文是不得已,因为简历上真的没有东西可写,可问。那怎么办呢?
至少,下次求职的时候,当面试官问到八股文高频面试题concurrentHashMap
的实现原理。我能结合这段优化的经历简单提提原理相通的部份,证明不是单纯的在背是吧?
来,干了这杯鸡汤!