001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master;
020
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
029import static org.junit.Assert.assertEquals;
030import static org.junit.Assert.assertFalse;
031import static org.junit.Assert.assertTrue;
032import static org.junit.Assert.fail;
033
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Iterator;
038import java.util.List;
039import java.util.NavigableSet;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.Executors;
042import java.util.concurrent.Future;
043import java.util.concurrent.TimeUnit;
044import java.util.concurrent.TimeoutException;
045import java.util.concurrent.atomic.LongAdder;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FSDataOutputStream;
048import org.apache.hadoop.fs.FileStatus;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.HBaseTestingUtility;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.KeyValue;
055import org.apache.hadoop.hbase.MiniHBaseCluster;
056import org.apache.hadoop.hbase.NamespaceDescriptor;
057import org.apache.hadoop.hbase.ServerName;
058import org.apache.hadoop.hbase.SplitLogCounters;
059import org.apache.hadoop.hbase.StartMiniClusterOption;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.Waiter;
062import org.apache.hadoop.hbase.client.Put;
063import org.apache.hadoop.hbase.client.RegionInfo;
064import org.apache.hadoop.hbase.client.RegionInfoBuilder;
065import org.apache.hadoop.hbase.client.RegionLocator;
066import org.apache.hadoop.hbase.client.Table;
067import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
068import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
069import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
070import org.apache.hadoop.hbase.master.assignment.RegionStates;
071import org.apache.hadoop.hbase.regionserver.HRegionServer;
072import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
073import org.apache.hadoop.hbase.regionserver.Region;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.FSUtils;
076import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
077import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
078import org.apache.hadoop.hbase.util.Threads;
079import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
080import org.apache.hadoop.hbase.wal.WAL;
081import org.apache.hadoop.hbase.wal.WALEdit;
082import org.apache.hadoop.hbase.wal.WALFactory;
083import org.apache.hadoop.hbase.wal.WALKeyImpl;
084import org.apache.hadoop.hbase.wal.WALSplitUtil;
085import org.apache.hadoop.hbase.zookeeper.ZKUtil;
086import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
087import org.junit.After;
088import org.junit.AfterClass;
089import org.junit.Before;
090import org.junit.BeforeClass;
091import org.junit.Rule;
092import org.junit.Test;
093import org.junit.rules.TestName;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
098
099/**
100 * Base class for testing distributed log splitting.
101 */
102public abstract class AbstractTestDLS {
103  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
104
105  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
106
107  // Start a cluster with 2 masters and 5 regionservers
108  private static final int NUM_MASTERS = 2;
109  private static final int NUM_RS = 5;
110  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
111
112  @Rule
113  public TestName testName = new TestName();
114
115  private TableName tableName;
116  private MiniHBaseCluster cluster;
117  private HMaster master;
118  private Configuration conf;
119
120  @Rule
121  public TestName name = new TestName();
122
123  @BeforeClass
124  public static void setup() throws Exception {
125    // Uncomment the following line if more verbosity is needed for
126    // debugging (see HBASE-12285 for details).
127    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
128    TEST_UTIL.startMiniZKCluster();
129    TEST_UTIL.startMiniDFSCluster(3);
130  }
131
  /**
   * One-time cleanup after all tests in the class: shuts down the mini cluster held by
   * {@code TEST_UTIL}.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
136
  /**
   * Supplies the value that {@code startCluster(int)} stores under the
   * {@code hbase.wal.provider} configuration key before it starts the HBase cluster.
   * @return the WAL provider setting the concrete test class wants to run with
   */
  protected abstract String getWalProvider();
138
139  private void startCluster(int numRS) throws Exception {
140    SplitLogCounters.resetCounters();
141    LOG.info("Starting cluster");
142    conf.setLong("hbase.splitlog.max.resubmit", 0);
143    // Make the failure test faster
144    conf.setInt("zookeeper.recovery.retry", 0);
145    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
146    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
147    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
148    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
149    conf.set("hbase.wal.provider", getWalProvider());
150    StartMiniClusterOption option = StartMiniClusterOption.builder()
151        .numMasters(NUM_MASTERS).numRegionServers(numRS).build();
152    TEST_UTIL.startMiniHBaseCluster(option);
153    cluster = TEST_UTIL.getHBaseCluster();
154    LOG.info("Waiting for active/ready master");
155    cluster.waitForActiveAndReadyMaster();
156    master = cluster.getMaster();
157    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
158      @Override
159      public boolean evaluate() throws Exception {
160        return cluster.getLiveRegionServerThreads().size() >= numRS;
161      }
162    });
163  }
164
165  @Before
166  public void before() throws Exception {
167    conf = TEST_UTIL.getConfiguration();
168    tableName = TableName.valueOf(testName.getMethodName());
169  }
170
171  @After
172  public void after() throws Exception {
173    TEST_UTIL.shutdownMiniHBaseCluster();
174    TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
175    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
176  }
177
178  @Test
179  public void testRecoveredEdits() throws Exception {
180    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
181    startCluster(NUM_RS);
182
183    int numLogLines = 10000;
184    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
185    // turn off load balancing to prevent regions from moving around otherwise
186    // they will consume recovered.edits
187    master.balanceSwitch(false);
188    FileSystem fs = master.getMasterFileSystem().getFileSystem();
189
190    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
191
192    Path rootdir = FSUtils.getRootDir(conf);
193
194    int numRegions = 50;
195    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
196        Table t = installTable(zkw, numRegions)) {
197      TableName table = t.getName();
198      List<RegionInfo> regions = null;
199      HRegionServer hrs = null;
200      for (int i = 0; i < NUM_RS; i++) {
201        hrs = rsts.get(i).getRegionServer();
202        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
203        // At least one RS will have >= to average number of regions.
204        if (regions.size() >= numRegions / NUM_RS) {
205          break;
206        }
207      }
208      Path logDir = new Path(rootdir,
209          AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
210
211      LOG.info("#regions = " + regions.size());
212      Iterator<RegionInfo> it = regions.iterator();
213      while (it.hasNext()) {
214        RegionInfo region = it.next();
215        if (region.getTable().getNamespaceAsString()
216            .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
217          it.remove();
218        }
219      }
220
221      makeWAL(hrs, regions, numLogLines, 100);
222
223      slm.splitLogDistributed(logDir);
224
225      int count = 0;
226      for (RegionInfo hri : regions) {
227        Path tdir = FSUtils.getWALTableDir(conf, table);
228        @SuppressWarnings("deprecation")
229        Path editsdir = WALSplitUtil
230            .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
231                tableName, hri.getEncodedName()));
232        LOG.debug("checking edits dir " + editsdir);
233        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
234          @Override
235          public boolean accept(Path p) {
236            if (WALSplitUtil.isSequenceIdFile(p)) {
237              return false;
238            }
239            return true;
240          }
241        });
242        assertTrue(
243          "edits dir should have more than a single file in it. instead has " + files.length,
244          files.length > 1);
245        for (int i = 0; i < files.length; i++) {
246          int c = countWAL(files[i].getPath(), fs, conf);
247          count += c;
248        }
249        LOG.info(count + " edits in " + files.length + " recovered edits files.");
250      }
251
252      // check that the log file is moved
253      assertFalse(fs.exists(logDir));
254      assertEquals(numLogLines, count);
255    }
256  }
257
258  @Test
259  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
260    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
261    startCluster(NUM_RS);
262
263    int numRegionsToCreate = 40;
264    int numLogLines = 1000;
265    // turn off load balancing to prevent regions from moving around otherwise
266    // they will consume recovered.edits
267    master.balanceSwitch(false);
268
269    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
270        Table ht = installTable(zkw, numRegionsToCreate);) {
271      HRegionServer hrs = findRSToKill(false);
272      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
273      makeWAL(hrs, regions, numLogLines, 100);
274
275      // abort master
276      abortMaster(cluster);
277
278      // abort RS
279      LOG.info("Aborting region server: " + hrs.getServerName());
280      hrs.abort("testing");
281
282      // wait for abort completes
283      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
284        @Override
285        public boolean evaluate() throws Exception {
286          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
287        }
288      });
289
290      Thread.sleep(2000);
291      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
292
293      // wait for abort completes
294      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
295        @Override
296        public boolean evaluate() throws Exception {
297          return (HBaseTestingUtility.getAllOnlineRegions(cluster)
298              .size() >= (numRegionsToCreate + 1));
299        }
300      });
301
302      LOG.info("Current Open Regions After Master Node Starts Up:" +
303          HBaseTestingUtility.getAllOnlineRegions(cluster).size());
304
305      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
306    }
307  }
308
309  /**
310   * The original intention of this test was to force an abort of a region server and to make sure
311   * that the failure path in the region servers is properly evaluated. But it is difficult to
312   * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
313   * there is this code path where the master will preempt the region server when master detects
314   * that the region server has aborted.
315   * @throws Exception
316   */
317  // Was marked flaky before Distributed Log Replay cleanup.
318  @Test
319  public void testWorkerAbort() throws Exception {
320    LOG.info("testWorkerAbort");
321    startCluster(3);
322    int numLogLines = 10000;
323    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
324    FileSystem fs = master.getMasterFileSystem().getFileSystem();
325
326    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
327    HRegionServer hrs = findRSToKill(false);
328    Path rootdir = FSUtils.getRootDir(conf);
329    final Path logDir = new Path(rootdir,
330        AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
331
332    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
333        Table t = installTable(zkw, 40)) {
334      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
335
336      new Thread() {
337        @Override
338        public void run() {
339          try {
340            waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
341          } catch (InterruptedException e) {
342          }
343          for (RegionServerThread rst : rsts) {
344            rst.getRegionServer().abort("testing");
345            break;
346          }
347        }
348      }.start();
349      FileStatus[] logfiles = fs.listStatus(logDir);
350      TaskBatch batch = new TaskBatch();
351      slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
352      // waitForCounter but for one of the 2 counters
353      long curt = System.currentTimeMillis();
354      long waitTime = 80000;
355      long endt = curt + waitTime;
356      while (curt < endt) {
357        if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
358            tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
359            tot_wkr_preempt_task.sum()) == 0) {
360          Thread.sleep(100);
361          curt = System.currentTimeMillis();
362        } else {
363          assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
364              tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
365              tot_wkr_preempt_task.sum()));
366          return;
367        }
368      }
369      fail("none of the following counters went up in " + waitTime + " milliseconds - " +
370          "tot_wkr_task_resigned, tot_wkr_task_err, " +
371          "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
372    }
373  }
374
375  @Test
376  public void testThreeRSAbort() throws Exception {
377    LOG.info("testThreeRSAbort");
378    int numRegionsToCreate = 40;
379    int numRowsPerRegion = 100;
380
381    startCluster(NUM_RS); // NUM_RS=6.
382
383    try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
384        Table table = installTable(zkw, numRegionsToCreate)) {
385      populateDataInTable(numRowsPerRegion);
386
387      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
388      assertEquals(NUM_RS, rsts.size());
389      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
390      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
391      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
392
393      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
394
395        @Override
396        public boolean evaluate() throws Exception {
397          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
398        }
399
400        @Override
401        public String explainFailure() throws Exception {
402          return "Timed out waiting for server aborts.";
403        }
404      });
405      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
406      int rows;
407      try {
408        rows = TEST_UTIL.countRows(table);
409      } catch (Exception e) {
410        Threads.printThreadInfo(System.out, "Thread dump before fail");
411        throw e;
412      }
413      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
414    }
415  }
416
417  @Test
418  public void testDelayedDeleteOnFailure() throws Exception {
419    LOG.info("testDelayedDeleteOnFailure");
420    startCluster(1);
421    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
422    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
423    final Path logDir = new Path(new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
424        ServerName.valueOf("x", 1, 1).toString());
425    fs.mkdirs(logDir);
426    ExecutorService executor = null;
427    try {
428      final Path corruptedLogFile = new Path(logDir, "x");
429      FSDataOutputStream out;
430      out = fs.create(corruptedLogFile);
431      out.write(0);
432      out.write(Bytes.toBytes("corrupted bytes"));
433      out.close();
434      ZKSplitLogManagerCoordination coordination =
435          (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
436              .getSplitLogManagerCoordination();
437      coordination.setIgnoreDeleteForTesting(true);
438      executor = Executors.newSingleThreadExecutor();
439      Runnable runnable = new Runnable() {
440        @Override
441        public void run() {
442          try {
443            // since the logDir is a fake, corrupted one, so the split log worker
444            // will finish it quickly with error, and this call will fail and throw
445            // an IOException.
446            slm.splitLogDistributed(logDir);
447          } catch (IOException ioe) {
448            try {
449              assertTrue(fs.exists(corruptedLogFile));
450              // this call will block waiting for the task to be removed from the
451              // tasks map which is not going to happen since ignoreZKDeleteForTesting
452              // is set to true, until it is interrupted.
453              slm.splitLogDistributed(logDir);
454            } catch (IOException e) {
455              assertTrue(Thread.currentThread().isInterrupted());
456              return;
457            }
458            fail("did not get the expected IOException from the 2nd call");
459          }
460          fail("did not get the expected IOException from the 1st call");
461        }
462      };
463      Future<?> result = executor.submit(runnable);
464      try {
465        result.get(2000, TimeUnit.MILLISECONDS);
466      } catch (TimeoutException te) {
467        // it is ok, expected.
468      }
469      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
470      executor.shutdownNow();
471      executor = null;
472
473      // make sure the runnable is finished with no exception thrown.
474      result.get();
475    } finally {
476      if (executor != null) {
477        // interrupt the thread in case the test fails in the middle.
478        // it has no effect if the thread is already terminated.
479        executor.shutdownNow();
480      }
481      fs.delete(logDir, true);
482    }
483  }
484
  /**
   * Convenience overload of {@link #installTable(ZKWatcher, int, int)} that passes 0 for
   * {@code existingRegions}.
   * @param zkw watcher handed through to the three-argument overload
   * @param nrs number of regions the table is created with
   * @return the created table
   */
  private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
    return installTable(zkw, nrs, 0);
  }
488
489  private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
490    // Create a table with regions
491    byte[] family = Bytes.toBytes("family");
492    LOG.info("Creating table with " + nrs + " regions");
493    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
494    int numRegions = -1;
495    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
496      numRegions = r.getStartKeys().length;
497    }
498    assertEquals(nrs, numRegions);
499    LOG.info("Waiting for no more RIT\n");
500    blockUntilNoRIT(zkw, master);
501    // disable-enable cycle to get rid of table's dead regions left behind
502    // by createMultiRegions
503    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
504    LOG.debug("Disabling table\n");
505    TEST_UTIL.getAdmin().disableTable(tableName);
506    LOG.debug("Waiting for no more RIT\n");
507    blockUntilNoRIT(zkw, master);
508    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
509    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
510    if (regions.size() != 2) {
511      for (String oregion : regions)
512        LOG.debug("Region still online: " + oregion);
513    }
514    assertEquals(2 + existingRegions, regions.size());
515    LOG.debug("Enabling table\n");
516    TEST_UTIL.getAdmin().enableTable(tableName);
517    LOG.debug("Waiting for no more RIT\n");
518    blockUntilNoRIT(zkw, master);
519    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
520    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
521    assertEquals(numRegions + 2 + existingRegions, regions.size());
522    return table;
523  }
524
525  void populateDataInTable(int nrows) throws Exception {
526    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
527    assertEquals(NUM_RS, rsts.size());
528
529    for (RegionServerThread rst : rsts) {
530      HRegionServer hrs = rst.getRegionServer();
531      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
532      for (RegionInfo hri : hris) {
533        if (hri.getTable().isSystemTable()) {
534          continue;
535        }
536        LOG.debug(
537          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
538        Region region = hrs.getOnlineRegion(hri.getRegionName());
539        assertTrue(region != null);
540        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
541      }
542    }
543
544    for (MasterThread mt : cluster.getLiveMasterThreads()) {
545      HRegionServer hrs = mt.getMaster();
546      List<RegionInfo> hris;
547      try {
548        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
549      } catch (ServerNotRunningYetException e) {
550        // It's ok: this master may be a backup. Ignored.
551        continue;
552      }
553      for (RegionInfo hri : hris) {
554        if (hri.getTable().isSystemTable()) {
555          continue;
556        }
557        LOG.debug(
558          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
559        Region region = hrs.getOnlineRegion(hri.getRegionName());
560        assertTrue(region != null);
561        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
562      }
563    }
564  }
565
  /**
   * Convenience overload of the five-argument {@code makeWAL} that passes {@code true} for
   * {@code cleanShutdown}.
   * @param hrs region server whose WALs are written to
   * @param regions candidate regions, passed through unchanged
   * @param num_edits number of edits to write
   * @param edit_size size in bytes of each edit's value
   */
  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
      throws IOException {
    makeWAL(hrs, regions, num_edits, edit_size, true);
  }
570
571  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
572      boolean cleanShutdown) throws IOException {
573    // remove root and meta region
574    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
575
576    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
577      RegionInfo regionInfo = iter.next();
578      if (regionInfo.getTable().isSystemTable()) {
579        iter.remove();
580      }
581    }
582    byte[] value = new byte[editSize];
583
584    List<RegionInfo> hris = new ArrayList<>();
585    for (RegionInfo region : regions) {
586      if (region.getTable() != tableName) {
587        continue;
588      }
589      hris.add(region);
590    }
591    LOG.info("Creating wal edits across " + hris.size() + " regions.");
592    for (int i = 0; i < editSize; i++) {
593      value[i] = (byte) ('a' + (i % 26));
594    }
595    int n = hris.size();
596    int[] counts = new int[n];
597    // sync every ~30k to line up with desired wal rolls
598    final int syncEvery = 30 * 1024 / editSize;
599    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
600    if (n > 0) {
601      for (int i = 0; i < numEdits; i += 1) {
602        WALEdit e = new WALEdit();
603        RegionInfo curRegionInfo = hris.get(i % n);
604        WAL log = hrs.getWAL(curRegionInfo);
605        byte[] startRow = curRegionInfo.getStartKey();
606        if (startRow == null || startRow.length == 0) {
607          startRow = new byte[] { 0, 0, 0, 0, 1 };
608        }
609        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
610        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
611        // HBaseTestingUtility.createMultiRegions use 5 bytes key
612        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
613        e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
614        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
615          tableName, System.currentTimeMillis(), mvcc), e);
616        if (0 == i % syncEvery) {
617          log.sync();
618        }
619        counts[i % n] += 1;
620      }
621    }
622    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
623    // will cause errors if done after.
624    for (RegionInfo info : hris) {
625      WAL log = hrs.getWAL(info);
626      log.sync();
627    }
628    if (cleanShutdown) {
629      for (RegionInfo info : hris) {
630        WAL log = hrs.getWAL(info);
631        log.shutdown();
632      }
633    }
634    for (int i = 0; i < n; i++) {
635      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
636    }
637    return;
638  }
639
640  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
641    int count = 0;
642    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
643      WAL.Entry e;
644      while ((e = in.next()) != null) {
645        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
646          count++;
647        }
648      }
649    }
650    return count;
651  }
652
  /**
   * Blocks until {@code TEST_UTIL} reports no regions in transition.
   * NOTE(review): neither {@code zkw} nor {@code master} is used by the body; the value 60000 is
   * presumably a timeout in milliseconds - confirm against HBaseTestingUtility.
   * @param zkw unused
   * @param master unused
   */
  private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception {
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
  }
