001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.HashSet; 027import java.util.NavigableMap; 028import java.util.Set; 029import java.util.TreeMap; 030import java.util.concurrent.ThreadLocalRandom; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.KeyValue; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionInfoBuilder; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.testclassification.RegionServerTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.CommonFSUtils; 051import org.junit.AfterClass; 052import org.junit.Before; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Rule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.rules.TestName; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062@Category({RegionServerTests.class, MediumTests.class}) 063public class TestFSHLogProvider { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestFSHLogProvider.class); 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestFSHLogProvider.class); 070 071 private static Configuration conf; 072 private static FileSystem fs; 073 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 074 private MultiVersionConcurrencyControl mvcc; 075 076 @Rule 077 public final TestName currentTest = new TestName(); 078 079 @Before 080 public void setUp() throws Exception { 081 mvcc = new MultiVersionConcurrencyControl(); 082 FileStatus[] entries = fs.listStatus(new Path("/")); 083 for (FileStatus dir : entries) { 084 fs.delete(dir.getPath(), true); 085 } 086 } 087 088 @BeforeClass 089 public static void setUpBeforeClass() throws Exception { 090 // Make block sizes small. 091 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 092 // quicker heartbeat interval for faster DN death notification 093 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 094 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 095 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 096 097 // faster failover with cluster.shutdown();fs.close() idiom 098 TEST_UTIL.getConfiguration() 099 .setInt("hbase.ipc.client.connect.max.retries", 1); 100 TEST_UTIL.getConfiguration().setInt( 101 "dfs.client.block.recovery.retries", 1); 102 TEST_UTIL.getConfiguration().setInt( 103 "hbase.ipc.client.connection.maxidletime", 500); 104 TEST_UTIL.startMiniDFSCluster(3); 105 106 // Set up a working space for our tests. 107 TEST_UTIL.createRootDir(); 108 conf = TEST_UTIL.getConfiguration(); 109 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 110 } 111 112 @AfterClass 113 public static void tearDownAfterClass() throws Exception { 114 TEST_UTIL.shutdownMiniCluster(); 115 } 116 117 static String getName() { 118 return "TestDefaultWALProvider"; 119 } 120 121 @Test 122 public void testGetServerNameFromWALDirectoryName() throws IOException { 123 ServerName sn = ServerName.valueOf("hn", 450, 1398); 124 String hl = CommonFSUtils.getRootDir(conf) + "/" + 125 AbstractFSWALProvider.getWALDirectoryName(sn.toString()); 126 127 // Must not throw exception 128 assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, null)); 129 assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, 130 CommonFSUtils.getRootDir(conf).toUri().toString())); 131 assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, "")); 132 assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, " ")); 133 assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl)); 134 assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf")); 135 assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf")); 136 137 final String wals = "/WALs/"; 138 ServerName parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, 139 CommonFSUtils.getRootDir(conf).toUri().toString() + wals + sn + 140 "/localhost%2C32984%2C1343316388997.1343316390417"); 141 assertEquals("standard", sn, parsed); 142 143 parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf"); 144 assertEquals("subdir", sn, parsed); 145 146 parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, 147 CommonFSUtils.getRootDir(conf).toUri().toString() + wals + sn + 148 "-splitting/localhost%3A57020.1340474893931"); 149 assertEquals("split", sn, parsed); 150 } 151 152 153 private void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times, 154 NavigableMap<byte[], Integer> scopes) throws IOException { 155 final byte[] row = Bytes.toBytes("row"); 156 for (int i = 0; i < times; i++) { 157 long timestamp = System.currentTimeMillis(); 158 WALEdit cols = new WALEdit(); 159 cols.add(new KeyValue(row, row, row, timestamp, row)); 160 log.appendData(hri, 161 getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), cols); 162 } 163 log.sync(); 164 } 165 166 /** 167 * used by TestDefaultWALProviderWithHLogKey 168 * @param scopes 169 */ 170 WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp, 171 NavigableMap<byte[], Integer> scopes) { 172 return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes); 173 } 174 175 /** 176 * helper method to simulate region flush for a WAL. 177 * @param wal 178 * @param regionEncodedName 179 */ 180 protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) { 181 wal.startCacheFlush(regionEncodedName, flushedFamilyNames); 182 wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM); 183 } 184 185 @Test 186 public void testLogCleaning() throws Exception { 187 LOG.info(currentTest.getMethodName()); 188 TableDescriptor htd = 189 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 190 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 191 TableDescriptor htd2 = 192 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "2")) 193 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 194 NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 195 for (byte[] fam : htd.getColumnFamilyNames()) { 196 scopes1.put(fam, 0); 197 } 198 NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 199 for (byte[] fam : htd2.getColumnFamilyNames()) { 200 scopes2.put(fam, 0); 201 } 202 Configuration localConf = new Configuration(conf); 203 localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); 204 WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); 205 try { 206 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 207 RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build(); 208 // we want to mix edits from regions, so pick our own identifier. 209 WAL log = wals.getWAL(null); 210 211 // Add a single edit and make sure that rolling won't remove the file 212 // Before HBASE-3198 it used to delete it 213 addEdits(log, hri, htd, 1, scopes1); 214 log.rollWriter(); 215 assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log)); 216 217 // See if there's anything wrong with more than 1 edit 218 addEdits(log, hri, htd, 2, scopes1); 219 log.rollWriter(); 220 assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log)); 221 222 // Now mix edits from 2 regions, still no flushing 223 addEdits(log, hri, htd, 1, scopes1); 224 addEdits(log, hri2, htd2, 1, scopes2); 225 addEdits(log, hri, htd, 1, scopes1); 226 addEdits(log, hri2, htd2, 1, scopes2); 227 log.rollWriter(); 228 assertEquals(3, AbstractFSWALProvider.getNumRolledLogFiles(log)); 229 230 // Flush the first region, we expect to see the first two files getting 231 // archived. We need to append something or writer won't be rolled. 232 addEdits(log, hri2, htd2, 1, scopes2); 233 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 234 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 235 log.rollWriter(); 236 int count = AbstractFSWALProvider.getNumRolledLogFiles(log); 237 assertEquals(2, count); 238 239 // Flush the second region, which removes all the remaining output files 240 // since the oldest was completely flushed and the two others only contain 241 // flush information 242 addEdits(log, hri2, htd2, 1, scopes2); 243 log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames()); 244 log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 245 log.rollWriter(); 246 assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log)); 247 } finally { 248 if (wals != null) { 249 wals.close(); 250 } 251 } 252 } 253 254 /** 255 * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs 256 * and also don't archive "live logs" (that is, a log with un-flushed entries). 257 * <p> 258 * This is what it does: 259 * It creates two regions, and does a series of inserts along with log rolling. 260 * Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is eligible for 261 * archiving if for all the regions which have entries in that wal file, have flushed - past 262 * their maximum sequence id in that wal file. 263 * <p> 264 * @throws IOException 265 */ 266 @Test 267 public void testWALArchiving() throws IOException { 268 LOG.debug(currentTest.getMethodName()); 269 TableDescriptor table1 = 270 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "1")) 271 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 272 TableDescriptor table2 = 273 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "2")) 274 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 275 NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 276 for (byte[] fam : table1.getColumnFamilyNames()) { 277 scopes1.put(fam, 0); 278 } 279 NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 280 for (byte[] fam : table2.getColumnFamilyNames()) { 281 scopes2.put(fam, 0); 282 } 283 Configuration localConf = new Configuration(conf); 284 localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); 285 WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); 286 try { 287 WAL wal = wals.getWAL(null); 288 assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 289 RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build(); 290 RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build(); 291 // variables to mock region sequenceIds. 292 // start with the testing logic: insert a waledit, and roll writer 293 addEdits(wal, hri1, table1, 1, scopes1); 294 wal.rollWriter(); 295 // assert that the wal is rolled 296 assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 297 // add edits in the second wal file, and roll writer. 298 addEdits(wal, hri1, table1, 1, scopes1); 299 wal.rollWriter(); 300 // assert that the wal is rolled 301 assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 302 // add a waledit to table1, and flush the region. 303 addEdits(wal, hri1, table1, 3, scopes1); 304 flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames()); 305 // roll log; all old logs should be archived. 306 wal.rollWriter(); 307 assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 308 // add an edit to table2, and roll writer 309 addEdits(wal, hri2, table2, 1, scopes2); 310 wal.rollWriter(); 311 assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 312 // add edits for table1, and roll writer 313 addEdits(wal, hri1, table1, 2, scopes1); 314 wal.rollWriter(); 315 assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 316 // add edits for table2, and flush hri1. 317 addEdits(wal, hri2, table2, 2, scopes2); 318 flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getColumnFamilyNames()); 319 // the log : region-sequenceId map is 320 // log1: region2 (unflushed) 321 // log2: region1 (flushed) 322 // log3: region2 (unflushed) 323 // roll the writer; log2 should be archived. 324 wal.rollWriter(); 325 assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 326 // flush region2, and all logs should be archived. 327 addEdits(wal, hri2, table2, 2, scopes2); 328 flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames()); 329 wal.rollWriter(); 330 assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); 331 } finally { 332 if (wals != null) { 333 wals.close(); 334 } 335 } 336 } 337 338 /** 339 * Write to a log file with three concurrent threads and verifying all data is written. 340 * @throws Exception 341 */ 342 @Test 343 public void testConcurrentWrites() throws Exception { 344 // Run the WPE tool with three threads writing 3000 edits each concurrently. 345 // When done, verify that all edits were written. 346 int errCode = WALPerformanceEvaluation. 347 innerMain(new Configuration(TEST_UTIL.getConfiguration()), 348 new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); 349 assertEquals(0, errCode); 350 } 351 352 /** 353 * Ensure that we can use Set.add to deduplicate WALs 354 */ 355 @Test 356 public void setMembershipDedups() throws IOException { 357 Configuration localConf = new Configuration(conf); 358 localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); 359 WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); 360 try { 361 final Set<WAL> seen = new HashSet<>(1); 362 assertTrue("first attempt to add WAL from default provider should work.", 363 seen.add(wals.getWAL(null))); 364 for (int i = 0; i < 1000; i++) { 365 assertFalse( 366 "default wal provider is only supposed to return a single wal, which should " + 367 "compare as .equals itself.", 368 seen.add(wals.getWAL(RegionInfoBuilder 369 .newBuilder(TableName.valueOf("Table-" + ThreadLocalRandom.current().nextInt())) 370 .build()))); 371 } 372 } finally { 373 wals.close(); 374 } 375 } 376}