分布式锁&kafka事务提交等编码技巧

dbaplus社群

一、前言


在开发过程中,遇到了一些比较实用的编码技巧,故记录以加深印象。因为每个技巧的篇幅较短,故不做拆分,只合在一篇小文章中阐述。以下会涉及kafka的事务提交方法、redis分布式锁简化以及多key情况下应该怎么加锁、业务日志如何解耦。


二、kafka的事务提交方法


kafka我们常用于削峰填谷,以及系统间解耦等场景。这里我们会常遇到一种情况,就是上游系统在处理完成业务后,需要通知其它的系统,假如我们不考虑事务提交失败的情况下,就可以像下面这样写。但是假如出现网络异常或者数据库异常等情况,就会出现事务提交失败从而回滚,但是消息却已经发生给其它服务了,那么就会导致整条调用链的异常。


@Autowiredprivate KafkaTemplate kafkaTemplate; @Transactional(rollbackFor = Exception.class) public void saveServiceOrder(ServiceOrder serviceOrder){// do somethingNoticeListDTO notice =NoticeListDTO.builder().build();// 通知其它服务kafkaTemplate.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice)); }


所以,我们可以进行进一步优化,就是将消息通知后置到事务提交后,这样系统的可靠度就会更高。我们增加一个kafka帮助类,如下:


@Component @Slf4j public class KafkaTemplateHelper {@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 事务提交后发送kafka消息* @param topic* @param data* @param <T>*/public <T> void send(String topic, Object data) {// 是否开启事务判断if (TransactionSynchronizationManager.isSynchronizationActive()) {TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {log.info("事务提交成功后发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data));kafkaTemplate.send(topic,data);}});} else {log.info("没有开启事务直接发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data));kafkaTemplate.send(topic,data);}} }


kafka调用如下,它就会保证在事务结束后再通知其它系统,同理,很多需要后置的操作也可以这么玩。其实kafka还有一套可靠性应用方案可以分享,待有空再写。


@Autowired private KafkaTemplateHelper kafkaTemplateHelper; @Transactional(rollbackFor = Exception.class) public void saveServiceOrder(ServiceOrder serviceOrder){// do somethingNoticeListDTO notice =NoticeListDTO.builder().build();// 通知a服务kafkaTemplateHelper.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice)); }


三、redis分布式锁代码简化


我们使用redis分布式锁就离不开redission组件,举个栗子,我们一般在服务集群的情况下,为了保证并发不出现问题,会如下加锁,用一段字符串加上入参中的唯一编号(如用户id、订单编号等等)来保证接口幂等性(PS:redissonDistributedLocker只是redission的简单封装)。这样写很好,没有问题,但是我们不禁会想,好像每个创建、更新等业务操作都得给接口加这些重复代码,那么有没有更加优雅的方式呢,没错,我们要追求的就是极致的优雅。


@ApiOperation("服务单更新") @Transactional(rollbackFor = Exception.class) public ApiResult serviceOrderUpdate(@RequestBody@Validated ServiceOrder req){log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req));String lockKey = "mh:scs:serviceOrderUpdate:"+req.getServiceOrderId();boolean lock = redissonDistributedLocker.tryLock(lockKey,0L,10L);AssertUtil.businessInvalid(!lock,"操作过于频繁,请稍后再试");try {// do somethingreturn ApiResult.success();}finally {redissonDistributedLocker.unlock(lockKey);} }


使用Aop为接口加锁,添加一个注解anno,并写个实现。


/*** 为方法加锁,处理完成再释放*/ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface AopLock {/*** SpEL表达式,用于计算lockKey.*/String value();/*** 单位秒*/int waitTime() default 0;/*** 单位秒*/int leaseTime() default 6;int errorCode() default 2733;String errorMsg() default "操作过于频繁,请稍后再试"; } /*** 为方法加锁,处理完成再释放**/ @Slf4j @Aspect @Order(3) @ConditionalOnBean(RedissonClient.class) public class AopLockAspect {@Autowiredprivate RedissonClient redissonClient;@Value("${spring.application.name}")private String lockKeyPrefix;@Around("@annotation(common.aop.annos.AopLock)")public Object lock(ProceedingJoinPoint joinPoint) throws Throwable {Object[] args = joinPoint.getArgs();MethodSignature signature = (MethodSignature) joinPoint.getSignature();Method method = signature.getMethod();EvaluationContext context = initEvaluationContext(joinPoint);AopLock aopLock = method.getAnnotation(AopLock.class);String spEl = aopLock.value();String expressionValue = lockKeyPrefix + ":" + PARSER.parseExpression(spEl).getValue(context);RLock lock = redissonClient.getLock(expressionValue);try {boolean getterLock = lock.tryLock(aopLock.waitTime(), aopLock.leaseTime(), TimeUnit.SECONDS);if (!getterLock) {throw new ServiceException(aopLock.errorCode(), aopLock.errorMsg());}return joinPoint.proceed(args);} finally {try {lock.unlock();} catch (Exception e) {log.warn("unlock error:" + e.getMessage() + "," + e.getClass().getName());}}} }


那么我们的加锁就可以简单的加个@AopLock注解就可以了,是不是很棒呢


@ApiOperation("服务单更新") @AopLock(value="'mh:scs:serviceOrderUpdate:'+ #req.serviceOrderId",leaseTime = 60*30) @Transactional(rollbackFor = Exception.class) public ApiResult serviceOrderUpdate(@RequestBody@Validated ServiceOrder req){log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req));// do somethingreturn ApiResult.success(); }


四、redission在多key情况下应该怎么加锁


上面的例子很好地将简单的分布式锁代码简化,但是我们会有一些场景是无法这样加锁的,比如一些批处理的场景,用户A批量操作了单据a、b、c,同一时间,用户B批量操作了单据b、c、d,这时bc单据就会有并发问题,在这种场景下,我们是不能简单地根据某个单据的订单编号进行加锁的,要思考换一种方式,如下:


    订单实体类


@Data public class UpdateServiceOrdersReq implements Serializable {private static final long serialVersionUID = 1L;@Validprivate List<ServiceOrder> serviceOrderList; }


    接口实现,对每个订单的id都加锁,假如有其中一个订单的锁获取失败的话则返回重试信息,在更新操作结束后释放所有的锁


@ApiOperation("批量更新服务单信息")@PostMapping("/xxxx/updateServiceOrders")public ResponseBean updateServiceOrders(@RequestBody @Validated UpdateServiceOrdersReq req) {List<String> redisKeys = new ArrayList<>();List<ServiceOrder> list = new ArrayList<>();for (ServiceOrder serviceOrder : list) {redisKeys.add("mh:scs:updateServiceOrders:" + serviceOrder.getServiceOrderId());}try {for (String redisKey : redisKeys) {boolean lock = redissonDistributedLocker.tryLock(redisKey, 5L, 30L);if(!lock){AssertUtil.businessInvalid("批量更新服务单获取锁失败,请稍后尝试!");}}ResponseBean responseBean = ResponseBean.success();// do somethingreturn responseBean;} catch (Exception ex){throw ex;} finally {redisKeys.forEach(redisKey->{try {redissonDistributedLocker.unlock(redisKey);} catch (Exception e) {log.error("updateServiceOrders:释放redis锁失败:{}", redisKey, e);}});}}


五、业务日志如何解耦


在写业务系统的过程中,我们难免要进行一些业务日志操作记录,这里就会涉及业务日志字符串的数据组装,比如产品要求记录车辆出发时间、更新日期时间巴拉巴拉之类的,但同时会存在一个问题,因为业务日志记录是非主业务流程操作(类似消息通知之类的),故不可因为复杂的日志数据拼接去影响接口的响应速度,从而影响用户体验;这里就要思考如何解耦的问题。我思考了两种场景下的处理方案,可以分享出来给大家分享


情况一:假如只是简单的进行文字记录,我们可以使用线程池的方式去对日志记录进行解耦


使用线程池创建其它线程进行日志操作,这样就不会影响到主线程了。


/** * @author ppz * @date 2022年04月12日 17:00 * @Description 服务单日志操作工具类 */public class ServiceOrderLogUtils {private static ScsServiceOrderLogService logService = ApplicationContextUtils.getBean(ScsServiceOrderLogService.class);private static int corePoolSize = Runtime.getRuntime().availableProcessors();private static ExecutorService executor = new ThreadPoolExecutor(corePoolSize,corePoolSize*2,10L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(Integer.MAX_VALUE),new ThreadPoolExecutor.CallerRunsPolicy());private ServiceOrderLogUtils() {throw new IllegalStateException("Utility class");}/*** 日志公共线程池中执行线程* @param runnable 可运行对象*/public static void execute(Runnable runnable) {executor.execute(runnable);}/*** 保存订单操作日志* @param serviceOrderId订单编号* @param operType日志操作类型* @param operContent操作内容* @return: void*/public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent){saveLog(createLog(serviceOrderId, operType, operContent));}/*** 保存订单操作日志* @param serviceOrderId订单编号* @param operType日志操作类型* @param operContent操作内容* @param operUserId操作人登录名* @param operUserName 操作人名称* @return: void*/public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){saveLog(createLog(serviceOrderId, operType, operContent, operUserId, operUserName));}public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent) {AuthUser userInfo = WebUtils.getCurrentUser();return createLog(serviceOrderId, operType, operContent, StringUtil.toInt(userInfo.getLoginName()), userInfo.getName());}/*** 封装订单日志实体* @param serviceOrderId* @param operType* @param operContent* @param operUserId* @param operUserName* @return: ScsServiceOrderLog*/public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){ScsServiceOrderLog log = new ScsServiceOrderLog();log.setServiceOrderId(serviceOrderId);log.setOperContent(operContent);log.setOperType(operType.getCode());log.setOperatorId(operUserId);log.setOperatorName(operUserName);return log;}/*** 保存订单操作日志* @param log日志对象* @return: void*/public static void saveLog(ScsServiceOrderLog log){List<ScsServiceOrderLog> list = Lists.newArrayList();list.add(log);saveLog(list);}/*** 批量保存订单操作日志* @param list* @return: void*/public static void saveLog(List<ScsServiceOrderLog> list){if(CollectionUtils.isEmpty(list)) {return;}Date now = new Date();for(ScsServiceOrderLog log : list) {if(log.getOperatorTime() == null) {log.setOperatorTime(now);}if(StrUtil.length(log.getOperContent()) > 512) {log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512));}}if(!list.isEmpty()) {execute(new SaveLogThread(list));}}/*** 订单日志保存线程* @author: xiecy* @date:2019年4月29日 下午12:03:35*/static class SaveLogThread implements Runnable {private List<ScsServiceOrderLog> list = null;public SaveLogThread(List<ScsServiceOrderLog> list) {super();this.list = list;}@Overridepublic void run() {if(list != null && !list.isEmpty()) {logService.batchInsert(list);}}}/*** 同步批量保存日志* @param list* @return: void*/public static void saveLogSync(List<ScsServiceOrderLog> list){if(list.isEmpty()) {return;}Date now = new Date();AuthUser userInfo = WebUtils.getCurrentUser();for(ScsServiceOrderLog log : list) {if(log.getOperatorTime() == null) {log.setOperatorTime(now);}if(log.getOperatorId() == null && userInfo!=null) {log.setOperatorId(StringUtil.toInt(userInfo.getLoginName()));log.setOperatorName(userInfo.getName());}if(StrUtil.length(log.getOperContent()) > 512) {log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512));}}if(list != null && !list.isEmpty()) {logService.batchInsert(list);}}}


