/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER;
import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALFactory.Providers;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WAL tests that can be reused across providers.
 * <p>
 * All tests share one {@link MiniDFSCluster} started in {@link #setUpBeforeClass()}; each test gets
 * its own {@link WALFactory} keyed by a server name derived from the test method name.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALFactory {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALFactory.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class);

  /** Shared configuration; taken from {@link #TEST_UTIL} once the DFS cluster is up. */
  protected static Configuration conf;
  /** Shared DFS cluster; replaced by {@link #testAppendClose()} when it restarts the cluster. */
  private static MiniDFSCluster cluster;
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  protected static Path hbaseDir;
  protected static Path hbaseWALDir;

  protected FileSystem fs;
  /** Per-test scratch directory: {@code hbaseDir/<test method name>}. */
  protected Path dir;
  protected WALFactory wals;
  private ServerName currentServername;

  @Rule
  public final TestName currentTest = new TestName();

  /**
   * Creates the per-test file system handle, scratch directory, server name and WAL factory.
   */
  @Before
  public void setUp() throws Exception {
    fs = cluster.getFileSystem();
    dir = new Path(hbaseDir, currentTest.getMethodName());
    this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1);
    wals = new WALFactory(conf, this.currentServername.toString());
  }

  /**
   * Closes the WAL factory (best effort) and deletes everything under the file system root.
   */
  @After
  public void tearDown() throws Exception {
    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
    try {
      wals.close();
    } catch (IOException exception) {
      LOG.warn("Encountered exception while closing wal factory. If you have other errors, this"
        + " may be the cause. Message: " + exception);
      LOG.debug("Exception details for failure to close wal factory.", exception);
    }
    FileStatus[] rootEntries = fs.listStatus(new Path("/"));
    for (FileStatus rootEntry : rootEntries) {
      fs.delete(rootEntry.getPath(), true);
    }
  }

  /**
   * Tunes HDFS/HBase timeouts for fast failure detection, registers the sample WAL coprocessor
   * and starts a three-datanode mini DFS cluster shared by all tests.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal"));
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();

    hbaseDir = TEST_UTIL.createRootDir();
    hbaseWALDir = TEST_UTIL.createWALRootDir();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * The singleton factory obtained through {@link WALFactory#getInstance(Configuration)} must be
   * closeable without error.
   */
  @Test
  public void canCloseSingleton() throws IOException {
    WALFactory.getInstance(conf).close();
  }

  /**
   * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail.
   */
  @Test
  public void testSplit() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final byte[] rowName = tableName.getName();
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    final int howmany = 3;
    RegionInfo[] infos = new RegionInfo[howmany];
    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
    fs.mkdirs(tableDataDir);
    Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
    fs.mkdirs(tabledir);
    for (int i = 0; i < howmany; i++) {
      infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
        .setEndKey(Bytes.toBytes("" + (i + 1))).build();
      Path regionWalDir = new Path(tabledir, infos[i].getEncodedName());
      fs.mkdirs(regionWalDir);
      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
      LOG.info("allo {}", regionWalDir);
    }
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(Bytes.toBytes("column"), 0);

    // Add edits for three regions: howmany rounds, each appending howmany edits per region and
    // rolling the writer afterwards so that several log files exist before the split.
    for (int round = 0; round < howmany; round++) {
      for (int i = 0; i < howmany; i++) {
        final WAL log = wals.getWAL(infos[i]);
        for (int j = 0; j < howmany; j++) {
          WALEdit edit = new WALEdit();
          byte[] family = Bytes.toBytes("column");
          byte[] qualifier = Bytes.toBytes(Integer.toString(j));
          byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
          edit.add(
            new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column));
          LOG.info("Region {}: {}", i, edit);
          WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
            EnvironmentEdgeManager.currentTime(), mvcc, scopes);
          log.appendData(infos[i], walKey, edit);
          walKey.getWriteEntry();
        }
        log.sync();
        log.rollWriter(true);
      }
    }
    wals.shutdown();
    // The below calculation of logDir relies on insider information... WALSplitter should be
    // connected better with the WAL system.... not requiring explicit path. The oldLogDir is just
    // made up not used.
    Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
      this.currentServername.toString());
    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
    verifySplits(splits, howmany);
  }

  /**
   * Test new HDFS-265 sync.
   * <p>
   * First checks that flushed bytes are visible through plain file system streams, then checks
   * that a WAL reader sees exactly the synced edits after each batch, including a batch of edits
   * large enough to span several DFS blocks and a final read after the WAL is shut down.
   */
  @Test
  public void Broken_testSync() throws Exception {
    TableName tableName = TableName.valueOf(currentTest.getMethodName());
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    // First verify that using streams all works.
    Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
    try (FSDataOutputStream out = fs.create(p)) {
      out.write(tableName.getName());
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
        } catch (NoSuchMethodException ex) {
          fail("This version of Hadoop supports neither Syncable.sync() " + "nor Syncable.hflush().");
        }
      }
      syncMethod.invoke(out, new Object[] {});
      // The output stream is deliberately still open here: the flushed bytes must be readable.
      try (FSDataInputStream in = fs.open(p)) {
        assertTrue(in.available() > 0);
        byte[] buffer = new byte[1024];
        int read = in.read(buffer);
        assertEquals(tableName.getName().length, read);
      }
    }

    final int total = 20;
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(tableName.getName(), 0);
    final WAL wal = wals.getWAL(info);

    appendEdits(wal, info, tableName, mvcc, scopes, total, tableName.getName());
    // Now call sync and try reading. Opening a Reader before you sync just
    // gives you EOFE.
    wal.sync();
    Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    assertEquals(total, countEntries(walPath));

    // Add test that checks to see that an open of a Reader works on a file
    // that has had a sync done on it.
    appendEdits(wal, info, tableName, mvcc, scopes, total, tableName.getName());
    wal.sync();
    assertTrue(countEntries(walPath) >= total);

    // If I sync, should see double the edits.
    wal.sync();
    assertEquals(total * 2, countEntries(walPath));

    // Now do a test that ensures stuff works when we go over block boundary,
    // especially that we return good length on file. The payload is 1025 KB, i.e. just over the
    // 1 MB dfs.blocksize configured in setUpBeforeClass.
    final byte[] value = new byte[1025 * 1024];
    appendEdits(wal, info, tableName, mvcc, scopes, total, value);
    // Now I should have written out lots of blocks. Sync then read.
    wal.sync();
    assertEquals(total * 3, countEntries(walPath));

    // shutdown and ensure that Reader gets right length also.
    wal.shutdown();
    assertEquals(total * 3, countEntries(walPath));
  }

  /**
   * Appends {@code count} single-cell edits to {@code wal}. Edit {@code i} uses
   * {@code Bytes.toBytes(i)} as row, the table name as column family and {@code qualifier} as
   * qualifier.
   */
  private static void appendEdits(WAL wal, RegionInfo info, TableName tableName,
    MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, int count,
    byte[] qualifier) throws IOException {
    for (int i = 0; i < count; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), qualifier));
      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
    }
  }

  /**
   * Opens a fresh reader on {@code walPath}, counts the entries it returns and closes the reader,
   * also when reading fails.
   */
  private int countEntries(Path walPath) throws IOException {
    WAL.Reader reader = wals.createReader(fs, walPath);
    try {
      int count = 0;
      WAL.Entry entry = new WAL.Entry();
      while ((entry = reader.next(entry)) != null) {
        count++;
      }
      return count;
    } finally {
      reader.close();
    }
  }

  /**
   * Asserts that {@code splits} holds {@code howmany * howmany} files and that each file contains
   * exactly {@code howmany} entries, all for one region, with strictly increasing sequence ids.
   */
  private void verifySplits(final List<Path> splits, final int howmany) throws IOException {
    assertEquals(howmany * howmany, splits.size());
    for (Path split : splits) {
      LOG.info("Verifying={}", split);
      WAL.Reader reader = wals.createReader(fs, split);
      try {
        int count = 0;
        String previousRegion = null;
        long seqno = -1;
        WAL.Entry entry = new WAL.Entry();
        while ((entry = reader.next(entry)) != null) {
          WALKey key = entry.getKey();
          String region = Bytes.toString(key.getEncodedRegionName());
          // Assert that all edits are for same region.
          if (previousRegion != null) {
            assertEquals(previousRegion, region);
          }
          LOG.info("oldseqno={}, newseqno={}", seqno, key.getSequenceId());
          assertTrue(seqno < key.getSequenceId());
          seqno = key.getSequenceId();
          previousRegion = region;
          count++;
        }
        assertEquals(howmany, count);
      } finally {
        reader.close();
      }
    }
  }

  /**
   * Writes and syncs edits, restarts the DFS cluster underneath the still-open WAL, recovers the
   * file lease and verifies that every synced edit can be read back.
   * <p>
   * We pass different values to recoverFileLease() so that different code paths are covered. For
   * this test to pass, requires: 1. HDFS-200 (append support) 2. HDFS-988 (SafeMode should freeze
   * file operations [FSNamesystem.nextGenerationStampForBlock]) 3. HDFS-142 (on restart, maintain
   * pendingCreates)
   */
  @Test
  public void testAppendClose() throws Exception {
    TableName tableName = TableName.valueOf(currentTest.getMethodName());
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();

    WAL wal = wals.getWAL(regionInfo);
    int total = 20;

    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(tableName.getName(), 0);
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    appendEdits(wal, regionInfo, tableName, mvcc, scopes, total, tableName.getName());
    // Now call sync to send the data to HDFS datanodes
    wal.sync();
    int namenodePort = cluster.getNameNodePort();
    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);

    // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
    try {
      DistributedFileSystem dfs = cluster.getFileSystem();
      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
      TEST_UTIL.shutdownMiniDFSCluster();
      try {
        // wal.writer.close() will throw an exception,
        // but still call this since it closes the LogSyncer thread first
        wal.shutdown();
      } catch (IOException e) {
        LOG.info(e.toString(), e);
      }
      fs.close(); // closing FS last so DFSOutputStream can't call close
      LOG.info("STOPPED first instance of the cluster");
    } finally {
      // Restart the cluster
      while (cluster.isClusterUp()) {
        LOG.error("Waiting for cluster to go down");
        Thread.sleep(1000);
      }
      assertFalse(cluster.isClusterUp());
      cluster = null;
      for (int i = 0; i < 100; i++) {
        try {
          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
          break;
        } catch (BindException e) {
          LOG.info("Sleeping. BindException bringing up new cluster");
          Threads.sleep(1000);
        }
      }
      // Every attempt may have failed with BindException; report that instead of an NPE below.
      assertNotNull("Could not restart the DFS cluster on namenode port " + namenodePort, cluster);
      cluster.waitActive();
      fs = cluster.getFileSystem();
      LOG.info("STARTED second instance.");
    }

    // set the lease period to be 1 second so that the
    // namenode triggers lease recovery upon append request
    Method setLeasePeriod =
      cluster.getClass().getDeclaredMethod("setLeasePeriod", long.class, long.class);
    setLeasePeriod.setAccessible(true);
    setLeasePeriod.invoke(cluster, 1000L, 1000L);
    try {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        LOG.info(e.toString(), e);
        // Keep the interrupt visible to whoever runs this test.
        Thread.currentThread().interrupt();
      }

      // Now try recovering the log, like the HMaster would do
      final FileSystem recoveredFs = fs;
      final Configuration rlConf = conf;

      class RecoverLogThread extends Thread {
        public Exception exception = null;

        @Override
        public void run() {
          try {
            RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
          } catch (IOException e) {
            exception = e;
          }
        }
      }

      RecoverLogThread t = new RecoverLogThread();
      t.start();
      // Timeout after 60 sec. Without correct patches, would be an infinite loop
      t.join(60 * 1000);
      if (t.isAlive()) {
        t.interrupt();
        throw new Exception("Timed out waiting for WAL.recoverLog()");
      }

      if (t.exception != null) {
        throw t.exception;
      }

      // Make sure you can read all the content
      WAL.Reader reader = wals.createReader(fs, walPath);
      try {
        int count = 0;
        WAL.Entry entry = new WAL.Entry();
        while (reader.next(entry) != null) {
          count++;
          assertEquals("Should be one KeyValue per WALEdit", 1, entry.getEdit().getCells().size());
        }
        assertEquals(total, count);
      } finally {
        reader.close();
      }
    } finally {
      // Reset the lease period, also on failure, so later tests on the shared cluster are not
      // affected by the shortened lease.
      setLeasePeriod.invoke(cluster, 60000L, 3600000L);
    }
  }

  /**
   * Tests that we can write out an edit, close, and then read it back in again.
   */
  @Test
  public void testEditAdd() throws IOException {
    int colCount = 10;
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    byte[] row = Bytes.toBytes("row");
    WAL.Reader reader = null;
    try {
      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = EnvironmentEdgeManager.currentTime();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < colCount; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte) (i + '0') }));
      }
      RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
        .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
      final WAL log = wals.getWAL(info);

      final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
      log.sync(txid);
      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
      log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
      log.shutdown();
      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
      // Now open a reader on the log and assert append worked.
      reader = wals.createReader(fs, filename);
      // Above we added all columns on a single row so we only read one entry. A missing entry
      // must fail the test rather than skip the assertions below.
      WAL.Entry entry = reader.next(null);
      assertNotNull("Expected to read back the appended edit from " + filename, entry);
      WALKey key = entry.getKey();
      WALEdit val = entry.getEdit();
      assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
      assertTrue(htd.getTableName().equals(key.getTableName()));
      Cell cell = val.getCells().get(0);
      assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
        cell.getRowLength()));
      // The first cell written above carries the single byte '0'.
      assertEquals((byte) '0', CellUtil.cloneValue(cell)[0]);
      LOG.info("{} {}", key, val);
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Appends one edit holding ten cells and verifies that the entry read back has the same region,
   * table, row and per-cell values, in order.
   */
  @Test
  public void testAppend() throws IOException {
    int colCount = 10;
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    byte[] row = Bytes.toBytes("row");
    WAL.Reader reader = null;
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    try {
      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = EnvironmentEdgeManager.currentTime();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < colCount; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte) (i + '0') }));
      }
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      final WAL log = wals.getWAL(hri);
      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
      log.sync(txid);
      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
      log.shutdown();
      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
      // Now open a reader on the log and assert append worked.
      reader = wals.createReader(fs, filename);
      WAL.Entry entry = reader.next();
      assertNotNull("Expected to read back the appended edit from " + filename, entry);
      assertEquals(colCount, entry.getEdit().size());
      int idx = 0;
      for (Cell val : entry.getEdit().getCells()) {
        assertTrue(
          Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()));
        assertTrue(htd.getTableName().equals(entry.getKey().getTableName()));
        assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
          val.getRowLength()));
        assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]);
        LOG.info("{} {}", entry.getKey(), val);
        idx++;
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Test that we can visit entries before they are appended, and that a listener stops being
   * notified once it has been unregistered.
   */
  @Test
  public void testVisitors() throws Exception {
    final int COL_COUNT = 10;
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final byte[] row = Bytes.toBytes("row");
    final DumbWALActionsListener visitor = new DumbWALActionsListener();
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    long timestamp = EnvironmentEdgeManager.currentTime();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(Bytes.toBytes("column"), 0);

    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
    final WAL log = wals.getWAL(hri);
    log.registerWALActionsListener(visitor);
    for (int i = 0; i < COL_COUNT; i++) {
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
        timestamp, new byte[] { (byte) (i + '0') }));
      log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
        EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
    }
    log.sync();
    assertEquals(COL_COUNT, visitor.increments);
    log.unregisterWALActionsListener(visitor);
    // One more append after unregistering: the counter must not move.
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)),
      timestamp, new byte[] { (byte) (11 + '0') }));
    log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
      EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
    log.sync();
    assertEquals(COL_COUNT, visitor.increments);
  }

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost();
    Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
    assertNotNull(c);
  }

  /** Listener that only counts how many times it is told about an entry before it is written. */
  static class DumbWALActionsListener implements WALActionsListener {
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      increments++;
    }
  }

  /**
   * With nothing configured, the regular and the meta WAL provider are of the same class.
   */
  @Test
  public void testWALProviders() throws IOException {
    Configuration freshConf = new Configuration();
    WALFactory walFactory = new WALFactory(freshConf, this.currentServername.toString());
    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
  }

  /**
   * When only {@code WAL_PROVIDER} is set, the meta WAL provider uses that provider class too.
   */
  @Test
  public void testOnlySetWALProvider() throws IOException {
    Configuration freshConf = new Configuration();
    freshConf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
    WALFactory walFactory = new WALFactory(freshConf, this.currentServername.toString());

    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass());
    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
  }

  /**
   * When only {@code META_WAL_PROVIDER} is set, the regular WAL provider stays the default one.
   */
  @Test
  public void testOnlySetMetaWALProvider() throws IOException {
    Configuration freshConf = new Configuration();
    freshConf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
    WALFactory walFactory = new WALFactory(freshConf, this.currentServername.toString());

    assertEquals(WALFactory.Providers.defaultProvider.clazz,
      walFactory.getWALProvider().getClass());
    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
  }

  /**
   * {@code getProviderClass} resolves the provider named as the fallback when the configuration
   * key is unset, both for a stock factory and for one whose default provider is overridden.
   */
  @Test
  public void testDefaultProvider() throws IOException {
    final Configuration freshConf = new Configuration();
    // AsyncFSWal is the default, we should be able to request any WAL.
    final WALFactory normalWalFactory =
      new WALFactory(freshConf, this.currentServername.toString());
    Class<? extends WALProvider> fshLogProvider =
      normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(Providers.filesystem.clazz, fshLogProvider);

    // Imagine a world where MultiWAL is the default
    final WALFactory customizedWalFactory =
      new WALFactory(freshConf, this.currentServername.toString()) {
        @Override
        Providers getDefaultProvider() {
          return Providers.multiwal;
        }
      };
    // If we don't specify a WALProvider, we should get the default implementation.
    Class<? extends WALProvider> multiwalProviderClass =
      customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name());
    assertEquals(Providers.multiwal.clazz, multiwalProviderClass);
  }

  /**
   * A provider class name configured under {@code WAL_PROVIDER} is used for both the regular and
   * the meta WAL.
   */
  @Test
  public void testCustomProvider() throws IOException {
    final Configuration config = new Configuration();
    config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
    Class<? extends WALProvider> walProvider =
      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(IOTestProvider.class, walProvider);
    WALProvider metaWALProvider = walFactory.getMetaProvider();
    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
  }

  /**
   * A provider class name configured under {@code META_WAL_PROVIDER} only affects the meta WAL.
   */
  @Test
  public void testCustomMetaProvider() throws IOException {
    final Configuration config = new Configuration();
    config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
    Class<? extends WALProvider> walProvider =
      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(Providers.filesystem.clazz, walProvider);
    WALProvider metaWALProvider = walFactory.getMetaProvider();
    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
  }

  /**
   * When creating a reader fails because the configured cell codec cannot be instantiated, the
   * input stream that was opened for the reader must have been closed.
   */
  @Test
  public void testReaderClosedOnBadCodec() throws IOException {
    // Create our own Configuration and WALFactory to avoid breaking other test methods
    Configuration confWithCodec = new Configuration(conf);
    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class,
      Codec.class);
    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());

    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
    // the FileSystem and know if close() was called on those InputStreams.
    List<InputStreamProxy> openedReaders = new ArrayList<>();
    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
      @Override
      public FSDataInputStream open(Path p) throws IOException {
        InputStreamProxy is = new InputStreamProxy(super.open(p));
        openedReaders.add(is);
        return is;
      }

      @Override
      public FSDataInputStream open(Path p, int blockSize) throws IOException {
        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
        openedReaders.add(is);
        return is;
      }
    };

    final TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();

    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    byte[] row = Bytes.toBytes("row");
    WAL.Reader reader = null;
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    try {
      // Write one column in one edit.
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
        EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
      final WAL log = customFactory.getWAL(hri);
      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
      // Sync the edit to the WAL
      log.sync(txid);
      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
      log.shutdown();

      // Inject our failure, object is constructed via reflection.
      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);

      // Now open a reader on the log which will throw an exception when
      // we try to instantiate the custom Codec.
      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
      try {
        reader = customFactory.createReader(proxyFs, filename);
        fail("Expected to see an exception when creating WAL reader");
      } catch (Exception e) {
        // Expected that we get an exception
      }
      // We should have exactly one reader
      assertEquals(1, openedReaders.size());
      // And that reader should be closed.
      long unclosedReaders =
        openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
      assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
    } finally {
      // The switch is static: turn it off again so it cannot leak into other tests.
      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(false);
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * A proxy around FSDataInputStream which can report if close() was called.
   */
  private static class InputStreamProxy extends FSDataInputStream {
    private final InputStream real;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    public InputStreamProxy(InputStream real) {
      super(real);
      this.real = real;
    }

    @Override
    public void close() throws IOException {
      isClosed.set(true);
      real.close();
    }
  }

  /**
   * A custom WALCellCodec in which we can inject failure.
   */
  @SuppressWarnings("unused")
  private static class BrokenWALCellCodec extends WALCellCodec {
    /** When {@code true}, both constructors throw after calling their super constructor. */
    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);

    static void maybeInjectFailure() {
      if (THROW_FAILURE_ON_INIT.get()) {
        throw new RuntimeException("Injected instantiation exception");
      }
    }

    public BrokenWALCellCodec() {
      super();
      maybeInjectFailure();
    }

    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
      super(conf, compression);
      maybeInjectFailure();
    }
  }
}