001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertSame;
026import static org.junit.Assert.assertTrue;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.HashMap;
032import java.util.Map;
033import java.util.NavigableMap;
034import java.util.OptionalLong;
035import java.util.TreeMap;
036import java.util.UUID;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ForkJoinPool;
039import java.util.concurrent.Future;
040import java.util.concurrent.PriorityBlockingQueue;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.concurrent.atomic.AtomicLong;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseTestingUtility;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.Server;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
055import org.apache.hadoop.hbase.client.RegionInfo;
056import org.apache.hadoop.hbase.client.RegionInfoBuilder;
057import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
058import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
059import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
060import org.apache.hadoop.hbase.replication.WALEntryFilter;
061import org.apache.hadoop.hbase.testclassification.LargeTests;
062import org.apache.hadoop.hbase.testclassification.ReplicationTests;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.wal.WAL;
065import org.apache.hadoop.hbase.wal.WAL.Entry;
066import org.apache.hadoop.hbase.wal.WALEdit;
067import org.apache.hadoop.hbase.wal.WALFactory;
068import org.apache.hadoop.hbase.wal.WALKeyImpl;
069import org.apache.hadoop.hdfs.MiniDFSCluster;
070import org.junit.After;
071import org.junit.AfterClass;
072import org.junit.Assert;
073import org.junit.Before;
074import org.junit.BeforeClass;
075import org.junit.ClassRule;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.rules.TestName;
080import org.mockito.Mockito;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
082
083
084@Category({ ReplicationTests.class, LargeTests.class })
085public class TestWALEntryStream {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089      HBaseClassTestRule.forClass(TestWALEntryStream.class);
090
091  private static HBaseTestingUtility TEST_UTIL;
092  private static Configuration CONF;
093  private static FileSystem fs;
094  private static MiniDFSCluster cluster;
095  private static final TableName tableName = TableName.valueOf("tablename");
096  private static final byte[] family = Bytes.toBytes("column");
097  private static final byte[] qualifier = Bytes.toBytes("qualifier");
098  private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
099      .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
100  private static final NavigableMap<byte[], Integer> scopes = getScopes();
101
102  private static NavigableMap<byte[], Integer> getScopes() {
103    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
104    scopes.put(family, 1);
105    return scopes;
106  }
107
108  private WAL log;
109  PriorityBlockingQueue<Path> walQueue;
110  private PathWatcher pathWatcher;
111
112  @Rule
113  public TestName tn = new TestName();
114  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
115
116  @BeforeClass
117  public static void setUpBeforeClass() throws Exception {
118    TEST_UTIL = new HBaseTestingUtility();
119    CONF = TEST_UTIL.getConfiguration();
120    TEST_UTIL.startMiniDFSCluster(3);
121
122    cluster = TEST_UTIL.getDFSCluster();
123    fs = cluster.getFileSystem();
124  }
125
126  @AfterClass
127  public static void tearDownAfterClass() throws Exception {
128    TEST_UTIL.shutdownMiniCluster();
129  }
130
131  @Before
132  public void setUp() throws Exception {
133    walQueue = new PriorityBlockingQueue<>();
134    pathWatcher = new PathWatcher();
135    final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
136    wals.getWALProvider().addWALActionsListener(pathWatcher);
137    log = wals.getWAL(info);
138  }
139
140  @After
141  public void tearDown() throws Exception {
142    log.close();
143  }
144
145  // Try out different combinations of row count and KeyValue count
146  @Test
147  public void testDifferentCounts() throws Exception {
148    int[] NB_ROWS = { 1500, 60000 };
149    int[] NB_KVS = { 1, 100 };
150    // whether compression is used
151    Boolean[] BOOL_VALS = { false, true };
152    // long lastPosition = 0;
153    for (int nbRows : NB_ROWS) {
154      for (int walEditKVs : NB_KVS) {
155        for (boolean isCompressionEnabled : BOOL_VALS) {
156          TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
157            isCompressionEnabled);
158          mvcc.advanceTo(1);
159
160          for (int i = 0; i < nbRows; i++) {
161            appendToLogAndSync(walEditKVs);
162          }
163
164          log.rollWriter();
165
166          try (WALEntryStream entryStream =
167              new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
168            int i = 0;
169            while (entryStream.hasNext()) {
170              assertNotNull(entryStream.next());
171              i++;
172            }
173            assertEquals(nbRows, i);
174
175            // should've read all entries
176            assertFalse(entryStream.hasNext());
177          }
178          // reset everything for next loop
179          log.close();
180          setUp();
181        }
182      }
183    }
184  }
185
186  /**
187   * Tests basic reading of log appends
188   */
189  @Test
190  public void testAppendsWithRolls() throws Exception {
191    appendToLogAndSync();
192    long oldPos;
193    try (WALEntryStream entryStream =
194        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
195      // There's one edit in the log, read it. Reading past it needs to throw exception
196      assertTrue(entryStream.hasNext());
197      WAL.Entry entry = entryStream.peek();
198      assertSame(entry, entryStream.next());
199      assertNotNull(entry);
200      assertFalse(entryStream.hasNext());
201      assertNull(entryStream.peek());
202      assertNull(entryStream.next());
203      oldPos = entryStream.getPosition();
204    }
205
206    appendToLogAndSync();
207
208    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
209        log, null, new MetricsSource("1"))) {
210      // Read the newly added entry, make sure we made progress
211      WAL.Entry entry = entryStream.next();
212      assertNotEquals(oldPos, entryStream.getPosition());
213      assertNotNull(entry);
214      oldPos = entryStream.getPosition();
215    }
216
217    // We rolled but we still should see the end of the first log and get that item
218    appendToLogAndSync();
219    log.rollWriter();
220    appendToLogAndSync();
221
222    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
223        log, null, new MetricsSource("1"))) {
224      WAL.Entry entry = entryStream.next();
225      assertNotEquals(oldPos, entryStream.getPosition());
226      assertNotNull(entry);
227
228      // next item should come from the new log
229      entry = entryStream.next();
230      assertNotEquals(oldPos, entryStream.getPosition());
231      assertNotNull(entry);
232
233      // no more entries to read
234      assertFalse(entryStream.hasNext());
235      oldPos = entryStream.getPosition();
236    }
237  }
238
239  /**
240   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
241   * don't mistakenly dequeue the current log thinking we're done with it
242   */
243  @Test
244  public void testLogrollWhileStreaming() throws Exception {
245    appendToLog("1");
246    appendToLog("2");// 2
247    try (WALEntryStream entryStream =
248        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
249      assertEquals("1", getRow(entryStream.next()));
250
251      appendToLog("3"); // 3 - comes in after reader opened
252      log.rollWriter(); // log roll happening while we're reading
253      appendToLog("4"); // 4 - this append is in the rolled log
254
255      assertEquals("2", getRow(entryStream.next()));
256      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
257                                        // entry in first log
258      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
259                                                     // and 3 would be skipped
260      assertEquals("4", getRow(entryStream.next())); // 4
261      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
262      assertFalse(entryStream.hasNext());
263    }
264  }
265
266  /**
267   * Tests that if writes come in while we have a stream open, we shouldn't miss them
268   */
269  @Test
270  public void testNewEntriesWhileStreaming() throws Exception {
271    appendToLog("1");
272    try (WALEntryStream entryStream =
273        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
274      entryStream.next(); // we've hit the end of the stream at this point
275
276      // some new entries come in while we're streaming
277      appendToLog("2");
278      appendToLog("3");
279
280      // don't see them
281      assertFalse(entryStream.hasNext());
282
283      // But we do if we reset
284      entryStream.reset();
285      assertEquals("2", getRow(entryStream.next()));
286      assertEquals("3", getRow(entryStream.next()));
287      assertFalse(entryStream.hasNext());
288    }
289  }
290
291  @Test
292  public void testResumeStreamingFromPosition() throws Exception {
293    long lastPosition = 0;
294    appendToLog("1");
295    try (WALEntryStream entryStream =
296        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
297      entryStream.next(); // we've hit the end of the stream at this point
298      appendToLog("2");
299      appendToLog("3");
300      lastPosition = entryStream.getPosition();
301    }
302    // next stream should picks up where we left off
303    try (WALEntryStream entryStream =
304        new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
305      assertEquals("2", getRow(entryStream.next()));
306      assertEquals("3", getRow(entryStream.next()));
307      assertFalse(entryStream.hasNext()); // done
308      assertEquals(1, walQueue.size());
309    }
310  }
311
312  /**
313   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
314   * using the last position
315   */
316  @Test
317  public void testPosition() throws Exception {
318    long lastPosition = 0;
319    appendEntriesToLogAndSync(3);
320    // read only one element
321    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition,
322        log, null, new MetricsSource("1"))) {
323      entryStream.next();
324      lastPosition = entryStream.getPosition();
325    }
326    // there should still be two more entries from where we left off
327    try (WALEntryStream entryStream =
328        new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
329      assertNotNull(entryStream.next());
330      assertNotNull(entryStream.next());
331      assertFalse(entryStream.hasNext());
332    }
333  }
334
335
336  @Test
337  public void testEmptyStream() throws Exception {
338    try (WALEntryStream entryStream =
339        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
340      assertFalse(entryStream.hasNext());
341    }
342  }
343
344  @Test
345  public void testWALKeySerialization() throws Exception {
346    Map<String, byte[]> attributes = new HashMap<String, byte[]>();
347    attributes.put("foo", Bytes.toBytes("foo-value"));
348    attributes.put("bar", Bytes.toBytes("bar-value"));
349    WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
350        System.currentTimeMillis(), new ArrayList<UUID>(), 0L, 0L,
351        mvcc, scopes, attributes);
352    Assert.assertEquals(attributes, key.getExtendedAttributes());
353
354    WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
355    WALProtos.WALKey serializedKey = builder.build();
356
357    WALKeyImpl deserializedKey = new WALKeyImpl();
358    deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
359
360    //equals() only checks region name, sequence id and write time
361    Assert.assertEquals(key, deserializedKey);
362    //can't use Map.equals() because byte arrays use reference equality
363    Assert.assertEquals(key.getExtendedAttributes().keySet(),
364        deserializedKey.getExtendedAttributes().keySet());
365    for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()){
366      Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
367    }
368    Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
369  }
370
371  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
372    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
373    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
374    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
375        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
376    Server mockServer = Mockito.mock(Server.class);
377    ReplicationSource source = Mockito.mock(ReplicationSource.class);
378    when(source.getSourceManager()).thenReturn(mockSourceManager);
379    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
380    when(source.getWALFileLengthProvider()).thenReturn(log);
381    when(source.getServer()).thenReturn(mockServer);
382    when(source.isRecovered()).thenReturn(recovered);
383    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
384        MetricsReplicationGlobalSourceSource.class);
385    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
386    return source;
387  }
388
389  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
390    ReplicationSource source = mockReplicationSource(recovered, conf);
391    when(source.isPeerEnabled()).thenReturn(true);
392    ReplicationSourceWALReader reader =
393      new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
394    reader.start();
395    return reader;
396  }
397
398  @Test
399  public void testReplicationSourceWALReader() throws Exception {
400    appendEntriesToLogAndSync(3);
401    // get ending position
402    long position;
403    try (WALEntryStream entryStream =
404        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
405      entryStream.next();
406      entryStream.next();
407      entryStream.next();
408      position = entryStream.getPosition();
409    }
410
411    // start up a reader
412    Path walPath = walQueue.peek();
413    ReplicationSourceWALReader reader = createReader(false, CONF);
414    WALEntryBatch entryBatch = reader.take();
415
416    // should've batched up our entries
417    assertNotNull(entryBatch);
418    assertEquals(3, entryBatch.getWalEntries().size());
419    assertEquals(position, entryBatch.getLastWalPosition());
420    assertEquals(walPath, entryBatch.getLastWalPath());
421    assertEquals(3, entryBatch.getNbRowKeys());
422
423    appendToLog("foo");
424    entryBatch = reader.take();
425    assertEquals(1, entryBatch.getNbEntries());
426    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
427  }
428
429  @Test
430  public void testReplicationSourceWALReaderRecovered() throws Exception {
431    appendEntriesToLogAndSync(10);
432    Path walPath = walQueue.peek();
433    log.rollWriter();
434    appendEntriesToLogAndSync(5);
435    log.shutdown();
436
437    Configuration conf = new Configuration(CONF);
438    conf.setInt("replication.source.nb.capacity", 10);
439
440    ReplicationSourceWALReader reader = createReader(true, conf);
441
442    WALEntryBatch batch = reader.take();
443    assertEquals(walPath, batch.getLastWalPath());
444    assertEquals(10, batch.getNbEntries());
445    assertFalse(batch.isEndOfFile());
446
447    batch = reader.take();
448    assertEquals(walPath, batch.getLastWalPath());
449    assertEquals(0, batch.getNbEntries());
450    assertTrue(batch.isEndOfFile());
451
452    walPath = walQueue.peek();
453    batch = reader.take();
454    assertEquals(walPath, batch.getLastWalPath());
455    assertEquals(5, batch.getNbEntries());
456    assertTrue(batch.isEndOfFile());
457
458    assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
459  }
460
461  // Testcase for HBASE-20206
462  @Test
463  public void testReplicationSourceWALReaderWrongPosition() throws Exception {
464    appendEntriesToLogAndSync(1);
465    Path walPath = walQueue.peek();
466    log.rollWriter();
467    appendEntriesToLogAndSync(20);
468    TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
469
470      @Override
471      public boolean evaluate() throws Exception {
472        return fs.getFileStatus(walPath).getLen() > 0;
473      }
474
475      @Override
476      public String explainFailure() throws Exception {
477        return walPath + " has not been closed yet";
478      }
479
480    });
481    long walLength = fs.getFileStatus(walPath).getLen();
482
483    ReplicationSourceWALReader reader = createReader(false, CONF);
484
485    WALEntryBatch entryBatch = reader.take();
486    assertEquals(walPath, entryBatch.getLastWalPath());
487    assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
488      walLength, entryBatch.getLastWalPosition() <= walLength);
489    assertEquals(1, entryBatch.getNbEntries());
490    assertTrue(entryBatch.isEndOfFile());
491
492    Path walPath2 = walQueue.peek();
493    entryBatch = reader.take();
494    assertEquals(walPath2, entryBatch.getLastWalPath());
495    assertEquals(20, entryBatch.getNbEntries());
496    assertFalse(entryBatch.isEndOfFile());
497
498    log.rollWriter();
499    appendEntriesToLogAndSync(10);
500    entryBatch = reader.take();
501    assertEquals(walPath2, entryBatch.getLastWalPath());
502    assertEquals(0, entryBatch.getNbEntries());
503    assertTrue(entryBatch.isEndOfFile());
504
505    Path walPath3 = walQueue.peek();
506    entryBatch = reader.take();
507    assertEquals(walPath3, entryBatch.getLastWalPath());
508    assertEquals(10, entryBatch.getNbEntries());
509    assertFalse(entryBatch.isEndOfFile());
510  }
511
512  @Test
513  public void testReplicationSourceWALReaderDisabled()
514      throws IOException, InterruptedException, ExecutionException {
515    appendEntriesToLogAndSync(3);
516    // get ending position
517    long position;
518    try (WALEntryStream entryStream =
519      new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
520      entryStream.next();
521      entryStream.next();
522      entryStream.next();
523      position = entryStream.getPosition();
524    }
525
526    // start up a reader
527    Path walPath = walQueue.peek();
528    ReplicationSource source = mockReplicationSource(false, CONF);
529    AtomicInteger invokeCount = new AtomicInteger(0);
530    AtomicBoolean enabled = new AtomicBoolean(false);
531    when(source.isPeerEnabled()).then(i -> {
532      invokeCount.incrementAndGet();
533      return enabled.get();
534    });
535
536    ReplicationSourceWALReader reader =
537      new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
538    reader.start();
539    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
540      return reader.take();
541    });
542    // make sure that the isPeerEnabled has been called several times
543    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
544    // confirm that we can read nothing if the peer is disabled
545    assertFalse(future.isDone());
546    // then enable the peer, we should get the batch
547    enabled.set(true);
548    WALEntryBatch entryBatch = future.get();
549
550    // should've batched up our entries
551    assertNotNull(entryBatch);
552    assertEquals(3, entryBatch.getWalEntries().size());
553    assertEquals(position, entryBatch.getLastWalPosition());
554    assertEquals(walPath, entryBatch.getLastWalPath());
555    assertEquals(3, entryBatch.getNbRowKeys());
556  }
557
558  private String getRow(WAL.Entry entry) {
559    Cell cell = entry.getEdit().getCells().get(0);
560    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
561  }
562
563  private void appendToLog(String key) throws IOException {
564    final long txid = log.appendData(info,
565      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(),
566          mvcc, scopes), getWALEdit(key));
567    log.sync(txid);
568  }
569
570  private void appendEntriesToLogAndSync(int count) throws IOException {
571    long txid = -1L;
572    for (int i = 0; i < count; i++) {
573      txid = appendToLog(1);
574    }
575    log.sync(txid);
576  }
577
578  private void appendToLogAndSync() throws IOException {
579    appendToLogAndSync(1);
580  }
581
582  private void appendToLogAndSync(int count) throws IOException {
583    long txid = appendToLog(count);
584    log.sync(txid);
585  }
586
587  private long appendToLog(int count) throws IOException {
588    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
589      System.currentTimeMillis(), mvcc, scopes), getWALEdits(count));
590  }
591
592  private WALEdit getWALEdits(int count) {
593    WALEdit edit = new WALEdit();
594    for (int i = 0; i < count; i++) {
595      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
596          System.currentTimeMillis(), qualifier));
597    }
598    return edit;
599  }
600
601  private WALEdit getWALEdit(String row) {
602    WALEdit edit = new WALEdit();
603    edit.add(
604      new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
605    return edit;
606  }
607
608  private WALEntryFilter getDummyFilter() {
609    return new WALEntryFilter() {
610
611      @Override
612      public Entry filter(Entry entry) {
613        return entry;
614      }
615    };
616  }
617
618  class PathWatcher implements WALActionsListener {
619
620    Path currentPath;
621
622    @Override
623    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
624      walQueue.add(newPath);
625      currentPath = newPath;
626    }
627  }
628
629  @Test
630  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
631    appendToLog("1");
632    appendToLog("2");
633    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
634    AtomicLong fileLength = new AtomicLong(size - 1);
635    try (WALEntryStream entryStream = new WALEntryStream(walQueue,  CONF, 0,
636        p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
637      assertTrue(entryStream.hasNext());
638      assertNotNull(entryStream.next());
639      // can not get log 2
640      assertFalse(entryStream.hasNext());
641      Thread.sleep(1000);
642      entryStream.reset();
643      // still can not get log 2
644      assertFalse(entryStream.hasNext());
645
646      // can get log 2 now
647      fileLength.set(size);
648      entryStream.reset();
649      assertTrue(entryStream.hasNext());
650      assertNotNull(entryStream.next());
651
652      assertFalse(entryStream.hasNext());
653    }
654  }
655}