/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try to create a new pipeline multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();
  }

  /**
   * Writes a batch of ten rows, then keeps writing a throw-away row until the WAL's
   * low-replication roll flag matches {@code expect} or the timeout (in milliseconds) expires.
   */
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
      throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with
   * HDFS-826 and syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    // Record whether a log roll is ever requested because of low replication.
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(boolean lowReplication) {
        if (lowReplication) {
          lowReplicationHookCalled.lazySet(true);
        }
      }
    });

    // add up the datanode count, to ensure proper replication when we kill 1
    // This function is synchronous; when it returns, the dfs cluster is active
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
          + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes()
          .size() >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This function is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replicas will be lower than
    // the configured value 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Test that WAL is rolled when all data nodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      // Track the current WAL file and every file produced by a roll so they can be read back.
      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = System.currentTimeMillis();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(),
          null);

        LOG.debug("Reading WAL " + FSUtils.getPath(p));
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + FSUtils.getPath(p));
        } finally {
          if (reader != null) reader.close();
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because this issue tightened up the semantics such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
          .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}