656
657  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
658      throws IOException {
659    for (int i = 0; i < numRows; i++) {
660      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
661      for (byte[] family : families) {
662        put.addColumn(family, qf, null);
663      }
664      region.put(put);
665    }
666  }
667
668  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
669      throws InterruptedException {
670    long curt = System.currentTimeMillis();
671    long endt = curt + timems;
672    while (curt < endt) {
673      if (ctr.sum() == oldval) {
674        Thread.sleep(100);
675        curt = System.currentTimeMillis();
676      } else {
677        assertEquals(newval, ctr.sum());
678        return;
679      }
680    }
681    fail();
682  }
683
684  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
685    for (MasterThread mt : cluster.getLiveMasterThreads()) {
686      if (mt.getMaster().isActiveMaster()) {
687        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
688        mt.join();
689        break;
690      }
691    }
692    LOG.debug("Master is aborted");
693  }
694
695  /**
696   * Find a RS that has regions of a table.
697   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
698   */
699  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
700    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
701    List<RegionInfo> regions = null;
702    HRegionServer hrs = null;
703
704    for (RegionServerThread rst : rsts) {
705      hrs = rst.getRegionServer();
706      while (rst.isAlive() && !hrs.isOnline()) {
707        Thread.sleep(100);
708      }
709      if (!rst.isAlive()) {
710        continue;
711      }
712      boolean isCarryingMeta = false;
713      boolean foundTableRegion = false;
714      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
715      for (RegionInfo region : regions) {
716        if (region.isMetaRegion()) {
717          isCarryingMeta = true;
718        }
719        if (region.getTable() == tableName) {
720          foundTableRegion = true;
721        }
722        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
723          break;
724        }
725      }
726      if (isCarryingMeta && hasMetaRegion) {
727        // clients ask for a RS with META
728        if (!foundTableRegion) {
729          HRegionServer destRS = hrs;
730          // the RS doesn't have regions of the specified table so we need move one to this RS
731          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
732          RegionInfo hri = tableRegions.get(0);
733          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
734          // wait for region move completes
735          RegionStates regionStates =
736              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
737          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
738            @Override
739            public boolean evaluate() throws Exception {
740              ServerName sn = regionStates.getRegionServerOfRegion(hri);
741              return (sn != null && sn.equals(destRS.getServerName()));
742            }
743          });
744        }
745        return hrs;
746      } else if (hasMetaRegion || isCarryingMeta) {
747        continue;
748      }
749      if (foundTableRegion) {
750        break;
751      }
752    }
753
754    return hrs;
755  }
756}