/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try to set up a new pipeline multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();

    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
    // For slow sync threshold test: roll once after a sync above this threshold
    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
  }

  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    int row = 1;
    try {
      // Get a reference to the FSHLog
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);

      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case SLOW_SYNC:
              slowSyncHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Write some data

      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 200 ms of
      // latency to any sync on the hlog. This should be more than sufficient to trigger
      // slow sync warnings.
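      // Note: with the values set in setUpBeforeClass (SLOW_SYNC_ROLL_THRESHOLD=5 within
      // SLOW_SYNC_ROLL_INTERVAL_MS=10s), a 200 ms delay should count as a slow sync while
      // staying below ROLL_ON_SYNC_TIME_MS (5000 ms), so a roll should only be requested once
      // the threshold count is reached, not after a single sync.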
      final Writer oldWriter1 = log.getWriter();
      final Writer newWriter1 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter1.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(200);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter1.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter1.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter1.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter1.getSyncedLength();
        }
      };
      log.setWriter(newWriter1);

      // Write some data.
      // We need to write at least 5 times to cross the slow sync threshold, but double it to be
      // safe. Only one SLOW_SYNC roll should be requested within the current interval.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      // Wait for our wait injecting writer to get rolled out, as needed.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter1;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 5000 ms of
      // latency to any sync on the hlog.
      // This will trip the other threshold, ROLL_ON_SYNC_TIME_MS.
      final Writer oldWriter2 = (Writer) log.getWriter();
      final Writer newWriter2 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter2.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter2.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter2.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter2.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter2.getSyncedLength();
        }
      };
      log.setWriter(newWriter2);

      // Write some data. Should only take one sync.

      writeData(table, row++);

      // Wait for our wait injecting writer to get rolled out, as needed.
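      // Rolling the WAL installs a brand new writer, so an identity check against our wrapper
      // (newWriter2) should be sufficient to detect that the roll has completed.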

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter2;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Write some data
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

    } finally {
      table.close();
    }
  }

  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = EnvironmentEdgeManager.currentTime();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHlog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * and syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case LOW_REPLICATION:
            lowReplicationHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });

    // Bump up the datanode count to ensure proper replication when we kill one.
    // This function is synchronous; when it returns, the dfs cluster is active.
    // We start 3 new servers and then stop the existing ones to avoid a directory naming
    // conflict when we stop/start a namenode later, as mentioned in HBASE-5163.
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
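    // After the swap we want at least (default replication + 1) live datanodes, so that killing
    // one pipeline member below should still leave enough nodes to fully replicate the rolled
    // log (see the assert further down).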
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
        + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes().size()
          >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = EnvironmentEdgeManager.currentTime();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This function is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replica count will drop below
    // the configured value 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Test that WAL is rolled when all data nodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
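        // Read every entry back with a stream reader. An EOFException is tolerated below, since
        // a file whose pipeline was torn down by the datanode restarts may not have been cleanly
        // closed.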
        try (WALStreamReader reader =
          WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) {
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because that issue tightened up the semantics such that
          // a failed append can no longer be followed by a successful
          // sync. What surfaces here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}