在 Java 中,当我们需要执行异步操作时,往往会去创建一个新线程去执行,如下:
public class App {
public static void main( String[] args ) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":异步任务");
}).start();
}
}
关于更多的线程池信息,请查看:
Spring 3.0 之 后提供了一个 @Async
注解,使用 @Async
注解进行优雅的异步调用。
其实,@Async
注解本质上还是通过线程池创建线程去异步执行任务
在启动类开启启用异步调用,同时注入 ApplicationRunner
对象在启动类进行调用测试(没有使用单元测试类)。
使用 @Async
注解步骤:
App:
启动类(测试类)
@MapperScan("com.zzc.mapper")
@SpringBootApplication
public class App {
@Autowired
private AsyncService asyncService;
public static void main( String[] args ) {
SpringApplication.run(App.class, args);
}
@Bean
public ApplicationRunner applicationRunner() {
return applicationArguments -> {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
// 无返回值
asyncService.asyncTask();
// 有返回值--主线程未使用到
//Future<String> asyncTask = asyncService.asyncTask("666");
// 有返回值--主线程使用到
//System.out.println(Thread.currentThread().getName() + ":返回值:" + asyncTask.get());
// 模拟事务异常回滚
//asyncService.asyncTaskForTransaction(true);
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
};
}
}
AsyncServiceImpl
:业务处理类
@EnableAsync
@Service
public class AsyncServiceImpl implements AsyncService {
@Autowired
private UserMapper userMapper;
@Async
@Override
public void asyncTask() {
long startTime = System.currentTimeMillis();
try {
//模拟耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":void asyncTask(),耗时:" + (endTime - startTime));
}
@Async
@Override
public Future<String> asyncTask(String s) {
long startTime = System.currentTimeMillis();
try {
//模拟耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":Future<String> asyncTask(String s),耗时:" + (endTime - startTime));
return AsyncResult.forValue(s);
}
@Async
@Override
@Transactional
public void asyncTaskForTransaction(Boolean exFlg) {
TabUser tabUser = new TabUser();
tabUser.setId("111111");
tabUser.setUserName("zzc");
tabUser.setPhoto("12");
userMapper.addUser(tabUser);
if (exFlg) {
throw new RuntimeException("模拟异常");
}
}
}
@MapperScan("com.zzc.mapper")
@SpringBootApplication
public class App {
@Autowired
private AsyncService asyncService;
public static void main( String[] args ) {
SpringApplication.run(App.class, args);
}
@Bean
public ApplicationRunner applicationRunner() {
return applicationArguments -> {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
asyncService.asyncTask();
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
};
}
}
运行代码后:
@Bean
public ApplicationRunner applicationRunner() {
return applicationArguments -> {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
Future<String> asyncTask = asyncService.asyncTask("666");
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
};
}
运行结果如下:
@Bean
public ApplicationRunner applicationRunner() {
return applicationArguments -> {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
Future<String> asyncTask = asyncService.asyncTask("666");
System.out.println(Thread.currentThread().getName() + ":返回值:" + asyncTask.get());
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
};
}
运行结果如下:
有返回值的情况下,虽然异步业务逻辑是由子线程执行,但如果在主线程操作返回值对象,主线程会等待,还是顺序执行
为了方便观察、测试,我们在配置文件中将日志级别设置成debug
logging:
level:
root: debug
事务正常提交是没有问题的,数据可以添加到数据库(通过 Mybatis),所以,这里只演示事务异常情况。
@Bean
public ApplicationRunner applicationRunner() {
return applicationArguments -> {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
asyncService.asyncTaskForTransaction(true);
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
};
}
运行结果如下:
@Async
底层原理:就是通过线程池创建一个线程,然后去执行业务逻辑。
@Async
注解会应用默认线程池 SimpleAsyncTaskExecutor
。
默认线程池的弊端
在线程池应用中,参考阿里巴巴 Java 开发规范:程池不允许使用 Executors
去创建,不允许使用系统默认的线程池,推荐通过 ThreadPoolExecutor
的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 各个方法的弊端:
newFixedThreadPool
和 newSingleThreadExecutor
:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOMnewCachedThreadPool
和 newScheduledThreadPool
:要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM@Async
默认异步配置使用的是 SimpleAsyncTaskExecutor
,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发 OutOfMemoryError
错误。针对线程创建问题,SimpleAsyncTaskExecutor
提供了限流机制,通过 concurrencyLimit
属性来控制开关,当 concurrencyLimit>=0
时开启限流机制,默认关闭限流机制,即 concurrencyLimit=-1
,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor
并不是严格意义的线程池,达不到线程复用的功能
自定义线程池
新建一个配置类:
@Configuration
public class AsyncThreadPoolConfig {
private static final int MAX_POOL_SIZE = 50;
private static final int CORE_POOL_SIZE = 20;
private static final int TASK_NUM = 200;
private static final int ACTIVE_TIME = 60;
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
asyncTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
asyncTaskExecutor.setQueueCapacity(TASK_NUM);
asyncTaskExecutor.setKeepAliveSeconds(ACTIVE_TIME);
asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool");
asyncTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
}
上面我们通过使用 ThreadPoolTaskExecutor
创建了一个线程池,同时设置了以下这些参数:
CallerRunsPolicy
策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务使用线程池
在定义了线程池之后,我们如何让异步调用的执行任务使用这个线程池中的资源来运行呢?方法非常简单,我们只需要在 @Async 注解中指定线程池名即可。当然,也可以不用指定,比如:
@Async("asyncTaskExecutor") // @Async
@Override
public void asyncTask() {
long startTime = System.currentTimeMillis();
try {
//模拟耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":void asyncTask(),耗时:" + (endTime - startTime));
}
测试线程池
@Bean
public ApplicationRunner applicationRunner() {
return applicationArguments -> {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
asyncService.asyncTask();
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
};
}
运行结果如下:
【参考资料】
因篇幅问题不能全部显示,请点此查看更多更全内容