/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);
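
  // Note: the tests below exercise roll triggers specific to the FSHLog ("filesystem" WAL
  // provider configured in setUpBeforeClass): slow syncs (testSlowSyncLogRolling), low
  // replication after datanode death (testLogRollOnDatanodeDeath), and a full pipeline
  // restart (testLogRollOnPipelineRestart).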

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try to establish a new pipeline multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();

    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
    // For slow sync threshold test: roll once after a sync above this threshold
    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
  }

  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    int row = 1;
    try {
      // Get a reference to the FSHLog
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);

      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case SLOW_SYNC:
              slowSyncHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Write some data

      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 200 ms of
      // latency to any sync on the hlog. This should be more than sufficient to trigger
      // slow sync warnings.
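      // The anonymous Writer below delegates append/close/length calls to the real writer and
      // only injects a Thread.sleep() into sync(). At 200 ms per sync we stay well under the
      // ROLL_ON_SYNC_TIME_MS limit configured above (5000 ms), so any roll observed here should
      // come from the count-based threshold (SLOW_SYNC_ROLL_THRESHOLD syncs within
      // SLOW_SYNC_ROLL_INTERVAL_MS).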
      final Writer oldWriter1 = log.getWriter();
      final Writer newWriter1 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter1.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(200);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter1.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter1.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter1.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter1.getSyncedLength();
        }
      };
      log.setWriter(newWriter1);

      // Write some data.
      // We need to write at least 5 times, but double it. We should only request
      // a SLOW_SYNC roll once in the current interval.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      // Wait for our wait injecting writer to get rolled out, as needed.

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter1;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 5000 ms of
      // latency to any sync on the hlog.
      // This will trip the other threshold.
      final Writer oldWriter2 = log.getWriter();
      final Writer newWriter2 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter2.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter2.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter2.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter2.getLength();
        }

        @Override
        public long getSyncedLength() {
          return oldWriter2.getSyncedLength();
        }
      };
      log.setWriter(newWriter2);

      // Write some data. Should only take one sync.

      writeData(table, row++);

      // Wait for our wait injecting writer to get rolled out, as needed.
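      // As above, a completed roll installs a brand-new writer, so the predicate below
      // (log.getWriter() != newWriter2) is how we detect that the roll actually happened.
      // This time a single sync of 5000 ms or more meets ROLL_ON_SYNC_TIME_MS, so one
      // write is enough.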

      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter2;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Write some data
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

    } finally {
      table.close();
    }
  }

  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = EnvironmentEdgeManager.currentTime();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with
   * HDFS-826 & syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case LOW_REPLICATION:
            lowReplicationHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });

    // add up the datanode count, to ensure proper replication when we kill 1
    // This function is synchronous; when it returns, the dfs cluster is active
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
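    // The assertion a few lines below checks that the live datanode count exceeds the default
    // replication by at least one, so that after the first datanode in the WAL pipeline is
    // killed, the subsequent roll can still build a fully replicated pipeline.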
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
        + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes().size()
        >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = EnvironmentEdgeManager.currentTime();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This function is synchronous, when it returns the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replicas will be lower than
    // the configured value 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Test that WAL is rolled when all data nodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
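        // Lease recovery above closes the file if a datanode restart left it open; the read
        // below then tolerates an EOFException, since the file that was being written when the
        // pipeline went down may end with a truncated trailing entry.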
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        } finally {
          if (reader != null) reader.close();
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because this issue tightened up the semantics such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}