001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertSame;
026import static org.junit.Assert.assertTrue;
027import static org.mockito.Mockito.doNothing;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.HashMap;
035import java.util.Map;
036import java.util.NavigableMap;
037import java.util.OptionalLong;
038import java.util.TreeMap;
039import java.util.UUID;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.ForkJoinPool;
042import java.util.concurrent.Future;
043import java.util.concurrent.PriorityBlockingQueue;
044import java.util.concurrent.atomic.AtomicBoolean;
045import java.util.concurrent.atomic.AtomicInteger;
046import java.util.concurrent.atomic.AtomicLong;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FSDataOutputStream;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.Cell;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.KeyValue;
053import org.apache.hadoop.hbase.Server;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.Waiter;
056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
057import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
058import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
059import org.apache.hadoop.hbase.replication.WALEntryFilter;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
063import org.apache.hadoop.hbase.wal.WAL;
064import org.apache.hadoop.hbase.wal.WAL.Entry;
065import org.apache.hadoop.hbase.wal.WALEdit;
066import org.apache.hadoop.hbase.wal.WALFactory;
067import org.apache.hadoop.hbase.wal.WALKeyImpl;
068import org.apache.hadoop.hbase.wal.WALProvider;
069import org.junit.Assert;
070import org.junit.Before;
071import org.junit.Test;
072import org.mockito.Mockito;
073
074import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
075
076public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
077
  /**
   * Runs before every test and delegates to {@code initWAL()}, which is not defined in this file
   * (presumably inherited from the base class) and is expected to prepare the WAL fixture.
   */
  @Before
  public void setUp() throws Exception {
    initWAL();
  }
