/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Base class for testing distributed log splitting.
 */
public abstract class AbstractTestDLS {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestDLS.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Start a cluster with 2 masters and 5 regionservers
  private static final int NUM_MASTERS = 2;
  private static final int NUM_RS = 5;
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");

  @Rule
  public TestName testName = new TestName();

  private TableName tableName;
  private MiniHBaseCluster cluster;
  private HMaster master;
  private Configuration conf;

  @BeforeClass
  public static void setup() throws Exception {
    // Uncomment the following line if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
    TEST_UTIL.startMiniZKCluster();
    TEST_UTIL.startMiniDFSCluster(3);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

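  /**
   * @return the WAL provider implementation for the subclass to exercise; the value is applied to
   *         the "hbase.wal.provider" setting when the mini cluster is started.
   */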
  protected abstract String getWalProvider();

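  /**
   * Starts a mini HBase cluster with {@link #NUM_MASTERS} masters and the given number of region
   * servers, using log split related settings tuned for these tests, and blocks until all region
   * servers have checked in.
   */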
  private void startCluster(int numRS) throws Exception {
    SplitLogCounters.resetCounters();
    LOG.info("Starting cluster");
    conf.setLong("hbase.splitlog.max.resubmit", 0);
    // Make the failure test faster
    conf.setInt("zookeeper.recovery.retry", 0);
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.set("hbase.wal.provider", getWalProvider());
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numMasters(NUM_MASTERS).numRegionServers(numRS).build();
    TEST_UTIL.startMiniHBaseCluster(option);
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return cluster.getLiveRegionServerThreads().size() >= numRS;
      }
    });
  }

  @Before
  public void before() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    tableName = TableName.valueOf(testName.getMethodName());
  }

  @After
  public void after() throws Exception {
    TEST_UTIL.shutdownMiniHBaseCluster();
    TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
  }

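  /**
   * Writes edits into one region server's WALs, runs distributed log splitting on that server's
   * WAL directory and verifies that every edit shows up in the recovered.edits files and that the
   * WAL directory itself is cleaned up afterwards.
   */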
  @Test
  public void testRecoveredEdits() throws Exception {
    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one WAL
    startCluster(NUM_RS);

    int numLogLines = 10000;
    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
    // turn off load balancing to prevent regions from moving around, otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);
    FileSystem fs = master.getMasterFileSystem().getFileSystem();

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();

    Path rootdir = FSUtils.getRootDir(conf);

    int numRegions = 50;
    try (Table t = installTable(numRegions)) {
      List<RegionInfo> regions = null;
      HRegionServer hrs = null;
      for (int i = 0; i < NUM_RS; i++) {
        hrs = rsts.get(i).getRegionServer();
        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
        // At least one RS will have >= the average number of regions.
        if (regions.size() >= numRegions / NUM_RS) {
          break;
        }
      }
      Path logDir = new Path(rootdir,
          AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));

      LOG.info("#regions = " + regions.size());
      Iterator<RegionInfo> it = regions.iterator();
      while (it.hasNext()) {
        RegionInfo region = it.next();
        if (region.getTable().getNamespaceAsString()
            .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
          it.remove();
        }
      }

      makeWAL(hrs, regions, numLogLines, 100);

      slm.splitLogDistributed(logDir);

      int count = 0;
      for (RegionInfo hri : regions) {
        @SuppressWarnings("deprecation")
        Path editsdir = WALSplitUtil
            .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
                tableName, hri.getEncodedName()));
        LOG.debug("checking edits dir " + editsdir);
        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
          @Override
          public boolean accept(Path p) {
            if (WALSplitUtil.isSequenceIdFile(p)) {
              return false;
            }
            return true;
          }
        });
        assertTrue(
          "edits dir should have more than a single file in it. instead has " + files.length,
          files.length > 1);
        for (int i = 0; i < files.length; i++) {
          int c = countWAL(files[i].getPath(), fs, conf);
          count += c;
        }
        LOG.info(count + " edits in " + files.length + " recovered edits files.");
      }

      // check that the server's WAL directory has been removed now that splitting is complete
      assertFalse(fs.exists(logDir));
      assertEquals(numLogLines, count);
    }
  }

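  /**
   * Aborts the active master together with a region server that is carrying WAL edits, then
   * verifies that the backup master takes over, finishes the pending log splitting work and
   * brings all regions back online with no data loss.
   */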
  @Test
  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
    startCluster(NUM_RS);

    int numRegionsToCreate = 40;
    int numLogLines = 1000;
    // turn off load balancing to prevent regions from moving around, otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    try (Table ht = installTable(numRegionsToCreate)) {
      HRegionServer hrs = findRSToKill(false);
      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      makeWAL(hrs, regions, numLogLines, 100);

      // abort the active master
      abortMaster(cluster);

      // abort the region server that carries the WAL edits
      LOG.info("Aborting region server: " + hrs.getServerName());
      hrs.abort("testing");

      // wait for the region server abort to complete
      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
        }
      });

      Thread.sleep(2000);
      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());

      // wait until all user regions plus hbase:meta are back online
      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return (HBaseTestingUtility.getAllOnlineRegions(cluster)
              .size() >= (numRegionsToCreate + 1));
        }
      });

      LOG.info("Current Open Regions After Master Node Starts Up:" +
          HBaseTestingUtility.getAllOnlineRegions(cluster).size());

      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
    }
  }

  /**
   * The original intention of this test was to force an abort of a region server and to make sure
   * that the failure path in the region server is properly evaluated. But it is difficult to
   * ensure that the region server doesn't finish the log splitting before it aborts. Also, there
   * is now a code path where the master will preempt the task on the region server when it
   * detects that the region server has aborted.
   */
  // Was marked flaky before Distributed Log Replay cleanup.
  @Test
  public void testWorkerAbort() throws Exception {
    LOG.info("testWorkerAbort");
    startCluster(3);
    int numLogLines = 10000;
    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
    FileSystem fs = master.getMasterFileSystem().getFileSystem();

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    HRegionServer hrs = findRSToKill(false);
    Path rootdir = FSUtils.getRootDir(conf);
    final Path logDir = new Path(rootdir,
        AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));

    try (Table t = installTable(40)) {
      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);

      new Thread() {
        @Override
        public void run() {
          try {
            waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
          } catch (InterruptedException e) {
            // ignore; proceed to abort a region server below
          }
          for (RegionServerThread rst : rsts) {
            rst.getRegionServer().abort("testing");
            break;
          }
        }
      }.start();
      FileStatus[] logfiles = fs.listStatus(logDir);
      TaskBatch batch = new TaskBatch();
      slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
      // like waitForCounter, but waits for any one of the worker counters to go up
      long curt = System.currentTimeMillis();
      long waitTime = 80000;
      long endt = curt + waitTime;
      while (curt < endt) {
        if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
            tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
            tot_wkr_preempt_task.sum()) == 0) {
          Thread.sleep(100);
          curt = System.currentTimeMillis();
        } else {
          assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
              tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
              tot_wkr_preempt_task.sum()));
          return;
        }
      }
      fail("none of the following counters went up in " + waitTime + " milliseconds - " +
          "tot_wkr_task_resigned, tot_wkr_task_err, " +
          "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
    }
  }

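  /**
   * Loads data, kills three region servers at once and verifies that log splitting plus region
   * reassignment recover every row.
   */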
  @Test
  public void testThreeRSAbort() throws Exception {
    LOG.info("testThreeRSAbort");
    int numRegionsToCreate = 40;
    int numRowsPerRegion = 100;

    startCluster(NUM_RS);

    try (Table table = installTable(numRegionsToCreate)) {
      populateDataInTable(numRowsPerRegion);

      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
      assertEquals(NUM_RS, rsts.size());
      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());

      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {

        @Override
        public boolean evaluate() throws Exception {
          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Timed out waiting for server aborts.";
        }
      });
      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
      int rows;
      try {
        rows = TEST_UTIL.countRows(table);
      } catch (Exception e) {
        Threads.printThreadInfo(System.out, "Thread dump before fail");
        throw e;
      }
      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
    }
  }

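  /**
   * Points the split log manager at a directory holding a corrupted log file while ZK task
   * deletion is suppressed for testing, and verifies that the first split attempt fails with an
   * IOException and that a retry blocks on the undeleted task until it is interrupted.
   */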
  @Test
  public void testDelayedDeleteOnFailure() throws Exception {
    LOG.info("testDelayedDeleteOnFailure");
    startCluster(1);
    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path rootLogDir = new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
    final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
    fs.mkdirs(logDir);
    ExecutorService executor = null;
    try {
      final Path corruptedLogFile = new Path(logDir, "x");
      FSDataOutputStream out = fs.create(corruptedLogFile);
      out.write(0);
      out.write(Bytes.toBytes("corrupted bytes"));
      out.close();
      ZKSplitLogManagerCoordination coordination =
          (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
              .getSplitLogManagerCoordination();
      coordination.setIgnoreDeleteForTesting(true);
      executor = Executors.newSingleThreadExecutor();
      Runnable runnable = new Runnable() {
        @Override
        public void run() {
          try {
            // since the logDir contains a fake, corrupted log file, the split log worker
            // will finish it quickly with an error, and this call will fail and throw
            // an IOException.
            slm.splitLogDistributed(logDir);
          } catch (IOException ioe) {
            try {
              assertTrue(fs.exists(corruptedLogFile));
              // this call will block waiting for the task to be removed from the
              // tasks map, which is not going to happen since ignoreZKDeleteForTesting
              // is set to true, until the thread is interrupted.
              slm.splitLogDistributed(logDir);
            } catch (IOException e) {
              assertTrue(Thread.currentThread().isInterrupted());
              return;
            }
            fail("did not get the expected IOException from the 2nd call");
          }
          fail("did not get the expected IOException from the 1st call");
        }
      };
      Future<?> result = executor.submit(runnable);
      try {
        result.get(2000, TimeUnit.MILLISECONDS);
      } catch (TimeoutException te) {
        // it is ok, expected.
      }
      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
      executor.shutdownNow();
      executor = null;

      // make sure the runnable is finished with no exception thrown.
      result.get();
    } finally {
      if (executor != null) {
        // interrupt the thread in case the test fails in the middle.
        // it has no effect if the thread is already terminated.
        executor.shutdownNow();
      }
      fs.delete(logDir, true);
    }
  }

  private Table installTable(int nrs) throws Exception {
    return installTable(nrs, 0);
  }

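  /**
   * Creates the test table pre-split into {@code nrs} regions and cycles it through a
   * disable/enable to make sure only the expected regions (plus {@code existingRegions} and
   * hbase:meta) are online before returning the open {@link Table}.
   */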
  private Table installTable(int nrs, int existingRegions) throws Exception {
    // Create a table with regions
    byte[] family = Bytes.toBytes("family");
    LOG.info("Creating table with " + nrs + " regions");
    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
    int numRegions = -1;
    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      numRegions = r.getStartKeys().length;
    }
    assertEquals(nrs, numRegions);
    LOG.info("Waiting for no more RIT\n");
    blockUntilNoRIT();
    // disable-enable cycle to get rid of table's dead regions left behind
    // by createMultiRegions
    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
    LOG.debug("Disabling table\n");
    TEST_UTIL.getAdmin().disableTable(tableName);
    LOG.debug("Waiting for no more RIT\n");
    blockUntilNoRIT();
    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    LOG.debug("Verifying only catalog region is assigned\n");
    if (regions.size() != 1) {
      for (String oregion : regions) {
        LOG.debug("Region still online: " + oregion);
      }
    }
    assertEquals(1 + existingRegions, regions.size());
    LOG.debug("Enabling table\n");
    TEST_UTIL.getAdmin().enableTable(tableName);
    LOG.debug("Waiting for no more RIT\n");
    blockUntilNoRIT();
    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    assertEquals(numRegions + 1 + existingRegions, regions.size());
    return table;
  }

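  /**
   * Writes {@code nrows} rows into every user region in the cluster, going through each region
   * server (and any master that is also carrying user regions) and calling {@link Region#put}
   * directly.
   */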
  void populateDataInTable(int nrows) throws Exception {
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    assertEquals(NUM_RS, rsts.size());

    for (RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (RegionInfo hri : hris) {
        if (hri.getTable().isSystemTable()) {
          continue;
        }
        LOG.debug(
          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
        Region region = hrs.getOnlineRegion(hri.getRegionName());
        assertTrue(region != null);
        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
      }
    }

    for (MasterThread mt : cluster.getLiveMasterThreads()) {
      HRegionServer hrs = mt.getMaster();
      List<RegionInfo> hris;
      try {
        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      } catch (ServerNotRunningYetException e) {
        // It's ok: this master may be a backup. Ignore it.
        continue;
      }
      for (RegionInfo hri : hris) {
        if (hri.getTable().isSystemTable()) {
          continue;
        }
        LOG.debug(
          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
        Region region = hrs.getOnlineRegion(hri.getRegionName());
        assertTrue(region != null);
        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
      }
    }
  }

  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize)
      throws IOException {
    makeWAL(hrs, regions, numEdits, editSize, true);
  }

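  /**
   * Appends {@code numEdits} edits of {@code editSize} bytes each to the region server's WALs,
   * spread round-robin across the given regions of the test table. If {@code cleanShutdown} is
   * true the WALs are shut down afterwards so the files are cleanly closed.
   */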
  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
      boolean cleanShutdown) throws IOException {
    // remove the meta region
    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);

    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
      RegionInfo regionInfo = iter.next();
      if (regionInfo.getTable().isSystemTable()) {
        iter.remove();
      }
    }
    byte[] value = new byte[editSize];

    List<RegionInfo> hris = new ArrayList<>();
    for (RegionInfo region : regions) {
      if (!region.getTable().equals(tableName)) {
        continue;
      }
      hris.add(region);
    }
    LOG.info("Creating wal edits across " + hris.size() + " regions.");
    for (int i = 0; i < editSize; i++) {
      value[i] = (byte) ('a' + (i % 26));
    }
    int n = hris.size();
    int[] counts = new int[n];
    // sync every ~30k to line up with desired wal rolls
    final int syncEvery = 30 * 1024 / editSize;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    if (n > 0) {
      for (int i = 0; i < numEdits; i += 1) {
        WALEdit e = new WALEdit();
        RegionInfo curRegionInfo = hris.get(i % n);
        WAL log = hrs.getWAL(curRegionInfo);
        byte[] startRow = curRegionInfo.getStartKey();
        if (startRow == null || startRow.length == 0) {
          startRow = new byte[] { 0, 0, 0, 0, 1 };
        }
        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
        // use the last 5 bytes because HBaseTestingUtility.createMultiRegionTable
        // uses 5-byte keys
        row = Arrays.copyOfRange(row, 3, 8);
        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
        e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
        log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName,
            System.currentTimeMillis(), mvcc),
          e, true);
        if (i % syncEvery == 0) {
          log.sync();
        }
        counts[i % n] += 1;
      }
    }
    // Done as two passes because the regions might share WALs. Shutdown is idempotent, but sync
    // will cause errors if done after shutdown.
    for (RegionInfo info : hris) {
      WAL log = hrs.getWAL(info);
      log.sync();
    }
    if (cleanShutdown) {
      for (RegionInfo info : hris) {
        WAL log = hrs.getWAL(info);
        log.shutdown();
      }
    }
    for (int i = 0; i < n; i++) {
      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
    }
  }

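  /**
   * @return the number of non-metadata edits found in the given WAL file.
   */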
  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
    int count = 0;
    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
      WAL.Entry e;
      while ((e = in.next()) != null) {
        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
          count++;
        }
      }
    }
    return count;
  }

  private void blockUntilNoRIT() throws Exception {
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
  }

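  /**
   * Puts {@code numRows} rows, keyed off {@code startRow}, into the given region with one column
   * per supplied family.
   */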
  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
      throws IOException {
    for (int i = 0; i < numRows; i++) {
      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
      for (byte[] family : families) {
        put.addColumn(family, qf, null);
      }
      region.put(put);
    }
  }

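  /**
   * Waits up to {@code timems} milliseconds for the counter to move off {@code oldval} and then
   * asserts that its value is {@code newval}.
   */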
  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
      throws InterruptedException {
    long curt = System.currentTimeMillis();
    long endt = curt + timems;
    while (curt < endt) {
      if (ctr.sum() == oldval) {
        Thread.sleep(100);
        curt = System.currentTimeMillis();
      } else {
        assertEquals(newval, ctr.sum());
        return;
      }
    }
    fail("counter value did not change from " + oldval + " within " + timems + " ms");
  }

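  /**
   * Aborts the currently active master and waits for its thread to terminate.
   */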
  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
    for (MasterThread mt : cluster.getLiveMasterThreads()) {
      if (mt.getMaster().isActiveMaster()) {
        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
        mt.join();
        break;
      }
    }
    LOG.debug("Master is aborted");
  }

  /**
   * Find a RS that has regions of a table.
   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
   */
  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    List<RegionInfo> regions = null;
    HRegionServer hrs = null;

    for (RegionServerThread rst : rsts) {
      hrs = rst.getRegionServer();
      while (rst.isAlive() && !hrs.isOnline()) {
        Thread.sleep(100);
      }
      if (!rst.isAlive()) {
        continue;
      }
      boolean isCarryingMeta = false;
      boolean foundTableRegion = false;
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (RegionInfo region : regions) {
        if (region.isMetaRegion()) {
          isCarryingMeta = true;
        }
        if (region.getTable().equals(tableName)) {
          foundTableRegion = true;
        }
        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
          break;
        }
      }
      if (isCarryingMeta && hasMetaRegion) {
        // the caller asked for a RS carrying hbase:meta
        if (!foundTableRegion) {
          HRegionServer destRS = hrs;
          // the RS doesn't have regions of the specified table, so we need to move one to this RS
          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
          RegionInfo hri = tableRegions.get(0);
          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
          // wait for the region move to complete
          RegionStates regionStates =
              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
              ServerName sn = regionStates.getRegionServerOfRegion(hri);
              return (sn != null && sn.equals(destRS.getServerName()));
            }
          });
        }
        return hrs;
      } else if (hasMetaRegion || isCarryingMeta) {
        continue;
      }
      if (foundTableRegion) {
        break;
      }
    }

    return hrs;
  }
}