001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertSame;
026import static org.junit.Assert.assertTrue;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.NavigableMap;
031import java.util.OptionalLong;
032import java.util.TreeMap;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.ForkJoinPool;
035import java.util.concurrent.Future;
036import java.util.concurrent.PriorityBlockingQueue;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.atomic.AtomicInteger;
039import java.util.concurrent.atomic.AtomicLong;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.Server;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.RegionInfoBuilder;
053import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
054import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
055import org.apache.hadoop.hbase.replication.WALEntryFilter;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.testclassification.ReplicationTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.wal.WAL;
060import org.apache.hadoop.hbase.wal.WAL.Entry;
061import org.apache.hadoop.hbase.wal.WALEdit;
062import org.apache.hadoop.hbase.wal.WALFactory;
063import org.apache.hadoop.hbase.wal.WALKeyImpl;
064import org.apache.hadoop.hdfs.MiniDFSCluster;
065import org.junit.After;
066import org.junit.AfterClass;
067import org.junit.Before;
068import org.junit.BeforeClass;
069import org.junit.ClassRule;
070import org.junit.Rule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.junit.rules.TestName;
074import org.mockito.Mockito;
075
076@Category({ ReplicationTests.class, LargeTests.class })
077public class TestWALEntryStream {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081      HBaseClassTestRule.forClass(TestWALEntryStream.class);
082
083  private static HBaseTestingUtility TEST_UTIL;
084  private static Configuration CONF;
085  private static FileSystem fs;
086  private static MiniDFSCluster cluster;
087  private static final TableName tableName = TableName.valueOf("tablename");
088  private static final byte[] family = Bytes.toBytes("column");
089  private static final byte[] qualifier = Bytes.toBytes("qualifier");
090  private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
091      .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
092  private static final NavigableMap<byte[], Integer> scopes = getScopes();
093
094  private static NavigableMap<byte[], Integer> getScopes() {
095    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
096    scopes.put(family, 1);
097    return scopes;
098  }
099
100  private WAL log;
101  PriorityBlockingQueue<Path> walQueue;
102  private PathWatcher pathWatcher;
103
104  @Rule
105  public TestName tn = new TestName();
106  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
107
108  @BeforeClass
109  public static void setUpBeforeClass() throws Exception {
110    TEST_UTIL = new HBaseTestingUtility();
111    CONF = TEST_UTIL.getConfiguration();
112    TEST_UTIL.startMiniDFSCluster(3);
113
114    cluster = TEST_UTIL.getDFSCluster();
115    fs = cluster.getFileSystem();
116  }
117
118  @AfterClass
119  public static void tearDownAfterClass() throws Exception {
120    TEST_UTIL.shutdownMiniCluster();
121  }
122
123  @Before
124  public void setUp() throws Exception {
125    walQueue = new PriorityBlockingQueue<>();
126    pathWatcher = new PathWatcher();
127    final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
128    wals.getWALProvider().addWALActionsListener(pathWatcher);
129    log = wals.getWAL(info);
130  }
131
132  @After
133  public void tearDown() throws Exception {
134    log.close();
135  }
136
137  // Try out different combinations of row count and KeyValue count
138  @Test
139  public void testDifferentCounts() throws Exception {
140    int[] NB_ROWS = { 1500, 60000 };
141    int[] NB_KVS = { 1, 100 };
142    // whether compression is used
143    Boolean[] BOOL_VALS = { false, true };
144    // long lastPosition = 0;
145    for (int nbRows : NB_ROWS) {
146      for (int walEditKVs : NB_KVS) {
147        for (boolean isCompressionEnabled : BOOL_VALS) {
148          TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
149            isCompressionEnabled);
150          mvcc.advanceTo(1);
151
152          for (int i = 0; i < nbRows; i++) {
153            appendToLogAndSync(walEditKVs);
154          }
155
156          log.rollWriter();
157
158          try (WALEntryStream entryStream =
159              new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
160            int i = 0;
161            while (entryStream.hasNext()) {
162              assertNotNull(entryStream.next());
163              i++;
164            }
165            assertEquals(nbRows, i);
166
167            // should've read all entries
168            assertFalse(entryStream.hasNext());
169          }
170          // reset everything for next loop
171          log.close();
172          setUp();
173        }
174      }
175    }
176  }
177
178  /**
179   * Tests basic reading of log appends
180   */
181  @Test
182  public void testAppendsWithRolls() throws Exception {
183    appendToLogAndSync();
184    long oldPos;
185    try (WALEntryStream entryStream =
186        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
187      // There's one edit in the log, read it. Reading past it needs to throw exception
188      assertTrue(entryStream.hasNext());
189      WAL.Entry entry = entryStream.peek();
190      assertSame(entry, entryStream.next());
191      assertNotNull(entry);
192      assertFalse(entryStream.hasNext());
193      assertNull(entryStream.peek());
194      assertNull(entryStream.next());
195      oldPos = entryStream.getPosition();
196    }
197
198    appendToLogAndSync();
199
200    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
201        log, null, new MetricsSource("1"))) {
202      // Read the newly added entry, make sure we made progress
203      WAL.Entry entry = entryStream.next();
204      assertNotEquals(oldPos, entryStream.getPosition());
205      assertNotNull(entry);
206      oldPos = entryStream.getPosition();
207    }
208
209    // We rolled but we still should see the end of the first log and get that item
210    appendToLogAndSync();
211    log.rollWriter();
212    appendToLogAndSync();
213
214    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
215        log, null, new MetricsSource("1"))) {
216      WAL.Entry entry = entryStream.next();
217      assertNotEquals(oldPos, entryStream.getPosition());
218      assertNotNull(entry);
219
220      // next item should come from the new log
221      entry = entryStream.next();
222      assertNotEquals(oldPos, entryStream.getPosition());
223      assertNotNull(entry);
224
225      // no more entries to read
226      assertFalse(entryStream.hasNext());
227      oldPos = entryStream.getPosition();
228    }
229  }
230
231  /**
232   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
233   * don't mistakenly dequeue the current log thinking we're done with it
234   */
235  @Test
236  public void testLogrollWhileStreaming() throws Exception {
237    appendToLog("1");
238    appendToLog("2");// 2
239    try (WALEntryStream entryStream =
240        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
241      assertEquals("1", getRow(entryStream.next()));
242
243      appendToLog("3"); // 3 - comes in after reader opened
244      log.rollWriter(); // log roll happening while we're reading
245      appendToLog("4"); // 4 - this append is in the rolled log
246
247      assertEquals("2", getRow(entryStream.next()));
248      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
249                                        // entry in first log
250      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
251                                                     // and 3 would be skipped
252      assertEquals("4", getRow(entryStream.next())); // 4
253      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
254      assertFalse(entryStream.hasNext());
255    }
256  }
257
258  /**
259   * Tests that if writes come in while we have a stream open, we shouldn't miss them
260   */
261  @Test
262  public void testNewEntriesWhileStreaming() throws Exception {
263    appendToLog("1");
264    try (WALEntryStream entryStream =
265        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
266      entryStream.next(); // we've hit the end of the stream at this point
267
268      // some new entries come in while we're streaming
269      appendToLog("2");
270      appendToLog("3");
271
272      // don't see them
273      assertFalse(entryStream.hasNext());
274
275      // But we do if we reset
276      entryStream.reset();
277      assertEquals("2", getRow(entryStream.next()));
278      assertEquals("3", getRow(entryStream.next()));
279      assertFalse(entryStream.hasNext());
280    }
281  }
282
283  @Test
284  public void testResumeStreamingFromPosition() throws Exception {
285    long lastPosition = 0;
286    appendToLog("1");
287    try (WALEntryStream entryStream =
288        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
289      entryStream.next(); // we've hit the end of the stream at this point
290      appendToLog("2");
291      appendToLog("3");
292      lastPosition = entryStream.getPosition();
293    }
294    // next stream should picks up where we left off
295    try (WALEntryStream entryStream =
296        new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
297      assertEquals("2", getRow(entryStream.next()));
298      assertEquals("3", getRow(entryStream.next()));
299      assertFalse(entryStream.hasNext()); // done
300      assertEquals(1, walQueue.size());
301    }
302  }
303
304  /**
305   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
306   * using the last position
307   */
308  @Test
309  public void testPosition() throws Exception {
310    long lastPosition = 0;
311    appendEntriesToLogAndSync(3);
312    // read only one element
313    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition,
314        log, null, new MetricsSource("1"))) {
315      entryStream.next();
316      lastPosition = entryStream.getPosition();
317    }
318    // there should still be two more entries from where we left off
319    try (WALEntryStream entryStream =
320        new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
321      assertNotNull(entryStream.next());
322      assertNotNull(entryStream.next());
323      assertFalse(entryStream.hasNext());
324    }
325  }
326
327
328  @Test
329  public void testEmptyStream() throws Exception {
330    try (WALEntryStream entryStream =
331        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
332      assertFalse(entryStream.hasNext());
333    }
334  }
335
336  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
337    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
338    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
339    Server mockServer = Mockito.mock(Server.class);
340    ReplicationSource source = Mockito.mock(ReplicationSource.class);
341    when(source.getSourceManager()).thenReturn(mockSourceManager);
342    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
343    when(source.getWALFileLengthProvider()).thenReturn(log);
344    when(source.getServer()).thenReturn(mockServer);
345    when(source.isRecovered()).thenReturn(recovered);
346    return source;
347  }
348
349  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
350    ReplicationSource source = mockReplicationSource(recovered, conf);
351    when(source.isPeerEnabled()).thenReturn(true);
352    ReplicationSourceWALReader reader =
353      new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
354    reader.start();
355    return reader;
356  }
357
358  @Test
359  public void testReplicationSourceWALReader() throws Exception {
360    appendEntriesToLogAndSync(3);
361    // get ending position
362    long position;
363    try (WALEntryStream entryStream =
364        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
365      entryStream.next();
366      entryStream.next();
367      entryStream.next();
368      position = entryStream.getPosition();
369    }
370
371    // start up a reader
372    Path walPath = walQueue.peek();
373    ReplicationSourceWALReader reader = createReader(false, CONF);
374    WALEntryBatch entryBatch = reader.take();
375
376    // should've batched up our entries
377    assertNotNull(entryBatch);
378    assertEquals(3, entryBatch.getWalEntries().size());
379    assertEquals(position, entryBatch.getLastWalPosition());
380    assertEquals(walPath, entryBatch.getLastWalPath());
381    assertEquals(3, entryBatch.getNbRowKeys());
382
383    appendToLog("foo");
384    entryBatch = reader.take();
385    assertEquals(1, entryBatch.getNbEntries());
386    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
387  }
388
389  @Test
390  public void testReplicationSourceWALReaderRecovered() throws Exception {
391    appendEntriesToLogAndSync(10);
392    Path walPath = walQueue.peek();
393    log.rollWriter();
394    appendEntriesToLogAndSync(5);
395    log.shutdown();
396
397    Configuration conf = new Configuration(CONF);
398    conf.setInt("replication.source.nb.capacity", 10);
399
400    ReplicationSourceWALReader reader = createReader(true, conf);
401
402    WALEntryBatch batch = reader.take();
403    assertEquals(walPath, batch.getLastWalPath());
404    assertEquals(10, batch.getNbEntries());
405    assertFalse(batch.isEndOfFile());
406
407    batch = reader.take();
408    assertEquals(walPath, batch.getLastWalPath());
409    assertEquals(0, batch.getNbEntries());
410    assertTrue(batch.isEndOfFile());
411
412    walPath = walQueue.peek();
413    batch = reader.take();
414    assertEquals(walPath, batch.getLastWalPath());
415    assertEquals(5, batch.getNbEntries());
416    // Actually this should be true but we haven't handled this yet since for a normal queue the
417    // last one is always open... Not a big deal for now.
418    assertFalse(batch.isEndOfFile());
419
420    assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
421  }
422
423  // Testcase for HBASE-20206
424  @Test
425  public void testReplicationSourceWALReaderWrongPosition() throws Exception {
426    appendEntriesToLogAndSync(1);
427    Path walPath = walQueue.peek();
428    log.rollWriter();
429    appendEntriesToLogAndSync(20);
430    TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
431
432      @Override
433      public boolean evaluate() throws Exception {
434        return fs.getFileStatus(walPath).getLen() > 0;
435      }
436
437      @Override
438      public String explainFailure() throws Exception {
439        return walPath + " has not been closed yet";
440      }
441
442    });
443    long walLength = fs.getFileStatus(walPath).getLen();
444
445    ReplicationSourceWALReader reader = createReader(false, CONF);
446
447    WALEntryBatch entryBatch = reader.take();
448    assertEquals(walPath, entryBatch.getLastWalPath());
449    assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
450      walLength, entryBatch.getLastWalPosition() <= walLength);
451    assertEquals(1, entryBatch.getNbEntries());
452    assertTrue(entryBatch.isEndOfFile());
453
454    Path walPath2 = walQueue.peek();
455    entryBatch = reader.take();
456    assertEquals(walPath2, entryBatch.getLastWalPath());
457    assertEquals(20, entryBatch.getNbEntries());
458    assertFalse(entryBatch.isEndOfFile());
459
460    log.rollWriter();
461    appendEntriesToLogAndSync(10);
462    entryBatch = reader.take();
463    assertEquals(walPath2, entryBatch.getLastWalPath());
464    assertEquals(0, entryBatch.getNbEntries());
465    assertTrue(entryBatch.isEndOfFile());
466
467    Path walPath3 = walQueue.peek();
468    entryBatch = reader.take();
469    assertEquals(walPath3, entryBatch.getLastWalPath());
470    assertEquals(10, entryBatch.getNbEntries());
471    assertFalse(entryBatch.isEndOfFile());
472  }
473
474  @Test
475  public void testReplicationSourceWALReaderDisabled()
476      throws IOException, InterruptedException, ExecutionException {
477    appendEntriesToLogAndSync(3);
478    // get ending position
479    long position;
480    try (WALEntryStream entryStream =
481      new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
482      entryStream.next();
483      entryStream.next();
484      entryStream.next();
485      position = entryStream.getPosition();
486    }
487
488    // start up a reader
489    Path walPath = walQueue.peek();
490    ReplicationSource source = mockReplicationSource(false, CONF);
491    AtomicInteger invokeCount = new AtomicInteger(0);
492    AtomicBoolean enabled = new AtomicBoolean(false);
493    when(source.isPeerEnabled()).then(i -> {
494      invokeCount.incrementAndGet();
495      return enabled.get();
496    });
497
498    ReplicationSourceWALReader reader =
499      new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
500    reader.start();
501    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
502      return reader.take();
503    });
504    // make sure that the isPeerEnabled has been called several times
505    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
506    // confirm that we can read nothing if the peer is disabled
507    assertFalse(future.isDone());
508    // then enable the peer, we should get the batch
509    enabled.set(true);
510    WALEntryBatch entryBatch = future.get();
511
512    // should've batched up our entries
513    assertNotNull(entryBatch);
514    assertEquals(3, entryBatch.getWalEntries().size());
515    assertEquals(position, entryBatch.getLastWalPosition());
516    assertEquals(walPath, entryBatch.getLastWalPath());
517    assertEquals(3, entryBatch.getNbRowKeys());
518  }
519
520  private String getRow(WAL.Entry entry) {
521    Cell cell = entry.getEdit().getCells().get(0);
522    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
523  }
524
525  private void appendToLog(String key) throws IOException {
526    final long txid = log.appendData(info,
527      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(),
528          mvcc, scopes), getWALEdit(key));
529    log.sync(txid);
530  }
531
532  private void appendEntriesToLogAndSync(int count) throws IOException {
533    long txid = -1L;
534    for (int i = 0; i < count; i++) {
535      txid = appendToLog(1);
536    }
537    log.sync(txid);
538  }
539
540  private void appendToLogAndSync() throws IOException {
541    appendToLogAndSync(1);
542  }
543
544  private void appendToLogAndSync(int count) throws IOException {
545    long txid = appendToLog(count);
546    log.sync(txid);
547  }
548
549  private long appendToLog(int count) throws IOException {
550    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
551      System.currentTimeMillis(), mvcc, scopes), getWALEdits(count));
552  }
553
554  private WALEdit getWALEdits(int count) {
555    WALEdit edit = new WALEdit();
556    for (int i = 0; i < count; i++) {
557      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
558          System.currentTimeMillis(), qualifier));
559    }
560    return edit;
561  }
562
563  private WALEdit getWALEdit(String row) {
564    WALEdit edit = new WALEdit();
565    edit.add(
566      new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
567    return edit;
568  }
569
570  private WALEntryFilter getDummyFilter() {
571    return new WALEntryFilter() {
572
573      @Override
574      public Entry filter(Entry entry) {
575        return entry;
576      }
577    };
578  }
579
580  class PathWatcher implements WALActionsListener {
581
582    Path currentPath;
583
584    @Override
585    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
586      walQueue.add(newPath);
587      currentPath = newPath;
588    }
589  }
590
591  @Test
592  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
593    appendToLog("1");
594    appendToLog("2");
595    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
596    AtomicLong fileLength = new AtomicLong(size - 1);
597    try (WALEntryStream entryStream = new WALEntryStream(walQueue,  CONF, 0,
598        p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
599      assertTrue(entryStream.hasNext());
600      assertNotNull(entryStream.next());
601      // can not get log 2
602      assertFalse(entryStream.hasNext());
603      Thread.sleep(1000);
604      entryStream.reset();
605      // still can not get log 2
606      assertFalse(entryStream.hasNext());
607
608      // can get log 2 now
609      fileLength.set(size);
610      entryStream.reset();
611      assertTrue(entryStream.hasNext());
612      assertNotNull(entryStream.next());
613
614      assertFalse(entryStream.hasNext());
615    }
616  }
617}