001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Iterator;
030import java.util.List;
031import java.util.NavigableSet;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import java.util.concurrent.atomic.LongAdder;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
047import org.apache.hadoop.hbase.SplitLogCounters;
048import org.apache.hadoop.hbase.StartTestingClusterOption;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.Waiter;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.RegionLocator;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
057import org.apache.hadoop.hbase.master.assignment.RegionStates;
058import org.apache.hadoop.hbase.regionserver.HRegionServer;
059import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
060import org.apache.hadoop.hbase.regionserver.Region;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.CommonFSUtils;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
065import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALKeyImpl;
070import org.apache.hadoop.hbase.zookeeper.ZKUtil;
071import org.junit.After;
072import org.junit.AfterClass;
073import org.junit.Before;
074import org.junit.BeforeClass;
075import org.junit.Rule;
076import org.junit.Test;
077import org.junit.rules.TestName;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
082
083/**
084 * Base class for testing distributed log splitting.
085 */
086public abstract class AbstractTestDLS {
087  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
088
089  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
090
091  // Start a cluster with 2 masters and 5 regionservers
092  private static final int NUM_MASTERS = 2;
093  private static final int NUM_RS = 5;
094  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
095
096  @Rule
097  public TestName testName = new TestName();
098
099  private TableName tableName;
100  private SingleProcessHBaseCluster cluster;
101  private HMaster master;
102  private Configuration conf;
103
104  @Rule
105  public TestName name = new TestName();
106
107  @BeforeClass
108  public static void setup() throws Exception {
109    // Uncomment the following line if more verbosity is needed for
110    // debugging (see HBASE-12285 for details).
111    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
112    TEST_UTIL.startMiniZKCluster();
113    TEST_UTIL.startMiniDFSCluster(3);
114  }
115
116  @AfterClass
117  public static void tearDown() throws Exception {
118    TEST_UTIL.shutdownMiniCluster();
119  }
120
121  protected abstract String getWalProvider();
122
123  private void startCluster(int numRS) throws Exception {
124    SplitLogCounters.resetCounters();
125    LOG.info("Starting cluster");
126    conf.setLong("hbase.splitlog.max.resubmit", 0);
127    // Make the failure test faster
128    conf.setInt("zookeeper.recovery.retry", 0);
129    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
130    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
131    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
132    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
133    conf.set("hbase.wal.provider", getWalProvider());
134    StartTestingClusterOption option =
135      StartTestingClusterOption.builder().numMasters(NUM_MASTERS).numRegionServers(numRS).build();
136    TEST_UTIL.startMiniHBaseCluster(option);
137    cluster = TEST_UTIL.getHBaseCluster();
138    LOG.info("Waiting for active/ready master");
139    cluster.waitForActiveAndReadyMaster();
140    master = cluster.getMaster();
141    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
142      @Override
143      public boolean evaluate() throws Exception {
144        return cluster.getLiveRegionServerThreads().size() >= numRS;
145      }
146    });
147  }
148
149  @Before
150  public void before() throws Exception {
151    conf = TEST_UTIL.getConfiguration();
152    tableName = TableName.valueOf(testName.getMethodName());
153  }
154
155  @After
156  public void after() throws Exception {
157    TEST_UTIL.shutdownMiniHBaseCluster();
158    TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
159      true);
160    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
161  }
162
163  @Test
164  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
165    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
166    startCluster(NUM_RS);
167
168    int numRegionsToCreate = 40;
169    int numLogLines = 1000;
170    // turn off load balancing to prevent regions from moving around otherwise
171    // they will consume recovered.edits
172    master.balanceSwitch(false);
173
174    try (Table ht = installTable(numRegionsToCreate)) {
175      HRegionServer hrs = findRSToKill(false);
176      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
177      makeWAL(hrs, regions, numLogLines, 100);
178
179      // abort master
180      abortMaster(cluster);
181
182      // abort RS
183      LOG.info("Aborting region server: " + hrs.getServerName());
184      hrs.abort("testing");
185
186      // wait for abort completes
187      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
188        @Override
189        public boolean evaluate() throws Exception {
190          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
191        }
192      });
193
194      Thread.sleep(2000);
195      LOG.info("Current Open Regions:" + HBaseTestingUtil.getAllOnlineRegions(cluster).size());
196
197      // wait for abort completes
198      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
199        @Override
200        public boolean evaluate() throws Exception {
201          return (HBaseTestingUtil.getAllOnlineRegions(cluster).size() >= (numRegionsToCreate + 1));
202        }
203      });
204
205      LOG.info("Current Open Regions After Master Node Starts Up:"
206        + HBaseTestingUtil.getAllOnlineRegions(cluster).size());
207
208      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
209    }
210  }
211
212  @Test
213  public void testThreeRSAbort() throws Exception {
214    LOG.info("testThreeRSAbort");
215    int numRegionsToCreate = 40;
216    int numRowsPerRegion = 100;
217
218    startCluster(NUM_RS); // NUM_RS=6.
219
220    try (Table table = installTable(numRegionsToCreate)) {
221      populateDataInTable(numRowsPerRegion);
222
223      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
224      assertEquals(NUM_RS, rsts.size());
225      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
226      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
227      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
228
229      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
230
231        @Override
232        public boolean evaluate() throws Exception {
233          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
234        }
235
236        @Override
237        public String explainFailure() throws Exception {
238          return "Timed out waiting for server aborts.";
239        }
240      });
241      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
242      int rows;
243      try {
244        rows = TEST_UTIL.countRows(table);
245      } catch (Exception e) {
246        Threads.printThreadInfo(System.out, "Thread dump before fail");
247        throw e;
248      }
249      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
250    }
251  }
252
253  @Test
254  public void testDelayedDeleteOnFailure() throws Exception {
255    if (
256      !this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
257        HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
258    ) {
259      // This test depends on zk coordination....
260      return;
261    }
262    LOG.info("testDelayedDeleteOnFailure");
263    startCluster(1);
264    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
265    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
266    final Path rootLogDir =
267      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
268    final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
269    fs.mkdirs(logDir);
270    ExecutorService executor = null;
271    try {
272      final Path corruptedLogFile = new Path(logDir, "x");
273      FSDataOutputStream out;
274      out = fs.create(corruptedLogFile);
275      out.write(0);
276      out.write(Bytes.toBytes("corrupted bytes"));
277      out.close();
278      ZKSplitLogManagerCoordination coordination =
279        (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
280          .getSplitLogManagerCoordination();
281      coordination.setIgnoreDeleteForTesting(true);
282      executor = Executors.newSingleThreadExecutor();
283      Runnable runnable = new Runnable() {
284        @Override
285        public void run() {
286          try {
287            // since the logDir is a fake, corrupted one, so the split log worker
288            // will finish it quickly with error, and this call will fail and throw
289            // an IOException.
290            slm.splitLogDistributed(logDir);
291          } catch (IOException ioe) {
292            try {
293              assertTrue(fs.exists(corruptedLogFile));
294              // this call will block waiting for the task to be removed from the
295              // tasks map which is not going to happen since ignoreZKDeleteForTesting
296              // is set to true, until it is interrupted.
297              slm.splitLogDistributed(logDir);
298            } catch (IOException e) {
299              assertTrue(Thread.currentThread().isInterrupted());
300              return;
301            }
302            fail("did not get the expected IOException from the 2nd call");
303          }
304          fail("did not get the expected IOException from the 1st call");
305        }
306      };
307      Future<?> result = executor.submit(runnable);
308      try {
309        result.get(2000, TimeUnit.MILLISECONDS);
310      } catch (TimeoutException te) {
311        // it is ok, expected.
312      }
313      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
314      executor.shutdownNow();
315      executor = null;
316
317      // make sure the runnable is finished with no exception thrown.
318      result.get();
319    } finally {
320      if (executor != null) {
321        // interrupt the thread in case the test fails in the middle.
322        // it has no effect if the thread is already terminated.
323        executor.shutdownNow();
324      }
325      fs.delete(logDir, true);
326    }
327  }
328
329  private Table installTable(int nrs) throws Exception {
330    return installTable(nrs, 0);
331  }
332
333  private Table installTable(int nrs, int existingRegions) throws Exception {
334    // Create a table with regions
335    byte[] family = Bytes.toBytes("family");
336    LOG.info("Creating table with " + nrs + " regions");
337    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
338    int numRegions = -1;
339    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
340      numRegions = r.getStartKeys().length;
341    }
342    assertEquals(nrs, numRegions);
343    LOG.info("Waiting for no more RIT\n");
344    blockUntilNoRIT();
345    // disable-enable cycle to get rid of table's dead regions left behind
346    // by createMultiRegions
347    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
348    LOG.debug("Disabling table\n");
349    TEST_UTIL.getAdmin().disableTable(tableName);
350    LOG.debug("Waiting for no more RIT\n");
351    blockUntilNoRIT();
352    NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
353    LOG.debug("Verifying only catalog region is assigned\n");
354    if (regions.size() != 1) {
355      for (String oregion : regions) {
356        LOG.debug("Region still online: " + oregion);
357      }
358    }
359    assertEquals(1 + existingRegions, regions.size());
360    LOG.debug("Enabling table\n");
361    TEST_UTIL.getAdmin().enableTable(tableName);
362    LOG.debug("Waiting for no more RIT\n");
363    blockUntilNoRIT();
364    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
365    regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
366    assertEquals(numRegions + 1 + existingRegions, regions.size());
367    return table;
368  }
369
370  void populateDataInTable(int nrows) throws Exception {
371    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
372    assertEquals(NUM_RS, rsts.size());
373
374    for (RegionServerThread rst : rsts) {
375      HRegionServer hrs = rst.getRegionServer();
376      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
377      for (RegionInfo hri : hris) {
378        if (hri.getTable().isSystemTable()) {
379          continue;
380        }
381        LOG.debug(
382          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
383        Region region = hrs.getOnlineRegion(hri.getRegionName());
384        assertTrue(region != null);
385        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
386      }
387    }
388  }
389
390  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
391    throws IOException {
392    makeWAL(hrs, regions, num_edits, edit_size, true);
393  }
394
395  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
396    boolean cleanShutdown) throws IOException {
397    // remove root and meta region
398    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
399
400    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
401      RegionInfo regionInfo = iter.next();
402      if (regionInfo.getTable().isSystemTable()) {
403        iter.remove();
404      }
405    }
406    byte[] value = new byte[editSize];
407
408    List<RegionInfo> hris = new ArrayList<>();
409    for (RegionInfo region : regions) {
410      if (region.getTable() != tableName) {
411        continue;
412      }
413      hris.add(region);
414    }
415    LOG.info("Creating wal edits across " + hris.size() + " regions.");
416    for (int i = 0; i < editSize; i++) {
417      value[i] = (byte) ('a' + (i % 26));
418    }
419    int n = hris.size();
420    int[] counts = new int[n];
421    // sync every ~30k to line up with desired wal rolls
422    final int syncEvery = 30 * 1024 / editSize;
423    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
424    if (n > 0) {
425      for (int i = 0; i < numEdits; i += 1) {
426        WALEdit e = new WALEdit();
427        RegionInfo curRegionInfo = hris.get(i % n);
428        WAL log = hrs.getWAL(curRegionInfo);
429        byte[] startRow = curRegionInfo.getStartKey();
430        if (startRow == null || startRow.length == 0) {
431          startRow = new byte[] { 0, 0, 0, 0, 1 };
432        }
433        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
434        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
435        // HBaseTestingUtility.createMultiRegions use 5 bytes key
436        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
437        e.add(
438          new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(), value));
439        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
440          tableName, EnvironmentEdgeManager.currentTime(), mvcc), e);
441        if (0 == i % syncEvery) {
442          log.sync();
443        }
444        counts[i % n] += 1;
445      }
446    }
447    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
448    // will cause errors if done after.
449    for (RegionInfo info : hris) {
450      WAL log = hrs.getWAL(info);
451      log.sync();
452    }
453    if (cleanShutdown) {
454      for (RegionInfo info : hris) {
455        WAL log = hrs.getWAL(info);
456        log.shutdown();
457      }
458    }
459    for (int i = 0; i < n; i++) {
460      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
461    }
462    return;
463  }
464
465  private void blockUntilNoRIT() throws Exception {
466    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
467  }
468
469  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
470    throws IOException {
471    for (int i = 0; i < numRows; i++) {
472      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
473      for (byte[] family : families) {
474        put.addColumn(family, qf, null);
475      }
476      region.put(put);
477    }
478  }
479
480  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
481    throws InterruptedException {
482    long curt = EnvironmentEdgeManager.currentTime();
483    long endt = curt + timems;
484    while (curt < endt) {
485      if (ctr.sum() == oldval) {
486        Thread.sleep(100);
487        curt = EnvironmentEdgeManager.currentTime();
488      } else {
489        assertEquals(newval, ctr.sum());
490        return;
491      }
492    }
493    fail();
494  }
495
496  private void abortMaster(SingleProcessHBaseCluster cluster) throws InterruptedException {
497    for (MasterThread mt : cluster.getLiveMasterThreads()) {
498      if (mt.getMaster().isActiveMaster()) {
499        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
500        mt.join();
501        break;
502      }
503    }
504    LOG.debug("Master is aborted");
505  }
506
507  /**
508   * Find a RS that has regions of a table.
509   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
510   */
511  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
512    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
513    List<RegionInfo> regions = null;
514    HRegionServer hrs = null;
515
516    for (RegionServerThread rst : rsts) {
517      hrs = rst.getRegionServer();
518      while (rst.isAlive() && !hrs.isOnline()) {
519        Thread.sleep(100);
520      }
521      if (!rst.isAlive()) {
522        continue;
523      }
524      boolean isCarryingMeta = false;
525      boolean foundTableRegion = false;
526      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
527      for (RegionInfo region : regions) {
528        if (region.isMetaRegion()) {
529          isCarryingMeta = true;
530        }
531        if (region.getTable() == tableName) {
532          foundTableRegion = true;
533        }
534        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
535          break;
536        }
537      }
538      if (isCarryingMeta && hasMetaRegion) {
539        // clients ask for a RS with META
540        if (!foundTableRegion) {
541          HRegionServer destRS = hrs;
542          // the RS doesn't have regions of the specified table so we need move one to this RS
543          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
544          RegionInfo hri = tableRegions.get(0);
545          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
546          // wait for region move completes
547          RegionStates regionStates =
548            TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
549          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
550            @Override
551            public boolean evaluate() throws Exception {
552              ServerName sn = regionStates.getRegionServerOfRegion(hri);
553              return (sn != null && sn.equals(destRS.getServerName()));
554            }
555          });
556        }
557        return hrs;
558      } else if (hasMetaRegion || isCarryingMeta) {
559        continue;
560      }
561      if (foundTableRegion) {
562        break;
563      }
564    }
565
566    return hrs;
567  }
568}