/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link FSHLogProvider}: parsing of server names out of WAL directory paths, archiving
 * of rolled WAL files after region flushes, and identity of the WAL handed out by the provider.
 * All tests share one mini DFS cluster that is started once for the class.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestFSHLogProvider {

  private static final Logger LOG = LoggerFactory.getLogger(TestFSHLogProvider.class);

  /** Column family used by every table descriptor built in this class. */
  private static final String FAMILY = "row";

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static Configuration conf;
  private static FileSystem fs;

  private MultiVersionConcurrencyControl mvcc;

  /** Name of the running test method; used for table names and as the WALFactory id. */
  private String currentTestName;

  /**
   * Records the running test's method name, creates a fresh MVCC instance and removes every entry
   * directly under the filesystem root so each test starts from an empty filesystem.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    currentTestName = testInfo.getTestMethod()
      .orElseThrow(() -> new IllegalStateException("TestInfo carries no test method")).getName();
    mvcc = new MultiVersionConcurrencyControl();
    for (FileStatus entry : fs.listStatus(new Path("/"))) {
      fs.delete(entry.getPath(), true);
    }
  }

  /**
   * Tunes the shared configuration, starts the mini DFS cluster, creates the HBase root directory
   * and captures the configuration and filesystem used by all tests.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Make block sizes small.
    c.setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    c.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    c.setInt("dfs.heartbeat.interval", 1);
    c.setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    c.setInt("hbase.ipc.client.connect.max.retries", 1);
    c.setInt("dfs.client.block.recovery.retries", 1);
    c.setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.startMiniDFSCluster(3);

    // Set up a working space for our tests.
    TEST_UTIL.createRootDir();
    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Checks that {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} returns
   * {@code null} for paths that do not name a file below a server's WAL directory, and the
   * expected {@link ServerName} for regular, sub-directory and "-splitting" paths.
   */
  @Test
  public void testGetServerNameFromWALDirectoryName() throws IOException {
    ServerName sn = ServerName.valueOf("hn", 450, 1398);
    String rootDirUri = CommonFSUtils.getRootDir(conf).toUri().toString();
    String hl = CommonFSUtils.getRootDir(conf) + "/"
      + AbstractFSWALProvider.getWALDirectoryName(sn.toString());

    // None of these may throw, and none of them should yield a server name.
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, null));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, rootDirUri));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, ""));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, " "));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf"));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf"));

    final String wals = "/WALs/";
    ServerName parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
      rootDirUri + wals + sn + "/localhost%2C32984%2C1343316388997.1343316390417");
    assertEquals(sn, parsed, "standard");

    parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf");
    assertEquals(sn, parsed, "subdir");

    parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
      rootDirUri + wals + sn + "-splitting/localhost%3A57020.1340474893931");
    assertEquals(sn, parsed, "split");
  }

  /**
   * Builds a descriptor for a table with the given name and the single {@link #FAMILY} column
   * family.
   */
  private static TableDescriptor createTable(String name) {
    return TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
  }

  /** Maps every column family of the table to scope {@code 0}. */
  private static NavigableMap<byte[], Integer> buildScopes(TableDescriptor td) {
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] family : td.getColumnFamilyNames()) {
      scopes.put(family, 0);
    }
    return scopes;
  }

  /**
   * Creates a {@link WALFactory}, identified by the current test name, on a copy of the shared
   * configuration with {@link FSHLogProvider} set as the WAL provider. The caller must close it.
   */
  private WALFactory createWALFactory() throws IOException {
    Configuration localConf = new Configuration(conf);
    localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
    return new WALFactory(localConf, currentTestName);
  }

  /**
   * Appends {@code times} single-cell edits for the region to the WAL, then syncs once.
   * @param log    WAL to append to
   * @param hri    region the edits belong to
   * @param htd    descriptor of the region's table; only its table name is used
   * @param times  number of edits to append
   * @param scopes per-family scopes placed in each WAL key
   */
  private void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
    NavigableMap<byte[], Integer> scopes) throws IOException {
    final byte[] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      // The same bytes serve as row, family, qualifier and value.
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      log.appendData(hri,
        getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), cols);
    }
    log.sync();
  }

  /**
   * Creates the WAL key for an edit, bound to this test instance's MVCC.
   */
  private WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp,
    NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes);
  }

  /**
   * helper method to simulate region flush for a WAL: starts a cache flush for the given families
   * and immediately completes it with {@link HConstants#NO_SEQNUM}.
   */
  private void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
  }

  // now we will close asynchronously and will not archive a wal file unless it is fully closed, so
  // sometimes we need to wait a bit before asserting, especially when you want to test the removal
  // of numRolledLogFiles
  private void waitNumRolledLogFiles(WAL wal, int expected) {
    TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(wal) == expected);
  }

  /**
   * Drives edits, rolls and flushes for two regions through one WAL and checks the number of
   * rolled WAL files after each step.
   */
  private void testLogCleaning(WALFactory wals) throws IOException {
    TableDescriptor htd = createTable(currentTestName);
    TableDescriptor htd2 = createTable(currentTestName + "2");
    NavigableMap<byte[], Integer> scopes1 = buildScopes(htd);
    NavigableMap<byte[], Integer> scopes2 = buildScopes(htd2);
    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build();
    // we want to mix edits from regions, so pick our own identifier.
    WAL log = wals.getWAL(null);

    // Add a single edit and make sure that rolling won't remove the file
    // Before HBASE-3198 it used to delete it
    addEdits(log, hri, htd, 1, scopes1);
    log.rollWriter();
    waitNumRolledLogFiles(log, 1);

    // See if there's anything wrong with more than 1 edit
    addEdits(log, hri, htd, 2, scopes1);
    log.rollWriter();
    // Poll like every other check in this test instead of asserting once.
    waitNumRolledLogFiles(log, 2);

    // Now mix edits from 2 regions, still no flushing
    addEdits(log, hri, htd, 1, scopes1);
    addEdits(log, hri2, htd2, 1, scopes2);
    addEdits(log, hri, htd, 1, scopes1);
    addEdits(log, hri2, htd2, 1, scopes2);
    log.rollWriter();
    waitNumRolledLogFiles(log, 3);

    // Flush the first region, we expect to see the first two files getting
    // archived. We need to append something or writer won't be rolled.
    addEdits(log, hri2, htd2, 1, scopes2);
    flushRegion(log, hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
    log.rollWriter();
    waitNumRolledLogFiles(log, 2);

    // Flush the second region, which removes all the remaining output files
    // since the oldest was completely flushed and the two others only contain
    // flush information
    addEdits(log, hri2, htd2, 1, scopes2);
    flushRegion(log, hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
    log.rollWriter();
    waitNumRolledLogFiles(log, 0);
  }

  @Test
  public void testLogCleaning() throws Exception {
    LOG.info(currentTestName);
    WALFactory wals = createWALFactory();
    try {
      testLogCleaning(wals);
    } finally {
      wals.close();
    }
  }

  /**
   * Body of {@link #testWALArchiving()}: interleaves edits, rolls and flushes for two regions and
   * checks after each roll how many rolled WAL files remain.
   */
  private void testWALArchiving(WALFactory wals) throws IOException {
    TableDescriptor table1 = createTable(currentTestName + "1");
    TableDescriptor table2 = createTable(currentTestName + "2");
    NavigableMap<byte[], Integer> scopes1 = buildScopes(table1);
    NavigableMap<byte[], Integer> scopes2 = buildScopes(table2);
    WAL wal = wals.getWAL(null);
    assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
    RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build();
    // start with the testing logic: insert a waledit, and roll writer
    addEdits(wal, hri1, table1, 1, scopes1);
    wal.rollWriter();
    // assert that the wal is rolled
    waitNumRolledLogFiles(wal, 1);
    // add edits in the second wal file, and roll writer.
    addEdits(wal, hri1, table1, 1, scopes1);
    wal.rollWriter();
    // assert that the wal is rolled
    waitNumRolledLogFiles(wal, 2);
    // add a waledit to table1, and flush the region.
    addEdits(wal, hri1, table1, 3, scopes1);
    flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
    // roll log; all old logs should be archived.
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 0);
    // add an edit to table2, and roll writer
    addEdits(wal, hri2, table2, 1, scopes2);
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 1);
    // add edits for table1, and roll writer
    addEdits(wal, hri1, table1, 2, scopes1);
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 2);
    // add edits for table2, and flush hri1 (with its own table's families).
    addEdits(wal, hri2, table2, 2, scopes2);
    flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
    // the log : region-sequenceId map is
    // log1: region2 (unflushed)
    // log2: region1 (flushed)
    // log3: region2 (unflushed)
    // roll the writer; log2 should be archived.
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 2);
    // flush region2, and all logs should be archived.
    addEdits(wal, hri2, table2, 2, scopes2);
    flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 0);
  }

  /**
   * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and
   * also don't archive "live logs" (that is, a log with un-flushed entries).
   * <p>
   * This is what it does: It creates two regions, and does a series of inserts along with log
   * rolling. Whenever a WAL is rolled, previous wals are checked for archiving. A wal is eligible
   * for archiving if all the regions which have entries in that wal file have flushed past their
   * maximum sequence id in that wal file.
   */
  @Test
  public void testWALArchiving() throws IOException {
    LOG.debug(currentTestName);
    WALFactory wals = createWALFactory();
    try {
      testWALArchiving(wals);
    } finally {
      wals.close();
    }
  }

  /**
   * Ensure that we can use Set.add to deduplicate WALs
   */
  @Test
  public void setMembershipDedups() throws IOException {
    WALFactory wals = createWALFactory();
    try {
      final Set<WAL> seen = new HashSet<>(1);
      assertTrue(seen.add(wals.getWAL(null)),
        "first attempt to add WAL from default provider should work.");
      for (int i = 0; i < 1000; i++) {
        // Regions of randomly named tables must all map to the WAL already in the set.
        RegionInfo region = RegionInfoBuilder
          .newBuilder(TableName.valueOf("Table-" + ThreadLocalRandom.current().nextInt())).build();
        assertFalse(seen.add(wals.getWAL(region)),
          "default wal provider is only supposed to return a single wal, which should "
            + "compare as .equals itself.");
      }
    } finally {
      wals.close();
    }
  }
}