001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Iterator;
030import java.util.List;
031import java.util.NavigableSet;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import java.util.concurrent.atomic.LongAdder;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
047import org.apache.hadoop.hbase.SplitLogCounters;
048import org.apache.hadoop.hbase.StartTestingClusterOption;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.Waiter;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.RegionLocator;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
057import org.apache.hadoop.hbase.master.assignment.RegionStates;
058import org.apache.hadoop.hbase.regionserver.HRegionServer;
059import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
060import org.apache.hadoop.hbase.regionserver.Region;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.CommonFSUtils;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
065import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
070import org.apache.hadoop.hbase.wal.WALKeyImpl;
071import org.apache.hadoop.hbase.zookeeper.ZKUtil;
072import org.junit.jupiter.api.AfterAll;
073import org.junit.jupiter.api.AfterEach;
074import org.junit.jupiter.api.BeforeAll;
075import org.junit.jupiter.api.BeforeEach;
076import org.junit.jupiter.api.Test;
077import org.junit.jupiter.api.TestInfo;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
082
083/**
084 * Base class for testing distributed log splitting.
085 */
086public abstract class AbstractTestDLS {
087  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestDLS.class);
088
089  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
090
091  // Start a cluster with 2 masters and 5 regionservers
092  private static final int NUM_MASTERS = 2;
093  private static final int NUM_RS = 5;
094  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
095
096  private TableName tableName;
097  private SingleProcessHBaseCluster cluster;
098  private HMaster master;
099  private Configuration conf;
100
101  @BeforeAll
102  public static void setup() throws Exception {
103    // Uncomment the following line if more verbosity is needed for
104    // debugging (see HBASE-12285 for details).
105    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
106    TEST_UTIL.startMiniZKCluster();
107    TEST_UTIL.startMiniDFSCluster(3);
108  }
109
  /**
   * Shuts down the mini cluster managed by {@code TEST_UTIL} once all tests in the class have
   * run.
   */
  @AfterAll
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
114
  /**
   * Returns the value that {@code startCluster} stores under the {@code hbase.wal.provider}
   * configuration key before the mini HBase cluster is started.
   */
  protected abstract String getWalProvider();
