001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master;
020
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
028import static org.junit.Assert.assertEquals;
029import static org.junit.Assert.assertFalse;
030import static org.junit.Assert.assertTrue;
031import static org.junit.Assert.fail;
032
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Iterator;
037import java.util.List;
038import java.util.NavigableSet;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Executors;
041import java.util.concurrent.Future;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.TimeoutException;
044import java.util.concurrent.atomic.LongAdder;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FSDataOutputStream;
047import org.apache.hadoop.fs.FileStatus;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.fs.PathFilter;
051import org.apache.hadoop.hbase.HBaseTestingUtility;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.KeyValue;
054import org.apache.hadoop.hbase.MiniHBaseCluster;
055import org.apache.hadoop.hbase.NamespaceDescriptor;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.SplitLogCounters;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.Waiter;
060import org.apache.hadoop.hbase.client.Put;
061import org.apache.hadoop.hbase.client.RegionInfo;
062import org.apache.hadoop.hbase.client.RegionInfoBuilder;
063import org.apache.hadoop.hbase.client.RegionLocator;
064import org.apache.hadoop.hbase.client.Table;
065import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
066import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
067import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
068import org.apache.hadoop.hbase.master.assignment.RegionStates;
069import org.apache.hadoop.hbase.regionserver.HRegionServer;
070import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
071import org.apache.hadoop.hbase.regionserver.Region;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.FSUtils;
074import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
075import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
076import org.apache.hadoop.hbase.util.Threads;
077import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
078import org.apache.hadoop.hbase.wal.WAL;
079import org.apache.hadoop.hbase.wal.WALEdit;
080import org.apache.hadoop.hbase.wal.WALFactory;
081import org.apache.hadoop.hbase.wal.WALKeyImpl;
082import org.apache.hadoop.hbase.wal.WALSplitter;
083import org.apache.hadoop.hbase.zookeeper.ZKUtil;
084import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
085import org.junit.After;
086import org.junit.AfterClass;
087import org.junit.Before;
088import org.junit.BeforeClass;
089import org.junit.Rule;
090import org.junit.Test;
091import org.junit.rules.TestName;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
096
097/**
098 * Base class for testing distributed log splitting.
099 */
100public abstract class AbstractTestDLS {
101  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
102
103  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
104
105  // Start a cluster with 2 masters and 5 regionservers
106  private static final int NUM_MASTERS = 2;
107  private static final int NUM_RS = 5;
108  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
109
110  @Rule
111  public TestName testName = new TestName();
112
113  private TableName tableName;
114  private MiniHBaseCluster cluster;
115  private HMaster master;
116  private Configuration conf;
117
118  @Rule
119  public TestName name = new TestName();
120
121  @BeforeClass
122  public static void setup() throws Exception {
123    // Uncomment the following line if more verbosity is needed for
124    // debugging (see HBASE-12285 for details).
125    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
126    TEST_UTIL.startMiniZKCluster();
127    TEST_UTIL.startMiniDFSCluster(3);
128  }
129
130  @AfterClass
131  public static void tearDown() throws Exception {
132    TEST_UTIL.shutdownMiniCluster();
133  }
134
  /**
   * Supplies the value that is written to the {@code "hbase.wal.provider"} configuration key
   * before the mini HBase cluster is started.
   * @return the WAL provider setting chosen by the concrete test class
   */
  protected abstract String getWalProvider();
136
137  private void startCluster(int numRS) throws Exception {
138    SplitLogCounters.resetCounters();
139    LOG.info("Starting cluster");
140    conf.setLong("hbase.splitlog.max.resubmit", 0);
141    // Make the failure test faster
142    conf.setInt("zookeeper.recovery.retry", 0);
143    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
144    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
145    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
146    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
147    conf.set("hbase.wal.provider", getWalProvider());
148    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, numRS);
149    cluster = TEST_UTIL.getHBaseCluster();
150    LOG.info("Waiting for active/ready master");
151    cluster.waitForActiveAndReadyMaster();
152    master = cluster.getMaster();
153    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
154      @Override
155      public boolean evaluate() throws Exception {
156        return cluster.getLiveRegionServerThreads().size() >= numRS;
157      }
158    });
159  }
160
161  @Before
162  public void before() throws Exception {
163    conf = TEST_UTIL.getConfiguration();
164    tableName = TableName.valueOf(testName.getMethodName());
165  }
166
167  @After
168  public void after() throws Exception {
169    TEST_UTIL.shutdownMiniHBaseCluster();
170    TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
171    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
172  }
173
174  @Test
175  public void testRecoveredEdits() throws Exception {
176    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
177    startCluster(NUM_RS);
178
179    int numLogLines = 10000;
180    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
181    // turn off load balancing to prevent regions from moving around otherwise
182    // they will consume recovered.edits
183    master.balanceSwitch(false);
184    FileSystem fs = master.getMasterFileSystem().getFileSystem();
185
186    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
187
188    Path rootdir = FSUtils.getRootDir(conf);
189
190    int numRegions = 50;
191    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
192        Table t = installTable(zkw, numRegions)) {
193      TableName table = t.getName();
194      List<RegionInfo> regions = null;
195      HRegionServer hrs = null;
196      for (int i = 0; i < NUM_RS; i++) {
197        hrs = rsts.get(i).getRegionServer();
198        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
199        // At least one RS will have >= to average number of regions.
200        if (regions.size() >= numRegions / NUM_RS) {
201          break;
202        }
203      }
204      Path logDir = new Path(rootdir,
205          AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
206
207      LOG.info("#regions = " + regions.size());
208      Iterator<RegionInfo> it = regions.iterator();
209      while (it.hasNext()) {
210        RegionInfo region = it.next();
211        if (region.getTable().getNamespaceAsString()
212            .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
213          it.remove();
214        }
215      }
216
217      makeWAL(hrs, regions, numLogLines, 100);
218
219      slm.splitLogDistributed(logDir);
220
221      int count = 0;
222      for (RegionInfo hri : regions) {
223        Path tdir = FSUtils.getWALTableDir(conf, table);
224        @SuppressWarnings("deprecation")
225        Path editsdir = WALSplitter
226            .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
227                tableName, hri.getEncodedName()));
228        LOG.debug("checking edits dir " + editsdir);
229        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
230          @Override
231          public boolean accept(Path p) {
232            if (WALSplitter.isSequenceIdFile(p)) {
233              return false;
234            }
235            return true;
236          }
237        });
238        assertTrue(
239          "edits dir should have more than a single file in it. instead has " + files.length,
240          files.length > 1);
241        for (int i = 0; i < files.length; i++) {
242          int c = countWAL(files[i].getPath(), fs, conf);
243          count += c;
244        }
245        LOG.info(count + " edits in " + files.length + " recovered edits files.");
246      }
247
248      // check that the log file is moved
249      assertFalse(fs.exists(logDir));
250      assertEquals(numLogLines, count);
251    }
252  }
253
254  @Test
255  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
256    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
257    startCluster(NUM_RS);
258
259    int numRegionsToCreate = 40;
260    int numLogLines = 1000;
261    // turn off load balancing to prevent regions from moving around otherwise
262    // they will consume recovered.edits
263    master.balanceSwitch(false);
264
265    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
266        Table ht = installTable(zkw, numRegionsToCreate);) {
267      HRegionServer hrs = findRSToKill(false);
268      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
269      makeWAL(hrs, regions, numLogLines, 100);
270
271      // abort master
272      abortMaster(cluster);
273
274      // abort RS
275      LOG.info("Aborting region server: " + hrs.getServerName());
276      hrs.abort("testing");
277
278      // wait for abort completes
279      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
280        @Override
281        public boolean evaluate() throws Exception {
282          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
283        }
284      });
285
286      Thread.sleep(2000);
287      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
288
289      // wait for abort completes
290      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
291        @Override
292        public boolean evaluate() throws Exception {
293          return (HBaseTestingUtility.getAllOnlineRegions(cluster)
294              .size() >= (numRegionsToCreate + 1));
295        }
296      });
297
298      LOG.info("Current Open Regions After Master Node Starts Up:" +
299          HBaseTestingUtility.getAllOnlineRegions(cluster).size());
300
301      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
302    }
303  }
304
305  /**
306   * The original intention of this test was to force an abort of a region server and to make sure
307   * that the failure path in the region servers is properly evaluated. But it is difficult to
308   * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
309   * there is this code path where the master will preempt the region server when master detects
310   * that the region server has aborted.
311   * @throws Exception
312   */
313  // Was marked flaky before Distributed Log Replay cleanup.
314  @Test
315  public void testWorkerAbort() throws Exception {
316    LOG.info("testWorkerAbort");
317    startCluster(3);
318    int numLogLines = 10000;
319    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
320    FileSystem fs = master.getMasterFileSystem().getFileSystem();
321
322    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
323    HRegionServer hrs = findRSToKill(false);
324    Path rootdir = FSUtils.getRootDir(conf);
325    final Path logDir = new Path(rootdir,
326        AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
327
328    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
329        Table t = installTable(zkw, 40)) {
330      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
331
332      new Thread() {
333        @Override
334        public void run() {
335          try {
336            waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
337          } catch (InterruptedException e) {
338          }
339          for (RegionServerThread rst : rsts) {
340            rst.getRegionServer().abort("testing");
341            break;
342          }
343        }
344      }.start();
345      FileStatus[] logfiles = fs.listStatus(logDir);
346      TaskBatch batch = new TaskBatch();
347      slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
348      // waitForCounter but for one of the 2 counters
349      long curt = System.currentTimeMillis();
350      long waitTime = 80000;
351      long endt = curt + waitTime;
352      while (curt < endt) {
353        if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
354            tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
355            tot_wkr_preempt_task.sum()) == 0) {
356          Thread.sleep(100);
357          curt = System.currentTimeMillis();
358        } else {
359          assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
360              tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
361              tot_wkr_preempt_task.sum()));
362          return;
363        }
364      }
365      fail("none of the following counters went up in " + waitTime + " milliseconds - " +
366          "tot_wkr_task_resigned, tot_wkr_task_err, " +
367          "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
368    }
369  }
370
371  @Test
372  public void testThreeRSAbort() throws Exception {
373    LOG.info("testThreeRSAbort");
374    int numRegionsToCreate = 40;
375    int numRowsPerRegion = 100;
376
377    startCluster(NUM_RS); // NUM_RS=6.
378
379    try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
380        Table table = installTable(zkw, numRegionsToCreate)) {
381      populateDataInTable(numRowsPerRegion);
382
383      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
384      assertEquals(NUM_RS, rsts.size());
385      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
386      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
387      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
388
389      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
390
391        @Override
392        public boolean evaluate() throws Exception {
393          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
394        }
395
396        @Override
397        public String explainFailure() throws Exception {
398          return "Timed out waiting for server aborts.";
399        }
400      });
401      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
402      int rows;
403      try {
404        rows = TEST_UTIL.countRows(table);
405      } catch (Exception e) {
406        Threads.printThreadInfo(System.out, "Thread dump before fail");
407        throw e;
408      }
409      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
410    }
411  }
412
413  @Test
414  public void testDelayedDeleteOnFailure() throws Exception {
415    LOG.info("testDelayedDeleteOnFailure");
416    startCluster(1);
417    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
418    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
419    final Path logDir = new Path(new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
420        ServerName.valueOf("x", 1, 1).toString());
421    fs.mkdirs(logDir);
422    ExecutorService executor = null;
423    try {
424      final Path corruptedLogFile = new Path(logDir, "x");
425      FSDataOutputStream out;
426      out = fs.create(corruptedLogFile);
427      out.write(0);
428      out.write(Bytes.toBytes("corrupted bytes"));
429      out.close();
430      ZKSplitLogManagerCoordination coordination =
431          (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
432              .getSplitLogManagerCoordination();
433      coordination.setIgnoreDeleteForTesting(true);
434      executor = Executors.newSingleThreadExecutor();
435      Runnable runnable = new Runnable() {
436        @Override
437        public void run() {
438          try {
439            // since the logDir is a fake, corrupted one, so the split log worker
440            // will finish it quickly with error, and this call will fail and throw
441            // an IOException.
442            slm.splitLogDistributed(logDir);
443          } catch (IOException ioe) {
444            try {
445              assertTrue(fs.exists(corruptedLogFile));
446              // this call will block waiting for the task to be removed from the
447              // tasks map which is not going to happen since ignoreZKDeleteForTesting
448              // is set to true, until it is interrupted.
449              slm.splitLogDistributed(logDir);
450            } catch (IOException e) {
451              assertTrue(Thread.currentThread().isInterrupted());
452              return;
453            }
454            fail("did not get the expected IOException from the 2nd call");
455          }
456          fail("did not get the expected IOException from the 1st call");
457        }
458      };
459      Future<?> result = executor.submit(runnable);
460      try {
461        result.get(2000, TimeUnit.MILLISECONDS);
462      } catch (TimeoutException te) {
463        // it is ok, expected.
464      }
465      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
466      executor.shutdownNow();
467      executor = null;
468
469      // make sure the runnable is finished with no exception thrown.
470      result.get();
471    } finally {
472      if (executor != null) {
473        // interrupt the thread in case the test fails in the middle.
474        // it has no effect if the thread is already terminated.
475        executor.shutdownNow();
476      }
477      fs.delete(logDir, true);
478    }
479  }
480
481  private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
482    return installTable(zkw, nrs, 0);
483  }
484
485  private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
486    // Create a table with regions
487    byte[] family = Bytes.toBytes("family");
488    LOG.info("Creating table with " + nrs + " regions");
489    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
490    int numRegions = -1;
491    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
492      numRegions = r.getStartKeys().length;
493    }
494    assertEquals(nrs, numRegions);
495    LOG.info("Waiting for no more RIT\n");
496    blockUntilNoRIT(zkw, master);
497    // disable-enable cycle to get rid of table's dead regions left behind
498    // by createMultiRegions
499    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
500    LOG.debug("Disabling table\n");
501    TEST_UTIL.getAdmin().disableTable(tableName);
502    LOG.debug("Waiting for no more RIT\n");
503    blockUntilNoRIT(zkw, master);
504    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
505    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
506    if (regions.size() != 2) {
507      for (String oregion : regions)
508        LOG.debug("Region still online: " + oregion);
509    }
510    assertEquals(2 + existingRegions, regions.size());
511    LOG.debug("Enabling table\n");
512    TEST_UTIL.getAdmin().enableTable(tableName);
513    LOG.debug("Waiting for no more RIT\n");
514    blockUntilNoRIT(zkw, master);
515    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
516    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
517    assertEquals(numRegions + 2 + existingRegions, regions.size());
518    return table;
519  }
520
521  void populateDataInTable(int nrows) throws Exception {
522    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
523    assertEquals(NUM_RS, rsts.size());
524
525    for (RegionServerThread rst : rsts) {
526      HRegionServer hrs = rst.getRegionServer();
527      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
528      for (RegionInfo hri : hris) {
529        if (hri.getTable().isSystemTable()) {
530          continue;
531        }
532        LOG.debug(
533          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
534        Region region = hrs.getOnlineRegion(hri.getRegionName());
535        assertTrue(region != null);
536        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
537      }
538    }
539
540    for (MasterThread mt : cluster.getLiveMasterThreads()) {
541      HRegionServer hrs = mt.getMaster();
542      List<RegionInfo> hris;
543      try {
544        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
545      } catch (ServerNotRunningYetException e) {
546        // It's ok: this master may be a backup. Ignored.
547        continue;
548      }
549      for (RegionInfo hri : hris) {
550        if (hri.getTable().isSystemTable()) {
551          continue;
552        }
553        LOG.debug(
554          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
555        Region region = hrs.getOnlineRegion(hri.getRegionName());
556        assertTrue(region != null);
557        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
558      }
559    }
560  }
561
562  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
563      throws IOException {
564    makeWAL(hrs, regions, num_edits, edit_size, true);
565  }
566
567  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
568      boolean cleanShutdown) throws IOException {
569    // remove root and meta region
570    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
571
572    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
573      RegionInfo regionInfo = iter.next();
574      if (regionInfo.getTable().isSystemTable()) {
575        iter.remove();
576      }
577    }
578    byte[] value = new byte[editSize];
579
580    List<RegionInfo> hris = new ArrayList<>();
581    for (RegionInfo region : regions) {
582      if (region.getTable() != tableName) {
583        continue;
584      }
585      hris.add(region);
586    }
587    LOG.info("Creating wal edits across " + hris.size() + " regions.");
588    for (int i = 0; i < editSize; i++) {
589      value[i] = (byte) ('a' + (i % 26));
590    }
591    int n = hris.size();
592    int[] counts = new int[n];
593    // sync every ~30k to line up with desired wal rolls
594    final int syncEvery = 30 * 1024 / editSize;
595    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
596    if (n > 0) {
597      for (int i = 0; i < numEdits; i += 1) {
598        WALEdit e = new WALEdit();
599        RegionInfo curRegionInfo = hris.get(i % n);
600        WAL log = hrs.getWAL(curRegionInfo);
601        byte[] startRow = curRegionInfo.getStartKey();
602        if (startRow == null || startRow.length == 0) {
603          startRow = new byte[] { 0, 0, 0, 0, 1 };
604        }
605        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
606        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
607        // HBaseTestingUtility.createMultiRegions use 5 bytes key
608        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
609        e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
610        log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName,
611            System.currentTimeMillis(), mvcc),
612          e, true);
613        if (0 == i % syncEvery) {
614          log.sync();
615        }
616        counts[i % n] += 1;
617      }
618    }
619    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
620    // will cause errors if done after.
621    for (RegionInfo info : hris) {
622      WAL log = hrs.getWAL(info);
623      log.sync();
624    }
625    if (cleanShutdown) {
626      for (RegionInfo info : hris) {
627        WAL log = hrs.getWAL(info);
628        log.shutdown();
629      }
630    }
631    for (int i = 0; i < n; i++) {
632      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
633    }
634    return;
635  }
636
637  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
638    int count = 0;
639    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
640      WAL.Entry e;
641      while ((e = in.next()) != null) {
642        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
643          count++;
644        }
645      }
646    }
647    return count;
648  }
649
650  private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception {
651    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
652  }
653
654  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
655      throws IOException {
656    for (int i = 0; i < numRows; i++) {
657      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
658      for (byte[] family : families) {
659        put.addColumn(family, qf, null);
660      }
661      region.put(put);
662    }
663  }
664
665  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
666      throws InterruptedException {
667    long curt = System.currentTimeMillis();
668    long endt = curt + timems;
669    while (curt < endt) {
670      if (ctr.sum() == oldval) {
671        Thread.sleep(100);
672        curt = System.currentTimeMillis();
673      } else {
674        assertEquals(newval, ctr.sum());
675        return;
676      }
677    }
678    fail();
679  }
680
681  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
682    for (MasterThread mt : cluster.getLiveMasterThreads()) {
683      if (mt.getMaster().isActiveMaster()) {
684        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
685        mt.join();
686        break;
687      }
688    }
689    LOG.debug("Master is aborted");
690  }
691
692  /**
693   * Find a RS that has regions of a table.
694   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
695   */
696  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
697    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
698    List<RegionInfo> regions = null;
699    HRegionServer hrs = null;
700
701    for (RegionServerThread rst : rsts) {
702      hrs = rst.getRegionServer();
703      while (rst.isAlive() && !hrs.isOnline()) {
704        Thread.sleep(100);
705      }
706      if (!rst.isAlive()) {
707        continue;
708      }
709      boolean isCarryingMeta = false;
710      boolean foundTableRegion = false;
711      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
712      for (RegionInfo region : regions) {
713        if (region.isMetaRegion()) {
714          isCarryingMeta = true;
715        }
716        if (region.getTable() == tableName) {
717          foundTableRegion = true;
718        }
719        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
720          break;
721        }
722      }
723      if (isCarryingMeta && hasMetaRegion) {
724        // clients ask for a RS with META
725        if (!foundTableRegion) {
726          HRegionServer destRS = hrs;
727          // the RS doesn't have regions of the specified table so we need move one to this RS
728          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
729          RegionInfo hri = tableRegions.get(0);
730          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(),
731            Bytes.toBytes(destRS.getServerName().getServerName()));
732          // wait for region move completes
733          RegionStates regionStates =
734              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
735          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
736            @Override
737            public boolean evaluate() throws Exception {
738              ServerName sn = regionStates.getRegionServerOfRegion(hri);
739              return (sn != null && sn.equals(destRS.getServerName()));
740            }
741          });
742        }
743        return hrs;
744      } else if (hasMetaRegion || isCarryingMeta) {
745        continue;
746      }
747      if (foundTableRegion) {
748        break;
749      }
750    }
751
752    return hrs;
753  }
754}