/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

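/**
 * Log rolling tests run against the "filesystem" WAL provider (FSHLog): rolls triggered by slow
 * syncs, by datanode death, and by restart of all datanodes in the HDFS pipeline.
 */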
@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry creating a new pipeline multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();

    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
    // For slow sync threshold test: roll once after a sync above this threshold
    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
  }

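  /**
   * Verifies that SLOW_SYNC log rolls are requested when WAL syncs are slow: once when enough
   * syncs exceed the slow threshold within the roll interval, once when a single sync exceeds the
   * roll-on-sync-time threshold, and not at all during normal operation.
   */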
  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    int row = 1;
    try {
      // Get a reference to the FSHLog
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);

      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case SLOW_SYNC:
              slowSyncHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // Write some data
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 200 ms of
      // latency to any sync on the hlog. This should be more than sufficient to trigger
      // slow sync warnings.
      final Writer oldWriter1 = log.getWriter();
      final Writer newWriter1 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter1.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(200);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter1.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter1.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter1.getLength();
        }
      };
      log.setWriter(newWriter1);

      // Write some data.
      // We need to write at least 5 times, but double it. We should only request
      // a SLOW_SYNC roll once in the current interval.
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      // Wait for our wait injecting writer to get rolled out, as needed.
      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter1;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Wrap the current writer with the anonymous class below that adds 5000 ms of
      // latency to any sync on the hlog.
      // This will trip the other threshold.
      final Writer oldWriter2 = log.getWriter();
      final Writer newWriter2 = new Writer() {
        @Override
        public void close() throws IOException {
          oldWriter2.close();
        }

        @Override
        public void sync(boolean forceSync) throws IOException {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            InterruptedIOException ex = new InterruptedIOException();
            ex.initCause(e);
            throw ex;
          }
          oldWriter2.sync(forceSync);
        }

        @Override
        public void append(Entry entry) throws IOException {
          oldWriter2.append(entry);
        }

        @Override
        public long getLength() {
          return oldWriter2.getLength();
        }
      };
      log.setWriter(newWriter2);

      // Write some data. Should only take one sync.
      writeData(table, row++);

      // Wait for our wait injecting writer to get rolled out, as needed.
      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return log.getWriter() != newWriter2;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Waited too long for our test writer to get rolled out";
        }
      });

      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

      // Set up for test
      slowSyncHookCalled.set(false);

      // Write some data
      for (int i = 0; i < 10; i++) {
        writeData(table, row++);
      }

      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
    } finally {
      table.close();
    }
  }

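  /**
   * Writes a small batch of rows and then keeps issuing single puts until the WAL's
   * low-replication roll flag matches {@code expect} or the timeout (in milliseconds) elapses.
   */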
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * and syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
    final FSHLog log = (FSHLog) server.getWAL(region);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case LOW_REPLICATION:
            lowReplicationHookCalled.lazySet(true);
            break;
          default:
            break;
        }
      }
    });

    // Add up the datanode count, to ensure proper replication when we kill 1.
    // This function is synchronous; when it returns, the dfs cluster is active.
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163.
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue(
      "DataNodes " + dfsCluster.getDataNodes().size() + " default replication "
        + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
      dfsCluster.getDataNodes()
        .size() >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeline();
    assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // Kill a datanode in the pipeline to force a log roll on the next sync().
    // This function is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replicas will be lower than
    // the configured value 2
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication=" + replication,
      !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
      replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Tests that the WAL is rolled when all datanodes in the pipeline have been restarted.
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = System.currentTimeMillis();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        } finally {
          if (reader != null) {
            reader.close();
          }
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because this issue tightened up the semantic such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) {
        t.close();
      }
    }
  }
}