001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER; 021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Method; 030import java.net.BindException; 031import java.util.List; 032import java.util.NavigableMap; 033import java.util.TreeMap; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FSDataInputStream; 036import org.apache.hadoop.fs.FSDataOutputStream; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.RegionInfoBuilder; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 055import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 056import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 057import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 058import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 059import org.apache.hadoop.hbase.testclassification.MediumTests; 060import org.apache.hadoop.hbase.testclassification.RegionServerTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.FSUtils; 064import org.apache.hadoop.hbase.util.Threads; 065import org.apache.hadoop.hbase.wal.WALFactory.Providers; 066import org.apache.hadoop.hdfs.DistributedFileSystem; 067import org.apache.hadoop.hdfs.MiniDFSCluster; 068import org.apache.hadoop.hdfs.protocol.HdfsConstants; 069import org.junit.After; 070import org.junit.AfterClass; 071import org.junit.Before; 072import org.junit.BeforeClass; 073import org.junit.ClassRule; 074import org.junit.Rule; 075import org.junit.Test; 076import org.junit.experimental.categories.Category; 077import org.junit.rules.TestName; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081/** 082 * WAL tests that can be reused across providers. 083 */ 084@Category({RegionServerTests.class, MediumTests.class}) 085public class TestWALFactory { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestWALFactory.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class); 092 093 protected static Configuration conf; 094 private static MiniDFSCluster cluster; 095 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 096 protected static Path hbaseDir; 097 protected static Path hbaseWALDir; 098 099 protected FileSystem fs; 100 protected Path dir; 101 protected WALFactory wals; 102 private ServerName currentServername; 103 104 @Rule 105 public final TestName currentTest = new TestName(); 106 107 @Before 108 public void setUp() throws Exception { 109 fs = cluster.getFileSystem(); 110 dir = new Path(hbaseDir, currentTest.getMethodName()); 111 this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1); 112 wals = new WALFactory(conf, this.currentServername.toString()); 113 } 114 115 @After 116 public void tearDown() throws Exception { 117 // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here. 118 try { 119 wals.close(); 120 } catch (IOException exception) { 121 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" + 122 " may be the cause. Message: " + exception); 123 LOG.debug("Exception details for failure to close wal factory.", exception); 124 } 125 FileStatus[] entries = fs.listStatus(new Path("/")); 126 for (FileStatus dir : entries) { 127 fs.delete(dir.getPath(), true); 128 } 129 } 130 131 @BeforeClass 132 public static void setUpBeforeClass() throws Exception { 133 CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal")); 134 // Make block sizes small. 135 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 136 // needed for testAppendClose() 137 // quicker heartbeat interval for faster DN death notification 138 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 139 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 140 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 141 142 // faster failover with cluster.shutdown();fs.close() idiom 143 TEST_UTIL.getConfiguration() 144 .setInt("hbase.ipc.client.connect.max.retries", 1); 145 TEST_UTIL.getConfiguration().setInt( 146 "dfs.client.block.recovery.retries", 1); 147 TEST_UTIL.getConfiguration().setInt( 148 "hbase.ipc.client.connection.maxidletime", 500); 149 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000); 150 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000); 151 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 152 SampleRegionWALCoprocessor.class.getName()); 153 TEST_UTIL.startMiniDFSCluster(3); 154 155 conf = TEST_UTIL.getConfiguration(); 156 cluster = TEST_UTIL.getDFSCluster(); 157 158 hbaseDir = TEST_UTIL.createRootDir(); 159 hbaseWALDir = TEST_UTIL.createWALRootDir(); 160 } 161 162 @AfterClass 163 public static void tearDownAfterClass() throws Exception { 164 TEST_UTIL.shutdownMiniCluster(); 165 } 166 167 @Test 168 public void canCloseSingleton() throws IOException { 169 WALFactory.getInstance(conf).close(); 170 } 171 172 /** 173 * Just write multiple logs then split. Before fix for HADOOP-2283, this 174 * would fail. 175 * @throws IOException 176 */ 177 @Test 178 public void testSplit() throws IOException { 179 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 180 final byte [] rowName = tableName.getName(); 181 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 182 final int howmany = 3; 183 RegionInfo[] infos = new RegionInfo[3]; 184 Path tabledir = FSUtils.getWALTableDir(conf, tableName); 185 fs.mkdirs(tabledir); 186 for (int i = 0; i < howmany; i++) { 187 infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) 188 .setEndKey(Bytes.toBytes("" + (i + 1))).build(); 189 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); 190 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); 191 } 192 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 193 scopes.put(Bytes.toBytes("column"), 0); 194 195 196 // Add edits for three regions. 197 for (int ii = 0; ii < howmany; ii++) { 198 for (int i = 0; i < howmany; i++) { 199 final WAL log = 200 wals.getWAL(infos[i]); 201 for (int j = 0; j < howmany; j++) { 202 WALEdit edit = new WALEdit(); 203 byte [] family = Bytes.toBytes("column"); 204 byte [] qualifier = Bytes.toBytes(Integer.toString(j)); 205 byte [] column = Bytes.toBytes("column:" + Integer.toString(j)); 206 edit.add(new KeyValue(rowName, family, qualifier, 207 System.currentTimeMillis(), column)); 208 LOG.info("Region " + i + ": " + edit); 209 WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, 210 System.currentTimeMillis(), mvcc, scopes); 211 log.appendData(infos[i], walKey, edit); 212 walKey.getWriteEntry(); 213 } 214 log.sync(); 215 log.rollWriter(true); 216 } 217 } 218 wals.shutdown(); 219 // The below calculation of logDir relies on insider information... WALSplitter should be connected better 220 // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used. 221 Path logDir = 222 new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME), 223 this.currentServername.toString()); 224 Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 225 List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals); 226 verifySplits(splits, howmany); 227 } 228 229 /** 230 * Test new HDFS-265 sync. 231 * @throws Exception 232 */ 233 @Test 234 public void Broken_testSync() throws Exception { 235 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 236 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 237 // First verify that using streams all works. 238 Path p = new Path(dir, currentTest.getMethodName() + ".fsdos"); 239 FSDataOutputStream out = fs.create(p); 240 out.write(tableName.getName()); 241 Method syncMethod = null; 242 try { 243 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{}); 244 } catch (NoSuchMethodException e) { 245 try { 246 syncMethod = out.getClass().getMethod("sync", new Class<?> []{}); 247 } catch (NoSuchMethodException ex) { 248 fail("This version of Hadoop supports neither Syncable.sync() " + 249 "nor Syncable.hflush()."); 250 } 251 } 252 syncMethod.invoke(out, new Object[]{}); 253 FSDataInputStream in = fs.open(p); 254 assertTrue(in.available() > 0); 255 byte [] buffer = new byte [1024]; 256 int read = in.read(buffer); 257 assertEquals(tableName.getName().length, read); 258 out.close(); 259 in.close(); 260 261 final int total = 20; 262 WAL.Reader reader = null; 263 264 try { 265 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 266 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 267 scopes.put(tableName.getName(), 0); 268 final WAL wal = wals.getWAL(info); 269 270 for (int i = 0; i < total; i++) { 271 WALEdit kvs = new WALEdit(); 272 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 273 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 274 System.currentTimeMillis(), mvcc, scopes), kvs); 275 } 276 // Now call sync and try reading. Opening a Reader before you sync just 277 // gives you EOFE. 278 wal.sync(); 279 // Open a Reader. 280 Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 281 reader = wals.createReader(fs, walPath); 282 int count = 0; 283 WAL.Entry entry = new WAL.Entry(); 284 while ((entry = reader.next(entry)) != null) count++; 285 assertEquals(total, count); 286 reader.close(); 287 // Add test that checks to see that an open of a Reader works on a file 288 // that has had a sync done on it. 289 for (int i = 0; i < total; i++) { 290 WALEdit kvs = new WALEdit(); 291 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 292 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 293 System.currentTimeMillis(), mvcc, scopes), kvs); 294 } 295 wal.sync(); 296 reader = wals.createReader(fs, walPath); 297 count = 0; 298 while((entry = reader.next(entry)) != null) count++; 299 assertTrue(count >= total); 300 reader.close(); 301 // If I sync, should see double the edits. 302 wal.sync(); 303 reader = wals.createReader(fs, walPath); 304 count = 0; 305 while((entry = reader.next(entry)) != null) count++; 306 assertEquals(total * 2, count); 307 reader.close(); 308 // Now do a test that ensures stuff works when we go over block boundary, 309 // especially that we return good length on file. 310 final byte [] value = new byte[1025 * 1024]; // Make a 1M value. 311 for (int i = 0; i < total; i++) { 312 WALEdit kvs = new WALEdit(); 313 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); 314 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 315 System.currentTimeMillis(), mvcc, scopes), kvs); 316 } 317 // Now I should have written out lots of blocks. Sync then read. 318 wal.sync(); 319 reader = wals.createReader(fs, walPath); 320 count = 0; 321 while((entry = reader.next(entry)) != null) count++; 322 assertEquals(total * 3, count); 323 reader.close(); 324 // shutdown and ensure that Reader gets right length also. 325 wal.shutdown(); 326 reader = wals.createReader(fs, walPath); 327 count = 0; 328 while((entry = reader.next(entry)) != null) count++; 329 assertEquals(total * 3, count); 330 reader.close(); 331 } finally { 332 if (reader != null) reader.close(); 333 } 334 } 335 336 private void verifySplits(final List<Path> splits, final int howmany) 337 throws IOException { 338 assertEquals(howmany * howmany, splits.size()); 339 for (int i = 0; i < splits.size(); i++) { 340 LOG.info("Verifying=" + splits.get(i)); 341 WAL.Reader reader = wals.createReader(fs, splits.get(i)); 342 try { 343 int count = 0; 344 String previousRegion = null; 345 long seqno = -1; 346 WAL.Entry entry = new WAL.Entry(); 347 while((entry = reader.next(entry)) != null) { 348 WALKey key = entry.getKey(); 349 String region = Bytes.toString(key.getEncodedRegionName()); 350 // Assert that all edits are for same region. 351 if (previousRegion != null) { 352 assertEquals(previousRegion, region); 353 } 354 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId()); 355 assertTrue(seqno < key.getSequenceId()); 356 seqno = key.getSequenceId(); 357 previousRegion = region; 358 count++; 359 } 360 assertEquals(howmany, count); 361 } finally { 362 reader.close(); 363 } 364 } 365 } 366 367 /* 368 * We pass different values to recoverFileLease() so that different code paths are covered 369 * 370 * For this test to pass, requires: 371 * 1. HDFS-200 (append support) 372 * 2. HDFS-988 (SafeMode should freeze file operations 373 * [FSNamesystem.nextGenerationStampForBlock]) 374 * 3. HDFS-142 (on restart, maintain pendingCreates) 375 */ 376 @Test 377 public void testAppendClose() throws Exception { 378 TableName tableName = 379 TableName.valueOf(currentTest.getMethodName()); 380 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 381 382 WAL wal = wals.getWAL(regionInfo); 383 int total = 20; 384 385 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 386 scopes.put(tableName.getName(), 0); 387 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 388 for (int i = 0; i < total; i++) { 389 WALEdit kvs = new WALEdit(); 390 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 391 wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 392 System.currentTimeMillis(), mvcc, scopes), kvs); 393 } 394 // Now call sync to send the data to HDFS datanodes 395 wal.sync(); 396 int namenodePort = cluster.getNameNodePort(); 397 final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 398 399 400 // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) 401 try { 402 DistributedFileSystem dfs = cluster.getFileSystem(); 403 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); 404 TEST_UTIL.shutdownMiniDFSCluster(); 405 try { 406 // wal.writer.close() will throw an exception, 407 // but still call this since it closes the LogSyncer thread first 408 wal.shutdown(); 409 } catch (IOException e) { 410 LOG.info(e.toString(), e); 411 } 412 fs.close(); // closing FS last so DFSOutputStream can't call close 413 LOG.info("STOPPED first instance of the cluster"); 414 } finally { 415 // Restart the cluster 416 while (cluster.isClusterUp()){ 417 LOG.error("Waiting for cluster to go down"); 418 Thread.sleep(1000); 419 } 420 assertFalse(cluster.isClusterUp()); 421 cluster = null; 422 for (int i = 0; i < 100; i++) { 423 try { 424 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort); 425 break; 426 } catch (BindException e) { 427 LOG.info("Sleeping. BindException bringing up new cluster"); 428 Threads.sleep(1000); 429 } 430 } 431 cluster.waitActive(); 432 fs = cluster.getFileSystem(); 433 LOG.info("STARTED second instance."); 434 } 435 436 // set the lease period to be 1 second so that the 437 // namenode triggers lease recovery upon append request 438 Method setLeasePeriod = cluster.getClass() 439 .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE}); 440 setLeasePeriod.setAccessible(true); 441 setLeasePeriod.invoke(cluster, 1000L, 1000L); 442 try { 443 Thread.sleep(1000); 444 } catch (InterruptedException e) { 445 LOG.info(e.toString(), e); 446 } 447 448 // Now try recovering the log, like the HMaster would do 449 final FileSystem recoveredFs = fs; 450 final Configuration rlConf = conf; 451 452 class RecoverLogThread extends Thread { 453 public Exception exception = null; 454 @Override 455 public void run() { 456 try { 457 FSUtils.getInstance(fs, rlConf) 458 .recoverFileLease(recoveredFs, walPath, rlConf, null); 459 } catch (IOException e) { 460 exception = e; 461 } 462 } 463 } 464 465 RecoverLogThread t = new RecoverLogThread(); 466 t.start(); 467 // Timeout after 60 sec. Without correct patches, would be an infinite loop 468 t.join(60 * 1000); 469 if(t.isAlive()) { 470 t.interrupt(); 471 throw new Exception("Timed out waiting for WAL.recoverLog()"); 472 } 473 474 if (t.exception != null) 475 throw t.exception; 476 477 // Make sure you can read all the content 478 WAL.Reader reader = wals.createReader(fs, walPath); 479 int count = 0; 480 WAL.Entry entry = new WAL.Entry(); 481 while (reader.next(entry) != null) { 482 count++; 483 assertTrue("Should be one KeyValue per WALEdit", 484 entry.getEdit().getCells().size() == 1); 485 } 486 assertEquals(total, count); 487 reader.close(); 488 489 // Reset the lease period 490 setLeasePeriod.invoke(cluster, new Object[]{ 60000L, 3600000L }); 491 } 492 493 /** 494 * Tests that we can write out an edit, close, and then read it back in again. 495 */ 496 @Test 497 public void testEditAdd() throws IOException { 498 int colCount = 10; 499 TableDescriptor htd = 500 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 501 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 502 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 503 for (byte[] fam : htd.getColumnFamilyNames()) { 504 scopes.put(fam, 0); 505 } 506 byte[] row = Bytes.toBytes("row"); 507 WAL.Reader reader = null; 508 try { 509 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 510 511 // Write columns named 1, 2, 3, etc. and then values of single byte 512 // 1, 2, 3... 513 long timestamp = System.currentTimeMillis(); 514 WALEdit cols = new WALEdit(); 515 for (int i = 0; i < colCount; i++) { 516 cols.add(new KeyValue(row, Bytes.toBytes("column"), 517 Bytes.toBytes(Integer.toString(i)), 518 timestamp, new byte[] { (byte)(i + '0') })); 519 } 520 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row) 521 .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); 522 final WAL log = wals.getWAL(info); 523 524 final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), 525 htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); 526 log.sync(txid); 527 log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 528 log.completeCacheFlush(info.getEncodedNameAsBytes()); 529 log.shutdown(); 530 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 531 // Now open a reader on the log and assert append worked. 532 reader = wals.createReader(fs, filename); 533 // Above we added all columns on a single row so we only read one 534 // entry in the below... thats why we have '1'. 535 for (int i = 0; i < 1; i++) { 536 WAL.Entry entry = reader.next(null); 537 if (entry == null) break; 538 WALKey key = entry.getKey(); 539 WALEdit val = entry.getEdit(); 540 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); 541 assertTrue(htd.getTableName().equals(key.getTableName())); 542 Cell cell = val.getCells().get(0); 543 assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(), 544 cell.getRowLength())); 545 assertEquals((byte)(i + '0'), CellUtil.cloneValue(cell)[0]); 546 System.out.println(key + " " + val); 547 } 548 } finally { 549 if (reader != null) { 550 reader.close(); 551 } 552 } 553 } 554 555 @Test 556 public void testAppend() throws IOException { 557 int colCount = 10; 558 TableDescriptor htd = 559 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 560 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 561 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 562 for (byte[] fam : htd.getColumnFamilyNames()) { 563 scopes.put(fam, 0); 564 } 565 byte[] row = Bytes.toBytes("row"); 566 WAL.Reader reader = null; 567 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 568 try { 569 // Write columns named 1, 2, 3, etc. and then values of single byte 570 // 1, 2, 3... 571 long timestamp = System.currentTimeMillis(); 572 WALEdit cols = new WALEdit(); 573 for (int i = 0; i < colCount; i++) { 574 cols.add(new KeyValue(row, Bytes.toBytes("column"), 575 Bytes.toBytes(Integer.toString(i)), 576 timestamp, new byte[] { (byte)(i + '0') })); 577 } 578 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 579 final WAL log = wals.getWAL(hri); 580 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 581 htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); 582 log.sync(txid); 583 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 584 log.completeCacheFlush(hri.getEncodedNameAsBytes()); 585 log.shutdown(); 586 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 587 // Now open a reader on the log and assert append worked. 588 reader = wals.createReader(fs, filename); 589 WAL.Entry entry = reader.next(); 590 assertEquals(colCount, entry.getEdit().size()); 591 int idx = 0; 592 for (Cell val : entry.getEdit().getCells()) { 593 assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(), 594 entry.getKey().getEncodedRegionName())); 595 assertTrue(htd.getTableName().equals(entry.getKey().getTableName())); 596 assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), 597 val.getRowLength())); 598 assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]); 599 System.out.println(entry.getKey() + " " + val); 600 idx++; 601 } 602 } finally { 603 if (reader != null) { 604 reader.close(); 605 } 606 } 607 } 608 609 /** 610 * Test that we can visit entries before they are appended 611 * @throws Exception 612 */ 613 @Test 614 public void testVisitors() throws Exception { 615 final int COL_COUNT = 10; 616 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 617 final byte [] row = Bytes.toBytes("row"); 618 final DumbWALActionsListener visitor = new DumbWALActionsListener(); 619 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 620 long timestamp = System.currentTimeMillis(); 621 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 622 scopes.put(Bytes.toBytes("column"), 0); 623 624 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 625 final WAL log = wals.getWAL(hri); 626 log.registerWALActionsListener(visitor); 627 for (int i = 0; i < COL_COUNT; i++) { 628 WALEdit cols = new WALEdit(); 629 cols.add(new KeyValue(row, Bytes.toBytes("column"), 630 Bytes.toBytes(Integer.toString(i)), 631 timestamp, new byte[]{(byte) (i + '0')})); 632 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 633 System.currentTimeMillis(), mvcc, scopes), cols); 634 } 635 log.sync(); 636 assertEquals(COL_COUNT, visitor.increments); 637 log.unregisterWALActionsListener(visitor); 638 WALEdit cols = new WALEdit(); 639 cols.add(new KeyValue(row, Bytes.toBytes("column"), 640 Bytes.toBytes(Integer.toString(11)), 641 timestamp, new byte[]{(byte) (11 + '0')})); 642 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 643 System.currentTimeMillis(), mvcc, scopes), cols); 644 log.sync(); 645 assertEquals(COL_COUNT, visitor.increments); 646 } 647 648 /** 649 * A loaded WAL coprocessor won't break existing WAL test cases. 650 */ 651 @Test 652 public void testWALCoprocessorLoaded() throws Exception { 653 // test to see whether the coprocessor is loaded or not. 654 WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost(); 655 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 656 assertNotNull(c); 657 } 658 659 static class DumbWALActionsListener implements WALActionsListener { 660 int increments = 0; 661 662 @Override 663 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 664 increments++; 665 } 666 667 @Override 668 public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { 669 // To change body of implemented methods use File | Settings | File 670 // Templates. 671 increments++; 672 } 673 } 674 675 @Test 676 public void testWALProviders() throws IOException { 677 Configuration conf = new Configuration(); 678 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 679 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 680 } 681 682 @Test 683 public void testOnlySetWALProvider() throws IOException { 684 Configuration conf = new Configuration(); 685 conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name()); 686 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 687 688 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass()); 689 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass()); 690 } 691 692 @Test 693 public void testOnlySetMetaWALProvider() throws IOException { 694 Configuration conf = new Configuration(); 695 conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name()); 696 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 697 698 assertEquals(WALFactory.Providers.defaultProvider.clazz, 699 walFactory.getWALProvider().getClass()); 700 assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass()); 701 } 702 703 @Test 704 public void testDefaultProvider() throws IOException { 705 final Configuration conf = new Configuration(); 706 // AsyncFSWal is the default, we should be able to request any WAL. 707 final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString()); 708 Class<? extends WALProvider> fshLogProvider = normalWalFactory.getProviderClass( 709 WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 710 assertEquals(Providers.filesystem.clazz, fshLogProvider); 711 712 // Imagine a world where MultiWAL is the default 713 final WALFactory customizedWalFactory = new WALFactory( 714 conf, this.currentServername.toString()) { 715 @Override 716 Providers getDefaultProvider() { 717 return Providers.multiwal; 718 } 719 }; 720 // If we don't specify a WALProvider, we should get the default implementation. 721 Class<? extends WALProvider> multiwalProviderClass = customizedWalFactory.getProviderClass( 722 WALFactory.WAL_PROVIDER, Providers.multiwal.name()); 723 assertEquals(Providers.multiwal.clazz, multiwalProviderClass); 724 } 725 726 @Test 727 public void testCustomProvider() throws IOException { 728 final Configuration config = new Configuration(); 729 config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName()); 730 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 731 Class<? extends WALProvider> walProvider = walFactory.getProviderClass( 732 WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 733 assertEquals(IOTestProvider.class, walProvider); 734 WALProvider metaWALProvider = walFactory.getMetaProvider(); 735 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 736 } 737 738 @Test 739 public void testCustomMetaProvider() throws IOException { 740 final Configuration config = new Configuration(); 741 config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName()); 742 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 743 Class<? extends WALProvider> walProvider = walFactory.getProviderClass( 744 WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 745 assertEquals(Providers.filesystem.clazz, walProvider); 746 WALProvider metaWALProvider = walFactory.getMetaProvider(); 747 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 748 } 749}