`
zhouchaofei2010
  • 浏览: 1085780 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

java并发编程实战-第8章-线程池的使用

 
阅读更多

java并发编程实战-第8章-线程池的使用

 

重点:配置调优的高级选项,并分析在任务执行框架时需要注意的各种危险

8.1 在任务与执行策略之间的隐性耦合

 

8.1.1 线程饥饿死锁

条件:任务依赖其他任务,只要线程池中的正在执行的任务的线程需要等待一工作队列中的任务而阻塞。

调整策略:调整线程池的大小

 

例子:ThreadDeadlock中主任务等待子任务的完成(注意:例子是单线程执行,如果是多线程则不会)

 

Task that Deadlocks in a Single-threaded Executor Don't Do this.

 

public class ThreadDeadlock {

    ExecutorService exec = Executors.newSingleThreadExecutor();

 

    public class RenderPageTask implements Callable<String> {

        public String call() throws Exception {

            Future<String> header, footer;

            header = exec.submit(new LoadFileTask("header.html"));

            footer = exec.submit(new LoadFileTask("footer.html"));

            String page = renderBody();

            // Will deadlock -- task waiting for result of subtask

            return header.get() + page + footer.get();

        }

    }

}

 

 

8.1.2 运行较长时间的任务

 

条件:任务时间阻塞时间过长,即使不死锁,线程池的响应性也会变的很糟糕

 

调整策略:1、(缓解)限定任务等待资源时间,用限时版本的阻塞方法

          2、如果总是充满被阻塞的任务,那么调大线程池的大小

 

 

例如:Thread.join ,BlockingQueue.put、CountDownLatch.await

 

 

8.2 设置线程池的大小

 

  必须分析计算机环境,资源预算和任务的特性,要正确的设置线程池的大小,必须估算出任务的等待时间和计算时间的比值

  

 

 

 

The optimal pool size for keeping the processors at the desired utilization is:

 

N(线程数)=N(cpu总数)*U(cpu的利用率)*(1+W(等待时间)/C(计算时间))

 

和任务的数量没关系,只有cpu 和 任务的等待执行时间有关系

 

You can determine the number of CPUs using Runtime:

 

int N_CPUS = Runtime.getRuntime().availableProcessors();

 

 

 

 

如果阻塞操作多了。比如i/o 操作,有些任务并不会一直执行,所以要调大线程池的大小

 

8.3  配置ThreadPoolExcutor

 

首先,来看看ThreadPoolExcutor的本质与默认配置

本质:ExcutorService

适用:Executors 工厂方法配置

默认配置(Executors 工厂方法提供,它们均为大多数使用场景预定义了设置 ):

Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)

Executors.newSingleThreadExecutor()(单个后台线程)

 

 

如果默认执行策略不能满足需求,那么可以通过ThreadPoolExcutor的构造函数来实例化一个对象,根据自己的需求定制

 

执行策略包括的内容即是构造函数的参数内容:

 

如下:

ThreadPoolExecutor(

  int corePoolSize,

  int maximumPoolSize, 

  long keepAliveTime,

 TimeUnit unit, 

  BlockingQueue workQueue,

  RejectedExecutionHandler handler

corePoolSize: 线程池维护线程的最少数量 

maximumPoolSize:线程池维护线程的最大数量 

keepAliveTime:线程池维护线程所允许的空闲时间 

unit: 线程池维护线程所允许的空闲时间的单位 

workQueue: 线程池所使用的缓冲队列 

handler: 线程池对拒绝任务的处理策略  

 

如下蓝色内容来自:http://blog.chinaunix.net/uid-20577907-id-3519578.html

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。 

也就是:处理任务的优先级为: 
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程

 

 

8.3.1 线程的创建和销毁

 corePoolSize, maximumPoolSize,keepAliveTime 负责线程的创建和销毁

 

8.3.2 管理队列任务

 

 

 

8.3.3 饱和策略

ThreadPoolExecutor的饱和策略通过setRejectedExecutionHandler来修改

 

1、Abort 中止  默认,将抛出RejectedExecutionException,调用者捕获改异常,根据需求处理代码

2、抛弃(Discard)

3、抛弃最旧的(Discard-Oldest)会抛弃下一个需要执行的任务,在FIFO队列中时成立的,但在优先队列中会抛弃最优先的任务,所以该策略不该和优先队列一起使用 

4、调用者运行(Caller-Runs),该策略是个调节机制,比如在webServer服务器中,使用有界队列和调用者运行饱和策略,在线程池中所有线程都被调用,工作队列已满,

下一个任务会在调用excute的主线程中运行,此时主线程不会在调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,tcp层同样开始抛弃任务。

当服务器过载时,过载情况会向外蔓延开来-从线程池到工作者队列到应用程序到tcp层,最终达到客户端,导致服务器在过高负载的情况下实现一种平缓的性能降低。

 

 

例子:

 

ThreadPoolExecutor executor

    = new ThreadPoolExecutor(N_THREADS, N_THREADS,

        0L, TimeUnit.MILLISECONDS,

        new LinkedBlockingQueue<Runnable>(CAPACITY));

executor.setRejectedExecutionHandler(

    new ThreadPoolExecutor.CallerRunsPolicy());

    

    

    那么当任务达到后,如何做到不中止、不抛弃、不被调节,而是阻塞呢?

    

 通过Semaphore(信号量)来控制任务的到达率,来实现

 

 

 

    

 

 

 

8.3.4 线程工厂

 

 通过实现ThreadFactory接口,可以实现自定义线程

 

指定name,设置UncaughtExceptionHandler,log等

 

 

8.3.5 在调用构造函数后再定制ThreadPoolExcutor(允许指定和不允许指定的处理)

 

Listing 8.8. Modifying an Executor Created with the Standard Factories.

ExecutorService exec = Executors.newCachedThreadPool();

if (exec instanceof ThreadPoolExecutor)

    ((ThreadPoolExecutor) exec).setCorePoolSize(10);

else

    throw new AssertionError("Oops, bad assumption");

 

补充理解: 

ExecutorService 和 ThreadPoolExecutor 的关系 :ThreadPoolExecutor是ExecutorService实现的子类

 

 

public interface ExecutorServiceextends Executor

 

public abstract class AbstractExecutorServiceextends Objectimplements ExecutorService

 

public class ThreadPoolExecutorextends AbstractExecutorService

 

 

 

如果不让外部代码定制ThreadPoolExcutor,可以使用Excutors中的 unconfigurableExecutorService方法来包裹ExecutorService

 

static ExecutorService unconfigurableExecutorService(ExecutorService executor) 

          返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法 

          

          

改方法仅仅代理了定义在ExecutorService的方法,比如shutdown、submit 但是屏蔽了子类比如ThreadPoolExecutor中的setCorePoolSize,setKeepAliveTime等方法 

 

 

8.4 扩展ThreadPoolExcutor

 

override beforeExecute, afterExecute , and terminatethat 

 

Listing 8.9.  TimingThreadPool 例子不错

 

 

8.5 并行化递归算法

 

经典应用场景:  

谜题框架 (puzzle  Framework)

 

 

串行执行,深度优先算法,对栈要求高,不一定找到是最短路径

 

并行执行,广度优先算法,需要存的节点多,所以对内存需求高,能找出最短路径

 

public class ConcurrentPuzzleSolver <P, M> {

    private final Puzzle<P, M> puzzle;

    private final ExecutorService exec;

    private final ConcurrentMap<P, Boolean> seen;

    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

 

    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {

        this.puzzle = puzzle;

        this.exec = initThreadPool();

        this.seen = new ConcurrentHashMap<P, Boolean>();

        if (exec instanceof ThreadPoolExecutor) {

            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;

            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        }

    }

 

    private ExecutorService initThreadPool() {

        return Executors.newCachedThreadPool();

    }

 

    public List<M> solve() throws InterruptedException {

        try {

            P p = puzzle.initialPosition();

            exec.execute(newTask(p, null, null));

            // block until solution found

            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();  // The main thread needs to wait until a solution is found

            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();

        } finally {

            exec.shutdown();

        }

    }

 

    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {

        return new SolverTask(p, m, n);

    }

 

    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {

        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {

            super(pos, move, prev);

        }

 

        public void run() {

            if (solution.isSet()

                    || seen.putIfAbsent(pos, true) != null)

                return; // already solved or seen this position

            if (puzzle.isGoal(pos))

                solution.setValue(this);

            else

                for (M m : puzzle.legalMoves(pos))

                    exec.execute(newTask(puzzle.move(pos, m), m, this));//启动子线程继续搜索,父线程终止,不影响子线程继续执行

        }

    }

}

 

 

 

小结:

 

策略:

创建线程策略

关闭线程策略

处理队列任务策略

处理过多任务策略

 

钩子方法来扩展行为

分享到:
评论

相关推荐

    Java并发编程实战

    第8章 线程池的使用 第9章 图形用户界面应用程序 第三部分 活跃性、性能与测试 第10章 避免活跃性危险 第11章 性能与可伸缩性 第12章 并发程序的测试 第四部分 高级主题 第13章 显式锁 第14章 构建自定义的...

    Java并发编程实战2019.zip

    Java并发编程实战,第1章 简介,第2章 线程安全性 第3章 对象的共享 第4章 对象的组合 第5章 基础构建模块 第6章 任务执行 第7章 取消与关闭 第8章 线程池的使用 第9章 图形用户界面应用程序 第10章 避免...

    Java 并发编程实战

    第8章 线程池的使用 第9章 图形用户界面应用程序 第三部分 活跃性、性能与测试 第10章 避免活跃性危险 第11章 性能与可伸缩性 第12章 并发程序的测试 第四部分 高级主题 第13章 显式锁 第14章 构建自定义的...

    Java 7并发编程实战手册

    《Java 7并发编程实战手册》是Java 7并发编程的实战指南,介绍了Java 7并发API中大部分重要而有用的机制。全书分为9章,涵盖了线程管理、线程同步、线程执行器、Fork/Join框架、并发集合、定制并发类、测试并发应用...

    Java并发编程实践 PDF 高清版

    第8章 应用线程池 8.1 任务与执行策略问的隐性耦合 8.2 定制线程池的大小 8.3 配置ThreadPoolExecutor 8.4 扩展ThreadPoolExecutor 8.5 并行递归算法 第9章 GUI应用程序 9.1 为什么GUI是单线程化的 9.2 短期的GUI...

    《Java并发编程的艺术》源代码

    第1章介绍Java并发编程的挑战,向读者说明进入并发编程的世界可能会遇到哪些问题,以及如何解决。 第2章介绍Java并发编程的底层实现原理,介绍在CPU和JVM这个层面是如何帮助Java实现并发编程的。 第3章介绍深入介绍...

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    汪文君高并发编程实战视频资源下载.txt

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    Java并发编程的艺术

    第1章介绍Java并发编程的挑战,会向读者说明可能会遇到哪些问题,以及如何解决。第2章Java并发编程的底层实现原理,从CPU和JVM2个层面剖析。第3章详细深入介绍了Java的内存模型。第4章从介绍多线程技术带来的好处...

    Java并发编程的艺术_非扫描

    Java并发编程的艺术_非扫描本书特色本书结合JDK的源码介绍了Java并发框架、线程池的实现原理,帮助读者做到知其所以然。本书对原理的剖析不仅仅局限于Java层面,而是深入到JVM,甚至CPU层面来进行讲解,帮助读者从更...

    Java 并发编程原理与实战视频

    java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...

    汪文君高并发编程实战视频资源全集

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    龙果java并发编程完整视频

    第8节多种创建线程的方式案例演示(二)使用线程池00:15:40分钟 | 第9节Spring对并发的支持:Spring的异步任务00:11:10分钟 | 第10节使用jdk8提供的lambda进行并行计算00:14:22分钟 | 第11节了解多线程所带来的...

    java并发编程

    第8节多种创建线程的方式案例演示(二)使用线程池00:15:40分钟 | 第9节Spring对并发的支持:Spring的异步任务00:11:10分钟 | 第10节使用jdk8提供的lambda进行并行计算00:14:22分钟 | 第11节了解多线程所带来的...

    实战Java高并发程序设计(第2版)PPT模板.pptx

    3.3不要重复发明轮子:jdk的并发容器 3.4使用jmh进行性能测试 3.2线程复用:线程池 3.3不要重复发明轮子:JDK的并发容器 3.4使用JMH进行性能测试 实战Java高并发程序设计(第2版)PPT模板全文共25页,当前为第8页。...

    java8源码-highconcurrency:《实战Java高并发程序设计》葛一鸣、郭超着电子工业出版社2015年11月第一版书上部分代码

    本章开始我会重点学习在《Java多线程编程核心技术》一书中没有涉及到的线程池,并发容器等知识。 第四章锁的优化及注意事项: 第五章并行模式与算法: 第六章Java8与并发 第七章使用Akka构建高并发程序 第八章并行程

    java8源码-baijia123:常用工具类及测试类

    JAVA并发编程实战的示例及其他 第5章:基础构建模块 对应类包com.baijia123.concurrent TestHarnes-&gt;在计时测试中使用CountDownLatch(闭锁)来启动和停止线程 Preloader-&gt;使用FutureTask来提前加载稍后需要的数据 ...

Global site tag (gtag.js) - Google Analytics