Queue是什么
队列,是一种数据结构。除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的。无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的。在FIFO队列中,所有新元素都插入队列的末尾。队列都是线程安全的,内部已经实现安全措施,不用我们担心
Queue中的方法
Queue中的方法不难理解,6个,每2对是一个也就是总共3对。看一下JDK API就知道了:

注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议。
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
注意:
1、必须要使用take()方法在获取的时候达成阻塞结果
2、使用poll()方法将产生非阻塞效果
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
- private static final long serialVersionUID = -817911632652898426L;
- /** The queued items */
- private final E[] items;
- /** items index for next take, poll or remove */
- private int takeIndex;
- /** items index for next put, offer, or add. */
- private int putIndex;
- /** Number of items in the queue */
- private int count;
- /*
- * Concurrency control uses the classic two-condition algorithm
- * found in any textbook.
- */
- /** Main lock guarding all access */
- private final ReentrantLock lock;
- /** Condition for waiting takes */
- private final Condition notEmpty;
- /** Condition for waiting puts */
- private final Condition notFull;
- }
- public ArrayBlockingQueue(int capacity) {
- }
- public ArrayBlockingQueue(int capacity, boolean fair) {
- }
- public ArrayBlockingQueue(int capacity, boolean fair,
- Collection<? extends E> c) {
- }
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- final E[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- try {
- while (count == items.length)
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- insert(e);
- } finally {
- lock.unlock();
- }
- }
- private void insert(E x) {
- items[putIndex] = x;
- putIndex = inc(putIndex);
- ++count;
- notEmpty.signal();
- }
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- try {
- while (count == 0)
- notEmpty.await();
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- E x = extract();
- return x;
- } finally {
- lock.unlock();
- }
- }
- private E extract() {
- final E[] items = this.items;
- E x = items[takeIndex];
- items[takeIndex] = null;
- takeIndex = inc(takeIndex);
- --count;
- notFull.signal();
- return x;
- }
- public class Test {
- private int queueSize = 10;
- private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
- public static void main(String[] args) {
- Test test = new Test();
- Producer producer = test.new Producer();
- Consumer consumer = test.new Consumer();
- producer.start();
- consumer.start();
- }
- class Consumer extends Thread{
- @Override
- public void run() {
- consume();
- }
- private void consume() {
- while(true){
- synchronized (queue) {
- while(queue.size() == 0){
- try {
- System.out.println("队列空,等待数据");
- queue.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- queue.notify();
- }
- }
- queue.poll(); //每次移走队首元素
- queue.notify();
- System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
- }
- }
- }
- }
- class Producer extends Thread{
- @Override
- public void run() {
- produce();
- }
- private void produce() {
- while(true){
- synchronized (queue) {
- while(queue.size() == queueSize){
- try {
- System.out.println("队列满,等待有空余空间");
- queue.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- queue.notify();
- }
- }
- queue.offer(1); //每次插入一个元素
- queue.notify();
- System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
- }
- }
- }
- }
- }
- public class Test {
- private int queueSize = 10;
- private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
- public static void main(String[] args) {
- Test test = new Test();
- Producer producer = test.new Producer();
- Consumer consumer = test.new Consumer();
- producer.start();
- consumer.start();
- }
- class Consumer extends Thread{
- @Override
- public void run() {
- consume();
- }
- private void consume() {
- while(true){
- try {
- queue.take();
- System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- class Producer extends Thread{
- @Override
- public void run() {
- produce();
- }
- private void produce() {
- while(true){
- try {
- queue.put(1);
- System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }