001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertSame;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.util.Comparator;
027import java.util.PriorityQueue;
028import java.util.concurrent.CyclicBarrier;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.hbase.testclassification.MiscTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.jupiter.api.AfterEach;
035import org.junit.jupiter.api.BeforeEach;
036import org.junit.jupiter.api.Tag;
037import org.junit.jupiter.api.Test;
038
039@Tag(MiscTests.TAG)
040@Tag(SmallTests.TAG)
041public class TestBoundedPriorityBlockingQueue {
042
043  private final static int CAPACITY = 16;
044
045  static class TestObject {
046    private final int priority;
047    private final int seqId;
048
049    public TestObject(final int priority, final int seqId) {
050      this.priority = priority;
051      this.seqId = seqId;
052    }
053
054    public int getSeqId() {
055      return this.seqId;
056    }
057
058    public int getPriority() {
059      return this.priority;
060    }
061  }
062
063  static class TestObjectComparator implements Comparator<TestObject> {
064    public TestObjectComparator() {
065    }
066
067    @Override
068    public int compare(TestObject a, TestObject b) {
069      return a.getPriority() - b.getPriority();
070    }
071  }
072
073  private BoundedPriorityBlockingQueue<TestObject> queue;
074
075  @BeforeEach
076  public void setUp() throws Exception {
077    this.queue = new BoundedPriorityBlockingQueue<>(CAPACITY, new TestObjectComparator());
078  }
079
080  @AfterEach
081  public void tearDown() throws Exception {
082  }
083
084  @Test
085  public void tesAppend() throws Exception {
086    // Push
087    for (int i = 1; i <= CAPACITY; ++i) {
088      assertTrue(queue.offer(new TestObject(i, i)));
089      assertEquals(i, queue.size());
090      assertEquals(CAPACITY - i, queue.remainingCapacity());
091    }
092    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
093
094    // Pop
095    for (int i = 1; i <= CAPACITY; ++i) {
096      TestObject obj = queue.poll();
097      assertEquals(i, obj.getSeqId());
098      assertEquals(CAPACITY - i, queue.size());
099      assertEquals(i, queue.remainingCapacity());
100    }
101    assertEquals(null, queue.poll());
102  }
103
104  @Test
105  public void tesAppendSamePriority() throws Exception {
106    // Push
107    for (int i = 1; i <= CAPACITY; ++i) {
108      assertTrue(queue.offer(new TestObject(0, i)));
109      assertEquals(i, queue.size());
110      assertEquals(CAPACITY - i, queue.remainingCapacity());
111    }
112    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
113
114    // Pop
115    for (int i = 1; i <= CAPACITY; ++i) {
116      TestObject obj = queue.poll();
117      assertEquals(i, obj.getSeqId());
118      assertEquals(CAPACITY - i, queue.size());
119      assertEquals(i, queue.remainingCapacity());
120    }
121    assertEquals(null, queue.poll());
122  }
123
124  @Test
125  public void testPrepend() throws Exception {
126    // Push
127    for (int i = 1; i <= CAPACITY; ++i) {
128      assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
129      assertEquals(i, queue.size());
130      assertEquals(CAPACITY - i, queue.remainingCapacity());
131    }
132
133    // Pop
134    for (int i = 1; i <= CAPACITY; ++i) {
135      TestObject obj = queue.poll();
136      assertEquals(CAPACITY - (i - 1), obj.getSeqId());
137      assertEquals(CAPACITY - i, queue.size());
138      assertEquals(i, queue.remainingCapacity());
139    }
140    assertEquals(null, queue.poll());
141  }
142
143  @Test
144  public void testInsert() throws Exception {
145    // Push
146    for (int i = 1; i <= CAPACITY; i += 2) {
147      assertTrue(queue.offer(new TestObject(i, i)));
148      assertEquals((1 + i) / 2, queue.size());
149    }
150    for (int i = 2; i <= CAPACITY; i += 2) {
151      assertTrue(queue.offer(new TestObject(i, i)));
152      assertEquals(CAPACITY / 2 + (i / 2), queue.size());
153    }
154    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
155
156    // Pop
157    for (int i = 1; i <= CAPACITY; ++i) {
158      TestObject obj = queue.poll();
159      assertEquals(i, obj.getSeqId());
160      assertEquals(CAPACITY - i, queue.size());
161      assertEquals(i, queue.remainingCapacity());
162    }
163    assertEquals(null, queue.poll());
164  }
165
166  @Test
167  public void testFifoSamePriority() throws Exception {
168    assertTrue(CAPACITY >= 6);
169    for (int i = 0; i < 6; ++i) {
170      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
171    }
172
173    for (int i = 0; i < 6; i += 2) {
174      TestObject obj = queue.poll();
175      assertEquals(10, obj.getPriority());
176      assertEquals(i, obj.getSeqId());
177    }
178
179    for (int i = 1; i < 6; i += 2) {
180      TestObject obj = queue.poll();
181      assertEquals(20, obj.getPriority());
182      assertEquals(i, obj.getSeqId());
183    }
184    assertEquals(null, queue.poll());
185  }
186
187  @Test
188  public void testPoll() {
189    assertNull(queue.poll());
190    PriorityQueue<TestObject> testList = new PriorityQueue<>(CAPACITY, new TestObjectComparator());
191
192    for (int i = 0; i < CAPACITY; ++i) {
193      TestObject obj = new TestObject(i, i);
194      testList.add(obj);
195      queue.offer(obj);
196    }
197
198    for (int i = 0; i < CAPACITY; ++i) {
199      assertEquals(testList.poll(), queue.poll());
200    }
201
202    assertNull(queue.poll());
203  }
204
205  @Test
206  public void testPollInExecutor() throws InterruptedException {
207    final TestObject testObj = new TestObject(0, 0);
208
209    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
210    ExecutorService executor = Executors.newFixedThreadPool(2);
211    executor.execute(new Runnable() {
212      @Override
213      public void run() {
214        try {
215          assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
216          threadsStarted.await();
217          assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
218          assertTrue(queue.isEmpty());
219        } catch (Exception e) {
220          throw new RuntimeException(e);
221        }
222      }
223    });
224
225    executor.execute(new Runnable() {
226      @Override
227      public void run() {
228        try {
229          threadsStarted.await();
230          queue.offer(testObj);
231        } catch (Exception e) {
232          throw new RuntimeException(e);
233        }
234      }
235    });
236
237    executor.shutdown();
238    assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
239  }
240}