001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Iterator;
028import java.util.List;
029import java.util.NavigableSet;
030import java.util.concurrent.atomic.LongAdder;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.MiniHBaseCluster;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.SplitLogCounters;
040import org.apache.hadoop.hbase.StartMiniClusterOption;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.RegionLocator;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
049import org.apache.hadoop.hbase.master.assignment.RegionStates;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
052import org.apache.hadoop.hbase.regionserver.Region;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
056import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
057import org.apache.hadoop.hbase.util.Threads;
058import org.apache.hadoop.hbase.wal.WAL;
059import org.apache.hadoop.hbase.wal.WALEdit;
060import org.apache.hadoop.hbase.wal.WALFactory;
061import org.apache.hadoop.hbase.wal.WALKeyImpl;
062import org.apache.hadoop.hbase.zookeeper.ZKUtil;
063import org.junit.After;
064import org.junit.AfterClass;
065import org.junit.Before;
066import org.junit.BeforeClass;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.rules.TestName;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
073
074/**
075 * Base class for testing distributed log splitting.
076 */
077public abstract class AbstractTestDLS {
078  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
079
080  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
081
082  // Start a cluster with 2 masters and 5 regionservers
083  private static final int NUM_MASTERS = 2;
084  private static final int NUM_RS = 5;
085  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
086
087  @Rule
088  public TestName testName = new TestName();
089
090  private TableName tableName;
091  private MiniHBaseCluster cluster;
092  private HMaster master;
093  private Configuration conf;
094
095  @Rule
096  public TestName name = new TestName();
097
098  @BeforeClass
099  public static void setup() throws Exception {
100    // Uncomment the following line if more verbosity is needed for
101    // debugging (see HBASE-12285 for details).
102    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
103    TEST_UTIL.startMiniZKCluster();
104    TEST_UTIL.startMiniDFSCluster(3);
105  }
106
107  @AfterClass
108  public static void tearDown() throws Exception {
109    TEST_UTIL.shutdownMiniCluster();
110  }
111
112  protected abstract String getWalProvider();
113
114  private void startCluster(int numRS) throws Exception {
115    SplitLogCounters.resetCounters();
116    LOG.info("Starting cluster");
117    conf.setLong("hbase.splitlog.max.resubmit", 0);
118    // Make the failure test faster
119    conf.setInt("zookeeper.recovery.retry", 0);
120    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
121    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
122    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
123    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
124    conf.set("hbase.wal.provider", getWalProvider());
125    StartMiniClusterOption option = StartMiniClusterOption.builder()
126        .numMasters(NUM_MASTERS).numRegionServers(numRS).build();
127    TEST_UTIL.startMiniHBaseCluster(option);
128    cluster = TEST_UTIL.getHBaseCluster();
129    LOG.info("Waiting for active/ready master");
130    cluster.waitForActiveAndReadyMaster();
131    master = cluster.getMaster();
132    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
133      @Override
134      public boolean evaluate() throws Exception {
135        return cluster.getLiveRegionServerThreads().size() >= numRS;
136      }
137    });
138  }
139
140  @Before
141  public void before() throws Exception {
142    conf = TEST_UTIL.getConfiguration();
143    tableName = TableName.valueOf(testName.getMethodName());
144  }
145
146  @After
147  public void after() throws Exception {
148    TEST_UTIL.shutdownMiniHBaseCluster();
149    TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
150      true);
151    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
152  }
153
154  @Test
155  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
156    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
157    startCluster(NUM_RS);
158
159    int numRegionsToCreate = 40;
160    int numLogLines = 1000;
161    // turn off load balancing to prevent regions from moving around otherwise
162    // they will consume recovered.edits
163    master.balanceSwitch(false);
164
165    try (Table ht = installTable(numRegionsToCreate)) {
166      HRegionServer hrs = findRSToKill(false);
167      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
168      makeWAL(hrs, regions, numLogLines, 100);
169
170      // abort master
171      abortMaster(cluster);
172
173      // abort RS
174      LOG.info("Aborting region server: " + hrs.getServerName());
175      hrs.abort("testing");
176
177      // wait for abort completes
178      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
179        @Override
180        public boolean evaluate() throws Exception {
181          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
182        }
183      });
184
185      Thread.sleep(2000);
186      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
187
188      // wait for abort completes
189      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
190        @Override
191        public boolean evaluate() throws Exception {
192          return (HBaseTestingUtility.getAllOnlineRegions(cluster)
193              .size() >= (numRegionsToCreate + 1));
194        }
195      });
196
197      LOG.info("Current Open Regions After Master Node Starts Up:" +
198          HBaseTestingUtility.getAllOnlineRegions(cluster).size());
199
200      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
201    }
202  }
203
204  @Test
205  public void testThreeRSAbort() throws Exception {
206    LOG.info("testThreeRSAbort");
207    int numRegionsToCreate = 40;
208    int numRowsPerRegion = 100;
209
210    startCluster(NUM_RS); // NUM_RS=6.
211
212    try (Table table = installTable(numRegionsToCreate)) {
213      populateDataInTable(numRowsPerRegion);
214
215      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
216      assertEquals(NUM_RS, rsts.size());
217      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
218      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
219      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
220
221      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
222
223        @Override
224        public boolean evaluate() throws Exception {
225          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
226        }
227
228        @Override
229        public String explainFailure() throws Exception {
230          return "Timed out waiting for server aborts.";
231        }
232      });
233      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
234      int rows;
235      try {
236        rows = TEST_UTIL.countRows(table);
237      } catch (Exception e) {
238        Threads.printThreadInfo(System.out, "Thread dump before fail");
239        throw e;
240      }
241      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
242    }
243  }
244
245  private Table installTable(int nrs) throws Exception {
246    return installTable(nrs, 0);
247  }
248
249  private Table installTable(int nrs, int existingRegions) throws Exception {
250    // Create a table with regions
251    byte[] family = Bytes.toBytes("family");
252    LOG.info("Creating table with " + nrs + " regions");
253    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
254    int numRegions = -1;
255    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
256      numRegions = r.getStartKeys().length;
257    }
258    assertEquals(nrs, numRegions);
259    LOG.info("Waiting for no more RIT\n");
260    blockUntilNoRIT();
261    // disable-enable cycle to get rid of table's dead regions left behind
262    // by createMultiRegions
263    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
264    LOG.debug("Disabling table\n");
265    TEST_UTIL.getAdmin().disableTable(tableName);
266    LOG.debug("Waiting for no more RIT\n");
267    blockUntilNoRIT();
268    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
269    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
270    if (regions.size() != 2) {
271      for (String oregion : regions) {
272        LOG.debug("Region still online: " + oregion);
273      }
274    }
275    assertEquals(2 + existingRegions, regions.size());
276    LOG.debug("Enabling table\n");
277    TEST_UTIL.getAdmin().enableTable(tableName);
278    LOG.debug("Waiting for no more RIT\n");
279    blockUntilNoRIT();
280    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
281    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
282    assertEquals(numRegions + 2 + existingRegions, regions.size());
283    return table;
284  }
285
286  void populateDataInTable(int nrows) throws Exception {
287    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
288    assertEquals(NUM_RS, rsts.size());
289
290    for (RegionServerThread rst : rsts) {
291      HRegionServer hrs = rst.getRegionServer();
292      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
293      for (RegionInfo hri : hris) {
294        if (hri.getTable().isSystemTable()) {
295          continue;
296        }
297        LOG.debug(
298          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
299        Region region = hrs.getOnlineRegion(hri.getRegionName());
300        assertTrue(region != null);
301        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
302      }
303    }
304
305    for (MasterThread mt : cluster.getLiveMasterThreads()) {
306      HRegionServer hrs = mt.getMaster();
307      List<RegionInfo> hris;
308      try {
309        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
310      } catch (ServerNotRunningYetException e) {
311        // It's ok: this master may be a backup. Ignored.
312        continue;
313      }
314      for (RegionInfo hri : hris) {
315        if (hri.getTable().isSystemTable()) {
316          continue;
317        }
318        LOG.debug(
319          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
320        Region region = hrs.getOnlineRegion(hri.getRegionName());
321        assertTrue(region != null);
322        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
323      }
324    }
325  }
326
327  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
328      throws IOException {
329    makeWAL(hrs, regions, num_edits, edit_size, true);
330  }
331
332  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
333      boolean cleanShutdown) throws IOException {
334    // remove root and meta region
335    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
336
337    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
338      RegionInfo regionInfo = iter.next();
339      if (regionInfo.getTable().isSystemTable()) {
340        iter.remove();
341      }
342    }
343    byte[] value = new byte[editSize];
344
345    List<RegionInfo> hris = new ArrayList<>();
346    for (RegionInfo region : regions) {
347      if (region.getTable() != tableName) {
348        continue;
349      }
350      hris.add(region);
351    }
352    LOG.info("Creating wal edits across " + hris.size() + " regions.");
353    for (int i = 0; i < editSize; i++) {
354      value[i] = (byte) ('a' + (i % 26));
355    }
356    int n = hris.size();
357    int[] counts = new int[n];
358    // sync every ~30k to line up with desired wal rolls
359    final int syncEvery = 30 * 1024 / editSize;
360    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
361    if (n > 0) {
362      for (int i = 0; i < numEdits; i += 1) {
363        WALEdit e = new WALEdit();
364        RegionInfo curRegionInfo = hris.get(i % n);
365        WAL log = hrs.getWAL(curRegionInfo);
366        byte[] startRow = curRegionInfo.getStartKey();
367        if (startRow == null || startRow.length == 0) {
368          startRow = new byte[] { 0, 0, 0, 0, 1 };
369        }
370        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
371        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
372        // HBaseTestingUtility.createMultiRegions use 5 bytes key
373        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
374        e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
375        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
376          tableName, System.currentTimeMillis(), mvcc), e);
377        if (0 == i % syncEvery) {
378          log.sync();
379        }
380        counts[i % n] += 1;
381      }
382    }
383    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
384    // will cause errors if done after.
385    for (RegionInfo info : hris) {
386      WAL log = hrs.getWAL(info);
387      log.sync();
388    }
389    if (cleanShutdown) {
390      for (RegionInfo info : hris) {
391        WAL log = hrs.getWAL(info);
392        log.shutdown();
393      }
394    }
395    for (int i = 0; i < n; i++) {
396      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
397    }
398    return;
399  }
400
401  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
402    int count = 0;
403    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
404      WAL.Entry e;
405      while ((e = in.next()) != null) {
406        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
407          count++;
408        }
409      }
410    }
411    return count;
412  }
413
414  private void blockUntilNoRIT() throws Exception {
415    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
416  }
417
418  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
419      throws IOException {
420    for (int i = 0; i < numRows; i++) {
421      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
422      for (byte[] family : families) {
423        put.addColumn(family, qf, null);
424      }
425      region.put(put);
426    }
427  }
428
429  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
430      throws InterruptedException {
431    long curt = System.currentTimeMillis();
432    long endt = curt + timems;
433    while (curt < endt) {
434      if (ctr.sum() == oldval) {
435        Thread.sleep(100);
436        curt = System.currentTimeMillis();
437      } else {
438        assertEquals(newval, ctr.sum());
439        return;
440      }
441    }
442    fail();
443  }
444
445  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
446    for (MasterThread mt : cluster.getLiveMasterThreads()) {
447      if (mt.getMaster().isActiveMaster()) {
448        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
449        mt.join();
450        break;
451      }
452    }
453    LOG.debug("Master is aborted");
454  }
455
456  /**
457   * Find a RS that has regions of a table.
458   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
459   */
460  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
461    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
462    List<RegionInfo> regions = null;
463    HRegionServer hrs = null;
464
465    for (RegionServerThread rst : rsts) {
466      hrs = rst.getRegionServer();
467      while (rst.isAlive() && !hrs.isOnline()) {
468        Thread.sleep(100);
469      }
470      if (!rst.isAlive()) {
471        continue;
472      }
473      boolean isCarryingMeta = false;
474      boolean foundTableRegion = false;
475      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
476      for (RegionInfo region : regions) {
477        if (region.isMetaRegion()) {
478          isCarryingMeta = true;
479        }
480        if (region.getTable() == tableName) {
481          foundTableRegion = true;
482        }
483        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
484          break;
485        }
486      }
487      if (isCarryingMeta && hasMetaRegion) {
488        // clients ask for a RS with META
489        if (!foundTableRegion) {
490          HRegionServer destRS = hrs;
491          // the RS doesn't have regions of the specified table so we need move one to this RS
492          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
493          RegionInfo hri = tableRegions.get(0);
494          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
495          // wait for region move completes
496          RegionStates regionStates =
497              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
498          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
499            @Override
500            public boolean evaluate() throws Exception {
501              ServerName sn = regionStates.getRegionServerOfRegion(hri);
502              return (sn != null && sn.equals(destRS.getServerName()));
503            }
504          });
505        }
506        return hrs;
507      } else if (hasMetaRegion || isCarryingMeta) {
508        continue;
509      }
510      if (foundTableRegion) {
511        break;
512      }
513    }
514
515    return hrs;
516  }
517}