Java 之 JUC 1. JUC 简介 在 Java 5.0 提供了 java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中的 Collection 实现等;
2. volatile 关键字
volatile 关键字: 当多个线程进行操作共享数据时,可以保证内存中的数据是可见的;相较于 synchronized 是一种 较为轻量级的同步策略;
volatile 不具备”互斥性”;
volatile 不能保证变量的”原子性”;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class TestVolatile { public static void main (String[] args) { ThreadDemo td = new ThreadDemo (); new Thread (td).start(); while (true ){ if (td.isFlag()){ System.out.println("########" ); break ; } } } } class ThreadDemo implements Runnable { private boolean flag = false ; public void run () { try { Thread.sleep(200 ); }catch (InterruptedException e){ e.printStackTrace(); } flag = true ; Sytem.out.println("flag=" +isFlag()); } public boolean isFlag () { return flag; } public void setFlag (boolean flag) { this .flag = flag; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class TestVolatile { public static void main (String[] args) { ThreadDemo td = new ThreadDemo (); new Thread (td).start(); while (true ){ synchronized (td){ if (td.isFlag()){ System.out.println("########" ); break ; } } } } } public class TestVolatile { public static void main (String[] args) { ThreadDemo td = new ThreadDemo (); new Thread (td).start(); while (true ){ if (td.isFlag()){ System.out.println("########" ); break ; } } } } class ThreadDemo implements Runnable { private volatile boolean flag = false ; 同上(略) }
3. i++ 的原子性问题 i++的操作实际上分为三个步骤: “读-改-写”;
原子性: 就是”i++”的”读-改-写”是不可分割的三个步骤;
原子变量: JDK1.5 以后, java.util.concurrent.atomic包下,提供了常用的原子变量;
原子变量中的值,使用 volatile 修饰,保证了内存可见性; CAS(Compare-And-Swap) 算法保证数据的原子性;
执行步骤: int temp = i; i = i + 1; i = temp;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class TestAtomicDemo { public static void main (String[] args) { AtomicDemo ad = new AtomicDemo (); for (int i=0 ; i < 10 ; i++){ new Thread (ad).start(); } } } class AtomicDemo implements Runnable { private int serialNumber = 0 ; public void run () { try { Thread.sleep(200 ); }catch (InterruptedException e){ } System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber()); } public int getSerialNumber () { return serialNumber++; } } class AtomicDemo implements Runnable { private AtomicInteger serialNumber = new AtomicInteger (); public void run () { try { Thread.sleep(200 ); }catch (InterruptedException e){ } System.out.println(Thread.currentThread().getName()+":" +getSerialNumber()); } public int getSerialNumber () { return serialNumber.getAndIncrement(); } }
3.1 CAS 算法 CAS(Compare-And-Swap) 算法是硬件对于并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于 管理对共享数据的并发访问;
CAS 是一种无锁的非阻塞算法的实现;
CAS 包含了三个操作数:
需要读写的内存值: V
进行比较的预估值: A
拟写入的更新值: B 当且仅当 V == A 时, V = B, 否则,将不做任何操作;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 class CompareAndSwap { private int value; public synchronized int get () { return value; } public synchronized int compareAndSwap (int expectedValue, int newValue) { int oldValue = value; if (oldValue == expectedValue){ this .value = newValue; } return oldValue; } public synchronized boolean compareAndSet (int expectedValue, int newValue) { return expectedValue == compareAndSwap(expectedValue, newValue); } } public class TestCompareAndSwap { public static void main (String[] args) { final CopareAndSwap cas = new CompareAndSwap (); for (int i=0 ; i<10 ; i++){ new Thead (new Runnable (){ public void run () { int expectedValue = cas.get(); boolean b = cas.compareAndSet(expectedValue, (int )(Math.random()*100 )); System.out.println(b); } }).start(); } } }
4. 并发容器类 Java 5.0 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器的性能;
4.1 ConcurrentHashMap ConcurrentHashMap 同步容器类是 Java5 增加的一个线程安全的哈希表;介于 HashMap 与 Hashtable 之间; 内部采用”锁分段”机制替代Hashtable的独占锁,进而提高性能; 此包还提供了设计用于多线程上下文中的Collection实现: ConcurrentHashMap,ConcurrentSkipListMap ConcurrentSkipListSet, CopyOnWriteArrayList 和 CopyOnWriteArraySet; 当期望许多线程访问一个给定collection时,ConcurrentHashMap通常优于同步的HashMap; ConcurrentSkipListMap通常优于同步的TreeMap; 当期望的读数和遍历远远大于列表的更新数时, CopyOnWriteArrayList优于同步的ArrayList;
4.2 CountDownLatch(闭锁) CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class TestCountDownLatch { public static void main (String[] args) { final CountDownLatch latch = new CountDownLatch (10 ); LatchDemo ld = new LatchDemo (latch); long start = System.currentTimeMillis(); for (int i=0 ; i<10 ; i++){ new Thread (ld).start(); } try { latch.await(); }catch (InterruptedException e){ } long end = System.currentTimeMillis(); System.out.println("耗费时间为:" +(end - start)); } } class LatchDemo implements Runnable { private CountDownLatch latch; public LatchDemo (CountDownLatch latch) { this .latch = latch; } public void run () { synchronized (this ){ try { for (int i=0 ; i<50000 ; i++){ if (i % 2 == 0 ){ System.out.println(i); } } }finally { latch.countDown(); } } } }
5. 创建执行线程的方式三 相较于实现 Runnable 接口的方式,实现 Callable 接口类中的方法可以有返回值,并且可以抛出异常;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class TestCallable { public static void main (String[] args) { ThreadDemo td = new ThreadDemo (); FutureTask<Integer> result = new FutureTask <>(td); new Thread (result).start(); try { Integer sum = result.get(); System.out.println(sum); }catch (InterruptedException | ExecutionException e){ e.printStackTrace(); } } } class ThreadDemo implements Callable <Integer>{ public Integer call () throws Exception{ int sum = 0 ; for (int i=0 ; i<=100 ; i++){ sum += i; } return sum; } }
6. 同步锁(Lock) 参考 “java 多线程间通信”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class TestLock { public static void main (String[] args) { Ticket ticket = new Ticket (); new Thread (ticket,"1号窗口" ).start(); new Thread (ticket,"2号窗口" ).start(); new Thread (ticket,"3号窗口" ).start(); } } class Ticket implements Runnable { private int tick = 100 ; public void run () { while (true ){ if (tick > 0 ){ try { Thread.sleep(200 ); }catch (InterruptedException e){ } System.out.println(Thread.currentThread().getName()+"完成售票,余票为: " + --tick); } } } } class Ticket implements Runnable { private int tick = 100 ; private Lock lock = new ReentrantLock (); public void run () { while (true ){ lock.lock(); try { if (tick > 0 ){ try { Thread.sleep(200 ); }catch (InterruptedException e){ } System.out.println(Thread.currentThread().getName()+"完成售票,余票为: " + --tick); } }finally { lock.unlock(); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 public class TestABCAlternate { public static void main (String[] args) { AlternateDemo ad = new AlternateDemo (); new Thread (new Runnable (){ public void run () { for (int i=1 ; i<20 ; i++){ ad.loopA(i); } } },"A" ).start(); new Thread (new Runnable (){ public void run () { for (int i=1 ; i<20 ; i++){ ad.loopB(i); } } },"B" ).start(); new Thread (new Runnable (){ public void run () { for (int i=1 ; i<20 ; i++){ ad.loopC(i); System.out.println("--------------------" ); } } },"C" ).start(); } } class AlternateDemo { private int number = 1 ; private Lock lock = new ReentrantLock (); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void loopA (int totalLoop) { lock.lock(); try { if (number != 1 ){ condition1.await(); } for (int i=1 ; i <= 5 ; i++){ System.out.println(Thread.currentThread().getName()+"\t" +i+"\t" +totalLoop); } number = 2 ; condition2.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void loopB (int totalLoop) { lock.lock(); try { if (number != 2 ){ condition2.await(); } for (int i=1 ; i <= 15 ; i++){ System.out.println(Thread.currentThread().getName()+"\t" +i+"\t" +totalLoop); } number = 3 ; condition3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void loopC (int totalLoop) { lock.lock(); try { if (number != 3 ){ condition3.await(); } for (int i=1 ; i <= 20 ; i++){ System.out.println(Thread.currentThread().getName()+"\t" +i+"\t" +totalLoop); } number = 1 ; condition1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
7. ReadWriteLock(读写锁) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class TestReadWriteLock { public static void main (String[] args) { ReadWriteLockDemo rw = new ReadWriteLockDemo (); new Thread (new Runnable (){ public void run () { rw.set((int )(Math.random()*100 )); } },"Write:" ).start(); for (int i=0 ; i<100 ; i++){ new Thread (new Runnable (){ public void run () { rw.get(); } },"Read:" ).start(); } } } class ReadWriteLockDemo { private int number = 0 ; private ReadWriteLock lock = new ReentrantReadWriteLock (); public void get () { lock.readLock().lock(); try { System.out.println(Thread.currentThread().getName()+":" +number); }finally { lock.readLock().unlock(); } } public void set (int number) { lock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()); this .number = number; }finally { lock.writeLock().unlock(); } } }
8. 线程八锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class Test { public static void main (String[] args) { Demo demo = new Demo (); Demo demo2 = new Demo (); new Thread (new Runnable (){ public void run () { demo.getOne(); } }).start(); new Thread (new Runnable (){ public void run () { demo.getTwo(); } }).start(); } } class Demo { public synchronized void getOne () { try { Thread.sleep(3000 ); }catch (InterruptedException e){ } System.out.println("one" ); } public synchronized void getTwo () { System.out.println("two" ); } }
9. 线程池 线程池提供了一个线程队列,队列中保存着所有等待状态的线程; 避免了创建与销毁线程的额外开销,提高了响应速度;
线程池的体系结构
java.util.concurrent.Executor: 负责线程的使用和调度的根接口;
ExecutorService: 子接口,线程池的主要接口;
ThreadPoolExecutor: 线程池的实现类;
ScheduledExecutorService: 子接口,负责线程的调度;
ScheduledThreadPoolExecutor: 继承了线程池的实现类,实现了负责线程调度的子接口;
工具类: Executors
ExecutorService newFixedThreadPool(): 创建固定大小的线程池;
ExecutorService newCachedThreadPool(): 缓存线程池,线程池中线程的数量不固定,可以根据需求自动更改数量;
ExecutorService newSingleThreadExecutor(): 创建单个线程池, 线程池中只有一个线程;
ScheduledExecutorService newScheduledThreadPool(): 创建固定大小的线程,可以延时或定时的执行任务;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class TestThreadPool { public static void main (String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5 ); ThreadPoolDemo tpd = new ThreadPoolDemo (); for (int i=0 ; i<10 ; i++){ pool.submit(tpd); } pool.shutdown(); } } class ThreadPoolDemo implements Runnable { private int i=0 ; public void run () { while (i <= 100 ){ System.out.println(Thread.currentThread().getName()+" : " + i++) } } }
9.1 线程调度 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class TestScheduledThreadPool { public static void main (String[] args) throws Exception{ ScheduledExecutorService pool = Executors.newScheduledThreadPool(5 ); for (int i=0 ; i < 10 ; i++){ Future<Integer> result = pool.schedule(new Callable <Integer>(){ public Integer call () throws Exception{ int num = new Random ().nextInt(100 ); System.out.println(Thread.currentThread().getName()+ ":" + num); return num; } }, 3 , TimeUnit.SECONDS); System.out.println(result.get()); } pool.shutdown(); } }
10 Fork/Join 框架 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class TestForkJoinPool { public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool (); ForkJoinTask<Long> task = new ForkJoinSumCalculate (0L , 100000000L ); Long sum = pool.invoke(task); System.out.println(sum); } } class ForkJoinSumCalculate extends RecursiveTask <Long>{ private static final long serialVersionUID = 24340990L ; private long start; private long end; private static final long THURSHOLD = 10000L ; public ForkJoinSumCalculate (long start, long end) { this .start = start; this .end = end; } public Long compute () { long length = end - start; if (length <= THURSHOLD){ long sum = 0L ; for (long i = start; i<=end; i++){ sum += i; } return sum; }else { long middle = (start + end ) / 2 ; ForkJoinSumCalculate left = new ForkJoinSumCalculate (start, middle); left.fork(); ForkJoinSumCalculate right = new ForkJoinSumCalculate (middle + 1 , end); right.fork(); return left.join() + right.join(); } } }