001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.contains; 022import static org.hamcrest.Matchers.greaterThan; 023import static org.hamcrest.Matchers.lessThanOrEqualTo; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collection; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Random; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentLinkedQueue; 038import java.util.concurrent.ThreadLocalRandom; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.stream.Collectors; 042import java.util.stream.IntStream; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.testclassification.MasterTests; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.junit.ClassRule; 047import org.junit.Rule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.junit.rules.TestName; 051 052/** 053 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring. 054 */ 055@Category({ MasterTests.class, SmallTests.class }) 056public class TestRegionNormalizerWorkQueue { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class); 061 062 @Rule 063 public TestName testName = new TestName(); 064 065 @Test 066 public void testElementUniquenessAndFIFO() throws Exception { 067 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 068 final List<Integer> content = new LinkedList<>(); 069 IntStream.of(4, 3, 2, 1, 4, 3, 2, 1).boxed().forEach(queue::put); 070 assertEquals(4, queue.size()); 071 while (queue.size() > 0) { 072 content.add(queue.take()); 073 } 074 assertThat(content, contains(4, 3, 2, 1)); 075 076 queue.clear(); 077 queue.putAll(Arrays.asList(4, 3, 2, 1)); 078 queue.putAll(Arrays.asList(4, 5)); 079 assertEquals(5, queue.size()); 080 content.clear(); 081 while (queue.size() > 0) { 082 content.add(queue.take()); 083 } 084 assertThat(content, contains(4, 3, 2, 1, 5)); 085 } 086 087 @Test 088 public void testPriorityAndFIFO() throws Exception { 089 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 090 final List<Integer> content = new LinkedList<>(); 091 queue.putAll(Arrays.asList(4, 3, 2, 1)); 092 assertEquals(4, queue.size()); 093 queue.putFirst(0); 094 assertEquals(5, queue.size()); 095 drainTo(queue, content); 096 assertThat("putFirst items should jump the queue, preserving existing order", content, 097 contains(0, 4, 3, 2, 1)); 098 099 queue.clear(); 100 content.clear(); 101 queue.putAll(Arrays.asList(4, 3, 2, 1)); 102 queue.putFirst(1); 103 assertEquals(4, queue.size()); 104 drainTo(queue, content); 105 assertThat("existing items re-added with putFirst should jump the queue", content, 106 contains(1, 4, 3, 2)); 107 108 queue.clear(); 109 content.clear(); 110 queue.putAll(Arrays.asList(4, 3, 2, 1)); 111 queue.putAllFirst(Arrays.asList(2, 3)); 112 assertEquals(4, queue.size()); 113 drainTo(queue, content); 114 assertThat( 115 "existing items re-added with putAllFirst jump the queue AND honor changes in priority", 116 content, contains(2, 3, 4, 1)); 117 } 118 119 private enum Action { 120 PUT, 121 PUT_FIRST, 122 PUT_ALL, 123 PUT_ALL_FIRST, 124 } 125 126 /** 127 * Test that the uniqueness constraint is honored in the face of concurrent modification. 128 */ 129 @Test 130 public void testConcurrentPut() throws Exception { 131 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 132 final int maxValue = 100; 133 final Runnable producer = () -> { 134 final Random rand = ThreadLocalRandom.current(); 135 for (int i = 0; i < 1_000; i++) { 136 final Action action = Action.values()[rand.nextInt(Action.values().length)]; 137 switch (action) { 138 case PUT: { 139 final int val = rand.nextInt(maxValue); 140 queue.put(val); 141 break; 142 } 143 case PUT_FIRST: { 144 final int val = rand.nextInt(maxValue); 145 queue.putFirst(val); 146 break; 147 } 148 case PUT_ALL: { 149 final List<Integer> vals = 150 rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList()); 151 queue.putAll(vals); 152 break; 153 } 154 case PUT_ALL_FIRST: { 155 final List<Integer> vals = 156 rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList()); 157 queue.putAllFirst(vals); 158 break; 159 } 160 default: 161 fail("Unrecognized action " + action); 162 } 163 } 164 }; 165 166 final int numThreads = 5; 167 final CompletableFuture<?>[] futures = IntStream.range(0, numThreads) 168 .mapToObj(val -> CompletableFuture.runAsync(producer)).toArray(CompletableFuture<?>[]::new); 169 CompletableFuture.allOf(futures).join(); 170 171 final List<Integer> content = new ArrayList<>(queue.size()); 172 drainTo(queue, content); 173 assertThat("at most `maxValue` items should be present.", content.size(), 174 lessThanOrEqualTo(maxValue)); 175 assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size()); 176 } 177 178 /** 179 * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The 180 * producing thread places new entries onto the queue following a known schedule. The consuming 181 * thread collects a time measurement between calls to {@code take}. Finally, the test makes 182 * coarse-grained assertions of the consumer's observations based on the producer's schedule. 183 */ 184 @Test 185 public void testTake() throws Exception { 186 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 187 final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>(); 188 final AtomicBoolean finished = new AtomicBoolean(false); 189 final Runnable consumer = () -> { 190 try { 191 while (!finished.get()) { 192 queue.take(); 193 takeTimes.add(System.nanoTime()); 194 } 195 } catch (InterruptedException e) { 196 fail("interrupted."); 197 } 198 }; 199 200 CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer); 201 final long testStart = System.nanoTime(); 202 for (int i = 0; i < 5; i++) { 203 Thread.sleep(10); 204 queue.put(i); 205 } 206 207 // set finished = true and pipe one more value in case the thread needs an extra pass through 208 // the loop. 209 finished.set(true); 210 queue.put(1); 211 worker.get(1, TimeUnit.SECONDS); 212 213 final Iterator<Long> times = takeTimes.iterator(); 214 assertTrue("should have timing information for at least 2 calls to take.", 215 takeTimes.size() >= 5); 216 for (int i = 0; i < 5; i++) { 217 assertThat( 218 "Observations collected in takeTimes should increase by roughly 10ms every interval", 219 times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10))); 220 } 221 } 222 223 private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest) 224 throws InterruptedException { 225 assertThat(queue.size(), greaterThan(0)); 226 while (queue.size() > 0) { 227 dest.add(queue.take()); 228 } 229 } 230}