001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
040
041@Category({MiscTests.class, SmallTests.class})
042public class TestBoundedPriorityBlockingQueue {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046      HBaseClassTestRule.forClass(TestBoundedPriorityBlockingQueue.class);
047
048  private final static int CAPACITY = 16;
049
050  static class TestObject {
051    private final int priority;
052    private final int seqId;
053
054    public TestObject(final int priority, final int seqId) {
055      this.priority = priority;
056      this.seqId = seqId;
057    }
058
059    public int getSeqId() {
060      return this.seqId;
061    }
062
063    public int getPriority() {
064      return this.priority;
065    }
066  }
067
068  static class TestObjectComparator implements Comparator<TestObject> {
069    public TestObjectComparator() {}
070
071    @Override
072    public int compare(TestObject a, TestObject b) {
073      return a.getPriority() - b.getPriority();
074    }
075  }
076
077  private BoundedPriorityBlockingQueue<TestObject> queue;
078
079  @Before
080  public void setUp() throws Exception {
081    this.queue = new BoundedPriorityBlockingQueue<>(CAPACITY, new TestObjectComparator());
082  }
083
084  @After
085  public void tearDown() throws Exception {
086  }
087
088  @Test
089  public void tesAppend() throws Exception {
090    // Push
091    for (int i = 1; i <= CAPACITY; ++i) {
092      assertTrue(queue.offer(new TestObject(i, i)));
093      assertEquals(i, queue.size());
094      assertEquals(CAPACITY - i, queue.remainingCapacity());
095    }
096    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
097
098    // Pop
099    for (int i = 1; i <= CAPACITY; ++i) {
100      TestObject obj = queue.poll();
101      assertEquals(i, obj.getSeqId());
102      assertEquals(CAPACITY - i, queue.size());
103      assertEquals(i, queue.remainingCapacity());
104    }
105    assertEquals(null, queue.poll());
106  }
107
108  @Test
109  public void tesAppendSamePriority() throws Exception {
110    // Push
111    for (int i = 1; i <= CAPACITY; ++i) {
112      assertTrue(queue.offer(new TestObject(0, i)));
113      assertEquals(i, queue.size());
114      assertEquals(CAPACITY - i, queue.remainingCapacity());
115    }
116    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
117
118    // Pop
119    for (int i = 1; i <= CAPACITY; ++i) {
120      TestObject obj = queue.poll();
121      assertEquals(i, obj.getSeqId());
122      assertEquals(CAPACITY - i, queue.size());
123      assertEquals(i, queue.remainingCapacity());
124    }
125    assertEquals(null, queue.poll());
126  }
127
128  @Test
129  public void testPrepend() throws Exception {
130    // Push
131    for (int i = 1; i <= CAPACITY; ++i) {
132      assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
133      assertEquals(i, queue.size());
134      assertEquals(CAPACITY - i, queue.remainingCapacity());
135    }
136
137    // Pop
138    for (int i = 1; i <= CAPACITY; ++i) {
139      TestObject obj = queue.poll();
140      assertEquals(CAPACITY - (i - 1), obj.getSeqId());
141      assertEquals(CAPACITY - i, queue.size());
142      assertEquals(i, queue.remainingCapacity());
143    }
144    assertEquals(null, queue.poll());
145  }
146
147  @Test
148  public void testInsert() throws Exception {
149    // Push
150    for (int i = 1; i <= CAPACITY; i += 2) {
151      assertTrue(queue.offer(new TestObject(i, i)));
152      assertEquals((1 + i) / 2, queue.size());
153    }
154    for (int i = 2; i <= CAPACITY; i += 2) {
155      assertTrue(queue.offer(new TestObject(i, i)));
156      assertEquals(CAPACITY / 2 + (i / 2), queue.size());
157    }
158    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
159
160    // Pop
161    for (int i = 1; i <= CAPACITY; ++i) {
162      TestObject obj = queue.poll();
163      assertEquals(i, obj.getSeqId());
164      assertEquals(CAPACITY - i, queue.size());
165      assertEquals(i, queue.remainingCapacity());
166    }
167    assertEquals(null, queue.poll());
168  }
169
170  @Test
171  public void testFifoSamePriority() throws Exception {
172    assertTrue(CAPACITY >= 6);
173    for (int i = 0; i < 6; ++i) {
174      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
175    }
176
177    for (int i = 0; i < 6; i += 2) {
178      TestObject obj = queue.poll();
179      assertEquals(10, obj.getPriority());
180      assertEquals(i, obj.getSeqId());
181    }
182
183    for (int i = 1; i < 6; i += 2) {
184      TestObject obj = queue.poll();
185      assertEquals(20, obj.getPriority());
186      assertEquals(i, obj.getSeqId());
187    }
188    assertEquals(null, queue.poll());
189  }
190
191  @Test
192  public void testPoll() {
193    assertNull(queue.poll());
194    PriorityQueue<TestObject> testList = new PriorityQueue<>(CAPACITY, new TestObjectComparator());
195
196    for (int i = 0; i < CAPACITY; ++i) {
197      TestObject obj = new TestObject(i, i);
198      testList.add(obj);
199      queue.offer(obj);
200    }
201
202    for (int i = 0; i < CAPACITY; ++i) {
203      assertEquals(testList.poll(), queue.poll());
204    }
205
206    assertNull(null, queue.poll());
207  }
208
209  @Test
210  public void testPollInExecutor() throws InterruptedException {
211    final TestObject testObj = new TestObject(0, 0);
212
213    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
214    ExecutorService executor = Executors.newFixedThreadPool(2);
215    executor.execute(new Runnable() {
216      @Override
217      public void run() {
218        try {
219          assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
220          threadsStarted.await();
221          assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
222          assertTrue(queue.isEmpty());
223        } catch (Exception e) {
224          throw new RuntimeException(e);
225        }
226      }
227    });
228
229    executor.execute(new Runnable() {
230      @Override
231      public void run() {
232        try {
233            threadsStarted.await();
234            queue.offer(testObj);
235        } catch (Exception e) {
236          throw new RuntimeException(e);
237        }
238      }
239    });
240
241    executor.shutdown();
242    assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
243  }
244}