Java编程思想 并发Concurrent Synchronize Executor多线程 阻塞队列BlockingQueue使用总结
进程(process)和线程(thread)
实现并发最直接的方式是在操作系统级别使用进程。进程是运行在它自己的地址空间内的自包容的程序。
一个线程就是在进程中的一个单一的顺序控制流,单个进程可以拥有多个并发执行任务。
Runnable和Thread
通过Runnable
定义任务
线程可以驱动任务,Java中是由Runnable
接口提供,需要实现Runnable
接口的run()
方法。任务run()
方法通常总会有某种形式的循环,使得任务一直运行下去直到不再需要。通常run()
被写成无限循环的形式。
例如:
public class LiftOff implements Runnable {
protected int countDown = 10; //倒计数
private static int taskCount;
private final int id = taskCount ++;
public LiftOff() {
super();
}
public LiftOff(int countDown) {
super();
this.countDown = countDown;
}
public String status() {
return String.format("#%d(%s).", id, (countDown > 0) ? countDown : "LiftOff!");
}
@Override
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
}
静态方法Thread.yield()
Thread.yield()
对线程调度器的一种建议,暗示:我已经执行完生命周期中最重要的部分了,现在正是将CPU资源切换给其他任务执行一段时间的时机。
Thread类
将Runnable
对象转变为工作任务的传统方式是将它提交给一个Thread
的构造器。
使用方法如下:
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
Executor和线程池
Executor
是Java SE5引入的,用来管理Thread
对象,从而简化并发编程。Executor是启动任务的优选方法
ExecutorService
ExecutorService
是具有生命周期的Executor
,会管理如何构建恰当的上下文来执行Runnable
对象。非常常见的情况是:单个Executor
用来创建和管理系统中所有的任务。
在任何线程池中,在现有的线程的可能情况下,都会被自动复用
CachedThreadPool
CachedThreadPool
为每个任务都创建一个线程,使用方法:
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i ++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
FixedThreadPool
FixedThreadPool
使用了有限的线程集来执行所提交的任务。通过FixedThreadPool
可以一次性预先执行代价高昂的线程分配,限制线程的数量。使用方法:
public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i ++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
SingleThreadExecutor
SingleThreadExecutor
线程数量为1的线程池。向SingleThreadExecutor
提交的多个任务,那么这些任务将排队执行,所有的任务都将使用相同的线程。SingleThreadExecutor
会序列化所有提交的给它的任务,并会维护它的悬挂任务队列。
如果有大量的线程,它们运行的任务是使用文件系统,可以使用SingleThreadExecutor
来运行这些线程,来确保任意时刻至多有唯一的任务在运行,这样就不需要在共享资源上处理同步,同时还不会过度使用文件系统。
使用方法:
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i ++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
CachedThreadPool
底层是通过SynchronousQueue
来实现;FixedThreadPool
和SingleThreadExecutor
底层是通过LinkedBlockingQueue
来实现。
Callable从任务中返回值
Runnable
可以执行独立任务,但是不能返回任何值。如果需要在任务完成时能够返回一个值,那么就可以使用Callable
接口。Callable
必须使用ExecutorService.submit()
来进行调用ExecutorService.submit()
会产生Future
对象,可以用isDone()
来查询Future是否已经完成,当任务完成,会有一个结果,可以通过get()
来获取该结果。如果不用isDone()
来检查,直接调用get()
,那么get()
将阻塞直到结果准备就绪。
常见使用如下:
public class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
super();
this.id = id;
}
@Override
public String call() throws Exception {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<>();
for(int i = 0; i < 10; i ++) {
results.add(exec.submit(new TaskWithResult(i)));
}
for(Future<String> fs : results) {
try {
System.out.println(fs.get());
} catch (InterruptedException e) {
System.out.println(e);
return ;
} catch (ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
}
休眠sleep()
sleep()
可以使任务中止给定的时间。sleep()
可能会抛出InterruptedExceptioin
异常。
异常不能跨线程传播回main()
,需要在本地处理可能产生的异常。
常用使用方法如下:
public class SleepingTask extends LiftOff {
@Override
public void run() {
try {
while (countDown-- > 0) {
System.out.println(status());
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
System.err.println("Interrupted");
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i ++) {
exec.execute(new SleepingTask());
}
exec.shutdown();
}
}
线程的优先级Priority
与让步
线程优先级将该线程的重要性传递给调度器,调度器会倾向(不能保证)让优先权最高的线程先执行。
所有的线程都应该以默认优先级运行,一般视图修改操作线程优先级通常是一种不可取的。
优先级设定是在run()
的开通部分设定Thread.currentThread().setPriority(priority)
。Thread.yield()
建议有相同优先级的其他线程可以允许。
使用方法如下:
public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d;
private int priority;
public SimplePriorities(int priority) {
super();
this.priority = priority;
}
@Override
public String toString() {
return Thread.currentThread() + ":" + countDown + ":" + System.nanoTime();
}
@Override
public void run() {
Thread.currentThread().setPriority(priority);
while(true) {
for(int i = 1; i < 100000; i ++) {
d += (Math.PI + Math.E) / (double)i;
if(i % 1000 == 0) {
Thread.yield();
}
}
System.out.println(this);
if(--countDown == 0) {
return ;
}
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i ++) {
exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
}
exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}
后台线程daemon
后台线程daemon
是指在程序运行的时候在后台提供一种通用服务的线程,后台线程并不属于程序中不可或缺的部分。
当所有的非后台线程结束时,程序也就结束了,同时会杀掉所有的后台线程。
只要有任何非后台线程还在运行,程序就不会终止。
让一个线程已后台线程运行,必须在线程启动调用前调用setDaemon()
,才能将线程设置为后台线程。
后台线程可能会在不执行finally
子句情况下,终止run()
方法当最后一个非后台线程终止时,所有后台线程会突然终止。
常用使用方法如下:
public class SimpleDaemons implements Runnable {
@Override
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
}
} catch (InterruptedException e) {
System.out.println();
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < 10; i ++) {
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true);
daemon.start();
}
System.out.println("All daemon stared.");
TimeUnit.MILLISECONDS.sleep(175);
}
}
ThreadFactory
通过ThreadFactory
可以定制由Executor
创建线程的属性(如:后台、优先级、名称等)。
常用使用方法:
/**
* 定义ThreadFactory
*/
public class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
public class DaemonFromFactory implements Runnable {
@Override
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
System.out.println("Interrupted");
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
for(int i = 0; i < 10; i ++) {
exec.execute(new DaemonFromFactory());
}
System.out.println("All daemons started.");
TimeUnit.MILLISECONDS.sleep(500);
}
}
线程任务类编码的变体
继承Thread
实现任务可以实现Runnable
接口,也可以通过直接继承Thread
这种方式。
使用方法如下:
public class SimpleThread extends Thread {
private int countDown = 5;
private static int staticCount = 0;
public SimpleThread() {
super(Integer.toString(++ staticCount));
start();
}
@Override
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) {
return ;
}
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i ++) {
new SimpleThread();
}
}
}
自管理的Runnable
方式
实现接口使得类可以继承另一个不同的类,而Thread
继承不支持。
使用方法如下:
public class SelfManaged implements Runnable {
private int countDown = 5;
private Thread t = new Thread(this);
public SelfManaged() {
super();
t.start();
}
public String toString() {
return Thread.currentThread().getName() + "(" + countDown + ").";
}
@Override
public void run() {
while(true) {
System.err.println(this);
if(--countDown == 0) {
return ;
}
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i ++) {
new SelfManaged();
}
}
}
内部类方式
public class InnerThread1 {
private int countDown = 5;
private Inner inner;
public InnerThread1(String name) {
inner = new Inner(name);
}
private class Inner extends Thread {
Inner(String name) {
super(name);
start();
}
public void run() {
try {
while(true) {
System.out.println(this);
if(--countDown == 0) {
return ;
}
sleep(10L);
}
} catch(InterruptedException e) {
System.out.println("interrupted()");
}
}
public String toString() {
return getName() + ":" + countDown;
}
}
}
public class InnerThread2 {
private int countDown = 5;
private Thread t;
public InnerThread2(String name) {
t = new Thread(name) {
public void run() {
try {
while(true) {
System.out.println(this);
if(--countDown == 0) {
return ;
}
sleep(10);
}
} catch(InterruptedException e) {
System.out.println("sleep() interrupted");
}
}
public String toString() {
return getName() + ":" + countDown;
}
};
t.start();
}
}
public class InnerRunnable1 {
private int countDown = 5;
private Inner inner;
public InnerRunnable1(String name) {
inner = new Inner(name);
}
private class Inner implements Runnable {
Thread t;
public Inner(String name) {
t = new Thread(this, name);
t.start();
}
@Override
public void run() {
try {
while(true) {
System.out.println(this);
if(--countDown == 0) {
return ;
}
TimeUnit.MILLISECONDS.sleep(10L);
}
} catch(InterruptedException e) {
System.out.println("sleep() interrupted");
}
}
public String toString() {
return t.getName() + ":" + countDown;
}
}
}
public class InnerRunnable2 {
private int countDown = 5;
private Thread t;
public InnerRunnable2(String name) {
t = new Thread(new Runnable() {
public void run() {
try {
while(true) {
System.out.println(this);
if(--countDown == 0) {
return ;
}
TimeUnit.MILLISECONDS.sleep(10L);;
}
} catch(InterruptedException e) {
System.out.println("sleep() interrupted");
}
}
public String toString() {
return Thread.currentThread().getName() + ":" + countDown;
}
}, name);
t.start();
}
}
public class ThreadVariations {
public static void main(String [] args) {
new InnerThread1("InnerThread1");
new InnerThread2("InnerThread2");
new InnerRunnable1("InnerRunnable1");
new InnerRunnable2("InnerRunnable2");
new ThreadMethod("ThreadMethod").runTask();;
}
}
加入一个线程join()
一个线程可以在本线程中调用其它线程的join()
方法,效果是等待一段时间知道其它线程(第二个线程)结束,再继续执行下面的操作。
比如在某个线在另一线程t调用t.join()
,此线程将被挂起,直到线程t结束,此线程才恢复(t.isAlive()
返回false);另一种使用方式是在调用t.join()
时,带一个超时参数,这样如果t线程在时间到期时还没有结束,join()
方法总能返回。
对join()
方法的调用可以通过调用线程的interrupt()
方法进行中断。
使用方式如下:
public class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch(InterruptedException e) {
System.out.println(getName() + " was interrupted. " + "isInterrupted():" + isInterrupted());
return ;
}
System.out.println(getName() + " has awakened");
}
}
public class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch(InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println(getName() + " join completed");
}
}
public class Joining {
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
}
捕获异常Thread.UncaughtExceptionHandler
由于线程本质特性,不能捕获从线程中逃逸的异常。一旦异常逃出任务的run()
方法,它就会向外传播到控制台。Thread.UncaughtExceptionHandler
允许在每个Thread
对象上附着一个异常处理器,该处理器会在线程因未捕获的异常而临近结束时被调用。
常用的使用方式:
public class ExceptionThread implements Runnable {
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
public class HandlerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread.");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread());
}
}
如果已经知道将要在代码中使用相同的异常处理器,更简单的方式是在Thread
类中设置一个静态域,并将这个处理器设置为默认的未捕获的异常处理器Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler())
使用方法如下:
public class SettingDefaultHandler {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread2());
}
}
共享受限资源synchronized
和Lock
对象
Java中递增不是原子性操作
synchronized
关键字
防止多个任务同时访问相同的资源的方法是当资源被一个任务使用时,给它加锁。
基本上所有的并发解决线程冲突问题的时候,都是采用序列化访问共享资源方案,Java中提供
关键字synchronized
,以防止资源冲突提供内置支持。当任务要执行被synchronized
关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁。
所有对象都自动含有单一的锁,当在对象上调用其任意synchronized
方法时,此对象都被加锁,这时该对象上其他synchronizd
方法只有等到前一个方法调用完毕并释放锁之后才能被调用。
也就是对于某个具体的对象,所有的synchronized
方法共享同一个锁,可以被用来防止多个任务同时访问被synchronized
修饰的对象内存。
并发时,将域设置为private
非常重要,否则synchronized
关键字就不能防止其他任务直接访问该域,这样会产生冲突。
一个任务可以多次获取对象的锁,JVM
负责跟踪对象被加锁的次数。如果一个对象被解锁,加锁计数变为0。只有首先获得锁的任务才能允许获取多个锁。
针对每个类,也有一个锁,synchronized static
方法可以在类的范围内放置对static
数据的并发访问。
如果在类中有超过一个方法在处理临界数据,那么必须同步所有相关方法。每个访问临界共享资源的方法都必须被同步,否则可能不会正确地工作。
常用使用方法:
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
public void cancel() {
canceled = true;
}
public boolean isCanceled() {
return canceled;
}
}
public class EvenChecker implements Runnable {
private IntGenerator generator;
private final int id;
public EvenChecker(IntGenerator generator, int ident) {
super();
this.generator = generator;
this.id = ident;
}
@Override
public void run() {
while(!generator.isCanceled()) {
int val = generator.next();
if(val % 2 != 0) {
System.out.println(val + " not even!");
generator.cancel();
}
}
}
public static void test(IntGenerator gp, int count) {
System.out.println("Press Control-C to exit");
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < count; i ++) {
exec.execute(new EvenChecker(gp, i));
}
exec.shutdown();
}
public static void test(IntGenerator gp) {
test(gp, 10);
}
}
/**
* 会被中断
*/
public class EvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
@Override
public int next() {
++ currentEvenValue;
// Thread.yield();
++ currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new EvenGenerator());
}
}
/**
* 不会被中断
*/
public class SynchronizedEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
@Override
public synchronized int next() {
++ currentEvenValue;
Thread.yield();
++ currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenGenerator());
}
}
Lock
对象
Lock
对象必须显示地创建、锁定和释放。它与synchronized
锁形式相比,虽然代码缺乏优雅,但对于解决并发问题,它更加灵活。
在使用synchronized
关键字,如果中间执行过程中失败了,那么就会抛出异常,但是没有机会处理任何清理工作。有了Lock
对象,可以使用finally
关键字进行清理工作。
正常情况下,优先使用synchronized
关键字,可以让我们的代码量更少,并且错误出现可能性也会降低,只有在解决特殊问题时,才使用显式的Lock
对象。
常用使用方法:
public class MutexEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();
@Override
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield();
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EvenChecker.test(new MutexEvenGenerator());
}
}
原子性与易变性 volatile
原子性
原子操作是不能被线程调度机制中断的操作,一旦操作开始,那么它一定可以在可能发生的上下文切换之前执行完毕。
原子性可以应用于long
和double
之外的所有基本类型之上的简单操作。对于读取、写入除long
和double
之外的所有基本类型变量的操作,可以保证原子操作来操作内存。
JVM
将64位的long
和double
变量的读取和写入当做2个分离的32位操作来执行,这就可能在读取或写入时发生上下文切换,导致任务看不到正确的结果。(字撕裂可能 看到部分被修改过的数值)
可视性
在现代的多处理器系统上,可视性的问题远比原子性问题多得多。一个任务做出的修改,即使是原子性的,对于其它任务而言也可能是不可视的。因为修改可能只是暂时性的存储在本地处理器的缓存中,并没有刷新至内存中。
volatile
关键字
- 获得原子性(简单的赋值与返回操作)
- 确保可视性(将一个域声明为
volatile
,对该域的进行了写操作,所有的读操作都可以看到这个修改)
在非volatile
域上的原子操作由于不必立即刷新到主存中,因此其他读取该域的任务可能看不到这个新值。volatile
无法工作的情况:
- 一个域的值依赖与它之前的值,如递增计数器;
- 一个域收到其它域的值限制,如
Range
类的lower
和upper
使用volatile
而不是synchronized
的唯一安全情况是类中只有一个可变域。
通常优先选择synchronized
关键字。
原子类
Java SE5引入了AtomicInteger
, AtomicLong
, AtomicReference
等特殊的原子性变量类,该类提供了类似下面的原子形式的更新操作:
boolean compareAndSet(expectedValue, updateValue)
原子类的常用使用方式:
public class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);
public int getValue() { return i.get(); }
private void evenIncrement() { i.addAndGet(2); }
@Override
public void run() {
while(true) {
evenIncrement();
}
}
public static void main(String[] args) {
// 5秒后结束
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Aborting");
System.exit(0);
}
}, 5000);
ExecutorService exec = Executors.newCachedThreadPool();
AtomicIntegerTest ait = new AtomicIntegerTest();
exec.execute(ait);
while(true) {
int val = ait.getValue();
System.out.println(val);
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
通常依赖于锁要更安全一些(要么是synchronized
关键字或现实的Lock
对象)。
临界区
临界区,将希望方式多线程同时访问方法内部的部分代码而不防止整个方法,通过这种方式分出来的代码段称为临界区,也被称为同步控制块,进入此代码前,必须得到syncObject
的锁。
synchronized(syncObject) {
// 临界区,同一时刻,仅能允许一个任务来访问。
}
通过使用同步控制块而不是同步整个方法,可以使多个任务访问对象的时间性能得到显著提高。
下面一个实例可以将一个非保护类型的类,应用于多线程的环境:
public class Pair {
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() {
this(0, 0);
}
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString() {
return "x: " + x + ", y: " + y;
}
public void checkState() {
if(x != y) {
throw new PairValuesNotEqualException();
}
}
@SuppressWarnings("serial")
public class PairValuesNotEqualException extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal:" + Pair.this);
}
}
}
public abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage = Collections.synchronizedList(new ArrayList<>());
public synchronized Pair getPair() {
return new Pair(p.getX(), p.getY());
}
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
public abstract void increment();
}
public class PairManager1 extends PairManager {
@Override
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}
public class PairManager2 extends PairManager {
@Override
public void increment() {
Pair temp;
synchronized(this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
在其他对象上同步
public class DualSynch {
private Object syncObject = new Object();
public synchronized void f() {
for(int i = 0; i < 10; i ++) {
System.out.println("f()");
Thread.yield();
}
}
public void g() {
synchronized(syncObject) {
for(int i = 0; i < 10; i ++) {
System.out.println("g()");
Thread.yield();
}
}
}
}
public class SyncObject {
public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
public void run() {
ds.f();
}
}.start();
ds.g();
}
}
ThreadLocal
线程本地存储
防止任务在共享资源产生冲突的第二种方式是根除对变量的共享。
线程本地存储是一种自动化机制,可以为相同的变量的每个不同线程都创建不同的存储,它可以使得你将状态与线程关联起来。ThreadLocal
对象通常当做静态域存储。
常见的使用方法:
public class Accessor implements Runnable {
private final int id;
public Accessor(int idn) {
this.id = idn;
}
@Override
public void run() {
while(!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString() {
return "#" + id + ": " + ThreadLocalVariableHolder.get();
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
private Random rand = new Random(47);
@Override
protected Integer initialValue() {
return rand.nextInt(10000);
}
};
public static void increment() {
value.set(value.get() + 1);
}
public static int get() {
return value.get();
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++) {
exec.execute(new Accessor(i));
}
TimeUnit.SECONDS.sleep(3);
exec.shutdownNow();
}
}
线程状态
线程可以处于四种状态:
- 新建(New):当线程被创建,会短暂处于该状态。此时它已经分配了必需的系统资源,并执行了初始化,此刻线程已经有资格获得CPU时间。
- 就绪(Runnable):该状态只要调度器把时间片分配给线程,线程就可以运行。
- 阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态,调度器将忽略该线程,不会分配给线程任何CPU时间,知道该线程重新进入就绪状态。
- 死亡(Dead):处于死亡或终止状态的线程不再是可调度的。
线程进入阻塞状态原因:
- 通过调用
sleep(milliseconds)
使任务进入休眠状态。 - 通过调用
wait()
是线程挂起,直到线程得到notify()
或notifyAll()
或者等价的signal()
或signalAll()
线程才会进入就绪状态。 - 任务等待某个输入/输出完成。
- 任务试图在某个对象调用其同步控制方法,但对象锁不可以(可能另一个任务已经获取该对象锁)
线程中断
Thread
类包含interrupt()
方法,可以终止被阻塞的任务,将设置线程为中断状态。如果一个线程已被阻塞或试图执行一个阻塞操作,那么设置该线程的中断状态将抛出InterruptedException
。当抛出该异常或任务调用Thread.interrupted()
,中断状态将复位。Thread.interrupted()
提供离开run()
循环而不抛出异常的第二种方式。
建议:尽量避免对Thread
对象直接操作,尽量通过Executor
来执行操作。在Executor
上调用shutdownNow()
,那么它将发送interrupt()
调用给它启动的所有线程。
如果希望中断某一个任务,并且是使用Executor
,那么通过调用submit()
而不是executor()
来启动任务,就可以持有该任务的上下文,submit()
将返回一个Future<?>
,通过调用Future<?>
的cancel(true)
就可以中断该线程。
各自中断的实例:
public class SleepBlocked implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch(InterruptedException e) {
System.out.println("InterruptedException");
}
System.out.println("Exiting SleepBlocked.run()");
}
}
public class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream is) {
this.in = is;
}
@Override
public void run() {
try {
System.out.println("Waiting for read():");
in.read();
} catch (IOException e) {
if(Thread.interrupted()) {
System.out.println("Interrupted from blocked I/O");
} else {
throw new RuntimeException(e);
}
}
System.out.println("Exiting IOBlocked.run()");
}
}
public class SynchronizedBlocked implements Runnable {
public synchronized void f() {
while(true) {
Thread.yield();
}
}
public SynchronizedBlocked() {
new Thread() {
public void run() {
f();
}
}.start();
}
@Override
public void run() {
System.out.println("Trying to call f()");
f();
System.out.println("Exiting SynchronizedBlocked.run()");
}
}
public class Interrupting {
private static ExecutorService exec = Executors.newCachedThreadPool();
static void test(Runnable r) throws InterruptedException {
Future<?> f = exec.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interrupting " + r.getClass().getSimpleName());
f.cancel(true);
System.out.println("Interrupted send to " + r.getClass().getSimpleName());
}
public static void main(String[] args) throws Exception {
test(new SleepBlocked());
test(new IOBlocked(System.in));
test(new SynchronizedBlocked());
TimeUnit.SECONDS.sleep(3);
System.out.println("Aboring with System.exit(0)");
System.exit(0);
}
}
I/O和synchronized
块上的等待不可中断,即不能中断试图执行I/操作和试图获取synchronized
锁的线程
对应的一个解决方案:关闭任务在其上发生阻塞的底层资源。
见代码:
public class CloseResource {
public static void main(String [] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InputStream soketInput = new Socket("localhost", 8080).getInputStream();
exec.execute(new IOBlocked(soketInput));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Shutting down all threads.");
exec.shutdownNow();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + soketInput.getClass().getName());
soketInput.close();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + System.in.getClass().getName());
System.in.close();
}
}
线程之间的协作
使用线程来同时运行多个任务时,可以通过使用锁(互斥)来同步两个任务,从而使一个任务不会干涉另个任务的资源。互斥能确保只有一个任务可以响应某个信号,这样就可以根除任何可能的竞争条件。
wait()
和notify()
、notifyAll()
wait()
等待某个条件变化,但改变这个条件超出了当前方法的控制能力,当notify()
或notifyAll()
被调用,该任务才会被唤醒并去检查所产生的变化。
- 调用
sleep()
和yield()
时,锁并没有释放。 - 调用
wait()
的时候,线程执行被挂起,对象上的锁释放,其他的任务可以获取这个锁。
检查所感兴趣的特定条件,条件不满足的情况返回到wait()
中,惯用的方法是使用while
来编写。
实例代码:
public class Car {
private boolean waxOn = false;
public synchronized void waxed() {
waxOn = true;
notifyAll();
}
public synchronized void buffed() {
waxOn = false;
notifyAll();
}
public synchronized void waitingForWaxing() throws InterruptedException {
while(waxOn == false) {
wait();
}
}
public synchronized void waitingForBuffing() throws InterruptedException {
while(waxOn == true) {
wait();
}
}
}
public class WaxOn implements Runnable {
private Car car;
public WaxOn(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
System.out.println("Wax on!");
TimeUnit.MILLISECONDS.sleep(100);
car.waxed();
car.waitingForBuffing();
}
} catch(InterruptedException e) {
System.out.println("WaxOn Exiting via interrupt");
}
System.out.println("Ending Wax On Task");
}
}
public class WaxOff implements Runnable {
private Car car;
public WaxOff(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
car.waitingForWaxing();
System.out.println("Wax Off");
TimeUnit.MILLISECONDS.sleep(100);
car.buffed();
}
} catch(InterruptedException e) {
System.out.println("WaxOff Exiting via interrupt");
}
System.out.println("Ending Wax Off Task");
}
}
public class WaxOnMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOn(car));
exec.execute(new WaxOff(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
当使用wait()
和notify()
、notifyAll()
进行协作时,可能错过某个信号,解决方案是防止在条件变量上产生竞争:
synchronized(sharedMonitor) {
while(someCondition) {
shareMonitor.wait();
}
}
使用显示的Lock
和Condition
对象
使用互斥并允许任务挂起的基本类Condition
,通过调用Condition
的await()
来挂起一个任务。当外部条件发生变化某个任务应该继续执行时,可以通过singal()
来通知并唤醒一个任务,或者调用singalAll()
来唤醒所有在这个Condition
上被其自身挂起的任务。
与使用notifyAll()
相比,signalAll()
是更安全的方式。
示例代码:
public class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;
public synchronized void waxed() {
lock.lock();
try {
waxOn = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
public synchronized void buffed() {
lock.lock();
try {
waxOn = false;
condition.signalAll();
} finally {
lock.unlock();
}
}
public synchronized void waitingForWaxing() throws InterruptedException {
lock.lock();
try {
while(waxOn = false) {
condition.await();
}
} finally {
lock.unlock();
}
}
public synchronized void waitingForBuffing() throws InterruptedException {
lock.lock();
try {
while(waxOn == true) {
condition.await();
}
} finally {
lock.unlock();
}
}
}
public class WaxOn implements Runnable {
private Car car;
public WaxOn(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
System.out.println("Wax on!");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitingForBuffing();
}
} catch(InterruptedException e) {
System.out.println("WaxOn Exiting via interrupt");
}
System.out.println("Ending Wax On Task");
}
}
public class WaxOff implements Runnable {
private Car car;
public WaxOff(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
car.waitingForWaxing();
System.out.println("Wax Off");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
System.out.println("WaxOff Exiting via interrupt");
}
System.out.println("Ending Wax Off Task");
}
}
public class WaxOnMatic2 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
生产者-消费者与队列
使用同步队列来解决任务任务之间协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。
阻塞队列可以解决并发中的大量的问题,与wait()
/notifyAll()
相比,阻塞队列简单并可靠。BlockingQueue
阻塞队列接口提供了找个队列。
常用的阻塞队列:
LinkedBlockingQueue
:一个无届队列;ArrayBlockingQueue
:具有固定尺寸,可以在它被阻塞前放置有限数量的元素。
阻塞队列使用示例:
public class LiftOffRunner implements Runnable {
private BlockingQueue<LiftOff> rokets;
public LiftOffRunner(BlockingQueue<LiftOff> queue) {
this.rokets = queue;
}
public void add(LiftOff lo) {
try {
rokets.put(lo);
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");;
}
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
LiftOff roket = rokets.take();
roket.run();
}
} catch(InterruptedException e) {
System.out.println("Walking from take()");
}
}
}
public class TestBlockingQueue {
static void getkey() {
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static void getkey(String message) {
System.out.println(message);
getkey();
}
static void test(String msg, BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for(int i = 0; i < 5; i ++) {
runner.add(new LiftOff(5));
}
getkey("Press `Enter` (" + msg + ")");
t.interrupt();
System.out.println("Finished " + msg + " test");
}
public static void main(String[] args) {
test("LinkedBlokingQueue", new LinkedBlockingQueue<>());
test("ArrayBlockingQueue", new ArrayBlockingQueue<>(3));
test("SynchronousQueue", new SynchronousQueue<>());
}
}
阻塞队列,使得处理过程将被自动地挂起和恢复。
如下面处理示例,有三个任务:一个制作吐司,一个给吐司抹黄油,一个给抹过黄油的吐司涂果酱。任务二依赖任务一,任务三依赖任务二。
public class Toast {
public enum Status { DRY, BUTTERED, JAMMED }
private Status status = Status.DRY;
private final int id;
public Toast(int idn) { this.id = idn; }
public void butter() { status = Status.BUTTERED; }
public void jam() { status = Status.JAMMED; }
public Status getStatus() { return status; }
public int getId() { return id; }
public String toString() {
return "Toast " + id + ": " + status;
}
}
/**
* Toast 阻塞队列
* @author jyoryo
*
*/
public class ToastQueue extends LinkedBlockingQueue<Toast> {
private static final long serialVersionUID = 4998588534390592837L;
}
public class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) { toastQueue = tq; }
@Override
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
Toast t = new Toast(count ++);
System.out.println(t);
toastQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Toaster Interrupted");
}
System.out.println("Toaster Off");
}
}
public class Butterer implements Runnable {
private ToastQueue dryQueue, butterQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butterQueue = buttered;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butterQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Butterer Interrupted");
}
System.out.println("Butterer Off");
}
}
public class Jammer implements Runnable {
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
this.butteredQueue = buttered;
this.finishedQueue = finished;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finishedQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Jammer Interrupted");
}
System.out.println("Jammer Off");
}
}
public class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
this.finishedQueue = finished;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
Toast t = finishedQueue.take();
if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>> Error: " + t);
System.exit(1);
} else {
System.out.println("Chomp!" + t);
}
}
} catch(InterruptedException e) {
System.out.println("Eater Interrupted");
}
System.out.println("Eater Off");
}
}
public class ToastMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(10);
exec.shutdownNow();
}
}
任务间使用管道输入/输出
PipedWriter
:允许任务向输入/输出管道写;PipedReader
:允许不同任务从同一个输入/输出管道读取。
管道基本上是一个阻塞队列。PipedReader
与普通I/O之间最重要的差异是:PipedReader
是可中断的。
管道基本使用示例如下:
public class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
@Override
public void run() {
try {
while(true) {
for(char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
}
} catch(IOException e) {
System.out.println("Sender write exception " + e);
} catch(InterruptedException e) {
System.out.println("Sender sleep interrupted " + e);
}
}
}
public class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
@Override
public void run() {
try {
while(true) {
System.out.println("Read: " + (char)in.read());
}
} catch(IOException e) {
}
}
}
public class PipedIO {
public static void main(String [] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(15);
exec.shutdownNow();
}
}
死锁
死锁:任务之间相互等待的连续循环,没有哪个线程能继续。
当以下四个条件同时满足,就会发生死锁:
- 互斥条件。任务使用的资源中至少有一个是不能共享的。
- 至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源。
- 资源不能被任务抢占,任务必须把资源释放当做普通事件。
- 必须有循环等待。
要发生死锁,这四个条件必须全部满足,所以要防止死锁的话,只需要破坏其中一个即可。防止死锁最容易的方法是破坏第四个条件
下面的实例代码演示了哲学家就餐问题:
public class Chopstick {
private boolean taken = false;
public synchronized void take() throws InterruptedException {
while(taken) {
wait();
}
taken = true;
}
public synchronized void drop() {
taken = false;
notifyAll();
}
}
public class Philosopher implements Runnable {
private Chopstick left;
private Chopstick right;
private final int id;
private final int ponderFactor;
private Random rand = new Random(47);
public Philosopher (Chopstick left, Chopstick right, int ident, int ponder) {
this.left = left;
this.right = right;
this.id = ident;
this.ponderFactor = ponder;
}
private void pause() throws InterruptedException {
if(0 == ponderFactor) {
return ;
}
TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
System.out.println(this + " thinking");
pause();
System.out.println(this + " grabbing right");
right.take();
System.out.println(this + " grabbing left");
left.take();
System.out.println(this + " eating");
pause();
right.drop();
left.drop();
}
} catch(InterruptedException e) {
System.out.println(this + " exiting via interrupt");
}
}
public String toString() {
return "Philosopher " + id;
}
}
public class DeadlockingDiningPhilosopher {
public static void main(String [] args) throws Exception {
if(null != args && args.length > 0) {
System.out.println(Arrays.toString(args));
}
int ponder = 5;
if(args.length > 0) {
ponder = Integer.parseInt(args[0]);
}
int size = 5;
if(args.length > 1) {
size = Integer.parseInt(args[1]);
}
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for(int i = 0; i < size; i++) {
sticks[i] = new Chopstick();
}
for(int i = 0; i < size; i ++) {
exec.execute(new Philosopher(sticks[i], sticks[(i + 1) % size], i, ponder));
}
if(args.length == 3 && args[2].equals("timeout")) {
TimeUnit.SECONDS.sleep(5);
} else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdown();
}
}
解决并发问题的构件
为了解决并发中的各种问题,java
提供了大量工具类。通过这些类可以帮助我们编写出简单而健壮的并发程序。
CountDownLatch
类
CountDownLatch
:用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
可以给CountDownLatch
对象设置一个初始计数器,任何在这个对象上调用wai()
方法都将阻塞,知道这个计数器数值为0。其他任务结束工作时,可以在该对象上调用countDown()
来减少这个计数器数值。CountDownLatch
被设计为只触发一次,计数值不能被重置。如果需要能够重置计数数值的情况,可以使用CyclicBarrier
。
典型用法:将一个程序分为n个相互独立的可解决问题,并创建值为n的CountDownLatch
锁存器。每当任务完成时,都会在这个锁存器上调用countDown()
。等待问题被解决的任务在这个锁存器上调用await()
,将自己挂起,直至锁存器计数为0。
示例代码如下:
public class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
doWork();
latch.countDown();
} catch(InterruptedException e) {
System.out.println(this + " exiting via interrupt");
}
}
public void doWork() throws InterruptedException {
System.out.println(this + " working");
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
}
public String toString() {
return String.format("%1$-3d ", id);
}
}
public class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter ++;
private final CountDownLatch latch;
WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println("Latch barrier passed for " + this);
} catch(InterruptedException e) {
System.out.println(this + " exiting via interrupt");
}
}
public String toString() {
return String.format("WaitingTask %1$-3d ", id);
}
}
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(SIZE);
for(int i = 0; i < 10; i++) {
exec.execute(new WaitingTask(latch));
}
for(int i = 0; i < SIZE; i++) {
exec.execute(new TaskPortion(latch));
}
System.out.println("Launched all tasks");
exec.shutdown();
}
}
CyclicBarrier
类
CyclicBarrier
:适用于希望创建一组任务,它们并行地执行工作,然后在下一个步骤之前等待,
直至该组所有任务都完成。它使得所有的并行任务都在栅栏处列队,一致地向前移动。
下面是一个示例:
public class Horse implements Runnable {
private static int counter = 0;
private final int id = counter ++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) { barrier = b; }
public synchronized int getStrides() { return strides; }
@Override
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
strides += rand.nextInt(3);
}
barrier.await();
}
} catch(InterruptedException e) {
System.out.println(this + " exiting via Interrupted");
} catch (BrokenBarrierException e) {
System.out.println(this + " exiting via BrokenBarrier");
}
}
public String toString() {
return "Horse " + id;
}
public String tracks() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < getStrides(); i ++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < FINISH_LINE; i ++) {
s.append("=");
}
System.out.println(s.toString());
for(Horse horse : horses) {
System.out.println(horse.tracks());
}
for(Horse horse : horses) {
if(horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won.");
exec.shutdownNow();
return ;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch(InterruptedException e) {
System.err.println("barrier-action sleep interrupted");
}
}
});
for(int i = 0; i < nHorses; i ++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorse = 7;
int pause = 200;
if(args.length > 0) {
int n = Integer.parseInt(args[0]);
nHorse = (n > 0) ? n : nHorse;
}
if(args.length > 1) {
int p = Integer.parseInt(args[1]);
pause = (p > -1) ? p : pause;
}
new HorseRace(nHorse, pause);
}
}
PriorityBlockingQueue
类
PriorityBlockingQueue
:具有可阻塞读取的优先级队列。
优先级队列中的对象按照优先级顺序从队列中出现的任务。
下面是一个示例代码:
public class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch(InterruptedException e) {
System.out.println("Exiting via interrupted");
}
System.out.println(this);
}
@Override
public int compareTo(PrioritizedTask arg) {
return (priority < arg.priority) ? 1 : (priority > arg.priority ? -1 : 0);
}
public String toString() {
return String.format("[%1$-3d]", priority) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1);
this.exec = e;
}
public void run() {
int count = 0;
for(PrioritizedTask pt : sequence) {
System.out.println(pt.summary());
if(++count % 5 == 0) {
System.out.println("\n");
}
}
System.out.println("\n");
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
public class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e;
}
@Override
public void run() {
for(int i = 0; i < 20; i ++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
try {
for(int i = 0; i < 10; i ++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
for(int i = 0; i < 10; i++) {
queue.add(new PrioritizedTask(i));
}
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch(InterruptedException e) {
System.out.println("PrioritizedTaskProducer Exiting InterruptedException");
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
public class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
q.take().run();
}
} catch(InterruptedException e) {
System.out.println("PrioritizedTaskConsumer Exiting via InterruptedException");
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
ScheduledThreadPoolExecutor
类
ScheduledThreadPoolExecutor
:可以解决如果各个任务都是在一个预定时间进行运行的并发问题。schedule()
:运行一次当任务;scheduleAtFixedRate()
:每隔规则的时间重复执行任务。
下面是一个温控室控制系统的示例代码:
public class GreenhouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized void setThermostat(String value) {
this.thermostat = value;
}
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event, long delay) {
scheduler.schedule(event, delay, TimeUnit.MILLISECONDS);
}
public void repeat(Runnable event, long initialDelay, long period) {
scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS);
}
class LightOn implements Runnable {
@Override
public void run() {
System.out.println("Turning on lights");
light = true;
}
}
class LightOff implements Runnable {
@Override
public void run() {
System.out.println("Turning off lights");
light = false;
}
}
class WaterOn implements Runnable {
@Override
public void run() {
System.out.println("Turning greenhouse water on");
water = true;
}
}
class WaterOff implements Runnable {
@Override
public void run() {
System.out.println("Turning greenhouse water off");
water = false;
}
}
class ThermostatNight implements Runnable {
@Override
public void run() {
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay implements Runnable {
@Override
public void run() {
System.out.println("Thermostat to day setting");
setThermostat("Day");
}
}
public class Bell implements Runnable {
@Override
public void run() {
System.out.println("Bing!");
}
}
public class Terminate implements Runnable {
@Override
public void run() {
System.out.println("Terminating");
scheduler.shutdownNow();
new Thread() {
public void run() {
for(DataPoint d : data) {
System.out.println(d);
}
}
}.start();
}
}
static class DataPoint {
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d, float temp, float hum) {
this.time = d;
this.temperature = temp;
this.humidity = hum;
}
public String toString() {
return time.getTime() + String.format(" temperature:%1$.1f humidity:%2$.2f", temperature, humidity);
}
}
private Calendar lastTime = Calendar.getInstance();
{
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(new ArrayList<>());
class CollectData implements Runnable {
@Override
public void run() {
System.out.println("Collecting data");
synchronized(GreenhouseScheduler.this) {
lastTime.set(Calendar.MINUTE, lastTime.get(Calendar.MINUTE) + 30);
if(rand.nextInt(5) == 4) {
tempDirection = -tempDirection;
}
lastTemp = lastTemp + tempDirection * (1.0f + rand.nextFloat());
if(rand.nextInt(5) == 4) {
humidityDirection = -humidityDirection;
}
lastHumidity = lastHumidity + humidityDirection * rand.nextFloat();
data.add(new DataPoint((Calendar)lastTime.clone(), lastTemp, lastHumidity));
}
}
}
public static void main(String [] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
}
Semaphore
类
正常的锁在任何时刻都只允许一个任务访问一项资源,而计数信号量允许n个任务同时访问这个资源。
下面一个实例使用Semaphore
类,实现对象池的概念,它管理数量有限的对象,当要使用对象时可以进行签出它们,而在使用完毕时,可以将它们签回。
代码如下:
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size, true);
for(int i = 0; i < size; ++ i) {
try {
items.add(classObject.newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x)) {
available.release();
}
}
private synchronized T getItem() {
for(int i = 0; i < size; i ++) {
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
}
return null;
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) { return false; }
if(checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false;
}
}
public class Fat {
private volatile double d;
private static int counter = 0;
private final int id = counter++;
public Fat() {
for(int i = 0; i < 1000000; i ++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void operation() {
System.out.println(this);
}
public String toString() {
return "Fat id: " + id;
}
}
public class CheckoutTask<T> implements Runnable {
private static int counter;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
@Override
public void run() {
try {
T item = pool.checkOut();
System.out.println(this + "check out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this + "check in " + item);
pool.checkIn(item);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public String toString() {
return "CheckoutTask " + id + " ";
}
}
Exchanger
类
Exchanger
:是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有之前由对方对象持有的对象。
使用使用场景:一个任务在创建对象(这些对象的生产代价很高昂),而另一个任务在消费这些对象。通过该方式,可以有更多的对象在被创建的同时被消费。
以下是一个示例代码:
public class ExchangerProduce<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
public ExchangerProduce(Exchanger<List<T>> exchg, Generator<T> gen, List<T> holder) {
this.exchanger = exchg;
this.generator = gen;
this.holder = holder;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
for(int i = 0; i < ExchangerDemo.size; i++) {
holder.add(generator.next());
}
System.out.println(holder + " ___ Produce Exchange Before");
holder = exchanger.exchange(holder);
System.out.println(holder + " ___ Produce Exchange After");
}
} catch(InterruptedException e) {
}
}
}
public class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder) {
this.exchanger = ex;
this.holder = holder;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
holder = exchanger.exchange(holder);
System.out.println(holder + " ___ Consumer After");
for(T x : holder) {
value = x;
holder.remove(x);
}
}
} catch(InterruptedException e) {
}
System.out.println("Final value: " + value);
}
}
public class ExchangerDemo {
static int size = 10;
static int delay = 5;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Fat>> xc = new Exchanger<>();
List<Fat>
producerList = new CopyOnWriteArrayList<>(),
consumerList = new CopyOnWriteArrayList<>();
exec.execute(new ExchangerProduce<>(xc, BasicGenerator.create(Fat.class), producerList));
exec.execute(new ExchangerConsumer<>(xc, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
性能调优
互斥技术
Java中的互斥技术有:
synchronized
关键字Lock
对象Atomic
原子类
synchronized
关键字:代码被阅读的次数远多于被编写的次数,因此代码的可读性至关重要。所以以synchronized
关键字入手,只有在需要性能调优时才替换为Atomic
原子类或Lock
对象。
Lock
对象:使用Lock
通常会比使用synchronized
要高效许多。
Atomic
原子类:原子类对象只有在非常简单的情况下才有用,这些情况通常包括只有一个要被修改的Atomic
对象,并且该对象独立于其他所有的对象。如果涉及多个Atomic
对象,请放弃使用原子类这种用法,使用更加常规的互斥技术。
免锁容器
免锁容器背后的通用策略是:对容器的修改可以与读取操作同时发生,只要在读取者只能看到完成修改的结果即可。
修改是在容器数据结构的某个部分的一个单独副本上执行的,并且这个副本在修改过程中是不可视的,只有在完成修改时,被修改的结构才会与主数据进行交换,之后读取者就可以看到这个修改。
常用的免锁容器有:
CopyOnWriteArrayList
类:写入将导致创建整个底层数组的副本,而原数组将保留原地不动,这样在复制的副本修改时,读取操作可以安全地执行。当完成修改时,一个原子操作将把新的数组换入。这样新的读取操作就可以读取到新的修改。CopyOnWriteArraySet
类:底层使用CopyOnWriteArrayList
实现免锁行为。ConcurrentHashMap
类ConcurrentLinkedQueue
类
乐观锁
只要从免锁容器中读取,那么它就会比其从synchronized
关键字对应对象快很多,因为获取和释放锁的开销被省掉了。
乐观加锁
Atomic
类除了有decrementAndGet()
这样的原子操作,但也支持乐观加锁compareAndSet()
。
compareAndSet()
:将旧值和新值一起提交,如果旧值与它在Atomic
对象的值不一致,那么这个操作就失败(可能某个任务已经在此操作执行期间修改了对象的值)。compareAndSet()
使用示例代码:
public class FastSimulation {
static final int N_ELEMENTS = 100000;
static final int N_GENES = 30;
static final int N_EVOLVERS = 50;
static final AtomicInteger[][] GRID = new AtomicInteger[N_ELEMENTS][N_GENES];
static Random rand = new Random(47);
static class Evolver implements Runnable {
@Override
public void run() {
while(!Thread.interrupted()) {
int element = rand.nextInt(N_ELEMENTS);
for(int i = 0; i < N_GENES; i++) {
int previous = element -1;
if(previous < 0) previous = N_ELEMENTS -1;
int next = element + 1;
if(next >= N_ELEMENTS) next = 0;
int oldvalue = GRID[element][i].get();
int newvalue = oldvalue + GRID[previous][i].get() + GRID[next][i].get();
newvalue /= 3;
if(!GRID[element][i].compareAndSet(oldvalue, newvalue)) {
System.out.printf("Old value change from %d Current:%d\n", oldvalue, GRID[element][i].get());
}
}
}
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < N_ELEMENTS; i++) {
for(int j = 0; j < N_GENES; j++) {
GRID[i][j] = new AtomicInteger(rand.nextInt(1000));
}
}
for(int i = 0; i < N_EVOLVERS; i++) {
exec.execute(new Evolver());
}
TimeUnit.SECONDS.sleep(10);
exec.shutdownNow();
}
}
ReadWriteLock
类
ReadWriteLock
对向数据结构相对不频繁地写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。可以同时有多个读取者,只要它们都不试图写入,但是如果写锁已经被其它任务持有,那么任何读取者都不能访问,直到写锁被释放。ReadWriteLock
基本用法:
public class ReaderWriterList<T> {
private ArrayList<T> lockedList;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public ReaderWriterList(int size, T initialValue) {
lockedList = new ArrayList<>(Collections.nCopies(size, initialValue));
}
public T set(int index, T element) {
Lock wlock = lock.writeLock();
wlock.lock();
try {
return lockedList.set(index, element);
} finally {
wlock.unlock();
}
}
public T get(int index) {
Lock rlock = lock.readLock();
rlock.lock();
try {
if(lock.getReadLockCount() > 1) {
System.out.println(lock.getReadLockCount());
}
return lockedList.get(index);
} finally {
rlock.unlock();
}
}
public static void main(String[] args) {
new ReaderWriterListTest(30, 1);
}
}
public class ReaderWriterListTest {
ExecutorService exec = Executors.newCachedThreadPool();
private final static int SIZE = 100;
private static Random rand = new Random(47);
private ReaderWriterList<Integer> list = new ReaderWriterList<Integer>(SIZE, 0);
private class Writer implements Runnable {
@Override
public void run() {
try {
for(int i = 0; i < 20; i ++) {
list.set(i, rand.nextInt());
TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
}
System.out.println("Writer finished, shutting down.");
exec.shutdownNow();
}
}
private class Reader implements Runnable {
@Override
public void run() {
try {
while(!Thread.interrupted()) {
for(int i = 0; i < SIZE; i++) {
list.get(i);
TimeUnit.MILLISECONDS.sleep(1);
}
}
} catch(InterruptedException e) {
}
}
}
public ReaderWriterListTest(int readers, int writers) {
for(int i = 0; i <readers; i++) {
exec.execute(new Reader());
}
for(int i = 0; i < writers; i++) {
exec.execute(new Writer());
}
}
}
活动对象
活动对象或行动者:每个对象都维护着自己的工作器线程和消息队列,并且所有对这种对象的请求都将进入队列排队,任何时刻都只能运行其中的一个。
通过活动对象,我们就可以串行化消息而不是方法。
- 每个对象都可以拥有自己的 工作器线程。
- 每个对象都将维护对它自己的域的全部控制权。
- 所有活动对象之间的通信都将以在这些对象之间的消息形式发送。
- 活动对象之间的所有消息都要排队。
下面是一个示例:
public class ActiveObjectDemo {
private ExecutorService exec = Executors.newSingleThreadExecutor();
private Random rand = new Random(47);
private void pause(int factor) {
try {
TimeUnit.MILLISECONDS.sleep(factor);
} catch (InterruptedException e) {
System.out.println("sleep() interrupted");;
}
}
public Future<Integer> calculateInt(final int x, final int y) {
return exec.submit(new Callable<Integer>() {
@Override
public Integer call() {
System.out.println("starting " + x + " + " + y);
pause(500);
return x + y;
}
});
}
public Future<Float> calculateFloat(final float x, final float y) {
return exec.submit(new Callable<Float>() {
@Override
public Float call() {
System.out.println("starting " + x + " + " + y);
pause(2000);
return x + y;
}
});
}
public void shutdown() { exec.shutdown(); }
public static void main(String[] args) {
ActiveObjectDemo d1 = new ActiveObjectDemo();
List<Future<?>> results = new CopyOnWriteArrayList<>();
for(float f = 0.0f; f < 1.0f; f += 0.2f) {
results.add(d1.calculateFloat(f, f));
}
for(int i = 0; i < 5; i++) {
results.add(d1.calculateInt(i, i));
}
System.out.println("All asynch calls made");
while(results.size() > 0) {
for(Future<?> f : results) {
if(f.isDone()) {
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
results.remove(f);
}
}
}
d1.shutdown();
}
}
参考文章书籍
- 《Java编程思想 第4版》