001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER; 021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertFalse; 024import static org.junit.jupiter.api.Assertions.assertNotNull; 025import static org.junit.jupiter.api.Assertions.assertNotSame; 026import static org.junit.jupiter.api.Assertions.assertThrows; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028import static org.junit.jupiter.api.Assertions.fail; 029 030import java.io.IOException; 031import java.io.InputStream; 032import java.lang.reflect.Method; 033import java.net.BindException; 034import java.util.ArrayList; 035import java.util.List; 036import java.util.NavigableMap; 037import java.util.TreeMap; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.stream.Collectors; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataInputStream; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.Cell; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.Coprocessor; 049import org.apache.hadoop.hbase.HBaseTestingUtil; 050import org.apache.hadoop.hbase.HConstants; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 055import org.apache.hadoop.hbase.client.RegionInfo; 056import org.apache.hadoop.hbase.client.RegionInfoBuilder; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 059import org.apache.hadoop.hbase.codec.Codec; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 061import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 062import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 063import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; 064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 065import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 066import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 067import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 068import org.apache.hadoop.hbase.testclassification.MediumTests; 069import org.apache.hadoop.hbase.testclassification.RegionServerTests; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.CommonFSUtils; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 074import org.apache.hadoop.hbase.util.Threads; 075import org.apache.hadoop.hbase.wal.WALFactory.Providers; 076import org.apache.hadoop.hdfs.DistributedFileSystem; 077import org.apache.hadoop.hdfs.MiniDFSCluster; 078import org.apache.hadoop.hdfs.protocol.HdfsConstants; 079import org.junit.jupiter.api.AfterAll; 080import org.junit.jupiter.api.AfterEach; 081import org.junit.jupiter.api.BeforeAll; 082import org.junit.jupiter.api.BeforeEach; 083import org.junit.jupiter.api.Tag; 084import org.junit.jupiter.api.Test; 085import org.junit.jupiter.api.TestInfo; 086import org.slf4j.Logger; 087import org.slf4j.LoggerFactory; 088 089/** 090 * WAL tests that can be reused across providers. 091 */ 092@Tag(RegionServerTests.TAG) 093@Tag(MediumTests.TAG) 094public class TestWALFactory { 095 096 private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class); 097 098 protected static Configuration conf; 099 private static MiniDFSCluster cluster; 100 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 101 protected static Path hbaseDir; 102 protected static Path hbaseWALDir; 103 104 protected FileSystem fs; 105 protected Path dir; 106 protected WALFactory wals; 107 private ServerName currentServername; 108 109 private String currentTestName; 110 111 @BeforeEach 112 public void setUp(TestInfo testInfo) throws Exception { 113 currentTestName = testInfo.getTestMethod().get().getName(); 114 fs = cluster.getFileSystem(); 115 dir = new Path(hbaseDir, currentTestName); 116 this.currentServername = ServerName.valueOf(currentTestName, 16010, 1); 117 wals = new WALFactory(conf, this.currentServername.toString()); 118 } 119 120 @AfterEach 121 public void tearDown() throws Exception { 122 // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here. 123 try { 124 wals.close(); 125 } catch (IOException exception) { 126 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" 127 + " may be the cause. Message: " + exception); 128 LOG.debug("Exception details for failure to close wal factory.", exception); 129 } 130 FileStatus[] entries = fs.listStatus(new Path("/")); 131 for (FileStatus dir : entries) { 132 fs.delete(dir.getPath(), true); 133 } 134 } 135 136 @BeforeAll 137 public static void setUpBeforeClass() throws Exception { 138 CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal")); 139 // Make block sizes small. 140 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 141 // needed for testAppendClose() 142 // quicker heartbeat interval for faster DN death notification 143 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 144 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 145 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 146 147 // faster failover with cluster.shutdown();fs.close() idiom 148 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1); 149 TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); 150 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500); 151 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000); 152 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000); 153 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 154 SampleRegionWALCoprocessor.class.getName()); 155 TEST_UTIL.startMiniDFSCluster(3); 156 157 conf = TEST_UTIL.getConfiguration(); 158 cluster = TEST_UTIL.getDFSCluster(); 159 160 hbaseDir = TEST_UTIL.createRootDir(); 161 hbaseWALDir = TEST_UTIL.createWALRootDir(); 162 } 163 164 @AfterAll 165 public static void tearDownAfterClass() throws Exception { 166 TEST_UTIL.shutdownMiniCluster(); 167 } 168 169 @Test 170 public void canCloseSingleton() throws IOException { 171 WALFactory.getInstance(conf).close(); 172 } 173 174 /** 175 * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail. 176 */ 177 @Test 178 public void testSplit() throws IOException { 179 final TableName tableName = TableName.valueOf(currentTestName); 180 final byte[] rowName = tableName.getName(); 181 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 182 final int howmany = 3; 183 RegionInfo[] infos = new RegionInfo[3]; 184 Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName); 185 fs.mkdirs(tableDataDir); 186 Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName); 187 fs.mkdirs(tabledir); 188 for (int i = 0; i < howmany; i++) { 189 infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) 190 .setEndKey(Bytes.toBytes("" + (i + 1))).build(); 191 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); 192 fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName())); 193 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); 194 } 195 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 196 scopes.put(Bytes.toBytes("column"), 0); 197 198 // Add edits for three regions. 199 for (int ii = 0; ii < howmany; ii++) { 200 for (int i = 0; i < howmany; i++) { 201 final WAL log = wals.getWAL(infos[i]); 202 for (int j = 0; j < howmany; j++) { 203 WALEdit edit = new WALEdit(); 204 byte[] family = Bytes.toBytes("column"); 205 byte[] qualifier = Bytes.toBytes(Integer.toString(j)); 206 byte[] column = Bytes.toBytes("column:" + Integer.toString(j)); 207 edit.add( 208 new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column)); 209 LOG.info("Region " + i + ": " + edit); 210 WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, 211 EnvironmentEdgeManager.currentTime(), mvcc, scopes); 212 log.appendData(infos[i], walKey, edit); 213 walKey.getWriteEntry(); 214 } 215 log.sync(); 216 log.rollWriter(true); 217 } 218 } 219 wals.shutdown(); 220 // The below calculation of logDir relies on insider information... WALSplitter should be 221 // connected better 222 // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used. 223 Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME), 224 this.currentServername.toString()); 225 Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 226 List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals); 227 verifySplits(splits, howmany); 228 } 229 230 /** 231 * Test new HDFS-265 sync. 232 */ 233 @Test 234 public void Broken_testSync() throws Exception { 235 TableName tableName = TableName.valueOf(currentTestName); 236 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 237 // First verify that using streams all works. 238 Path p = new Path(dir, currentTestName + ".fsdos"); 239 FSDataOutputStream out = fs.create(p); 240 out.write(tableName.getName()); 241 Method syncMethod = null; 242 try { 243 syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {}); 244 } catch (NoSuchMethodException e) { 245 try { 246 syncMethod = out.getClass().getMethod("sync", new Class<?>[] {}); 247 } catch (NoSuchMethodException ex) { 248 fail("This version of Hadoop supports neither Syncable.sync() " + "nor Syncable.hflush()."); 249 } 250 } 251 syncMethod.invoke(out, new Object[] {}); 252 FSDataInputStream in = fs.open(p); 253 assertTrue(in.available() > 0); 254 byte[] buffer = new byte[1024]; 255 int read = in.read(buffer); 256 assertEquals(tableName.getName().length, read); 257 out.close(); 258 in.close(); 259 260 final int total = 20; 261 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 262 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 263 scopes.put(tableName.getName(), 0); 264 final WAL wal = wals.getWAL(info); 265 266 for (int i = 0; i < total; i++) { 267 WALEdit kvs = new WALEdit(); 268 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 269 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 270 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 271 } 272 // Now call sync and try reading. Opening a Reader before you sync just 273 // gives you EOFE. 274 wal.sync(); 275 // Open a Reader. 276 Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 277 int count = NoEOFWALStreamReader.count(wals, fs, walPath); 278 assertEquals(total, count); 279 // Add test that checks to see that an open of a Reader works on a file 280 // that has had a sync done on it. 281 for (int i = 0; i < total; i++) { 282 WALEdit kvs = new WALEdit(); 283 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 284 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 285 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 286 } 287 wal.sync(); 288 count = NoEOFWALStreamReader.count(wals, fs, walPath); 289 assertTrue(count >= total); 290 // If I sync, should see double the edits. 291 wal.sync(); 292 count = NoEOFWALStreamReader.count(wals, fs, walPath); 293 assertEquals(total * 2, count); 294 // Now do a test that ensures stuff works when we go over block boundary, 295 // especially that we return good length on file. 296 final byte[] value = new byte[1025 * 1024]; // Make a 1M value. 297 for (int i = 0; i < total; i++) { 298 WALEdit kvs = new WALEdit(); 299 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); 300 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 301 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 302 } 303 // Now I should have written out lots of blocks. Sync then read. 304 wal.sync(); 305 count = NoEOFWALStreamReader.count(wals, fs, walPath); 306 assertEquals(total * 3, count); 307 // shutdown and ensure that Reader gets right length also. 308 wal.shutdown(); 309 count = NoEOFWALStreamReader.count(wals, fs, walPath); 310 assertEquals(total * 3, count); 311 } 312 313 private void verifySplits(final List<Path> splits, final int howmany) throws IOException { 314 assertEquals(howmany * howmany, splits.size()); 315 for (int i = 0; i < splits.size(); i++) { 316 LOG.info("Verifying=" + splits.get(i)); 317 try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) { 318 int count = 0; 319 String previousRegion = null; 320 long seqno = -1; 321 WAL.Entry entry = new WAL.Entry(); 322 while ((entry = reader.next(entry)) != null) { 323 WALKey key = entry.getKey(); 324 String region = Bytes.toString(key.getEncodedRegionName()); 325 // Assert that all edits are for same region. 326 if (previousRegion != null) { 327 assertEquals(previousRegion, region); 328 } 329 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId()); 330 assertTrue(seqno < key.getSequenceId()); 331 seqno = key.getSequenceId(); 332 previousRegion = region; 333 count++; 334 } 335 assertEquals(howmany, count); 336 } 337 } 338 } 339 340 /* 341 * We pass different values to recoverFileLease() so that different code paths are covered For 342 * this test to pass, requires: 1. HDFS-200 (append support) 2. HDFS-988 (SafeMode should freeze 343 * file operations [FSNamesystem.nextGenerationStampForBlock]) 3. HDFS-142 (on restart, maintain 344 * pendingCreates) 345 */ 346 @Test 347 public void testAppendClose() throws Exception { 348 TableName tableName = TableName.valueOf(currentTestName); 349 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 350 351 WAL wal = wals.getWAL(regionInfo); 352 int total = 20; 353 354 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 355 scopes.put(tableName.getName(), 0); 356 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 357 for (int i = 0; i < total; i++) { 358 WALEdit kvs = new WALEdit(); 359 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 360 wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 361 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 362 } 363 // Now call sync to send the data to HDFS datanodes 364 wal.sync(); 365 int namenodePort = cluster.getNameNodePort(); 366 final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 367 368 // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) 369 try { 370 DistributedFileSystem dfs = cluster.getFileSystem(); 371 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); 372 TEST_UTIL.shutdownMiniDFSCluster(); 373 try { 374 // wal.writer.close() will throw an exception, 375 // but still call this since it closes the LogSyncer thread first 376 wal.shutdown(); 377 } catch (IOException e) { 378 LOG.info(e.toString(), e); 379 } 380 fs.close(); // closing FS last so DFSOutputStream can't call close 381 LOG.info("STOPPED first instance of the cluster"); 382 } finally { 383 // Restart the cluster 384 while (cluster.isClusterUp()) { 385 LOG.error("Waiting for cluster to go down"); 386 Thread.sleep(1000); 387 } 388 assertFalse(cluster.isClusterUp()); 389 cluster = null; 390 for (int i = 0; i < 100; i++) { 391 try { 392 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort); 393 break; 394 } catch (BindException e) { 395 LOG.info("Sleeping. BindException bringing up new cluster"); 396 Threads.sleep(1000); 397 } 398 } 399 cluster.waitActive(); 400 fs = cluster.getFileSystem(); 401 LOG.info("STARTED second instance."); 402 } 403 404 // set the lease period to be 1 second so that the 405 // namenode triggers lease recovery upon append request 406 Method setLeasePeriod = 407 cluster.getClass().getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE }); 408 setLeasePeriod.setAccessible(true); 409 setLeasePeriod.invoke(cluster, 1000L, 1000L); 410 try { 411 Thread.sleep(1000); 412 } catch (InterruptedException e) { 413 LOG.info(e.toString(), e); 414 } 415 416 // Now try recovering the log, like the HMaster would do 417 final FileSystem recoveredFs = fs; 418 final Configuration rlConf = conf; 419 420 class RecoverLogThread extends Thread { 421 public Exception exception = null; 422 423 @Override 424 public void run() { 425 try { 426 RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null); 427 } catch (IOException e) { 428 exception = e; 429 } 430 } 431 } 432 433 RecoverLogThread t = new RecoverLogThread(); 434 t.start(); 435 // Timeout after 60 sec. Without correct patches, would be an infinite loop 436 t.join(60 * 1000); 437 if (t.isAlive()) { 438 t.interrupt(); 439 throw new Exception("Timed out waiting for WAL.recoverLog()"); 440 } 441 442 if (t.exception != null) throw t.exception; 443 444 // Make sure you can read all the content 445 int count = 0; 446 try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) { 447 WAL.Entry entry = new WAL.Entry(); 448 while (reader.next(entry) != null) { 449 count++; 450 assertTrue(entry.getEdit().getCells().size() == 1, "Should be one KeyValue per WALEdit"); 451 } 452 } 453 assertEquals(total, count); 454 455 // Reset the lease period 456 setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L }); 457 } 458 459 /** 460 * Tests that we can write out an edit, close, and then read it back in again. 461 */ 462 @Test 463 public void testEditAdd() throws IOException { 464 int colCount = 10; 465 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName)) 466 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 467 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 468 for (byte[] fam : htd.getColumnFamilyNames()) { 469 scopes.put(fam, 0); 470 } 471 byte[] row = Bytes.toBytes("row"); 472 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 473 474 // Write columns named 1, 2, 3, etc. and then values of single byte 475 // 1, 2, 3... 476 long timestamp = EnvironmentEdgeManager.currentTime(); 477 WALEdit cols = new WALEdit(); 478 for (int i = 0; i < colCount; i++) { 479 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 480 timestamp, new byte[] { (byte) (i + '0') })); 481 } 482 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row) 483 .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); 484 final WAL log = wals.getWAL(info); 485 486 final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), 487 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 488 log.sync(txid); 489 log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 490 log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 491 log.shutdown(); 492 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 493 // Now open a reader on the log and assert append worked. 494 try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) { 495 // Above we added all columns on a single row so we only read one 496 // entry in the below... thats why we have '1'. 497 for (int i = 0; i < 1; i++) { 498 WAL.Entry entry = reader.next(null); 499 if (entry == null) break; 500 WALKey key = entry.getKey(); 501 WALEdit val = entry.getEdit(); 502 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); 503 assertTrue(htd.getTableName().equals(key.getTableName())); 504 Cell cell = val.getCells().get(0); 505 assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(), 506 cell.getRowLength())); 507 assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]); 508 LOG.info(key + " " + val); 509 } 510 } 511 } 512 513 @Test 514 public void testAppend() throws IOException { 515 int colCount = 10; 516 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName)) 517 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 518 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 519 for (byte[] fam : htd.getColumnFamilyNames()) { 520 scopes.put(fam, 0); 521 } 522 byte[] row = Bytes.toBytes("row"); 523 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 524 // Write columns named 1, 2, 3, etc. and then values of single byte 525 // 1, 2, 3... 526 long timestamp = EnvironmentEdgeManager.currentTime(); 527 WALEdit cols = new WALEdit(); 528 for (int i = 0; i < colCount; i++) { 529 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 530 timestamp, new byte[] { (byte) (i + '0') })); 531 } 532 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 533 final WAL log = wals.getWAL(hri); 534 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 535 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 536 log.sync(txid); 537 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 538 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 539 log.shutdown(); 540 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 541 // Now open a reader on the log and assert append worked. 542 try (WALStreamReader reader = wals.createStreamReader(fs, filename)) { 543 WAL.Entry entry = reader.next(); 544 assertEquals(colCount, entry.getEdit().size()); 545 int idx = 0; 546 for (Cell val : entry.getEdit().getCells()) { 547 assertTrue( 548 Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName())); 549 assertTrue(htd.getTableName().equals(entry.getKey().getTableName())); 550 assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), 551 val.getRowLength())); 552 assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]); 553 System.out.println(entry.getKey() + " " + val); 554 idx++; 555 } 556 } 557 } 558 559 /** 560 * Test that we can visit entries before they are appended 561 */ 562 @Test 563 public void testVisitors() throws Exception { 564 final int COL_COUNT = 10; 565 final TableName tableName = TableName.valueOf(currentTestName); 566 final byte[] row = Bytes.toBytes("row"); 567 final DumbWALActionsListener visitor = new DumbWALActionsListener(); 568 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 569 long timestamp = EnvironmentEdgeManager.currentTime(); 570 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 571 scopes.put(Bytes.toBytes("column"), 0); 572 573 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 574 final WAL log = wals.getWAL(hri); 575 log.registerWALActionsListener(visitor); 576 for (int i = 0; i < COL_COUNT; i++) { 577 WALEdit cols = new WALEdit(); 578 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 579 timestamp, new byte[] { (byte) (i + '0') })); 580 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 581 EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 582 } 583 log.sync(); 584 assertEquals(COL_COUNT, visitor.increments); 585 log.unregisterWALActionsListener(visitor); 586 WALEdit cols = new WALEdit(); 587 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), 588 timestamp, new byte[] { (byte) (11 + '0') })); 589 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 590 EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 591 log.sync(); 592 assertEquals(COL_COUNT, visitor.increments); 593 } 594 595 /** 596 * A loaded WAL coprocessor won't break existing WAL test cases. 597 */ 598 @Test 599 public void testWALCoprocessorLoaded() throws Exception { 600 // test to see whether the coprocessor is loaded or not. 601 WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost(); 602 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 603 assertNotNull(c); 604 } 605 606 static class DumbWALActionsListener implements WALActionsListener { 607 int increments = 0; 608 609 @Override 610 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 611 increments++; 612 } 613 } 614 615 @Test 616 public void testWALProviders() throws IOException { 617 Configuration conf = new Configuration(); 618 WALFactory walFactory = new WALFactory(conf, this.currentServername, null); 619 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 620 621 // if providers are not set and do not enable SyncReplicationWALProvider 622 walFactory = new WALFactory(conf, this.currentServername, null); 623 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 624 } 625 626 @Test 627 public void testOnlySetWALProvider() throws IOException { 628 Configuration conf = new Configuration(); 629 conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name()); 630 WALFactory walFactory = new WALFactory(conf, this.currentServername, null); 631 // class of WALProvider and metaWALProvider are the same when metaWALProvider is not set 632 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass()); 633 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass()); 634 } 635 636 @Test 637 public void testOnlySetMetaWALProvider() throws IOException { 638 Configuration conf = new Configuration(); 639 conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name()); 640 WALFactory walFactory = new WALFactory(conf, this.currentServername, null); 641 assertEquals(WALFactory.Providers.defaultProvider.clazz, 642 walFactory.getWALProvider().getClass()); 643 assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass()); 644 } 645 646 @Test 647 public void testDefaultProvider() throws IOException { 648 final Configuration conf = new Configuration(); 649 // AsyncFSWal is the default, we should be able to request any WAL. 650 final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername, null); 651 Class<? extends WALProvider> fshLogProvider = 652 normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 653 assertEquals(Providers.filesystem.clazz, fshLogProvider); 654 655 // Imagine a world where MultiWAL is the default 656 final WALFactory customizedWalFactory = new WALFactory(conf, this.currentServername, null) { 657 @Override 658 Providers getDefaultProvider() { 659 return Providers.multiwal; 660 } 661 }; 662 // If we don't specify a WALProvider, we should get the default implementation. 663 Class<? extends WALProvider> multiwalProviderClass = 664 customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name()); 665 assertEquals(Providers.multiwal.clazz, multiwalProviderClass); 666 } 667 668 @Test 669 public void testCustomProvider() throws IOException { 670 final Configuration config = new Configuration(); 671 config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName()); 672 final WALFactory walFactory = new WALFactory(config, this.currentServername, null); 673 Class<? extends WALProvider> walProvider = 674 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 675 assertEquals(IOTestProvider.class, walProvider); 676 WALProvider metaWALProvider = walFactory.getMetaProvider(); 677 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 678 } 679 680 @Test 681 public void testCustomMetaProvider() throws IOException { 682 final Configuration config = new Configuration(); 683 config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName()); 684 final WALFactory walFactory = new WALFactory(config, this.currentServername, null); 685 Class<? extends WALProvider> walProvider = 686 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 687 assertEquals(Providers.filesystem.clazz, walProvider); 688 WALProvider metaWALProvider = walFactory.getMetaProvider(); 689 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 690 } 691 692 @Test 693 public void testCustomReplicationProvider() throws IOException { 694 final Configuration config = new Configuration(); 695 config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName()); 696 final WALFactory walFactory = new WALFactory(config, this.currentServername, null); 697 Class<? extends WALProvider> walProvider = 698 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 699 assertEquals(Providers.filesystem.clazz, walProvider); 700 WALProvider replicationWALProvider = walFactory.getReplicationProvider(); 701 assertEquals(IOTestProvider.class, replicationWALProvider.getClass()); 702 } 703 704 /** 705 * Confirm that we will use different WALs for hbase:meta and hbase:replication 706 */ 707 @Test 708 public void testDifferentWALs() throws IOException { 709 WAL normalWAL = wals.getWAL(null); 710 WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO); 711 WAL replicationWAL = wals.getWAL(RegionInfoBuilder 712 .newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build()); 713 assertNotSame(normalWAL, metaWAL); 714 assertNotSame(normalWAL, replicationWAL); 715 assertNotSame(metaWAL, replicationWAL); 716 } 717 718 @Test 719 public void testReaderClosedOnBadCodec() throws IOException { 720 // Create our own Configuration and WALFactory to avoid breaking other test methods 721 Configuration confWithCodec = new Configuration(conf); 722 confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, 723 Codec.class); 724 WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString()); 725 726 // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by 727 // the FileSystem and know if close() was called on those InputStreams. 728 List<InputStreamProxy> openedReaders = new ArrayList<>(); 729 FileSystemProxy proxyFs = new FileSystemProxy(fs) { 730 @Override 731 public FSDataInputStream open(Path p) throws IOException { 732 InputStreamProxy is = new InputStreamProxy(super.open(p)); 733 openedReaders.add(is); 734 return is; 735 } 736 737 @Override 738 public FSDataInputStream open(Path p, int blockSize) throws IOException { 739 InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize)); 740 openedReaders.add(is); 741 return is; 742 } 743 }; 744 745 final TableDescriptor htd = 746 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName)) 747 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 748 final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 749 750 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 751 for (byte[] fam : htd.getColumnFamilyNames()) { 752 scopes.put(fam, 0); 753 } 754 byte[] row = Bytes.toBytes("row"); 755 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 756 // Write one column in one edit. 757 WALEdit cols = new WALEdit(); 758 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"), 759 EnvironmentEdgeManager.currentTime(), new byte[] { 0 })); 760 final WAL log = customFactory.getWAL(hri); 761 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 762 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 763 // Sync the edit to the WAL 764 log.sync(txid); 765 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 766 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 767 log.shutdown(); 768 769 // Inject our failure, object is constructed via reflection. 770 BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true); 771 772 // Now open a reader on the log which will throw an exception when 773 // we try to instantiate the custom Codec. 774 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 775 assertThrows(IOException.class, () -> customFactory.createStreamReader(proxyFs, filename), 776 "Expected to see an exception when creating WAL reader"); 777 // We should have exactly one reader 778 assertEquals(1, openedReaders.size()); 779 // And that reader should be closed. 780 long unclosedReaders = 781 openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting()); 782 assertEquals(0, unclosedReaders, "Should not find any open readers"); 783 } 784 785 /** 786 * A proxy around FSDataInputStream which can report if close() was called. 787 */ 788 private static class InputStreamProxy extends FSDataInputStream { 789 private final InputStream real; 790 private final AtomicBoolean isClosed = new AtomicBoolean(false); 791 792 public InputStreamProxy(InputStream real) { 793 super(real); 794 this.real = real; 795 } 796 797 @Override 798 public void close() throws IOException { 799 isClosed.set(true); 800 real.close(); 801 } 802 } 803 804 /** 805 * A custom WALCellCodec in which we can inject failure. 806 */ 807 @SuppressWarnings("unused") 808 private static class BrokenWALCellCodec extends WALCellCodec { 809 static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false); 810 811 static void maybeInjectFailure() { 812 if (THROW_FAILURE_ON_INIT.get()) { 813 throw new RuntimeException("Injected instantiation exception"); 814 } 815 } 816 817 public BrokenWALCellCodec() { 818 super(); 819 maybeInjectFailure(); 820 } 821 822 public BrokenWALCellCodec(Configuration conf, CompressionContext compression) { 823 super(conf, compression); 824 maybeInjectFailure(); 825 } 826 } 827}