/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

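/**
 * Log rolling tests that exercise the FileSystem-based WAL ({@code FSHLog} via the "filesystem"
 * provider): rolls triggered by slow syncs, by the death of a datanode in the write pipeline,
 * and by a restart of the whole pipeline.
 */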
@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry the pipeline creation multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();

    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
    // For slow sync threshold test: roll once after a sync above this threshold
    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
  }

  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    int row = 1;
    try {
      // Get a reference to the FSHLog
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);

      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case SLOW_SYNC:
              slowSyncHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Write some data

      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 200 ms of
      // latency to any sync on the hlog. This should be more than sufficient to trigger
      // slow sync warnings.
      final Writer oldWriter1 = log.getWriter();
      final Writer newWriter1 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter1.close();
        }
        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(200);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter1.sync(forceSync);
        }
        @Override
        public void append(Entry entry) throws IOException {
          oldWriter1.append(entry);
        }
        @Override
        public long getLength() {
          return oldWriter1.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter1.getSyncedLength();
        }
      };
      log.setWriter(newWriter1);

      // Write some data.
      // We need at least 5 slow syncs (the roll threshold), but write double that to be safe.
      // Only one SLOW_SYNC roll should be requested within the current interval.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      // Wait for the roll to replace our latency-injecting writer.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter1;
        }
        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 5000 ms of
      // latency to any sync on the hlog.
      // A single sync above ROLL_ON_SYNC_TIME_MS should trip the other roll threshold.
      final Writer oldWriter2 = log.getWriter();
      final Writer newWriter2 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter2.close();
        }
        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter2.sync(forceSync);
        }
        @Override
        public void append(Entry entry) throws IOException {
          oldWriter2.append(entry);
        }
        @Override
        public long getLength() {
          return oldWriter2.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter2.getSyncedLength();
        }
      };
      log.setWriter(newWriter2);

      // Write some data. Should only take one sync.

      writeData(table, row++);

      // Wait for the roll to replace our latency-injecting writer.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter2;
        }
        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Write some data with the normal writer created by the roll; no more SLOW_SYNC rolls
      // should be requested.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

    } finally {
      table.close();
    }
  }

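  /**
   * Writes ten rows starting at {@code start}, then keeps putting a throwaway "tmprow" (each put
   * forces another WAL sync and therefore another low-replication check) until the WAL's
   * low-replication roll flag equals {@code expect} or {@code timeout} milliseconds have elapsed.
   */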
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
      throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * and syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

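    // Watch for LOW_REPLICATION roll requests so we can later assert the roll happened for the
    // expected reason.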
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case LOW_REPLICATION:
            lowReplicationHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });

    // Bump up the datanode count, to ensure proper replication when we kill 1.
    // This function is synchronous; when it returns, the DFS cluster is active.
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163.
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

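    // Sanity check: we need more live datanodes than the WAL replication factor so the pipeline
    // can be rebuilt after we kill one of its datanodes.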
    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
          + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes()
          .size() >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
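    // Grab the current write pipeline so we can kill a datanode that is actually hosting the
    // WAL block.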
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This call is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the live replica count drops below the
    // configured tolerable value of 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

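    // With only one of the freshly started datanodes left, replication cannot get back to the
    // tolerable value, so the low-replication roller is expected to disable itself (asserted
    // below).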
    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Test that the WAL is rolled when all datanodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
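      // Remember every WAL file used during the test (the current one plus any created by rolls)
      // and count preLogRoll callbacks so we can verify the rolls and read the edits back later.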
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = System.currentTimeMillis();
466      LOG.info("log.getCurrentFileName()): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        } finally {
          if (reader != null) {
            reader.close();
          }
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because that issue tightened up the semantics such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

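      // The flushed data should also be visible through a regular scan.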
      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
          .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) {
        t.close();
      }
    }
  }

}