/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry pipeline construction multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
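    // Roll the WAL once its live replica count falls below 2, and stop requesting
    // low-replication rolls after 3 consecutive rolls that fail to restore replication
    // (testLogRollOnDatanodeDeath relies on the roller disabling itself).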
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();
  }

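  /**
   * Writes ten rows starting at {@code start}, then repeatedly puts a throw-away row (to trigger
   * the WAL's low-replication check) until {@code log.isLowReplicationRollEnabled()} matches
   * {@code expect} or {@code timeout} milliseconds have elapsed.
   */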
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
      throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * & syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

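    // Record whether a roll was ever requested because of low replication, so we can later
    // assert that killing a pipeline datanode actually took the low-replication path.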
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(boolean lowReplication) {
        if (lowReplication) {
          lowReplicationHookCalled.lazySet(true);
        }
      }
    });

    // Bump up the datanode count to ensure proper replication when we kill one.
    // This function is synchronous; when it returns, the dfs cluster is active.
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
          + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes()
          .size() >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This function is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replica count drops below
    // the configured minimum of 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

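    // With the pipeline down to a single datanode, rolling cannot restore replication, so after
    // the configured roll limit the low-replication roller is expected to disable itself.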
    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force a writer roll. The new log file will have the default replication,
    // and the LowReplication Roller will be re-enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Tests that the WAL is rolled when all datanodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

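      // Track the current WAL file and every file created by a subsequent roll, so all of them
      // can be read back and verified below.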
      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(1);
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = System.currentTimeMillis();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
      for (Path p : paths) {
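        // The newest WAL file may still be open for write; recover its HDFS lease so the reader
        // can see everything that was appended.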
        LOG.debug("recovering lease for " + p);
        fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(),
          null);

        LOG.debug("Reading WAL " + FSUtils.getPath(p));
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + FSUtils.getPath(p));
        } finally {
          if (reader != null) reader.close();
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed because that issue tightened
          // up the semantics so that a failed append can no longer be followed by a successful
          // sync. What surfaces here is a failed sync, a sync that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

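      // After the restarts, rolls and flushes, a plain scan should still return all four rows
      // in order.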
      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
          .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}