001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.hbase.HBaseTestingUtility; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.MiniHBaseCluster; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.StartMiniClusterOption; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Get; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.client.TableDescriptor; 040import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 041import org.apache.hadoop.hbase.regionserver.HRegion; 042import org.apache.hadoop.hbase.regionserver.HRegionServer; 043import org.apache.hadoop.hbase.regionserver.Store; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.Threads; 046import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 047import org.apache.hadoop.hbase.wal.WAL; 048import org.apache.hadoop.hbase.wal.WALFactory; 049import org.apache.hadoop.hdfs.MiniDFSCluster; 050import org.junit.After; 051import org.junit.Assert; 052import org.junit.Before; 053import org.junit.BeforeClass; 054import org.junit.Rule; 055import org.junit.Test; 056import org.junit.rules.TestName; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060/** 061 * Test log deletion as logs are rolled. 062 */ 063public abstract class AbstractTestLogRolling { 064 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class); 065 protected HRegionServer server; 066 protected String tableName; 067 protected byte[] value; 068 protected FileSystem fs; 069 protected MiniDFSCluster dfsCluster; 070 protected Admin admin; 071 protected MiniHBaseCluster cluster; 072 protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 073 @Rule 074 public final TestName name = new TestName(); 075 076 public AbstractTestLogRolling() { 077 this.server = null; 078 this.tableName = null; 079 080 String className = this.getClass().getName(); 081 StringBuilder v = new StringBuilder(className); 082 while (v.length() < 1000) { 083 v.append(className); 084 } 085 this.value = Bytes.toBytes(v.toString()); 086 } 087 088 // Need to override this setup so we can edit the config before it gets sent 089 // to the HDFS & HBase cluster startup. 090 @BeforeClass 091 public static void setUpBeforeClass() throws Exception { 092 /**** configuration for testLogRolling ****/ 093 // Force a region split after every 768KB 094 Configuration conf = TEST_UTIL.getConfiguration(); 095 conf.setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L); 096 097 // We roll the log after every 32 writes 098 conf.setInt("hbase.regionserver.maxlogentries", 32); 099 100 conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2); 101 conf.setInt("hbase.rpc.timeout", 10 * 1000); 102 103 // For less frequently updated regions flush after every 2 flushes 104 conf.setInt("hbase.hregion.memstore.optionalflushcount", 2); 105 106 // We flush the cache after every 8192 bytes 107 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192); 108 109 // Increase the amount of time between client retries 110 conf.setLong("hbase.client.pause", 10 * 1000); 111 112 // Reduce thread wake frequency so that other threads can get 113 // a chance to run. 114 conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000); 115 116 // disable low replication check for log roller to get a more stable result 117 // TestWALOpenAfterDNRollingStart will test this option. 118 conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000); 119 } 120 121 @Before 122 public void setUp() throws Exception { 123 // Use 2 DataNodes and default values for other StartMiniCluster options. 124 TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numDataNodes(2).build()); 125 126 cluster = TEST_UTIL.getHBaseCluster(); 127 dfsCluster = TEST_UTIL.getDFSCluster(); 128 fs = TEST_UTIL.getTestFileSystem(); 129 admin = TEST_UTIL.getAdmin(); 130 131 // disable region rebalancing (interferes with log watching) 132 cluster.getMaster().balanceSwitch(false); 133 } 134 135 @After 136 public void tearDown() throws Exception { 137 TEST_UTIL.shutdownMiniCluster(); 138 } 139 140 protected void startAndWriteData() throws IOException, InterruptedException { 141 // When the hbase:meta table can be opened, the region servers are running 142 TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); 143 this.server = cluster.getRegionServerThreads().get(0).getRegionServer(); 144 145 Table table = createTestTable(this.tableName); 146 147 server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); 148 for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls 149 doPut(table, i); 150 if (i % 32 == 0) { 151 // After every 32 writes sleep to let the log roller run 152 try { 153 Thread.sleep(2000); 154 } catch (InterruptedException e) { 155 // continue 156 } 157 } 158 } 159 } 160 161 /** 162 * Tests that log rolling doesn't hang when no data is written. 163 */ 164 @Test 165 public void testLogRollOnNothingWritten() throws Exception { 166 final Configuration conf = TEST_UTIL.getConfiguration(); 167 final WALFactory wals = 168 new WALFactory(conf, ServerName.valueOf("test.com", 8080, 1).toString()); 169 final WAL newLog = wals.getWAL(null); 170 try { 171 // Now roll the log before we write anything. 172 newLog.rollWriter(true); 173 } finally { 174 wals.close(); 175 } 176 } 177 178 private void assertLogFileSize(WAL log) throws InterruptedException { 179 if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) { 180 assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0); 181 } else { 182 for (int i = 0; i < 10; i++) { 183 if (AbstractFSWALProvider.getLogFileSize(log) != 0) { 184 Thread.sleep(10); 185 } 186 } 187 assertEquals(0, AbstractFSWALProvider.getLogFileSize(log)); 188 } 189 } 190 191 /** 192 * Tests that logs are deleted 193 */ 194 @Test 195 public void testLogRolling() throws Exception { 196 this.tableName = getName(); 197 // TODO: Why does this write data take for ever? 198 startAndWriteData(); 199 RegionInfo region = server.getRegions(TableName.valueOf(tableName)).get(0).getRegionInfo(); 200 final WAL log = server.getWAL(region); 201 LOG.info( 202 "after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files"); 203 assertLogFileSize(log); 204 205 // flush all regions 206 for (HRegion r : server.getOnlineRegionsLocalContext()) { 207 r.flush(true); 208 } 209 210 // Now roll the log 211 log.rollWriter(); 212 213 int count = AbstractFSWALProvider.getNumRolledLogFiles(log); 214 LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); 215 assertTrue(("actual count: " + count), count <= 2); 216 assertLogFileSize(log); 217 } 218 219 protected String getName() { 220 return "TestLogRolling-" + name.getMethodName(); 221 } 222 223 void writeData(Table table, int rownum) throws IOException { 224 doPut(table, rownum); 225 226 // sleep to let the log roller run (if it needs to) 227 try { 228 Thread.sleep(2000); 229 } catch (InterruptedException e) { 230 // continue 231 } 232 } 233 234 void validateData(Table table, int rownum) throws IOException { 235 String row = "row" + String.format("%1$04d", rownum); 236 Get get = new Get(Bytes.toBytes(row)); 237 get.addFamily(HConstants.CATALOG_FAMILY); 238 Result result = table.get(get); 239 assertTrue(result.size() == 1); 240 assertTrue(Bytes.equals(value, result.getValue(HConstants.CATALOG_FAMILY, null))); 241 LOG.info("Validated row " + row); 242 } 243 244 /** 245 * Tests that logs are deleted when some region has a compaction record in WAL and no other 246 * records. See HBASE-8597. 247 */ 248 @Test 249 public void testCompactionRecordDoesntBlockRolling() throws Exception { 250 Table table = null; 251 252 // When the hbase:meta table can be opened, the region servers are running 253 Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); 254 try { 255 table = createTestTable(getName()); 256 257 server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); 258 HRegion region = server.getRegions(table.getName()).get(0); 259 final WAL log = server.getWAL(region.getRegionInfo()); 260 Store s = region.getStore(HConstants.CATALOG_FAMILY); 261 262 // have to flush namespace to ensure it doesn't affect wall tests 263 admin.flush(TableName.NAMESPACE_TABLE_NAME); 264 265 // Put some stuff into table, to make sure we have some files to compact. 266 for (int i = 1; i <= 2; ++i) { 267 doPut(table, i); 268 admin.flush(table.getName()); 269 } 270 doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL 271 assertEquals("Should have no WAL after initial writes", 0, 272 AbstractFSWALProvider.getNumRolledLogFiles(log)); 273 assertEquals(2, s.getStorefilesCount()); 274 275 // Roll the log and compact table, to have compaction record in the 2nd WAL. 276 log.rollWriter(); 277 assertEquals("Should have WAL; one table is not flushed", 1, 278 AbstractFSWALProvider.getNumRolledLogFiles(log)); 279 admin.flush(table.getName()); 280 region.compact(false); 281 // Wait for compaction in case if flush triggered it before us. 282 Assert.assertNotNull(s); 283 for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) { 284 Threads.sleepWithoutInterrupt(200); 285 } 286 assertEquals("Compaction didn't happen", 1, s.getStorefilesCount()); 287 288 // Write some value to the table so the WAL cannot be deleted until table is flushed. 289 doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table. 290 log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. 291 assertEquals("Should have WAL; one table is not flushed", 1, 292 AbstractFSWALProvider.getNumRolledLogFiles(log)); 293 294 // Flush table to make latest WAL obsolete; write another record, and roll again. 295 admin.flush(table.getName()); 296 doPut(table, 1); 297 log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. 298 assertEquals("Should have 1 WALs at the end", 1, 299 AbstractFSWALProvider.getNumRolledLogFiles(log)); 300 } finally { 301 if (t != null) t.close(); 302 if (table != null) table.close(); 303 } 304 } 305 306 protected void doPut(Table table, int i) throws IOException { 307 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); 308 put.addColumn(HConstants.CATALOG_FAMILY, null, value); 309 table.put(put); 310 } 311 312 protected Table createTestTable(String tableName) throws IOException { 313 // Create the test table and open it 314 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) 315 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); 316 admin.createTable(desc); 317 return TEST_UTIL.getConnection().getTable(desc.getTableName()); 318 } 319}