001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER; 021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Method; 030import java.net.BindException; 031import java.util.List; 032import java.util.NavigableMap; 033import java.util.TreeMap; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FSDataInputStream; 036import org.apache.hadoop.fs.FSDataOutputStream; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.RegionInfoBuilder; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 055import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 056import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 057import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 058import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 059import org.apache.hadoop.hbase.testclassification.MediumTests; 060import org.apache.hadoop.hbase.testclassification.RegionServerTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 064import org.apache.hadoop.hbase.util.Threads; 065import org.apache.hadoop.hbase.wal.WALFactory.Providers; 066import org.apache.hadoop.hdfs.DistributedFileSystem; 067import org.apache.hadoop.hdfs.MiniDFSCluster; 068import org.apache.hadoop.hdfs.protocol.HdfsConstants; 069import org.junit.After; 070import org.junit.AfterClass; 071import org.junit.Before; 072import org.junit.BeforeClass; 073import org.junit.ClassRule; 074import org.junit.Rule; 075import org.junit.Test; 076import org.junit.experimental.categories.Category; 077import org.junit.rules.TestName; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081/** 082 * WAL tests that can be reused across providers. 083 */ 084@Category({RegionServerTests.class, MediumTests.class}) 085public class TestWALFactory { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestWALFactory.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class); 092 093 protected static Configuration conf; 094 private static MiniDFSCluster cluster; 095 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 096 protected static Path hbaseDir; 097 protected static Path hbaseWALDir; 098 099 protected FileSystem fs; 100 protected Path dir; 101 protected WALFactory wals; 102 private ServerName currentServername; 103 104 @Rule 105 public final TestName currentTest = new TestName(); 106 107 @Before 108 public void setUp() throws Exception { 109 fs = cluster.getFileSystem(); 110 dir = new Path(hbaseDir, currentTest.getMethodName()); 111 this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1); 112 wals = new WALFactory(conf, this.currentServername.toString()); 113 } 114 115 @After 116 public void tearDown() throws Exception { 117 // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here. 118 try { 119 wals.close(); 120 } catch (IOException exception) { 121 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" + 122 " may be the cause. Message: " + exception); 123 LOG.debug("Exception details for failure to close wal factory.", exception); 124 } 125 FileStatus[] entries = fs.listStatus(new Path("/")); 126 for (FileStatus dir : entries) { 127 fs.delete(dir.getPath(), true); 128 } 129 } 130 131 @BeforeClass 132 public static void setUpBeforeClass() throws Exception { 133 CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal")); 134 // Make block sizes small. 135 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 136 // needed for testAppendClose() 137 // quicker heartbeat interval for faster DN death notification 138 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 139 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 140 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 141 142 // faster failover with cluster.shutdown();fs.close() idiom 143 TEST_UTIL.getConfiguration() 144 .setInt("hbase.ipc.client.connect.max.retries", 1); 145 TEST_UTIL.getConfiguration().setInt( 146 "dfs.client.block.recovery.retries", 1); 147 TEST_UTIL.getConfiguration().setInt( 148 "hbase.ipc.client.connection.maxidletime", 500); 149 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000); 150 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000); 151 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 152 SampleRegionWALCoprocessor.class.getName()); 153 TEST_UTIL.startMiniDFSCluster(3); 154 155 conf = TEST_UTIL.getConfiguration(); 156 cluster = TEST_UTIL.getDFSCluster(); 157 158 hbaseDir = TEST_UTIL.createRootDir(); 159 hbaseWALDir = TEST_UTIL.createWALRootDir(); 160 } 161 162 @AfterClass 163 public static void tearDownAfterClass() throws Exception { 164 TEST_UTIL.shutdownMiniCluster(); 165 } 166 167 @Test 168 public void canCloseSingleton() throws IOException { 169 WALFactory.getInstance(conf).close(); 170 } 171 172 /** 173 * Just write multiple logs then split. Before fix for HADOOP-2283, this 174 * would fail. 175 * @throws IOException 176 */ 177 @Test 178 public void testSplit() throws IOException { 179 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 180 final byte [] rowName = tableName.getName(); 181 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 182 final int howmany = 3; 183 RegionInfo[] infos = new RegionInfo[3]; 184 Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName); 185 fs.mkdirs(tableDataDir); 186 Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName); 187 fs.mkdirs(tabledir); 188 for (int i = 0; i < howmany; i++) { 189 infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) 190 .setEndKey(Bytes.toBytes("" + (i + 1))).build(); 191 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); 192 fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName())); 193 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); 194 } 195 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 196 scopes.put(Bytes.toBytes("column"), 0); 197 198 199 // Add edits for three regions. 200 for (int ii = 0; ii < howmany; ii++) { 201 for (int i = 0; i < howmany; i++) { 202 final WAL log = 203 wals.getWAL(infos[i]); 204 for (int j = 0; j < howmany; j++) { 205 WALEdit edit = new WALEdit(); 206 byte [] family = Bytes.toBytes("column"); 207 byte [] qualifier = Bytes.toBytes(Integer.toString(j)); 208 byte [] column = Bytes.toBytes("column:" + Integer.toString(j)); 209 edit.add(new KeyValue(rowName, family, qualifier, 210 System.currentTimeMillis(), column)); 211 LOG.info("Region " + i + ": " + edit); 212 WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, 213 System.currentTimeMillis(), mvcc, scopes); 214 log.appendData(infos[i], walKey, edit); 215 walKey.getWriteEntry(); 216 } 217 log.sync(); 218 log.rollWriter(true); 219 } 220 } 221 wals.shutdown(); 222 // The below calculation of logDir relies on insider information... WALSplitter should be connected better 223 // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used. 224 Path logDir = 225 new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME), 226 this.currentServername.toString()); 227 Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 228 List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals); 229 verifySplits(splits, howmany); 230 } 231 232 /** 233 * Test new HDFS-265 sync. 234 * @throws Exception 235 */ 236 @Test 237 public void Broken_testSync() throws Exception { 238 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 239 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 240 // First verify that using streams all works. 241 Path p = new Path(dir, currentTest.getMethodName() + ".fsdos"); 242 FSDataOutputStream out = fs.create(p); 243 out.write(tableName.getName()); 244 Method syncMethod = null; 245 try { 246 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{}); 247 } catch (NoSuchMethodException e) { 248 try { 249 syncMethod = out.getClass().getMethod("sync", new Class<?> []{}); 250 } catch (NoSuchMethodException ex) { 251 fail("This version of Hadoop supports neither Syncable.sync() " + 252 "nor Syncable.hflush()."); 253 } 254 } 255 syncMethod.invoke(out, new Object[]{}); 256 FSDataInputStream in = fs.open(p); 257 assertTrue(in.available() > 0); 258 byte [] buffer = new byte [1024]; 259 int read = in.read(buffer); 260 assertEquals(tableName.getName().length, read); 261 out.close(); 262 in.close(); 263 264 final int total = 20; 265 WAL.Reader reader = null; 266 267 try { 268 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 269 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 270 scopes.put(tableName.getName(), 0); 271 final WAL wal = wals.getWAL(info); 272 273 for (int i = 0; i < total; i++) { 274 WALEdit kvs = new WALEdit(); 275 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 276 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 277 System.currentTimeMillis(), mvcc, scopes), kvs); 278 } 279 // Now call sync and try reading. Opening a Reader before you sync just 280 // gives you EOFE. 281 wal.sync(); 282 // Open a Reader. 283 Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 284 reader = wals.createReader(fs, walPath); 285 int count = 0; 286 WAL.Entry entry = new WAL.Entry(); 287 while ((entry = reader.next(entry)) != null) count++; 288 assertEquals(total, count); 289 reader.close(); 290 // Add test that checks to see that an open of a Reader works on a file 291 // that has had a sync done on it. 292 for (int i = 0; i < total; i++) { 293 WALEdit kvs = new WALEdit(); 294 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 295 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 296 System.currentTimeMillis(), mvcc, scopes), kvs); 297 } 298 wal.sync(); 299 reader = wals.createReader(fs, walPath); 300 count = 0; 301 while((entry = reader.next(entry)) != null) count++; 302 assertTrue(count >= total); 303 reader.close(); 304 // If I sync, should see double the edits. 305 wal.sync(); 306 reader = wals.createReader(fs, walPath); 307 count = 0; 308 while((entry = reader.next(entry)) != null) count++; 309 assertEquals(total * 2, count); 310 reader.close(); 311 // Now do a test that ensures stuff works when we go over block boundary, 312 // especially that we return good length on file. 313 final byte [] value = new byte[1025 * 1024]; // Make a 1M value. 314 for (int i = 0; i < total; i++) { 315 WALEdit kvs = new WALEdit(); 316 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); 317 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 318 System.currentTimeMillis(), mvcc, scopes), kvs); 319 } 320 // Now I should have written out lots of blocks. Sync then read. 321 wal.sync(); 322 reader = wals.createReader(fs, walPath); 323 count = 0; 324 while((entry = reader.next(entry)) != null) count++; 325 assertEquals(total * 3, count); 326 reader.close(); 327 // shutdown and ensure that Reader gets right length also. 328 wal.shutdown(); 329 reader = wals.createReader(fs, walPath); 330 count = 0; 331 while((entry = reader.next(entry)) != null) count++; 332 assertEquals(total * 3, count); 333 reader.close(); 334 } finally { 335 if (reader != null) reader.close(); 336 } 337 } 338 339 private void verifySplits(final List<Path> splits, final int howmany) 340 throws IOException { 341 assertEquals(howmany * howmany, splits.size()); 342 for (int i = 0; i < splits.size(); i++) { 343 LOG.info("Verifying=" + splits.get(i)); 344 WAL.Reader reader = wals.createReader(fs, splits.get(i)); 345 try { 346 int count = 0; 347 String previousRegion = null; 348 long seqno = -1; 349 WAL.Entry entry = new WAL.Entry(); 350 while((entry = reader.next(entry)) != null) { 351 WALKey key = entry.getKey(); 352 String region = Bytes.toString(key.getEncodedRegionName()); 353 // Assert that all edits are for same region. 354 if (previousRegion != null) { 355 assertEquals(previousRegion, region); 356 } 357 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId()); 358 assertTrue(seqno < key.getSequenceId()); 359 seqno = key.getSequenceId(); 360 previousRegion = region; 361 count++; 362 } 363 assertEquals(howmany, count); 364 } finally { 365 reader.close(); 366 } 367 } 368 } 369 370 /* 371 * We pass different values to recoverFileLease() so that different code paths are covered 372 * 373 * For this test to pass, requires: 374 * 1. HDFS-200 (append support) 375 * 2. HDFS-988 (SafeMode should freeze file operations 376 * [FSNamesystem.nextGenerationStampForBlock]) 377 * 3. HDFS-142 (on restart, maintain pendingCreates) 378 */ 379 @Test 380 public void testAppendClose() throws Exception { 381 TableName tableName = 382 TableName.valueOf(currentTest.getMethodName()); 383 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 384 385 WAL wal = wals.getWAL(regionInfo); 386 int total = 20; 387 388 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 389 scopes.put(tableName.getName(), 0); 390 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 391 for (int i = 0; i < total; i++) { 392 WALEdit kvs = new WALEdit(); 393 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 394 wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 395 System.currentTimeMillis(), mvcc, scopes), kvs); 396 } 397 // Now call sync to send the data to HDFS datanodes 398 wal.sync(); 399 int namenodePort = cluster.getNameNodePort(); 400 final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 401 402 403 // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) 404 try { 405 DistributedFileSystem dfs = cluster.getFileSystem(); 406 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); 407 TEST_UTIL.shutdownMiniDFSCluster(); 408 try { 409 // wal.writer.close() will throw an exception, 410 // but still call this since it closes the LogSyncer thread first 411 wal.shutdown(); 412 } catch (IOException e) { 413 LOG.info(e.toString(), e); 414 } 415 fs.close(); // closing FS last so DFSOutputStream can't call close 416 LOG.info("STOPPED first instance of the cluster"); 417 } finally { 418 // Restart the cluster 419 while (cluster.isClusterUp()){ 420 LOG.error("Waiting for cluster to go down"); 421 Thread.sleep(1000); 422 } 423 assertFalse(cluster.isClusterUp()); 424 cluster = null; 425 for (int i = 0; i < 100; i++) { 426 try { 427 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort); 428 break; 429 } catch (BindException e) { 430 LOG.info("Sleeping. BindException bringing up new cluster"); 431 Threads.sleep(1000); 432 } 433 } 434 cluster.waitActive(); 435 fs = cluster.getFileSystem(); 436 LOG.info("STARTED second instance."); 437 } 438 439 // set the lease period to be 1 second so that the 440 // namenode triggers lease recovery upon append request 441 Method setLeasePeriod = cluster.getClass() 442 .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE}); 443 setLeasePeriod.setAccessible(true); 444 setLeasePeriod.invoke(cluster, 1000L, 1000L); 445 try { 446 Thread.sleep(1000); 447 } catch (InterruptedException e) { 448 LOG.info(e.toString(), e); 449 } 450 451 // Now try recovering the log, like the HMaster would do 452 final FileSystem recoveredFs = fs; 453 final Configuration rlConf = conf; 454 455 class RecoverLogThread extends Thread { 456 public Exception exception = null; 457 458 @Override 459 public void run() { 460 try { 461 RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null); 462 } catch (IOException e) { 463 exception = e; 464 } 465 } 466 } 467 468 RecoverLogThread t = new RecoverLogThread(); 469 t.start(); 470 // Timeout after 60 sec. Without correct patches, would be an infinite loop 471 t.join(60 * 1000); 472 if(t.isAlive()) { 473 t.interrupt(); 474 throw new Exception("Timed out waiting for WAL.recoverLog()"); 475 } 476 477 if (t.exception != null) 478 throw t.exception; 479 480 // Make sure you can read all the content 481 WAL.Reader reader = wals.createReader(fs, walPath); 482 int count = 0; 483 WAL.Entry entry = new WAL.Entry(); 484 while (reader.next(entry) != null) { 485 count++; 486 assertTrue("Should be one KeyValue per WALEdit", 487 entry.getEdit().getCells().size() == 1); 488 } 489 assertEquals(total, count); 490 reader.close(); 491 492 // Reset the lease period 493 setLeasePeriod.invoke(cluster, new Object[]{ 60000L, 3600000L }); 494 } 495 496 /** 497 * Tests that we can write out an edit, close, and then read it back in again. 498 */ 499 @Test 500 public void testEditAdd() throws IOException { 501 int colCount = 10; 502 TableDescriptor htd = 503 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 504 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 505 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 506 for (byte[] fam : htd.getColumnFamilyNames()) { 507 scopes.put(fam, 0); 508 } 509 byte[] row = Bytes.toBytes("row"); 510 WAL.Reader reader = null; 511 try { 512 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 513 514 // Write columns named 1, 2, 3, etc. and then values of single byte 515 // 1, 2, 3... 516 long timestamp = System.currentTimeMillis(); 517 WALEdit cols = new WALEdit(); 518 for (int i = 0; i < colCount; i++) { 519 cols.add(new KeyValue(row, Bytes.toBytes("column"), 520 Bytes.toBytes(Integer.toString(i)), 521 timestamp, new byte[] { (byte)(i + '0') })); 522 } 523 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row) 524 .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); 525 final WAL log = wals.getWAL(info); 526 527 final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), 528 htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); 529 log.sync(txid); 530 log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 531 log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 532 log.shutdown(); 533 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 534 // Now open a reader on the log and assert append worked. 535 reader = wals.createReader(fs, filename); 536 // Above we added all columns on a single row so we only read one 537 // entry in the below... thats why we have '1'. 538 for (int i = 0; i < 1; i++) { 539 WAL.Entry entry = reader.next(null); 540 if (entry == null) break; 541 WALKey key = entry.getKey(); 542 WALEdit val = entry.getEdit(); 543 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); 544 assertTrue(htd.getTableName().equals(key.getTableName())); 545 Cell cell = val.getCells().get(0); 546 assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(), 547 cell.getRowLength())); 548 assertEquals((byte)(i + '0'), CellUtil.cloneValue(cell)[0]); 549 System.out.println(key + " " + val); 550 } 551 } finally { 552 if (reader != null) { 553 reader.close(); 554 } 555 } 556 } 557 558 @Test 559 public void testAppend() throws IOException { 560 int colCount = 10; 561 TableDescriptor htd = 562 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 563 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 564 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 565 for (byte[] fam : htd.getColumnFamilyNames()) { 566 scopes.put(fam, 0); 567 } 568 byte[] row = Bytes.toBytes("row"); 569 WAL.Reader reader = null; 570 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 571 try { 572 // Write columns named 1, 2, 3, etc. and then values of single byte 573 // 1, 2, 3... 574 long timestamp = System.currentTimeMillis(); 575 WALEdit cols = new WALEdit(); 576 for (int i = 0; i < colCount; i++) { 577 cols.add(new KeyValue(row, Bytes.toBytes("column"), 578 Bytes.toBytes(Integer.toString(i)), 579 timestamp, new byte[] { (byte)(i + '0') })); 580 } 581 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 582 final WAL log = wals.getWAL(hri); 583 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 584 htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); 585 log.sync(txid); 586 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 587 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 588 log.shutdown(); 589 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 590 // Now open a reader on the log and assert append worked. 591 reader = wals.createReader(fs, filename); 592 WAL.Entry entry = reader.next(); 593 assertEquals(colCount, entry.getEdit().size()); 594 int idx = 0; 595 for (Cell val : entry.getEdit().getCells()) { 596 assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(), 597 entry.getKey().getEncodedRegionName())); 598 assertTrue(htd.getTableName().equals(entry.getKey().getTableName())); 599 assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), 600 val.getRowLength())); 601 assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]); 602 System.out.println(entry.getKey() + " " + val); 603 idx++; 604 } 605 } finally { 606 if (reader != null) { 607 reader.close(); 608 } 609 } 610 } 611 612 /** 613 * Test that we can visit entries before they are appended 614 * @throws Exception 615 */ 616 @Test 617 public void testVisitors() throws Exception { 618 final int COL_COUNT = 10; 619 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 620 final byte [] row = Bytes.toBytes("row"); 621 final DumbWALActionsListener visitor = new DumbWALActionsListener(); 622 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 623 long timestamp = System.currentTimeMillis(); 624 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 625 scopes.put(Bytes.toBytes("column"), 0); 626 627 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 628 final WAL log = wals.getWAL(hri); 629 log.registerWALActionsListener(visitor); 630 for (int i = 0; i < COL_COUNT; i++) { 631 WALEdit cols = new WALEdit(); 632 cols.add(new KeyValue(row, Bytes.toBytes("column"), 633 Bytes.toBytes(Integer.toString(i)), 634 timestamp, new byte[]{(byte) (i + '0')})); 635 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 636 System.currentTimeMillis(), mvcc, scopes), cols); 637 } 638 log.sync(); 639 assertEquals(COL_COUNT, visitor.increments); 640 log.unregisterWALActionsListener(visitor); 641 WALEdit cols = new WALEdit(); 642 cols.add(new KeyValue(row, Bytes.toBytes("column"), 643 Bytes.toBytes(Integer.toString(11)), 644 timestamp, new byte[]{(byte) (11 + '0')})); 645 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 646 System.currentTimeMillis(), mvcc, scopes), cols); 647 log.sync(); 648 assertEquals(COL_COUNT, visitor.increments); 649 } 650 651 /** 652 * A loaded WAL coprocessor won't break existing WAL test cases. 653 */ 654 @Test 655 public void testWALCoprocessorLoaded() throws Exception { 656 // test to see whether the coprocessor is loaded or not. 657 WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost(); 658 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 659 assertNotNull(c); 660 } 661 662 static class DumbWALActionsListener implements WALActionsListener { 663 int increments = 0; 664 665 @Override 666 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 667 increments++; 668 } 669 670 @Override 671 public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { 672 // To change body of implemented methods use File | Settings | File 673 // Templates. 674 increments++; 675 } 676 } 677 678 @Test 679 public void testWALProviders() throws IOException { 680 Configuration conf = new Configuration(); 681 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 682 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 683 } 684 685 @Test 686 public void testOnlySetWALProvider() throws IOException { 687 Configuration conf = new Configuration(); 688 conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name()); 689 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 690 691 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass()); 692 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass()); 693 } 694 695 @Test 696 public void testOnlySetMetaWALProvider() throws IOException { 697 Configuration conf = new Configuration(); 698 conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name()); 699 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 700 701 assertEquals(WALFactory.Providers.defaultProvider.clazz, 702 walFactory.getWALProvider().getClass()); 703 assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass()); 704 } 705 706 @Test 707 public void testDefaultProvider() throws IOException { 708 final Configuration conf = new Configuration(); 709 // AsyncFSWal is the default, we should be able to request any WAL. 710 final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString()); 711 Class<? extends WALProvider> fshLogProvider = normalWalFactory.getProviderClass( 712 WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 713 assertEquals(Providers.filesystem.clazz, fshLogProvider); 714 715 // Imagine a world where MultiWAL is the default 716 final WALFactory customizedWalFactory = new WALFactory( 717 conf, this.currentServername.toString()) { 718 @Override 719 Providers getDefaultProvider() { 720 return Providers.multiwal; 721 } 722 }; 723 // If we don't specify a WALProvider, we should get the default implementation. 724 Class<? extends WALProvider> multiwalProviderClass = customizedWalFactory.getProviderClass( 725 WALFactory.WAL_PROVIDER, Providers.multiwal.name()); 726 assertEquals(Providers.multiwal.clazz, multiwalProviderClass); 727 } 728 729 @Test 730 public void testCustomProvider() throws IOException { 731 final Configuration config = new Configuration(); 732 config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName()); 733 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 734 Class<? extends WALProvider> walProvider = walFactory.getProviderClass( 735 WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 736 assertEquals(IOTestProvider.class, walProvider); 737 WALProvider metaWALProvider = walFactory.getMetaProvider(); 738 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 739 } 740 741 @Test 742 public void testCustomMetaProvider() throws IOException { 743 final Configuration config = new Configuration(); 744 config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName()); 745 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 746 Class<? extends WALProvider> walProvider = walFactory.getProviderClass( 747 WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 748 assertEquals(Providers.filesystem.clazz, walProvider); 749 WALProvider metaWALProvider = walFactory.getMetaProvider(); 750 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 751 } 752}