001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import static java.util.Collections.singletonList;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.comparesEqualTo;
023import static org.hamcrest.Matchers.greaterThan;
024import static org.hamcrest.Matchers.greaterThanOrEqualTo;
025import static org.hamcrest.Matchers.nullValue;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.mockito.ArgumentMatchers.any;
028import static org.mockito.ArgumentMatchers.anyBoolean;
029import static org.mockito.ArgumentMatchers.anyLong;
030import static org.mockito.Mockito.when;
031
032import java.time.Duration;
033import java.util.Arrays;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicReference;
039import java.util.function.Supplier;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.master.MasterServices;
049import org.apache.hadoop.hbase.testclassification.MasterTests;
050import org.apache.hadoop.hbase.testclassification.SmallTests;
051import org.hamcrest.Description;
052import org.hamcrest.Matcher;
053import org.hamcrest.StringDescription;
054import org.junit.jupiter.api.AfterEach;
055import org.junit.jupiter.api.BeforeEach;
056import org.junit.jupiter.api.Tag;
057import org.junit.jupiter.api.Test;
058import org.junit.jupiter.api.TestInfo;
059import org.junit.jupiter.api.extension.ExtendWith;
060import org.mockito.Answers;
061import org.mockito.Mock;
062import org.mockito.MockitoAnnotations;
063import org.mockito.junit.jupiter.MockitoExtension;
064
065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
066
067/**
068 * A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of
069 * interaction we have to this class are its input source ({@link RegionNormalizerWorkQueue} and its
070 * callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work queue is
071 * simple enough to use directly; for {@link MasterServices}, use a mock because, as of now, the
072 * worker only invokes 4 methods.
073 */
074@Tag(MasterTests.TAG)
075@Tag(SmallTests.TAG)
076@ExtendWith(MockitoExtension.class)
077public class TestRegionNormalizerWorker {
078
079  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
080  private MasterServices masterServices;
081  @Mock
082  private RegionNormalizer regionNormalizer;
083
084  private HBaseCommonTestingUtil testingUtility;
085  private RegionNormalizerWorkQueue<TableName> queue;
086  private ExecutorService workerPool;
087
088  private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>();
089
090  private TableName tableName;
091
092  @BeforeEach
093  public void before(TestInfo testInfo) throws Exception {
094    MockitoAnnotations.openMocks(this);
095    when(masterServices.skipRegionManagementAction(any())).thenReturn(false);
096    testingUtility = new HBaseCommonTestingUtil();
097    queue = new RegionNormalizerWorkQueue<>();
098    workerThreadThrowable.set(null);
099    tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
100
101    final String threadNameFmt = TestRegionNormalizerWorker.class.getSimpleName() + "-"
102      + testInfo.getTestMethod().get().getName() + "-%d";
103    final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFmt)
104      .setDaemon(true).setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e)).build();
105    workerPool = Executors.newSingleThreadExecutor(threadFactory);
106  }
107
108  @AfterEach
109  public void after() throws Exception {
110    workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
111    assertTrue(workerPool.awaitTermination(30, TimeUnit.SECONDS),
112      "timeout waiting for worker thread to terminate");
113    final Throwable workerThrowable = workerThreadThrowable.get();
114    assertThat("worker thread threw unexpected exception", workerThrowable, nullValue());
115  }
116
117  @Test
118  public void testMergeCounter() throws Exception {
119    final TableName tn = tableName;
120    final TableDescriptor tnDescriptor =
121      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
122    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
123    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
124    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(singletonList(
125      new MergeNormalizationPlan.Builder().addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
126        .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20).build()));
127
128    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
129      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
130    final long beforeMergePlanCount = worker.getMergePlanCount();
131    workerPool.submit(worker);
132    queue.put(tn);
133
134    assertThatEventually("executing work should see plan count increase", worker::getMergePlanCount,
135      greaterThan(beforeMergePlanCount));
136  }
137
138  @Test
139  public void testSplitCounter() throws Exception {
140    final TableName tn = tableName;
141    final TableDescriptor tnDescriptor =
142      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
143    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
144    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
145    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(
146      singletonList(new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));
147
148    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
149      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
150    final long beforeSplitPlanCount = worker.getSplitPlanCount();
151    workerPool.submit(worker);
152    queue.put(tn);
153
154    assertThatEventually("executing work should see plan count increase", worker::getSplitPlanCount,
155      greaterThan(beforeSplitPlanCount));
156  }
157
158  /**
159   * Assert that a rate limit is honored, at least in a rough way. Maintainers should manually
160   * inspect the log messages emitted by the worker thread to confirm that expected behavior.
161   */
162  @Test
163  public void testRateLimit() throws Exception {
164    final TableName tn = tableName;
165    final TableDescriptor tnDescriptor =
166      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
167    final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
168    final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
169    final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
170    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
171    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
172    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
173    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
174      new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
175        .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
176      new SplitNormalizationPlan(splitRegionInfo, 1)));
177
178    final Configuration conf = testingUtility.getConfiguration();
179    conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m");
180    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
181      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
182    workerPool.submit(worker);
183    final long startTime = System.nanoTime();
184    queue.put(tn);
185
186    assertThatEventually("executing work should see split plan count increase",
187      worker::getSplitPlanCount, comparesEqualTo(2L));
188    assertThatEventually("executing work should see merge plan count increase",
189      worker::getMergePlanCount, comparesEqualTo(1L));
190
191    final long endTime = System.nanoTime();
192    assertThat("rate limited normalizer should have taken at least 5 seconds",
193      Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
194  }
195
196  @Test
197  public void testPlansSizeLimit() throws Exception {
198    final TableName tn = tableName;
199    final TableDescriptor tnDescriptor =
200      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
201    final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
202    final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
203    final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
204    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
205    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
206    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
207    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
208      new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
209        .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
210      new SplitNormalizationPlan(splitRegionInfo, 1)));
211
212    final Configuration conf = testingUtility.getConfiguration();
213    conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);
214
215    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
216      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
217    workerPool.submit(worker);
218    queue.put(tn);
219
220    assertThatEventually("worker should process first split plan, but not second",
221      worker::getSplitPlanCount, comparesEqualTo(1L));
222    assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
223      comparesEqualTo(1L));
224  }
225
226  /**
227   * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
228   * the matcher succeeds or the timeout period of 30 seconds is exhausted.
229   */
230  private <T> void assertThatEventually(final String reason,
231    final Supplier<? extends T> actualSupplier, final Matcher<? super T> matcher) throws Exception {
232    testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30),
233      new Waiter.ExplainingPredicate<Exception>() {
234        private T lastValue = null;
235
236        @Override
237        public String explainFailure() {
238          final Description description = new StringDescription().appendText(reason)
239            .appendText("\nExpected: ").appendDescriptionOf(matcher).appendText("\n     but: ");
240          matcher.describeMismatch(lastValue, description);
241          return description.toString();
242        }
243
244        @Override
245        public boolean evaluate() {
246          lastValue = actualSupplier.get();
247          return matcher.matches(lastValue);
248        }
249      });
250  }
251}