java高并发编程--03--线程间通信
1.同步阻塞与异步非阻塞 服务端监听端口,客户端提交Event,服务端创建线程接收Event,处理Event,返回结果 public class EventQueue { private final int MAX; static class Event{} //Event队列 private final LinkedList<Event> EVENTS = new LinkedList<EventQueue.Event>(); private static final int DEFAULT_MAX = 10; EventQueue(){ this(DEFAULT_MAX); } EventQueue(int max){ this.MAX = max; } //提交Event到队尾 public void offer(Event event) { synchronized (EVENTS) { if(EVENTS.size() >= MAX) { try { System.out.println("EVENTS is Full."); EVENTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("The Event has submitted."); EVENTS.addLast(event); EVENTS.notify(); } } //从队首取走Event public Event take() { synchronized (EVENTS) { if(EVENTS.isEmpty()) { try { System.out.println("EVENTS is Empty."); EVENTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event first = EVENTS.removeFirst(); EVENTS.notify(); System.out.println("Thre Event " + first +" has Hadled."); return first; } } public static void main(String[] args) { EventQueue eq = new EventQueue(); //创建一个线程,假定提交任务没有时间间隔,用循环不断提交任务 new Thread(() -> { while (true) { eq.offer(new Event()); } },"Producer").start(); //创建一个线程,假定处理一个任务需要一定时间,使用循环处理任务 new Thread(() -> { while(true) { eq.take(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } },"Consumer").start(); } } 输出结果: EVENTS is Empty. wait和sleep的主要区别: 3.多线程间通信 public class EventQueue { private final int MAX; static class Event{} //Event队列 private final LinkedList<Event> EVENTS = new LinkedList<EventQueue.Event>(); private static final int DEFAULT_MAX = 10; EventQueue(){ this(DEFAULT_MAX); } EventQueue(int max){ this.MAX = max; } //提交Event到队尾 public void offer(Event event) { synchronized (EVENTS) { if(EVENTS.size() >= MAX) { try { System.out.println("EVENTS is Full."); EVENTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } EVENTS.addLast(event); System.out.println("The Event has submitted."+EVENTS.size()); EVENTS.notify(); } } //从队首取走Event public Event take() { synchronized (EVENTS) { if(EVENTS.isEmpty()) { try { System.out.println("EVENTS is Empty."); EVENTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event first = EVENTS.removeFirst(); EVENTS.notify(); System.out.println("Thre Event " + first +" has Hadled."); return first; } } public static void main(String[] args) { EventQueue eq = new EventQueue(); //创建3个提交线程,假定提交任务没有时间间隔,用循环不断提交任务 new Thread(() -> { while (true) { eq.offer(new Event()); } },"Producer1").start(); new Thread(() -> { while (true) { eq.offer(new Event()); } },"Producer2").start(); new Thread(() -> { while (true) { eq.offer(new Event()); } },"Producer3").start(); //创建2个处理线程,假定处理一个任务需要一定时间,使用循环处理任务 new Thread(() -> { while(true) { eq.take(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } },"Consumer1").start(); new Thread(() -> { while(true) { eq.take(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } },"Consumer2").start(); } } 输出结果: 上面出现类Event数量大于最大值的情况,还可能出现队列为空,但Consumer仍然去获取Event的情况。 疑问:代码中已经添加了synchronized数据同步,为何还会出现数据不同步的情况? public class EventQueue { private final int MAX; static class Event{} //Event队列 private final LinkedList<Event> EVENTS = new LinkedList<EventQueue.Event>(); private static final int DEFAULT_MAX = 10; EventQueue(){ this(DEFAULT_MAX); } EventQueue(int max){ this.MAX = max; } //提交Event到队尾 public void offer(Event event) { synchronized (EVENTS) { while(EVENTS.size() >= MAX) { try { System.out.println("EVENTS is Full."); EVENTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } EVENTS.addLast(event); System.out.println("The Event has submitted."+EVENTS.size()); EVENTS.notify(); } } //从队首取走Event public Event take() { synchronized (EVENTS) { while (EVENTS.isEmpty()) { try { System.out.println("EVENTS is Empty."); EVENTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event first = EVENTS.removeFirst(); EVENTS.notify(); System.out.println("Thre Event " + first +" has Hadled."); return first; } } public static void main(String[] args) { EventQueue eq = new EventQueue(); //创建3个提交线程,假定提交任务没有时间间隔,用循环不断提交任务 new Thread(() -> { while (true) { eq.offer(new Event()); } },"Consumer2").start(); } } 输出结果: 3.3线程休息室wait set 4自定义显式锁BooleanLock public interface Lock { //类似synchronized关键字,lock方法永远阻塞,除非获取到锁,可以被打断,打断抛出InterruptedException void lock() throws InterruptedException; //类似synchronized关键字,lock方法永远阻塞,除非获取到锁,可以被打断或超时,打断抛出InterruptedException,超时抛出TimeoutException void lock(long timeMillis) throws InterruptedException,TimeoutException; //释放锁 void unLock(); //获取哪些线程被阻塞 List<Thread> getBlockedThreads(); } 2)实现Lock //通过控制Boolean变量开关决定是否允许当前线程获取到锁 public class BlooeanLock implements Lock { //当前拥有锁的线程 private Thread currentThread; //locked为false标识当前该锁没有被任何线程获得或已被释放,true表示该锁以被获得,该线程是currentThread private boolean locked = false; //存储因获取该锁而进入阻塞的线程 private List<Thread> blockedThreadList = new ArrayList<Thread>(); @Override public void lock() throws InterruptedException { //使用同步代码块的方式进行同步 synchronized (this) { //如果某个线程已经获取到当前锁,将该线程放到blockedThreadList中,并使用wait方法释放该线程对this monitor的所有权 while(locked) { blockedThreadList.add(Thread.currentThread()); this.wait(); } //如果没有线程在拥有当前锁,从blockedThreadList移除当前线程,当前线程设置为拥有锁的线程并将锁设置为已被拥有的状态 this.blockedThreadList.remove(Thread.currentThread()); this.locked = true; this.currentThread = Thread.currentThread(); } } @Override public void lock(long timeMillis) throws InterruptedException,TimeoutException { //同样使用同步代码块的方式进行同步 synchronized (this) { //如果时间不合法,默认使用lock方法,通常处理是抛出异常,抛出异常更好一些 if(timeMillis <= 0) { this.lock(); //throw *Exception }else { long remainMillis = timeMillis; long endMillis = System.currentTimeMillis() + remainMillis; while(locked) { //如果remainMillis小于等于0,意为着当前线程被其他线程唤醒或者在指定wait时间内没有获取到锁,这种情况抛出异常 if(remainMillis <= 0) { throw new TimeoutException("can not get lock during " + timeMillis); }else { if(!blockedThreadList.contains(Thread.currentThread())) { blockedThreadList.add(Thread.currentThread()); } //等待remainMillis时间,remainMillis最初由其他线程传入,但在多次wait过程重新计算 this.wait(remainMillis); //重新计算remainMillis remainMillis = endMillis - System.currentTimeMillis(); } } //如果没有线程在拥有当前锁,从blockedThreadList移除当前线程,当前线程设置为拥有锁的线程并将锁设置为已被拥有的状态 this.blockedThreadList.remove(Thread.currentThread()); this.locked = true; this.currentThread = Thread.currentThread(); } } } @Override public void unLock() { //哪一个线程加的锁只能由哪一个线程来解锁 synchronized (this) { //判断是否是当前线程获得的锁 if(currentThread.equals(Thread.currentThread())) { //将lock置为false locked = false; //通知其他线程 this.notifyAll(); } } } @Override public List<Thread> getBlockedThreads() { // TODO Auto-generated method stub return null; } } 3)使用BooleanLock public class BooleanLockTest { //定义BooleanLock private static final BlooeanLock lock = new BlooeanLock(); //使用try..finally语句来确保每次锁都被释放 public void synchronizedMethod() { try { lock.lock(); int random = new Random().nextInt(10); System.out.println("Thread " + Thread.currentThread() + " 获取到锁。"); TimeUnit.SECONDS.sleep(random); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 lock.unLock(); } } public static void main(String[] args) { BooleanLockTest blt = new BooleanLockTest(); IntStream.range(1,10).mapToObj(i -> new Thread(blt::synchronizedMethod)).forEach(Thread::start); } } 输出结果: 使用可中断被阻塞的线程 public static void main(String[] args) throws InterruptedException { BooleanLockTest blt = new BooleanLockTest(); new Thread(blt::synchronizedMethod,"T1").start(); TimeUnit.MICROSECONDS.sleep(2); Thread t2 = new Thread(blt::synchronizedMethod,"T2"); t2.start(); TimeUnit.MICROSECONDS.sleep(2); t2.interrupt(); } 输出结果: Thread Thread[T1,main] 获取到锁。 使用可超时线程 public class BooleanLockTest { //定义BooleanLock private static final BlooeanLock lock = new BlooeanLock(); //使用try..finally语句来确保每次锁都被释放 public void synchronizedMethod() { try { lock.lock(100);//使用带时间的lock,尝试的时间可能小于下面一个任务执行的时间 int random = new Random().nextInt(10); System.out.println("Thread " + Thread.currentThread() + " 获取到锁。"); TimeUnit.SECONDS.sleep(random); } catch (InterruptedException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //释放锁 lock.unLock(); } } public static void main(String[] args) throws InterruptedException { BooleanLockTest blt = new BooleanLockTest(); new Thread(blt::synchronizedMethod,"T2"); t2.start(); } } 多执行几次,应有输出结果:Thread Thread[T1,main] 获取到锁。java.util.concurrent.TimeoutException: can not get lock during 100 at cp5.cp4.BlooeanLock.lock(BlooeanLock.java:45) at cp5.cp4.BooleanLockTest.synchronizedMethod(BooleanLockTest.java:14) at java.base/java.lang.Thread.run(Thread.java:834) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |