001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import static java.util.Collections.singletonList; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.comparesEqualTo; 023import static org.hamcrest.Matchers.greaterThan; 024import static org.hamcrest.Matchers.greaterThanOrEqualTo; 025import static org.hamcrest.Matchers.nullValue; 026import static org.junit.Assert.assertTrue; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.ArgumentMatchers.anyBoolean; 029import static org.mockito.ArgumentMatchers.anyLong; 030import static org.mockito.Mockito.when; 031 032import java.time.Duration; 033import java.util.Arrays; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicReference; 039import java.util.function.Supplier; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.TableNameTestRule; 045import org.apache.hadoop.hbase.Waiter; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.master.MasterServices; 051import org.apache.hadoop.hbase.testclassification.MasterTests; 052import org.apache.hadoop.hbase.testclassification.SmallTests; 053import org.hamcrest.Description; 054import org.hamcrest.Matcher; 055import org.hamcrest.StringDescription; 056import org.junit.After; 057import org.junit.Before; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063import org.mockito.Answers; 064import org.mockito.Mock; 065import org.mockito.MockitoAnnotations; 066import org.mockito.junit.MockitoJUnit; 067import org.mockito.junit.MockitoRule; 068 069import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 070 071/** 072 * A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of 073 * interaction we have to this class are its input source ({@link RegionNormalizerWorkQueue} and its 074 * callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work queue is 075 * simple enough to use directly; for {@link MasterServices}, use a mock because, as of now, the 076 * worker only invokes 4 methods. 077 */ 078@Category({ MasterTests.class, SmallTests.class }) 079public class TestRegionNormalizerWorker { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class); 084 085 @Rule 086 public TestName testName = new TestName(); 087 @Rule 088 public TableNameTestRule tableName = new TableNameTestRule(); 089 090 @Rule 091 public MockitoRule mockitoRule = MockitoJUnit.rule(); 092 093 @Mock(answer = Answers.RETURNS_DEEP_STUBS) 094 private MasterServices masterServices; 095 @Mock 096 private RegionNormalizer regionNormalizer; 097 098 private HBaseCommonTestingUtility testingUtility; 099 private RegionNormalizerWorkQueue<TableName> queue; 100 private ExecutorService workerPool; 101 102 private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>(); 103 104 @Before 105 public void before() throws Exception { 106 MockitoAnnotations.initMocks(this); 107 when(masterServices.skipRegionManagementAction(any())).thenReturn(false); 108 testingUtility = new HBaseCommonTestingUtility(); 109 queue = new RegionNormalizerWorkQueue<>(); 110 workerThreadThrowable.set(null); 111 112 final String threadNameFmt = 113 TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d"; 114 final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFmt) 115 .setDaemon(true).setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e)).build(); 116 workerPool = Executors.newSingleThreadExecutor(threadFactory); 117 } 118 119 @After 120 public void after() throws Exception { 121 workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()` 122 assertTrue("timeout waiting for worker thread to terminate", 123 workerPool.awaitTermination(30, TimeUnit.SECONDS)); 124 final Throwable workerThrowable = workerThreadThrowable.get(); 125 assertThat("worker thread threw unexpected exception", workerThrowable, nullValue()); 126 } 127 128 @Test 129 public void testMergeCounter() throws Exception { 130 final TableName tn = tableName.getTableName(); 131 final TableDescriptor tnDescriptor = 132 TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); 133 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 134 when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L); 135 when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(singletonList( 136 new MergeNormalizationPlan.Builder().addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10) 137 .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20).build())); 138 139 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 140 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 141 final long beforeMergePlanCount = worker.getMergePlanCount(); 142 workerPool.submit(worker); 143 queue.put(tn); 144 145 assertThatEventually("executing work should see plan count increase", worker::getMergePlanCount, 146 greaterThan(beforeMergePlanCount)); 147 } 148 149 @Test 150 public void testSplitCounter() throws Exception { 151 final TableName tn = tableName.getTableName(); 152 final TableDescriptor tnDescriptor = 153 TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); 154 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 155 when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L); 156 when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn( 157 singletonList(new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10))); 158 159 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 160 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 161 final long beforeSplitPlanCount = worker.getSplitPlanCount(); 162 workerPool.submit(worker); 163 queue.put(tn); 164 165 assertThatEventually("executing work should see plan count increase", worker::getSplitPlanCount, 166 greaterThan(beforeSplitPlanCount)); 167 } 168 169 /** 170 * Assert that a rate limit is honored, at least in a rough way. Maintainers should manually 171 * inspect the log messages emitted by the worker thread to confirm that expected behavior. 172 */ 173 @Test 174 public void testRateLimit() throws Exception { 175 final TableName tn = tableName.getTableName(); 176 final TableDescriptor tnDescriptor = 177 TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); 178 final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build(); 179 final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build(); 180 final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build(); 181 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 182 when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L); 183 when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L); 184 when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList( 185 new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder() 186 .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(), 187 new SplitNormalizationPlan(splitRegionInfo, 1))); 188 189 final Configuration conf = testingUtility.getConfiguration(); 190 conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m"); 191 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 192 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 193 workerPool.submit(worker); 194 final long startTime = System.nanoTime(); 195 queue.put(tn); 196 197 assertThatEventually("executing work should see split plan count increase", 198 worker::getSplitPlanCount, comparesEqualTo(2L)); 199 assertThatEventually("executing work should see merge plan count increase", 200 worker::getMergePlanCount, comparesEqualTo(1L)); 201 202 final long endTime = System.nanoTime(); 203 assertThat("rate limited normalizer should have taken at least 5 seconds", 204 Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5))); 205 } 206 207 /** 208 * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until 209 * the matcher succeeds or the timeout period of 30 seconds is exhausted. 210 */ 211 private <T> void assertThatEventually(final String reason, 212 final Supplier<? extends T> actualSupplier, final Matcher<? super T> matcher) throws Exception { 213 testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30), 214 new Waiter.ExplainingPredicate<Exception>() { 215 private T lastValue = null; 216 217 @Override 218 public String explainFailure() { 219 final Description description = new StringDescription().appendText(reason) 220 .appendText("\nExpected: ").appendDescriptionOf(matcher).appendText("\n but: "); 221 matcher.describeMismatch(lastValue, description); 222 return description.toString(); 223 } 224 225 @Override 226 public boolean evaluate() { 227 lastValue = actualSupplier.get(); 228 return matcher.matches(lastValue); 229 } 230 }); 231 } 232}