/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALEntryStream.class);

  private static HBaseTestingUtility TEST_UTIL;
  private static Configuration conf;
  private static FileSystem fs;
  private static MiniDFSCluster cluster;
  private static final TableName tableName = TableName.valueOf("tablename");
  private static final byte[] family = Bytes.toBytes("column");
  private static final byte[] qualifier = Bytes.toBytes("qualifier");
  private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
      .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
  private static final NavigableMap<byte[], Integer> scopes = getScopes();

  private static NavigableMap<byte[], Integer> getScopes() {
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(family, 1);
    return scopes;
  }

  private WAL log;
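  // Queue of WAL paths consumed by the WALEntryStream under test; PathWatcher below adds each
  // new WAL path to it as the log rolls, mimicking the queue a replication source would read.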
  PriorityBlockingQueue<Path> walQueue;
  private PathWatcher pathWatcher;

  @Rule
  public TestName tn = new TestName();
  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    conf = TEST_UTIL.getConfiguration();
    TEST_UTIL.startMiniDFSCluster(3);

    cluster = TEST_UTIL.getDFSCluster();
    fs = cluster.getFileSystem();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    walQueue = new PriorityBlockingQueue<>();
    pathWatcher = new PathWatcher();
    final WALFactory wals = new WALFactory(conf, tn.getMethodName());
    wals.getWALProvider().addWALActionsListener(pathWatcher);
    log = wals.getWAL(info);
  }

  @After
  public void tearDown() throws Exception {
    log.close();
  }

  // Try out different combinations of row count, KeyValue count, and WAL compression
  @Test
  public void testDifferentCounts() throws Exception {
    int[] NB_ROWS = { 1500, 60000 };
    int[] NB_KVS = { 1, 100 };
    // whether compression is used
    Boolean[] BOOL_VALS = { false, true };
    for (int nbRows : NB_ROWS) {
      for (int walEditKVs : NB_KVS) {
        for (boolean isCompressionEnabled : BOOL_VALS) {
          TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
            isCompressionEnabled);
          mvcc.advanceTo(1);

          for (int i = 0; i < nbRows; i++) {
            appendToLogPlus(walEditKVs);
          }

          log.rollWriter();

          try (WALEntryStream entryStream =
              new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) {
            int i = 0;
            while (entryStream.hasNext()) {
              assertNotNull(entryStream.next());
              i++;
            }
            assertEquals(nbRows, i);

            // should've read all entries
            assertFalse(entryStream.hasNext());
          }
          // reset everything for the next iteration: fresh WAL, queue and watcher
          log.close();
          setUp();
        }
      }
    }
  }


  /**
   * Tests basic reading of log appends, including reading across a log roll
   */
  @Test
  public void testAppendsWithRolls() throws Exception {
    appendToLog();
    long oldPos;
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) {
      // There's one edit in the log; read it. Reading past it should throw NoSuchElementException
      assertTrue(entryStream.hasNext());
      WAL.Entry entry = entryStream.next();
      assertNotNull(entry);
      assertFalse(entryStream.hasNext());
      try {
        entry = entryStream.next();
        fail();
      } catch (NoSuchElementException e) {
        // expected
      }
      oldPos = entryStream.getPosition();
    }

    appendToLog();

    try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, oldPos,
        log, null, new MetricsSource("1"))) {
      // Read the newly added entry, make sure we made progress
      WAL.Entry entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);
      oldPos = entryStream.getPosition();
    }

    // Append, roll the log, then append again; we should still read to the end of the first log
    // and get the entry written before the roll
    appendToLog();
    log.rollWriter();
    appendToLog();

    try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, oldPos,
        log, null, new MetricsSource("1"))) {
      WAL.Entry entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);

      // the next entry should come from the new log
      entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);

      // no more entries to read
      assertFalse(entryStream.hasNext());
      oldPos = entryStream.getPosition();
    }
  }

  /**
   * Tests that if more entries come in after a stream is opened and the log is then rolled, we
   * don't mistakenly dequeue the current log thinking we're done with it
   */
  @Test
  public void testLogrollWhileStreaming() throws Exception {
    appendToLog("1");
    appendToLog("2"); // 2
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) {
      assertEquals("1", getRow(entryStream.next()));

      appendToLog("3"); // 3 - comes in after reader opened
      log.rollWriter(); // log roll happening while we're reading
      appendToLog("4"); // 4 - this append is in the rolled log

      assertEquals("2", getRow(entryStream.next()));
      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
                                        // entry in the first log
      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
                                                     // and 3 would be skipped
      assertEquals("4", getRow(entryStream.next())); // 4
      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to the next log properly
      assertFalse(entryStream.hasNext());
    }
  }

  /**
   * Tests that we don't miss writes that come in while we have a stream open
   */
  @Test
  public void testNewEntriesWhileStreaming() throws Exception {
    appendToLog("1");
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) {
      entryStream.next(); // we've hit the end of the stream at this point

      // some new entries come in while we're streaming
      appendToLog("2");
      appendToLog("3");

      // we don't see them yet
      assertFalse(entryStream.hasNext());

      // but we do see them after a reset
      entryStream.reset();
      assertEquals("2", getRow(entryStream.next()));
      assertEquals("3", getRow(entryStream.next()));
      assertFalse(entryStream.hasNext());
    }
  }

  @Test
  public void testResumeStreamingFromPosition() throws Exception {
    long lastPosition = 0;
    appendToLog("1");
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) {
      entryStream.next(); // we've hit the end of the stream at this point
      appendToLog("2");
      appendToLog("3");
      lastPosition = entryStream.getPosition();
    }
    // the next stream should pick up where we left off
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, lastPosition, log, null, new MetricsSource("1"))) {
      assertEquals("2", getRow(entryStream.next()));
      assertEquals("3", getRow(entryStream.next()));
      assertFalse(entryStream.hasNext()); // done
      assertEquals(1, walQueue.size());
    }
  }

  /**
   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
   * using the last position
   */
  @Test
  public void testPosition() throws Exception {
    long lastPosition = 0;
    appendEntriesToLog(3);
    // read only one element
    try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, lastPosition,
        log, null, new MetricsSource("1"))) {
      entryStream.next();
      lastPosition = entryStream.getPosition();
    }
    // there should still be two more entries from where we left off
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, lastPosition, log, null, new MetricsSource("1"))) {
      assertNotNull(entryStream.next());
      assertNotNull(entryStream.next());
      assertFalse(entryStream.hasNext());
    }
  }

  @Test
  public void testEmptyStream() throws Exception {
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) {
      assertFalse(entryStream.hasNext());
    }
  }

  @Test
  public void testReplicationSourceWALReaderThread() throws Exception {
    appendEntriesToLog(3);
    // get the ending position
    long position;
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) {
      entryStream.next();
      entryStream.next();
      entryStream.next();
      position = entryStream.getPosition();
    }

    // set up a mock replication source and start a batching WAL reader
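    // The reader only needs three things from the source: the manager's shared buffer counter
    // for size accounting, a metrics sink, and a WALFileLengthProvider; the WAL instance itself
    // serves as the length provider here.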
    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    when(source.getSourceManager()).thenReturn(mockSourceManager);
    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
    when(source.getWALFileLengthProvider()).thenReturn(log);
    ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
        walQueue, 0, getDummyFilter(), source);
    Path walPath = walQueue.peek();
    batcher.start();
    WALEntryBatch entryBatch = batcher.take();

    // should've batched up our entries
    assertNotNull(entryBatch);
    assertEquals(3, entryBatch.getWalEntries().size());
    assertEquals(position, entryBatch.getLastWalPosition());
    assertEquals(walPath, entryBatch.getLastWalPath());
    assertEquals(3, entryBatch.getNbRowKeys());

    appendToLog("foo");
    entryBatch = batcher.take();
    assertEquals(1, entryBatch.getNbEntries());
    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
  }

  private String getRow(WAL.Entry entry) {
    Cell cell = entry.getEdit().getCells().get(0);
    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  }

  private void appendToLog(String key) throws IOException {
    final long txid = log.append(info,
      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(),
          mvcc, scopes), getWALEdit(key), true);
    log.sync(txid);
  }

  private void appendEntriesToLog(int count) throws IOException {
    for (int i = 0; i < count; i++) {
      appendToLog();
    }
  }

  private void appendToLog() throws IOException {
    appendToLogPlus(1);
  }

  private void appendToLogPlus(int count) throws IOException {
    final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
        System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
    log.sync(txid);
  }

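  // Builds a WALEdit with {@code count} KeyValues; row keys are derived from the current time,
  // so the exact contents don't matter to the tests, only the number of entries.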
  private WALEdit getWALEdits(int count) {
    WALEdit edit = new WALEdit();
    for (int i = 0; i < count; i++) {
      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
          System.currentTimeMillis(), qualifier));
    }
    return edit;
  }

  private WALEdit getWALEdit(String row) {
    WALEdit edit = new WALEdit();
    edit.add(
      new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
    return edit;
  }

  private WALEntryFilter getDummyFilter() {
    return new WALEntryFilter() {

      @Override
      public Entry filter(Entry entry) {
        return entry;
      }
    };
  }

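  /**
   * WALActionsListener that mirrors what the replication source manager does for a real
   * replication queue: each time the log rolls, the new WAL path is added to the queue that
   * the stream under test consumes.
   */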
  class PathWatcher implements WALActionsListener {

    Path currentPath;

    @Override
    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
      walQueue.add(newPath);
      currentPath = newPath;
    }
  }

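  /**
   * Tests that the stream does not read beyond the length reported by the WALFileLengthProvider
   * for a file that is still being written to.
   */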
  @Test
  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
    appendToLog("1");
    appendToLog("2");
    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
    AtomicLong fileLength = new AtomicLong(size - 1);
    try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, 0,
        p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
      assertTrue(entryStream.hasNext());
      assertNotNull(entryStream.next());
      // cannot read entry 2 yet; the reported file length stops short of it
      assertFalse(entryStream.hasNext());
      Thread.sleep(1000);
      entryStream.reset();
      // still cannot read entry 2 after a reset
      assertFalse(entryStream.hasNext());

      // entry 2 becomes readable once the provider reports the full length
      fileLength.set(size);
      entryStream.reset();
      assertTrue(entryStream.hasNext());
      assertNotNull(entryStream.next());

      assertFalse(entryStream.hasNext());
    }
  }
}