082
083  /**
084   * Tests basic reading of log appends
085   */
086  @Test
087  public void testAppendsWithRolls() throws Exception {
088    appendToLogAndSync();
089    long oldPos;
090    try (WALEntryStream entryStream =
091      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
092      // There's one edit in the log, read it. Reading past it needs to throw exception
093      assertTrue(entryStream.hasNext());
094      WAL.Entry entry = entryStream.peek();
095      assertSame(entry, entryStream.next());
096      assertNotNull(entry);
097      assertFalse(entryStream.hasNext());
098      assertNull(entryStream.peek());
099      assertNull(entryStream.next());
100      oldPos = entryStream.getPosition();
101    }
102
103    appendToLogAndSync();
104
105    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
106      null, new MetricsSource("1"), fakeWalGroupId)) {
107      // Read the newly added entry, make sure we made progress
108      WAL.Entry entry = entryStream.next();
109      assertNotEquals(oldPos, entryStream.getPosition());
110      assertNotNull(entry);
111      oldPos = entryStream.getPosition();
112    }
113
114    // We rolled but we still should see the end of the first log and get that item
115    appendToLogAndSync();
116    log.rollWriter();
117    appendToLogAndSync();
118
119    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
120      null, new MetricsSource("1"), fakeWalGroupId)) {
121      WAL.Entry entry = entryStream.next();
122      assertNotEquals(oldPos, entryStream.getPosition());
123      assertNotNull(entry);
124
125      // next item should come from the new log
126      entry = entryStream.next();
127      assertNotEquals(oldPos, entryStream.getPosition());
128      assertNotNull(entry);
129
130      // no more entries to read
131      assertFalse(entryStream.hasNext());
132      oldPos = entryStream.getPosition();
133    }
134  }
135
136  /**
137   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
138   * don't mistakenly dequeue the current log thinking we're done with it
139   */
140  @Test
141  public void testLogrollWhileStreaming() throws Exception {
142    appendToLog("1");
143    appendToLog("2");// 2
144    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
145      new MetricsSource("1"), fakeWalGroupId)) {
146      assertEquals("1", getRow(entryStream.next()));
147
148      appendToLog("3"); // 3 - comes in after reader opened
149      log.rollWriter(); // log roll happening while we're reading
150      appendToLog("4"); // 4 - this append is in the rolled log
151
152      assertEquals("2", getRow(entryStream.next()));
153      assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
154      // entry in first log
155      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
156                                                     // and 3 would be skipped
157      assertEquals("4", getRow(entryStream.next())); // 4
158      assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
159      assertFalse(entryStream.hasNext());
160    }
161  }
162
163  /**
164   * Tests that if writes come in while we have a stream open, we shouldn't miss them
165   */
166
167  @Test
168  public void testNewEntriesWhileStreaming() throws Exception {
169    appendToLog("1");
170    try (WALEntryStream entryStream =
171      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
172      entryStream.next(); // we've hit the end of the stream at this point
173
174      // some new entries come in while we're streaming
175      appendToLog("2");
176      appendToLog("3");
177
178      // don't see them
179      assertFalse(entryStream.hasNext());
180
181      // But we do if we reset
182      entryStream.reset();
183      assertEquals("2", getRow(entryStream.next()));
184      assertEquals("3", getRow(entryStream.next()));
185      assertFalse(entryStream.hasNext());
186    }
187  }
188
189  @Test
190  public void testResumeStreamingFromPosition() throws Exception {
191    long lastPosition = 0;
192    appendToLog("1");
193    try (WALEntryStream entryStream =
194      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
195      entryStream.next(); // we've hit the end of the stream at this point
196      appendToLog("2");
197      appendToLog("3");
198      lastPosition = entryStream.getPosition();
199    }
200    // next stream should picks up where we left off
201    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
202      new MetricsSource("1"), fakeWalGroupId)) {
203      assertEquals("2", getRow(entryStream.next()));
204      assertEquals("3", getRow(entryStream.next()));
205      assertFalse(entryStream.hasNext()); // done
206      assertEquals(1, getQueue().size());
207    }
208  }
209
210  /**
211   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
212   * using the last position
213   */
214
215  @Test
216  public void testPosition() throws Exception {
217    long lastPosition = 0;
218    appendEntriesToLogAndSync(3);
219    // read only one element
220    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
221      new MetricsSource("1"), fakeWalGroupId)) {
222      entryStream.next();
223      lastPosition = entryStream.getPosition();
224    }
225    // there should still be two more entries from where we left off
226    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
227      new MetricsSource("1"), fakeWalGroupId)) {
228      assertNotNull(entryStream.next());
229      assertNotNull(entryStream.next());
230      assertFalse(entryStream.hasNext());
231    }
232  }
233
234  @Test
235  public void testEmptyStream() throws Exception {
236    try (WALEntryStream entryStream =
237      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
238      assertFalse(entryStream.hasNext());
239    }
240  }
241
242  @Test
243  public void testWALKeySerialization() throws Exception {
244    Map<String, byte[]> attributes = new HashMap<String, byte[]>();
245    attributes.put("foo", Bytes.toBytes("foo-value"));
246    attributes.put("bar", Bytes.toBytes("bar-value"));
247    WALKeyImpl key =
248      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(),
249        new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
250    Assert.assertEquals(attributes, key.getExtendedAttributes());
251
252    WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
253    WALProtos.WALKey serializedKey = builder.build();
254
255    WALKeyImpl deserializedKey = new WALKeyImpl();
256    deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
257
258    // equals() only checks region name, sequence id and write time
259    Assert.assertEquals(key, deserializedKey);
260    // can't use Map.equals() because byte arrays use reference equality
261    Assert.assertEquals(key.getExtendedAttributes().keySet(),
262      deserializedKey.getExtendedAttributes().keySet());
263    for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
264      Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
265    }
266    Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
267  }
268
269  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
270    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
271    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
272    when(mockSourceManager.getTotalBufferLimit())
273      .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
274    Server mockServer = Mockito.mock(Server.class);
275    ReplicationSource source = Mockito.mock(ReplicationSource.class);
276    when(source.getSourceManager()).thenReturn(mockSourceManager);
277    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
278    when(source.getWALFileLengthProvider()).thenReturn(log);
279    when(source.getServer()).thenReturn(mockServer);
280    when(source.isRecovered()).thenReturn(recovered);
281    MetricsReplicationGlobalSourceSource globalMetrics =
282      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
283    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
284    return source;
285  }
286
287  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
288    ReplicationSource source = mockReplicationSource(recovered, conf);
289    when(source.isPeerEnabled()).thenReturn(true);
290    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
291      getDummyFilter(), source, fakeWalGroupId);
292    reader.start();
293    return reader;
294  }
295
296  private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
297    Configuration conf) {
298    ReplicationSource source = mockReplicationSource(false, conf);
299    when(source.isPeerEnabled()).thenReturn(true);
300    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
301      getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
302    reader.start();
303    return reader;
304  }
305
306  @Test
307  public void testReplicationSourceWALReader() throws Exception {
308    appendEntriesToLogAndSync(3);
309    // get ending position
310    long position;
311    try (WALEntryStream entryStream =
312      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
313      entryStream.next();
314      entryStream.next();
315      entryStream.next();
316      position = entryStream.getPosition();
317    }
318
319    // start up a reader
320    Path walPath = getQueue().peek();
321    ReplicationSourceWALReader reader = createReader(false, CONF);
322    WALEntryBatch entryBatch = reader.take();
323
324    // should've batched up our entries
325    assertNotNull(entryBatch);
326    assertEquals(3, entryBatch.getWalEntries().size());
327    assertEquals(position, entryBatch.getLastWalPosition());
328    assertEquals(walPath, entryBatch.getLastWalPath());
329    assertEquals(3, entryBatch.getNbRowKeys());
330
331    appendToLog("foo");
332    entryBatch = reader.take();
333    assertEquals(1, entryBatch.getNbEntries());
334    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
335  }
336
337  @Test
338  public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
339    appendEntriesToLogAndSync(3);
340    // get ending position
341    long position;
342    try (WALEntryStream entryStream =
343      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
344      entryStream.next();
345      entryStream.next();
346      entryStream.next();
347      position = entryStream.getPosition();
348    }
349
350    // start up a reader
351    Path walPath = getQueue().peek();
352    int numFailuresInFilter = 5;
353    ReplicationSourceWALReader reader =
354      createReaderWithBadReplicationFilter(numFailuresInFilter, CONF);
355    WALEntryBatch entryBatch = reader.take();
356    assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
357
358    // should've batched up our entries
359    assertNotNull(entryBatch);
360    assertEquals(3, entryBatch.getWalEntries().size());
361    assertEquals(position, entryBatch.getLastWalPosition());
362    assertEquals(walPath, entryBatch.getLastWalPath());
363    assertEquals(3, entryBatch.getNbRowKeys());
364  }
365
366  @Test
367  public void testReplicationSourceWALReaderRecovered() throws Exception {
368    appendEntriesToLogAndSync(10);
369    Path walPath = getQueue().peek();
370    log.rollWriter();
371    appendEntriesToLogAndSync(5);
372    log.shutdown();
373
374    Configuration conf = new Configuration(CONF);
375    conf.setInt("replication.source.nb.capacity", 10);
376
377    ReplicationSourceWALReader reader = createReader(true, conf);
378
379    WALEntryBatch batch = reader.take();
380    assertEquals(walPath, batch.getLastWalPath());
381    assertEquals(10, batch.getNbEntries());
382    assertFalse(batch.isEndOfFile());
383
384    batch = reader.take();
385    assertEquals(walPath, batch.getLastWalPath());
386    assertEquals(0, batch.getNbEntries());
387    assertTrue(batch.isEndOfFile());
388
389    walPath = getQueue().peek();
390    batch = reader.take();
391    assertEquals(walPath, batch.getLastWalPath());
392    assertEquals(5, batch.getNbEntries());
393    assertTrue(batch.isEndOfFile());
394
395    assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
396  }
397
  /**
   * Testcase for HBASE-20206. Checks that the last WAL position reported in a batch never exceeds
   * the length of the file it refers to, and walks through the batches a reader produces across
   * two writer rolls.
   */
  @Test
  public void testReplicationSourceWALReaderWrongPosition() throws Exception {
    // One entry in the first WAL, then roll and put twenty entries into the second WAL.
    appendEntriesToLogAndSync(1);
    Path walPath = getQueue().peek();
    log.rollWriter();
    appendEntriesToLogAndSync(20);
    // Wait until the first WAL has a non-zero length on the file system and the WAL reports no
    // in-flight closes.
    TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return fs.getFileStatus(walPath).getLen() > 0
          && ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0;
      }

      @Override
      public String explainFailure() throws Exception {
        return walPath + " has not been closed yet";
      }

    });

    ReplicationSourceWALReader reader = createReader(false, CONF);

    // First batch: the single entry of the first WAL, flagged as end of file.
    WALEntryBatch entryBatch = reader.take();
    assertEquals(walPath, entryBatch.getLastWalPath());

    // The reported position must stay within the actual file length.
    long walLength = fs.getFileStatus(walPath).getLen();
    assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is "
      + walLength, entryBatch.getLastWalPosition() <= walLength);
    assertEquals(1, entryBatch.getNbEntries());
    assertTrue(entryBatch.isEndOfFile());

    // Second batch: the twenty entries of the second WAL, which is not at end of file yet.
    Path walPath2 = getQueue().peek();
    entryBatch = reader.take();
    assertEquals(walPath2, entryBatch.getLastWalPath());
    assertEquals(20, entryBatch.getNbEntries());
    assertFalse(entryBatch.isEndOfFile());

    // After another roll the second WAL yields an empty batch flagged as end of file.
    log.rollWriter();
    appendEntriesToLogAndSync(10);
    entryBatch = reader.take();
    assertEquals(walPath2, entryBatch.getLastWalPath());
    assertEquals(0, entryBatch.getNbEntries());
    assertTrue(entryBatch.isEndOfFile());

    // Finally the ten entries appended to the third WAL arrive.
    Path walPath3 = getQueue().peek();
    entryBatch = reader.take();
    assertEquals(walPath3, entryBatch.getLastWalPath());
    assertEquals(10, entryBatch.getNbEntries());
    assertFalse(entryBatch.isEndOfFile());
  }