116
117  private void startCluster(int numRS) throws Exception {
118    SplitLogCounters.resetCounters();
119    LOG.info("Starting cluster");
120    conf.setLong("hbase.splitlog.max.resubmit", 0);
121    // Make the failure test faster
122    conf.setInt("zookeeper.recovery.retry", 0);
123    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
124    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
125    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
126    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
127    conf.set("hbase.wal.provider", getWalProvider());
128    StartTestingClusterOption option =
129      StartTestingClusterOption.builder().numMasters(NUM_MASTERS).numRegionServers(numRS).build();
130    TEST_UTIL.startMiniHBaseCluster(option);
131    cluster = TEST_UTIL.getHBaseCluster();
132    LOG.info("Waiting for active/ready master");
133    cluster.waitForActiveAndReadyMaster();
134    master = cluster.getMaster();
135    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
136      @Override
137      public boolean evaluate() throws Exception {
138        return cluster.getLiveRegionServerThreads().size() >= numRS;
139      }
140    });
141  }
142
143  @BeforeEach
144  public void before(TestInfo testInfo) throws Exception {
145    conf = TEST_UTIL.getConfiguration();
146    tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
147  }
148
149  @AfterEach
150  public void after() throws Exception {
151    TEST_UTIL.shutdownMiniHBaseCluster();
152    TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
153      true);
154    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
155  }
156
157  @Test
158  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
159    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
160    startCluster(NUM_RS);
161
162    int numRegionsToCreate = 40;
163    int numLogLines = 1000;
164    // turn off load balancing to prevent regions from moving around otherwise
165    // they will consume recovered.edits
166    master.balanceSwitch(false);
167
168    try (Table ht = installTable(numRegionsToCreate)) {
169      HRegionServer hrs = findRSToKill(false);
170      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
171      makeWAL(hrs, regions, numLogLines, 100);
172
173      // abort master
174      abortMaster(cluster);
175
176      // abort RS
177      LOG.info("Aborting region server: " + hrs.getServerName());
178      hrs.abort("testing");
179
180      // wait for abort completes
181      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
182        @Override
183        public boolean evaluate() throws Exception {
184          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
185        }
186      });
187
188      Thread.sleep(2000);
189      LOG.info("Current Open Regions:" + HBaseTestingUtil.getAllOnlineRegions(cluster).size());
190
191      // wait for abort completes
192      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
193        @Override
194        public boolean evaluate() throws Exception {
195          return (HBaseTestingUtil.getAllOnlineRegions(cluster).size() >= (numRegionsToCreate + 1));
196        }
197      });
198
199      LOG.info("Current Open Regions After Master Node Starts Up:"
200        + HBaseTestingUtil.getAllOnlineRegions(cluster).size());
201
202      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
203    }
204  }
205
206  @Test
207  public void testThreeRSAbort() throws Exception {
208    LOG.info("testThreeRSAbort");
209    int numRegionsToCreate = 40;
210    int numRowsPerRegion = 100;
211
212    startCluster(NUM_RS); // NUM_RS=6.
213
214    try (Table table = installTable(numRegionsToCreate)) {
215      populateDataInTable(numRowsPerRegion);
216
217      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
218      assertEquals(NUM_RS, rsts.size());
219      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
220      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
221      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
222
223      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
224
225        @Override
226        public boolean evaluate() throws Exception {
227          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
228        }
229
230        @Override
231        public String explainFailure() throws Exception {
232          return "Timed out waiting for server aborts.";
233        }
234      });
235      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
236      int rows;
237      try {
238        rows = TEST_UTIL.countRows(table);
239      } catch (Exception e) {
240        Threads.printThreadInfo(System.out, "Thread dump before fail");
241        throw e;
242      }
243      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
244    }
245  }
246
247  @Test
248  public void testDelayedDeleteOnFailure() throws Exception {
249    if (
250      !this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
251        HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
252    ) {
253      // This test depends on zk coordination....
254      return;
255    }
256    LOG.info("testDelayedDeleteOnFailure");
257    startCluster(1);
258    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
259    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
260    final Path rootLogDir =
261      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
262    final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
263    fs.mkdirs(logDir);
264    ExecutorService executor = null;
265    try {
266      final Path corruptedLogFile = new Path(logDir, "x");
267      FSDataOutputStream out;
268      out = fs.create(corruptedLogFile);
269      out.write(0);
270      out.write(Bytes.toBytes("corrupted bytes"));
271      out.close();
272      ZKSplitLogManagerCoordination coordination =
273        (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
274          .getSplitLogManagerCoordination();
275      coordination.setIgnoreDeleteForTesting(true);
276      executor = Executors.newSingleThreadExecutor();
277      Runnable runnable = new Runnable() {
278        @Override
279        public void run() {
280          try {
281            // since the logDir is a fake, corrupted one, so the split log worker
282            // will finish it quickly with error, and this call will fail and throw
283            // an IOException.
284            slm.splitLogDistributed(logDir);
285          } catch (IOException ioe) {
286            try {
287              assertTrue(fs.exists(corruptedLogFile));
288              // this call will block waiting for the task to be removed from the
289              // tasks map which is not going to happen since ignoreZKDeleteForTesting
290              // is set to true, until it is interrupted.
291              slm.splitLogDistributed(logDir);
292            } catch (IOException e) {
293              assertTrue(Thread.currentThread().isInterrupted());
294              return;
295            }
296            fail("did not get the expected IOException from the 2nd call");
297          }
298          fail("did not get the expected IOException from the 1st call");
299        }
300      };
301      Future<?> result = executor.submit(runnable);
302      try {
303        result.get(2000, TimeUnit.MILLISECONDS);
304      } catch (TimeoutException te) {
305        // it is ok, expected.
306      }
307      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
308      executor.shutdownNow();
309      executor = null;
310
311      // make sure the runnable is finished with no exception thrown.
312      result.get();
313    } finally {
314      if (executor != null) {
315        // interrupt the thread in case the test fails in the middle.
316        // it has no effect if the thread is already terminated.
317        executor.shutdownNow();
318      }
319      fs.delete(logDir, true);
320    }
321  }
322
323  private Table installTable(int nrs) throws Exception {
324    return installTable(nrs, 0);
325  }
326
327  private Table installTable(int nrs, int existingRegions) throws Exception {
328    // Create a table with regions
329    byte[] family = Bytes.toBytes("family");
330    LOG.info("Creating table with " + nrs + " regions");
331    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
332    int numRegions = -1;
333    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
334      numRegions = r.getStartKeys().length;
335    }
336    assertEquals(nrs, numRegions);
337    LOG.info("Waiting for no more RIT\n");
338    blockUntilNoRIT();
339    // disable-enable cycle to get rid of table's dead regions left behind
340    // by createMultiRegions
341    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
342    LOG.debug("Disabling table\n");
343    TEST_UTIL.getAdmin().disableTable(tableName);
344    LOG.debug("Waiting for no more RIT\n");
345    blockUntilNoRIT();
346    NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
347    LOG.debug("Verifying only catalog region is assigned\n");
348    if (regions.size() != 1) {
349      for (String oregion : regions) {
350        LOG.debug("Region still online: " + oregion);
351      }
352    }
353    assertEquals(1 + existingRegions, regions.size());
354    LOG.debug("Enabling table\n");
355    TEST_UTIL.getAdmin().enableTable(tableName);
356    LOG.debug("Waiting for no more RIT\n");
357    blockUntilNoRIT();
358    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
359    regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
360    assertEquals(numRegions + 1 + existingRegions, regions.size());
361    return table;
362  }
363
364  void populateDataInTable(int nrows) throws Exception {
365    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
366    assertEquals(NUM_RS, rsts.size());
367
368    for (RegionServerThread rst : rsts) {
369      HRegionServer hrs = rst.getRegionServer();
370      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
371      for (RegionInfo hri : hris) {
372        if (hri.getTable().isSystemTable()) {
373          continue;
374        }
375        LOG.debug(
376          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
377        Region region = hrs.getOnlineRegion(hri.getRegionName());
378        assertTrue(region != null);
379        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
380      }
381    }
382  }
383
384  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
385    throws IOException {
386    makeWAL(hrs, regions, num_edits, edit_size, true);
387  }
388
389  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
390    boolean cleanShutdown) throws IOException {
391    // remove root and meta region
392    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
393
394    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
395      RegionInfo regionInfo = iter.next();
396      if (regionInfo.getTable().isSystemTable()) {
397        iter.remove();
398      }
399    }
400    byte[] value = new byte[editSize];
401
402    List<RegionInfo> hris = new ArrayList<>();
403    for (RegionInfo region : regions) {
404      if (region.getTable() != tableName) {
405        continue;
406      }
407      hris.add(region);
408    }
409    LOG.info("Creating wal edits across " + hris.size() + " regions.");
410    for (int i = 0; i < editSize; i++) {
411      value[i] = (byte) ('a' + (i % 26));
412    }
413    int n = hris.size();
414    int[] counts = new int[n];
415    // sync every ~30k to line up with desired wal rolls
416    final int syncEvery = 30 * 1024 / editSize;
417    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
418    if (n > 0) {
419      for (int i = 0; i < numEdits; i += 1) {
420        WALEdit e = new WALEdit();
421        RegionInfo curRegionInfo = hris.get(i % n);
422        WAL log = hrs.getWAL(curRegionInfo);
423        byte[] startRow = curRegionInfo.getStartKey();
424        if (startRow == null || startRow.length == 0) {
425          startRow = new byte[] { 0, 0, 0, 0, 1 };
426        }
427        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
428        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
429        // HBaseTestingUtility.createMultiRegions use 5 bytes key
430        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
431        WALEditInternalHelper.addExtendedCell(e,
432          new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(), value));
433        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
434          tableName, EnvironmentEdgeManager.currentTime(), mvcc), e);
435        if (0 == i % syncEvery) {
436          log.sync();
437        }
438        counts[i % n] += 1;
439      }
440    }
441    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
442    // will cause errors if done after.
443    for (RegionInfo info : hris) {
444      WAL log = hrs.getWAL(info);
445      log.sync();
446    }
447    if (cleanShutdown) {
448      for (RegionInfo info : hris) {
449        WAL log = hrs.getWAL(info);
450        log.shutdown();
451      }
452    }
453    for (int i = 0; i < n; i++) {
454      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
455    }
456    return;
457  }
458
459  private void blockUntilNoRIT() throws Exception {
460    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
461  }
462
463  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
464    throws IOException {
465    for (int i = 0; i < numRows; i++) {
466      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
467      for (byte[] family : families) {
468        put.addColumn(family, qf, null);
469      }
470      region.put(put);
471    }
472  }
473
474  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
475    throws InterruptedException {
476    long curt = EnvironmentEdgeManager.currentTime();
477    long endt = curt + timems;
478    while (curt < endt) {
479      if (ctr.sum() == oldval) {
480        Thread.sleep(100);
481        curt = EnvironmentEdgeManager.currentTime();
482      } else {
483        assertEquals(newval, ctr.sum());
484        return;
485      }
486    }
487    fail();
488  }
489
490  private void abortMaster(SingleProcessHBaseCluster cluster) throws InterruptedException {
491    for (MasterThread mt : cluster.getLiveMasterThreads()) {
492      if (mt.getMaster().isActiveMaster()) {
493        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
494        mt.join();
495        break;
496      }
497    }
498    LOG.debug("Master is aborted");
499  }
500
501  /**
502   * Find a RS that has regions of a table.
503   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
504   */
505  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
506    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
507    List<RegionInfo> regions = null;
508    HRegionServer hrs = null;
509
510    for (RegionServerThread rst : rsts) {
511      hrs = rst.getRegionServer();
512      while (rst.isAlive() && !hrs.isOnline()) {
513        Thread.sleep(100);
514      }
515      if (!rst.isAlive()) {
516        continue;
517      }
518      boolean isCarryingMeta = false;
519      boolean foundTableRegion = false;
520      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
521      for (RegionInfo region : regions) {
522        if (region.isMetaRegion()) {
523          isCarryingMeta = true;
524        }
525        if (region.getTable() == tableName) {
526          foundTableRegion = true;
527        }
528        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
529          break;
530        }
531      }
532      if (isCarryingMeta && hasMetaRegion) {
533        // clients ask for a RS with META
534        if (!foundTableRegion) {
535          HRegionServer destRS = hrs;
536          // the RS doesn't have regions of the specified table so we need move one to this RS
537          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
538          RegionInfo hri = tableRegions.get(0);
539          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
540          // wait for region move completes
541          RegionStates regionStates =
542            TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
543          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
544            @Override
545            public boolean evaluate() throws Exception {
546              ServerName sn = regionStates.getRegionServerOfRegion(hri);
547              return (sn != null && sn.equals(destRS.getServerName()));
548            }
549          });
550        }
551        return hrs;
552      } else if (hasMetaRegion || isCarryingMeta) {
553        continue;
554      }
555      if (foundTableRegion) {
556        break;
557      }
558    }
559
560    return hrs;
561  }
562}