/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

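/**
 * Log rolling tests that run against the FSHLog ("filesystem") WAL provider, covering rolls
 * triggered by slow syncs, datanode death, and pipeline restarts.
 */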
@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry creating a new pipeline multiple times
095    conf.setInt("dfs.client.block.write.retries", 30);
096    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
097    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
098    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
099    AbstractTestLogRolling.setUpBeforeClass();
100
101    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
102    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
103    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
104    // For slow sync threshold test: roll once after a sync above this threshold
105    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
106  }
107
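  /**
   * Tests that slow WAL syncs request a log roll: first when the number of slow syncs within the
   * configured interval exceeds SLOW_SYNC_ROLL_THRESHOLD, then when a single sync exceeds
   * ROLL_ON_SYNC_TIME_MS. Also verifies that no SLOW_SYNC roll is requested once sync latency
   * returns to normal.
   */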
  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    int row = 1;
    try {
      // Get a reference to the FSHLog
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);

      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case SLOW_SYNC:
              slowSyncHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Write some data

      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 200 ms of
      // latency to any sync on the hlog. This should be more than sufficient to trigger
      // slow sync warnings.
      final Writer oldWriter1 = log.getWriter();
      final Writer newWriter1 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter1.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(200);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter1.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter1.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter1.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter1.getSyncedLength();
        }
      };
      log.setWriter(newWriter1);

      // Write some data.
      // We need at least 5 slow syncs to trip the threshold, so write double that to be safe.
      // Only one SLOW_SYNC roll should be requested within the current interval.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      // Wait for our wait-injecting writer to get rolled out.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter1;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 5000 ms of
      // latency to any sync on the hlog. This will trip the roll-on-sync-time threshold.
      final Writer oldWriter2 = log.getWriter();
      final Writer newWriter2 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter2.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter2.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter2.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter2.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter2.getSyncedLength();
        }
      };
      log.setWriter(newWriter2);

      // Write some data. Should only take one sync.

      writeData(table, row++);

      // Wait for our wait-injecting writer to get rolled out.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter2;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Write some data
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

    } finally {
      table.close();
    }
  }

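  /**
   * Writes a batch of ten rows starting at {@code start}, then keeps issuing puts until the WAL's
   * low-replication roll state matches {@code expect} or {@code timeout} milliseconds have
   * elapsed.
   */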
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = EnvironmentEdgeManager.currentTime();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * and syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case LOW_REPLICATION:
            lowReplicationHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });

    // Bump up the datanode count to ensure proper replication when we kill one.
    // This function is synchronous; when it returns, the dfs cluster is active.
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163.
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
        + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes().size()
          >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = EnvironmentEdgeManager.currentTime();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This function is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replica count drops below the
    // configured minimum of 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Tests that the WAL is rolled when all datanodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        } finally {
          if (reader != null) reader.close();
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed because that issue tightened
          // up the semantics such that a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}