/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link BoundedPriorityBlockingQueue}: size/capacity bookkeeping, ordering of polled
 * elements by comparator priority, insertion order among equal priorities, and timed polling
 * from another thread.
 */
@Category({MiscTests.class, SmallTests.class})
public class TestBoundedPriorityBlockingQueue {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBoundedPriorityBlockingQueue.class);

  /** Capacity of the queue created for every test. */
  private static final int CAPACITY = 16;

  /** Queue element carrying a priority (used for ordering) and a sequence id (used to identify). */
  static class TestObject {
    private final int priority;
    private final int seqId;

    public TestObject(final int priority, final int seqId) {
      this.priority = priority;
      this.seqId = seqId;
    }

    public int getSeqId() {
      return this.seqId;
    }

    public int getPriority() {
      return this.priority;
    }
  }

  /** Orders {@link TestObject}s by ascending priority; the sequence id is not compared. */
  static class TestObjectComparator implements Comparator<TestObject> {
    public TestObjectComparator() {}

    @Override
    public int compare(TestObject a, TestObject b) {
      // Integer.compare instead of subtraction: a difference of two ints can overflow.
      return Integer.compare(a.getPriority(), b.getPriority());
    }
  }

  private BoundedPriorityBlockingQueue<TestObject> queue;

  /** Creates a fresh, empty queue of {@link #CAPACITY} elements before each test. */
  @Before
  public void setUp() throws Exception {
    this.queue = new BoundedPriorityBlockingQueue<>(CAPACITY, new TestObjectComparator());
  }

  @After
  public void tearDown() throws Exception {
    // Nothing to release; the queue is re-created in setUp().
  }

  /**
   * Fills the queue with strictly increasing priorities, checks that a timed offer on the full
   * queue is rejected, then verifies elements are polled in the order they were offered.
   */
  @Test
  public void tesAppend() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      assertTrue(queue.offer(new TestObject(i, i)));
      assertEquals(i, queue.size());
      assertEquals(CAPACITY - i, queue.remainingCapacity());
    }
    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      TestObject obj = queue.poll();
      assertEquals(i, obj.getSeqId());
      assertEquals(CAPACITY - i, queue.size());
      assertEquals(i, queue.remainingCapacity());
    }
    assertNull(queue.poll());
  }

  /**
   * Fills the queue with elements that all share priority 0 and verifies they are polled in
   * the order they were offered.
   */
  @Test
  public void tesAppendSamePriority() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      assertTrue(queue.offer(new TestObject(0, i)));
      assertEquals(i, queue.size());
      assertEquals(CAPACITY - i, queue.remainingCapacity());
    }
    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      TestObject obj = queue.poll();
      assertEquals(i, obj.getSeqId());
      assertEquals(CAPACITY - i, queue.size());
      assertEquals(i, queue.remainingCapacity());
    }
    assertNull(queue.poll());
  }

  /**
   * Offers elements with strictly decreasing priority values, so each new element sorts before
   * all previous ones, and verifies they are polled in reverse offer order.
   */
  @Test
  public void testPrepend() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
      assertEquals(i, queue.size());
      assertEquals(CAPACITY - i, queue.remainingCapacity());
    }

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      TestObject obj = queue.poll();
      assertEquals(CAPACITY - (i - 1), obj.getSeqId());
      assertEquals(CAPACITY - i, queue.size());
      assertEquals(i, queue.remainingCapacity());
    }
    assertNull(queue.poll());
  }

  /**
   * Offers the odd priorities first and then the even ones, so the second pass inserts between
   * existing elements, and verifies elements are polled in ascending priority order.
   */
  @Test
  public void testInsert() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; i += 2) {
      assertTrue(queue.offer(new TestObject(i, i)));
      assertEquals((1 + i) / 2, queue.size());
    }
    for (int i = 2; i <= CAPACITY; i += 2) {
      assertTrue(queue.offer(new TestObject(i, i)));
      assertEquals(CAPACITY / 2 + (i / 2), queue.size());
    }
    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      TestObject obj = queue.poll();
      assertEquals(i, obj.getSeqId());
      assertEquals(CAPACITY - i, queue.size());
      assertEquals(i, queue.remainingCapacity());
    }
    assertNull(queue.poll());
  }

  /**
   * Interleaves offers of priority 10 and priority 20 elements and verifies that all priority 10
   * elements are polled first, then all priority 20 ones, each group in offer order.
   */
  @Test
  public void testFifoSamePriority() throws Exception {
    assertTrue(CAPACITY >= 6);
    for (int i = 0; i < 6; ++i) {
      // Even i -> priority 10, odd i -> priority 20.
      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
    }

    for (int i = 0; i < 6; i += 2) {
      TestObject obj = queue.poll();
      assertEquals(10, obj.getPriority());
      assertEquals(i, obj.getSeqId());
    }

    for (int i = 1; i < 6; i += 2) {
      TestObject obj = queue.poll();
      assertEquals(20, obj.getPriority());
      assertEquals(i, obj.getSeqId());
    }
    assertNull(queue.poll());
  }

  /**
   * Verifies that polling an empty queue returns null and that, for the same offers, the queue
   * polls the same elements in the same order as a {@link PriorityQueue} using the same
   * comparator.
   */
  @Test
  public void testPoll() {
    assertNull(queue.poll());
    PriorityQueue<TestObject> testList = new PriorityQueue<>(CAPACITY, new TestObjectComparator());

    for (int i = 0; i < CAPACITY; ++i) {
      TestObject obj = new TestObject(i, i);
      testList.add(obj);
      assertTrue(queue.offer(obj));
    }

    for (int i = 0; i < CAPACITY; ++i) {
      assertEquals(testList.poll(), queue.poll());
    }

    assertNull(queue.poll());
  }

  /**
   * Verifies timed polling across threads: a consumer first times out on the empty queue, then
   * rendezvous with a producer on a barrier and receives the element the producer offers.
   * <p>
   * Both workers are submitted as {@link Callable}s so that assertion failures raised on the
   * worker threads are captured in their {@link Future}s and rethrown on the test thread;
   * otherwise they would be lost and the test would pass regardless.
   */
  @Test
  public void testPollInExecutor() throws InterruptedException {
    final TestObject testObj = new TestObject(0, 0);

    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<Void> consumer = executor.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
          threadsStarted.await();
          assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
          assertTrue(queue.isEmpty());
          return null;
        }
      });

      Future<Void> producer = executor.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          threadsStarted.await();
          assertTrue(queue.offer(testObj));
          return null;
        }
      });

      executor.shutdown();
      assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
      // Both tasks have finished at this point; surface anything they threw.
      assertWorkerSucceeded(consumer);
      assertWorkerSucceeded(producer);
    } finally {
      // Interrupt workers still blocked (e.g. on the barrier) if the test failed early.
      executor.shutdownNow();
    }
  }

  /** Rethrows, as an {@link AssertionError} on the calling thread, any failure of the worker. */
  private static void assertWorkerSucceeded(Future<?> worker) throws InterruptedException {
    try {
      worker.get();
    } catch (ExecutionException e) {
      throw new AssertionError("Worker thread failed", e.getCause());
    }
  }
}