001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import static java.util.Collections.singletonList; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.comparesEqualTo; 023import static org.hamcrest.Matchers.greaterThan; 024import static org.hamcrest.Matchers.greaterThanOrEqualTo; 025import static org.hamcrest.Matchers.nullValue; 026import static org.junit.Assert.assertTrue; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.ArgumentMatchers.anyBoolean; 029import static org.mockito.ArgumentMatchers.anyLong; 030import static org.mockito.Mockito.when; 031import java.time.Duration; 032import java.util.Arrays; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.ThreadFactory; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicReference; 038import java.util.function.Supplier; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.TableNameTestRule; 044import org.apache.hadoop.hbase.Waiter; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.RegionInfoBuilder; 047import org.apache.hadoop.hbase.client.TableDescriptor; 048import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 049import org.apache.hadoop.hbase.master.MasterServices; 050import org.apache.hadoop.hbase.testclassification.MasterTests; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.hamcrest.Description; 053import org.hamcrest.Matcher; 054import org.hamcrest.StringDescription; 055import org.junit.After; 056import org.junit.Before; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.TestName; 062import org.mockito.Answers; 063import org.mockito.Mock; 064import org.mockito.MockitoAnnotations; 065import org.mockito.junit.MockitoJUnit; 066import org.mockito.junit.MockitoRule; 067import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 068 069/** 070 * A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of 071 * interaction we have to this class are its input source ({@link RegionNormalizerWorkQueue} and 072 * its callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work 073 * queue is simple enough to use directly; for {@link MasterServices}, use a mock because, as of 074 * now, the worker only invokes 4 methods. 075 */ 076@Category({ MasterTests.class, SmallTests.class}) 077public class TestRegionNormalizerWorker { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class); 082 083 @Rule 084 public TestName testName = new TestName(); 085 @Rule 086 public TableNameTestRule tableName = new TableNameTestRule(); 087 088 @Rule 089 public MockitoRule mockitoRule = MockitoJUnit.rule(); 090 091 @Mock(answer = Answers.RETURNS_DEEP_STUBS) 092 private MasterServices masterServices; 093 @Mock 094 private RegionNormalizer regionNormalizer; 095 096 private HBaseCommonTestingUtility testingUtility; 097 private RegionNormalizerWorkQueue<TableName> queue; 098 private ExecutorService workerPool; 099 100 private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>(); 101 102 @Before 103 public void before() throws Exception { 104 MockitoAnnotations.initMocks(this); 105 when(masterServices.skipRegionManagementAction(any())).thenReturn(false); 106 testingUtility = new HBaseCommonTestingUtility(); 107 queue = new RegionNormalizerWorkQueue<>(); 108 workerThreadThrowable.set(null); 109 110 final String threadNameFmt = 111 TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d"; 112 final ThreadFactory threadFactory = new ThreadFactoryBuilder() 113 .setNameFormat(threadNameFmt) 114 .setDaemon(true) 115 .setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e)) 116 .build(); 117 workerPool = Executors.newSingleThreadExecutor(threadFactory); 118 } 119 120 @After 121 public void after() throws Exception { 122 workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()` 123 assertTrue("timeout waiting for worker thread to terminate", 124 workerPool.awaitTermination(30, TimeUnit.SECONDS)); 125 final Throwable workerThrowable = workerThreadThrowable.get(); 126 assertThat("worker thread threw unexpected exception", workerThrowable, nullValue()); 127 } 128 129 @Test 130 public void testMergeCounter() throws Exception { 131 final TableName tn = tableName.getTableName(); 132 final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn) 133 .setNormalizationEnabled(true) 134 .build(); 135 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 136 when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())) 137 .thenReturn(1L); 138 when(regionNormalizer.computePlansForTable(tn)) 139 .thenReturn(singletonList(new MergeNormalizationPlan.Builder() 140 .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10) 141 .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20) 142 .build())); 143 144 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 145 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 146 final long beforeMergePlanCount = worker.getMergePlanCount(); 147 workerPool.submit(worker); 148 queue.put(tn); 149 150 assertThatEventually("executing work should see plan count increase", 151 worker::getMergePlanCount, greaterThan(beforeMergePlanCount)); 152 } 153 154 @Test 155 public void testSplitCounter() throws Exception { 156 final TableName tn = tableName.getTableName(); 157 final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn) 158 .setNormalizationEnabled(true) 159 .build(); 160 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 161 when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())) 162 .thenReturn(1L); 163 when(regionNormalizer.computePlansForTable(tn)) 164 .thenReturn(singletonList( 165 new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10))); 166 167 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 168 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 169 final long beforeSplitPlanCount = worker.getSplitPlanCount(); 170 workerPool.submit(worker); 171 queue.put(tn); 172 173 assertThatEventually("executing work should see plan count increase", 174 worker::getSplitPlanCount, greaterThan(beforeSplitPlanCount)); 175 } 176 177 /** 178 * Assert that a rate limit is honored, at least in a rough way. Maintainers should manually 179 * inspect the log messages emitted by the worker thread to confirm that expected behavior. 180 */ 181 @Test 182 public void testRateLimit() throws Exception { 183 final TableName tn = tableName.getTableName(); 184 final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn) 185 .setNormalizationEnabled(true) 186 .build(); 187 final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build(); 188 final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build(); 189 final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build(); 190 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 191 when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())) 192 .thenReturn(1L); 193 when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())) 194 .thenReturn(1L); 195 when(regionNormalizer.computePlansForTable(tn)) 196 .thenReturn(Arrays.asList( 197 new SplitNormalizationPlan(splitRegionInfo, 2), 198 new MergeNormalizationPlan.Builder() 199 .addTarget(mergeRegionInfo1, 1) 200 .addTarget(mergeRegionInfo2, 2) 201 .build(), 202 new SplitNormalizationPlan(splitRegionInfo, 1))); 203 204 final Configuration conf = testingUtility.getConfiguration(); 205 conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m"); 206 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 207 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 208 workerPool.submit(worker); 209 final long startTime = System.nanoTime(); 210 queue.put(tn); 211 212 assertThatEventually("executing work should see split plan count increase", 213 worker::getSplitPlanCount, comparesEqualTo(2L)); 214 assertThatEventually("executing work should see merge plan count increase", 215 worker::getMergePlanCount, comparesEqualTo(1L)); 216 217 final long endTime = System.nanoTime(); 218 assertThat("rate limited normalizer should have taken at least 5 seconds", 219 Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5))); 220 } 221 222 /** 223 * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} 224 * until the matcher succeeds or the timeout period of 30 seconds is exhausted. 225 */ 226 private <T> void assertThatEventually( 227 final String reason, 228 final Supplier<? extends T> actualSupplier, 229 final Matcher<? super T> matcher 230 ) throws Exception { 231 testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30), 232 new Waiter.ExplainingPredicate<Exception>() { 233 private T lastValue = null; 234 235 @Override 236 public String explainFailure() { 237 final Description description = new StringDescription() 238 .appendText(reason) 239 .appendText("\nExpected: ") 240 .appendDescriptionOf(matcher) 241 .appendText("\n but: "); 242 matcher.describeMismatch(lastValue, description); 243 return description.toString(); 244 } 245 246 @Override public boolean evaluate() { 247 lastValue = actualSupplier.get(); 248 return matcher.matches(lastValue); 249 } 250 }); 251 } 252}