001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.greaterThan; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertFalse; 024import static org.junit.jupiter.api.Assertions.assertNotNull; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026 027import java.io.IOException; 028import java.util.concurrent.Executors; 029import java.util.concurrent.ScheduledExecutorService; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 037import org.apache.hadoop.hbase.StartTestingClusterOption; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.Waiter; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.client.TableDescriptor; 048import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 049import org.apache.hadoop.hbase.regionserver.HRegion; 050import org.apache.hadoop.hbase.regionserver.HRegionServer; 051import org.apache.hadoop.hbase.regionserver.Store; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.Threads; 054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 055import org.apache.hadoop.hbase.wal.WAL; 056import org.apache.hadoop.hbase.wal.WALFactory; 057import org.apache.hadoop.hbase.wal.WALProvider; 058import org.apache.hadoop.hdfs.MiniDFSCluster; 059import org.junit.jupiter.api.AfterAll; 060import org.junit.jupiter.api.AfterEach; 061import org.junit.jupiter.api.BeforeAll; 062import org.junit.jupiter.api.BeforeEach; 063import org.junit.jupiter.api.Test; 064import org.junit.jupiter.api.TestInfo; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 069 070/** 071 * Test log deletion as logs are rolled. 072 */ 073public abstract class AbstractTestLogRolling { 074 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class); 075 protected HRegionServer server; 076 protected String tableName; 077 protected byte[] value; 078 protected FileSystem fs; 079 protected MiniDFSCluster dfsCluster; 080 protected Admin admin; 081 protected SingleProcessHBaseCluster cluster; 082 protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 083 private String name; 084 protected static int syncLatencyMillis; 085 private static int rowNum = 1; 086 private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); 087 protected static ScheduledExecutorService EXECUTOR; 088 089 public AbstractTestLogRolling() { 090 this.server = null; 091 this.tableName = null; 092 093 String className = this.getClass().getName(); 094 StringBuilder v = new StringBuilder(className); 095 while (v.length() < 1000) { 096 v.append(className); 097 } 098 this.value = Bytes.toBytes(v.toString()); 099 } 100 101 // Need to override this setup so we can edit the config before it gets sent 102 // to the HDFS & HBase cluster startup. 103 @BeforeAll 104 public static void setUpBeforeClass() throws Exception { 105 /**** configuration for testLogRolling ****/ 106 // Force a region split after every 768KB 107 Configuration conf = TEST_UTIL.getConfiguration(); 108 conf.setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L); 109 110 // We roll the log after every 32 writes 111 conf.setInt("hbase.regionserver.maxlogentries", 32); 112 113 conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2); 114 conf.setInt("hbase.rpc.timeout", 10 * 1000); 115 116 // For less frequently updated regions flush after every 2 flushes 117 conf.setInt("hbase.hregion.memstore.optionalflushcount", 2); 118 119 // We flush the cache after every 8192 bytes 120 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192); 121 122 // Increase the amount of time between client retries 123 conf.setLong("hbase.client.pause", 10 * 1000); 124 125 // Reduce thread wake frequency so that other threads can get 126 // a chance to run. 127 conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000); 128 129 // disable low replication check for log roller to get a more stable result 130 // TestWALOpenAfterDNRollingStart will test this option. 131 conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000); 132 133 // For slow sync threshold test: roll after 5 slow syncs in 10 seconds 134 conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5); 135 conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000); 136 // For slow sync threshold test: roll once after a sync above this threshold 137 conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000); 138 139 // Slow sync executor. 140 EXECUTOR = Executors 141 .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d") 142 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 143 } 144 145 @BeforeEach 146 public void initTestName(TestInfo testInfo) { 147 name = testInfo.getTestMethod().get().getName(); 148 } 149 150 @BeforeEach 151 public void setUp() throws Exception { 152 // Use 2 DataNodes and default values for other StartMiniCluster options. 153 TEST_UTIL.startMiniCluster(StartTestingClusterOption.builder().numDataNodes(2).build()); 154 155 cluster = TEST_UTIL.getHBaseCluster(); 156 dfsCluster = TEST_UTIL.getDFSCluster(); 157 fs = TEST_UTIL.getTestFileSystem(); 158 admin = TEST_UTIL.getAdmin(); 159 160 // disable region rebalancing (interferes with log watching) 161 cluster.getMaster().balanceSwitch(false); 162 } 163 164 @AfterEach 165 public void tearDown() throws Exception { 166 TEST_UTIL.shutdownMiniCluster(); 167 } 168 169 @AfterAll 170 public static void tearDownAfterClass() { 171 EXECUTOR.shutdownNow(); 172 } 173 174 private void startAndWriteData() throws IOException, InterruptedException { 175 this.server = cluster.getRegionServerThreads().get(0).getRegionServer(); 176 177 Table table = createTestTable(this.tableName); 178 179 server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); 180 for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls 181 doPut(table, i); 182 if (i % 32 == 0) { 183 // After every 32 writes sleep to let the log roller run 184 try { 185 Thread.sleep(2000); 186 } catch (InterruptedException e) { 187 // continue 188 } 189 } 190 } 191 } 192 193 private static void setSyncLatencyMillis(int latency) { 194 syncLatencyMillis = latency; 195 } 196 197 protected final AbstractFSWAL<?> getWALAndRegisterSlowSyncHook(RegionInfo region) 198 throws IOException { 199 // Get a reference to the wal. 200 final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region); 201 202 // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested 203 log.registerWALActionsListener(new WALActionsListener() { 204 @Override 205 public void logRollRequested(RollRequestReason reason) { 206 switch (reason) { 207 case SLOW_SYNC: 208 slowSyncHookCalled.lazySet(true); 209 break; 210 default: 211 break; 212 } 213 } 214 }); 215 return log; 216 } 217 218 protected final void checkSlowSync(AbstractFSWAL<?> log, Table table, int slowSyncLatency, 219 int writeCount, boolean slowSync) throws Exception { 220 if (slowSyncLatency > 0) { 221 setSyncLatencyMillis(slowSyncLatency); 222 setSlowLogWriter(log.conf); 223 } else { 224 setDefaultLogWriter(log.conf); 225 } 226 227 // Set up for test 228 log.rollWriter(true); 229 slowSyncHookCalled.set(false); 230 231 final WALProvider.WriterBase oldWriter = log.getWriter(); 232 233 // Write some data 234 for (int i = 0; i < writeCount; i++) { 235 writeData(table, rowNum++); 236 } 237 238 if (slowSync) { 239 TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() { 240 @Override 241 public boolean evaluate() throws Exception { 242 return log.getWriter() != oldWriter; 243 } 244 245 @Override 246 public String explainFailure() throws Exception { 247 return "Waited too long for our test writer to get rolled out"; 248 } 249 }); 250 251 assertTrue(slowSyncHookCalled.get(), "Should have triggered log roll due to SLOW_SYNC"); 252 } else { 253 assertFalse(slowSyncHookCalled.get(), "Should not have triggered log roll due to SLOW_SYNC"); 254 } 255 } 256 257 protected abstract void setSlowLogWriter(Configuration conf); 258 259 protected abstract void setDefaultLogWriter(Configuration conf); 260 261 /** 262 * Tests that log rolling doesn't hang when no data is written. 263 */ 264 @Test 265 public void testLogRollOnNothingWritten() throws Exception { 266 final Configuration conf = TEST_UTIL.getConfiguration(); 267 final WALFactory wals = 268 new WALFactory(conf, ServerName.valueOf("test.com", 8080, 1).toString()); 269 final WAL newLog = wals.getWAL(null); 270 try { 271 // Now roll the log before we write anything. 272 newLog.rollWriter(true); 273 } finally { 274 wals.close(); 275 } 276 } 277 278 /** 279 * Tests that logs are deleted 280 */ 281 @Test 282 public void testLogRolling() throws Exception { 283 this.tableName = getName(); 284 // TODO: Why does this write data take for ever? 285 startAndWriteData(); 286 RegionInfo region = server.getRegions(TableName.valueOf(tableName)).get(0).getRegionInfo(); 287 final WAL log = server.getWAL(region); 288 LOG.info( 289 "after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files"); 290 291 // roll the log, so we should have at least one rolled file and the log file size should be 292 // greater than 0, in case in the above method we rolled in the last round and then flushed so 293 // all the old wal files are deleted and cause the below assertion to fail 294 log.rollWriter(); 295 296 assertThat(AbstractFSWALProvider.getLogFileSize(log), greaterThan(0L)); 297 298 // flush all regions 299 for (HRegion r : server.getOnlineRegionsLocalContext()) { 300 r.flush(true); 301 } 302 303 // Now roll the log the again 304 log.rollWriter(); 305 306 // should have deleted all the rolled wal files 307 TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(log) == 0); 308 assertEquals(0, AbstractFSWALProvider.getLogFileSize(log)); 309 } 310 311 protected String getName() { 312 return "TestLogRolling-" + name; 313 } 314 315 void writeData(Table table, int rownum) throws IOException { 316 doPut(table, rownum); 317 318 // sleep to let the log roller run (if it needs to) 319 try { 320 Thread.sleep(2000); 321 } catch (InterruptedException e) { 322 // continue 323 } 324 } 325 326 void validateData(Table table, int rownum) throws IOException { 327 String row = "row" + String.format("%1$04d", rownum); 328 Get get = new Get(Bytes.toBytes(row)); 329 get.addFamily(HConstants.CATALOG_FAMILY); 330 Result result = table.get(get); 331 assertTrue(result.size() == 1); 332 assertTrue(Bytes.equals(value, result.getValue(HConstants.CATALOG_FAMILY, null))); 333 LOG.info("Validated row " + row); 334 } 335 336 /** 337 * Tests that logs are deleted when some region has a compaction record in WAL and no other 338 * records. See HBASE-8597. 339 */ 340 @Test 341 public void testCompactionRecordDoesntBlockRolling() throws Exception { 342 343 // When the hbase:meta table can be opened, the region servers are running 344 try (Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); 345 Table table = createTestTable(getName())) { 346 347 server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); 348 HRegion region = server.getRegions(table.getName()).get(0); 349 final WAL log = server.getWAL(region.getRegionInfo()); 350 Store s = region.getStore(HConstants.CATALOG_FAMILY); 351 352 // Put some stuff into table, to make sure we have some files to compact. 353 for (int i = 1; i <= 2; ++i) { 354 doPut(table, i); 355 admin.flush(table.getName()); 356 } 357 doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL 358 assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log), 359 "Should have no WAL after initial writes"); 360 assertEquals(2, s.getStorefilesCount()); 361 362 // Roll the log and compact table, to have compaction record in the 2nd WAL. 363 log.rollWriter(); 364 assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log), 365 "Should have WAL; one table is not flushed"); 366 admin.flush(table.getName()); 367 region.compact(false); 368 // Wait for compaction in case if flush triggered it before us. 369 assertNotNull(s); 370 for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) { 371 Threads.sleepWithoutInterrupt(200); 372 } 373 assertEquals(1, s.getStorefilesCount(), "Compaction didn't happen"); 374 375 // Write some value to the table so the WAL cannot be deleted until table is flushed. 376 doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table. 377 log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. 378 assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log), 379 "Should have WAL; one table is not flushed"); 380 381 // Flush table to make latest WAL obsolete; write another record, and roll again. 382 admin.flush(table.getName()); 383 doPut(table, 1); 384 log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. 385 assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log), 386 "Should have 1 WALs at the end"); 387 } 388 } 389 390 protected void doPut(Table table, int i) throws IOException { 391 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); 392 put.addColumn(HConstants.CATALOG_FAMILY, null, value); 393 table.put(put); 394 } 395 396 protected Table createTestTable(String tableName) throws IOException { 397 // Create the test table and open it 398 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) 399 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); 400 admin.createTable(desc); 401 return TEST_UTIL.getConnection().getTable(desc.getTableName()); 402 } 403}