001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.mockito.Mockito.mock;
021
022import java.io.IOException;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.TableNameTestRule;
034import org.apache.hadoop.hbase.Waiter;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.RegionInfoBuilder;
037import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
038import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.wal.WAL;
042import org.apache.hadoop.hbase.wal.WAL.Entry;
043import org.apache.hadoop.hbase.wal.WALEdit;
044import org.apache.hadoop.hbase.wal.WALFactory;
045import org.apache.hadoop.hbase.wal.WALKeyImpl;
046import org.apache.hadoop.hdfs.MiniDFSCluster;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.Rule;
050import org.junit.rules.TestName;
051
052import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
053
054/**
055 * Base class for WALEntryStream tests.
056 */
057public abstract class WALEntryStreamTestBase {
058
059  protected static final long TEST_TIMEOUT_MS = 5000;
060  protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();;
061  protected static Configuration CONF;
062  protected static FileSystem fs;
063  protected static MiniDFSCluster cluster;
064  protected static final TableName tableName = TableName.valueOf("tablename");
065  protected static final byte[] family = Bytes.toBytes("column");
066  protected static final byte[] qualifier = Bytes.toBytes("qualifier");
067  protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
068    .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
069  protected static final NavigableMap<byte[], Integer> scopes = getScopes();
070  protected final String fakeWalGroupId = "fake-wal-group-id";
071
072  /**
073   * Test helper that waits until a non-null entry is available in the stream next or times out. A
074   * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
075   * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
076   * may need to retry multiple times before an entry appended to the WAL is visible to the stream
077   * consumers. One such cause of delay is the close() of writer writing these log files. While the
078   * closure is in progress, the stream does not switch to the next log in the queue and next() may
079   * return null entries. This utility wraps these retries into a single next call and that makes
080   * the test code simpler.
081   */
082  protected static class WALEntryStreamWithRetries extends WALEntryStream {
083    // Class member to be able to set a non-final from within a lambda.
084    private Entry result;
085
086    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
087      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
088      MetricsSource metrics, String walGroupId) throws IOException {
089      super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
090    }
091
092    @Override
093    public Entry next() {
094      Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> {
095        result = WALEntryStreamWithRetries.super.next();
096        return result != null;
097      });
098      return result;
099    }
100  }
101
102  private static NavigableMap<byte[], Integer> getScopes() {
103    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
104    scopes.put(family, 1);
105    return scopes;
106  }
107
108  class PathWatcher implements WALActionsListener {
109
110    Path currentPath;
111
112    @Override
113    public void preLogRoll(Path oldPath, Path newPath) {
114      logQueue.enqueueLog(newPath, fakeWalGroupId);
115      currentPath = newPath;
116    }
117  }
118
119  protected WAL log;
120  protected ReplicationSourceLogQueue logQueue;
121  protected PathWatcher pathWatcher;
122
123  @Rule
124  public TestName tn = new TestName();
125  protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
126
127  protected static void startCluster() throws Exception {
128    TEST_UTIL = new HBaseTestingUtility();
129    CONF = TEST_UTIL.getConfiguration();
130    CONF.setLong("replication.source.sleepforretries", 10);
131    TEST_UTIL.startMiniDFSCluster(3);
132
133    cluster = TEST_UTIL.getDFSCluster();
134    fs = cluster.getFileSystem();
135  }
136
137  @AfterClass
138  public static void tearDownAfterClass() throws Exception {
139    TEST_UTIL.shutdownMiniCluster();
140  }
141
142  protected void initWAL() throws IOException {
143    ReplicationSource source = mock(ReplicationSource.class);
144    MetricsSource metricsSource = new MetricsSource("2");
145    // Source with the same id is shared and carries values from the last run
146    metricsSource.clear();
147    logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
148    pathWatcher = new PathWatcher();
149    final WALFactory wals =
150      new WALFactory(CONF, TableNameTestRule.cleanUpTestName(tn.getMethodName()));
151    wals.getWALProvider().addWALActionsListener(pathWatcher);
152    log = wals.getWAL(info);
153  }
154
155  @After
156  public void tearDown() throws Exception {
157    Closeables.close(log, true);
158  }
159
160  protected void appendToLogAndSync() throws IOException {
161    appendToLogAndSync(1);
162  }
163
164  protected void appendToLogAndSync(int count) throws IOException {
165    long txid = appendToLog(count);
166    log.sync(txid);
167  }
168
169  protected long appendToLog(int count) throws IOException {
170    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
171      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
172  }
173
174  protected WALEdit getWALEdits(int count) {
175    WALEdit edit = new WALEdit();
176    for (int i = 0; i < count; i++) {
177      edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier,
178        EnvironmentEdgeManager.currentTime(), qualifier));
179    }
180    return edit;
181  }
182}