001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Iterator;
030import java.util.List;
031import java.util.NavigableSet;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import java.util.concurrent.atomic.LongAdder;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
047import org.apache.hadoop.hbase.SplitLogCounters;
048import org.apache.hadoop.hbase.StartTestingClusterOption;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.Waiter;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.RegionLocator;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
057import org.apache.hadoop.hbase.master.assignment.RegionStates;
058import org.apache.hadoop.hbase.regionserver.HRegionServer;
059import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
060import org.apache.hadoop.hbase.regionserver.Region;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.CommonFSUtils;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
065import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALFactory;
070import org.apache.hadoop.hbase.wal.WALKeyImpl;
071import org.apache.hadoop.hbase.zookeeper.ZKUtil;
072import org.junit.After;
073import org.junit.AfterClass;
074import org.junit.Before;
075import org.junit.BeforeClass;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.rules.TestName;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
083
084/**
085 * Base class for testing distributed log splitting.
086 */
087public abstract class AbstractTestDLS {
088  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
089
090  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
091
092  // Start a cluster with 2 masters and 5 regionservers
093  private static final int NUM_MASTERS = 2;
094  private static final int NUM_RS = 5;
095  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
096
097  @Rule
098  public TestName testName = new TestName();
099
100  private TableName tableName;
101  private SingleProcessHBaseCluster cluster;
102  private HMaster master;
103  private Configuration conf;
104
105  @Rule
106  public TestName name = new TestName();
107
108  @BeforeClass
109  public static void setup() throws Exception {
110    // Uncomment the following line if more verbosity is needed for
111    // debugging (see HBASE-12285 for details).
112    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
113    TEST_UTIL.startMiniZKCluster();
114    TEST_UTIL.startMiniDFSCluster(3);
115  }
116
  /**
   * Shuts down the mini cluster owned by TEST_UTIL after all tests in the class have run.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
121
  /**
   * Returns the WAL provider to test against. startCluster(int) stores the returned value under
   * the "hbase.wal.provider" configuration key before starting the mini HBase cluster.
   */
  protected abstract String getWalProvider();
123
124  private void startCluster(int numRS) throws Exception {
125    SplitLogCounters.resetCounters();
126    LOG.info("Starting cluster");
127    conf.setLong("hbase.splitlog.max.resubmit", 0);
128    // Make the failure test faster
129    conf.setInt("zookeeper.recovery.retry", 0);
130    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
131    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
132    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
133    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
134    conf.set("hbase.wal.provider", getWalProvider());
135    StartTestingClusterOption option =
136      StartTestingClusterOption.builder().numMasters(NUM_MASTERS).numRegionServers(numRS).build();
137    TEST_UTIL.startMiniHBaseCluster(option);
138    cluster = TEST_UTIL.getHBaseCluster();
139    LOG.info("Waiting for active/ready master");
140    cluster.waitForActiveAndReadyMaster();
141    master = cluster.getMaster();
142    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
143      @Override
144      public boolean evaluate() throws Exception {
145        return cluster.getLiveRegionServerThreads().size() >= numRS;
146      }
147    });
148  }
149
150  @Before
151  public void before() throws Exception {
152    conf = TEST_UTIL.getConfiguration();
153    tableName = TableName.valueOf(testName.getMethodName());
154  }
155
156  @After
157  public void after() throws Exception {
158    TEST_UTIL.shutdownMiniHBaseCluster();
159    TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
160      true);
161    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
162  }
163
164  @Test
165  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
166    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
167    startCluster(NUM_RS);
168
169    int numRegionsToCreate = 40;
170    int numLogLines = 1000;
171    // turn off load balancing to prevent regions from moving around otherwise
172    // they will consume recovered.edits
173    master.balanceSwitch(false);
174
175    try (Table ht = installTable(numRegionsToCreate)) {
176      HRegionServer hrs = findRSToKill(false);
177      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
178      makeWAL(hrs, regions, numLogLines, 100);
179
180      // abort master
181      abortMaster(cluster);
182
183      // abort RS
184      LOG.info("Aborting region server: " + hrs.getServerName());
185      hrs.abort("testing");
186
187      // wait for abort completes
188      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
189        @Override
190        public boolean evaluate() throws Exception {
191          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
192        }
193      });
194
195      Thread.sleep(2000);
196      LOG.info("Current Open Regions:" + HBaseTestingUtil.getAllOnlineRegions(cluster).size());
197
198      // wait for abort completes
199      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
200        @Override
201        public boolean evaluate() throws Exception {
202          return (HBaseTestingUtil.getAllOnlineRegions(cluster).size() >= (numRegionsToCreate + 1));
203        }
204      });
205
206      LOG.info("Current Open Regions After Master Node Starts Up:"
207        + HBaseTestingUtil.getAllOnlineRegions(cluster).size());
208
209      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
210    }
211  }
212
213  @Test
214  public void testThreeRSAbort() throws Exception {
215    LOG.info("testThreeRSAbort");
216    int numRegionsToCreate = 40;
217    int numRowsPerRegion = 100;
218
219    startCluster(NUM_RS); // NUM_RS=6.
220
221    try (Table table = installTable(numRegionsToCreate)) {
222      populateDataInTable(numRowsPerRegion);
223
224      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
225      assertEquals(NUM_RS, rsts.size());
226      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
227      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
228      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
229
230      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
231
232        @Override
233        public boolean evaluate() throws Exception {
234          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
235        }
236
237        @Override
238        public String explainFailure() throws Exception {
239          return "Timed out waiting for server aborts.";
240        }
241      });
242      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
243      int rows;
244      try {
245        rows = TEST_UTIL.countRows(table);
246      } catch (Exception e) {
247        Threads.printThreadInfo(System.out, "Thread dump before fail");
248        throw e;
249      }
250      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
251    }
252  }
253
254  @Test
255  public void testDelayedDeleteOnFailure() throws Exception {
256    if (
257      !this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
258        HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
259    ) {
260      // This test depends on zk coordination....
261      return;
262    }
263    LOG.info("testDelayedDeleteOnFailure");
264    startCluster(1);
265    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
266    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
267    final Path rootLogDir =
268      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
269    final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
270    fs.mkdirs(logDir);
271    ExecutorService executor = null;
272    try {
273      final Path corruptedLogFile = new Path(logDir, "x");
274      FSDataOutputStream out;
275      out = fs.create(corruptedLogFile);
276      out.write(0);
277      out.write(Bytes.toBytes("corrupted bytes"));
278      out.close();
279      ZKSplitLogManagerCoordination coordination =
280        (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
281          .getSplitLogManagerCoordination();
282      coordination.setIgnoreDeleteForTesting(true);
283      executor = Executors.newSingleThreadExecutor();
284      Runnable runnable = new Runnable() {
285        @Override
286        public void run() {
287          try {
288            // since the logDir is a fake, corrupted one, so the split log worker
289            // will finish it quickly with error, and this call will fail and throw
290            // an IOException.
291            slm.splitLogDistributed(logDir);
292          } catch (IOException ioe) {
293            try {
294              assertTrue(fs.exists(corruptedLogFile));
295              // this call will block waiting for the task to be removed from the
296              // tasks map which is not going to happen since ignoreZKDeleteForTesting
297              // is set to true, until it is interrupted.
298              slm.splitLogDistributed(logDir);
299            } catch (IOException e) {
300              assertTrue(Thread.currentThread().isInterrupted());
301              return;
302            }
303            fail("did not get the expected IOException from the 2nd call");
304          }
305          fail("did not get the expected IOException from the 1st call");
306        }
307      };
308      Future<?> result = executor.submit(runnable);
309      try {
310        result.get(2000, TimeUnit.MILLISECONDS);
311      } catch (TimeoutException te) {
312        // it is ok, expected.
313      }
314      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
315      executor.shutdownNow();
316      executor = null;
317
318      // make sure the runnable is finished with no exception thrown.
319      result.get();
320    } finally {
321      if (executor != null) {
322        // interrupt the thread in case the test fails in the middle.
323        // it has no effect if the thread is already terminated.
324        executor.shutdownNow();
325      }
326      fs.delete(logDir, true);
327    }
328  }
329
  /**
   * Creates the test table with {@code nrs} regions by delegating to
   * {@code installTable(nrs, 0)}, i.e. with zero pre-existing regions.
   * @param nrs number of regions to create
   * @return the table handle returned by the two-argument overload
   */
  private Table installTable(int nrs) throws Exception {
    return installTable(nrs, 0);
  }
333
334  private Table installTable(int nrs, int existingRegions) throws Exception {
335    // Create a table with regions
336    byte[] family = Bytes.toBytes("family");
337    LOG.info("Creating table with " + nrs + " regions");
338    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
339    int numRegions = -1;
340    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
341      numRegions = r.getStartKeys().length;
342    }
343    assertEquals(nrs, numRegions);
344    LOG.info("Waiting for no more RIT\n");
345    blockUntilNoRIT();
346    // disable-enable cycle to get rid of table's dead regions left behind
347    // by createMultiRegions
348    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
349    LOG.debug("Disabling table\n");
350    TEST_UTIL.getAdmin().disableTable(tableName);
351    LOG.debug("Waiting for no more RIT\n");
352    blockUntilNoRIT();
353    NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
354    LOG.debug("Verifying only catalog region is assigned\n");
355    if (regions.size() != 1) {
356      for (String oregion : regions) {
357        LOG.debug("Region still online: " + oregion);
358      }
359    }
360    assertEquals(1 + existingRegions, regions.size());
361    LOG.debug("Enabling table\n");
362    TEST_UTIL.getAdmin().enableTable(tableName);
363    LOG.debug("Waiting for no more RIT\n");
364    blockUntilNoRIT();
365    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
366    regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
367    assertEquals(numRegions + 1 + existingRegions, regions.size());
368    return table;
369  }
370
371  void populateDataInTable(int nrows) throws Exception {
372    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
373    assertEquals(NUM_RS, rsts.size());
374
375    for (RegionServerThread rst : rsts) {
376      HRegionServer hrs = rst.getRegionServer();
377      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
378      for (RegionInfo hri : hris) {
379        if (hri.getTable().isSystemTable()) {
380          continue;
381        }
382        LOG.debug(
383          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
384        Region region = hrs.getOnlineRegion(hri.getRegionName());
385        assertTrue(region != null);
386        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
387      }
388    }
389  }
390
  /**
   * Convenience overload that delegates to the five-argument {@code makeWAL}, passing
   * {@code true} for {@code cleanShutdown}.
   * @param hrs       region server whose WALs receive the edits
   * @param regions   candidate regions, passed through unchanged to the five-argument overload
   * @param num_edits number of edits to write
   * @param edit_size size in bytes of each edit's value
   */
  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
    throws IOException {
    makeWAL(hrs, regions, num_edits, edit_size, true);
  }
395
396  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
397    boolean cleanShutdown) throws IOException {
398    // remove root and meta region
399    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
400
401    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
402      RegionInfo regionInfo = iter.next();
403      if (regionInfo.getTable().isSystemTable()) {
404        iter.remove();
405      }
406    }
407    byte[] value = new byte[editSize];
408
409    List<RegionInfo> hris = new ArrayList<>();
410    for (RegionInfo region : regions) {
411      if (region.getTable() != tableName) {
412        continue;
413      }
414      hris.add(region);
415    }
416    LOG.info("Creating wal edits across " + hris.size() + " regions.");
417    for (int i = 0; i < editSize; i++) {
418      value[i] = (byte) ('a' + (i % 26));
419    }
420    int n = hris.size();
421    int[] counts = new int[n];
422    // sync every ~30k to line up with desired wal rolls
423    final int syncEvery = 30 * 1024 / editSize;
424    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
425    if (n > 0) {
426      for (int i = 0; i < numEdits; i += 1) {
427        WALEdit e = new WALEdit();
428        RegionInfo curRegionInfo = hris.get(i % n);
429        WAL log = hrs.getWAL(curRegionInfo);
430        byte[] startRow = curRegionInfo.getStartKey();
431        if (startRow == null || startRow.length == 0) {
432          startRow = new byte[] { 0, 0, 0, 0, 1 };
433        }
434        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
435        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
436        // HBaseTestingUtility.createMultiRegions use 5 bytes key
437        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
438        e.add(
439          new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(), value));
440        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
441          tableName, EnvironmentEdgeManager.currentTime(), mvcc), e);
442        if (0 == i % syncEvery) {
443          log.sync();
444        }
445        counts[i % n] += 1;
446      }
447    }
448    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
449    // will cause errors if done after.
450    for (RegionInfo info : hris) {
451      WAL log = hrs.getWAL(info);
452      log.sync();
453    }
454    if (cleanShutdown) {
455      for (RegionInfo info : hris) {
456        WAL log = hrs.getWAL(info);
457        log.shutdown();
458      }
459    }
460    for (int i = 0; i < n; i++) {
461      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
462    }
463    return;
464  }
465
466  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
467    int count = 0;
468    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
469      WAL.Entry e;
470      while ((e = in.next()) != null) {
471        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
472          count++;
473        }
474      }
475    }
476    return count;
477  }
478
  /**
   * Blocks until TEST_UTIL reports that no regions are in transition, passing it a timeout of
   * 60000 (presumably milliseconds).
   */
  private void blockUntilNoRIT() throws Exception {
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
  }
