001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER; 021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.lang.reflect.Method; 030import java.net.BindException; 031import java.util.List; 032import java.util.NavigableMap; 033import java.util.TreeMap; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FSDataInputStream; 036import org.apache.hadoop.fs.FSDataOutputStream; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.RegionInfoBuilder; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 055import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 056import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 057import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 058import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 059import org.apache.hadoop.hbase.testclassification.MediumTests; 060import org.apache.hadoop.hbase.testclassification.RegionServerTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.FSUtils; 063import org.apache.hadoop.hbase.util.Threads; 064import org.apache.hadoop.hbase.wal.WALFactory.Providers; 065import org.apache.hadoop.hdfs.DistributedFileSystem; 066import org.apache.hadoop.hdfs.MiniDFSCluster; 067import org.apache.hadoop.hdfs.protocol.HdfsConstants; 068import org.junit.After; 069import org.junit.AfterClass; 070import org.junit.Before; 071import org.junit.BeforeClass; 072import org.junit.ClassRule; 073import org.junit.Rule; 074import org.junit.Test; 075import org.junit.experimental.categories.Category; 076import org.junit.rules.TestName; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080/** 081 * WAL tests that can be reused across providers. 082 */ 083@Category({RegionServerTests.class, MediumTests.class}) 084public class TestWALFactory { 085 086 @ClassRule 087 public static final HBaseClassTestRule CLASS_RULE = 088 HBaseClassTestRule.forClass(TestWALFactory.class); 089 090 private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class); 091 092 protected static Configuration conf; 093 private static MiniDFSCluster cluster; 094 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 095 protected static Path hbaseDir; 096 protected static Path hbaseWALDir; 097 098 protected FileSystem fs; 099 protected Path dir; 100 protected WALFactory wals; 101 private ServerName currentServername; 102 103 @Rule 104 public final TestName currentTest = new TestName(); 105 106 @Before 107 public void setUp() throws Exception { 108 fs = cluster.getFileSystem(); 109 dir = new Path(hbaseDir, currentTest.getMethodName()); 110 this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1); 111 wals = new WALFactory(conf, this.currentServername.toString()); 112 } 113 114 @After 115 public void tearDown() throws Exception { 116 // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here. 117 try { 118 wals.close(); 119 } catch (IOException exception) { 120 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" + 121 " may be the cause. Message: " + exception); 122 LOG.debug("Exception details for failure to close wal factory.", exception); 123 } 124 FileStatus[] entries = fs.listStatus(new Path("/")); 125 for (FileStatus dir : entries) { 126 fs.delete(dir.getPath(), true); 127 } 128 } 129 130 @BeforeClass 131 public static void setUpBeforeClass() throws Exception { 132 // Make block sizes small. 133 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 134 // needed for testAppendClose() 135 // quicker heartbeat interval for faster DN death notification 136 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 137 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 138 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 139 140 // faster failover with cluster.shutdown();fs.close() idiom 141 TEST_UTIL.getConfiguration() 142 .setInt("hbase.ipc.client.connect.max.retries", 1); 143 TEST_UTIL.getConfiguration().setInt( 144 "dfs.client.block.recovery.retries", 1); 145 TEST_UTIL.getConfiguration().setInt( 146 "hbase.ipc.client.connection.maxidletime", 500); 147 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000); 148 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000); 149 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 150 SampleRegionWALCoprocessor.class.getName()); 151 TEST_UTIL.startMiniDFSCluster(3); 152 153 conf = TEST_UTIL.getConfiguration(); 154 cluster = TEST_UTIL.getDFSCluster(); 155 156 hbaseDir = TEST_UTIL.createRootDir(); 157 hbaseWALDir = TEST_UTIL.createWALRootDir(); 158 } 159 160 @AfterClass 161 public static void tearDownAfterClass() throws Exception { 162 TEST_UTIL.shutdownMiniCluster(); 163 } 164 165 @Test 166 public void canCloseSingleton() throws IOException { 167 WALFactory.getInstance(conf).close(); 168 } 169 170 /** 171 * Just write multiple logs then split. Before fix for HADOOP-2283, this 172 * would fail. 173 * @throws IOException 174 */ 175 @Test 176 public void testSplit() throws IOException { 177 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 178 final byte [] rowName = tableName.getName(); 179 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 180 final int howmany = 3; 181 RegionInfo[] infos = new RegionInfo[3]; 182 Path tabledir = FSUtils.getWALTableDir(conf, tableName); 183 fs.mkdirs(tabledir); 184 for (int i = 0; i < howmany; i++) { 185 infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) 186 .setEndKey(Bytes.toBytes("" + (i + 1))).build(); 187 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); 188 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); 189 } 190 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 191 scopes.put(Bytes.toBytes("column"), 0); 192 193 194 // Add edits for three regions. 195 for (int ii = 0; ii < howmany; ii++) { 196 for (int i = 0; i < howmany; i++) { 197 final WAL log = 198 wals.getWAL(infos[i]); 199 for (int j = 0; j < howmany; j++) { 200 WALEdit edit = new WALEdit(); 201 byte [] family = Bytes.toBytes("column"); 202 byte [] qualifier = Bytes.toBytes(Integer.toString(j)); 203 byte [] column = Bytes.toBytes("column:" + Integer.toString(j)); 204 edit.add(new KeyValue(rowName, family, qualifier, 205 System.currentTimeMillis(), column)); 206 LOG.info("Region " + i + ": " + edit); 207 WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, 208 System.currentTimeMillis(), mvcc, scopes); 209 log.append(infos[i], walKey, edit, true); 210 walKey.getWriteEntry(); 211 } 212 log.sync(); 213 log.rollWriter(true); 214 } 215 } 216 wals.shutdown(); 217 // The below calculation of logDir relies on insider information... WALSplitter should be connected better 218 // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used. 219 Path logDir = 220 new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME), 221 this.currentServername.toString()); 222 Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 223 List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals); 224 verifySplits(splits, howmany); 225 } 226 227 /** 228 * Test new HDFS-265 sync. 229 * @throws Exception 230 */ 231 @Test 232 public void Broken_testSync() throws Exception { 233 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 234 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 235 // First verify that using streams all works. 236 Path p = new Path(dir, currentTest.getMethodName() + ".fsdos"); 237 FSDataOutputStream out = fs.create(p); 238 out.write(tableName.getName()); 239 Method syncMethod = null; 240 try { 241 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{}); 242 } catch (NoSuchMethodException e) { 243 try { 244 syncMethod = out.getClass().getMethod("sync", new Class<?> []{}); 245 } catch (NoSuchMethodException ex) { 246 fail("This version of Hadoop supports neither Syncable.sync() " + 247 "nor Syncable.hflush()."); 248 } 249 } 250 syncMethod.invoke(out, new Object[]{}); 251 FSDataInputStream in = fs.open(p); 252 assertTrue(in.available() > 0); 253 byte [] buffer = new byte [1024]; 254 int read = in.read(buffer); 255 assertEquals(tableName.getName().length, read); 256 out.close(); 257 in.close(); 258 259 final int total = 20; 260 WAL.Reader reader = null; 261 262 try { 263 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 264 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 265 scopes.put(tableName.getName(), 0); 266 final WAL wal = wals.getWAL(info); 267 268 for (int i = 0; i < total; i++) { 269 WALEdit kvs = new WALEdit(); 270 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 271 wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 272 System.currentTimeMillis(), mvcc, scopes), kvs, true); 273 } 274 // Now call sync and try reading. Opening a Reader before you sync just 275 // gives you EOFE. 276 wal.sync(); 277 // Open a Reader. 278 Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 279 reader = wals.createReader(fs, walPath); 280 int count = 0; 281 WAL.Entry entry = new WAL.Entry(); 282 while ((entry = reader.next(entry)) != null) count++; 283 assertEquals(total, count); 284 reader.close(); 285 // Add test that checks to see that an open of a Reader works on a file 286 // that has had a sync done on it. 287 for (int i = 0; i < total; i++) { 288 WALEdit kvs = new WALEdit(); 289 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 290 wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 291 System.currentTimeMillis(), mvcc, scopes), kvs, true); 292 } 293 wal.sync(); 294 reader = wals.createReader(fs, walPath); 295 count = 0; 296 while((entry = reader.next(entry)) != null) count++; 297 assertTrue(count >= total); 298 reader.close(); 299 // If I sync, should see double the edits. 300 wal.sync(); 301 reader = wals.createReader(fs, walPath); 302 count = 0; 303 while((entry = reader.next(entry)) != null) count++; 304 assertEquals(total * 2, count); 305 reader.close(); 306 // Now do a test that ensures stuff works when we go over block boundary, 307 // especially that we return good length on file. 308 final byte [] value = new byte[1025 * 1024]; // Make a 1M value. 309 for (int i = 0; i < total; i++) { 310 WALEdit kvs = new WALEdit(); 311 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); 312 wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 313 System.currentTimeMillis(), mvcc, scopes), kvs, true); 314 } 315 // Now I should have written out lots of blocks. Sync then read. 316 wal.sync(); 317 reader = wals.createReader(fs, walPath); 318 count = 0; 319 while((entry = reader.next(entry)) != null) count++; 320 assertEquals(total * 3, count); 321 reader.close(); 322 // shutdown and ensure that Reader gets right length also. 323 wal.shutdown(); 324 reader = wals.createReader(fs, walPath); 325 count = 0; 326 while((entry = reader.next(entry)) != null) count++; 327 assertEquals(total * 3, count); 328 reader.close(); 329 } finally { 330 if (reader != null) reader.close(); 331 } 332 } 333 334 private void verifySplits(final List<Path> splits, final int howmany) 335 throws IOException { 336 assertEquals(howmany * howmany, splits.size()); 337 for (int i = 0; i < splits.size(); i++) { 338 LOG.info("Verifying=" + splits.get(i)); 339 WAL.Reader reader = wals.createReader(fs, splits.get(i)); 340 try { 341 int count = 0; 342 String previousRegion = null; 343 long seqno = -1; 344 WAL.Entry entry = new WAL.Entry(); 345 while((entry = reader.next(entry)) != null) { 346 WALKey key = entry.getKey(); 347 String region = Bytes.toString(key.getEncodedRegionName()); 348 // Assert that all edits are for same region. 349 if (previousRegion != null) { 350 assertEquals(previousRegion, region); 351 } 352 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId()); 353 assertTrue(seqno < key.getSequenceId()); 354 seqno = key.getSequenceId(); 355 previousRegion = region; 356 count++; 357 } 358 assertEquals(howmany, count); 359 } finally { 360 reader.close(); 361 } 362 } 363 } 364 365 /* 366 * We pass different values to recoverFileLease() so that different code paths are covered 367 * 368 * For this test to pass, requires: 369 * 1. HDFS-200 (append support) 370 * 2. HDFS-988 (SafeMode should freeze file operations 371 * [FSNamesystem.nextGenerationStampForBlock]) 372 * 3. HDFS-142 (on restart, maintain pendingCreates) 373 */ 374 @Test 375 public void testAppendClose() throws Exception { 376 TableName tableName = 377 TableName.valueOf(currentTest.getMethodName()); 378 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 379 380 WAL wal = wals.getWAL(regionInfo); 381 int total = 20; 382 383 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 384 scopes.put(tableName.getName(), 0); 385 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 386 for (int i = 0; i < total; i++) { 387 WALEdit kvs = new WALEdit(); 388 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 389 wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 390 System.currentTimeMillis(), mvcc, scopes), 391 kvs, true); 392 } 393 // Now call sync to send the data to HDFS datanodes 394 wal.sync(); 395 int namenodePort = cluster.getNameNodePort(); 396 final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 397 398 399 // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) 400 try { 401 DistributedFileSystem dfs = cluster.getFileSystem(); 402 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); 403 TEST_UTIL.shutdownMiniDFSCluster(); 404 try { 405 // wal.writer.close() will throw an exception, 406 // but still call this since it closes the LogSyncer thread first 407 wal.shutdown(); 408 } catch (IOException e) { 409 LOG.info(e.toString(), e); 410 } 411 fs.close(); // closing FS last so DFSOutputStream can't call close 412 LOG.info("STOPPED first instance of the cluster"); 413 } finally { 414 // Restart the cluster 415 while (cluster.isClusterUp()){ 416 LOG.error("Waiting for cluster to go down"); 417 Thread.sleep(1000); 418 } 419 assertFalse(cluster.isClusterUp()); 420 cluster = null; 421 for (int i = 0; i < 100; i++) { 422 try { 423 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort); 424 break; 425 } catch (BindException e) { 426 LOG.info("Sleeping. BindException bringing up new cluster"); 427 Threads.sleep(1000); 428 } 429 } 430 cluster.waitActive(); 431 fs = cluster.getFileSystem(); 432 LOG.info("STARTED second instance."); 433 } 434 435 // set the lease period to be 1 second so that the 436 // namenode triggers lease recovery upon append request 437 Method setLeasePeriod = cluster.getClass() 438 .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE}); 439 setLeasePeriod.setAccessible(true); 440 setLeasePeriod.invoke(cluster, 1000L, 1000L); 441 try { 442 Thread.sleep(1000); 443 } catch (InterruptedException e) { 444 LOG.info(e.toString(), e); 445 } 446 447 // Now try recovering the log, like the HMaster would do 448 final FileSystem recoveredFs = fs; 449 final Configuration rlConf = conf; 450 451 class RecoverLogThread extends Thread { 452 public Exception exception = null; 453 @Override 454 public void run() { 455 try { 456 FSUtils.getInstance(fs, rlConf) 457 .recoverFileLease(recoveredFs, walPath, rlConf, null); 458 } catch (IOException e) { 459 exception = e; 460 } 461 } 462 } 463 464 RecoverLogThread t = new RecoverLogThread(); 465 t.start(); 466 // Timeout after 60 sec. Without correct patches, would be an infinite loop 467 t.join(60 * 1000); 468 if(t.isAlive()) { 469 t.interrupt(); 470 throw new Exception("Timed out waiting for WAL.recoverLog()"); 471 } 472 473 if (t.exception != null) 474 throw t.exception; 475 476 // Make sure you can read all the content 477 WAL.Reader reader = wals.createReader(fs, walPath); 478 int count = 0; 479 WAL.Entry entry = new WAL.Entry(); 480 while (reader.next(entry) != null) { 481 count++; 482 assertTrue("Should be one KeyValue per WALEdit", 483 entry.getEdit().getCells().size() == 1); 484 } 485 assertEquals(total, count); 486 reader.close(); 487 488 // Reset the lease period 489 setLeasePeriod.invoke(cluster, new Object[]{ 60000L, 3600000L }); 490 } 491 492 /** 493 * Tests that we can write out an edit, close, and then read it back in again. 494 */ 495 @Test 496 public void testEditAdd() throws IOException { 497 int colCount = 10; 498 TableDescriptor htd = 499 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 500 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 501 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 502 for (byte[] fam : htd.getColumnFamilyNames()) { 503 scopes.put(fam, 0); 504 } 505 byte[] row = Bytes.toBytes("row"); 506 WAL.Reader reader = null; 507 try { 508 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 509 510 // Write columns named 1, 2, 3, etc. and then values of single byte 511 // 1, 2, 3... 512 long timestamp = System.currentTimeMillis(); 513 WALEdit cols = new WALEdit(); 514 for (int i = 0; i < colCount; i++) { 515 cols.add(new KeyValue(row, Bytes.toBytes("column"), 516 Bytes.toBytes(Integer.toString(i)), 517 timestamp, new byte[] { (byte)(i + '0') })); 518 } 519 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row) 520 .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); 521 final WAL log = wals.getWAL(info); 522 523 final long txid = log.append(info, 524 new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), 525 mvcc, scopes), 526 cols, true); 527 log.sync(txid); 528 log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 529 log.completeCacheFlush(info.getEncodedNameAsBytes()); 530 log.shutdown(); 531 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 532 // Now open a reader on the log and assert append worked. 533 reader = wals.createReader(fs, filename); 534 // Above we added all columns on a single row so we only read one 535 // entry in the below... thats why we have '1'. 536 for (int i = 0; i < 1; i++) { 537 WAL.Entry entry = reader.next(null); 538 if (entry == null) break; 539 WALKey key = entry.getKey(); 540 WALEdit val = entry.getEdit(); 541 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); 542 assertTrue(htd.getTableName().equals(key.getTableName())); 543 Cell cell = val.getCells().get(0); 544 assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(), 545 cell.getRowLength())); 546 assertEquals((byte)(i + '0'), CellUtil.cloneValue(cell)[0]); 547 System.out.println(key + " " + val); 548 } 549 } finally { 550 if (reader != null) { 551 reader.close(); 552 } 553 } 554 } 555 556 @Test 557 public void testAppend() throws IOException { 558 int colCount = 10; 559 TableDescriptor htd = 560 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 561 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 562 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 563 for (byte[] fam : htd.getColumnFamilyNames()) { 564 scopes.put(fam, 0); 565 } 566 byte[] row = Bytes.toBytes("row"); 567 WAL.Reader reader = null; 568 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 569 try { 570 // Write columns named 1, 2, 3, etc. and then values of single byte 571 // 1, 2, 3... 572 long timestamp = System.currentTimeMillis(); 573 WALEdit cols = new WALEdit(); 574 for (int i = 0; i < colCount; i++) { 575 cols.add(new KeyValue(row, Bytes.toBytes("column"), 576 Bytes.toBytes(Integer.toString(i)), 577 timestamp, new byte[] { (byte)(i + '0') })); 578 } 579 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 580 final WAL log = wals.getWAL(hri); 581 final long txid = log.append(hri, 582 new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), 583 mvcc, scopes), 584 cols, true); 585 log.sync(txid); 586 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 587 log.completeCacheFlush(hri.getEncodedNameAsBytes()); 588 log.shutdown(); 589 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 590 // Now open a reader on the log and assert append worked. 591 reader = wals.createReader(fs, filename); 592 WAL.Entry entry = reader.next(); 593 assertEquals(colCount, entry.getEdit().size()); 594 int idx = 0; 595 for (Cell val : entry.getEdit().getCells()) { 596 assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(), 597 entry.getKey().getEncodedRegionName())); 598 assertTrue(htd.getTableName().equals(entry.getKey().getTableName())); 599 assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), 600 val.getRowLength())); 601 assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]); 602 System.out.println(entry.getKey() + " " + val); 603 idx++; 604 } 605 } finally { 606 if (reader != null) { 607 reader.close(); 608 } 609 } 610 } 611 612 /** 613 * Test that we can visit entries before they are appended 614 * @throws Exception 615 */ 616 @Test 617 public void testVisitors() throws Exception { 618 final int COL_COUNT = 10; 619 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 620 final byte [] row = Bytes.toBytes("row"); 621 final DumbWALActionsListener visitor = new DumbWALActionsListener(); 622 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 623 long timestamp = System.currentTimeMillis(); 624 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 625 scopes.put(Bytes.toBytes("column"), 0); 626 627 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 628 final WAL log = wals.getWAL(hri); 629 log.registerWALActionsListener(visitor); 630 for (int i = 0; i < COL_COUNT; i++) { 631 WALEdit cols = new WALEdit(); 632 cols.add(new KeyValue(row, Bytes.toBytes("column"), 633 Bytes.toBytes(Integer.toString(i)), 634 timestamp, new byte[]{(byte) (i + '0')})); 635 log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 636 System.currentTimeMillis(), mvcc, scopes), cols, true); 637 } 638 log.sync(); 639 assertEquals(COL_COUNT, visitor.increments); 640 log.unregisterWALActionsListener(visitor); 641 WALEdit cols = new WALEdit(); 642 cols.add(new KeyValue(row, Bytes.toBytes("column"), 643 Bytes.toBytes(Integer.toString(11)), 644 timestamp, new byte[]{(byte) (11 + '0')})); 645 log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 646 System.currentTimeMillis(), mvcc, scopes), cols, true); 647 log.sync(); 648 assertEquals(COL_COUNT, visitor.increments); 649 } 650 651 /** 652 * A loaded WAL coprocessor won't break existing WAL test cases. 653 */ 654 @Test 655 public void testWALCoprocessorLoaded() throws Exception { 656 // test to see whether the coprocessor is loaded or not. 657 WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost(); 658 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 659 assertNotNull(c); 660 } 661 662 static class DumbWALActionsListener implements WALActionsListener { 663 int increments = 0; 664 665 @Override 666 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 667 increments++; 668 } 669 670 @Override 671 public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { 672 // To change body of implemented methods use File | Settings | File 673 // Templates. 674 increments++; 675 } 676 } 677 678 @Test 679 public void testWALProviders() throws IOException { 680 Configuration conf = new Configuration(); 681 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 682 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 683 } 684 685 @Test 686 public void testOnlySetWALProvider() throws IOException { 687 Configuration conf = new Configuration(); 688 conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name()); 689 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 690 691 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass()); 692 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass()); 693 } 694 695 @Test 696 public void testOnlySetMetaWALProvider() throws IOException { 697 Configuration conf = new Configuration(); 698 conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name()); 699 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 700 701 assertEquals(WALFactory.Providers.defaultProvider.clazz, 702 walFactory.getWALProvider().getClass()); 703 assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass()); 704 } 705 706 @Test 707 public void testDefaultProvider() throws IOException { 708 final Configuration conf = new Configuration(); 709 // AsyncFSWal is the default, we should be able to request any WAL. 710 final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString()); 711 Class<? extends WALProvider> fshLogProvider = normalWalFactory.getProviderClass( 712 WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 713 assertEquals(Providers.filesystem.clazz, fshLogProvider); 714 715 // Imagine a world where MultiWAL is the default 716 final WALFactory customizedWalFactory = new WALFactory( 717 conf, this.currentServername.toString()) { 718 @Override 719 Providers getDefaultProvider() { 720 return Providers.multiwal; 721 } 722 }; 723 // If we don't specify a WALProvider, we should get the default implementation. 724 Class<? extends WALProvider> multiwalProviderClass = customizedWalFactory.getProviderClass( 725 WALFactory.WAL_PROVIDER, Providers.multiwal.name()); 726 assertEquals(Providers.multiwal.clazz, multiwalProviderClass); 727 } 728}