/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test log deletion as logs are rolled.
 * <p>
 * Abstract base: it configures and starts a mini cluster around every test method and provides
 * helpers for creating a test table and writing/validating rows in it.
 */
public abstract class AbstractTestLogRolling {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class);
  protected HRegionServer server;
  protected String tableName;
  protected byte[] value;
  protected FileSystem fs;
  protected MiniDFSCluster dfsCluster;
  protected Admin admin;
  protected MiniHBaseCluster cluster;
  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  @Rule public final TestName name = new TestName();

  /**
   * Builds the cell value written by {@link #doPut(Table, int)}: the runtime class name repeated
   * until the string is at least 1000 characters long.
   */
  public AbstractTestLogRolling() {
    this.server = null;
    this.tableName = null;

    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    this.value = Bytes.toBytes(v.toString());
  }

  // Need to override this setup so we can edit the config before it gets sent
  // to the HDFS & HBase cluster startup.
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    /**** configuration for testLogRolling ****/
    // Force a region split after every 768KB
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    conf.setInt("hbase.regionserver.maxlogentries", 32);

    conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2);
    conf.setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    conf.setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    conf.setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    // disable low replication check for log roller to get a more stable result
    // TestWALOpenAfterDNRollingStart will test this option.
    conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);
  }

  /**
   * Starts a fresh mini cluster for the test, caches the cluster/DFS/filesystem/admin handles in
   * the corresponding fields, and switches the balancer off.
   */
  @Before
  public void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1, 1, 2);

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    fs = TEST_UTIL.getTestFileSystem();
    admin = TEST_UTIL.getAdmin();

    // disable region rebalancing (interferes with log watching)
    cluster.getMaster().balanceSwitch(false);
  }

  /**
   * Shuts down the mini cluster started in {@link #setUp()}.
   */
  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates the test table named by {@link #tableName}, points {@link #server} at the region
   * server returned for the table's first region, and writes rows 1..256, pausing after every
   * 32nd write. The table handle is closed before returning.
   *
   * @throws IOException if table creation or a write fails
   * @throws InterruptedException if looking up the hosting region server is interrupted
   */
  protected void startAndWriteData() throws IOException, InterruptedException {
    // When the hbase:meta table can be opened, the region servers are running
    // The handle itself is not needed afterwards, so release it right away.
    TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME).close();
    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();

    try (Table table = createTestTable(this.tableName)) {
      server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
      for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
        doPut(table, i);
        if (i % 32 == 0) {
          // After every 32 writes sleep to let the log roller run
          sleepForLogRoller();
        }
      }
    }
  }

  /**
   * Tests that log rolling doesn't hang when no data is written.
   */
  @Test
  public void testLogRollOnNothingWritten() throws Exception {
    final Configuration conf = TEST_UTIL.getConfiguration();
    final WALFactory wals =
      new WALFactory(conf, ServerName.valueOf("test.com", 8080, 1).toString());
    final WAL newLog = wals.getWAL(null);
    try {
      // Now roll the log before we write anything.
      newLog.rollWriter(true);
    } finally {
      wals.close();
    }
  }

  /**
   * Asserts that the reported log file size is positive when there is at least one rolled log
   * file, and exactly zero otherwise.
   */
  private void assertLogFileSize(WAL log) {
    if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
      assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
    } else {
      assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
    }
  }

  /**
   * Tests that logs are deleted
   */
  @Test
  public void testLogRolling() throws Exception {
    this.tableName = getName();
    // TODO: Why does this write data take for ever?
    startAndWriteData();
    RegionInfo region = server.getRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
    final WAL log = server.getWAL(region);
    LOG.info("after writing there are {} log files",
      AbstractFSWALProvider.getNumRolledLogFiles(log));
    assertLogFileSize(log);

    // flush all regions
    for (HRegion r : server.getOnlineRegionsLocalContext()) {
      r.flush(true);
    }

    // Now roll the log
    log.rollWriter();

    int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
    LOG.info("after flushing all regions and rolling logs there are {} log files", count);
    assertTrue(("actual count: " + count), count <= 2);
    assertLogFileSize(log);
  }

  /**
   * @return the per-test table name: {@code "TestLogRolling-"} followed by the name of the
   *         currently running test method
   */
  protected String getName() {
    return "TestLogRolling-" + name.getMethodName();
  }

  /**
   * Writes one row via {@link #doPut(Table, int)} and then pauses.
   *
   * @param table table to write to
   * @param rownum number used to derive the row key
   * @throws IOException if the write fails
   */
  void writeData(Table table, int rownum) throws IOException {
    doPut(table, rownum);

    // sleep to let the log roller run (if it needs to)
    sleepForLogRoller();
  }

  /**
   * Reads back the row for {@code rownum} and asserts that it holds exactly one cell whose value
   * equals {@link #value}.
   *
   * @param table table to read from
   * @param rownum number used to derive the row key
   * @throws IOException if the read fails
   */
  void validateData(Table table, int rownum) throws IOException {
    String row = rowKey(rownum);
    Get get = new Get(Bytes.toBytes(row));
    get.addFamily(HConstants.CATALOG_FAMILY);
    Result result = table.get(get);
    assertEquals(1, result.size());
    assertTrue(Bytes.equals(value,
      result.getValue(HConstants.CATALOG_FAMILY, null)));
    LOG.info("Validated row {}", row);
  }

  /**
   * Tests that logs are deleted when some region has a compaction
   * record in WAL and no other records. See HBASE-8597.
   */
  @Test
  public void testCompactionRecordDoesntBlockRolling() throws Exception {
    // When the hbase:meta table can be opened, the region servers are running
    // Both handles are closed on exit, even if closing one of them throws.
    try (Table meta = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
        Table table = createTestTable(getName())) {
      server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
      HRegion region = server.getRegions(table.getName()).get(0);
      final WAL log = server.getWAL(region.getRegionInfo());
      Store s = region.getStore(HConstants.CATALOG_FAMILY);
      // Check before the store is first dereferenced below.
      Assert.assertNotNull(s);

      // have to flush namespace to ensure it doesn't affect WAL tests
      admin.flush(TableName.NAMESPACE_TABLE_NAME);

      // Put some stuff into table, to make sure we have some files to compact.
      for (int i = 1; i <= 2; ++i) {
        doPut(table, i);
        admin.flush(table.getName());
      }
      doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
      assertEquals("Should have no WAL after initial writes", 0,
        AbstractFSWALProvider.getNumRolledLogFiles(log));
      assertEquals(2, s.getStorefilesCount());

      // Roll the log and compact table, to have compaction record in the 2nd WAL.
      log.rollWriter();
      assertEquals("Should have WAL; one table is not flushed", 1,
        AbstractFSWALProvider.getNumRolledLogFiles(log));
      admin.flush(table.getName());
      region.compact(false);
      // Wait for compaction in case if flush triggered it before us.
      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
        Threads.sleepWithoutInterrupt(200);
      }
      assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());

      // Write some value to the table so the WAL cannot be deleted until table is flushed.
      doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
      log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
      assertEquals("Should have WAL; one table is not flushed", 1,
        AbstractFSWALProvider.getNumRolledLogFiles(log));

      // Flush table to make latest WAL obsolete; write another record, and roll again.
      admin.flush(table.getName());
      doPut(table, 1);
      log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
      assertEquals("Should have 1 WALs at the end", 1,
        AbstractFSWALProvider.getNumRolledLogFiles(log));
    }
  }

  /**
   * Writes {@link #value} into the catalog family (null qualifier) of the row derived from
   * {@code i}.
   *
   * @param table table to write to
   * @param i number used to derive the row key
   * @throws IOException if the write fails
   */
  protected void doPut(Table table, int i) throws IOException {
    Put put = new Put(Bytes.toBytes(rowKey(i)));
    put.addColumn(HConstants.CATALOG_FAMILY, null, value);
    table.put(put);
  }

  /**
   * Creates a table with a single column family ({@link HConstants#CATALOG_FAMILY}) and opens it.
   *
   * @param tableName name of the table to create; when {@code null}, {@link #getName()} is used
   *          instead (previously the argument was ignored and {@link #getName()} always used)
   * @return an open handle on the new table; the caller is responsible for closing it
   * @throws IOException if the table cannot be created or opened
   */
  protected Table createTestTable(String tableName) throws IOException {
    String effectiveName = tableName != null ? tableName : getName();
    // Create the test table and open it
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(effectiveName))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    return TEST_UTIL.getConnection().getTable(desc.getTableName());
  }

  /** Formats the row key shared by the write and validate helpers: "row" + zero-padded number. */
  private static String rowKey(int rownum) {
    return "row" + String.format("%1$04d", rownum);
  }

  /** Pauses for two seconds so the log roller gets a chance to run. */
  private static void sleepForLogRoller() {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // Keep going, but restore the flag so the interruption is not lost to later blocking calls.
      Thread.currentThread().interrupt();
    }
  }
}