序言

上一节我们学习了异步查询转同步的 7 种实现方式,今天我们就来学习一下,如何对其进行封装,使其成为一个更加便于使用的工具。

思维导图如下:

异步转同步字节码增强

拓展阅读

java 手写并发框架(一)异步查询转同步的 7 种实现方式

java 手写并发框架(二)异步转同步框架封装锁策略

java 手写并发框架(三)异步转同步框架注解和字节码增强

异步转同步的锁调用

同步锁策略

  • wait & notify

  • 使用条件锁

  • 使用 CountDownLatch

  • 使用 CyclicBarrier

上一节我们已经对上面的 4 种实现方式进行了详细的介绍,没有看过的同学可以去简单回顾一下。

但是对于调用的方式还没有实现。本文将通过注解结合字节码增强技术来实现这个特性。

实现思路

其实整体的实现类似于 spring 的 @Retry 注解。

(1)在锁同步方法上使用对应的注解 @Sync

(2)在异步回调方法上使用注解 @SyncCallback

(3)通过字节码增强调用方法,拥有上述注解的方法进行对应的加解锁实现

注解的定义

同步加锁

@Sync 同步注解可以放在希望同步等待的方法上。

package com.github.houbb.sync.api.annotation;

import com.github.houbb.sync.api.constant.LockType;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

/**
 * 异步转同步注解
 *
 * (1)根据标识匹配结果
 * (2)直接简单的返回结果
 *
 * @author binbin.hou
 * @since 0.0.1
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Sync {

    /**
     * 超时时间
     *
     * 单位 mills
     * @return 时间
     * @since 0.0.1
     */
    long timeout() default 60 * 1000L;

    /**
     * 时间单位
     * @return 时间单位
     * @since 0.0.1
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

    /**
     * 等待策略
     *
     * (1)基于 countDownLatch 等待
     * (2)基于查询结果等等待
     *
     * 不同等待策略,结果封装对用户不可见。
     * @return 等待
     * @since 0.0.1
     */
    LockType lock() default LockType.COUNT_DOWN_LATCH;

}

所有的属性都有默认值,默认为 60s 超时。

默认的加锁策略是 LockType.COUNT_DOWN_LATCH

异步解锁

package com.github.houbb.sync.api.annotation;

import java.lang.annotation.*;

/**
 * 异步转同步注解
 *
 * (1)根据标识匹配结果
 * (2)直接简单的返回结果
 *
 * @author binbin.hou
 * @since 0.0.1
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface SyncCallback {

    /**
     * 对应的方法名称
     *
     * @return 方法名称
     * @since 0.0.1
     */
    String value();

}

这里异步回调解锁只有一个属性,那就是加锁对应的方法名称。

这里我们不做过多的封装,保证 @Sync 注解的方法名称在当前类唯一即可,这个设计理念和 spring 的 @Retry 是一样的。

@SyncCallback 可以根据方法名称,找到对应的 @Sync 注解信息。

核心同步类

接口定义

package com.github.houbb.sync.api.api;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public interface ISync {

    /**
     * 同步
     * @param context 上下文
     * @return 方法执行结果
     * @since 0.0.1
     * @throws Throwable 异常
     */
    Object sync(final ISyncContext context) throws Throwable;

}

接口定义非常简单,只有一个方法,通过上下文返回对应的值。

上下文定义如下:

package com.github.houbb.sync.api.api;

import com.github.houbb.sync.api.annotation.Sync;
import com.github.houbb.sync.api.annotation.SyncCallback;

import java.lang.reflect.Method;

/**
 * sync 上下文
 *
 * @author binbin.hou
 * @since 0.0.1
 */
public interface ISyncContext {

    /**
     * 同步注解信息
     * @return 同步注解
     * @since 0.0.1
     */
    Sync sync();

    /**
     * 注解回调
     * @return 回调
     * @since 0.0.1
     */
    SyncCallback callback();

    /**
     * 参数信息
     * @return 参数信息
     * @since 0.0.7
     */
    Object[] params();

    /**
     * 方法信息
     * @return 方法信息
     * @since 0.0.7
     */
    Method method();

    /**
     * 方法执行
     * @return 执行
     * @since 0.0.1
     * @throws Throwable 异常信息
     */
    Object process() throws Throwable;

}

这里就有我们开头定义的注解信息,还有一些字节码增强相关的信息,比如方法,参数等等。

