001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertSame; 026import static org.junit.Assert.assertTrue; 027import static org.mockito.Mockito.doNothing; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.HashMap; 035import java.util.Map; 036import java.util.NavigableMap; 037import java.util.OptionalLong; 038import java.util.TreeMap; 039import java.util.UUID; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.ForkJoinPool; 042import java.util.concurrent.Future; 043import java.util.concurrent.PriorityBlockingQueue; 044import java.util.concurrent.atomic.AtomicBoolean; 045import java.util.concurrent.atomic.AtomicInteger; 046import java.util.concurrent.atomic.AtomicLong; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataOutputStream; 049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.hbase.Cell; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.KeyValue; 053import org.apache.hadoop.hbase.Server; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.Waiter; 056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 057import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 058import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 059import org.apache.hadoop.hbase.replication.WALEntryFilter; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 063import org.apache.hadoop.hbase.wal.WAL; 064import org.apache.hadoop.hbase.wal.WAL.Entry; 065import org.apache.hadoop.hbase.wal.WALEdit; 066import org.apache.hadoop.hbase.wal.WALFactory; 067import org.apache.hadoop.hbase.wal.WALKeyImpl; 068import org.apache.hadoop.hbase.wal.WALProvider; 069import org.junit.Assert; 070import org.junit.Before; 071import org.junit.Test; 072import org.mockito.Mockito; 073 074import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 075 076public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { 077 078 @Before 079 public void setUp() throws Exception { 080 initWAL(); 081 } 082 083 /** 084 * Tests basic reading of log appends 085 */ 086 @Test 087 public void testAppendsWithRolls() throws Exception { 088 appendToLogAndSync(); 089 long oldPos; 090 try (WALEntryStream entryStream = 091 new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 092 // There's one edit in the log, read it. Reading past it needs to throw exception 093 assertTrue(entryStream.hasNext()); 094 WAL.Entry entry = entryStream.peek(); 095 assertSame(entry, entryStream.next()); 096 assertNotNull(entry); 097 assertFalse(entryStream.hasNext()); 098 assertNull(entryStream.peek()); 099 assertNull(entryStream.next()); 100 oldPos = entryStream.getPosition(); 101 } 102 103 appendToLogAndSync(); 104 105 try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, 106 null, new MetricsSource("1"), fakeWalGroupId)) { 107 // Read the newly added entry, make sure we made progress 108 WAL.Entry entry = entryStream.next(); 109 assertNotEquals(oldPos, entryStream.getPosition()); 110 assertNotNull(entry); 111 oldPos = entryStream.getPosition(); 112 } 113 114 // We rolled but we still should see the end of the first log and get that item 115 appendToLogAndSync(); 116 log.rollWriter(); 117 appendToLogAndSync(); 118 119 try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, 120 null, new MetricsSource("1"), fakeWalGroupId)) { 121 WAL.Entry entry = entryStream.next(); 122 assertNotEquals(oldPos, entryStream.getPosition()); 123 assertNotNull(entry); 124 125 // next item should come from the new log 126 entry = entryStream.next(); 127 assertNotEquals(oldPos, entryStream.getPosition()); 128 assertNotNull(entry); 129 130 // no more entries to read 131 assertFalse(entryStream.hasNext()); 132 oldPos = entryStream.getPosition(); 133 } 134 } 135 136 /** 137 * Tests that if after a stream is opened, more entries come in and then the log is rolled, we 138 * don't mistakenly dequeue the current log thinking we're done with it 139 */ 140 @Test 141 public void testLogrollWhileStreaming() throws Exception { 142 appendToLog("1"); 143 appendToLog("2");// 2 144 try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null, 145 new MetricsSource("1"), fakeWalGroupId)) { 146 assertEquals("1", getRow(entryStream.next())); 147 148 appendToLog("3"); // 3 - comes in after reader opened 149 log.rollWriter(); // log roll happening while we're reading 150 appendToLog("4"); // 4 - this append is in the rolled log 151 152 assertEquals("2", getRow(entryStream.next())); 153 assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an 154 // entry in first log 155 assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 156 // and 3 would be skipped 157 assertEquals("4", getRow(entryStream.next())); // 4 158 assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly 159 assertFalse(entryStream.hasNext()); 160 } 161 } 162 163 /** 164 * Tests that if writes come in while we have a stream open, we shouldn't miss them 165 */ 166 167 @Test 168 public void testNewEntriesWhileStreaming() throws Exception { 169 appendToLog("1"); 170 try (WALEntryStream entryStream = 171 new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 172 entryStream.next(); // we've hit the end of the stream at this point 173 174 // some new entries come in while we're streaming 175 appendToLog("2"); 176 appendToLog("3"); 177 178 // don't see them 179 assertFalse(entryStream.hasNext()); 180 181 // But we do if we reset 182 entryStream.reset(); 183 assertEquals("2", getRow(entryStream.next())); 184 assertEquals("3", getRow(entryStream.next())); 185 assertFalse(entryStream.hasNext()); 186 } 187 } 188 189 @Test 190 public void testResumeStreamingFromPosition() throws Exception { 191 long lastPosition = 0; 192 appendToLog("1"); 193 try (WALEntryStream entryStream = 194 new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 195 entryStream.next(); // we've hit the end of the stream at this point 196 appendToLog("2"); 197 appendToLog("3"); 198 lastPosition = entryStream.getPosition(); 199 } 200 // next stream should picks up where we left off 201 try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null, 202 new MetricsSource("1"), fakeWalGroupId)) { 203 assertEquals("2", getRow(entryStream.next())); 204 assertEquals("3", getRow(entryStream.next())); 205 assertFalse(entryStream.hasNext()); // done 206 assertEquals(1, getQueue().size()); 207 } 208 } 209 210 /** 211 * Tests that if we stop before hitting the end of a stream, we can continue where we left off 212 * using the last position 213 */ 214 215 @Test 216 public void testPosition() throws Exception { 217 long lastPosition = 0; 218 appendEntriesToLogAndSync(3); 219 // read only one element 220 try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null, 221 new MetricsSource("1"), fakeWalGroupId)) { 222 entryStream.next(); 223 lastPosition = entryStream.getPosition(); 224 } 225 // there should still be two more entries from where we left off 226 try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null, 227 new MetricsSource("1"), fakeWalGroupId)) { 228 assertNotNull(entryStream.next()); 229 assertNotNull(entryStream.next()); 230 assertFalse(entryStream.hasNext()); 231 } 232 } 233 234 @Test 235 public void testEmptyStream() throws Exception { 236 try (WALEntryStream entryStream = 237 new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 238 assertFalse(entryStream.hasNext()); 239 } 240 } 241 242 @Test 243 public void testWALKeySerialization() throws Exception { 244 Map<String, byte[]> attributes = new HashMap<String, byte[]>(); 245 attributes.put("foo", Bytes.toBytes("foo-value")); 246 attributes.put("bar", Bytes.toBytes("bar-value")); 247 WALKeyImpl key = 248 new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(), 249 new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes); 250 Assert.assertEquals(attributes, key.getExtendedAttributes()); 251 252 WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor()); 253 WALProtos.WALKey serializedKey = builder.build(); 254 255 WALKeyImpl deserializedKey = new WALKeyImpl(); 256 deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor()); 257 258 // equals() only checks region name, sequence id and write time 259 Assert.assertEquals(key, deserializedKey); 260 // can't use Map.equals() because byte arrays use reference equality 261 Assert.assertEquals(key.getExtendedAttributes().keySet(), 262 deserializedKey.getExtendedAttributes().keySet()); 263 for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) { 264 Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue()); 265 } 266 Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes()); 267 } 268 269 private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { 270 ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); 271 when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); 272 when(mockSourceManager.getTotalBufferLimit()) 273 .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); 274 Server mockServer = Mockito.mock(Server.class); 275 ReplicationSource source = Mockito.mock(ReplicationSource.class); 276 when(source.getSourceManager()).thenReturn(mockSourceManager); 277 when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); 278 when(source.getWALFileLengthProvider()).thenReturn(log); 279 when(source.getServer()).thenReturn(mockServer); 280 when(source.isRecovered()).thenReturn(recovered); 281 MetricsReplicationGlobalSourceSource globalMetrics = 282 Mockito.mock(MetricsReplicationGlobalSourceSource.class); 283 when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); 284 return source; 285 } 286 287 private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { 288 ReplicationSource source = mockReplicationSource(recovered, conf); 289 when(source.isPeerEnabled()).thenReturn(true); 290 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, 291 getDummyFilter(), source, fakeWalGroupId); 292 reader.start(); 293 return reader; 294 } 295 296 private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures, 297 Configuration conf) { 298 ReplicationSource source = mockReplicationSource(false, conf); 299 when(source.isPeerEnabled()).thenReturn(true); 300 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, 301 getIntermittentFailingFilter(numFailures), source, fakeWalGroupId); 302 reader.start(); 303 return reader; 304 } 305 306 @Test 307 public void testReplicationSourceWALReader() throws Exception { 308 appendEntriesToLogAndSync(3); 309 // get ending position 310 long position; 311 try (WALEntryStream entryStream = 312 new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 313 entryStream.next(); 314 entryStream.next(); 315 entryStream.next(); 316 position = entryStream.getPosition(); 317 } 318 319 // start up a reader 320 Path walPath = getQueue().peek(); 321 ReplicationSourceWALReader reader = createReader(false, CONF); 322 WALEntryBatch entryBatch = reader.take(); 323 324 // should've batched up our entries 325 assertNotNull(entryBatch); 326 assertEquals(3, entryBatch.getWalEntries().size()); 327 assertEquals(position, entryBatch.getLastWalPosition()); 328 assertEquals(walPath, entryBatch.getLastWalPath()); 329 assertEquals(3, entryBatch.getNbRowKeys()); 330 331 appendToLog("foo"); 332 entryBatch = reader.take(); 333 assertEquals(1, entryBatch.getNbEntries()); 334 assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); 335 } 336 337 @Test 338 public void testReplicationSourceWALReaderWithFailingFilter() throws Exception { 339 appendEntriesToLogAndSync(3); 340 // get ending position 341 long position; 342 try (WALEntryStream entryStream = 343 new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 344 entryStream.next(); 345 entryStream.next(); 346 entryStream.next(); 347 position = entryStream.getPosition(); 348 } 349 350 // start up a reader 351 Path walPath = getQueue().peek(); 352 int numFailuresInFilter = 5; 353 ReplicationSourceWALReader reader = 354 createReaderWithBadReplicationFilter(numFailuresInFilter, CONF); 355 WALEntryBatch entryBatch = reader.take(); 356 assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures()); 357 358 // should've batched up our entries 359 assertNotNull(entryBatch); 360 assertEquals(3, entryBatch.getWalEntries().size()); 361 assertEquals(position, entryBatch.getLastWalPosition()); 362 assertEquals(walPath, entryBatch.getLastWalPath()); 363 assertEquals(3, entryBatch.getNbRowKeys()); 364 } 365 366 @Test 367 public void testReplicationSourceWALReaderRecovered() throws Exception { 368 appendEntriesToLogAndSync(10); 369 Path walPath = getQueue().peek(); 370 log.rollWriter(); 371 appendEntriesToLogAndSync(5); 372 log.shutdown(); 373 374 Configuration conf = new Configuration(CONF); 375 conf.setInt("replication.source.nb.capacity", 10); 376 377 ReplicationSourceWALReader reader = createReader(true, conf); 378 379 WALEntryBatch batch = reader.take(); 380 assertEquals(walPath, batch.getLastWalPath()); 381 assertEquals(10, batch.getNbEntries()); 382 assertFalse(batch.isEndOfFile()); 383 384 batch = reader.take(); 385 assertEquals(walPath, batch.getLastWalPath()); 386 assertEquals(0, batch.getNbEntries()); 387 assertTrue(batch.isEndOfFile()); 388 389 walPath = getQueue().peek(); 390 batch = reader.take(); 391 assertEquals(walPath, batch.getLastWalPath()); 392 assertEquals(5, batch.getNbEntries()); 393 assertTrue(batch.isEndOfFile()); 394 395 assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); 396 } 397 398 // Testcase for HBASE-20206 399 @Test 400 public void testReplicationSourceWALReaderWrongPosition() throws Exception { 401 appendEntriesToLogAndSync(1); 402 Path walPath = getQueue().peek(); 403 log.rollWriter(); 404 appendEntriesToLogAndSync(20); 405 TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() { 406 407 @Override 408 public boolean evaluate() throws Exception { 409 return fs.getFileStatus(walPath).getLen() > 0 410 && ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0; 411 } 412 413 @Override 414 public String explainFailure() throws Exception { 415 return walPath + " has not been closed yet"; 416 } 417 418 }); 419 420 ReplicationSourceWALReader reader = createReader(false, CONF); 421 422 WALEntryBatch entryBatch = reader.take(); 423 assertEquals(walPath, entryBatch.getLastWalPath()); 424 425 long walLength = fs.getFileStatus(walPath).getLen(); 426 assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " 427 + walLength, entryBatch.getLastWalPosition() <= walLength); 428 assertEquals(1, entryBatch.getNbEntries()); 429 assertTrue(entryBatch.isEndOfFile()); 430 431 Path walPath2 = getQueue().peek(); 432 entryBatch = reader.take(); 433 assertEquals(walPath2, entryBatch.getLastWalPath()); 434 assertEquals(20, entryBatch.getNbEntries()); 435 assertFalse(entryBatch.isEndOfFile()); 436 437 log.rollWriter(); 438 appendEntriesToLogAndSync(10); 439 entryBatch = reader.take(); 440 assertEquals(walPath2, entryBatch.getLastWalPath()); 441 assertEquals(0, entryBatch.getNbEntries()); 442 assertTrue(entryBatch.isEndOfFile()); 443 444 Path walPath3 = getQueue().peek(); 445 entryBatch = reader.take(); 446 assertEquals(walPath3, entryBatch.getLastWalPath()); 447 assertEquals(10, entryBatch.getNbEntries()); 448 assertFalse(entryBatch.isEndOfFile()); 449 } 450 451 @Test 452 public void testReplicationSourceWALReaderDisabled() 453 throws IOException, InterruptedException, ExecutionException { 454 appendEntriesToLogAndSync(3); 455 // get ending position 456 long position; 457 try (WALEntryStream entryStream = 458 new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 459 entryStream.next(); 460 entryStream.next(); 461 entryStream.next(); 462 position = entryStream.getPosition(); 463 } 464 465 // start up a reader 466 Path walPath = getQueue().peek(); 467 ReplicationSource source = mockReplicationSource(false, CONF); 468 AtomicInteger invokeCount = new AtomicInteger(0); 469 AtomicBoolean enabled = new AtomicBoolean(false); 470 when(source.isPeerEnabled()).then(i -> { 471 invokeCount.incrementAndGet(); 472 return enabled.get(); 473 }); 474 475 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0, 476 getDummyFilter(), source, fakeWalGroupId); 477 reader.start(); 478 Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> { 479 return reader.take(); 480 }); 481 // make sure that the isPeerEnabled has been called several times 482 TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5); 483 // confirm that we can read nothing if the peer is disabled 484 assertFalse(future.isDone()); 485 // then enable the peer, we should get the batch 486 enabled.set(true); 487 WALEntryBatch entryBatch = future.get(); 488 489 // should've batched up our entries 490 assertNotNull(entryBatch); 491 assertEquals(3, entryBatch.getWalEntries().size()); 492 assertEquals(position, entryBatch.getLastWalPosition()); 493 assertEquals(walPath, entryBatch.getLastWalPath()); 494 assertEquals(3, entryBatch.getNbRowKeys()); 495 } 496 497 private String getRow(WAL.Entry entry) { 498 Cell cell = entry.getEdit().getCells().get(0); 499 return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 500 } 501 502 private void appendToLog(String key) throws IOException { 503 final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 504 EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key)); 505 log.sync(txid); 506 } 507 508 private void appendEntriesToLogAndSync(int count) throws IOException { 509 long txid = -1L; 510 for (int i = 0; i < count; i++) { 511 txid = appendToLog(1); 512 } 513 log.sync(txid); 514 } 515 516 private WALEdit getWALEdit(String row) { 517 WALEdit edit = new WALEdit(); 518 edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier, 519 EnvironmentEdgeManager.currentTime(), qualifier)); 520 return edit; 521 } 522 523 private WALEntryFilter getDummyFilter() { 524 return new WALEntryFilter() { 525 526 @Override 527 public Entry filter(Entry entry) { 528 return entry; 529 } 530 }; 531 } 532 533 private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) { 534 return new FailingWALEntryFilter(numFailuresInFilter); 535 } 536 537 public static class FailingWALEntryFilter implements WALEntryFilter { 538 private int numFailures = 0; 539 private static int countFailures = 0; 540 541 public FailingWALEntryFilter(int numFailuresInFilter) { 542 numFailures = numFailuresInFilter; 543 } 544 545 @Override 546 public Entry filter(Entry entry) { 547 if (countFailures == numFailures) { 548 return entry; 549 } 550 countFailures = countFailures + 1; 551 throw new WALEntryFilterRetryableException("failing filter"); 552 } 553 554 public static int numFailures() { 555 return countFailures; 556 } 557 } 558 559 @Test 560 public void testReadBeyondCommittedLength() throws IOException, InterruptedException { 561 appendToLog("1"); 562 appendToLog("2"); 563 long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong(); 564 AtomicLong fileLength = new AtomicLong(size - 1); 565 try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0, 566 p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) { 567 assertTrue(entryStream.hasNext()); 568 assertNotNull(entryStream.next()); 569 // can not get log 2 570 assertFalse(entryStream.hasNext()); 571 Thread.sleep(1000); 572 entryStream.reset(); 573 // still can not get log 2 574 assertFalse(entryStream.hasNext()); 575 576 // can get log 2 now 577 fileLength.set(size); 578 entryStream.reset(); 579 assertTrue(entryStream.hasNext()); 580 assertNotNull(entryStream.next()); 581 582 assertFalse(entryStream.hasNext()); 583 } 584 } 585 586 /* 587 * Test removal of 0 length log from logQueue if the source is a recovered source and size of 588 * logQueue is only 1. 589 */ 590 @Test 591 public void testEOFExceptionForRecoveredQueue() throws Exception { 592 // Create a 0 length log. 593 Path emptyLog = new Path("emptyLog"); 594 FSDataOutputStream fsdos = fs.create(emptyLog); 595 fsdos.close(); 596 assertEquals(0, fs.getFileStatus(emptyLog).getLen()); 597 598 Configuration conf = new Configuration(CONF); 599 // Override the max retries multiplier to fail fast. 600 conf.setInt("replication.source.maxretriesmultiplier", 1); 601 conf.setBoolean("replication.source.eof.autorecovery", true); 602 conf.setInt("replication.source.nb.batches", 10); 603 // Create a reader thread with source as recovered source. 604 ReplicationSource source = mockReplicationSource(true, conf); 605 when(source.isPeerEnabled()).thenReturn(true); 606 607 MetricsSource metrics = mock(MetricsSource.class); 608 doNothing().when(metrics).incrSizeOfLogQueue(); 609 doNothing().when(metrics).decrSizeOfLogQueue(); 610 ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); 611 localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); 612 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, 613 getDummyFilter(), source, fakeWalGroupId); 614 reader.run(); 615 // ReplicationSourceWALReaderThread#handleEofException method will 616 // remove empty log from logQueue. 617 assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); 618 } 619 620 @Test 621 public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception { 622 Configuration conf = new Configuration(CONF); 623 MetricsSource metrics = mock(MetricsSource.class); 624 ReplicationSource source = mockReplicationSource(true, conf); 625 ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); 626 // Create a 0 length log. 627 Path emptyLog = new Path(fs.getHomeDirectory(), "log.2"); 628 FSDataOutputStream fsdos = fs.create(emptyLog); 629 fsdos.close(); 630 assertEquals(0, fs.getFileStatus(emptyLog).getLen()); 631 localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); 632 633 final Path log1 = new Path(fs.getHomeDirectory(), "log.1"); 634 WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration()); 635 appendEntries(writer1, 3); 636 localLogQueue.enqueueLog(log1, fakeWalGroupId); 637 638 ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); 639 // Make it look like the source is from recovered source. 640 when(mockSourceManager.getOldSources()) 641 .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source))); 642 when(source.isPeerEnabled()).thenReturn(true); 643 when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); 644 // Override the max retries multiplier to fail fast. 645 conf.setInt("replication.source.maxretriesmultiplier", 1); 646 conf.setBoolean("replication.source.eof.autorecovery", true); 647 conf.setInt("replication.source.nb.batches", 10); 648 // Create a reader thread. 649 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, 650 getDummyFilter(), source, fakeWalGroupId); 651 assertEquals("Initial log queue size is not correct", 2, 652 localLogQueue.getQueueSize(fakeWalGroupId)); 653 reader.run(); 654 655 // remove empty log from logQueue. 656 assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); 657 assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId)); 658 } 659 660 private PriorityBlockingQueue<Path> getQueue() { 661 return logQueue.getQueue(fakeWalGroupId); 662 } 663 664 private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException { 665 for (int i = 0; i < numEntries; i++) { 666 byte[] b = Bytes.toBytes(Integer.toString(i)); 667 KeyValue kv = new KeyValue(b, b, b); 668 WALEdit edit = new WALEdit(); 669 edit.add(kv); 670 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 671 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 672 scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); 673 writer.append(new WAL.Entry(key, edit)); 674 writer.sync(false); 675 } 676 writer.close(); 677 } 678 679 /** 680 * Tests size of log queue is incremented and decremented properly. 681 */ 682 @Test 683 public void testSizeOfLogQueue() throws Exception { 684 // There should be always 1 log which is current wal. 685 assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue()); 686 appendToLogAndSync(); 687 688 log.rollWriter(); 689 // After rolling there will be 2 wals in the queue 690 assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue()); 691 692 try (WALEntryStream entryStream = 693 new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { 694 // There's one edit in the log, read it. 695 assertTrue(entryStream.hasNext()); 696 WAL.Entry entry = entryStream.next(); 697 assertNotNull(entry); 698 assertFalse(entryStream.hasNext()); 699 } 700 // After removing one wal, size of log queue will be 1 again. 701 assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue()); 702 } 703 704 /** 705 * Tests that wals are closed cleanly and we read the trailer when we remove wal from 706 * WALEntryStream. 707 */ 708 @Test 709 public void testCleanClosedWALs() throws Exception { 710 try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null, 711 logQueue.getMetrics(), fakeWalGroupId)) { 712 assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); 713 appendToLogAndSync(); 714 assertNotNull(entryStream.next()); 715 log.rollWriter(); 716 appendToLogAndSync(); 717 assertNotNull(entryStream.next()); 718 assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); 719 } 720 } 721 722 /** 723 * Tests that we handle EOFException properly if the wal has moved to oldWALs directory. 724 * @throws Exception exception 725 */ 726 @Test 727 public void testEOFExceptionInOldWALsDirectory() throws Exception { 728 assertEquals(1, logQueue.getQueueSize(fakeWalGroupId)); 729 AbstractFSWAL abstractWAL = (AbstractFSWAL) log; 730 Path emptyLogFile = abstractWAL.getCurrentFileName(); 731 log.rollWriter(true); 732 733 // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously. 734 // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to 735 // oldWALs directory. 736 Waiter.waitFor(CONF, 5000, 737 (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0); 738 // There will 2 logs in the queue. 739 assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); 740 741 // Get the archived dir path for the first wal. 742 Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF); 743 // Make sure that the wal path is not the same as archived Dir path. 744 assertNotNull(archivePath); 745 assertTrue(fs.exists(archivePath)); 746 fs.truncate(archivePath, 0); 747 // make sure the size of the wal file is 0. 748 assertEquals(0, fs.getFileStatus(archivePath).getLen()); 749 750 ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); 751 ReplicationSource source = Mockito.mock(ReplicationSource.class); 752 when(source.isPeerEnabled()).thenReturn(true); 753 when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); 754 755 Configuration localConf = new Configuration(CONF); 756 localConf.setInt("replication.source.maxretriesmultiplier", 1); 757 localConf.setBoolean("replication.source.eof.autorecovery", true); 758 // Start the reader thread. 759 createReader(false, localConf); 760 // Wait for the replication queue size to be 1. This means that we have handled 761 // 0 length wal from oldWALs directory. 762 Waiter.waitFor(localConf, 10000, 763 (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1); 764 } 765}