知识屋:更实用的电脑技术知识网站
所在位置:首页 > 科技  > Android

Java并发学习(九)任务执行、线程池及并发框架

发表时间:2022-03-25来源:网络

     大多数并发应用程序都是围绕“任务执行(Task Execution)”来构造的:任务通常是一些抽象且离散的工作单元。把应用程序分配到多个任务中,可以简化程序的组织结构,提高任务的执行效率。

任务执行

在围绕“任务执行”进行程序设计时,首先应该确定清晰的任务边界,在理想情况下,各个任务的边界是相互独立的,任务并不依赖于其他任务的状态或结果,那么这些相互独立的任务单元就是可以并行执行的。

应用程序应该同时表现出良好的吞吐量快速的响应速度。即在保持尽可能多的用户任务同时工作的条件下,还需要尽可能快的响应用户需求。

串行的执行任务:在以前的应用程序编程中,最简单的方法就是单个线程串行的执行各项任务,这样的做法虽然无过错但却会因为被IO阻塞无法充分利用CPU资源(计算机磁盘I/O的速度远小于CPU的运行速度),所以为了充分利用CPU资源,尽可能快的响应需求,采取并发编程的策略是一个很好的解决方式。

显式的为任务创建线程:为了实现并发编程提高程序的响应速度,我们为每一个任务创建一个新的线程提供服务。这样使得任务的处理过程可以从主线程中分离出来,任务可以并行执行,达到更高的吞吐量和响应速度。但是此方法是存在一定缺陷的,尤其是当任务数量较多需要创建大量的线程时,线程上下文切换和线程创建销毁的开销是非常大的。

线程引入的开销

在之前线程实现原理学习中,我们已经了解到线程的底层实现原理以及线程和进程的关系,详见这篇文章

我们知道进程是资源(CPU、内存等)分配的基本单位线程是CPU调度的基本单位,虽然在同一进程中,线程的切换不会引起进程切换,线程会共享同一进程的资源,但是线程调度过程中的产生的上下文切换和线程创建销毁的开销也是不容忽视的。每个线程都有一个程序计数器(记录要执行的下一条指令),一组寄存器(保存当前线程的工作变量),堆栈(记录执行历史,其中每一帧保存了一个已经调用但未返回的过程)。

1.上下文切换

在并发程序的执行过程中,对于多任务的处理是通过多线程来实现的,在多任务处理系统中,CPU需要处理所有程序的操作当用户来回切换它们时,需要记录这些程序执行到哪里。上下文切换就是这样一个过程,允许CPU记录并恢复各种正在运行程序的状态,使它能够完成切换操作。

任务系统往往需要同时执行多道作业。作业数往往大于机器的CPU数,然而一颗CPU同时只能执行一项任务,如何让用户感觉这些任务正在同时进行呢? 操作系统的设计者 巧妙地利用了时间片轮转的方式, CPU给每个任务都服务一定的时间,然后把当前任务的状态保存下来,在加载下一任务的状态后,继续服务下一任务。任务的状态保存及再加载, 这段过程就叫做上下文切换。时间片轮转的方式使多个任务在同一颗CPU上执行变成了可能。

2.切换步骤

在上下文切换过程中,CPU会停止处理当前运行的程序,并保存当前程序运行的具体位置以便之后继续运行。从这个角度来看,上下文切换有点像我们同时阅读几本书,在来回切换书本的同时我们需要记住每本书当前读到的页码。在进程中,上下文切换过程中的“页码”信息是保存在进程控制块(PCB, process control block)中的。PCB还被称作“切换桢”(switchframe)。“页码”信息会一直保存到CPU的内存中,直到他们被再次使用。

PCB通常是系统内存占用区中的一个连续存区,它存放着操作系统用于描述进程情况及控制进程运行所需的全部信息,它使一个在多道程序环境下不能独立运行的程序成为一个能独立运行的基本单位或一个能与其他进程并发执行的进程。进程切换的步骤:

保存进程A的状态(寄存器和操作系统数据);更新PCB中的信息,对进程A的“运行态”做出相应更改;将进程A的PCB放入相关状态的队列将进程B的PCB信息改为“运行态”,并执行进程BB执行完后,从队列中取出进程A的PCB,恢复进程A被切换时的上下文,继续执行A

总的来说,进程的切换可以分为两步:

切换页目录以使用新的地址空间;切换内核栈和硬件上下文;

而对于线程来说,由于共享了进程的资源,所以对于线程切换,第1步是不需要做的,第2是进程和线程切换都要做的(不在同一进程的线程切换视作进程切换)。线程上下文切换和进程上下文切换一个最主要的区别是 线程的切换虚拟内存空间依然是相同的,但是进程切换是不同的这两种上下文切换的处理都是 通过操作系统内核来完成的。内核的这种切换过程伴随的 最显著的性能损耗是将寄存器中的内容切换出

对于一个正在执行的进程包括 程序计数器、寄存器、变量的当前值等 ,而这些数据都是保存在CPU的寄存器中的,且这些寄存器只能是正在使用CPU的进程才能享用,在进程切换时,首先得保存上一个进程的这些数据(便于下次获得CPU的使用权时从上次的中断处开始继续顺序执行,而不是返回到进程开始,否则每次进程重新获得CPU时所处理的任务都是上一次的重复,可能永远也到不了进程的结束出,因为一个进程几乎不可能执行完所有任务后才释放CPU),然后将本次获得CPU的进程的这些数据装入CPU的寄存器从上次断点处继续执行剩下的任务。

3.引起切换的原因

对于我们经常使用的抢占式操作系统(CPU随机为线程分配时间片)而言,以下几种原因可能会导致线程上下文切换:

中断处理:在中断处理中,其他程序”打断”了当前正在运行的程序。当CPU接收到中断请求时,会在正在运行的程序和发起中断请求的程序之间进行一次上下文切换。中断分为硬件中断和软件中断,软件中断包括因为IO阻塞、未抢到资源(锁)或者用户代码挂起当前任务等原因,线程被挂起。多任务处理:在多任务处理中,CPU会在不同程序之间来回切换,每个程序都有相应的处理时间片,CPU在两个时间片的间隔中进行上下文切换。用户态切换:对于一些操作系统,当进行用户态内核态切换时也会进行一次上下文切换。关于用户态和内核态本篇文章就讲的很详细。

4.减少切换的方法

既然上下文切换会导致额外的开销,因此减少上下文切换次数便可以提高多线程程序的运行效率。所以,减少上下文切换的方法有无锁并发编程、CAS算法、使用最少线程和使用协程

无锁并发:多线程竞争时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁,如将数据的ID按照Hash取模分段,不同的线程处理不同段的数据;CAS算法:Java的Atomic原子操作包使用CAS算法来更新数据,而不需要加锁;最少线程:避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态;使用协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换;

5.线程创建和销毁的开销

在Java中一切都可以看做是对象,线程也不例外,线程对象的创建必然导致内存空间的消耗,而线程的销毁同样也需要虚拟机垃圾收集器GC来完成回收工作,过多的线程对象会使得这部分的开销变得不容忽视。

线程池

通过上面的学习我们了解到串行执行程序的低效性,并发编程又受限于线程引入的性能开销,那么有什么办法可以解决这些问题呢?线程池的出现很好的解决了这类问题,合理的使用线程池有如下好处:

降低资源消耗:重复利用存在的线程,减少对象创建、消毁的开销。提高响应速度: 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。提高线程的可管理性: 提供定时执行、定期执行、单线程、并发数控制等功能,对线程统一分配、调优和监控。

1.Executor框架

      在之前对于线程的实现原理的学习中,我们已经了解到HotSpot VM的线程模型采用的是内核线程1:1模型实现的(详见这篇文章),Java线程(java.lang.Thread)被一对一的映射为本地操作系统线程。操作系统会调度所有的线程并将它们分配给可用的CPU。

