/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);
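
  // Note: several tests below cast the region server's WAL to FSHLog, which is why
  // setUpBeforeClass pins the WAL provider to "filesystem" rather than leaving it at the
  // cluster default.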
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry creating a new pipeline multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();

    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
    // For slow sync threshold test: roll once after a sync above this threshold
    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
  }

  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    int row = 1;
    try {
      // Get a reference to the FSHLog
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);

      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case SLOW_SYNC:
              slowSyncHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Write some data

      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 200 ms of
      // latency to any sync on the hlog. This should be more than sufficient to trigger
      // slow sync warnings.
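      // The wrapper delegates close(), append(), and the length getters to the wrapped writer
      // and only injects a sleep into sync(), so the writes themselves are unaffected.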
      final Writer oldWriter1 = log.getWriter();
      final Writer newWriter1 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter1.close();
        }
        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(200);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter1.sync(forceSync);
        }
        @Override
        public void append(Entry entry) throws IOException {
          oldWriter1.append(entry);
        }
        @Override
        public long getLength() {
          return oldWriter1.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter1.getSyncedLength();
        }
      };
      log.setWriter(newWriter1);

      // Write some data.
      // We need to write at least 5 times, but double it. We should only request
      // a SLOW_SYNC roll once in the current interval.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      // Wait for our delay-injecting writer to get rolled out, as needed.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter1;
        }
        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 5000 ms of
      // latency to any sync on the hlog.
      // This will trip the other threshold.
      final Writer oldWriter2 = (Writer) log.getWriter();
      final Writer newWriter2 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter2.close();
        }
        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter2.sync(forceSync);
        }
        @Override
        public void append(Entry entry) throws IOException {
          oldWriter2.append(entry);
        }
        @Override
        public long getLength() {
          return oldWriter2.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter2.getSyncedLength();
        }
      };
      log.setWriter(newWriter2);

      // Write some data. Should only take one sync.
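      // A sync delayed by 5000 ms is expected to exceed ROLL_ON_SYNC_TIME_MS (5000 ms, set in
      // setUpBeforeClass), so a single write should be enough to request a roll, unlike the
      // 200 ms writer above, which had to accumulate several slow syncs within
      // SLOW_SYNC_ROLL_INTERVAL_MS.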

      writeData(table, row++);

      // Wait for our delay-injecting writer to get rolled out, as needed.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter2;
        }
        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Write some data
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
        slowSyncHookCalled.get());

    } finally {
      table.close();
    }
  }

  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * & syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case LOW_REPLICATION:
            lowReplicationHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });
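    // As in the slow sync test, the hook above only records that a LOW_REPLICATION roll was
    // requested; the flag is asserted further down once a datanode in the pipeline has been
    // stopped.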

    // add up the datanode count, to ensure proper replication when we kill 1
    // This function is synchronous; when it returns, the dfs cluster is active
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
        + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes()
        .size() >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This function is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replicas will be lower than
    // the configured value 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
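    // With a second pipeline datanode gone, replication stays below the tolerable value (2,
    // configured in setUpBeforeClass), and after the configured number of consecutive roll
    // requests (3) the low-replication roller is expected to disable itself; batchWriteAndWait
    // polls isLowReplicationRollEnabled() until it reports false.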

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Test that WAL is rolled when all data nodes in the pipeline have been restarted.
   * @throws Exception
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = System.currentTimeMillis();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
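        // The newest path (added by postLogRoll after the final roll) may still be open for
        // write, so recover the file lease before opening a reader on it.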
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        } finally {
          if (reader != null) reader.close();
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because that issue tightened up the semantics such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}