同步实现

核心代码

同步实现的核心代码如下:

package com.github.houbb.sync.core.core;

import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.heaven.util.lang.reflect.ClassUtil;
import com.github.houbb.sync.api.annotation.Sync;
import com.github.houbb.sync.api.annotation.SyncCallback;
import com.github.houbb.sync.api.api.*;
import com.github.houbb.sync.api.exception.SyncRuntimeException;
import com.github.houbb.sync.core.support.context.SyncLockContext;
import com.github.houbb.sync.core.support.context.SyncUnLockContext;
import com.github.houbb.sync.core.support.lock.Locks;

import java.lang.reflect.Method;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class SimpleSync implements ISync {

    @Override
    public Object sync(ISyncContext context) throws Throwable {
        //1. 判断是 sync 还是 callback
        Sync sync = context.sync();
        if(ObjectUtil.isNotNull(sync)) {
            ISyncLock syncLock = Locks.getLock(sync.lock());
            ISyncLockContext lockContext = SyncLockContext.newInstance()
                    .timeout(sync.timeout())
                    .timeUnit(sync.timeUnit());

            // 方法执行完成之前加锁
            syncLock.lock(lockContext);
            return context.process();
        }

        SyncCallback callback = context.callback();
        if(ObjectUtil.isNotNull(callback)) {
            Sync matchSync = getMatchSync(context.method(), callback);
            ISyncLock syncLock = Locks.getLock(matchSync.lock());
            ISyncUnlockContext unlockContext = SyncUnLockContext.newInstance()
                    .timeout(matchSync.timeout())
                    .timeUnit(matchSync.timeUnit());

            Object result = context.process();
            // 方法执行完成之后解锁
            syncLock.unlock(unlockContext);
            return result;
        }

        return context.process();
    }

}

根据上下文中的注解信息,判断是执行加锁,还是解锁。

那加/解锁与方法执行的先后顺序呢?

机智如你不妨花一分钟想一想这个简单的问题。

60

59

01

注意加锁解锁的顺序,方法执行完成之前加锁,方法执行完成之后解锁。

如果是解锁,则需要获取对应的 @Sync 注解。方法试下如下:

获取匹配的回调信息

/**
 * 获取匹配的回调信息
 * @param method 回调方法
 * @param callback 回调方法注解信息
 * @return 结果
 * @since 0.0.1
 */
private Sync getMatchSync(final Method method,
                          final SyncCallback callback) {
    String value = callback.value();
    Class<?> clazz = method.getDeclaringClass();
    Method syncMethod = ClassUtil.getMethod(clazz, value);
    Sync sync = syncMethod.getAnnotation(Sync.class);
    if(ObjectUtil.isNull(sync)) {
        throw new SyncRuntimeException(value + " 方法未指定 @Sync 注解信息!");
    }
    return sync;
}

这里会根据 @SyncCallback 注解中指定的方法名称,遍历当前类的方法,返回匹配的信息。

如果方法不存在,或者方法没有指定 @Sync 都会进行报错。

字节码增强

字节码增强的方式

万事俱备只欠东风。

下面我们只需要使用字节码增强技术,就可以把整个调用链路串联起来。

这里我们使用代理模式,对所有的方法进行增强。

主要有三类:

(1)不需要代理的

(2)有接口的方法,可以通过 jdk 动态代理

(3)没有接口的方法,我们通过 cglib 进行代码增强。

增强的方法

我们为 Sync 同步实现添加了一个引导类,便于代理中调用,实际上就是调用上面的方法,实现也非常简单:

package com.github.houbb.sync.core.bs;

import com.github.houbb.sync.api.api.ISync;
import com.github.houbb.sync.api.api.ISyncContext;
import com.github.houbb.sync.core.core.SimpleSync;

/**
 * 引导类
 * @author binbin.hou
 * @since 0.0.1
 */
public final class SyncBs {

    private SyncBs(){}

    /**
     * 同步实现
     * @since 0.0.1
     */
    private final ISync sync = new SimpleSync();

    /**
     * 同步上下文
     * @since 0.0.1
     */
    private ISyncContext syncContext;

    /**
     * 新建对象实例
     * @return 实例
     * @since 0.0.1
     */
    public static SyncBs newInstance() {
        return new SyncBs();
    }

