/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.StringDescription;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A test over {@link RegionNormalizerWorker}. Because the worker runs as a background thread, the
 * only points of interaction we have with this class are its input source (the
 * {@link RegionNormalizerWorkQueue}) and the callbacks it invokes against
 * {@link RegionNormalizer} and {@link MasterServices}. The work queue is simple enough to use
 * directly; for {@link MasterServices}, use a mock because, as of now, the worker only invokes
 * four of its methods.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestRegionNormalizerWorker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class);

  @Rule
  public TestName testName = new TestName();
  @Rule
  public TableNameTestRule tableName = new TableNameTestRule();

  @Rule
  public MockitoRule mockitoRule = MockitoJUnit.rule();

  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  private MasterServices masterServices;
  @Mock
  private RegionNormalizer regionNormalizer;

  private HBaseCommonTestingUtility testingUtility;
  private RegionNormalizerWorkQueue<TableName> queue;
  private ExecutorService workerPool;

  private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>();

  @Before
  public void before() throws Exception {
    MockitoAnnotations.initMocks(this);
    when(masterServices.skipRegionManagementAction(any())).thenReturn(false);
    testingUtility = new HBaseCommonTestingUtility();
    queue = new RegionNormalizerWorkQueue<>();
    workerThreadThrowable.set(null);

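    // Run the worker on a single daemon thread and capture any exception it throws, so that
    // after() can surface worker failures as test failures.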
    final String threadNameFmt =
      TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d";
    final ThreadFactory threadFactory = new ThreadFactoryBuilder()
      .setNameFormat(threadNameFmt)
      .setDaemon(true)
      .setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e))
      .build();
    workerPool = Executors.newSingleThreadExecutor(threadFactory);
  }

  @After
  public void after() throws Exception {
    workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
    assertTrue("timeout waiting for worker thread to terminate",
      workerPool.awaitTermination(30, TimeUnit.SECONDS));
    final Throwable workerThrowable = workerThreadThrowable.get();
    assertThat("worker thread threw unexpected exception", workerThrowable, nullValue());
  }

  @Test
  public void testMergeCounter() throws Exception {
    final TableName tn = tableName.getTableName();
    final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
      .setNormalizationEnabled(true)
      .build();
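    // masterServices is a deep-stub mock, so the chained getTableDescriptors().get(tn) call can
    // be stubbed directly.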
    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
      .thenReturn(1L);
    when(regionNormalizer.computePlansForTable(tn))
      .thenReturn(singletonList(new MergeNormalizationPlan.Builder()
        .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
        .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
        .build()));

    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
    final long beforeMergePlanCount = worker.getMergePlanCount();
    workerPool.submit(worker);
    queue.put(tn);

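    // The worker thread should dequeue the table, ask the normalizer for plans, submit the
    // merge, and bump the merge plan counter.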
    assertThatEventually("executing work should see plan count increase",
      worker::getMergePlanCount, greaterThan(beforeMergePlanCount));
  }

  @Test
  public void testSplitCounter() throws Exception {
    final TableName tn = tableName.getTableName();
    final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
      .setNormalizationEnabled(true)
      .build();
    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
      .thenReturn(1L);
    when(regionNormalizer.computePlansForTable(tn))
      .thenReturn(singletonList(
        new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));

    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
    final long beforeSplitPlanCount = worker.getSplitPlanCount();
    workerPool.submit(worker);
    queue.put(tn);

    assertThatEventually("executing work should see plan count increase",
      worker::getSplitPlanCount, greaterThan(beforeSplitPlanCount));
  }

  /**
   * Assert that a rate limit is honored, at least in a rough way. Maintainers should manually
   * inspect the log messages emitted by the worker thread to confirm the expected behavior.
   */
  @Test
  public void testRateLimit() throws Exception {
    final TableName tn = tableName.getTableName();
    final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
      .setNormalizationEnabled(true)
      .build();
    final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
    final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
    final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
      .thenReturn(1L);
    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
      .thenReturn(1L);
    when(regionNormalizer.computePlansForTable(tn))
      .thenReturn(Arrays.asList(
        new SplitNormalizationPlan(splitRegionInfo, 2),
        new MergeNormalizationPlan.Builder()
          .addTarget(mergeRegionInfo1, 1)
          .addTarget(mergeRegionInfo2, 2)
          .build(),
        new SplitNormalizationPlan(splitRegionInfo, 1)));

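    // Cap the normalizer throughput at 1 MB/sec. The plan sizes above total roughly 6 MB, so a
    // correctly rate-limited worker should need several seconds to drain the queue; the
    // assertion below only requires a conservative lower bound of 5 seconds.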
    final Configuration conf = testingUtility.getConfiguration();
    conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m");
    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
      conf, masterServices, regionNormalizer, queue);
    workerPool.submit(worker);
    final long startTime = System.nanoTime();
    queue.put(tn);

    assertThatEventually("executing work should count 2 split plans",
      worker::getSplitPlanCount, comparesEqualTo(2L));
    assertThatEventually("executing work should count 1 merge plan",
      worker::getMergePlanCount, comparesEqualTo(1L));

    final long endTime = System.nanoTime();
    assertThat("rate limited normalizer should have taken at least 5 seconds",
      Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
  }

  /**
   * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier}
   * until the matcher succeeds or the timeout period of 30 seconds is exhausted.
   */
  private <T> void assertThatEventually(
    final String reason,
    final Supplier<? extends T> actualSupplier,
    final Matcher<? super T> matcher
  ) throws Exception {
    testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30),
      new Waiter.ExplainingPredicate<Exception>() {
        private T lastValue = null;

        @Override
        public String explainFailure() {
          final Description description = new StringDescription()
            .appendText(reason)
            .appendText("\nExpected: ")
            .appendDescriptionOf(matcher)
            .appendText("\n     but: ");
          matcher.describeMismatch(lastValue, description);
          return description.toString();
        }

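        // Re-evaluate the supplier on each poll, remembering the last observed value so that
        // explainFailure() can describe the mismatch on timeout.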
        @Override
        public boolean evaluate() {
          lastValue = actualSupplier.get();
          return matcher.matches(lastValue);
        }
      });
  }
}