001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertThat; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Comparator; 032import java.util.List; 033import java.util.NavigableMap; 034import java.util.Set; 035import java.util.TreeMap; 036import java.util.UUID; 037import java.util.concurrent.atomic.AtomicBoolean; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileStatus; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.CellScanner; 043import org.apache.hadoop.hbase.Coprocessor; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HBaseTestingUtility; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.Get; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.RegionInfoBuilder; 054import org.apache.hadoop.hbase.client.Result; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 058import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 059import org.apache.hadoop.hbase.regionserver.HRegion; 060import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 061import org.apache.hadoop.hbase.regionserver.SequenceId; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.CommonFSUtils; 064import org.apache.hadoop.hbase.util.EnvironmentEdge; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.Threads; 067import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 068import org.apache.hadoop.hbase.wal.WAL; 069import org.apache.hadoop.hbase.wal.WALEdit; 070import org.apache.hadoop.hbase.wal.WALKey; 071import org.apache.hadoop.hbase.wal.WALKeyImpl; 072import org.junit.AfterClass; 073import org.junit.Before; 074import org.junit.BeforeClass; 075import org.junit.Rule; 076import org.junit.Test; 077import org.junit.rules.TestName; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081public abstract class AbstractTestFSWAL { 082 083 protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class); 084 085 protected static Configuration CONF; 086 protected static FileSystem FS; 087 protected static Path DIR; 088 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 089 090 @Rule 091 public final TestName currentTest = new TestName(); 092 093 @Before 094 public void setUp() throws Exception { 095 FileStatus[] entries = FS.listStatus(new Path("/")); 096 for (FileStatus dir : entries) { 097 FS.delete(dir.getPath(), true); 098 } 099 final Path hbaseDir = TEST_UTIL.createRootDir(); 100 final Path hbaseWALDir = TEST_UTIL.createWALRootDir(); 101 DIR = new Path(hbaseWALDir, currentTest.getMethodName()); 102 assertNotEquals(hbaseDir, hbaseWALDir); 103 } 104 105 @BeforeClass 106 public static void setUpBeforeClass() throws Exception { 107 // Make block sizes small. 108 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 109 // quicker heartbeat interval for faster DN death notification 110 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 111 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 112 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 113 114 // faster failover with cluster.shutdown();fs.close() idiom 115 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1); 116 TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); 117 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500); 118 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 119 SampleRegionWALCoprocessor.class.getName()); 120 TEST_UTIL.startMiniDFSCluster(3); 121 122 CONF = TEST_UTIL.getConfiguration(); 123 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 124 } 125 126 @AfterClass 127 public static void tearDownAfterClass() throws Exception { 128 TEST_UTIL.shutdownMiniCluster(); 129 } 130 131 protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir, 132 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 133 boolean failIfWALExists, String prefix, String suffix) throws IOException; 134 135 protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir, 136 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 137 boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException; 138 139 /** 140 * A loaded WAL coprocessor won't break existing WAL test cases. 141 */ 142 @Test 143 public void testWALCoprocessorLoaded() throws Exception { 144 // test to see whether the coprocessor is loaded or not. 145 AbstractFSWAL<?> wal = null; 146 try { 147 wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 148 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 149 WALCoprocessorHost host = wal.getCoprocessorHost(); 150 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 151 assertNotNull(c); 152 } finally { 153 if (wal != null) { 154 wal.close(); 155 } 156 } 157 } 158 159 protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times, 160 MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) 161 throws IOException { 162 final byte[] row = Bytes.toBytes("row"); 163 for (int i = 0; i < times; i++) { 164 long timestamp = System.currentTimeMillis(); 165 WALEdit cols = new WALEdit(); 166 cols.add(new KeyValue(row, row, row, timestamp, row)); 167 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), 168 SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, 169 HConstants.NO_NONCE, mvcc, scopes); 170 log.append(hri, key, cols, true); 171 } 172 log.sync(); 173 } 174 175 /** 176 * helper method to simulate region flush for a WAL. 177 * @param wal 178 * @param regionEncodedName 179 */ 180 protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) { 181 wal.startCacheFlush(regionEncodedName, flushedFamilyNames); 182 wal.completeCacheFlush(regionEncodedName); 183 } 184 185 /** 186 * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws 187 * exception if we do). Comparison is based on the timestamp present in the wal name. 188 * @throws Exception 189 */ 190 @Test 191 public void testWALComparator() throws Exception { 192 AbstractFSWAL<?> wal1 = null; 193 AbstractFSWAL<?> walMeta = null; 194 try { 195 wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 196 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 197 LOG.debug("Log obtained is: " + wal1); 198 Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR; 199 Path p1 = wal1.computeFilename(11); 200 Path p2 = wal1.computeFilename(12); 201 // comparing with itself returns 0 202 assertTrue(comp.compare(p1, p1) == 0); 203 // comparing with different filenum. 204 assertTrue(comp.compare(p1, p2) < 0); 205 walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 206 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, 207 AbstractFSWALProvider.META_WAL_PROVIDER_ID); 208 Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR; 209 210 Path p1WithMeta = walMeta.computeFilename(11); 211 Path p2WithMeta = walMeta.computeFilename(12); 212 assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0); 213 assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0); 214 // mixing meta and non-meta logs gives error 215 boolean ex = false; 216 try { 217 comp.compare(p1WithMeta, p2); 218 } catch (IllegalArgumentException e) { 219 ex = true; 220 } 221 assertTrue("Comparator doesn't complain while checking meta log files", ex); 222 boolean exMeta = false; 223 try { 224 compMeta.compare(p1WithMeta, p2); 225 } catch (IllegalArgumentException e) { 226 exMeta = true; 227 } 228 assertTrue("Meta comparator doesn't complain while checking log files", exMeta); 229 } finally { 230 if (wal1 != null) { 231 wal1.close(); 232 } 233 if (walMeta != null) { 234 walMeta.close(); 235 } 236 } 237 } 238 239 /** 240 * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of 241 * regions which should be flushed in order to archive the oldest wal file. 242 * <p> 243 * This method tests this behavior by inserting edits and rolling the wal enough times to reach 244 * the max number of logs threshold. It checks whether we get the "right regions" for flush on 245 * rolling the wal. 246 * @throws Exception 247 */ 248 @Test 249 public void testFindMemStoresEligibleForFlush() throws Exception { 250 LOG.debug("testFindMemStoresEligibleForFlush"); 251 Configuration conf1 = HBaseConfiguration.create(CONF); 252 conf1.setInt("hbase.regionserver.maxlogs", 1); 253 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(), 254 HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); 255 TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1")) 256 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 257 TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2")) 258 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 259 RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build(); 260 RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build(); 261 // add edits and roll the wal 262 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 263 NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 264 for (byte[] fam : t1.getColumnFamilyNames()) { 265 scopes1.put(fam, 0); 266 } 267 NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 268 for (byte[] fam : t2.getColumnFamilyNames()) { 269 scopes2.put(fam, 0); 270 } 271 try { 272 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 273 wal.rollWriter(); 274 // add some more edits and roll the wal. This would reach the log number threshold 275 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 276 wal.rollWriter(); 277 // with above rollWriter call, the max logs limit is reached. 278 assertTrue(wal.getNumRolledLogFiles() == 2); 279 280 // get the regions to flush; since there is only one region in the oldest wal, it should 281 // return only one region. 282 byte[][] regionsToFlush = wal.findRegionsToForceFlush(); 283 assertEquals(1, regionsToFlush.length); 284 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); 285 // insert edits in second region 286 addEdits(wal, hri2, t2, 2, mvcc, scopes2); 287 // get the regions to flush, it should still read region1. 288 regionsToFlush = wal.findRegionsToForceFlush(); 289 assertEquals(1, regionsToFlush.length); 290 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); 291 // flush region 1, and roll the wal file. Only last wal which has entries for region1 should 292 // remain. 293 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); 294 wal.rollWriter(); 295 // only one wal should remain now (that is for the second region). 296 assertEquals(1, wal.getNumRolledLogFiles()); 297 // flush the second region 298 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); 299 wal.rollWriter(true); 300 // no wal should remain now. 301 assertEquals(0, wal.getNumRolledLogFiles()); 302 // add edits both to region 1 and region 2, and roll. 303 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 304 addEdits(wal, hri2, t2, 2, mvcc, scopes2); 305 wal.rollWriter(); 306 // add edits and roll the writer, to reach the max logs limit. 307 assertEquals(1, wal.getNumRolledLogFiles()); 308 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 309 wal.rollWriter(); 310 // it should return two regions to flush, as the oldest wal file has entries 311 // for both regions. 312 regionsToFlush = wal.findRegionsToForceFlush(); 313 assertEquals(2, regionsToFlush.length); 314 // flush both regions 315 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); 316 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); 317 wal.rollWriter(true); 318 assertEquals(0, wal.getNumRolledLogFiles()); 319 // Add an edit to region1, and roll the wal. 320 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 321 // tests partial flush: roll on a partial flush, and ensure that wal is not archived. 322 wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); 323 wal.rollWriter(); 324 wal.completeCacheFlush(hri1.getEncodedNameAsBytes()); 325 assertEquals(1, wal.getNumRolledLogFiles()); 326 } finally { 327 if (wal != null) { 328 wal.close(); 329 } 330 } 331 } 332 333 @Test(expected = IOException.class) 334 public void testFailedToCreateWALIfParentRenamed() throws IOException, 335 CommonFSUtils.StreamLacksCapabilityException { 336 final String name = "testFailedToCreateWALIfParentRenamed"; 337 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name, 338 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 339 long filenum = System.currentTimeMillis(); 340 Path path = wal.computeFilename(filenum); 341 wal.createWriterInstance(path); 342 Path parent = path.getParent(); 343 path = wal.computeFilename(filenum + 1); 344 Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting"); 345 FS.rename(parent, newPath); 346 wal.createWriterInstance(path); 347 fail("It should fail to create the new WAL"); 348 } 349 350 /** 351 * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by 352 * slowing appends in the background ring buffer thread while in foreground we call flush. The 353 * addition of the sync over HRegion in flush should fix an issue where flush was returning before 354 * all of its appends had made it out to the WAL (HBASE-11109). 355 * @throws IOException 356 * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a> 357 */ 358 @Test 359 public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException { 360 String testName = currentTest.getMethodName(); 361 final TableName tableName = TableName.valueOf(testName); 362 final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 363 final byte[] rowName = tableName.getName(); 364 final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 365 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build(); 366 HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(), 367 TEST_UTIL.getConfiguration(), htd); 368 HBaseTestingUtility.closeRegionAndWAL(r); 369 final int countPerFamily = 10; 370 final AtomicBoolean goslow = new AtomicBoolean(false); 371 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 372 for (byte[] fam : htd.getColumnFamilyNames()) { 373 scopes.put(fam, 0); 374 } 375 // subclass and doctor a method. 376 AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 377 testName, CONF, null, true, null, null, new Runnable() { 378 379 @Override 380 public void run() { 381 if (goslow.get()) { 382 Threads.sleep(100); 383 LOG.debug("Sleeping before appending 100ms"); 384 } 385 } 386 }); 387 HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(), 388 TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal); 389 EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 390 try { 391 List<Put> puts = null; 392 for (byte[] fam : htd.getColumnFamilyNames()) { 393 puts = 394 TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x"); 395 } 396 397 // Now assert edits made it in. 398 final Get g = new Get(rowName); 399 Result result = region.get(g); 400 assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size()); 401 402 // Construct a WALEdit and add it a few times to the WAL. 403 WALEdit edits = new WALEdit(); 404 for (Put p : puts) { 405 CellScanner cs = p.cellScanner(); 406 while (cs.advance()) { 407 edits.add(cs.current()); 408 } 409 } 410 // Add any old cluster id. 411 List<UUID> clusterIds = new ArrayList<>(1); 412 clusterIds.add(UUID.randomUUID()); 413 // Now make appends run slow. 414 goslow.set(true); 415 for (int i = 0; i < countPerFamily; i++) { 416 final RegionInfo info = region.getRegionInfo(); 417 final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 418 System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes); 419 wal.append(info, logkey, edits, true); 420 region.getMVCC().completeAndWait(logkey.getWriteEntry()); 421 } 422 region.flush(true); 423 // FlushResult.flushSequenceId is not visible here so go get the current sequence id. 424 long currentSequenceId = region.getReadPoint(null); 425 // Now release the appends 426 goslow.set(false); 427 assertTrue(currentSequenceId >= region.getReadPoint(null)); 428 } finally { 429 region.close(true); 430 wal.close(); 431 } 432 } 433 434 @Test 435 public void testSyncNoAppend() throws IOException { 436 String testName = currentTest.getMethodName(); 437 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName, 438 CONF, null, true, null, null); 439 try { 440 wal.sync(); 441 } finally { 442 wal.close(); 443 } 444 } 445 446 @Test 447 public void testWriteEntryCanBeNull() throws IOException { 448 String testName = currentTest.getMethodName(); 449 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName, 450 CONF, null, true, null, null); 451 wal.close(); 452 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) 453 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 454 RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); 455 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 456 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 457 for (byte[] fam : td.getColumnFamilyNames()) { 458 scopes.put(fam, 0); 459 } 460 long timestamp = System.currentTimeMillis(); 461 byte[] row = Bytes.toBytes("row"); 462 WALEdit cols = new WALEdit(); 463 cols.add(new KeyValue(row, row, row, timestamp, row)); 464 WALKeyImpl key = 465 new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID, 466 timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); 467 try { 468 wal.append(ri, key, cols, true); 469 fail("Should fail since the wal has already been closed"); 470 } catch (IOException e) { 471 // expected 472 assertThat(e.getMessage(), containsString("log is closed")); 473 // the WriteEntry should be null since we fail before setting it. 474 assertNull(key.getWriteEntry()); 475 } 476 } 477}