001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.mockito.Mockito.mock;
021
022import java.io.IOException;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.client.RegionInfoBuilder;
034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.wal.WAL;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
041import org.apache.hadoop.hbase.wal.WALFactory;
042import org.apache.hadoop.hbase.wal.WALKeyImpl;
043import org.apache.hadoop.hdfs.DistributedFileSystem;
044import org.apache.hadoop.hdfs.MiniDFSCluster;
045import org.junit.jupiter.api.AfterAll;
046import org.junit.jupiter.api.AfterEach;
047import org.junit.jupiter.api.BeforeEach;
048import org.junit.jupiter.api.TestInfo;
049
050import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
051
052/**
053 * Base class for WALEntryStream tests.
054 */
055public abstract class WALEntryStreamTestBase {
056
057  protected static final long TEST_TIMEOUT_MS = 5000;
058  protected static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();;
059  protected static Configuration CONF;
060  protected static DistributedFileSystem fs;
061  protected static MiniDFSCluster cluster;
062  protected static final TableName tableName = TableName.valueOf("tablename");
063  protected static final byte[] family = Bytes.toBytes("column");
064  protected static final byte[] qualifier = Bytes.toBytes("qualifier");
065  protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
066    .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
067  protected static final NavigableMap<byte[], Integer> scopes = getScopes();
068  protected final String fakeWalGroupId = "fake-wal-group-id";
069
070  /**
071   * Test helper that waits until a non-null entry is available in the stream next or times out. A
072   * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
073   * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
074   * may need to retry multiple times before an entry appended to the WAL is visible to the stream
075   * consumers. One such cause of delay is the close() of writer writing these log files. While the
076   * closure is in progress, the stream does not switch to the next log in the queue and next() may
077   * return null entries. This utility wraps these retries into a single next call and that makes
078   * the test code simpler.
079   */
080  protected static class WALEntryStreamWithRetries extends WALEntryStream {
081
082    private boolean retry = true;
083
084    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, FileSystem fs,
085      Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
086      MetricsSource metrics, String walGroupId) {
087      super(logQueue, fs, conf, startPosition, walFileLengthProvider, metrics, walGroupId);
088    }
089
090    public void enableRetry() {
091      retry = true;
092    }
093
094    public void disableRetry() {
095      retry = false;
096    }
097
098    @Override
099    public HasNext hasNext() {
100      // hasNext is idempotent, so we can call it again and do not need to store its return value
101      if (retry) {
102        TEST_UTIL.waitFor(TEST_TIMEOUT_MS, () -> super.hasNext() == HasNext.YES);
103      }
104      return super.hasNext();
105    }
106  }
107
108  private static NavigableMap<byte[], Integer> getScopes() {
109    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
110    scopes.put(family, 1);
111    return scopes;
112  }
113
114  class PathWatcher implements WALActionsListener {
115
116    Path currentPath;
117
118    @Override
119    public void preLogRoll(Path oldPath, Path newPath) {
120      logQueue.enqueueLog(newPath, fakeWalGroupId);
121      currentPath = newPath;
122    }
123  }
124
125  protected WAL log;
126  protected ReplicationSourceLogQueue logQueue;
127  protected PathWatcher pathWatcher;
128  protected String testName;
129  protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
130
131  @BeforeEach
132  public void setUp(TestInfo testInfo) {
133    testName = testInfo.getTestMethod().get().getName();
134  }
135
136  protected static void startCluster() throws Exception {
137    CONF = TEST_UTIL.getConfiguration();
138    CONF.setLong("replication.source.sleepforretries", 10);
139    TEST_UTIL.startMiniDFSCluster(3);
140
141    cluster = TEST_UTIL.getDFSCluster();
142    fs = cluster.getFileSystem();
143  }
144
145  @AfterAll
146  public static void tearDownAfterClass() throws Exception {
147    TEST_UTIL.shutdownMiniCluster();
148  }
149
150  protected void initWAL() throws IOException {
151    ReplicationSource source = mock(ReplicationSource.class);
152    MetricsSource metricsSource = new MetricsSource("2");
153    // Source with the same id is shared and carries values from the last run
154    metricsSource.clear();
155    logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
156    pathWatcher = new PathWatcher();
157    final WALFactory wals = new WALFactory(CONF, testName.replaceAll("[\\[:]", "_"));
158    wals.getWALProvider().addWALActionsListener(pathWatcher);
159    log = wals.getWAL(info);
160  }
161
162  @AfterEach
163  public void tearDown() throws Exception {
164    Closeables.close(log, true);
165  }
166
167  protected void appendToLogAndSync() throws IOException {
168    appendToLogAndSync(1);
169  }
170
171  protected void appendToLogAndSync(int count) throws IOException {
172    long txid = appendToLog(count);
173    log.sync(txid);
174  }
175
176  protected long appendToLog(int count) throws IOException {
177    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
178      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
179  }
180
181  protected WALEdit getWALEdits(int count) {
182    WALEdit edit = new WALEdit();
183    for (int i = 0; i < count; i++) {
184      WALEditInternalHelper.addExtendedCell(edit,
185        new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier,
186          EnvironmentEdgeManager.currentTime(), qualifier));
187    }
188    return edit;
189  }
190}