482
483  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
484    throws IOException {
485    for (int i = 0; i < numRows; i++) {
486      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
487      for (byte[] family : families) {
488        put.addColumn(family, qf, null);
489      }
490      region.put(put);
491    }
492  }
493
494  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
495    throws InterruptedException {
496    long curt = EnvironmentEdgeManager.currentTime();
497    long endt = curt + timems;
498    while (curt < endt) {
499      if (ctr.sum() == oldval) {
500        Thread.sleep(100);
501        curt = EnvironmentEdgeManager.currentTime();
502      } else {
503        assertEquals(newval, ctr.sum());
504        return;
505      }
506    }
507    fail();
508  }
509
510  private void abortMaster(SingleProcessHBaseCluster cluster) throws InterruptedException {
511    for (MasterThread mt : cluster.getLiveMasterThreads()) {
512      if (mt.getMaster().isActiveMaster()) {
513        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
514        mt.join();
515        break;
516      }
517    }
518    LOG.debug("Master is aborted");
519  }
520
521  /**
522   * Find a RS that has regions of a table.
523   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
524   */
525  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
526    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
527    List<RegionInfo> regions = null;
528    HRegionServer hrs = null;
529
530    for (RegionServerThread rst : rsts) {
531      hrs = rst.getRegionServer();
532      while (rst.isAlive() && !hrs.isOnline()) {
533        Thread.sleep(100);
534      }
535      if (!rst.isAlive()) {
536        continue;
537      }
538      boolean isCarryingMeta = false;
539      boolean foundTableRegion = false;
540      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
541      for (RegionInfo region : regions) {
542        if (region.isMetaRegion()) {
543          isCarryingMeta = true;
544        }
545        if (region.getTable() == tableName) {
546          foundTableRegion = true;
547        }
548        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
549          break;
550        }
551      }
552      if (isCarryingMeta && hasMetaRegion) {
553        // clients ask for a RS with META
554        if (!foundTableRegion) {
555          HRegionServer destRS = hrs;
556          // the RS doesn't have regions of the specified table so we need move one to this RS
557          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
558          RegionInfo hri = tableRegions.get(0);
559          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
560          // wait for region move completes
561          RegionStates regionStates =
562            TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
563          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
564            @Override
565            public boolean evaluate() throws Exception {
566              ServerName sn = regionStates.getRegionServerOfRegion(hri);
567              return (sn != null && sn.equals(destRS.getServerName()));
568            }
569          });
570        }
571        return hrs;
572      } else if (hasMetaRegion || isCarryingMeta) {
573        continue;
574      }
575      if (foundTableRegion) {
576        break;
577      }
578    }
579
580    return hrs;
581  }
582}