package org.example.thread;

public class Test {

public static void main(String[] args) {

AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor();

normalExecutor.put(() -> {

System.out.println("123");

test();

});

normalExecutor.execute();

System.out.println("00000000000000000000000000000");

}

private static void test(){

try {

Thread.sleep(1000);

System.out.println("test...............");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

线程池代码

package org.example.thread;

/**

* 异步进程 执行器

*

* @date 2021年7月15日13:43:37

*/

public interface AsyncProcessExecutor {

/**

* 存放任务

*

* @param task 任务

* @return AsyncProcessExecutor

*/

AsyncProcessExecutor put(final Runnable task);

/**

* 执行

*

* @return boolean

*/

boolean execute();

}

package org.example.thread;

import cn.hutool.core.collection.CollUtil;

import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

/**

* 多线程锁执行器 正常处理

*

* @date 2020-12-10 10:36

*/

@Slf4j

public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor {

/**

* 线程池字典

*/

private static final Map EXECUTOR_MAP = Maps.newConcurrentMap();

/**

* 线程Key

*/

private final String key;

/**

* 任务队列

*/

private final List taskList;

/**

* 执行器

*/

private final AsyncProcessor processor;

/**

* 构造函数

*/

public AsyncProcessExecutorByNormal() {

this.key = "def";

taskList = new ArrayList<>();

processor = AsyncProcessExecutorByNormal.getProcessor(this.key);

}

/**

* 构造函数

*

* @param key 线程池唯一Key

*/

public AsyncProcessExecutorByNormal(String key) {

this.key = key;

taskList = new ArrayList<>();

processor = AsyncProcessExecutorByNormal.getProcessor(this.key);

}

/**

* 获得执行器

*

* @param key Key

* @return AsyncProcessor

*/

private synchronized static AsyncProcessor getProcessor(String key) {

AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);

if (null == asyncProcessor) {

asyncProcessor = new AsyncProcessor();

asyncProcessor.init(key);

EXECUTOR_MAP.put(key, asyncProcessor);

}

return asyncProcessor;

}

/**

* 执行

*

* @param task 任务

*/

@Override

public AsyncProcessExecutorByNormal put(final Runnable task) {

taskList.add(task);

return this;

}

// ====================================

/**

* 执行 线程锁 等待查询结果 结果完成后继续执行

*

* @return boolean 最终直接结果

*/

@Override

public boolean execute() {

if (CollUtil.isEmpty(this.taskList)) {

return true;

}

for (Runnable task : this.taskList) {

// 多线程执行任务

this.execute(task);

}

// 返回执行结果

return true;

}

/**

* 执行指定的任务

*

* @param task 任务

* @return boolean

*/

private boolean execute(final Runnable task) {

return processor.executeTask(task);

}

}

package org.example.thread;

import cn.hutool.core.collection.CollUtil;

import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.Callable;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.Executors;

import java.util.concurrent.atomic.AtomicInteger;

/**

* 多线程锁执行器

* 用于当前方法中复杂业务多线程处理,等待线程执行完毕后 获得统一结果

* 2021年11月2日14:07:54 重构 多线程异步等待执行器

*

* @author 周鹏程

* @date 2020-12-10 10:36

*/

@Slf4j

public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {

/**

* 线程池字典

*/

private static final Map EXECUTOR_MAP = Maps.newConcurrentMap();

/**

* 线程Key

*/

private final String key;

/**

* 任务队列

*/

private final List> taskList;

/**

* 执行器

*/

private final AsyncProcessor processor;

/**

* 任务执行计数器

*/

private AtomicInteger count;

/**

* 构造函数

*/

public AsyncProcessExecutorByWait() {

this.key = "def";

taskList = new ArrayList<>();

processor = getProcessor(this.key);

}

/**

* 构造函数

*

* @param key 线程池唯一Key

*/

public AsyncProcessExecutorByWait(String key) {

this.key = key;

taskList = new ArrayList<>();

processor = getProcessor(this.key);

}

/**

* 获得执行器

*

* @param key Key

* @return AsyncProcessor

*/

private synchronized static AsyncProcessor getProcessor(String key) {

AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);

if (null == asyncProcessor) {

asyncProcessor = new AsyncProcessor();

asyncProcessor.init(key);

EXECUTOR_MAP.put(key, asyncProcessor);

}

return asyncProcessor;

}

/**

* 放入执行任务

* 特殊处理 Runnable 转换为 Callable

*

* @param task 任务

*/

@Override

public AsyncProcessExecutor put(final Runnable task) {

taskList.add(Executors.callable(task));

return this;

}

/**

* 执行 线程锁 等待查询结果 结果完成后继续执行

*/

@Override

public boolean execute() {

if (CollUtil.isEmpty(this.taskList)) {

return true;

}

// 初始化锁参数

count = new AtomicInteger(this.taskList.size());

// 门闩 线程锁

CountDownLatch latch = new CountDownLatch(this.taskList.size());

for (Callable task : this.taskList) {

// 回调减 门闩

processor.executeTaskAndCallback(task, (result) -> {

if (result.getSuccess()) {

count.decrementAndGet();

}

latch.countDown();

return null;

});

}

// 线程锁 等待查询结果 结果完成后继续执行

try {

latch.await();

} catch (Exception e) {

log.error(e.getMessage(), e);

} finally {

this.taskList.clear();

}

// 返回执行结果

return count.get() == 0;

}

}

package org.example.thread;

/**

* 异步进程 执行器 工厂

*

* @date 2021年7月15日13:43:37

*/

public final class AsyncProcessExecutorFactory {

private AsyncProcessExecutorFactory() {

}

/**

* 创建等待执行器

*

* @return AsyncProcessExecutor

*/

public static AsyncProcessExecutor createWaitExecutor() {

return new AsyncProcessExecutorByWait();

}

/**

* 创建等待执行器

*

* @param key KEY

* @return AsyncProcessExecutor

*/

public static AsyncProcessExecutor createWaitExecutor(String key) {

return new AsyncProcessExecutorByWait(key);

}

/**

* 创建正常执行器

*

* @return AsyncProcessExecutor

*/

public static AsyncProcessExecutor createNormalExecutor() {

return new AsyncProcessExecutorByNormal();

}

// =====================

/**

* 创建正常执行器

*

* @param key KEY

* @return AsyncProcessExecutor

*/

public static AsyncProcessExecutor createNormalExecutor(String key) {

return new AsyncProcessExecutorByNormal(key);

}

}

package org.example.thread;

import cn.hutool.core.util.StrUtil;

import com.google.common.util.concurrent.*;

import lombok.Data;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.Callable;

import java.util.concurrent.RejectedExecutionException;

import java.util.concurrent.TimeUnit;

import java.util.function.Function;

/**

* 自定义线程执行器 - 等待线程执行完毕不拒绝

*

* @author 周鹏程

* @date 2020-10-08 10:24

*/

@Slf4j

public class AsyncProcessor {

/**

* 线程池名称格式

*/

private static final String THREAD_POOL_NAME = "AsyncProcessorWaitPool-{}-%d";

/**

* 默认线程池关闭等待时间 秒

*/

private static final int DEFAULT_WAIT_TIME = 10;

/**

* 线程池监听执行器

*/

private ListeningExecutorService execute;

/**

* 初始化

*

* @param key 线程池标识

*/

public void init(String key) {

if (StringUtils.isBlank(key)) {

return;

}

// 线程工厂名称

String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key);

// 创建 Executor

// 此处默认最大值改为处理器数量的 4 倍

try {

// 监听执行器

execute = MoreExecutors.listeningDecorator(

ThreadPoolFactory.createDefThreadPool(formatThreadPoolName));

// 这里不会自动关闭线程, 当线程超过阈值时 抛异常

// 关闭事件的挂钩

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

log.info("ProcessorWait 异步处理器关闭");

execute.shutdown();

try {

// 等待1秒执行关闭

if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {

log.error("ProcessorWait 由于等待超时,异步处理器立即关闭");

execute.shutdownNow();

}

} catch (InterruptedException e) {

log.error("ProcessorWait 异步处理器关闭中断");

execute.shutdownNow();

}

log.info("ProcessorWait 异步处理器关闭完成");

}));

} catch (Exception e) {

log.error("ProcessorWait 异步处理器初始化错误", e);

throw new ExceptionInInitializerError(e);

}

}

/**

* 执行任务,不管是否成功

* 其实也就是包装以后的 {@link } 方法

*

* @param task 任务

* @return boolean

*/

public boolean executeTask(Runnable task) {

try {

execute.execute(task);

} catch (RejectedExecutionException e) {

log.error("AsyncProcessorWait 执行任务被拒绝", e);

return false;

}

return true;

}

/**

* 提交任务,并可以在稍后获取其执行情况

* 当提交失败时,会抛出 {@link }

*

* @param task 任务

*/

public void executeTaskAndCallback(Callable task, Function, Void> callback) {

ListenableFuture future = execute.submit(task);

Futures.addCallback(future, new FutureCallback() {

@Override

public void onSuccess(T result) {

CallbackResult callbackResult = new CallbackResult<>();

callbackResult.setSuccess(true);

callbackResult.setResult(result);

// 线程池失败后 返回该 Runnable

callback.apply(callbackResult);

}

@Override

public void onFailure(Throwable t) {

log.error("线程名称:{} - 执行异常信息:{}", Thread.currentThread().getName(), t.getMessage());

CallbackResult callbackResult = new CallbackResult<>();

callbackResult.setSuccess(false);

callback.apply(callbackResult);

}

}, execute);

}

// =================

/**

* 回调结果

*

* @param

*/

@Data

public static class CallbackResult {

/**

* 状态

*/

private Boolean success;

/**

* 结果

*/

private T result;

}

}

package org.example.thread;

import cn.hutool.core.thread.ThreadUtil;

import cn.hutool.core.util.StrUtil;

import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;

import java.util.concurrent.ExecutorService;

/**

* 单线程池

*

* @author 周鹏程

* @date 2021/8/27 17:00

*/

@Slf4j

public final class SyncProcessSingleExecutor {

private static final Map EXECUTOR_MAP = Maps.newConcurrentMap();

private static final String KEY = "def";

private SyncProcessSingleExecutor() {

}

/**

* 执行器

*

* @param r 任务

*/

public static synchronized void execute(Runnable r) {

execute(KEY, r);

}

/**

* 执行器

*

* @param key 唯一Key

* @param r 任务

*/

public static synchronized void execute(String key, Runnable r) {

if (null == r) {

return;

}

ExecutorService executorService = EXECUTOR_MAP.get(key);

if (null == executorService) {

executorService = ThreadUtil.newSingleExecutor();

EXECUTOR_MAP.put(key, executorService);

}

executorService.execute(new TaskWrapper(r));

}

/**

* Task 包装类

* 此类型的意义是记录可能会被 Executor 吃掉的异常

*/

private static class TaskWrapper implements Runnable {

private final Runnable gift;

public TaskWrapper(final Runnable target) {

this.gift = target;

}

@Override

public void run() {

// 捕获异常,避免在 Executor 里面被吞掉了

if (gift != null) {

try {

gift.run();

} catch (Exception e) {

String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());

log.error(errMsg, e);

}

}

}

}

}

package org.example.thread;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.LinkedBlockingDeque;

import java.util.concurrent.RejectedExecutionHandler;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* 线程池工厂

*

* @date 2021/11/2 10:48

*/

public final class ThreadPoolFactory {

/**

* 默认最大并发数

*/

private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

/**

* 默认线程存活时间

*/

private static final long DEFAULT_KEEP_ALIVE = 60L;

/**

* 默认队列大小

*/

private static final int DEFAULT_SIZE = 1024;

/**

* 线程池名称格式

*/

private static final String DEFAULT_THREAD_POOL_NAME = "ProcessPool-{}-%d";

private ThreadPoolFactory() {

}

/**

* 创建默认的线程池

*

* @return ThreadPoolExecutor

*/

public static ThreadPoolExecutor createDefThreadPool() {

return createInitThreadPool(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,

TimeUnit.SECONDS, DEFAULT_SIZE, DEFAULT_THREAD_POOL_NAME, new ThreadPoolExecutor.CallerRunsPolicy());

}

/**

* 创建默认的线程池

*

* @param poolName 线程池名称

* @return ThreadPoolExecutor

*/

public static ThreadPoolExecutor createDefThreadPool(String poolName) {

return createInitThreadPool(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,

TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy());

}

/**

* 创建默认的线程池

*

* @param maxConcurrent 最大线程数

* @param poolName 线程池名称

* @return ThreadPoolExecutor

*/

public static ThreadPoolExecutor createDefThreadPool(int maxConcurrent, String poolName) {

return createInitThreadPool(maxConcurrent, maxConcurrent * 4, DEFAULT_KEEP_ALIVE,

TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy());

}

/**

* 创建线程池

*

* @param coreConcurrent 核心线程数

* @param maxConcurrent 最大线程数

* @param keepAlive 线程存活时效

* @param timeUnit 线程存活单位

* @param queueSize 队列大小

* @param poolName 线程池名称

* @param handler 拒绝处理策略

* @return ThreadPoolExecutor

*/

public static ThreadPoolExecutor createInitThreadPool(final int coreConcurrent,

final int maxConcurrent,

final long keepAlive,

final TimeUnit timeUnit,

final int queueSize,

final String poolName,

final RejectedExecutionHandler handler

) {

return new ThreadPoolExecutor(coreConcurrent, maxConcurrent, keepAlive, timeUnit,

new LinkedBlockingDeque<>(queueSize),

new ThreadFactoryBuilder().setNameFormat(poolName).build(),

handler

);

}

}

 

文章来源

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。

发表评论

返回顶部暗黑模式