/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry creating a new pipeline multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
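    // These tests exercise FSHLog directly (low replication roller, FSHLogProvider writers),
    // so pin the WAL provider to "filesystem".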
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();
  }

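  /**
   * A {@link ProtobufLogWriter} that sleeps for {@code syncLatencyMillis} milliseconds before
   * every sync, simulating slow WAL syncs so the SLOW_SYNC roll request path can be exercised.
   */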
  public static class SlowSyncLogWriter extends ProtobufLogWriter {
    @Override
    public void sync(boolean forceSync) throws IOException {
      try {
        Thread.sleep(syncLatencyMillis);
      } catch (InterruptedException e) {
        InterruptedIOException ex = new InterruptedIOException();
        ex.initCause(e);
        throw ex;
      }
      super.sync(forceSync);
    }
  }

  @Override
  protected void setSlowLogWriter(Configuration conf) {
    conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
  }

  @Override
  protected void setDefaultLogWriter(Configuration conf) {
    conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName());
  }

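  /**
   * Writes ten rows starting at {@code start}, then repeatedly writes a throwaway row (to trigger
   * FSHLog#checkLowReplication) until {@code log.isLowReplicationRollEnabled()} equals
   * {@code expect} or {@code timeout} milliseconds have elapsed.
   */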
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = EnvironmentEdgeManager.currentTime();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
      }
    }
  }

  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);

      // Set default log writer, no additional latency to any sync on the hlog.
      checkSlowSync(log, table, -1, 10, false);

      // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to
      // trigger slow sync warnings.
      // Write some data.
      // We need to write at least 5 times, but double it. We should only request
      // a SLOW_SYNC roll once in the current interval.
      checkSlowSync(log, table, 200, 10, true);

      // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
      // Write some data. Should only take one sync.
      checkSlowSync(log, table, 5000, 1, true);

      // Set default log writer, no additional latency to any sync on the hlog.
      checkSlowSync(log, table, -1, 10, false);
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * & syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {

    Long oldValue = TEST_UTIL.getConfiguration()
      .getLong("hbase.regionserver.hlog.check.lowreplication.interval", -1);

    try {
      /**
       * Since FSHLog now reuses the AsyncFSWAL code, low replication is only checked by
       * {@link LogRoller#checkLowReplication}. To make this test run faster, we lower the
       * following config value, which is maximized by
       * {@link AbstractTestLogRolling#setUpBeforeClass}.
       */
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        1000);
      this.tearDown();
      this.setUp();

      TEST_UTIL.ensureSomeRegionServersAvailable(2);
      assertTrue("This test requires WAL file replication set to 2.",
        fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
      LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);
      final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

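      // Listen for roll requests so we can later verify that a roll was requested for
      // LOW_REPLICATION after a datanode in the write pipeline is killed.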
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case LOW_REPLICATION:
              lowReplicationHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Bump up the datanode count to ensure proper replication when we kill one.
      // This function is synchronous; when it returns, the dfs cluster is active.
      // We start 3 servers and then stop 2 to avoid a directory naming conflict
      // when we stop/start a namenode later, as mentioned in HBASE-5163.
      List<DataNode> existingNodes = dfsCluster.getDataNodes();
      int numDataNodes = 3;
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        1000);
      dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
      List<DataNode> allNodes = dfsCluster.getDataNodes();
      for (int i = allNodes.size() - 1; i >= 0; i--) {
        if (existingNodes.contains(allNodes.get(i))) {
          dfsCluster.stopDataNode(i);
        }
      }

      assertTrue(
        "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
          + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
        dfsCluster.getDataNodes().size()
            >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

      writeData(table, 2);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
      final DatanodeInfo[] pipeline = log.getPipeline();
      assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

      // kill a datanode in the pipeline to force a log roll on the next sync()
      // This function is synchronous; when it returns, the node is killed.
      assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

      // this write should succeed, but trigger a log roll
      writeData(table, 2);

      TEST_UTIL.waitFor(10000, 100, () -> {
        long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
        return newFilenum > oldFilenum && newFilenum > curTime && lowReplicationHookCalled.get();
      });

      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      // write some more log data (this should use a new hdfs_out)
      writeData(table, 3);
      assertTrue("The log should not roll again.",
        AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
      // kill another datanode in the pipeline, so the replicas will be lower than
      // the configured value 2.
      assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

      batchWriteAndWait(table, log, 3, false, 14000);
      int replication = log.getLogReplication();
      assertTrue(
        "LowReplication Roller should've been disabled, current replication=" + replication,
        !log.isLowReplicationRollEnabled());

      dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

      // Force roll writer. The new log file will have the default replication,
      // and the LowReplication Roller will be enabled.
      log.rollWriter(true);
      batchWriteAndWait(table, log, 13, true, 10000);
      replication = log.getLogReplication();
      assertTrue("New log file should have the default replication instead of " + replication,
        replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
      assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
    } finally {
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        oldValue);
    }
  }

  /**
   * Test that WAL is rolled when all data nodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

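      // Track every WAL file: the current one now, plus each new file reported by the listener
      // below, so they can all be read back and verified after the rolls.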
      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
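        // The WAL file may still be open for writing; recover its lease first so the stream
        // reader below can open it.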
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        try (WALStreamReader reader =
          WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) {
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because this issue tightened up the semantic such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      try (ResultScanner scanner = table.getScanner(new Scan())) {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}