/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

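/**
 * Log rolling tests that run against the FSHLog ("filesystem") WAL provider: slow-sync
 * triggered rolls, rolls on datanode death, and rolls after a full pipeline restart.
 */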
@Tag(VerySlowRegionServerTests.TAG)
@Tag(LargeTests.TAG)
public class TestLogRolling extends AbstractTestLogRolling {

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry pipeline creation multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();
  }

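  /**
   * A {@link ProtobufLogWriter} that sleeps for {@code syncLatencyMillis} before delegating each
   * sync to the parent writer, used to simulate slow WAL syncs.
   */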
  public static class SlowSyncLogWriter extends ProtobufLogWriter {
    @Override
    public void sync(boolean forceSync) throws IOException {
      try {
        Thread.sleep(syncLatencyMillis);
      } catch (InterruptedException e) {
        InterruptedIOException ex = new InterruptedIOException();
        ex.initCause(e);
        throw ex;
      }
      super.sync(forceSync);
    }
  }

  @Override
  protected void setSlowLogWriter(Configuration conf) {
    conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
  }

  @Override
  protected void setDefaultLogWriter(Configuration conf) {
    conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName());
  }

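  /**
   * Writes ten rows starting at {@code start}, then keeps writing a temporary row (each put
   * triggers the low replication check on sync) until
   * {@code log.isLowReplicationRollEnabled()} matches {@code expect} or {@code timeout}
   * milliseconds have elapsed.
   */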
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = EnvironmentEdgeManager.currentTime();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
      }
    }
  }

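  /**
   * Verifies that syncs slower than the configured slow-sync thresholds request a log roll, and
   * that no roll is requested once the default writer (no added latency) is restored.
   */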
  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);

      // Set the default log writer; no additional latency is added to any sync on the hlog.
      checkSlowSync(log, table, -1, 10, false);

      // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to
      // trigger slow sync warnings.
      // Write some data. We need to write at least 5 times, so write double that to be safe. We
      // should only request a SLOW_SYNC roll once in the current interval.
      checkSlowSync(log, table, 200, 10, true);

      // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
      // Write some data. Should only take one sync.
      checkSlowSync(log, table, 5000, 1, true);

      // Set the default log writer; no additional latency is added to any sync on the hlog.
      checkSlowSync(log, table, -1, 10, false);
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826 &
   * syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {

    Long oldValue = TEST_UTIL.getConfiguration()
      .getLong("hbase.regionserver.hlog.check.lowreplication.interval", -1);

    try {
      /**
       * When we reuse the code of AsyncFSWAL for FSHLog, low replication is only checked by
       * {@link LogRoller#checkLowReplication}, so in order to make this test spend less time, we
       * should minimize the following config, which is maximized by
       * {@link AbstractTestLogRolling#setUpBeforeClass}.
       */
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        1000);
      this.tearDown();
      this.setUp();

      TEST_UTIL.ensureSomeRegionServersAvailable(2);
      assertTrue(fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2,
        "This test requires WAL file replication set to 2.");
      LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);
      final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

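      // Record whether a roll was requested specifically because of low replication.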
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case LOW_REPLICATION:
              lowReplicationHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Bump up the datanode count to ensure proper replication when we kill one.
      // This function is synchronous; when it returns, the dfs cluster is active.
      // We start 3 servers and then stop 2 to avoid a directory naming conflict
      // when we stop/start a namenode later, as mentioned in HBASE-5163.
      List<DataNode> existingNodes = dfsCluster.getDataNodes();
      int numDataNodes = 3;
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        1000);
      dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
      List<DataNode> allNodes = dfsCluster.getDataNodes();
      for (int i = allNodes.size() - 1; i >= 0; i--) {
        if (existingNodes.contains(allNodes.get(i))) {
          dfsCluster.stopDataNode(i);
        }
      }

      assertTrue(
        dfsCluster.getDataNodes().size()
            >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1,
        "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
          + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

      writeData(table, 2);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue(curTime > oldFilenum && oldFilenum != -1,
        "Log should have a timestamp older than now");

      assertTrue(oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log),
        "The log shouldn't have rolled yet");
      final DatanodeInfo[] pipeline = log.getPipeline();
      assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

      // kill a datanode in the pipeline to force a log roll on the next sync()
      // This function is synchronous; when it returns, the node is killed.
      assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

      // this write should succeed, but trigger a log roll
      writeData(table, 2);

      TEST_UTIL.waitFor(10000, 100, () -> {
        long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
        return newFilenum > oldFilenum && newFilenum > curTime && lowReplicationHookCalled.get();
      });

      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      // write some more log data (this should use a new hdfs_out)
      writeData(table, 3);
      assertTrue(AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum,
        "The log should not roll again.");
      // kill another datanode in the pipeline, so the replica count drops below
      // the configured value of 2.
      assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

      batchWriteAndWait(table, log, 3, false, 14000);
      int replication = log.getLogReplication();
      assertTrue(!log.isLowReplicationRollEnabled(),
        "LowReplication Roller should've been disabled, current replication=" + replication);

      dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

      // Force roll writer. The new log file will have the default replication,
      // and the LowReplication Roller will be enabled.
      log.rollWriter(true);
      batchWriteAndWait(table, log, 13, true, 10000);
      replication = log.getLogReplication();
      assertTrue(replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
        "New log file should have the default replication instead of " + replication);
      assertTrue(log.isLowReplicationRollEnabled(), "LowReplication Roller should've been enabled");
    } finally {
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        oldValue);
    }
  }

  /**
   * Test that WAL is rolled when all data nodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue(fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1,
      "This test requires WAL file replication.");
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

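      // Track every WAL file created (the initial one plus one per roll) so they can all be
      // read back and verified after the datanode restarts.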
      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(1);
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue(curTime > oldFilenum && oldFilenum != -1,
        "Log should have a timestamp older than now");

      assertTrue(oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log),
        "The log shouldn't have rolled yet");

      // restart all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue(newFilenum > oldFilenum && newFilenum > curTime,
        "Missing datanode should've triggered a log roll");
      validateData(table, 1003);

      writeData(table, 1004);

      // restart all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue(preLogRolledCalled.size() >= 1,
        "preLogRolledCalled has size of " + preLogRolledCalled.size());

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        try (WALStreamReader reader =
          WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) {
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because this issue tightened up the semantic such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}