001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertSame; 026import static org.junit.Assert.assertTrue; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.NavigableMap; 031import java.util.OptionalLong; 032import java.util.TreeMap; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.ForkJoinPool; 035import java.util.concurrent.Future; 036import java.util.concurrent.PriorityBlockingQueue; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.concurrent.atomic.AtomicInteger; 039import java.util.concurrent.atomic.AtomicLong; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.Cell; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseTestingUtility; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.Server; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 051import org.apache.hadoop.hbase.client.RegionInfo; 052import org.apache.hadoop.hbase.client.RegionInfoBuilder; 053import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 054import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 055import org.apache.hadoop.hbase.replication.WALEntryFilter; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.testclassification.ReplicationTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.wal.WAL; 060import org.apache.hadoop.hbase.wal.WAL.Entry; 061import org.apache.hadoop.hbase.wal.WALEdit; 062import org.apache.hadoop.hbase.wal.WALFactory; 063import org.apache.hadoop.hbase.wal.WALKeyImpl; 064import org.apache.hadoop.hdfs.MiniDFSCluster; 065import org.junit.After; 066import org.junit.AfterClass; 067import org.junit.Before; 068import org.junit.BeforeClass; 069import org.junit.ClassRule; 070import org.junit.Rule; 071import org.junit.Test; 072import org.junit.experimental.categories.Category; 073import org.junit.rules.TestName; 074import org.mockito.Mockito; 075 076@Category({ ReplicationTests.class, LargeTests.class }) 077public class TestWALEntryStream { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestWALEntryStream.class); 082 083 private static HBaseTestingUtility TEST_UTIL; 084 private static Configuration CONF; 085 private static FileSystem fs; 086 private static MiniDFSCluster cluster; 087 private static final TableName tableName = TableName.valueOf("tablename"); 088 private static final byte[] family = Bytes.toBytes("column"); 089 private static final byte[] qualifier = Bytes.toBytes("qualifier"); 090 private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) 091 .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); 092 private static final NavigableMap<byte[], Integer> scopes = getScopes(); 093 094 private static NavigableMap<byte[], Integer> getScopes() { 095 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 096 scopes.put(family, 1); 097 return scopes; 098 } 099 100 private WAL log; 101 PriorityBlockingQueue<Path> walQueue; 102 private PathWatcher pathWatcher; 103 104 @Rule 105 public TestName tn = new TestName(); 106 private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 107 108 @BeforeClass 109 public static void setUpBeforeClass() throws Exception { 110 TEST_UTIL = new HBaseTestingUtility(); 111 CONF = TEST_UTIL.getConfiguration(); 112 TEST_UTIL.startMiniDFSCluster(3); 113 114 cluster = TEST_UTIL.getDFSCluster(); 115 fs = cluster.getFileSystem(); 116 } 117 118 @AfterClass 119 public static void tearDownAfterClass() throws Exception { 120 TEST_UTIL.shutdownMiniCluster(); 121 } 122 123 @Before 124 public void setUp() throws Exception { 125 walQueue = new PriorityBlockingQueue<>(); 126 pathWatcher = new PathWatcher(); 127 final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); 128 wals.getWALProvider().addWALActionsListener(pathWatcher); 129 log = wals.getWAL(info); 130 } 131 132 @After 133 public void tearDown() throws Exception { 134 log.close(); 135 } 136 137 // Try out different combinations of row count and KeyValue count 138 @Test 139 public void testDifferentCounts() throws Exception { 140 int[] NB_ROWS = { 1500, 60000 }; 141 int[] NB_KVS = { 1, 100 }; 142 // whether compression is used 143 Boolean[] BOOL_VALS = { false, true }; 144 // long lastPosition = 0; 145 for (int nbRows : NB_ROWS) { 146 for (int walEditKVs : NB_KVS) { 147 for (boolean isCompressionEnabled : BOOL_VALS) { 148 TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, 149 isCompressionEnabled); 150 mvcc.advanceTo(1); 151 152 for (int i = 0; i < nbRows; i++) { 153 appendToLogAndSync(walEditKVs); 154 } 155 156 log.rollWriter(); 157 158 try (WALEntryStream entryStream = 159 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 160 int i = 0; 161 while (entryStream.hasNext()) { 162 assertNotNull(entryStream.next()); 163 i++; 164 } 165 assertEquals(nbRows, i); 166 167 // should've read all entries 168 assertFalse(entryStream.hasNext()); 169 } 170 // reset everything for next loop 171 log.close(); 172 setUp(); 173 } 174 } 175 } 176 } 177 178 /** 179 * Tests basic reading of log appends 180 */ 181 @Test 182 public void testAppendsWithRolls() throws Exception { 183 appendToLogAndSync(); 184 long oldPos; 185 try (WALEntryStream entryStream = 186 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 187 // There's one edit in the log, read it. Reading past it needs to throw exception 188 assertTrue(entryStream.hasNext()); 189 WAL.Entry entry = entryStream.peek(); 190 assertSame(entry, entryStream.next()); 191 assertNotNull(entry); 192 assertFalse(entryStream.hasNext()); 193 assertNull(entryStream.peek()); 194 assertNull(entryStream.next()); 195 oldPos = entryStream.getPosition(); 196 } 197 198 appendToLogAndSync(); 199 200 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos, 201 log, null, new MetricsSource("1"))) { 202 // Read the newly added entry, make sure we made progress 203 WAL.Entry entry = entryStream.next(); 204 assertNotEquals(oldPos, entryStream.getPosition()); 205 assertNotNull(entry); 206 oldPos = entryStream.getPosition(); 207 } 208 209 // We rolled but we still should see the end of the first log and get that item 210 appendToLogAndSync(); 211 log.rollWriter(); 212 appendToLogAndSync(); 213 214 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos, 215 log, null, new MetricsSource("1"))) { 216 WAL.Entry entry = entryStream.next(); 217 assertNotEquals(oldPos, entryStream.getPosition()); 218 assertNotNull(entry); 219 220 // next item should come from the new log 221 entry = entryStream.next(); 222 assertNotEquals(oldPos, entryStream.getPosition()); 223 assertNotNull(entry); 224 225 // no more entries to read 226 assertFalse(entryStream.hasNext()); 227 oldPos = entryStream.getPosition(); 228 } 229 } 230 231 /** 232 * Tests that if after a stream is opened, more entries come in and then the log is rolled, we 233 * don't mistakenly dequeue the current log thinking we're done with it 234 */ 235 @Test 236 public void testLogrollWhileStreaming() throws Exception { 237 appendToLog("1"); 238 appendToLog("2");// 2 239 try (WALEntryStream entryStream = 240 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 241 assertEquals("1", getRow(entryStream.next())); 242 243 appendToLog("3"); // 3 - comes in after reader opened 244 log.rollWriter(); // log roll happening while we're reading 245 appendToLog("4"); // 4 - this append is in the rolled log 246 247 assertEquals("2", getRow(entryStream.next())); 248 assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an 249 // entry in first log 250 assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 251 // and 3 would be skipped 252 assertEquals("4", getRow(entryStream.next())); // 4 253 assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly 254 assertFalse(entryStream.hasNext()); 255 } 256 } 257 258 /** 259 * Tests that if writes come in while we have a stream open, we shouldn't miss them 260 */ 261 @Test 262 public void testNewEntriesWhileStreaming() throws Exception { 263 appendToLog("1"); 264 try (WALEntryStream entryStream = 265 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 266 entryStream.next(); // we've hit the end of the stream at this point 267 268 // some new entries come in while we're streaming 269 appendToLog("2"); 270 appendToLog("3"); 271 272 // don't see them 273 assertFalse(entryStream.hasNext()); 274 275 // But we do if we reset 276 entryStream.reset(); 277 assertEquals("2", getRow(entryStream.next())); 278 assertEquals("3", getRow(entryStream.next())); 279 assertFalse(entryStream.hasNext()); 280 } 281 } 282 283 @Test 284 public void testResumeStreamingFromPosition() throws Exception { 285 long lastPosition = 0; 286 appendToLog("1"); 287 try (WALEntryStream entryStream = 288 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 289 entryStream.next(); // we've hit the end of the stream at this point 290 appendToLog("2"); 291 appendToLog("3"); 292 lastPosition = entryStream.getPosition(); 293 } 294 // next stream should picks up where we left off 295 try (WALEntryStream entryStream = 296 new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { 297 assertEquals("2", getRow(entryStream.next())); 298 assertEquals("3", getRow(entryStream.next())); 299 assertFalse(entryStream.hasNext()); // done 300 assertEquals(1, walQueue.size()); 301 } 302 } 303 304 /** 305 * Tests that if we stop before hitting the end of a stream, we can continue where we left off 306 * using the last position 307 */ 308 @Test 309 public void testPosition() throws Exception { 310 long lastPosition = 0; 311 appendEntriesToLogAndSync(3); 312 // read only one element 313 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition, 314 log, null, new MetricsSource("1"))) { 315 entryStream.next(); 316 lastPosition = entryStream.getPosition(); 317 } 318 // there should still be two more entries from where we left off 319 try (WALEntryStream entryStream = 320 new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { 321 assertNotNull(entryStream.next()); 322 assertNotNull(entryStream.next()); 323 assertFalse(entryStream.hasNext()); 324 } 325 } 326 327 328 @Test 329 public void testEmptyStream() throws Exception { 330 try (WALEntryStream entryStream = 331 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 332 assertFalse(entryStream.hasNext()); 333 } 334 } 335 336 private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { 337 ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); 338 when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); 339 Server mockServer = Mockito.mock(Server.class); 340 ReplicationSource source = Mockito.mock(ReplicationSource.class); 341 when(source.getSourceManager()).thenReturn(mockSourceManager); 342 when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); 343 when(source.getWALFileLengthProvider()).thenReturn(log); 344 when(source.getServer()).thenReturn(mockServer); 345 when(source.isRecovered()).thenReturn(recovered); 346 return source; 347 } 348 349 private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { 350 ReplicationSource source = mockReplicationSource(recovered, conf); 351 when(source.isPeerEnabled()).thenReturn(true); 352 ReplicationSourceWALReader reader = 353 new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); 354 reader.start(); 355 return reader; 356 } 357 358 @Test 359 public void testReplicationSourceWALReader() throws Exception { 360 appendEntriesToLogAndSync(3); 361 // get ending position 362 long position; 363 try (WALEntryStream entryStream = 364 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 365 entryStream.next(); 366 entryStream.next(); 367 entryStream.next(); 368 position = entryStream.getPosition(); 369 } 370 371 // start up a reader 372 Path walPath = walQueue.peek(); 373 ReplicationSourceWALReader reader = createReader(false, CONF); 374 WALEntryBatch entryBatch = reader.take(); 375 376 // should've batched up our entries 377 assertNotNull(entryBatch); 378 assertEquals(3, entryBatch.getWalEntries().size()); 379 assertEquals(position, entryBatch.getLastWalPosition()); 380 assertEquals(walPath, entryBatch.getLastWalPath()); 381 assertEquals(3, entryBatch.getNbRowKeys()); 382 383 appendToLog("foo"); 384 entryBatch = reader.take(); 385 assertEquals(1, entryBatch.getNbEntries()); 386 assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); 387 } 388 389 @Test 390 public void testReplicationSourceWALReaderRecovered() throws Exception { 391 appendEntriesToLogAndSync(10); 392 Path walPath = walQueue.peek(); 393 log.rollWriter(); 394 appendEntriesToLogAndSync(5); 395 log.shutdown(); 396 397 Configuration conf = new Configuration(CONF); 398 conf.setInt("replication.source.nb.capacity", 10); 399 400 ReplicationSourceWALReader reader = createReader(true, conf); 401 402 WALEntryBatch batch = reader.take(); 403 assertEquals(walPath, batch.getLastWalPath()); 404 assertEquals(10, batch.getNbEntries()); 405 assertFalse(batch.isEndOfFile()); 406 407 batch = reader.take(); 408 assertEquals(walPath, batch.getLastWalPath()); 409 assertEquals(0, batch.getNbEntries()); 410 assertTrue(batch.isEndOfFile()); 411 412 walPath = walQueue.peek(); 413 batch = reader.take(); 414 assertEquals(walPath, batch.getLastWalPath()); 415 assertEquals(5, batch.getNbEntries()); 416 // Actually this should be true but we haven't handled this yet since for a normal queue the 417 // last one is always open... Not a big deal for now. 418 assertFalse(batch.isEndOfFile()); 419 420 assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); 421 } 422 423 // Testcase for HBASE-20206 424 @Test 425 public void testReplicationSourceWALReaderWrongPosition() throws Exception { 426 appendEntriesToLogAndSync(1); 427 Path walPath = walQueue.peek(); 428 log.rollWriter(); 429 appendEntriesToLogAndSync(20); 430 TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() { 431 432 @Override 433 public boolean evaluate() throws Exception { 434 return fs.getFileStatus(walPath).getLen() > 0; 435 } 436 437 @Override 438 public String explainFailure() throws Exception { 439 return walPath + " has not been closed yet"; 440 } 441 442 }); 443 long walLength = fs.getFileStatus(walPath).getLen(); 444 445 ReplicationSourceWALReader reader = createReader(false, CONF); 446 447 WALEntryBatch entryBatch = reader.take(); 448 assertEquals(walPath, entryBatch.getLastWalPath()); 449 assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + 450 walLength, entryBatch.getLastWalPosition() <= walLength); 451 assertEquals(1, entryBatch.getNbEntries()); 452 assertTrue(entryBatch.isEndOfFile()); 453 454 Path walPath2 = walQueue.peek(); 455 entryBatch = reader.take(); 456 assertEquals(walPath2, entryBatch.getLastWalPath()); 457 assertEquals(20, entryBatch.getNbEntries()); 458 assertFalse(entryBatch.isEndOfFile()); 459 460 log.rollWriter(); 461 appendEntriesToLogAndSync(10); 462 entryBatch = reader.take(); 463 assertEquals(walPath2, entryBatch.getLastWalPath()); 464 assertEquals(0, entryBatch.getNbEntries()); 465 assertTrue(entryBatch.isEndOfFile()); 466 467 Path walPath3 = walQueue.peek(); 468 entryBatch = reader.take(); 469 assertEquals(walPath3, entryBatch.getLastWalPath()); 470 assertEquals(10, entryBatch.getNbEntries()); 471 assertFalse(entryBatch.isEndOfFile()); 472 } 473 474 @Test 475 public void testReplicationSourceWALReaderDisabled() 476 throws IOException, InterruptedException, ExecutionException { 477 appendEntriesToLogAndSync(3); 478 // get ending position 479 long position; 480 try (WALEntryStream entryStream = 481 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 482 entryStream.next(); 483 entryStream.next(); 484 entryStream.next(); 485 position = entryStream.getPosition(); 486 } 487 488 // start up a reader 489 Path walPath = walQueue.peek(); 490 ReplicationSource source = mockReplicationSource(false, CONF); 491 AtomicInteger invokeCount = new AtomicInteger(0); 492 AtomicBoolean enabled = new AtomicBoolean(false); 493 when(source.isPeerEnabled()).then(i -> { 494 invokeCount.incrementAndGet(); 495 return enabled.get(); 496 }); 497 498 ReplicationSourceWALReader reader = 499 new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source); 500 reader.start(); 501 Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> { 502 return reader.take(); 503 }); 504 // make sure that the isPeerEnabled has been called several times 505 TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5); 506 // confirm that we can read nothing if the peer is disabled 507 assertFalse(future.isDone()); 508 // then enable the peer, we should get the batch 509 enabled.set(true); 510 WALEntryBatch entryBatch = future.get(); 511 512 // should've batched up our entries 513 assertNotNull(entryBatch); 514 assertEquals(3, entryBatch.getWalEntries().size()); 515 assertEquals(position, entryBatch.getLastWalPosition()); 516 assertEquals(walPath, entryBatch.getLastWalPath()); 517 assertEquals(3, entryBatch.getNbRowKeys()); 518 } 519 520 private String getRow(WAL.Entry entry) { 521 Cell cell = entry.getEdit().getCells().get(0); 522 return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 523 } 524 525 private void appendToLog(String key) throws IOException { 526 final long txid = log.appendData(info, 527 new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), 528 mvcc, scopes), getWALEdit(key)); 529 log.sync(txid); 530 } 531 532 private void appendEntriesToLogAndSync(int count) throws IOException { 533 long txid = -1L; 534 for (int i = 0; i < count; i++) { 535 txid = appendToLog(1); 536 } 537 log.sync(txid); 538 } 539 540 private void appendToLogAndSync() throws IOException { 541 appendToLogAndSync(1); 542 } 543 544 private void appendToLogAndSync(int count) throws IOException { 545 long txid = appendToLog(count); 546 log.sync(txid); 547 } 548 549 private long appendToLog(int count) throws IOException { 550 return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 551 System.currentTimeMillis(), mvcc, scopes), getWALEdits(count)); 552 } 553 554 private WALEdit getWALEdits(int count) { 555 WALEdit edit = new WALEdit(); 556 for (int i = 0; i < count; i++) { 557 edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, 558 System.currentTimeMillis(), qualifier)); 559 } 560 return edit; 561 } 562 563 private WALEdit getWALEdit(String row) { 564 WALEdit edit = new WALEdit(); 565 edit.add( 566 new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier)); 567 return edit; 568 } 569 570 private WALEntryFilter getDummyFilter() { 571 return new WALEntryFilter() { 572 573 @Override 574 public Entry filter(Entry entry) { 575 return entry; 576 } 577 }; 578 } 579 580 class PathWatcher implements WALActionsListener { 581 582 Path currentPath; 583 584 @Override 585 public void preLogRoll(Path oldPath, Path newPath) throws IOException { 586 walQueue.add(newPath); 587 currentPath = newPath; 588 } 589 } 590 591 @Test 592 public void testReadBeyondCommittedLength() throws IOException, InterruptedException { 593 appendToLog("1"); 594 appendToLog("2"); 595 long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); 596 AtomicLong fileLength = new AtomicLong(size - 1); 597 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, 0, 598 p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { 599 assertTrue(entryStream.hasNext()); 600 assertNotNull(entryStream.next()); 601 // can not get log 2 602 assertFalse(entryStream.hasNext()); 603 Thread.sleep(1000); 604 entryStream.reset(); 605 // still can not get log 2 606 assertFalse(entryStream.hasNext()); 607 608 // can get log 2 now 609 fileLength.set(size); 610 entryStream.reset(); 611 assertTrue(entryStream.hasNext()); 612 assertNotNull(entryStream.next()); 613 614 assertFalse(entryStream.hasNext()); 615 } 616 } 617}