/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link BoundedPriorityBlockingQueue}: capacity accounting, priority ordering,
 * ordering among elements of equal priority, and timed polling across threads.
 */
@Tag(MiscTests.TAG)
@Tag(SmallTests.TAG)
public class TestBoundedPriorityBlockingQueue {

  /** Capacity of the queue created for every test. */
  private final static int CAPACITY = 16;

  /** Queue element carrying a priority (used for ordering) and a sequence id (used to identify). */
  static class TestObject {
    private final int priority;
    private final int seqId;

    public TestObject(final int priority, final int seqId) {
      this.priority = priority;
      this.seqId = seqId;
    }

    public int getSeqId() {
      return this.seqId;
    }

    public int getPriority() {
      return this.priority;
    }
  }

  /** Orders {@link TestObject}s by ascending priority. */
  static class TestObjectComparator implements Comparator<TestObject> {
    public TestObjectComparator() {
    }

    @Override
    public int compare(TestObject a, TestObject b) {
      // Integer.compare instead of subtraction: a difference of two ints can overflow and
      // report the wrong sign for priorities that are far apart.
      return Integer.compare(a.getPriority(), b.getPriority());
    }
  }

  private BoundedPriorityBlockingQueue<TestObject> queue;

  @BeforeEach
  public void setUp() throws Exception {
    this.queue = new BoundedPriorityBlockingQueue<>(CAPACITY, new TestObjectComparator());
  }

  @AfterEach
  public void tearDown() throws Exception {
  }

  /**
   * Polls one element and checks its sequence id together with the queue's size accounting.
   * @param expectedSeqId sequence id the polled element must carry
   * @param polledCount   number of elements polled so far, including this one, out of a queue
   *                      that was filled to {@link #CAPACITY}
   */
  private void assertNextPoll(int expectedSeqId, int polledCount) {
    TestObject obj = queue.poll();
    // Fail with a clear assertion rather than a NullPointerException if the queue ran dry.
    assertNotNull(obj);
    assertEquals(expectedSeqId, obj.getSeqId());
    assertEquals(CAPACITY - polledCount, queue.size());
    assertEquals(polledCount, queue.remainingCapacity());
  }

  /** Elements offered in ascending priority order come back in the same order. */
  @Test
  public void tesAppend() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      assertTrue(queue.offer(new TestObject(i, i)));
      assertEquals(i, queue.size());
      assertEquals(CAPACITY - i, queue.remainingCapacity());
    }
    // The queue is full, so a timed offer must give up and report failure.
    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      assertNextPoll(i, i);
    }
    assertNull(queue.poll());
  }

  /** Elements that all share one priority come back in insertion order. */
  @Test
  public void tesAppendSamePriority() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      assertTrue(queue.offer(new TestObject(0, i)));
      assertEquals(i, queue.size());
      assertEquals(CAPACITY - i, queue.remainingCapacity());
    }
    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      assertNextPoll(i, i);
    }
    assertNull(queue.poll());
  }

  /** Elements offered in descending priority order come back reversed (lowest priority first). */
  @Test
  public void testPrepend() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
      assertEquals(i, queue.size());
      assertEquals(CAPACITY - i, queue.remainingCapacity());
    }

    // Pop: the last element offered has the smallest priority, so seqIds count down.
    for (int i = 1; i <= CAPACITY; ++i) {
      assertNextPoll(CAPACITY - (i - 1), i);
    }
    assertNull(queue.poll());
  }

  /** Elements offered out of order (odd priorities, then even) come back sorted by priority. */
  @Test
  public void testInsert() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; i += 2) {
      assertTrue(queue.offer(new TestObject(i, i)));
      assertEquals((1 + i) / 2, queue.size());
    }
    for (int i = 2; i <= CAPACITY; i += 2) {
      assertTrue(queue.offer(new TestObject(i, i)));
      assertEquals(CAPACITY / 2 + (i / 2), queue.size());
    }
    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      assertNextPoll(i, i);
    }
    assertNull(queue.poll());
  }

  /** With two interleaved priorities, each priority group is drained in insertion order. */
  @Test
  public void testFifoSamePriority() throws Exception {
    assertTrue(CAPACITY >= 6);
    // Even seqIds get priority 10, odd seqIds get priority 20.
    for (int i = 0; i < 6; ++i) {
      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
    }

    for (int i = 0; i < 6; i += 2) {
      TestObject obj = queue.poll();
      assertNotNull(obj);
      assertEquals(10, obj.getPriority());
      assertEquals(i, obj.getSeqId());
    }

    for (int i = 1; i < 6; i += 2) {
      TestObject obj = queue.poll();
      assertNotNull(obj);
      assertEquals(20, obj.getPriority());
      assertEquals(i, obj.getSeqId());
    }
    assertNull(queue.poll());
  }

  /** Polling yields the same objects, in the same order, as a {@link PriorityQueue} reference. */
  @Test
  public void testPoll() {
    assertNull(queue.poll());
    PriorityQueue<TestObject> testList = new PriorityQueue<>(CAPACITY, new TestObjectComparator());

    for (int i = 0; i < CAPACITY; ++i) {
      TestObject obj = new TestObject(i, i);
      testList.add(obj);
      assertTrue(queue.offer(obj));
    }

    for (int i = 0; i < CAPACITY; ++i) {
      // TestObject does not override equals, so identity is the comparison that is meant here.
      assertSame(testList.poll(), queue.poll());
    }

    assertNull(queue.poll());
  }

  /**
   * A consumer thread first times out polling the empty queue, then receives the element that a
   * producer thread offers once both threads have met at a barrier.
   */
  @Test
  public void testPollInExecutor() throws Exception {
    final TestObject testObj = new TestObject(0, 0);

    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      // submit() rather than execute(): an assertion failure inside a worker is captured by the
      // Future and rethrown by get() below, instead of silently killing the pool thread.
      Future<?> consumer = executor.submit(() -> {
        assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
        threadsStarted.await();
        assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
        assertTrue(queue.isEmpty());
        return null;
      });

      Future<?> producer = executor.submit(() -> {
        threadsStarted.await();
        queue.offer(testObj);
        return null;
      });

      executor.shutdown();
      // Check the consumer first: if it fails before reaching the barrier, its error surfaces
      // immediately instead of waiting for the producer, which would be stuck on the barrier.
      consumer.get(8000, TimeUnit.MILLISECONDS);
      producer.get(8000, TimeUnit.MILLISECONDS);
      assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
    } finally {
      // Interrupt any worker still blocked (for example on the barrier) so no thread is leaked.
      executor.shutdownNow();
    }
  }
}