/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

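/**
 * FSHLog-specific log rolling tests layered on top of {@link AbstractTestLogRolling}: rolls
 * requested because of slow syncs, because a datanode in the WAL pipeline dies, and because the
 * whole pipeline is restarted.
 */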
@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so allow several retries when setting up a new pipeline
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
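    // These tests cast the region server's WAL to FSHLog, so force the "filesystem" WAL provider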
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();

    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
    // For slow sync threshold test: roll once after a sync above this threshold
    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
  }

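  /**
   * Verifies that slow WAL syncs request a SLOW_SYNC log roll: repeated 200 ms syncs trip the
   * count-based threshold, a single 5000 ms sync trips the time-based threshold, and ordinary
   * writes trigger no roll at all.
   */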
  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    int row = 1;
    try {
      // Get a reference to the FSHLog
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);

      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case SLOW_SYNC:
              slowSyncHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Write some data

      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 200 ms of
      // latency to any sync on the hlog. This should be more than sufficient to trigger
      // slow sync warnings.
      final Writer oldWriter1 = log.getWriter();
      final Writer newWriter1 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter1.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(200);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter1.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter1.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter1.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter1.getSyncedLength();
        }
      };
      log.setWriter(newWriter1);

      // Write some data.
      // We need at least 5 slow syncs; double that to be safe. Only one SLOW_SYNC roll
      // should be requested within the current interval.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      // Wait for our latency-injecting writer to be rolled out

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter1;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 5000 ms of
      // latency to any sync on the hlog.
      // This will trip the time-based roll threshold (ROLL_ON_SYNC_TIME_MS).
      final Writer oldWriter2 = log.getWriter();
      final Writer newWriter2 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter2.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter2.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter2.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter2.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter2.getSyncedLength();
        }
      };
      log.setWriter(newWriter2);

      // Write some data. Should only take one sync.

      writeData(table, row++);

      // Wait for our latency-injecting writer to be rolled out

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter2;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Write some data
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

    } finally {
      table.close();
    }
  }

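  /**
   * Writes a batch of ten rows, then keeps issuing single puts (each put exercises
   * FSHLog#checkLowReplication()) until the WAL's low-replication roll state matches
   * {@code expect} or {@code timeout} milliseconds have elapsed.
   */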
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = EnvironmentEdgeManager.currentTime();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * and syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case LOW_REPLICATION:
            lowReplicationHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });

    // Bump up the datanode count to ensure proper replication when we kill one.
    // This call is synchronous; when it returns, the dfs cluster is active.
    // We start 3 new datanodes and then stop the 2 original ones to avoid a directory naming
    // conflict when we stop/start a namenode later, as mentioned in HBASE-5163
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
        + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes().size()
          >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

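    // The WAL file name encodes its creation timestamp, so capture the current file number
    // in order to detect a roll later by comparing file numbers.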
    long curTime = EnvironmentEdgeManager.currentTime();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This call is synchronous; when it returns, the node is dead.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replica count drops below the configured
    // tolerable low replication value of 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Test that WAL is rolled when all data nodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

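      // Track every WAL file used and count preLogRoll callbacks so the rolled files can be
      // recovered and read back for verification below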
      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // restart all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // restart all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        try (WALStreamReader reader =
          WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) {
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317, which tightened the semantics so that a
          // failed append can no longer be followed by a successful sync. What surfaces here is
          // a failed sync that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}