业务代码中进行使用:


@Transactional(rollbackFor = Exception.class)public boolean updateShippingDemandStatus(UpdateShippingDemandStatusReq req) {// todo somethingServiceOrderLogUtils.saveLog(serviceOrderId, OperTypeEnum.CANCEL_SHIPPING_DEMAND,"用户取消运输需求");}


情况二:假如日志记录需要对数据进行复杂组件的话,可以把使用到的数据组装到一个实体,然后通过发送给kafka或者redis进行解耦,在另外的线程中进行数据组装,具体就不展示了。


作者丨撸猫的代码

cn/post/7112237932815056932#heading-5

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn


更多精彩内容

11月26日14:00-17:00,dbaplus社群携手光大银行,围绕“光大银行大数据领域建设与应用探索”这一主题开展线上直播分享,针对实时数据架构、高性能查询、安全认证权限控制等内容进行深度探讨,促进更多金融企业借力大数据实现数字化转型。

直播地址:cn/5v59i

关于我们

dbaplus社群是围绕Database、BigData、AIOps的企业级专业社群。资深大咖、技术干货,每天精品原创文章推送,每周线上技术分享,每月线下技术沙龙,每季度Gdevops&DAMS行业大会。

关注公众号【dbaplus社群】,获取更多原创技术文章和精选工具下载

版权声明:分布式锁&kafka事务提交等编码技巧内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,请联系 删除。本文链接:https://www.qi520.com/n/16677.html