001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mttr;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assume.assumeFalse;
022
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Scope;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032import org.apache.commons.lang3.RandomStringUtils;
033import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
034import org.apache.hadoop.hbase.ClusterMetrics;
035import org.apache.hadoop.hbase.IntegrationTestingUtility;
036import org.apache.hadoop.hbase.InvalidFamilyOperationException;
037import org.apache.hadoop.hbase.NamespaceExistException;
038import org.apache.hadoop.hbase.NamespaceNotFoundException;
039import org.apache.hadoop.hbase.TableExistsException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.TableNotFoundException;
042import org.apache.hadoop.hbase.chaos.actions.Action;
043import org.apache.hadoop.hbase.chaos.actions.MoveRegionsOfTableAction;
044import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
045import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingMetaAction;
046import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction;
047import org.apache.hadoop.hbase.chaos.factories.MonkeyConstants;
048import org.apache.hadoop.hbase.client.Admin;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.RetriesExhaustedException;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
057import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
058import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
059import org.apache.hadoop.hbase.ipc.FatalConnectionException;
060import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
061import org.apache.hadoop.hbase.security.AccessDeniedException;
062import org.apache.hadoop.hbase.testclassification.IntegrationTests;
063import org.apache.hadoop.hbase.trace.TraceUtil;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.LoadTestTool;
066import org.junit.AfterClass;
067import org.junit.BeforeClass;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
074
075/**
076 * Integration test that should benchmark how fast HBase can recover from failures. This test starts
077 * different threads:
078 * <ol>
079 * <li>Load Test Tool.<br/>
080 * This runs so that all RegionServers will have some load and WALs will be full.</li>
081 * <li>Scan thread.<br/>
 * This thread runs a very short scan over and over again recording how long it takes to respond. The
083 * longest response is assumed to be the time it took to recover.</li>
084 * <li>Put thread.<br/>
 * This thread is just like the scan thread except it does a very small put.</li>
086 * <li>Admin thread. <br/>
087 * This thread will continually go to the master to try and get the cluster status. Just like the
088 * put and scan threads, the time to respond is recorded.</li>
089 * <li>Chaos Monkey thread.<br/>
090 * This thread runs a ChaosMonkey.Action.</li>
091 * </ol>
092 * <p/>
093 * The ChaosMonkey actions currently run are:
094 * <ul>
095 * <li>Restart the RegionServer holding meta.</li>
096 * <li>Move the Regions of meta.</li>
097 * <li>Restart the RegionServer holding the table the scan and put threads are targeting.</li>
098 * <li>Move the Regions of the table used by the scan and put threads.</li>
099 * <li>Restart the master.</li>
100 * </ul>
101 * <p/>
102 * At the end of the test a log line is output on the INFO level containing the timing data that was
103 * collected.
104 */
105@Category(IntegrationTests.class)
106public class IntegrationTestMTTR {
107  /**
108   * Constants.
109   */
110  private static final byte[] FAMILY = Bytes.toBytes("d");
111  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestMTTR.class);
112  private static long sleepTime;
113  private static final String SLEEP_TIME_KEY = "hbase.IntegrationTestMTTR.sleeptime";
114  private static final long SLEEP_TIME_DEFAULT = 60 * 1000l;
115
116  /**
117   * Configurable table names.
118   */
119  private static TableName tableName;
120  private static TableName loadTableName;
121
122  /**
123   * Util to get at the cluster.
124   */
125  private static IntegrationTestingUtility util;
126
127  /**
128   * Executor for test threads.
129   */
130  private static ExecutorService executorService;
131
132  /**
133   * All of the chaos monkey actions used.
134   */
135  private static Action restartRSAction;
136  private static Action restartMetaAction;
137  private static Action moveMetaRegionsAction;
138  private static Action moveRegionAction;
139  private static Action restartMasterAction;
140
141  /**
142   * The load test tool used to create load and make sure that WALs aren't empty.
143   */
144  private static LoadTestTool loadTool;
145
146  @BeforeClass
147  public static void setUp() throws Exception {
148    // Set up the integration test util
149    if (util == null) {
150      util = new IntegrationTestingUtility();
151    }
152
153    // Make sure there are three servers.
154    util.initializeCluster(3);
155
156    // Set up the load test tool.
157    loadTool = new LoadTestTool();
158    loadTool.setConf(util.getConfiguration());
159
160    // Create executor with enough threads to restart rs's,
161    // run scans, puts, admin ops and load test tool.
162    executorService = Executors.newFixedThreadPool(8);
163
164    // Set up the tables needed.
165    setupTables();
166
167    // Set up the actions.
168    sleepTime = util.getConfiguration().getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
169    setupActions();
170  }
171
172  private static void setupActions() throws IOException {
173    // allow a little more time for RS restart actions because RS start depends on having a master
174    // to report to and the master is also being monkeyed.
175    util.getConfiguration().setLong(Action.START_RS_TIMEOUT_KEY, 3 * 60 * 1000);
176
177    // Set up the action that will restart a region server holding a region from our table
178    // because this table should only have one region we should be good.
179    restartRSAction =
180      new RestartRsHoldingTableAction(sleepTime, util.getConnection().getRegionLocator(tableName));
181
182    // Set up the action that will kill the region holding meta.
183    restartMetaAction = new RestartRsHoldingMetaAction(sleepTime);
184
185    // Set up the action that will move the regions of meta.
186    moveMetaRegionsAction = new MoveRegionsOfTableAction(sleepTime,
187      MonkeyConstants.DEFAULT_MOVE_REGIONS_MAX_TIME, TableName.META_TABLE_NAME);
188
189    // Set up the action that will move the regions of our table.
190    moveRegionAction = new MoveRegionsOfTableAction(sleepTime,
191      MonkeyConstants.DEFAULT_MOVE_REGIONS_MAX_TIME, tableName);
192
193    // Kill the master
194    restartMasterAction = new RestartActiveMasterAction(1000);
195
196    // Give the action the access to the cluster.
197    Action.ActionContext actionContext = new Action.ActionContext(util);
198    restartRSAction.init(actionContext);
199    restartMetaAction.init(actionContext);
200    moveMetaRegionsAction.init(actionContext);
201    moveRegionAction.init(actionContext);
202    restartMasterAction.init(actionContext);
203  }
204
205  private static void setupTables() throws IOException {
206    // Get the table name.
207    tableName = TableName.valueOf(
208      util.getConfiguration().get("hbase.IntegrationTestMTTR.tableName", "IntegrationTestMTTR"));
209
210    loadTableName = TableName.valueOf(util.getConfiguration()
211      .get("hbase.IntegrationTestMTTR.loadTableName", "IntegrationTestMTTRLoadTestTool"));
212
213    if (util.getAdmin().tableExists(tableName)) {
214      util.deleteTable(tableName);
215    }
216
217    if (util.getAdmin().tableExists(loadTableName)) {
218      util.deleteTable(loadTableName);
219    }
220
221    // Create the table. If this fails then fail everything.
222    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
223
224    // Make the max file size huge so that splits don't happen during the test.
225    builder.setMaxFileSize(Long.MAX_VALUE);
226
227    ColumnFamilyDescriptorBuilder colDescriptorBldr =
228      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
229    colDescriptorBldr.setMaxVersions(1);
230    builder.setColumnFamily(colDescriptorBldr.build());
231    util.getAdmin().createTable(builder.build());
232
233    // Setup the table for LoadTestTool
234    int ret = loadTool.run(new String[] { "-tn", loadTableName.getNameAsString(), "-init_only" });
235    assertEquals("Failed to initialize LoadTestTool", 0, ret);
236  }
237
238  @AfterClass
239  public static void after() throws IOException {
240    // Clean everything up.
241    util.restoreCluster();
242    util = null;
243
244    // Stop the threads so that we know everything is complete.
245    executorService.shutdown();
246    executorService = null;
247
248    // Clean up the actions.
249    moveRegionAction = null;
250    restartMetaAction = null;
251    moveMetaRegionsAction = null;
252    restartRSAction = null;
253    restartMasterAction = null;
254
255    loadTool = null;
256  }
257
258  private static boolean tablesOnMaster() {
259    boolean ret = true;
260    String value = util.getConfiguration().get("hbase.balancer.tablesOnMaster");
261    if (value != null && value.equalsIgnoreCase("none")) {
262      ret = false;
263    }
264    return ret;
265  }
266
267  @Test
268  public void testRestartRsHoldingTable() throws Exception {
269    run(new ActionCallable(restartRSAction), "RestartRsHoldingTableAction");
270  }
271
272  @Test
273  public void testKillRsHoldingMeta() throws Exception {
274    assumeFalse(tablesOnMaster());
275    run(new ActionCallable(restartMetaAction), "KillRsHoldingMeta");
276  }
277
278  @Test
279  public void testMoveMeta() throws Exception {
280    run(new ActionCallable(moveMetaRegionsAction), "MoveMeta");
281  }
282
283  @Test
284  public void testMoveRegion() throws Exception {
285    run(new ActionCallable(moveRegionAction), "MoveRegion");
286  }
287
288  @Test
289  public void testRestartMaster() throws Exception {
290    run(new ActionCallable(restartMasterAction), "RestartMaster");
291  }
292
293  public void run(Callable<Boolean> monkeyCallable, String testName) throws Exception {
294    int maxIters = util.getHBaseClusterInterface().isDistributedCluster() ? 10 : 3;
295    LOG.info("Starting " + testName + " with " + maxIters + " iterations.");
296
297    // Array to keep track of times.
298    ArrayList<TimingResult> resultPuts = new ArrayList<>(maxIters);
299    ArrayList<TimingResult> resultScan = new ArrayList<>(maxIters);
300    ArrayList<TimingResult> resultAdmin = new ArrayList<>(maxIters);
301    long start = System.nanoTime();
302
303    try {
304      // We're going to try this multiple times
305      for (int fullIterations = 0; fullIterations < maxIters; fullIterations++) {
306        // Create and start executing a callable that will kill the servers
307        Future<Boolean> monkeyFuture = executorService.submit(monkeyCallable);
308
309        // Pass that future to the timing Callables.
310        Future<TimingResult> putFuture = executorService.submit(new PutCallable(monkeyFuture));
311        Future<TimingResult> scanFuture = executorService.submit(new ScanCallable(monkeyFuture));
312        Future<TimingResult> adminFuture = executorService.submit(new AdminCallable(monkeyFuture));
313
314        Future<Boolean> loadFuture = executorService.submit(new LoadCallable(monkeyFuture));
315
316        monkeyFuture.get();
317        loadFuture.get();
318
319        // Get the values from the futures.
320        TimingResult putTime = putFuture.get();
321        TimingResult scanTime = scanFuture.get();
322        TimingResult adminTime = adminFuture.get();
323
324        // Store the times to display later.
325        resultPuts.add(putTime);
326        resultScan.add(scanTime);
327        resultAdmin.add(adminTime);
328
329        // Wait some time for everything to settle down.
330        Thread.sleep(5000l);
331      }
332    } catch (Exception e) {
333      long runtimeMs =
334        TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
335      LOG.info(testName + " failed after " + runtimeMs + "ms.", e);
336      throw e;
337    }
338
339    long runtimeMs = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
340
341    MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper("MTTRResults")
342      .add("putResults", resultPuts).add("scanResults", resultScan).add("adminResults", resultAdmin)
343      .add("totalRuntimeMs", runtimeMs).add("name", testName);
344
345    // Log the info
346    LOG.info(helper.toString());
347  }
348
349  /**
350   * Class to store results of TimingCallable. Stores times and trace id.
351   */
352  private static class TimingResult {
353    DescriptiveStatistics stats = new DescriptiveStatistics();
354    ArrayList<String> traces = new ArrayList<>(10);
355
356    /**
357     * Add a result to this aggregate result.
358     * @param time Time in nanoseconds
359     * @param span Span. To be kept if the time taken was over 1 second
360     */
361    public void addResult(long time, Span span) {
362      stats.addValue(TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS));
363      if (TimeUnit.SECONDS.convert(time, TimeUnit.NANOSECONDS) >= 1) {
364        traces.add(span.getSpanContext().getTraceId());
365      }
366    }
367
368    @Override
369    public String toString() {
370      MoreObjects.ToStringHelper helper =
371        MoreObjects.toStringHelper(this).add("numResults", stats.getN())
372          .add("minTime", stats.getMin()).add("meanTime", stats.getMean())
373          .add("maxTime", stats.getMax()).add("25th", stats.getPercentile(25))
374          .add("50th", stats.getPercentile(50)).add("75th", stats.getPercentile(75))
375          .add("90th", stats.getPercentile(90)).add("95th", stats.getPercentile(95))
376          .add("99th", stats.getPercentile(99)).add("99.9th", stats.getPercentile(99.9))
377          .add("99.99th", stats.getPercentile(99.99)).add("traces", traces);
378      return helper.toString();
379    }
380  }
381
382  /**
383   * Base class for actions that need to record the time needed to recover from a failure.
384   */
385  static abstract class TimingCallable implements Callable<TimingResult> {
386    protected final Future<?> future;
387
388    public TimingCallable(Future<?> f) {
389      future = f;
390    }
391
392    @Override
393    public TimingResult call() throws Exception {
394      TimingResult result = new TimingResult();
395      final int maxIterations = 10;
396      int numAfterDone = 0;
397      int resetCount = 0;
398      // Keep trying until the rs is back up and we've gotten a put through
399      while (numAfterDone < maxIterations) {
400        long start = System.nanoTime();
401        Span span = TraceUtil.getGlobalTracer().spanBuilder(getSpanName()).startSpan();
402        try (Scope scope = span.makeCurrent()) {
403          boolean actionResult = doAction();
404          if (actionResult && future.isDone()) {
405            numAfterDone++;
406          }
407
408          // the following Exceptions derive from DoNotRetryIOException. They are considered
409          // fatal for the purpose of this test. If we see one of these, it means something is
410          // broken and needs investigation. This is not the case for all children of DNRIOE.
411          // Unfortunately, this is an explicit enumeration and will need periodically refreshed.
412          // See HBASE-9655 for further discussion.
413        } catch (AccessDeniedException e) {
414          throw e;
415        } catch (CoprocessorException e) {
416          throw e;
417        } catch (FatalConnectionException e) {
418          throw e;
419        } catch (InvalidFamilyOperationException e) {
420          throw e;
421        } catch (NamespaceExistException e) {
422          throw e;
423        } catch (NamespaceNotFoundException e) {
424          throw e;
425        } catch (NoSuchColumnFamilyException e) {
426          throw e;
427        } catch (TableExistsException e) {
428          throw e;
429        } catch (TableNotFoundException e) {
430          throw e;
431        } catch (RetriesExhaustedException e) {
432          throw e;
433          // Everything else is potentially recoverable on the application side. For instance, a CM
434          // action kills the RS that hosted a scanner the client was using. Continued use of that
435          // scanner should be terminated, but a new scanner can be created and the read attempted
436          // again.
437        } catch (Exception e) {
438          resetCount++;
439          if (resetCount < maxIterations) {
440            LOG.info(
441              "Non-fatal exception while running " + this.toString() + ". Resetting loop counter",
442              e);
443            numAfterDone = 0;
444          } else {
445            LOG.info("Too many unexpected Exceptions. Aborting.", e);
446            throw e;
447          }
448        } finally {
449          span.end();
450        }
451        result.addResult(System.nanoTime() - start, span);
452      }
453      return result;
454    }
455
456    protected abstract boolean doAction() throws Exception;
457
458    protected String getSpanName() {
459      return this.getClass().getSimpleName();
460    }
461
462    @Override
463    public String toString() {
464      return this.getSpanName();
465    }
466  }
467
468  /**
469   * Callable that will keep putting small amounts of data into a table until the future supplied
470   * returns. It keeps track of the max time.
471   */
472  static class PutCallable extends TimingCallable {
473
474    private final Table table;
475
476    public PutCallable(Future<?> f) throws IOException {
477      super(f);
478      this.table = util.getConnection().getTable(tableName);
479    }
480
481    @Override
482    protected boolean doAction() throws Exception {
483      Put p = new Put(Bytes.toBytes(RandomStringUtils.randomAlphanumeric(5)));
484      p.addColumn(FAMILY, Bytes.toBytes("\0"), Bytes.toBytes(RandomStringUtils.randomAscii(5)));
485      table.put(p);
486      return true;
487    }
488
489    @Override
490    protected String getSpanName() {
491      return "MTTR Put Test";
492    }
493  }
494
495  /**
496   * Callable that will keep scanning for small amounts of data until the supplied future returns.
497   * Returns the max time taken to scan.
498   */
499  static class ScanCallable extends TimingCallable {
500    private final Table table;
501
502    public ScanCallable(Future<?> f) throws IOException {
503      super(f);
504      this.table = util.getConnection().getTable(tableName);
505    }
506
507    @Override
508    protected boolean doAction() throws Exception {
509      ResultScanner rs = null;
510      try {
511        Scan s = new Scan();
512        s.setBatch(2);
513        s.addFamily(FAMILY);
514        s.setFilter(new KeyOnlyFilter());
515        s.readVersions(1);
516
517        rs = table.getScanner(s);
518        Result result = rs.next();
519        return result != null && result.size() > 0;
520      } finally {
521        if (rs != null) {
522          rs.close();
523        }
524      }
525    }
526
527    @Override
528    protected String getSpanName() {
529      return "MTTR Scan Test";
530    }
531  }
532
533  /**
534   * Callable that will keep going to the master for cluster status. Returns the max time taken.
535   */
536  static class AdminCallable extends TimingCallable {
537
538    public AdminCallable(Future<?> f) throws IOException {
539      super(f);
540    }
541
542    @Override
543    protected boolean doAction() throws Exception {
544      Admin admin = null;
545      try {
546        admin = util.getAdmin();
547        ClusterMetrics status = admin.getClusterMetrics();
548        return status != null;
549      } finally {
550        if (admin != null) {
551          admin.close();
552        }
553      }
554    }
555
556    @Override
557    protected String getSpanName() {
558      return "MTTR Admin Test";
559    }
560  }
561
562  static class ActionCallable implements Callable<Boolean> {
563    private final Action action;
564
565    public ActionCallable(Action action) {
566      this.action = action;
567    }
568
569    @Override
570    public Boolean call() throws Exception {
571      this.action.perform();
572      return true;
573    }
574  }
575
576  /**
577   * Callable used to make sure the cluster has some load on it. This callable uses LoadTest tool to
578   */
579  public static class LoadCallable implements Callable<Boolean> {
580
581    private final Future<?> future;
582
583    public LoadCallable(Future<?> f) {
584      future = f;
585    }
586
587    @Override
588    public Boolean call() throws Exception {
589      int colsPerKey = 10;
590      int numServers =
591        util.getHBaseClusterInterface().getInitialClusterMetrics().getLiveServerMetrics().size();
592      int numKeys = numServers * 5000;
593      int writeThreads = 10;
594
595      // Loop until the chaos monkey future is done.
596      // But always go in just in case some action completes quickly
597      do {
598        int ret = loadTool.run(new String[] { "-tn", loadTableName.getNameAsString(), "-write",
599          String.format("%d:%d:%d", colsPerKey, 500, writeThreads), "-num_keys",
600          String.valueOf(numKeys), "-skip_init" });
601        assertEquals("Load failed", 0, ret);
602      } while (!future.isDone());
603
604      return true;
605    }
606  }
607}