/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
 */
@Tag(MasterTests.TAG)
@Tag(SmallTests.TAG)
public class TestRegionNormalizerWorkQueue {

  /**
   * Duplicate offers must be coalesced (the queue is a set), and draining must observe
   * first-insertion FIFO order: a re-offered existing element keeps its original position,
   * while a genuinely new element is appended at the tail.
   */
  @Test
  public void testElementUniquenessAndFIFO() throws Exception {
    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
    final List<Integer> content = new ArrayList<>();
    // Each value is offered twice; only the first occurrence should be retained.
    IntStream.of(4, 3, 2, 1, 4, 3, 2, 1).boxed().forEach(queue::put);
    assertEquals(4, queue.size());
    while (queue.size() > 0) {
      content.add(queue.take());
    }
    assertThat(content, contains(4, 3, 2, 1));

    queue.clear();
    queue.putAll(Arrays.asList(4, 3, 2, 1));
    // 4 already exists (keeps its slot); 5 is new (appended at the tail).
    queue.putAll(Arrays.asList(4, 5));
    assertEquals(5, queue.size());
    content.clear();
    while (queue.size() > 0) {
      content.add(queue.take());
    }
    assertThat(content, contains(4, 3, 2, 1, 5));
  }

  /**
   * {@code putFirst}/{@code putAllFirst} must move elements to the head of the queue — whether the
   * element is new or already present — while leaving the relative order of untouched elements
   * intact.
   */
  @Test
  public void testPriorityAndFIFO() throws Exception {
    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
    final List<Integer> content = new ArrayList<>();
    queue.putAll(Arrays.asList(4, 3, 2, 1));
    assertEquals(4, queue.size());
    queue.putFirst(0);
    assertEquals(5, queue.size());
    drainTo(queue, content);
    assertThat("putFirst items should jump the queue, preserving existing order", content,
      contains(0, 4, 3, 2, 1));

    queue.clear();
    content.clear();
    queue.putAll(Arrays.asList(4, 3, 2, 1));
    // 1 is already enqueued; putFirst should relocate it to the head, not duplicate it.
    queue.putFirst(1);
    assertEquals(4, queue.size());
    drainTo(queue, content);
    assertThat("existing items re-added with putFirst should jump the queue", content,
      contains(1, 4, 3, 2));

    queue.clear();
    content.clear();
    queue.putAll(Arrays.asList(4, 3, 2, 1));
    // Both 2 and 3 already exist; they move to the head in the order given to putAllFirst.
    queue.putAllFirst(Arrays.asList(2, 3));
    assertEquals(4, queue.size());
    drainTo(queue, content);
    assertThat(
      "existing items re-added with putAllFirst jump the queue AND honor changes in priority",
      content, contains(2, 3, 4, 1));
  }

  /** The mutating operations exercised by {@link #testConcurrentPut()}. */
  private enum Action {
    PUT,
    PUT_FIRST,
    PUT_ALL,
    PUT_ALL_FIRST,
  }

  /**
   * Test that the uniqueness constraint is honored in the face of concurrent modification.
   */
  @Test
  public void testConcurrentPut() throws Exception {
    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
    final int maxValue = 100;
    final Runnable producer = () -> {
      final Random rand = ThreadLocalRandom.current();
      // Action.values() allocates a fresh array on every call; hoist it out of the hot loop.
      final Action[] actions = Action.values();
      for (int i = 0; i < 1_000; i++) {
        final Action action = actions[rand.nextInt(actions.length)];
        switch (action) {
          case PUT: {
            final int val = rand.nextInt(maxValue);
            queue.put(val);
            break;
          }
          case PUT_FIRST: {
            final int val = rand.nextInt(maxValue);
            queue.putFirst(val);
            break;
          }
          case PUT_ALL: {
            final List<Integer> vals =
              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
            queue.putAll(vals);
            break;
          }
          case PUT_ALL_FIRST: {
            final List<Integer> vals =
              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
            queue.putAllFirst(vals);
            break;
          }
          default:
            fail("Unrecognized action " + action);
        }
      }
    };

    final int numThreads = 5;
    final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
      .mapToObj(val -> CompletableFuture.runAsync(producer)).toArray(CompletableFuture<?>[]::new);
    CompletableFuture.allOf(futures).join();

    final List<Integer> content = new ArrayList<>(queue.size());
    drainTo(queue, content);
    // All produced values fall in [0, maxValue); uniqueness caps the queue at maxValue entries.
    assertThat("at most `maxValue` items should be present.", content.size(),
      lessThanOrEqualTo(maxValue));
    assertEquals(content.size(), new HashSet<>(content).size(), "all items should be unique.");
  }

  /**
   * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
   * producing thread places new entries onto the queue following a known schedule. The consuming
   * thread collects a time measurement between calls to {@code take}. Finally, the test makes
   * coarse-grained assertions of the consumer's observations based on the producer's schedule.
   */
  @Test
  public void testTake() throws Exception {
    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
    final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
    final AtomicBoolean finished = new AtomicBoolean(false);
    final int count = 5;
    final Runnable consumer = () -> {
      try {
        while (!finished.get()) {
          queue.take();
          takeTimes.add(System.nanoTime());
        }
      } catch (InterruptedException e) {
        // Restore the interrupt status before failing so the interruption is not swallowed.
        Thread.currentThread().interrupt();
        fail("interrupted.");
      }
    };

    CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
    final long testStart = System.nanoTime();
    for (int i = 0; i < count; i++) {
      Thread.sleep(10);
      queue.put(i);
    }
    // should have timing information for 5 calls to take.
    Waiter.waitFor(HBaseConfiguration.create(), 1000, () -> count == takeTimes.size());
    // set finished = true and pipe one more value in case the thread needs an extra pass through
    // the loop.
    finished.set(true);
    queue.put(1);
    worker.get(1, TimeUnit.SECONDS);

    final Iterator<Long> times = takeTimes.iterator();
    for (int i = 0; i < count; i++) {
      // Item i is offered only after (i+1) 10ms sleeps, so its take can be no earlier than
      // testStart + i*10ms. Coarse bound on purpose: scheduling jitter makes tighter bounds flaky.
      assertThat(
        "Observations collected in takeTimes should increase by roughly 10ms every interval",
        times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
    }
  }

  /**
   * Drains {@code queue} into {@code dest} via repeated {@code take}, asserting the queue is
   * non-empty to start (an empty queue would make {@code take} block forever).
   */
  private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
    throws InterruptedException {
    assertThat(queue.size(), greaterThan(0));
    while (queue.size() > 0) {
      dest.add(queue.take());
    }
  }
}