1.PriorityBlockingQueue
public class PriorityBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable
所有添加进入PriorityBlockingQueue的元素都必须实现Comparable接口。当插入元素时,PriorityBlockingQueue会使用CompareTo()方法来决定元素插入的位置,元素越大越靠后。PriorityBlockingQueue是一个阻塞式的数据结构。当它的方法被调用且不能立即执行时,调用这个方法的线程将被阻塞直到方法执行完成。
public class Event implements Comparable<Event> {
private int thread;
private int priority;
public Event(int thread, int priority) {
super();
this.thread = thread;
this.priority = priority;
}
public int getThread() {
return thread;
}
public int getPriority() {
return priority;
}
@Override
public int compareTo(Event o) {
if(this.priority>o.getPriority()){
return -1;
}else if(this.priority<o.getPriority()){
return 1;
}else{
return 0;
}
}
}
public class Task implements Runnable {
private int id;
private PriorityBlockingQueue<Event> queue;
public Task(int id, PriorityBlockingQueue<Event> queue) {
super();
this.id = id;
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Event event = new Event(id, i);
queue.add(event);
}
}
}
public class PriorityBlockingQueueMain {
public static void main(String[] args) {
PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<Event>();
Thread threads[] = new Thread[5];
for (int i = 0; i < threads.length; i++) {
Task task = new Task(i, queue);
threads[i] = new Thread(task);
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Main:Queue Size:" + queue.size());
for (int i = 0; i < threads.length * 100; i++) {
Event event = queue.poll();
System.out.println("Thread "+ event.getThread()+" :Priority "+event.getPriority());
}
System.out.println("Main:Queue Size:" + queue.size());
System.out.println("Main: End");
}
}