public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
// Submit 21 tasks (i = 0..20) to the executor. Each task prints its index and
// then sleeps for Long.MAX_VALUE ms, so it never finishes and never frees the
// thread that picked it up.
for (i in 0..20) {
    server.execute {
        println("task $i run")
        Thread.sleep(Long.MAX_VALUE)
    }
}
上面这段代码的输出为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
task 0 run
task 1 run
task 7 run
task 8 run
task 9 run
task 10 run
task 11 run
task 12 run
task 13 run
task 14 run
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task MainKt$$Lambda$16/0x0000017ddf003400@70177ecd rejected from java.util.concurrent.ThreadPoolExecutor@1e80bfe8[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at MainKt.main(Main.kt:20)
privatebooleanaddWorker(Runnable firstTask, boolean core) { retry: for (intc= ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) returnfalse;
for (;;) { //a) 添加核心线程:如果线程数量>=corePoolSize返回失败; //b) 队列已满,添加非核心新线程,如果线程数量>=maximumPoolSize返回失败 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) returnfalse; //线程数+1并返回外层的for循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { // 创建新线程,Worker构造函数中会创新新的Thread w = newWorker(firstTask); finalThreadt= w.thread; if (t != null) { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intc= ctl.get();
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) thrownewIllegalThreadStateException(); workers.add(w); workerAdded = true; ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
privatefinalclassWorker extendsAbstractQueuedSynchronizer implementsRunnable { ...... publicvoidrun() { runWorker(this); } ...... //反复从队列中获取任务并执行 finalvoidrunWorker(Worker w) { Threadwt= Thread.currentThread(); Runnabletask= w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts booleancompletedAbruptly=true; try { // 优先执行firstTask,firstTask执行完成后会调用getTask()从队列获取任务来执行 // 如果getTask()返回null,线程会退出,系统就会回收这个线程 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); //执行任务 afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { booleantimedOut=false; // Did the last poll() time out?
for (;;) { intc= ctl.get();
// Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); returnnull; }
intwc= workerCountOf(c);
// Are workers subject to culling? booleantimed= allowCoreThreadTimeOut || wc > corePoolSize;