001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.mockito.Mockito.mock;
021
022import java.io.IOException;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.client.RegionInfoBuilder;
034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.wal.WAL;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
041import org.apache.hadoop.hbase.wal.WALFactory;
042import org.apache.hadoop.hbase.wal.WALKeyImpl;
043import org.apache.hadoop.hdfs.DistributedFileSystem;
044import org.apache.hadoop.hdfs.MiniDFSCluster;
045import org.junit.After;
046import org.junit.AfterClass;
047import org.junit.Rule;
048import org.junit.rules.TestName;
049
050import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
051
052/**
053 * Base class for WALEntryStream tests.
054 */
055public abstract class WALEntryStreamTestBase {
056
057  protected static final long TEST_TIMEOUT_MS = 5000;
058  protected static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();;
059  protected static Configuration CONF;
060  protected static DistributedFileSystem fs;
061  protected static MiniDFSCluster cluster;
062  protected static final TableName tableName = TableName.valueOf("tablename");
063  protected static final byte[] family = Bytes.toBytes("column");
064  protected static final byte[] qualifier = Bytes.toBytes("qualifier");
065  protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
066    .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
067  protected static final NavigableMap<byte[], Integer> scopes = getScopes();
068  protected final String fakeWalGroupId = "fake-wal-group-id";
069
070  /**
071   * Test helper that waits until a non-null entry is available in the stream next or times out. A
072   * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
073   * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
074   * may need to retry multiple times before an entry appended to the WAL is visible to the stream
075   * consumers. One such cause of delay is the close() of writer writing these log files. While the
076   * closure is in progress, the stream does not switch to the next log in the queue and next() may
077   * return null entries. This utility wraps these retries into a single next call and that makes
078   * the test code simpler.
079   */
080  protected static class WALEntryStreamWithRetries extends WALEntryStream {
081
082    private boolean retry = true;
083
084    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, FileSystem fs,
085      Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
086      MetricsSource metrics, String walGroupId) {
087      super(logQueue, fs, conf, startPosition, walFileLengthProvider, metrics, walGroupId);
088    }
089
090    public void enableRetry() {
091      retry = true;
092    }
093
094    public void disableRetry() {
095      retry = false;
096    }
097
098    @Override
099    public HasNext hasNext() {
100      // hasNext is idempotent, so we can call it again and do not need to store its return value
101      if (retry) {
102        TEST_UTIL.waitFor(TEST_TIMEOUT_MS, () -> super.hasNext() == HasNext.YES);
103      }
104      return super.hasNext();
105    }
106  }
107
108  private static NavigableMap<byte[], Integer> getScopes() {
109    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
110    scopes.put(family, 1);
111    return scopes;
112  }
113
114  class PathWatcher implements WALActionsListener {
115
116    Path currentPath;
117
118    @Override
119    public void preLogRoll(Path oldPath, Path newPath) {
120      logQueue.enqueueLog(newPath, fakeWalGroupId);
121      currentPath = newPath;
122    }
123  }
124
125  protected WAL log;
126  protected ReplicationSourceLogQueue logQueue;
127  protected PathWatcher pathWatcher;
128
129  @Rule
130  public TestName tn = new TestName();
131  protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
132
133  protected static void startCluster() throws Exception {
134    CONF = TEST_UTIL.getConfiguration();
135    CONF.setLong("replication.source.sleepforretries", 10);
136    TEST_UTIL.startMiniDFSCluster(3);
137
138    cluster = TEST_UTIL.getDFSCluster();
139    fs = cluster.getFileSystem();
140  }
141
142  @AfterClass
143  public static void tearDownAfterClass() throws Exception {
144    TEST_UTIL.shutdownMiniCluster();
145  }
146
147  protected void initWAL() throws IOException {
148    ReplicationSource source = mock(ReplicationSource.class);
149    MetricsSource metricsSource = new MetricsSource("2");
150    // Source with the same id is shared and carries values from the last run
151    metricsSource.clear();
152    logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
153    pathWatcher = new PathWatcher();
154    final WALFactory wals = new WALFactory(CONF, tn.getMethodName().replaceAll("[\\[:]", "_"));
155    wals.getWALProvider().addWALActionsListener(pathWatcher);
156    log = wals.getWAL(info);
157  }
158
159  @After
160  public void tearDown() throws Exception {
161    Closeables.close(log, true);
162  }
163
164  protected void appendToLogAndSync() throws IOException {
165    appendToLogAndSync(1);
166  }
167
168  protected void appendToLogAndSync(int count) throws IOException {
169    long txid = appendToLog(count);
170    log.sync(txid);
171  }
172
173  protected long appendToLog(int count) throws IOException {
174    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
175      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
176  }
177
178  protected WALEdit getWALEdits(int count) {
179    WALEdit edit = new WALEdit();
180    for (int i = 0; i < count; i++) {
181      WALEditInternalHelper.addExtendedCell(edit,
182        new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier,
183          EnvironmentEdgeManager.currentTime(), qualifier));
184    }
185    return edit;
186  }
187}