在上层,Java多线程程序通常把应用分为若干个任务,然后使用用户级的线程调度器(Executor执行器框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上,如下图所示:

从图中可以看出,应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核来控制,下层的调度不受应用程序的控制。

2.Executor框架的结构

      Executor框架是一个根据一组执行策略调用、调度、执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。

Executor框架主要由3大部分构成

任务: 包括执行任务需要实现的Runnable接口Callable接口任务的执行:包括任务执行机制的核心接口Executor ,以及继承自Executor 接口的ExecutorService接口。(ScheduledThreadPoolExecutor和ThreadPoolExecutor这两个关键类实现了ExecutorService接口。)异步计算的结果:Future接口以及Future接口的实现类FutureTask类

下图为Executor框架的主要类和接口:

Executor接口:

一个运行新任务的简单接口,是Executor框架的基础,它将任务的提交与执行分离开来

public interface Executor { void execute(Runnable command); }

Executor接口只有一个execute方法,用来替代通常创建或启动线程的方法。例如,使用Thread来创建并启动线程的代码如下:

Thread t = new Thread(); t.start(); //Thread启动线程 executor.execute(t); //Executor启动线程

对于不同的Executor实现,execute()方法可能是创建一个新线程并立即启动,也有可能是使用已有的工作线程来运行传入的任务,也可能是根据设置线程池的容量或者阻塞队列的容量来决定是否要将传入的线程放入阻塞队列中或者拒绝接收传入的线程。

ExecutorService接口

ExecutorService接口扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。

ScheduledExecutorService接口

扩展了ExecutorService接口,支持Future和定期执行任务。ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

Executor框架的使用示意图:

主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以实现Runnable对象和Callable对象之间的相互转换。(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。然后可以把创建完成的Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task))。

执行execute()方法和submit()方法的区别在于: 1)execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否; 2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

3.ThreadPoolExecutor详解

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,它是线程池的实现类,它有四种构造方法,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize = CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增加workerCount,如果成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false;//线程启动成功与否的标志 boolean workerAdded = false; //工作线程创建并添加到线程池成功与否的标志 Worker w = null; try { // 根据firstTask来创建Worker对象 w = new Worker(firstTask); // 每一个Worker对象都会创建一个线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 如果rs是RUNNING状态 或者 rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // workers是一个HashSet容器 workers.add(w); int s = workers.size(); // largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

execute方法开始,整个工作线程的生命周期为Worker使用ThreadFactory创建新的工作线程,新的工作线程Worker(Worker类继承了AQS,实现了Runnable接口,也可作为线程,是不可重入的锁)启动后会直接调用Worker的run()方法,执行runWorker,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束。

后续方法的源码分析可参考这篇文章,就不过多赘述。

4.常用的线程池

通过Executor框架的工具类Executors,可以创建多种不同类型的线程池,但其实质还是通过Executors类的静态方法内重新调用ThreadPoolExecutor四种构造方法的其中一种,为ThreadPoolExecutor的部分成员变量设置一定的值,从而形成特定功能的线程池,并没有真正的产生多个线程池类

常见的3种ThreadPoolExecutor线程池:FixedThreadPool、SingleThreadExecutor、CachedThreadPool。

FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }

FixedThreadPool使用无界队列 LinkedBlockingQueue(队列的容量为Intger.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize;由于1,使用无界队列时maximumPoolSize将是一个无效参数;由于1和2,使用无界队列时keepAliveTime将是一个无效参数;运行中的FixedThreadPool(未执行shutdown()或shutdownNow()方法)不会拒绝任务

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)); }

 

从上面源代码可以看出新创建的SingleThreadExecutorcorePoolSizemaximumPoolSize都被设置为1。其他参数和FixedThreadPool相同,SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。SingleThreadExecutor使用无界队列作为线程池的工作队列会对线程池带来的影响与FixedThreadPool相同。

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池,可缓存线程池。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }

CachedThreadPool的corePoolSize被设置为空(0)maximumPoolSize被设置为Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新的线程。极端情况下,这样会导致耗尽cpu和内存资源。

5.ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor主要用来在给定的延迟后运行任务,或者定期执行任务

ScheduledThreadPoolExecutor使用的任务队列DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask的time变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask的s变量小的先执行)。

ScheduledThreadPoolExecutor与ThreadPoolExecutor的设计基本类似,为了实现周期性的执行任务,对ThreadPoolExecutor做了如下修改:

使用 DelayQueue 作为任务队列;获取任务的方不同执行周期任务后,增加了额外的处理

这里不做赘述,详细可见这篇文章

6.线程池的应用

public class Test { public static void main(String[] args) { //创建一个核心线程数为5,最大线程数为10,空闲线程存活时间为200毫秒的有界队列线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5)); //创建15个任务,放入Executor执行器执行 for(int i=0;i
收藏

上一篇JAVA的编译运行过程分析

下一篇java JDBC

  • 人气文章
  • 最新文章
  • 下载排行榜
  • 热门排行榜