001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Iterator;
029import java.util.List;
030import java.util.NavigableSet;
031import java.util.concurrent.atomic.LongAdder;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.MiniHBaseCluster;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.SplitLogCounters;
041import org.apache.hadoop.hbase.StartMiniClusterOption;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.RegionInfoBuilder;
047import org.apache.hadoop.hbase.client.RegionLocator;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
050import org.apache.hadoop.hbase.master.assignment.RegionStates;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
053import org.apache.hadoop.hbase.regionserver.Region;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.CommonFSUtils;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
058import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
059import org.apache.hadoop.hbase.util.Threads;
060import org.apache.hadoop.hbase.wal.WAL;
061import org.apache.hadoop.hbase.wal.WALEdit;
062import org.apache.hadoop.hbase.wal.WALFactory;
063import org.apache.hadoop.hbase.wal.WALKeyImpl;
064import org.apache.hadoop.hbase.zookeeper.ZKUtil;
065import org.junit.After;
066import org.junit.AfterClass;
067import org.junit.Before;
068import org.junit.BeforeClass;
069import org.junit.Rule;
070import org.junit.Test;
071import org.junit.rules.TestName;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
076
077/**
078 * Base class for testing distributed log splitting.
079 */
080public abstract class AbstractTestDLS {
081  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
082
083  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084
085  // Start a cluster with 2 masters and 5 regionservers
086  private static final int NUM_MASTERS = 2;
087  private static final int NUM_RS = 5;
088  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
089
090  @Rule
091  public TestName testName = new TestName();
092
093  private TableName tableName;
094  private MiniHBaseCluster cluster;
095  private HMaster master;
096  private Configuration conf;
097
098  @Rule
099  public TestName name = new TestName();
100
101  @BeforeClass
102  public static void setup() throws Exception {
103    // Uncomment the following line if more verbosity is needed for
104    // debugging (see HBASE-12285 for details).
105    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
106    TEST_UTIL.startMiniZKCluster();
107    TEST_UTIL.startMiniDFSCluster(3);
108  }
109
110  @AfterClass
111  public static void tearDown() throws Exception {
112    TEST_UTIL.shutdownMiniCluster();
113  }
114
115  protected abstract String getWalProvider();
116
117  private void startCluster(int numRS) throws Exception {
118    SplitLogCounters.resetCounters();
119    LOG.info("Starting cluster");
120    conf.setLong("hbase.splitlog.max.resubmit", 0);
121    // Make the failure test faster
122    conf.setInt("zookeeper.recovery.retry", 0);
123    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
124    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
125    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
126    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
127    conf.set("hbase.wal.provider", getWalProvider());
128    StartMiniClusterOption option =
129      StartMiniClusterOption.builder().numMasters(NUM_MASTERS).numRegionServers(numRS).build();
130    TEST_UTIL.startMiniHBaseCluster(option);
131    cluster = TEST_UTIL.getHBaseCluster();
132    LOG.info("Waiting for active/ready master");
133    cluster.waitForActiveAndReadyMaster();
134    master = cluster.getMaster();
135    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
136      @Override
137      public boolean evaluate() throws Exception {
138        return cluster.getLiveRegionServerThreads().size() >= numRS;
139      }
140    });
141  }
142
143  @Before
144  public void before() throws Exception {
145    conf = TEST_UTIL.getConfiguration();
146    tableName = TableName.valueOf(testName.getMethodName());
147  }
148
149  @After
150  public void after() throws Exception {
151    TEST_UTIL.shutdownMiniHBaseCluster();
152    TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
153      true);
154    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
155  }
156
157  @Test
158  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
159    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
160    startCluster(NUM_RS);
161
162    int numRegionsToCreate = 40;
163    int numLogLines = 1000;
164    // turn off load balancing to prevent regions from moving around otherwise
165    // they will consume recovered.edits
166    master.balanceSwitch(false);
167
168    try (Table ht = installTable(numRegionsToCreate)) {
169      HRegionServer hrs = findRSToKill(false);
170      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
171      makeWAL(hrs, regions, numLogLines, 100);
172
173      // abort master
174      abortMaster(cluster);
175
176      // abort RS
177      LOG.info("Aborting region server: " + hrs.getServerName());
178      hrs.abort("testing");
179
180      // wait for abort completes
181      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
182        @Override
183        public boolean evaluate() throws Exception {
184          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
185        }
186      });
187
188      Thread.sleep(2000);
189      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
190
191      // wait for abort completes
192      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
193        @Override
194        public boolean evaluate() throws Exception {
195          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
196              >= (numRegionsToCreate + 1));
197        }
198      });
199
200      LOG.info("Current Open Regions After Master Node Starts Up:"
201        + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
202
203      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
204    }
205  }
206
207  @Test
208  public void testThreeRSAbort() throws Exception {
209    LOG.info("testThreeRSAbort");
210    int numRegionsToCreate = 40;
211    int numRowsPerRegion = 100;
212
213    startCluster(NUM_RS); // NUM_RS=6.
214
215    try (Table table = installTable(numRegionsToCreate)) {
216      populateDataInTable(numRowsPerRegion);
217
218      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
219      assertEquals(NUM_RS, rsts.size());
220      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
221      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
222      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
223
224      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
225
226        @Override
227        public boolean evaluate() throws Exception {
228          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
229        }
230
231        @Override
232        public String explainFailure() throws Exception {
233          return "Timed out waiting for server aborts.";
234        }
235      });
236      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
237      int rows;
238      try {
239        rows = TEST_UTIL.countRows(table);
240      } catch (Exception e) {
241        Threads.printThreadInfo(System.out, "Thread dump before fail");
242        throw e;
243      }
244      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
245    }
246  }
247
248  private Table installTable(int nrs) throws Exception {
249    return installTable(nrs, 0);
250  }
251
252  private Table installTable(int nrs, int existingRegions) throws Exception {
253    // Create a table with regions
254    byte[] family = Bytes.toBytes("family");
255    LOG.info("Creating table with " + nrs + " regions");
256    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
257    int numRegions = -1;
258    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
259      numRegions = r.getStartKeys().length;
260    }
261    assertEquals(nrs, numRegions);
262    LOG.info("Waiting for no more RIT\n");
263    blockUntilNoRIT();
264    // disable-enable cycle to get rid of table's dead regions left behind
265    // by createMultiRegions
266    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
267    LOG.debug("Disabling table\n");
268    TEST_UTIL.getAdmin().disableTable(tableName);
269    LOG.debug("Waiting for no more RIT\n");
270    blockUntilNoRIT();
271    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
272    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
273    if (regions.size() != 2) {
274      for (String oregion : regions) {
275        LOG.debug("Region still online: " + oregion);
276      }
277    }
278    assertEquals(2 + existingRegions, regions.size());
279    LOG.debug("Enabling table\n");
280    TEST_UTIL.getAdmin().enableTable(tableName);
281    LOG.debug("Waiting for no more RIT\n");
282    blockUntilNoRIT();
283    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
284    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
285    assertEquals(numRegions + 2 + existingRegions, regions.size());
286    return table;
287  }
288
289  void populateDataInTable(int nrows) throws Exception {
290    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
291    assertEquals(NUM_RS, rsts.size());
292
293    for (RegionServerThread rst : rsts) {
294      HRegionServer hrs = rst.getRegionServer();
295      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
296      for (RegionInfo hri : hris) {
297        if (hri.getTable().isSystemTable()) {
298          continue;
299        }
300        LOG.debug(
301          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
302        Region region = hrs.getOnlineRegion(hri.getRegionName());
303        assertTrue(region != null);
304        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
305      }
306    }
307
308    for (MasterThread mt : cluster.getLiveMasterThreads()) {
309      HRegionServer hrs = mt.getMaster();
310      List<RegionInfo> hris;
311      try {
312        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
313      } catch (ServerNotRunningYetException e) {
314        // It's ok: this master may be a backup. Ignored.
315        continue;
316      }
317      for (RegionInfo hri : hris) {
318        if (hri.getTable().isSystemTable()) {
319          continue;
320        }
321        LOG.debug(
322          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
323        Region region = hrs.getOnlineRegion(hri.getRegionName());
324        assertTrue(region != null);
325        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
326      }
327    }
328  }
329
330  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
331    throws IOException {
332    makeWAL(hrs, regions, num_edits, edit_size, true);
333  }
334
335  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
336    boolean cleanShutdown) throws IOException {
337    // remove root and meta region
338    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
339
340    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
341      RegionInfo regionInfo = iter.next();
342      if (regionInfo.getTable().isSystemTable()) {
343        iter.remove();
344      }
345    }
346    byte[] value = new byte[editSize];
347
348    List<RegionInfo> hris = new ArrayList<>();
349    for (RegionInfo region : regions) {
350      if (region.getTable() != tableName) {
351        continue;
352      }
353      hris.add(region);
354    }
355    LOG.info("Creating wal edits across " + hris.size() + " regions.");
356    for (int i = 0; i < editSize; i++) {
357      value[i] = (byte) ('a' + (i % 26));
358    }
359    int n = hris.size();
360    int[] counts = new int[n];
361    // sync every ~30k to line up with desired wal rolls
362    final int syncEvery = 30 * 1024 / editSize;
363    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
364    if (n > 0) {
365      for (int i = 0; i < numEdits; i += 1) {
366        WALEdit e = new WALEdit();
367        RegionInfo curRegionInfo = hris.get(i % n);
368        WAL log = hrs.getWAL(curRegionInfo);
369        byte[] startRow = curRegionInfo.getStartKey();
370        if (startRow == null || startRow.length == 0) {
371          startRow = new byte[] { 0, 0, 0, 0, 1 };
372        }
373        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
374        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
375        // HBaseTestingUtility.createMultiRegions use 5 bytes key
376        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
377        e.add(
378          new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(), value));
379        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
380          tableName, EnvironmentEdgeManager.currentTime(), mvcc), e);
381        if (0 == i % syncEvery) {
382          log.sync();
383        }
384        counts[i % n] += 1;
385      }
386    }
387    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
388    // will cause errors if done after.
389    for (RegionInfo info : hris) {
390      WAL log = hrs.getWAL(info);
391      log.sync();
392    }
393    if (cleanShutdown) {
394      for (RegionInfo info : hris) {
395        WAL log = hrs.getWAL(info);
396        log.shutdown();
397      }
398    }
399    for (int i = 0; i < n; i++) {
400      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
401    }
402    return;
403  }
404
405  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
406    int count = 0;
407    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
408      WAL.Entry e;
409      while ((e = in.next()) != null) {
410        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
411          count++;
412        }
413      }
414    }
415    return count;
416  }
417
418  private void blockUntilNoRIT() throws Exception {
419    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
420  }
421
422  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
423    throws IOException {
424    for (int i = 0; i < numRows; i++) {
425      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
426      for (byte[] family : families) {
427        put.addColumn(family, qf, null);
428      }
429      region.put(put);
430    }
431  }
432
433  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
434    throws InterruptedException {
435    long curt = EnvironmentEdgeManager.currentTime();
436    long endt = curt + timems;
437    while (curt < endt) {
438      if (ctr.sum() == oldval) {
439        Thread.sleep(100);
440        curt = EnvironmentEdgeManager.currentTime();
441      } else {
442        assertEquals(newval, ctr.sum());
443        return;
444      }
445    }
446    fail();
447  }
448
449  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
450    for (MasterThread mt : cluster.getLiveMasterThreads()) {
451      if (mt.getMaster().isActiveMaster()) {
452        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
453        mt.join();
454        break;
455      }
456    }
457    LOG.debug("Master is aborted");
458  }
459
460  /**
461   * Find a RS that has regions of a table.
462   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
463   */
464  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
465    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
466    List<RegionInfo> regions = null;
467    HRegionServer hrs = null;
468
469    for (RegionServerThread rst : rsts) {
470      hrs = rst.getRegionServer();
471      while (rst.isAlive() && !hrs.isOnline()) {
472        Thread.sleep(100);
473      }
474      if (!rst.isAlive()) {
475        continue;
476      }
477      boolean isCarryingMeta = false;
478      boolean foundTableRegion = false;
479      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
480      for (RegionInfo region : regions) {
481        if (region.isMetaRegion()) {
482          isCarryingMeta = true;
483        }
484        if (region.getTable() == tableName) {
485          foundTableRegion = true;
486        }
487        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
488          break;
489        }
490      }
491      if (isCarryingMeta && hasMetaRegion) {
492        // clients ask for a RS with META
493        if (!foundTableRegion) {
494          HRegionServer destRS = hrs;
495          // the RS doesn't have regions of the specified table so we need move one to this RS
496          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
497          RegionInfo hri = tableRegions.get(0);
498          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
499          // wait for region move completes
500          RegionStates regionStates =
501            TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
502          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
503            @Override
504            public boolean evaluate() throws Exception {
505              ServerName sn = regionStates.getRegionServerOfRegion(hri);
506              return (sn != null && sn.equals(destRS.getServerName()));
507            }
508          });
509        }
510        return hrs;
511      } else if (hasMetaRegion || isCarryingMeta) {
512        continue;
513      }
514      if (foundTableRegion) {
515        break;
516      }
517    }
518
519    return hrs;
520  }
521}