/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.StringDescription;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A test over {@link RegionNormalizerWorker}. Because the worker runs as a background thread, the
 * only points of interaction we have with this class are its input source (the
 * {@link RegionNormalizerWorkQueue}) and the callbacks it invokes against {@link RegionNormalizer}
 * and {@link MasterServices}. The work queue is simple enough to use directly; for
 * {@link MasterServices}, use a mock because, as of now, the worker only invokes 4 methods.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestRegionNormalizerWorker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class);

  @Rule
  public TestName testName = new TestName();
  @Rule
  public TableNameTestRule tableName = new TableNameTestRule();

  @Rule
  public MockitoRule mockitoRule = MockitoJUnit.rule();

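  // RETURNS_DEEP_STUBS lets the tests stub chained calls such as
  // masterServices.getTableDescriptors().get(tn) in a single when(...) clause.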
  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  private MasterServices masterServices;
  @Mock
  private RegionNormalizer regionNormalizer;

  private HBaseCommonTestingUtil testingUtility;
  private RegionNormalizerWorkQueue<TableName> queue;
  private ExecutorService workerPool;

  private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>();

  @Before
  public void before() throws Exception {
    MockitoAnnotations.initMocks(this);
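    // The worker consults this guard before acting on a table; returning false here keeps the
    // mocked master from vetoing normalization in these tests.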
    when(masterServices.skipRegionManagementAction(any())).thenReturn(false);
    testingUtility = new HBaseCommonTestingUtil();
    queue = new RegionNormalizerWorkQueue<>();
    workerThreadThrowable.set(null);

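    // Run the worker on a dedicated daemon thread. Any exception that escapes the worker is
    // captured by the uncaught exception handler so that after() can fail the test if the worker
    // thread died unexpectedly.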
    final String threadNameFmt =
      TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d";
    final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFmt)
      .setDaemon(true).setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e)).build();
    workerPool = Executors.newSingleThreadExecutor(threadFactory);
  }

  @After
  public void after() throws Exception {
    workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
    assertTrue("timeout waiting for worker thread to terminate",
      workerPool.awaitTermination(30, TimeUnit.SECONDS));
    final Throwable workerThrowable = workerThreadThrowable.get();
    assertThat("worker thread threw unexpected exception", workerThrowable, nullValue());
  }

  @Test
  public void testMergeCounter() throws Exception {
    final TableName tn = tableName.getTableName();
    final TableDescriptor tnDescriptor =
      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
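    // Hand the worker a single merge plan targeting two regions (the numeric arguments are the
    // region sizes, in MB). Once the table is queued below, processing this plan should drive up
    // the worker's merge plan counter.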
    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(singletonList(
      new MergeNormalizationPlan.Builder().addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
        .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20).build()));

    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
    final long beforeMergePlanCount = worker.getMergePlanCount();
    workerPool.submit(worker);
    queue.put(tn);

    assertThatEventually("executing work should see plan count increase", worker::getMergePlanCount,
      greaterThan(beforeMergePlanCount));
  }

  @Test
  public void testSplitCounter() throws Exception {
    final TableName tn = tableName.getTableName();
    final TableDescriptor tnDescriptor =
      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(
      singletonList(new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));

    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
    final long beforeSplitPlanCount = worker.getSplitPlanCount();
    workerPool.submit(worker);
    queue.put(tn);

    assertThatEventually("executing work should see plan count increase", worker::getSplitPlanCount,
      greaterThan(beforeSplitPlanCount));
  }

  /**
   * Assert that a rate limit is honored, at least in a rough way. Maintainers should manually
   * inspect the log messages emitted by the worker thread to confirm the expected behavior.
   */
  @Test
  public void testRateLimit() throws Exception {
    final TableName tn = tableName.getTableName();
    final TableDescriptor tnDescriptor =
      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
    final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
    final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
    final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
      new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
        .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
      new SplitNormalizationPlan(splitRegionInfo, 1)));

    final Configuration conf = testingUtility.getConfiguration();
    conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m");
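    // The plans stubbed above amount to roughly 6 MB of region data (2 MB and 1 MB splits plus a
    // 1 MB + 2 MB merge). With throughput capped at 1 MB/sec, draining them should take at least
    // the 5 seconds asserted below; exactly how the limiter charges each plan is an implementation
    // detail, hence the coarse lower bound.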
    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
    workerPool.submit(worker);
    final long startTime = System.nanoTime();
    queue.put(tn);

    assertThatEventually("executing work should see split plan count increase",
      worker::getSplitPlanCount, comparesEqualTo(2L));
    assertThatEventually("executing work should see merge plan count increase",
      worker::getMergePlanCount, comparesEqualTo(1L));

    final long endTime = System.nanoTime();
    assertThat("rate limited normalizer should have taken at least 5 seconds",
      Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
  }

  @Test
  public void testPlansSizeLimit() throws Exception {
    final TableName tn = tableName.getTableName();
    final TableDescriptor tnDescriptor =
      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
    final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
    final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
    final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
    when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
      new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
        .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
      new SplitNormalizationPlan(splitRegionInfo, 1)));

    final Configuration conf = testingUtility.getConfiguration();
    conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);
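    // The stubbed plans weigh in at 2 MB (split) + 3 MB (merge) + 1 MB (split). With the
    // cumulative limit set to 5 MB, the trailing 1 MB split is expected to be dropped, which is
    // why the assertions below look for exactly one split plan and one merge plan.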

    final RegionNormalizerWorker worker = new RegionNormalizerWorker(
      testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
    workerPool.submit(worker);
    queue.put(tn);

    assertThatEventually("worker should process first split plan, but not second",
      worker::getSplitPlanCount, comparesEqualTo(1L));
    assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
      comparesEqualTo(1L));
  }

  /**
   * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
   * the matcher succeeds or the timeout period of 30 seconds is exhausted.
   */
  private <T> void assertThatEventually(final String reason,
    final Supplier<? extends T> actualSupplier, final Matcher<? super T> matcher) throws Exception {
    testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30),
      new Waiter.ExplainingPredicate<Exception>() {
        private T lastValue = null;

        @Override
        public String explainFailure() {
          final Description description = new StringDescription().appendText(reason)
            .appendText("\nExpected: ").appendDescriptionOf(matcher).appendText("\n     but: ");
          matcher.describeMismatch(lastValue, description);
          return description.toString();
        }

        @Override
        public boolean evaluate() {
          lastValue = actualSupplier.get();
          return matcher.matches(lastValue);
        }
      });
  }
}