001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master;
020
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
029import static org.junit.Assert.assertEquals;
030import static org.junit.Assert.assertFalse;
031import static org.junit.Assert.assertTrue;
032import static org.junit.Assert.fail;
033
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Iterator;
038import java.util.List;
039import java.util.NavigableSet;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.Executors;
042import java.util.concurrent.Future;
043import java.util.concurrent.TimeUnit;
044import java.util.concurrent.TimeoutException;
045import java.util.concurrent.atomic.LongAdder;
046import java.util.stream.Collectors;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FSDataOutputStream;
049import org.apache.hadoop.fs.FileStatus;
050import org.apache.hadoop.fs.FileSystem;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.fs.PathFilter;
053import org.apache.hadoop.hbase.HBaseTestingUtility;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.KeyValue;
056import org.apache.hadoop.hbase.MiniHBaseCluster;
057import org.apache.hadoop.hbase.NamespaceDescriptor;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.SplitLogCounters;
060import org.apache.hadoop.hbase.StartMiniClusterOption;
061import org.apache.hadoop.hbase.TableName;
062import org.apache.hadoop.hbase.Waiter;
063import org.apache.hadoop.hbase.client.Put;
064import org.apache.hadoop.hbase.client.RegionInfo;
065import org.apache.hadoop.hbase.client.RegionInfoBuilder;
066import org.apache.hadoop.hbase.client.RegionLocator;
067import org.apache.hadoop.hbase.client.Table;
068import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
069import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
070import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
071import org.apache.hadoop.hbase.master.assignment.RegionStates;
072import org.apache.hadoop.hbase.regionserver.HRegionServer;
073import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
074import org.apache.hadoop.hbase.regionserver.Region;
075import org.apache.hadoop.hbase.util.Bytes;
076import org.apache.hadoop.hbase.util.CommonFSUtils;
077import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
078import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
079import org.apache.hadoop.hbase.util.Threads;
080import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
081import org.apache.hadoop.hbase.wal.WAL;
082import org.apache.hadoop.hbase.wal.WALEdit;
083import org.apache.hadoop.hbase.wal.WALFactory;
084import org.apache.hadoop.hbase.wal.WALKeyImpl;
085import org.apache.hadoop.hbase.wal.WALSplitUtil;
086import org.apache.hadoop.hbase.zookeeper.ZKUtil;
087import org.junit.After;
088import org.junit.AfterClass;
089import org.junit.Before;
090import org.junit.BeforeClass;
091import org.junit.Rule;
092import org.junit.Test;
093import org.junit.rules.TestName;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
098
099/**
100 * Base class for testing distributed log splitting.
101 */
102public abstract class AbstractTestDLS {
103  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
104
105  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
106
107  // Start a cluster with 2 masters and 5 regionservers
108  private static final int NUM_MASTERS = 2;
109  private static final int NUM_RS = 5;
110  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
111
112  @Rule
113  public TestName testName = new TestName();
114
115  private TableName tableName;
116  private MiniHBaseCluster cluster;
117  private HMaster master;
118  private Configuration conf;
119
120  @Rule
121  public TestName name = new TestName();
122
123  @BeforeClass
124  public static void setup() throws Exception {
125    // Uncomment the following line if more verbosity is needed for
126    // debugging (see HBASE-12285 for details).
127    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
128    TEST_UTIL.startMiniZKCluster();
129    TEST_UTIL.startMiniDFSCluster(3);
130  }
131
132  @AfterClass
133  public static void tearDown() throws Exception {
134    TEST_UTIL.shutdownMiniCluster();
135  }
136
137  protected abstract String getWalProvider();
138
139  private void startCluster(int numRS) throws Exception {
140    SplitLogCounters.resetCounters();
141    LOG.info("Starting cluster");
142    conf.setLong("hbase.splitlog.max.resubmit", 0);
143    // Make the failure test faster
144    conf.setInt("zookeeper.recovery.retry", 0);
145    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
146    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
147    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
148    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
149    conf.set("hbase.wal.provider", getWalProvider());
150    StartMiniClusterOption option = StartMiniClusterOption.builder()
151        .numMasters(NUM_MASTERS).numRegionServers(numRS).build();
152    TEST_UTIL.startMiniHBaseCluster(option);
153    cluster = TEST_UTIL.getHBaseCluster();
154    LOG.info("Waiting for active/ready master");
155    cluster.waitForActiveAndReadyMaster();
156    master = cluster.getMaster();
157    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
158      @Override
159      public boolean evaluate() throws Exception {
160        return cluster.getLiveRegionServerThreads().size() >= numRS;
161      }
162    });
163  }
164
165  @Before
166  public void before() throws Exception {
167    conf = TEST_UTIL.getConfiguration();
168    tableName = TableName.valueOf(testName.getMethodName());
169  }
170
171  @After
172  public void after() throws Exception {
173    TEST_UTIL.shutdownMiniHBaseCluster();
174    TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
175      true);
176    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
177  }
178
179  @Test
180  public void testRecoveredEdits() throws Exception {
181    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
182    startCluster(NUM_RS);
183
184    int numLogLines = 10000;
185    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
186    // turn off load balancing to prevent regions from moving around otherwise
187    // they will consume recovered.edits
188    master.balanceSwitch(false);
189    FileSystem fs = master.getMasterFileSystem().getFileSystem();
190
191    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
192
193    Path rootdir = CommonFSUtils.getRootDir(conf);
194
195    int numRegions = 50;
196    try (Table t = installTable(numRegions)) {
197      List<RegionInfo> regions = null;
198      HRegionServer hrs = null;
199      for (int i = 0; i < NUM_RS; i++) {
200        hrs = rsts.get(i).getRegionServer();
201        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
202        // At least one RS will have >= to average number of regions.
203        if (regions.size() >= numRegions / NUM_RS) {
204          break;
205        }
206      }
207      Path logDir = new Path(rootdir,
208          AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
209
210      LOG.info("#regions = " + regions.size());
211      Iterator<RegionInfo> it = regions.iterator();
212      while (it.hasNext()) {
213        RegionInfo region = it.next();
214        if (region.getTable().getNamespaceAsString()
215            .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
216          it.remove();
217        }
218      }
219
220      makeWAL(hrs, regions, numLogLines, 100);
221
222      slm.splitLogDistributed(logDir);
223
224      int count = 0;
225      for (RegionInfo hri : regions) {
226        @SuppressWarnings("deprecation")
227        Path editsdir = WALSplitUtil
228            .getRegionDirRecoveredEditsDir(CommonFSUtils.getWALRegionDir(conf,
229                tableName, hri.getEncodedName()));
230        LOG.debug("Checking edits dir " + editsdir);
231        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
232          @Override
233          public boolean accept(Path p) {
234            if (WALSplitUtil.isSequenceIdFile(p)) {
235              return false;
236            }
237            return true;
238          }
239        });
240        LOG.info("Files {}", Arrays.stream(files).map(f -> f.getPath().toString()).
241          collect(Collectors.joining(",")));
242        assertTrue("Edits dir should have more than a one file", files.length > 1);
243        for (int i = 0; i < files.length; i++) {
244          int c = countWAL(files[i].getPath(), fs, conf);
245          count += c;
246        }
247        LOG.info(count + " edits in " + files.length + " recovered edits files.");
248      }
249
250      // check that the log file is moved
251      assertFalse(fs.exists(logDir));
252      assertEquals(numLogLines, count);
253    }
254  }
255
256  @Test
257  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
258    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
259    startCluster(NUM_RS);
260
261    int numRegionsToCreate = 40;
262    int numLogLines = 1000;
263    // turn off load balancing to prevent regions from moving around otherwise
264    // they will consume recovered.edits
265    master.balanceSwitch(false);
266
267    try (Table ht = installTable(numRegionsToCreate)) {
268      HRegionServer hrs = findRSToKill(false);
269      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
270      makeWAL(hrs, regions, numLogLines, 100);
271
272      // abort master
273      abortMaster(cluster);
274
275      // abort RS
276      LOG.info("Aborting region server: " + hrs.getServerName());
277      hrs.abort("testing");
278
279      // wait for abort completes
280      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
281        @Override
282        public boolean evaluate() throws Exception {
283          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
284        }
285      });
286
287      Thread.sleep(2000);
288      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
289
290      // wait for abort completes
291      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
292        @Override
293        public boolean evaluate() throws Exception {
294          return (HBaseTestingUtility.getAllOnlineRegions(cluster)
295              .size() >= (numRegionsToCreate + 1));
296        }
297      });
298
299      LOG.info("Current Open Regions After Master Node Starts Up:" +
300          HBaseTestingUtility.getAllOnlineRegions(cluster).size());
301
302      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
303    }
304  }
305
306  /**
307   * The original intention of this test was to force an abort of a region server and to make sure
308   * that the failure path in the region servers is properly evaluated. But it is difficult to
309   * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
310   * there is this code path where the master will preempt the region server when master detects
311   * that the region server has aborted.
312   * @throws Exception
313   */
314  // Was marked flaky before Distributed Log Replay cleanup.
315  @Test
316  public void testWorkerAbort() throws Exception {
317    LOG.info("testWorkerAbort");
318    startCluster(3);
319    int numLogLines = 10000;
320    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
321    FileSystem fs = master.getMasterFileSystem().getFileSystem();
322
323    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
324    HRegionServer hrs = findRSToKill(false);
325    Path rootdir = CommonFSUtils.getRootDir(conf);
326    final Path logDir = new Path(rootdir,
327        AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
328
329    try (Table t = installTable(40)) {
330      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
331
332      new Thread() {
333        @Override
334        public void run() {
335          try {
336            waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
337          } catch (InterruptedException e) {
338          }
339          for (RegionServerThread rst : rsts) {
340            rst.getRegionServer().abort("testing");
341            break;
342          }
343        }
344      }.start();
345      FileStatus[] logfiles = fs.listStatus(logDir);
346      TaskBatch batch = new TaskBatch();
347      slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
348      // waitForCounter but for one of the 2 counters
349      long curt = System.currentTimeMillis();
350      long waitTime = 80000;
351      long endt = curt + waitTime;
352      while (curt < endt) {
353        if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
354            tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
355            tot_wkr_preempt_task.sum()) == 0) {
356          Thread.sleep(100);
357          curt = System.currentTimeMillis();
358        } else {
359          assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
360              tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
361              tot_wkr_preempt_task.sum()));
362          return;
363        }
364      }
365      fail("none of the following counters went up in " + waitTime + " milliseconds - " +
366          "tot_wkr_task_resigned, tot_wkr_task_err, " +
367          "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
368    }
369  }
370
371  @Test
372  public void testThreeRSAbort() throws Exception {
373    LOG.info("testThreeRSAbort");
374    int numRegionsToCreate = 40;
375    int numRowsPerRegion = 100;
376
377    startCluster(NUM_RS); // NUM_RS=6.
378
379    try (Table table = installTable(numRegionsToCreate)) {
380      populateDataInTable(numRowsPerRegion);
381
382      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
383      assertEquals(NUM_RS, rsts.size());
384      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
385      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
386      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
387
388      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
389
390        @Override
391        public boolean evaluate() throws Exception {
392          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
393        }
394
395        @Override
396        public String explainFailure() throws Exception {
397          return "Timed out waiting for server aborts.";
398        }
399      });
400      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
401      int rows;
402      try {
403        rows = TEST_UTIL.countRows(table);
404      } catch (Exception e) {
405        Threads.printThreadInfo(System.out, "Thread dump before fail");
406        throw e;
407      }
408      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
409    }
410  }
411
412  @Test
413  public void testDelayedDeleteOnFailure() throws Exception {
414    LOG.info("testDelayedDeleteOnFailure");
415    startCluster(1);
416    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
417    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
418    final Path rootLogDir =
419      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
420    final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
421    fs.mkdirs(logDir);
422    ExecutorService executor = null;
423    try {
424      final Path corruptedLogFile = new Path(logDir, "x");
425      FSDataOutputStream out;
426      out = fs.create(corruptedLogFile);
427      out.write(0);
428      out.write(Bytes.toBytes("corrupted bytes"));
429      out.close();
430      ZKSplitLogManagerCoordination coordination =
431          (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
432              .getSplitLogManagerCoordination();
433      coordination.setIgnoreDeleteForTesting(true);
434      executor = Executors.newSingleThreadExecutor();
435      Runnable runnable = new Runnable() {
436        @Override
437        public void run() {
438          try {
439            // since the logDir is a fake, corrupted one, so the split log worker
440            // will finish it quickly with error, and this call will fail and throw
441            // an IOException.
442            slm.splitLogDistributed(logDir);
443          } catch (IOException ioe) {
444            try {
445              assertTrue(fs.exists(corruptedLogFile));
446              // this call will block waiting for the task to be removed from the
447              // tasks map which is not going to happen since ignoreZKDeleteForTesting
448              // is set to true, until it is interrupted.
449              slm.splitLogDistributed(logDir);
450            } catch (IOException e) {
451              assertTrue(Thread.currentThread().isInterrupted());
452              return;
453            }
454            fail("did not get the expected IOException from the 2nd call");
455          }
456          fail("did not get the expected IOException from the 1st call");
457        }
458      };
459      Future<?> result = executor.submit(runnable);
460      try {
461        result.get(2000, TimeUnit.MILLISECONDS);
462      } catch (TimeoutException te) {
463        // it is ok, expected.
464      }
465      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
466      executor.shutdownNow();
467      executor = null;
468
469      // make sure the runnable is finished with no exception thrown.
470      result.get();
471    } finally {
472      if (executor != null) {
473        // interrupt the thread in case the test fails in the middle.
474        // it has no effect if the thread is already terminated.
475        executor.shutdownNow();
476      }
477      fs.delete(logDir, true);
478    }
479  }
480
481  private Table installTable(int nrs) throws Exception {
482    return installTable(nrs, 0);
483  }
484
485  private Table installTable(int nrs, int existingRegions) throws Exception {
486    // Create a table with regions
487    byte[] family = Bytes.toBytes("family");
488    LOG.info("Creating table with " + nrs + " regions");
489    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
490    int numRegions = -1;
491    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
492      numRegions = r.getStartKeys().length;
493    }
494    assertEquals(nrs, numRegions);
495    LOG.info("Waiting for no more RIT\n");
496    blockUntilNoRIT();
497    // disable-enable cycle to get rid of table's dead regions left behind
498    // by createMultiRegions
499    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
500    LOG.debug("Disabling table\n");
501    TEST_UTIL.getAdmin().disableTable(tableName);
502    LOG.debug("Waiting for no more RIT\n");
503    blockUntilNoRIT();
504    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
505    LOG.debug("Verifying only catalog region is assigned\n");
506    if (regions.size() != 1) {
507      for (String oregion : regions)
508        LOG.debug("Region still online: " + oregion);
509    }
510    assertEquals(1 + existingRegions, regions.size());
511    LOG.debug("Enabling table\n");
512    TEST_UTIL.getAdmin().enableTable(tableName);
513    LOG.debug("Waiting for no more RIT\n");
514    blockUntilNoRIT();
515    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
516    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
517    assertEquals(numRegions + 1 + existingRegions, regions.size());
518    return table;
519  }
520
521  void populateDataInTable(int nrows) throws Exception {
522    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
523    assertEquals(NUM_RS, rsts.size());
524
525    for (RegionServerThread rst : rsts) {
526      HRegionServer hrs = rst.getRegionServer();
527      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
528      for (RegionInfo hri : hris) {
529        if (hri.getTable().isSystemTable()) {
530          continue;
531        }
532        LOG.debug(
533          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
534        Region region = hrs.getOnlineRegion(hri.getRegionName());
535        assertTrue(region != null);
536        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
537      }
538    }
539
540    for (MasterThread mt : cluster.getLiveMasterThreads()) {
541      HRegionServer hrs = mt.getMaster();
542      List<RegionInfo> hris;
543      try {
544        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
545      } catch (ServerNotRunningYetException e) {
546        // It's ok: this master may be a backup. Ignored.
547        continue;
548      }
549      for (RegionInfo hri : hris) {
550        if (hri.getTable().isSystemTable()) {
551          continue;
552        }
553        LOG.debug(
554          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
555        Region region = hrs.getOnlineRegion(hri.getRegionName());
556        assertTrue(region != null);
557        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
558      }
559    }
560  }
561
562  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
563      throws IOException {
564    makeWAL(hrs, regions, num_edits, edit_size, true);
565  }
566
567  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
568      boolean cleanShutdown) throws IOException {
569    // remove root and meta region
570    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
571
572    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
573      RegionInfo regionInfo = iter.next();
574      if (regionInfo.getTable().isSystemTable()) {
575        iter.remove();
576      }
577    }
578    byte[] value = new byte[editSize];
579
580    List<RegionInfo> hris = new ArrayList<>();
581    for (RegionInfo region : regions) {
582      if (region.getTable() != tableName) {
583        continue;
584      }
585      hris.add(region);
586    }
587    LOG.info("Creating wal edits across " + hris.size() + " regions.");
588    for (int i = 0; i < editSize; i++) {
589      value[i] = (byte) ('a' + (i % 26));
590    }
591    int n = hris.size();
592    int[] counts = new int[n];
593    // sync every ~30k to line up with desired wal rolls
594    final int syncEvery = 30 * 1024 / editSize;
595    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
596    if (n > 0) {
597      for (int i = 0; i < numEdits; i += 1) {
598        WALEdit e = new WALEdit();
599        RegionInfo curRegionInfo = hris.get(i % n);
600        WAL log = hrs.getWAL(curRegionInfo);
601        byte[] startRow = curRegionInfo.getStartKey();
602        if (startRow == null || startRow.length == 0) {
603          startRow = new byte[] { 0, 0, 0, 0, 1 };
604        }
605        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
606        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
607        // HBaseTestingUtility.createMultiRegions use 5 bytes key
608        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
609        e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
610        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
611          tableName, System.currentTimeMillis(), mvcc), e);
612        if (0 == i % syncEvery) {
613          log.sync();
614        }
615        counts[i % n] += 1;
616      }
617    }
618    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
619    // will cause errors if done after.
620    for (RegionInfo info : hris) {
621      WAL log = hrs.getWAL(info);
622      log.sync();
623    }
624    if (cleanShutdown) {
625      for (RegionInfo info : hris) {
626        WAL log = hrs.getWAL(info);
627        log.shutdown();
628      }
629    }
630    for (int i = 0; i < n; i++) {
631      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
632    }
633    return;
634  }
635
636  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
637    int count = 0;
638    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
639      WAL.Entry e;
640      while ((e = in.next()) != null) {
641        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
642          count++;
643        }
644      }
645    }
646    return count;
647  }
648
649  private void blockUntilNoRIT() throws Exception {
650    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
651  }
652
653  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
654      throws IOException {
655    for (int i = 0; i < numRows; i++) {
656      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
657      for (byte[] family : families) {
658        put.addColumn(family, qf, null);
659      }
660      region.put(put);
661    }
662  }
663
664  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
665      throws InterruptedException {
666    long curt = System.currentTimeMillis();
667    long endt = curt + timems;
668    while (curt < endt) {
669      if (ctr.sum() == oldval) {
670        Thread.sleep(100);
671        curt = System.currentTimeMillis();
672      } else {
673        assertEquals(newval, ctr.sum());
674        return;
675      }
676    }
677    fail();
678  }
679
680  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
681    for (MasterThread mt : cluster.getLiveMasterThreads()) {
682      if (mt.getMaster().isActiveMaster()) {
683        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
684        mt.join();
685        break;
686      }
687    }
688    LOG.debug("Master is aborted");
689  }
690
691  /**
692   * Find a RS that has regions of a table.
693   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
694   */
695  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
696    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
697    List<RegionInfo> regions = null;
698    HRegionServer hrs = null;
699
700    for (RegionServerThread rst : rsts) {
701      hrs = rst.getRegionServer();
702      while (rst.isAlive() && !hrs.isOnline()) {
703        Thread.sleep(100);
704      }
705      if (!rst.isAlive()) {
706        continue;
707      }
708      boolean isCarryingMeta = false;
709      boolean foundTableRegion = false;
710      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
711      for (RegionInfo region : regions) {
712        if (region.isMetaRegion()) {
713          isCarryingMeta = true;
714        }
715        if (region.getTable() == tableName) {
716          foundTableRegion = true;
717        }
718        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
719          break;
720        }
721      }
722      if (isCarryingMeta && hasMetaRegion) {
723        // clients ask for a RS with META
724        if (!foundTableRegion) {
725          HRegionServer destRS = hrs;
726          // the RS doesn't have regions of the specified table so we need move one to this RS
727          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
728          RegionInfo hri = tableRegions.get(0);
729          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
730          // wait for region move completes
731          RegionStates regionStates =
732              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
733          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
734            @Override
735            public boolean evaluate() throws Exception {
736              ServerName sn = regionStates.getRegionServerOfRegion(hri);
737              return (sn != null && sn.equals(destRS.getServerName()));
738            }
739          });
740        }
741        return hrs;
742      } else if (hasMetaRegion || isCarryingMeta) {
743        continue;
744      }
745      if (foundTableRegion) {
746        break;
747      }
748    }
749
750    return hrs;
751  }
752}