1.什么是阻塞队列
阻塞队列是一个在队列基础上又支持了两个附加操作的队列。
2.支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。
1.支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空。
2.阻塞队列的应用场景:
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。
3.java的阻塞队列
自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
4.非阻塞队列中的方法
add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
peek():获取队首元素,若成功,则返回队首元素;否则返回null
5.阻塞队列中的方法
阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法:
put(E e)
take()
offer(E e,long timeout, TimeUnit unit)
poll(long timeout, TimeUnit unit)
put方法用来向队尾存入元素,如果队列满,则等待;
take方法用来从队首取元素,如果队列为空,则等待;
offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;
6.阻塞队列提供了四种处理方法
方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
7.阻塞队列的实现:基于双向表
package threadTest.test5;
public class Node<E> {
private E e;
private Node<E> prve;
private Node<E> next;
public Node(E e) {
super();
this.e = e;
}
public E getE() {
return e;
}
public void setE(E e) {
this.e = e;
}
public Node<E> getPrve() {
return prve;
}
public void setPrve(Node<E> prve) {
this.prve = prve;
}
public Node() {
super();
// TODO Auto-generated constructor stub
}
public Node<E> getNext() {
return next;
}
public void setNext(Node<E> next) {
this.next = next;
}
@Override
protected void finalize() throws Throwable {
System.out.println("当前对象被释放:"+e);
}
}
package threadTest.test5;
import threadTest.test6.Node;
public class BlockingsQueue<E> {
//定义两把锁,只是简单的锁
private Object full = new Object();
private Object empty = new Object();
private Node<E> head;//队首
private Node<E> tail;//队尾
private Integer size;//队列长度
private Integer s;//当前队列长度
/**
* 定义队列最大长度 size
* @param size
*/
public BlockingsQueue(Integer size) {
super();
head = new Node<E>();
tail = new Node<E>();
head.setNext(tail);
head.setPrve(null);
tail.setNext(null);
tail.setPrve(head);
this.size = size;
this.s=0;
}
/**
* 默认队列长度为:Integer.MAX_VALUE
*/
public BlockingsQueue() {
super();
head = new Node<E>();
tail = new Node<E>();
head.setNext(tail);
head.setPrve(null);
tail.setNext(null);
tail.setPrve(head);
this.size=Integer.MAX_VALUE;
this.s=0;
}
/**
* 入队操作
* @param e
*/
public void push(E e) {
synchronized(full){
while (this.s>=this.size) {//没有更多空间,需要阻塞
try {
System.out.println("没有空间,需要阻塞");
full.wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
Node<E> n=new Node<E>(e);
n.setPrve(tail.getPrve());
n.setNext(tail);
tail.getPrve().setNext(n);
tail.setPrve(n);
this.s++;
synchronized(empty){
System.out.println("有数据了,唤醒poll方法");
empty.notify();//达到了唤醒poll方法的条件
}
}
/**
* 出队操作
* @return
*/
public E pop() {
synchronized(empty){
while (s<= 0) {//没有数据,阻塞
try {
System.out.println("没有数据,阻塞");
empty.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Node<E> n=head.getNext();
n.getNext().setPrve(head);
head.setNext(n.getNext());
s--;
synchronized(full){
System.out.println("可以放数据 了");
full.notify();
}
return n.getE();
}
@Override
public String toString() {
Node<E> no=head.getNext();
while(no!=tail) {
System.out.println(no.getE());
no=no.getNext();
}
return "";
}
}
package threadTest.test5;
//主线程
public class TestMain {
public static void main(String[] args) {
BlockingsQueue<Integer> q=new BlockingsQueue<>(3);
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++) {
try {
Thread.sleep(1000);
q.push(i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++) {
try {
Thread.sleep(1000);
System.out.println(q.pop());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}