001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024 025import java.io.EOFException; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.util.ArrayList; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Set; 032import java.util.concurrent.atomic.AtomicBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.ResultScanner; 043import org.apache.hadoop.hbase.client.Scan; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 047import org.apache.hadoop.hbase.fs.HFileSystem; 048import org.apache.hadoop.hbase.regionserver.HRegion; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 056import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 057import org.apache.hadoop.hbase.wal.FSHLogProvider; 058import org.apache.hadoop.hbase.wal.WAL; 059import org.apache.hadoop.hbase.wal.WALFactory; 060import org.apache.hadoop.hbase.wal.WALStreamReader; 061import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 062import org.apache.hadoop.hdfs.server.datanode.DataNode; 063import org.junit.jupiter.api.BeforeAll; 064import org.junit.jupiter.api.Tag; 065import org.junit.jupiter.api.Test; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069@Tag(VerySlowRegionServerTests.TAG) 070@Tag(LargeTests.TAG) 071public class TestLogRolling extends AbstractTestLogRolling { 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class); 074 075 @BeforeAll 076 public static void setUpBeforeClass() throws Exception { 077 // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2 078 // profile. See HBASE-9337 for related issues. 079 System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); 080 081 /**** configuration for testLogRollOnDatanodeDeath ****/ 082 // lower the namenode & datanode heartbeat so the namenode 083 // quickly detects datanode failures 084 Configuration conf = TEST_UTIL.getConfiguration(); 085 conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 086 conf.setInt("dfs.heartbeat.interval", 1); 087 // the namenode might still try to choose the recently-dead datanode 088 // for a pipeline, so try to a new pipeline multiple times 089 conf.setInt("dfs.client.block.write.retries", 30); 090 conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2); 091 conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3); 092 conf.set(WALFactory.WAL_PROVIDER, "filesystem"); 093 AbstractTestLogRolling.setUpBeforeClass(); 094 } 095 096 public static class SlowSyncLogWriter extends ProtobufLogWriter { 097 @Override 098 public void sync(boolean forceSync) throws IOException { 099 try { 100 Thread.sleep(syncLatencyMillis); 101 } catch (InterruptedException e) { 102 InterruptedIOException ex = new InterruptedIOException(); 103 ex.initCause(e); 104 throw ex; 105 } 106 super.sync(forceSync); 107 } 108 } 109 110 @Override 111 protected void setSlowLogWriter(Configuration conf) { 112 conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName()); 113 } 114 115 @Override 116 protected void setDefaultLogWriter(Configuration conf) { 117 conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName()); 118 } 119 120 void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout) 121 throws IOException { 122 for (int i = 0; i < 10; i++) { 123 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i)))); 124 put.addColumn(HConstants.CATALOG_FAMILY, null, value); 125 table.put(put); 126 } 127 Put tmpPut = new Put(Bytes.toBytes("tmprow")); 128 tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value); 129 long startTime = EnvironmentEdgeManager.currentTime(); 130 long remaining = timeout; 131 while (remaining > 0) { 132 if (log.isLowReplicationRollEnabled() == expect) { 133 break; 134 } else { 135 // Trigger calling FSHlog#checkLowReplication() 136 table.put(tmpPut); 137 try { 138 Thread.sleep(200); 139 } catch (InterruptedException e) { 140 // continue 141 } 142 remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime); 143 } 144 } 145 } 146 147 @Test 148 public void testSlowSyncLogRolling() throws Exception { 149 // Create the test table 150 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) 151 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); 152 admin.createTable(desc); 153 try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) { 154 server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); 155 RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); 156 final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region); 157 158 // Set default log writer, no additional latency to any sync on the hlog. 159 checkSlowSync(log, table, -1, 10, false); 160 161 // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to 162 // trigger slow sync warnings. 163 // Write some data. 164 // We need to write at least 5 times, but double it. We should only request 165 // a SLOW_SYNC roll once in the current interval. 166 checkSlowSync(log, table, 200, 10, true); 167 168 // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold. 169 // Write some data. Should only take one sync. 170 checkSlowSync(log, table, 5000, 1, true); 171 172 // Set default log writer, no additional latency to any sync on the hlog. 173 checkSlowSync(log, table, -1, 10, false); 174 } 175 } 176 177 /** 178 * Tests that logs are rolled upon detecting datanode death Requires an HDFS jar with HDFS-826 & 179 * syncFs() support (HDFS-200) 180 */ 181 @Test 182 public void testLogRollOnDatanodeDeath() throws Exception { 183 184 Long oldValue = TEST_UTIL.getConfiguration() 185 .getLong("hbase.regionserver.hlog.check.lowreplication.interval", -1); 186 187 try { 188 /** 189 * When we reuse the code of AsyncFSWAL to FSHLog, the low replication is only checked by 190 * {@link LogRoller#checkLowReplication},so in order to make this test spend less time,we 191 * should minimize following config which is maximized by 192 * {@link AbstractTestLogRolling#setUpBeforeClass} 193 */ 194 TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", 195 1000); 196 this.tearDown(); 197 this.setUp(); 198 199 TEST_UTIL.ensureSomeRegionServersAvailable(2); 200 assertTrue(fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2, 201 "This test requires WAL file replication set to 2."); 202 LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); 203 204 this.server = cluster.getRegionServer(0); 205 206 // Create the test table and open it 207 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) 208 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); 209 210 admin.createTable(desc); 211 Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); 212 213 server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); 214 RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); 215 final FSHLog log = (FSHLog) server.getWAL(region); 216 final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false); 217 218 log.registerWALActionsListener(new WALActionsListener() { 219 @Override 220 public void logRollRequested(WALActionsListener.RollRequestReason reason) { 221 switch (reason) { 222 case LOW_REPLICATION: 223 lowReplicationHookCalled.lazySet(true); 224 break; 225 default: 226 break; 227 } 228 } 229 }); 230 231 // add up the datanode count, to ensure proper replication when we kill 1 232 // This function is synchronous; when it returns, the dfs cluster is active 233 // We start 3 servers and then stop 2 to avoid a directory naming conflict 234 // when we stop/start a namenode later, as mentioned in HBASE-5163 235 List<DataNode> existingNodes = dfsCluster.getDataNodes(); 236 int numDataNodes = 3; 237 TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", 238 1000); 239 dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null); 240 List<DataNode> allNodes = dfsCluster.getDataNodes(); 241 for (int i = allNodes.size() - 1; i >= 0; i--) { 242 if (existingNodes.contains(allNodes.get(i))) { 243 dfsCluster.stopDataNode(i); 244 } 245 } 246 247 assertTrue( 248 dfsCluster.getDataNodes().size() 249 >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1, 250 "DataNodes " + dfsCluster.getDataNodes().size() + " default replication " 251 + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); 252 253 writeData(table, 2); 254 255 long curTime = EnvironmentEdgeManager.currentTime(); 256 LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName()); 257 long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 258 assertTrue(curTime > oldFilenum && oldFilenum != -1, 259 "Log should have a timestamp older than now"); 260 261 assertTrue(oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log), 262 "The log shouldn't have rolled yet"); 263 final DatanodeInfo[] pipeline = log.getPipeline(); 264 assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); 265 266 // kill a datanode in the pipeline to force a log roll on the next sync() 267 // This function is synchronous, when it returns the node is killed. 268 assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null); 269 270 // this write should succeed, but trigger a log roll 271 writeData(table, 2); 272 273 TEST_UTIL.waitFor(10000, 100, () -> { 274 long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 275 return newFilenum > oldFilenum && newFilenum > curTime && lowReplicationHookCalled.get(); 276 }); 277 278 long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 279 280 // write some more log data (this should use a new hdfs_out) 281 writeData(table, 3); 282 assertTrue(AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum, 283 "The log should not roll again."); 284 // kill another datanode in the pipeline, so the replicas will be lower than 285 // the configured value 2. 286 assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null); 287 288 batchWriteAndWait(table, log, 3, false, 14000); 289 int replication = log.getLogReplication(); 290 assertTrue(!log.isLowReplicationRollEnabled(), 291 "LowReplication Roller should've been disabled, current replication=" + replication); 292 293 dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null); 294 295 // Force roll writer. The new log file will have the default replications, 296 // and the LowReplication Roller will be enabled. 297 log.rollWriter(true); 298 batchWriteAndWait(table, log, 13, true, 10000); 299 replication = log.getLogReplication(); 300 assertTrue(replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()), 301 "New log file should have the default replication instead of " + replication); 302 assertTrue(log.isLowReplicationRollEnabled(), "LowReplication Roller should've been enabled"); 303 } finally { 304 TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", 305 oldValue); 306 } 307 } 308 309 /** 310 * Test that WAL is rolled when all data nodes in the pipeline have been restarted. 311 */ 312 @Test 313 public void testLogRollOnPipelineRestart() throws Exception { 314 LOG.info("Starting testLogRollOnPipelineRestart"); 315 assertTrue(fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1, 316 "This test requires WAL file replication."); 317 LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); 318 // When the hbase:meta table can be opened, the region servers are running 319 Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); 320 try { 321 this.server = cluster.getRegionServer(0); 322 323 // Create the test table and open it 324 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) 325 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); 326 327 admin.createTable(desc); 328 Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); 329 330 server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); 331 RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); 332 final WAL log = server.getWAL(region); 333 final List<Path> paths = new ArrayList<>(1); 334 final List<Integer> preLogRolledCalled = new ArrayList<>(); 335 336 paths.add(AbstractFSWALProvider.getCurrentFileName(log)); 337 log.registerWALActionsListener(new WALActionsListener() { 338 339 @Override 340 public void preLogRoll(Path oldFile, Path newFile) { 341 LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile); 342 preLogRolledCalled.add(1); 343 } 344 345 @Override 346 public void postLogRoll(Path oldFile, Path newFile) { 347 paths.add(newFile); 348 } 349 }); 350 351 writeData(table, 1002); 352 353 long curTime = EnvironmentEdgeManager.currentTime(); 354 LOG.info("log.getCurrentFileName()): " + AbstractFSWALProvider.getCurrentFileName(log)); 355 long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 356 assertTrue(curTime > oldFilenum && oldFilenum != -1, 357 "Log should have a timestamp older than now"); 358 359 assertTrue(oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log), 360 "The log shouldn't have rolled yet"); 361 362 // roll all datanodes in the pipeline 363 dfsCluster.restartDataNodes(); 364 Thread.sleep(1000); 365 dfsCluster.waitActive(); 366 LOG.info("Data Nodes restarted"); 367 validateData(table, 1002); 368 369 // this write should succeed, but trigger a log roll 370 writeData(table, 1003); 371 long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 372 373 assertTrue(newFilenum > oldFilenum && newFilenum > curTime, 374 "Missing datanode should've triggered a log roll"); 375 validateData(table, 1003); 376 377 writeData(table, 1004); 378 379 // roll all datanode again 380 dfsCluster.restartDataNodes(); 381 Thread.sleep(1000); 382 dfsCluster.waitActive(); 383 LOG.info("Data Nodes restarted"); 384 validateData(table, 1004); 385 386 // this write should succeed, but trigger a log roll 387 writeData(table, 1005); 388 389 // force a log roll to read back and verify previously written logs 390 log.rollWriter(true); 391 assertTrue(preLogRolledCalled.size() >= 1, 392 "preLogRolledCalled has size of " + preLogRolledCalled.size()); 393 394 // read back the data written 395 Set<String> loggedRows = new HashSet<>(); 396 for (Path p : paths) { 397 LOG.debug("recovering lease for " + p); 398 RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, 399 TEST_UTIL.getConfiguration(), null); 400 401 LOG.debug("Reading WAL " + CommonFSUtils.getPath(p)); 402 try (WALStreamReader reader = 403 WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) { 404 WAL.Entry entry; 405 while ((entry = reader.next()) != null) { 406 LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells()); 407 for (Cell cell : entry.getEdit().getCells()) { 408 loggedRows.add( 409 Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 410 } 411 } 412 } catch (EOFException e) { 413 LOG.debug("EOF reading file " + CommonFSUtils.getPath(p)); 414 } 415 } 416 417 // verify the written rows are there 418 assertTrue(loggedRows.contains("row1002")); 419 assertTrue(loggedRows.contains("row1003")); 420 assertTrue(loggedRows.contains("row1004")); 421 assertTrue(loggedRows.contains("row1005")); 422 423 // flush all regions 424 for (HRegion r : server.getOnlineRegionsLocalContext()) { 425 try { 426 r.flush(true); 427 } catch (Exception e) { 428 // This try/catch was added by HBASE-14317. It is needed 429 // because this issue tightened up the semantic such that 430 // a failed append could not be followed by a successful 431 // sync. What is coming out here is a failed sync, a sync 432 // that used to 'pass'. 433 LOG.info(e.toString(), e); 434 } 435 } 436 437 ResultScanner scanner = table.getScanner(new Scan()); 438 try { 439 for (int i = 2; i <= 5; i++) { 440 Result r = scanner.next(); 441 assertNotNull(r); 442 assertFalse(r.isEmpty()); 443 assertEquals("row100" + i, Bytes.toString(r.getRow())); 444 } 445 } finally { 446 scanner.close(); 447 } 448 449 // verify that no region servers aborted 450 for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster() 451 .getRegionServerThreads()) { 452 assertFalse(rsThread.getRegionServer().isAborted()); 453 } 454 } finally { 455 if (t != null) t.close(); 456 } 457 } 458 459}