    public SyncBs syncContext(ISyncContext syncContext) {
        this.syncContext = syncContext;
        return this;
    }

    public Object execute() throws Throwable {
        return this.sync.sync(syncContext);
    }

}

代理实现

接口定义

我们为上述三种情况定义统一的接口:

package com.github.houbb.sync.core.support.proxy;

/**
 * 代理接口
 * @author binbin.hou
 * @since 0.0.1
 */
public interface ISyncProxy {

    /**
     * 获取代理实现
     * @return 代理
     * @since 0.0.6
     */
    Object proxy();

}

不需要代理

/*
 * Copyright (c)  2019. houbinbin Inc.
 * async All rights reserved.
 */

package com.github.houbb.sync.core.support.proxy.none;

import com.github.houbb.sync.core.support.proxy.ISyncProxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * <p> 没有代理 </p>
 *
 * <pre> Created: 2019/3/5 10:23 PM  </pre>
 * <pre> Project: async  </pre>
 *
 * @author houbinbin
 * @since 0.0.1
 */
public class NoneProxy implements InvocationHandler, ISyncProxy {

    /**
     * 代理对象
     */
    private final Object target;

    public NoneProxy(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return method.invoke(proxy, args);
    }

    /**
     * 返回原始对象,没有代理
     * @return 原始对象
     */
    @Override
    public Object proxy() {
        return this.target;
    }

}

这种比较简单,直接反射执行原来的方法即可。

动态代理

/*
 * Copyright (c)  2019. houbinbin Inc.
 * async All rights reserved.
 */

package com.github.houbb.sync.core.support.proxy.dynamic;

import com.github.houbb.sync.api.api.ISyncContext;
import com.github.houbb.sync.core.bs.SyncBs;
import com.github.houbb.sync.core.core.SimpleSyncContext;
import com.github.houbb.sync.core.support.proxy.ISyncProxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletionService;

/**
 * <p> 动态代理 </p>
 *
 * <pre> Created: 2019/3/5 10:23 PM  </pre>
 * <pre> Project: sync  </pre>
 *
 * @author houbinbin
 * @since 0.0.1
 */
public class DynamicProxy implements InvocationHandler, ISyncProxy {

    /**
     * 被代理的对象
     */
    private final Object target;

    public DynamicProxy(Object target) {
        this.target = target;
    }

    /**
     * 这种方式虽然实现了异步执行,但是存在一个缺陷:
     * 强制用户返回值为 Future 的子类。
     *
     * 如何实现不影响原来的值,要怎么实现呢?
     * @param proxy 原始对象
     * @param method 方法
     * @param args 入参
     * @return 结果
     * @throws Throwable 异常
     */
    @Override
    @SuppressWarnings("all")
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ISyncContext context = SimpleSyncContext.newInstance()
                .method(method).params(args).target(target);
        return SyncBs.newInstance().syncContext(context).execute();
    }

    @Override
    public Object proxy() {
        // 我们要代理哪个真实对象,就将该对象传进去,最后是通过该真实对象来调用其方法的
        InvocationHandler handler = new DynamicProxy(target);

        return Proxy.newProxyInstance(handler.getClass().getClassLoader(),
                target.getClass().getInterfaces(), handler);
    }

}

通过动态代理进行代码增强,SyncBs.newInstance().syncContext(context).execute(); 这里就是对于增强方法的执行。

cglib 字节码增强

package com.github.houbb.sync.core.support.proxy.cglib;

import com.github.houbb.sync.api.api.ISyncContext;
import com.github.houbb.sync.core.bs.SyncBs;
import com.github.houbb.sync.core.core.SimpleSyncContext;
import com.github.houbb.sync.core.support.proxy.ISyncProxy;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

import java.lang.reflect.Method;

/**
 * CGLIB 代理类
 * @author binbin.hou
 * date 2019/3/7
 * @since 0.0.1
 */
public class CglibProxy implements MethodInterceptor, ISyncProxy {

    /**
     * 被代理的对象
     */
    private final Object target;

    public CglibProxy(Object target) {
        this.target = target;
    }

    @Override
    public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        ISyncContext context = SimpleSyncContext.newInstance()
                .method(method).params(args).target(target);
        return SyncBs.newInstance().syncContext(context).execute();
    }

    @Override
    public Object proxy() {
        Enhancer enhancer = new Enhancer();
        //目标对象类
        enhancer.setSuperclass(target.getClass());
        enhancer.setCallback(this);
        //通过字节码技术创建目标对象类的子类实例作为代理
        return enhancer.create();
    }

}

