001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertSame; 026import static org.junit.Assert.assertTrue; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.HashMap; 032import java.util.Map; 033import java.util.NavigableMap; 034import java.util.OptionalLong; 035import java.util.TreeMap; 036import java.util.UUID; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ForkJoinPool; 039import java.util.concurrent.Future; 040import java.util.concurrent.PriorityBlockingQueue; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicInteger; 043import java.util.concurrent.atomic.AtomicLong; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.Cell; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseTestingUtility; 050import org.apache.hadoop.hbase.HConstants; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.Server; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 055import org.apache.hadoop.hbase.client.RegionInfo; 056import org.apache.hadoop.hbase.client.RegionInfoBuilder; 057import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 058import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 059import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 060import org.apache.hadoop.hbase.replication.WALEntryFilter; 061import org.apache.hadoop.hbase.testclassification.LargeTests; 062import org.apache.hadoop.hbase.testclassification.ReplicationTests; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.wal.WAL; 065import org.apache.hadoop.hbase.wal.WAL.Entry; 066import org.apache.hadoop.hbase.wal.WALEdit; 067import org.apache.hadoop.hbase.wal.WALFactory; 068import org.apache.hadoop.hbase.wal.WALKeyImpl; 069import org.apache.hadoop.hdfs.MiniDFSCluster; 070import org.junit.After; 071import org.junit.AfterClass; 072import org.junit.Assert; 073import org.junit.Before; 074import org.junit.BeforeClass; 075import org.junit.ClassRule; 076import org.junit.Rule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.junit.rules.TestName; 080import org.mockito.Mockito; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 082 083 084@Category({ ReplicationTests.class, LargeTests.class }) 085public class TestWALEntryStream { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestWALEntryStream.class); 090 091 private static HBaseTestingUtility TEST_UTIL; 092 private static Configuration CONF; 093 private static FileSystem fs; 094 private static MiniDFSCluster cluster; 095 private static final TableName tableName = TableName.valueOf("tablename"); 096 private static final byte[] family = Bytes.toBytes("column"); 097 private static final byte[] qualifier = Bytes.toBytes("qualifier"); 098 private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) 099 .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); 100 private static final NavigableMap<byte[], Integer> scopes = getScopes(); 101 102 private static NavigableMap<byte[], Integer> getScopes() { 103 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 104 scopes.put(family, 1); 105 return scopes; 106 } 107 108 private WAL log; 109 PriorityBlockingQueue<Path> walQueue; 110 private PathWatcher pathWatcher; 111 112 @Rule 113 public TestName tn = new TestName(); 114 private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 115 116 @BeforeClass 117 public static void setUpBeforeClass() throws Exception { 118 TEST_UTIL = new HBaseTestingUtility(); 119 CONF = TEST_UTIL.getConfiguration(); 120 TEST_UTIL.startMiniDFSCluster(3); 121 122 cluster = TEST_UTIL.getDFSCluster(); 123 fs = cluster.getFileSystem(); 124 } 125 126 @AfterClass 127 public static void tearDownAfterClass() throws Exception { 128 TEST_UTIL.shutdownMiniCluster(); 129 } 130 131 @Before 132 public void setUp() throws Exception { 133 walQueue = new PriorityBlockingQueue<>(); 134 pathWatcher = new PathWatcher(); 135 final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); 136 wals.getWALProvider().addWALActionsListener(pathWatcher); 137 log = wals.getWAL(info); 138 } 139 140 @After 141 public void tearDown() throws Exception { 142 log.close(); 143 } 144 145 // Try out different combinations of row count and KeyValue count 146 @Test 147 public void testDifferentCounts() throws Exception { 148 int[] NB_ROWS = { 1500, 60000 }; 149 int[] NB_KVS = { 1, 100 }; 150 // whether compression is used 151 Boolean[] BOOL_VALS = { false, true }; 152 // long lastPosition = 0; 153 for (int nbRows : NB_ROWS) { 154 for (int walEditKVs : NB_KVS) { 155 for (boolean isCompressionEnabled : BOOL_VALS) { 156 TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, 157 isCompressionEnabled); 158 mvcc.advanceTo(1); 159 160 for (int i = 0; i < nbRows; i++) { 161 appendToLogAndSync(walEditKVs); 162 } 163 164 log.rollWriter(); 165 166 try (WALEntryStream entryStream = 167 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 168 int i = 0; 169 while (entryStream.hasNext()) { 170 assertNotNull(entryStream.next()); 171 i++; 172 } 173 assertEquals(nbRows, i); 174 175 // should've read all entries 176 assertFalse(entryStream.hasNext()); 177 } 178 // reset everything for next loop 179 log.close(); 180 setUp(); 181 } 182 } 183 } 184 } 185 186 /** 187 * Tests basic reading of log appends 188 */ 189 @Test 190 public void testAppendsWithRolls() throws Exception { 191 appendToLogAndSync(); 192 long oldPos; 193 try (WALEntryStream entryStream = 194 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 195 // There's one edit in the log, read it. Reading past it needs to throw exception 196 assertTrue(entryStream.hasNext()); 197 WAL.Entry entry = entryStream.peek(); 198 assertSame(entry, entryStream.next()); 199 assertNotNull(entry); 200 assertFalse(entryStream.hasNext()); 201 assertNull(entryStream.peek()); 202 assertNull(entryStream.next()); 203 oldPos = entryStream.getPosition(); 204 } 205 206 appendToLogAndSync(); 207 208 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos, 209 log, null, new MetricsSource("1"))) { 210 // Read the newly added entry, make sure we made progress 211 WAL.Entry entry = entryStream.next(); 212 assertNotEquals(oldPos, entryStream.getPosition()); 213 assertNotNull(entry); 214 oldPos = entryStream.getPosition(); 215 } 216 217 // We rolled but we still should see the end of the first log and get that item 218 appendToLogAndSync(); 219 log.rollWriter(); 220 appendToLogAndSync(); 221 222 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos, 223 log, null, new MetricsSource("1"))) { 224 WAL.Entry entry = entryStream.next(); 225 assertNotEquals(oldPos, entryStream.getPosition()); 226 assertNotNull(entry); 227 228 // next item should come from the new log 229 entry = entryStream.next(); 230 assertNotEquals(oldPos, entryStream.getPosition()); 231 assertNotNull(entry); 232 233 // no more entries to read 234 assertFalse(entryStream.hasNext()); 235 oldPos = entryStream.getPosition(); 236 } 237 } 238 239 /** 240 * Tests that if after a stream is opened, more entries come in and then the log is rolled, we 241 * don't mistakenly dequeue the current log thinking we're done with it 242 */ 243 @Test 244 public void testLogrollWhileStreaming() throws Exception { 245 appendToLog("1"); 246 appendToLog("2");// 2 247 try (WALEntryStream entryStream = 248 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 249 assertEquals("1", getRow(entryStream.next())); 250 251 appendToLog("3"); // 3 - comes in after reader opened 252 log.rollWriter(); // log roll happening while we're reading 253 appendToLog("4"); // 4 - this append is in the rolled log 254 255 assertEquals("2", getRow(entryStream.next())); 256 assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an 257 // entry in first log 258 assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 259 // and 3 would be skipped 260 assertEquals("4", getRow(entryStream.next())); // 4 261 assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly 262 assertFalse(entryStream.hasNext()); 263 } 264 } 265 266 /** 267 * Tests that if writes come in while we have a stream open, we shouldn't miss them 268 */ 269 @Test 270 public void testNewEntriesWhileStreaming() throws Exception { 271 appendToLog("1"); 272 try (WALEntryStream entryStream = 273 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 274 entryStream.next(); // we've hit the end of the stream at this point 275 276 // some new entries come in while we're streaming 277 appendToLog("2"); 278 appendToLog("3"); 279 280 // don't see them 281 assertFalse(entryStream.hasNext()); 282 283 // But we do if we reset 284 entryStream.reset(); 285 assertEquals("2", getRow(entryStream.next())); 286 assertEquals("3", getRow(entryStream.next())); 287 assertFalse(entryStream.hasNext()); 288 } 289 } 290 291 @Test 292 public void testResumeStreamingFromPosition() throws Exception { 293 long lastPosition = 0; 294 appendToLog("1"); 295 try (WALEntryStream entryStream = 296 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 297 entryStream.next(); // we've hit the end of the stream at this point 298 appendToLog("2"); 299 appendToLog("3"); 300 lastPosition = entryStream.getPosition(); 301 } 302 // next stream should picks up where we left off 303 try (WALEntryStream entryStream = 304 new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { 305 assertEquals("2", getRow(entryStream.next())); 306 assertEquals("3", getRow(entryStream.next())); 307 assertFalse(entryStream.hasNext()); // done 308 assertEquals(1, walQueue.size()); 309 } 310 } 311 312 /** 313 * Tests that if we stop before hitting the end of a stream, we can continue where we left off 314 * using the last position 315 */ 316 @Test 317 public void testPosition() throws Exception { 318 long lastPosition = 0; 319 appendEntriesToLogAndSync(3); 320 // read only one element 321 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition, 322 log, null, new MetricsSource("1"))) { 323 entryStream.next(); 324 lastPosition = entryStream.getPosition(); 325 } 326 // there should still be two more entries from where we left off 327 try (WALEntryStream entryStream = 328 new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { 329 assertNotNull(entryStream.next()); 330 assertNotNull(entryStream.next()); 331 assertFalse(entryStream.hasNext()); 332 } 333 } 334 335 336 @Test 337 public void testEmptyStream() throws Exception { 338 try (WALEntryStream entryStream = 339 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 340 assertFalse(entryStream.hasNext()); 341 } 342 } 343 344 @Test 345 public void testWALKeySerialization() throws Exception { 346 Map<String, byte[]> attributes = new HashMap<String, byte[]>(); 347 attributes.put("foo", Bytes.toBytes("foo-value")); 348 attributes.put("bar", Bytes.toBytes("bar-value")); 349 WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 350 System.currentTimeMillis(), new ArrayList<UUID>(), 0L, 0L, 351 mvcc, scopes, attributes); 352 Assert.assertEquals(attributes, key.getExtendedAttributes()); 353 354 WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor()); 355 WALProtos.WALKey serializedKey = builder.build(); 356 357 WALKeyImpl deserializedKey = new WALKeyImpl(); 358 deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor()); 359 360 //equals() only checks region name, sequence id and write time 361 Assert.assertEquals(key, deserializedKey); 362 //can't use Map.equals() because byte arrays use reference equality 363 Assert.assertEquals(key.getExtendedAttributes().keySet(), 364 deserializedKey.getExtendedAttributes().keySet()); 365 for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()){ 366 Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue()); 367 } 368 Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes()); 369 } 370 371 private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { 372 ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); 373 when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); 374 when(mockSourceManager.getTotalBufferLimit()).thenReturn( 375 (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); 376 Server mockServer = Mockito.mock(Server.class); 377 ReplicationSource source = Mockito.mock(ReplicationSource.class); 378 when(source.getSourceManager()).thenReturn(mockSourceManager); 379 when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); 380 when(source.getWALFileLengthProvider()).thenReturn(log); 381 when(source.getServer()).thenReturn(mockServer); 382 when(source.isRecovered()).thenReturn(recovered); 383 MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock( 384 MetricsReplicationGlobalSourceSource.class); 385 when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); 386 return source; 387 } 388 389 private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { 390 ReplicationSource source = mockReplicationSource(recovered, conf); 391 when(source.isPeerEnabled()).thenReturn(true); 392 ReplicationSourceWALReader reader = 393 new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); 394 reader.start(); 395 return reader; 396 } 397 398 @Test 399 public void testReplicationSourceWALReader() throws Exception { 400 appendEntriesToLogAndSync(3); 401 // get ending position 402 long position; 403 try (WALEntryStream entryStream = 404 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 405 entryStream.next(); 406 entryStream.next(); 407 entryStream.next(); 408 position = entryStream.getPosition(); 409 } 410 411 // start up a reader 412 Path walPath = walQueue.peek(); 413 ReplicationSourceWALReader reader = createReader(false, CONF); 414 WALEntryBatch entryBatch = reader.take(); 415 416 // should've batched up our entries 417 assertNotNull(entryBatch); 418 assertEquals(3, entryBatch.getWalEntries().size()); 419 assertEquals(position, entryBatch.getLastWalPosition()); 420 assertEquals(walPath, entryBatch.getLastWalPath()); 421 assertEquals(3, entryBatch.getNbRowKeys()); 422 423 appendToLog("foo"); 424 entryBatch = reader.take(); 425 assertEquals(1, entryBatch.getNbEntries()); 426 assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); 427 } 428 429 @Test 430 public void testReplicationSourceWALReaderRecovered() throws Exception { 431 appendEntriesToLogAndSync(10); 432 Path walPath = walQueue.peek(); 433 log.rollWriter(); 434 appendEntriesToLogAndSync(5); 435 log.shutdown(); 436 437 Configuration conf = new Configuration(CONF); 438 conf.setInt("replication.source.nb.capacity", 10); 439 440 ReplicationSourceWALReader reader = createReader(true, conf); 441 442 WALEntryBatch batch = reader.take(); 443 assertEquals(walPath, batch.getLastWalPath()); 444 assertEquals(10, batch.getNbEntries()); 445 assertFalse(batch.isEndOfFile()); 446 447 batch = reader.take(); 448 assertEquals(walPath, batch.getLastWalPath()); 449 assertEquals(0, batch.getNbEntries()); 450 assertTrue(batch.isEndOfFile()); 451 452 walPath = walQueue.peek(); 453 batch = reader.take(); 454 assertEquals(walPath, batch.getLastWalPath()); 455 assertEquals(5, batch.getNbEntries()); 456 assertTrue(batch.isEndOfFile()); 457 458 assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); 459 } 460 461 // Testcase for HBASE-20206 462 @Test 463 public void testReplicationSourceWALReaderWrongPosition() throws Exception { 464 appendEntriesToLogAndSync(1); 465 Path walPath = walQueue.peek(); 466 log.rollWriter(); 467 appendEntriesToLogAndSync(20); 468 TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() { 469 470 @Override 471 public boolean evaluate() throws Exception { 472 return fs.getFileStatus(walPath).getLen() > 0; 473 } 474 475 @Override 476 public String explainFailure() throws Exception { 477 return walPath + " has not been closed yet"; 478 } 479 480 }); 481 long walLength = fs.getFileStatus(walPath).getLen(); 482 483 ReplicationSourceWALReader reader = createReader(false, CONF); 484 485 WALEntryBatch entryBatch = reader.take(); 486 assertEquals(walPath, entryBatch.getLastWalPath()); 487 assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + 488 walLength, entryBatch.getLastWalPosition() <= walLength); 489 assertEquals(1, entryBatch.getNbEntries()); 490 assertTrue(entryBatch.isEndOfFile()); 491 492 Path walPath2 = walQueue.peek(); 493 entryBatch = reader.take(); 494 assertEquals(walPath2, entryBatch.getLastWalPath()); 495 assertEquals(20, entryBatch.getNbEntries()); 496 assertFalse(entryBatch.isEndOfFile()); 497 498 log.rollWriter(); 499 appendEntriesToLogAndSync(10); 500 entryBatch = reader.take(); 501 assertEquals(walPath2, entryBatch.getLastWalPath()); 502 assertEquals(0, entryBatch.getNbEntries()); 503 assertTrue(entryBatch.isEndOfFile()); 504 505 Path walPath3 = walQueue.peek(); 506 entryBatch = reader.take(); 507 assertEquals(walPath3, entryBatch.getLastWalPath()); 508 assertEquals(10, entryBatch.getNbEntries()); 509 assertFalse(entryBatch.isEndOfFile()); 510 } 511 512 @Test 513 public void testReplicationSourceWALReaderDisabled() 514 throws IOException, InterruptedException, ExecutionException { 515 appendEntriesToLogAndSync(3); 516 // get ending position 517 long position; 518 try (WALEntryStream entryStream = 519 new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { 520 entryStream.next(); 521 entryStream.next(); 522 entryStream.next(); 523 position = entryStream.getPosition(); 524 } 525 526 // start up a reader 527 Path walPath = walQueue.peek(); 528 ReplicationSource source = mockReplicationSource(false, CONF); 529 AtomicInteger invokeCount = new AtomicInteger(0); 530 AtomicBoolean enabled = new AtomicBoolean(false); 531 when(source.isPeerEnabled()).then(i -> { 532 invokeCount.incrementAndGet(); 533 return enabled.get(); 534 }); 535 536 ReplicationSourceWALReader reader = 537 new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source); 538 reader.start(); 539 Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> { 540 return reader.take(); 541 }); 542 // make sure that the isPeerEnabled has been called several times 543 TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5); 544 // confirm that we can read nothing if the peer is disabled 545 assertFalse(future.isDone()); 546 // then enable the peer, we should get the batch 547 enabled.set(true); 548 WALEntryBatch entryBatch = future.get(); 549 550 // should've batched up our entries 551 assertNotNull(entryBatch); 552 assertEquals(3, entryBatch.getWalEntries().size()); 553 assertEquals(position, entryBatch.getLastWalPosition()); 554 assertEquals(walPath, entryBatch.getLastWalPath()); 555 assertEquals(3, entryBatch.getNbRowKeys()); 556 } 557 558 private String getRow(WAL.Entry entry) { 559 Cell cell = entry.getEdit().getCells().get(0); 560 return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 561 } 562 563 private void appendToLog(String key) throws IOException { 564 final long txid = log.appendData(info, 565 new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), 566 mvcc, scopes), getWALEdit(key)); 567 log.sync(txid); 568 } 569 570 private void appendEntriesToLogAndSync(int count) throws IOException { 571 long txid = -1L; 572 for (int i = 0; i < count; i++) { 573 txid = appendToLog(1); 574 } 575 log.sync(txid); 576 } 577 578 private void appendToLogAndSync() throws IOException { 579 appendToLogAndSync(1); 580 } 581 582 private void appendToLogAndSync(int count) throws IOException { 583 long txid = appendToLog(count); 584 log.sync(txid); 585 } 586 587 private long appendToLog(int count) throws IOException { 588 return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 589 System.currentTimeMillis(), mvcc, scopes), getWALEdits(count)); 590 } 591 592 private WALEdit getWALEdits(int count) { 593 WALEdit edit = new WALEdit(); 594 for (int i = 0; i < count; i++) { 595 edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, 596 System.currentTimeMillis(), qualifier)); 597 } 598 return edit; 599 } 600 601 private WALEdit getWALEdit(String row) { 602 WALEdit edit = new WALEdit(); 603 edit.add( 604 new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier)); 605 return edit; 606 } 607 608 private WALEntryFilter getDummyFilter() { 609 return new WALEntryFilter() { 610 611 @Override 612 public Entry filter(Entry entry) { 613 return entry; 614 } 615 }; 616 } 617 618 class PathWatcher implements WALActionsListener { 619 620 Path currentPath; 621 622 @Override 623 public void preLogRoll(Path oldPath, Path newPath) throws IOException { 624 walQueue.add(newPath); 625 currentPath = newPath; 626 } 627 } 628 629 @Test 630 public void testReadBeyondCommittedLength() throws IOException, InterruptedException { 631 appendToLog("1"); 632 appendToLog("2"); 633 long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); 634 AtomicLong fileLength = new AtomicLong(size - 1); 635 try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, 0, 636 p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { 637 assertTrue(entryStream.hasNext()); 638 assertNotNull(entryStream.next()); 639 // can not get log 2 640 assertFalse(entryStream.hasNext()); 641 Thread.sleep(1000); 642 entryStream.reset(); 643 // still can not get log 2 644 assertFalse(entryStream.hasNext()); 645 646 // can get log 2 now 647 fileLength.set(size); 648 entryStream.reset(); 649 assertTrue(entryStream.hasNext()); 650 assertNotNull(entryStream.next()); 651 652 assertFalse(entryStream.hasNext()); 653 } 654 } 655}