001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.contains; 022import static org.hamcrest.Matchers.greaterThan; 023import static org.hamcrest.Matchers.lessThanOrEqualTo; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collection; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Random; 035import java.util.concurrent.CompletableFuture; 036import java.util.concurrent.ConcurrentLinkedQueue; 037import java.util.concurrent.ThreadLocalRandom; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.stream.Collectors; 041import java.util.stream.IntStream; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.testclassification.MasterTests; 044import org.apache.hadoop.hbase.testclassification.SmallTests; 045import org.junit.ClassRule; 046import org.junit.Rule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.junit.rules.TestName; 050 051/** 052 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring. 053 */ 054@Category({ MasterTests.class, SmallTests.class}) 055public class TestRegionNormalizerWorkQueue { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class); 060 061 @Rule 062 public TestName testName = new TestName(); 063 064 @Test 065 public void testElementUniquenessAndFIFO() throws Exception { 066 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 067 final List<Integer> content = new LinkedList<>(); 068 IntStream.of(4, 3, 2, 1, 4, 3, 2, 1) 069 .boxed() 070 .forEach(queue::put); 071 assertEquals(4, queue.size()); 072 while (queue.size() > 0) { 073 content.add(queue.take()); 074 } 075 assertThat(content, contains(4, 3, 2, 1)); 076 077 queue.clear(); 078 queue.putAll(Arrays.asList(4, 3, 2, 1)); 079 queue.putAll(Arrays.asList(4, 5)); 080 assertEquals(5, queue.size()); 081 content.clear(); 082 while (queue.size() > 0) { 083 content.add(queue.take()); 084 } 085 assertThat(content, contains(4, 3, 2, 1, 5)); 086 } 087 088 @Test 089 public void testPriorityAndFIFO() throws Exception { 090 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 091 final List<Integer> content = new LinkedList<>(); 092 queue.putAll(Arrays.asList(4, 3, 2, 1)); 093 assertEquals(4, queue.size()); 094 queue.putFirst(0); 095 assertEquals(5, queue.size()); 096 drainTo(queue, content); 097 assertThat("putFirst items should jump the queue, preserving existing order", 098 content, contains(0, 4, 3, 2, 1)); 099 100 queue.clear(); 101 content.clear(); 102 queue.putAll(Arrays.asList(4, 3, 2, 1)); 103 queue.putFirst(1); 104 assertEquals(4, queue.size()); 105 drainTo(queue, content); 106 assertThat("existing items re-added with putFirst should jump the queue", 107 content, contains(1, 4, 3, 2)); 108 109 queue.clear(); 110 content.clear(); 111 queue.putAll(Arrays.asList(4, 3, 2, 1)); 112 queue.putAllFirst(Arrays.asList(2, 3)); 113 assertEquals(4, queue.size()); 114 drainTo(queue, content); 115 assertThat( 116 "existing items re-added with putAllFirst jump the queue AND honor changes in priority", 117 content, contains(2, 3, 4, 1)); 118 } 119 120 private enum Action { 121 PUT, 122 PUT_FIRST, 123 PUT_ALL, 124 PUT_ALL_FIRST, 125 } 126 127 /** 128 * Test that the uniqueness constraint is honored in the face of concurrent modification. 129 */ 130 @Test 131 public void testConcurrentPut() throws Exception { 132 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 133 final int maxValue = 100; 134 final Runnable producer = () -> { 135 final Random rand = ThreadLocalRandom.current(); 136 for (int i = 0; i < 1_000; i++) { 137 final Action action = Action.values()[rand.nextInt(Action.values().length)]; 138 switch (action) { 139 case PUT: { 140 final int val = rand.nextInt(maxValue); 141 queue.put(val); 142 break; 143 } 144 case PUT_FIRST: { 145 final int val = rand.nextInt(maxValue); 146 queue.putFirst(val); 147 break; 148 } 149 case PUT_ALL: { 150 final List<Integer> vals = rand.ints(5, 0, maxValue) 151 .boxed() 152 .collect(Collectors.toList()); 153 queue.putAll(vals); 154 break; 155 } 156 case PUT_ALL_FIRST: { 157 final List<Integer> vals = rand.ints(5, 0, maxValue) 158 .boxed() 159 .collect(Collectors.toList()); 160 queue.putAllFirst(vals); 161 break; 162 } 163 default: 164 fail("Unrecognized action " + action); 165 } 166 } 167 }; 168 169 final int numThreads = 5; 170 final CompletableFuture<?>[] futures = IntStream.range(0, numThreads) 171 .mapToObj(val -> CompletableFuture.runAsync(producer)) 172 .toArray(CompletableFuture<?>[]::new); 173 CompletableFuture.allOf(futures).join(); 174 175 final List<Integer> content = new ArrayList<>(queue.size()); 176 drainTo(queue, content); 177 assertThat("at most `maxValue` items should be present.", 178 content.size(), lessThanOrEqualTo(maxValue)); 179 assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size()); 180 } 181 182 /** 183 * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The 184 * producing thread places new entries onto the queue following a known schedule. The consuming 185 * thread collects a time measurement between calls to {@code take}. Finally, the test makes 186 * coarse-grained assertions of the consumer's observations based on the producer's schedule. 187 */ 188 @Test 189 public void testTake() throws Exception { 190 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 191 final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>(); 192 final AtomicBoolean finished = new AtomicBoolean(false); 193 final Runnable consumer = () -> { 194 try { 195 while (!finished.get()) { 196 queue.take(); 197 takeTimes.add(System.nanoTime()); 198 } 199 } catch (InterruptedException e) { 200 fail("interrupted."); 201 } 202 }; 203 204 CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer); 205 final long testStart = System.nanoTime(); 206 for (int i = 0; i < 5; i++) { 207 Thread.sleep(10); 208 queue.put(i); 209 } 210 211 // set finished = true and pipe one more value in case the thread needs an extra pass through 212 // the loop. 213 finished.set(true); 214 queue.put(1); 215 worker.get(1, TimeUnit.SECONDS); 216 217 final Iterator<Long> times = takeTimes.iterator(); 218 assertTrue("should have timing information for at least 2 calls to take.", 219 takeTimes.size() >= 5); 220 for (int i = 0; i < 5; i++) { 221 assertThat( 222 "Observations collected in takeTimes should increase by roughly 10ms every interval", 223 times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10))); 224 } 225 } 226 227 private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest) 228 throws InterruptedException { 229 assertThat(queue.size(), greaterThan(0)); 230 while (queue.size() > 0) { 231 dest.add(queue.take()); 232 } 233 } 234}