001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import static java.util.Collections.singletonList; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.comparesEqualTo; 023import static org.hamcrest.Matchers.greaterThan; 024import static org.hamcrest.Matchers.greaterThanOrEqualTo; 025import static org.hamcrest.Matchers.nullValue; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.ArgumentMatchers.anyBoolean; 029import static org.mockito.ArgumentMatchers.anyLong; 030import static org.mockito.Mockito.when; 031 032import java.time.Duration; 033import java.util.Arrays; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicReference; 039import java.util.function.Supplier; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 042import org.apache.hadoop.hbase.TableName; 043import 
org.apache.hadoop.hbase.Waiter; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 048import org.apache.hadoop.hbase.master.MasterServices; 049import org.apache.hadoop.hbase.testclassification.MasterTests; 050import org.apache.hadoop.hbase.testclassification.SmallTests; 051import org.hamcrest.Description; 052import org.hamcrest.Matcher; 053import org.hamcrest.StringDescription; 054import org.junit.jupiter.api.AfterEach; 055import org.junit.jupiter.api.BeforeEach; 056import org.junit.jupiter.api.Tag; 057import org.junit.jupiter.api.Test; 058import org.junit.jupiter.api.TestInfo; 059import org.junit.jupiter.api.extension.ExtendWith; 060import org.mockito.Answers; 061import org.mockito.Mock; 062import org.mockito.MockitoAnnotations; 063import org.mockito.junit.jupiter.MockitoExtension; 064 065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 066 067/** 068 * A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of 069 * interaction we have to this class are its input source ({@link RegionNormalizerWorkQueue}) and its 070 * callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work queue is 071 * simple enough to use directly; for {@link MasterServices}, use a mock because, as of now, the 072 * worker only invokes 4 methods. 
073 */ 074@Tag(MasterTests.TAG) 075@Tag(SmallTests.TAG) 076@ExtendWith(MockitoExtension.class) 077public class TestRegionNormalizerWorker { 078 079 @Mock(answer = Answers.RETURNS_DEEP_STUBS) 080 private MasterServices masterServices; 081 @Mock 082 private RegionNormalizer regionNormalizer; 083 084 private HBaseCommonTestingUtil testingUtility; 085 private RegionNormalizerWorkQueue<TableName> queue; 086 private ExecutorService workerPool; 087 088 private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>(); 089 090 private TableName tableName; 091 092 @BeforeEach 093 public void before(TestInfo testInfo) throws Exception { 094 MockitoAnnotations.openMocks(this); 095 when(masterServices.skipRegionManagementAction(any())).thenReturn(false); 096 testingUtility = new HBaseCommonTestingUtil(); 097 queue = new RegionNormalizerWorkQueue<>(); 098 workerThreadThrowable.set(null); 099 tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 100 101 final String threadNameFmt = TestRegionNormalizerWorker.class.getSimpleName() + "-" 102 + testInfo.getTestMethod().get().getName() + "-%d"; 103 final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFmt) 104 .setDaemon(true).setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e)).build(); 105 workerPool = Executors.newSingleThreadExecutor(threadFactory); 106 } 107 108 @AfterEach 109 public void after() throws Exception { 110 workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()` 111 assertTrue(workerPool.awaitTermination(30, TimeUnit.SECONDS), 112 "timeout waiting for worker thread to terminate"); 113 final Throwable workerThrowable = workerThreadThrowable.get(); 114 assertThat("worker thread threw unexpected exception", workerThrowable, nullValue()); 115 } 116 117 @Test 118 public void testMergeCounter() throws Exception { 119 final TableName tn = tableName; 120 final TableDescriptor tnDescriptor = 121 
TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); 122 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 123 when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L); 124 when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(singletonList( 125 new MergeNormalizationPlan.Builder().addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10) 126 .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20).build())); 127 128 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 129 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 130 final long beforeMergePlanCount = worker.getMergePlanCount(); 131 workerPool.submit(worker); 132 queue.put(tn); 133 134 assertThatEventually("executing work should see plan count increase", worker::getMergePlanCount, 135 greaterThan(beforeMergePlanCount)); 136 } 137 138 @Test 139 public void testSplitCounter() throws Exception { 140 final TableName tn = tableName; 141 final TableDescriptor tnDescriptor = 142 TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); 143 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 144 when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L); 145 when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn( 146 singletonList(new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10))); 147 148 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 149 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 150 final long beforeSplitPlanCount = worker.getSplitPlanCount(); 151 workerPool.submit(worker); 152 queue.put(tn); 153 154 assertThatEventually("executing work should see plan count increase", worker::getSplitPlanCount, 155 greaterThan(beforeSplitPlanCount)); 156 } 157 158 /** 159 * Assert that a rate limit is honored, at least in a rough way. 
Maintainers should manually 160 * inspect the log messages emitted by the worker thread to confirm that expected behavior. 161 */ 162 @Test 163 public void testRateLimit() throws Exception { 164 final TableName tn = tableName; 165 final TableDescriptor tnDescriptor = 166 TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); 167 final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build(); 168 final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build(); 169 final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build(); 170 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 171 when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L); 172 when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L); 173 when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList( 174 new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder() 175 .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(), 176 new SplitNormalizationPlan(splitRegionInfo, 1))); 177 178 final Configuration conf = testingUtility.getConfiguration(); 179 conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m"); 180 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 181 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 182 workerPool.submit(worker); 183 final long startTime = System.nanoTime(); 184 queue.put(tn); 185 186 assertThatEventually("executing work should see split plan count increase", 187 worker::getSplitPlanCount, comparesEqualTo(2L)); 188 assertThatEventually("executing work should see merge plan count increase", 189 worker::getMergePlanCount, comparesEqualTo(1L)); 190 191 final long endTime = System.nanoTime(); 192 assertThat("rate limited normalizer should have taken at least 5 seconds", 193 Duration.ofNanos(endTime - startTime), 
greaterThanOrEqualTo(Duration.ofSeconds(5))); 194 } 195 196 @Test 197 public void testPlansSizeLimit() throws Exception { 198 final TableName tn = tableName; 199 final TableDescriptor tnDescriptor = 200 TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); 201 final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build(); 202 final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build(); 203 final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build(); 204 when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); 205 when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L); 206 when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L); 207 when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList( 208 new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder() 209 .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(), 210 new SplitNormalizationPlan(splitRegionInfo, 1))); 211 212 final Configuration conf = testingUtility.getConfiguration(); 213 conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5); 214 215 final RegionNormalizerWorker worker = new RegionNormalizerWorker( 216 testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); 217 workerPool.submit(worker); 218 queue.put(tn); 219 220 assertThatEventually("worker should process first split plan, but not second", 221 worker::getSplitPlanCount, comparesEqualTo(1L)); 222 assertThatEventually("worker should process merge plan", worker::getMergePlanCount, 223 comparesEqualTo(1L)); 224 } 225 226 /** 227 * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until 228 * the matcher succeeds or the timeout period of 30 seconds is exhausted. 229 */ 230 private <T> void assertThatEventually(final String reason, 231 final Supplier<? 
extends T> actualSupplier, final Matcher<? super T> matcher) throws Exception { 232 testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30), 233 new Waiter.ExplainingPredicate<Exception>() { 234 private T lastValue = null; 235 236 @Override 237 public String explainFailure() { 238 final Description description = new StringDescription().appendText(reason) 239 .appendText("\nExpected: ").appendDescriptionOf(matcher).appendText("\n but: "); 240 matcher.describeMismatch(lastValue, description); 241 return description.toString(); 242 } 243 244 @Override 245 public boolean evaluate() { 246 lastValue = actualSupplier.get(); 247 return matcher.matches(lastValue); 248 } 249 }); 250 } 251}