001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.List;
028import java.util.NavigableMap;
029import java.util.TreeMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
042import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
043import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
044import org.apache.hadoop.hbase.testclassification.MapReduceTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.Threads;
050import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
051import org.apache.hadoop.hbase.wal.WAL;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
054import org.apache.hadoop.hbase.wal.WALFactory;
055import org.apache.hadoop.hbase.wal.WALKey;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.apache.hadoop.mapreduce.InputSplit;
058import org.apache.hadoop.mapreduce.MapReduceTestUtil;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.BeforeAll;
061import org.junit.jupiter.api.BeforeEach;
062import org.junit.jupiter.api.Tag;
063import org.junit.jupiter.api.Test;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * JUnit tests for the WALRecordReader
069 */
070@Tag(MapReduceTests.TAG)
071@Tag(MediumTests.TAG)
public class TestWALRecordReader {

  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static Configuration conf;
  // Filesystem/paths for the HBase root dir (backed by the mini DFS cluster).
  private static FileSystem fs;
  private static Path hbaseDir;
  // Separate filesystem/root for WALs; may differ from the HBase root dir.
  private static FileSystem walFs;
  private static Path walRootDir;
  // visible for TestHLogRecordReader
  static final TableName tableName = TableName.valueOf(getName());
  private static final byte[] rowName = tableName.getName();
  // visible for TestHLogRecordReader
  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
  // Column family / value used for every KeyValue appended in these tests.
  private static final byte[] family = Bytes.toBytes("column");
  private static final byte[] value = Bytes.toBytes("value");
  // WAL directory under the WAL root; fed to WALInputFormat as the input dir.
  private static Path logDir;
  // Fresh MVCC per test (see setUp); used when building WAL keys.
  protected MultiVersionConcurrencyControl mvcc;
  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
091
092  private static String getName() {
093    return "TestWALRecordReader";
094  }
095
096  private static String getServerName() {
097    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
098    return serverName.toString();
099  }
100
  @BeforeEach
  public void setUp() throws Exception {
    // Wipe both the HBase root dir and the WAL root dir so every test starts
    // from an empty filesystem, then reset the MVCC used to build WAL keys.
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    mvcc = new MultiVersionConcurrencyControl();
  }
107
108  @BeforeAll
109  public static void setUpBeforeClass() throws Exception {
110    // Make block sizes small.
111    conf = TEST_UTIL.getConfiguration();
112    conf.setInt("dfs.blocksize", 1024 * 1024);
113    conf.setInt("dfs.replication", 1);
114    TEST_UTIL.startMiniDFSCluster(1);
115
116    conf = TEST_UTIL.getConfiguration();
117    fs = TEST_UTIL.getDFSCluster().getFileSystem();
118
119    hbaseDir = TEST_UTIL.createRootDir();
120    walRootDir = TEST_UTIL.createWALRootDir();
121    walFs = CommonFSUtils.getWALFileSystem(conf);
122    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
123  }
124
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    // Remove test data first, then stop the mini cluster that hosted it.
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    TEST_UTIL.shutdownMiniCluster();
  }