对于没有接口的方法,我们通过 cglib 进行代码增强,这点和 spring aop 是保持一致的。

代理的调用

我们把三种场景的代理都实现了,那么应该如何判断方法应该调用哪一个代理呢?

这里有一个基本是公用方法,如下:

/*
 * Copyright (c)  2019. houbinbin Inc.
 * async All rights reserved.
 */

package com.github.houbb.sync.core.support.proxy;

import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.sync.core.support.proxy.cglib.CglibProxy;
import com.github.houbb.sync.core.support.proxy.dynamic.DynamicProxy;
import com.github.houbb.sync.core.support.proxy.none.NoneProxy;

import java.lang.reflect.Proxy;

/**
 * <p> 代理信息 </p>
 *
 * <pre> Created: 2019/3/8 10:38 AM  </pre>
 * <pre> Project: async  </pre>
 *
 * @author houbinbin
 * @since 0.0.1
 */
public final class SyncProxy {

    private SyncProxy(){}

    /**
     * 获取对象代理
     * @param <T> 泛型
     * @param object 对象代理
     * @return 代理信息
     * @since 0.0.6
     */
    @SuppressWarnings("all")
    public static <T> T getProxy(final T object) {
        if(ObjectUtil.isNull(object)) {
            return (T) new NoneProxy(object).proxy();
        }

        final Class clazz = object.getClass();

        // 如果targetClass本身是个接口或者targetClass是JDK Proxy生成的,则使用JDK动态代理。
        // 参考 spring 的 AOP 判断
        if (clazz.isInterface() || Proxy.isProxyClass(clazz)) {
            return (T) new DynamicProxy(object).proxy();
        }

        return (T) new CglibProxy(object).proxy();
    }

}

我们根据类本身,选择调用的代理类。

(1)空对象,不做增强

(2)接口或者代理类,使动态代理实现

(3)其他使用 cglib 进行字节码增强。

验证

春种秋收,我们辛苦忙活到现在,终于可以验证一下自己的成果了。

上述代码已经上传到 maven 仓库,实际可以直接调用。

maven 项目依赖

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>sync-core</artifactId>
    <version>0.0.1</version>
</dependency>

入门测试

业务代码

通过 @Sync 指定需要转同步的方法,通过 @SyncCallback 指定回调方法。

import com.github.houbb.sync.api.annotation.Sync;
import com.github.houbb.sync.api.annotation.SyncCallback;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class UserService {

    String id = "";

    @Sync
    public String queryId() {
        System.out.println("开始查询");
        return id;
    }

    @SyncCallback(value = "queryId")
    public void queryIdCallback() {
        System.out.println("回调函数执行");
        id = "123";
    }

}

测试类

@Test
public void queryIdTest() {
    final UserService proxy = SyncProxy.getProxy(new UserService());

    // 异步执行回调
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("开始异步执行回调");
                TimeUnit.SECONDS.sleep(2);
                proxy.queryIdCallback();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();

    String id = proxy.queryId();
    System.out.println("id: " + id);
}

测试日志

测试日志如下:

开始异步执行回调
[DEBUG] [2020-10-09 17:37:56.066] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2020-10-09 17:37:56.076] [main] [c.g.h.s.c.s.l.CountDownLatchLock.lock] - 进入等待,超时时间为:60000,超时单位:MILLISECONDS
回调函数执行
[INFO] [2020-10-09 17:37:58.019] [Thread-0] [c.g.h.s.c.s.l.CountDownLatchLock.unlock] - 执行 unlock 操作
[INFO] [2020-10-09 17:37:58.019] [main] [c.g.h.s.c.s.l.CountDownLatchLock.lock] - 等待结果: true
开始查询
id: 123

非常的完美。

小结

好了,到这里我们就把上一节中的遗留的问题全部解决了。

但是沉溺于 spring 多年的各位读者一定不满意,我也不满意。

因为我们都知道,结合 spring 之后,调用可以变得更加简单。

由于篇幅原因,spring 整合实现部分将放在下一节。感兴趣的可以关注一下我,便于实时接收最新内容。

觉得本文对你有帮助的话,欢迎点赞评论收藏转发一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

文中如果链接失效,可以点击 {阅读原文}。