450
451  @Test
452  public void testReplicationSourceWALReaderDisabled()
453    throws IOException, InterruptedException, ExecutionException {
454    appendEntriesToLogAndSync(3);
455    // get ending position
456    long position;
457    try (WALEntryStream entryStream =
458      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
459      entryStream.next();
460      entryStream.next();
461      entryStream.next();
462      position = entryStream.getPosition();
463    }
464
465    // start up a reader
466    Path walPath = getQueue().peek();
467    ReplicationSource source = mockReplicationSource(false, CONF);
468    AtomicInteger invokeCount = new AtomicInteger(0);
469    AtomicBoolean enabled = new AtomicBoolean(false);
470    when(source.isPeerEnabled()).then(i -> {
471      invokeCount.incrementAndGet();
472      return enabled.get();
473    });
474
475    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
476      getDummyFilter(), source, fakeWalGroupId);
477    reader.start();
478    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
479      return reader.take();
480    });
481    // make sure that the isPeerEnabled has been called several times
482    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
483    // confirm that we can read nothing if the peer is disabled
484    assertFalse(future.isDone());
485    // then enable the peer, we should get the batch
486    enabled.set(true);
487    WALEntryBatch entryBatch = future.get();
488
489    // should've batched up our entries
490    assertNotNull(entryBatch);
491    assertEquals(3, entryBatch.getWalEntries().size());
492    assertEquals(position, entryBatch.getLastWalPosition());
493    assertEquals(walPath, entryBatch.getLastWalPath());
494    assertEquals(3, entryBatch.getNbRowKeys());
495  }
496
497  private String getRow(WAL.Entry entry) {
498    Cell cell = entry.getEdit().getCells().get(0);
499    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
500  }
501
502  private void appendToLog(String key) throws IOException {
503    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
504      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key));
505    log.sync(txid);
506  }
507
508  private void appendEntriesToLogAndSync(int count) throws IOException {
509    long txid = -1L;
510    for (int i = 0; i < count; i++) {
511      txid = appendToLog(1);
512    }
513    log.sync(txid);
514  }
515
516  private WALEdit getWALEdit(String row) {
517    WALEdit edit = new WALEdit();
518    edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier,
519      EnvironmentEdgeManager.currentTime(), qualifier));
520    return edit;
521  }
522
523  private WALEntryFilter getDummyFilter() {
524    return new WALEntryFilter() {
525
526      @Override
527      public Entry filter(Entry entry) {
528        return entry;
529      }
530    };
531  }
532
533  private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
534    return new FailingWALEntryFilter(numFailuresInFilter);
535  }
536
537  public static class FailingWALEntryFilter implements WALEntryFilter {
538    private int numFailures = 0;
539    private static int countFailures = 0;
540
541    public FailingWALEntryFilter(int numFailuresInFilter) {
542      numFailures = numFailuresInFilter;
543    }
544
545    @Override
546    public Entry filter(Entry entry) {
547      if (countFailures == numFailures) {
548        return entry;
549      }
550      countFailures = countFailures + 1;
551      throw new WALEntryFilterRetryableException("failing filter");
552    }
553
554    public static int numFailures() {
555      return countFailures;
556    }
557  }
558
559  @Test
560  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
561    appendToLog("1");
562    appendToLog("2");
563    long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
564    AtomicLong fileLength = new AtomicLong(size - 1);
565    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
566      p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
567      assertTrue(entryStream.hasNext());
568      assertNotNull(entryStream.next());
569      // can not get log 2
570      assertFalse(entryStream.hasNext());
571      Thread.sleep(1000);
572      entryStream.reset();
573      // still can not get log 2
574      assertFalse(entryStream.hasNext());
575
576      // can get log 2 now
577      fileLength.set(size);
578      entryStream.reset();
579      assertTrue(entryStream.hasNext());
580      assertNotNull(entryStream.next());
581
582      assertFalse(entryStream.hasNext());
583    }
584  }
585
586  /*
587   * Test removal of 0 length log from logQueue if the source is a recovered source and size of
588   * logQueue is only 1.
589   */
590  @Test
591  public void testEOFExceptionForRecoveredQueue() throws Exception {
592    // Create a 0 length log.
593    Path emptyLog = new Path("emptyLog");
594    FSDataOutputStream fsdos = fs.create(emptyLog);
595    fsdos.close();
596    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
597
598    Configuration conf = new Configuration(CONF);
599    // Override the max retries multiplier to fail fast.
600    conf.setInt("replication.source.maxretriesmultiplier", 1);
601    conf.setBoolean("replication.source.eof.autorecovery", true);
602    conf.setInt("replication.source.nb.batches", 10);
603    // Create a reader thread with source as recovered source.
604    ReplicationSource source = mockReplicationSource(true, conf);
605    when(source.isPeerEnabled()).thenReturn(true);
606
607    MetricsSource metrics = mock(MetricsSource.class);
608    doNothing().when(metrics).incrSizeOfLogQueue();
609    doNothing().when(metrics).decrSizeOfLogQueue();
610    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
611    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
612    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
613      getDummyFilter(), source, fakeWalGroupId);
614    reader.start();
615    reader.join();
616    // ReplicationSourceWALReaderThread#handleEofException method will
617    // remove empty log from logQueue.
618    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
619  }
620
621  @Test
622  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
623    Configuration conf = new Configuration(CONF);
624    MetricsSource metrics = mock(MetricsSource.class);
625    ReplicationSource source = mockReplicationSource(true, conf);
626    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
627    // Create a 0 length log.
628    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
629    FSDataOutputStream fsdos = fs.create(emptyLog);
630    fsdos.close();
631    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
632    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
633
634    final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
635    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
636    appendEntries(writer1, 3);
637    localLogQueue.enqueueLog(log1, fakeWalGroupId);
638
639    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
640    // Make it look like the source is from recovered source.
641    when(mockSourceManager.getOldSources())
642      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source)));
643    when(source.isPeerEnabled()).thenReturn(true);
644    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
645    // Override the max retries multiplier to fail fast.
646    conf.setInt("replication.source.maxretriesmultiplier", 1);
647    conf.setBoolean("replication.source.eof.autorecovery", true);
648    conf.setInt("replication.source.nb.batches", 10);
649    // Create a reader thread.
650    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
651      getDummyFilter(), source, fakeWalGroupId);
652    assertEquals("Initial log queue size is not correct", 2,
653      localLogQueue.getQueueSize(fakeWalGroupId));
654    reader.start();
655    reader.join();
656
657    // remove empty log from logQueue.
658    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
659    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
660  }
661
662  private PriorityBlockingQueue<Path> getQueue() {
663    return logQueue.getQueue(fakeWalGroupId);
664  }
665
666  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
667    for (int i = 0; i < numEntries; i++) {
668      byte[] b = Bytes.toBytes(Integer.toString(i));
669      KeyValue kv = new KeyValue(b, b, b);
670      WALEdit edit = new WALEdit();
671      edit.add(kv);
672      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
673      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
674      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
675      writer.append(new WAL.Entry(key, edit));
676      writer.sync(false);
677    }
678    writer.close();
679  }
680
681  /**
682   * Tests size of log queue is incremented and decremented properly.
683   */
684  @Test
685  public void testSizeOfLogQueue() throws Exception {
686    // There should be always 1 log which is current wal.
687    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
688    appendToLogAndSync();
689
690    log.rollWriter();
691    // After rolling there will be 2 wals in the queue
692    assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
693
694    try (WALEntryStream entryStream =
695      new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
696      // There's one edit in the log, read it.
697      assertTrue(entryStream.hasNext());
698      WAL.Entry entry = entryStream.next();
699      assertNotNull(entry);
700      assertFalse(entryStream.hasNext());
701    }
702    // After removing one wal, size of log queue will be 1 again.
703    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
704  }
705
706  /**
707   * Tests that wals are closed cleanly and we read the trailer when we remove wal from
708   * WALEntryStream.
709   */
710  @Test
711  public void testCleanClosedWALs() throws Exception {
712    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
713      logQueue.getMetrics(), fakeWalGroupId)) {
714      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
715      appendToLogAndSync();
716      assertNotNull(entryStream.next());
717      log.rollWriter();
718      appendToLogAndSync();
719      assertNotNull(entryStream.next());
720      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
721    }
722  }
723
724  /**
725   * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
726   * @throws Exception exception
727   */
728  @Test
729  public void testEOFExceptionInOldWALsDirectory() throws Exception {
730    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
731    AbstractFSWAL abstractWAL = (AbstractFSWAL) log;
732    Path emptyLogFile = abstractWAL.getCurrentFileName();
733    log.rollWriter(true);
734
735    // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
736    // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
737    // oldWALs directory.
738    Waiter.waitFor(CONF, 5000,
739      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
740    // There will 2 logs in the queue.
741    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
742
743    // Get the archived dir path for the first wal.
744    Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
745    // Make sure that the wal path is not the same as archived Dir path.
746    assertNotNull(archivePath);
747    assertTrue(fs.exists(archivePath));
748    fs.truncate(archivePath, 0);
749    // make sure the size of the wal file is 0.
750    assertEquals(0, fs.getFileStatus(archivePath).getLen());
751
752    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
753    ReplicationSource source = Mockito.mock(ReplicationSource.class);
754    when(source.isPeerEnabled()).thenReturn(true);
755    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
756
757    Configuration localConf = new Configuration(CONF);
758    localConf.setInt("replication.source.maxretriesmultiplier", 1);
759    localConf.setBoolean("replication.source.eof.autorecovery", true);
760    // Start the reader thread.
761    createReader(false, localConf);
762    // Wait for the replication queue size to be 1. This means that we have handled
763    // 0 length wal from oldWALs directory.
764    Waiter.waitFor(localConf, 10000,
765      (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
766  }
767}