131
132  /**
133   * Test partial reads from the WALs based on passed time range.
134   */
135  @Test
136  public void testPartialRead() throws Exception {
137    final WALFactory walfactory = new WALFactory(conf, getName());
138    WAL log = walfactory.getWAL(info);
139    // This test depends on timestamp being millisecond based and the filename of the WAL also
140    // being millisecond based.
141    long ts = EnvironmentEdgeManager.currentTime();
142    WALEdit edit = new WALEdit();
143    WALEditInternalHelper.addExtendedCell(edit,
144      new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
145    log.appendData(info, getWalKeyImpl(ts, scopes), edit);
146    edit = new WALEdit();
147    WALEditInternalHelper.addExtendedCell(edit,
148      new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
149    log.appendData(info, getWalKeyImpl(ts + 1, scopes), edit);
150    log.sync();
151    Threads.sleep(10);
152    LOG.info("Before 1st WAL roll " + log.toString());
153    log.rollWriter();
154    LOG.info("Past 1st WAL roll " + log.toString());
155
156    Thread.sleep(1);
157    long ts1 = EnvironmentEdgeManager.currentTime();
158
159    edit = new WALEdit();
160    WALEditInternalHelper.addExtendedCell(edit,
161      new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
162    log.appendData(info, getWalKeyImpl(ts1 + 1, scopes), edit);
163    edit = new WALEdit();
164    WALEditInternalHelper.addExtendedCell(edit,
165      new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
166    log.appendData(info, getWalKeyImpl(ts1 + 2, scopes), edit);
167    log.sync();
168    log.shutdown();
169    walfactory.shutdown();
170    LOG.info("Closed WAL " + log.toString());
171
172    WALInputFormat input = new WALInputFormat();
173    Configuration jobConf = new Configuration(conf);
174    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
175    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
176
177    // Only 1st file is considered, and only its 1st entry is in-range.
178    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
179    assertEquals(1, splits.size());
180    testSplit(splits.get(0), Bytes.toBytes("1"));
181
182    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
183    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
184    assertEquals(2, splits.size());
185    // Both entries from first file are in-range.
186    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
187    // Only the 1st entry from the 2nd file is in-range.
188    testSplit(splits.get(1), Bytes.toBytes("3"));
189
190    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
191    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
192    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
193    assertEquals(1, splits.size());
194    // Only the 1st entry from the 2nd file is in-range.
195    testSplit(splits.get(0), Bytes.toBytes("3"));
196  }
197
198  /**
199   * Test basic functionality
200   */
201  @Test
202  public void testWALRecordReader() throws Exception {
203    final WALFactory walfactory = new WALFactory(conf, getName());
204    WAL log = walfactory.getWAL(info);
205    byte[] value = Bytes.toBytes("value");
206    WALEdit edit = new WALEdit();
207    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
208      EnvironmentEdgeManager.currentTime(), value));
209    long txid =
210      log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
211    log.sync(txid);
212
213    Thread.sleep(1); // make sure 2nd log gets a later timestamp
214    long secondTs = EnvironmentEdgeManager.currentTime();
215    log.rollWriter();
216
217    edit = new WALEdit();
218    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
219      EnvironmentEdgeManager.currentTime(), value));
220    txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
221    log.sync(txid);
222    log.shutdown();
223    walfactory.shutdown();
224    long thirdTs = EnvironmentEdgeManager.currentTime();
225
226    // should have 2 log files now
227    WALInputFormat input = new WALInputFormat();
228    Configuration jobConf = new Configuration(conf);
229    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
230
231    // make sure both logs are found
232    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
233    assertEquals(2, splits.size());
234
235    // should return exactly one KV
236    testSplit(splits.get(0), Bytes.toBytes("1"));
237    // same for the 2nd split
238    testSplit(splits.get(1), Bytes.toBytes("2"));
239
240    // now test basic time ranges:
241
242    // set an endtime, the 2nd log file can be ignored completely.
243    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
244    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
245    assertEquals(1, splits.size());
246    testSplit(splits.get(0), Bytes.toBytes("1"));
247
248    // now set a start time
249    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
250    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
251    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
252    assertTrue(splits.isEmpty());
253  }
254
255  /**
256   * Test WALRecordReader tolerance to moving WAL from active to archive directory
257   * @throws Exception exception
258   */
259  @Test
260  public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
261    final WALFactory walfactory = new WALFactory(conf, getName());
262    WAL log = walfactory.getWAL(info);
263    byte[] value = Bytes.toBytes("value");
264    WALEdit edit = new WALEdit();
265    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
266      EnvironmentEdgeManager.currentTime(), value));
267    long txid =
268      log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
269    log.sync(txid);
270
271    Thread.sleep(10); // make sure 2nd edit gets a later timestamp
272
273    edit = new WALEdit();
274    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
275      EnvironmentEdgeManager.currentTime(), value));
276    txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
277    log.sync(txid);
278    log.shutdown();
279
280    // should have 2 log entries now
281    WALInputFormat input = new WALInputFormat();
282    Configuration jobConf = new Configuration(conf);
283    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
284    // make sure log is found
285    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
286    assertEquals(1, splits.size());
287    WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0);
288    LOG.debug("log=" + logDir + " file=" + split.getLogFileName());
289
290    testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
291  }
292
293  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
294    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
295  }
296
297  private WALRecordReader<WALKey> getReader() {
298    return new WALKeyRecordReader();
299  }
300
301  /**
302   * Create a new reader from the split, and match the edits against the passed columns.
303   */
304  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
305    WALRecordReader<WALKey> reader = getReader();
306    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
307
308    for (byte[] column : columns) {
309      assertTrue(reader.nextKeyValue());
310      Cell cell = reader.getCurrentValue().getCells().get(0);
311      if (
312        !Bytes.equals(column, 0, column.length, cell.getQualifierArray(), cell.getQualifierOffset(),
313          cell.getQualifierLength())
314      ) {
315        fail("expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString(
316          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]");
317      }
318    }
319    assertFalse(reader.nextKeyValue());
320    reader.close();
321  }
322
323  /**
324   * Create a new reader from the split, match the edits against the passed columns, moving WAL to
325   * archive in between readings
326   */
327  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
328    WALRecordReader<WALKey> reader = getReader();
329    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
330
331    assertTrue(reader.nextKeyValue());
332    Cell cell = reader.getCurrentValue().getCells().get(0);
333    if (
334      !Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(),
335        cell.getQualifierLength())
336    ) {
337      fail("expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString(
338        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]");
339    }
340    // Move log file to archive directory
341    // While WAL record reader is open
342    WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
343    Path logFile = new Path(split_.getLogFileName());
344    Path archivedLogDir = getWALArchiveDir(conf);
345    Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
346    assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
347
348    assertTrue(fs.rename(logFile, archivedLogLocation));
349    assertTrue(fs.exists(archivedLogDir));
350    assertFalse(fs.exists(logFile));
351    // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
352    // TODO: the archivedLogLocation to read next key value.
353    assertTrue(reader.nextKeyValue());
354    cell = reader.getCurrentValue().getCells().get(0);
355    if (
356      !Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
357        cell.getQualifierLength())
358    ) {
359      fail("expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString(
360        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]");
361    }
362    reader.close();
363  }
364
365  private Path getWALArchiveDir(Configuration conf) throws IOException {
366    Path rootDir = CommonFSUtils.getWALRootDir(conf);
367    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
368    return new Path(rootDir, archiveDir);
369  }
370}