001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import java.io.IOException;
025import java.util.List;
026import java.util.NavigableMap;
027import java.util.TreeMap;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
041import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
042import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
043import org.apache.hadoop.hbase.testclassification.MapReduceTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.CommonFSUtils;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.Threads;
049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
050import org.apache.hadoop.hbase.wal.WAL;
051import org.apache.hadoop.hbase.wal.WALEdit;
052import org.apache.hadoop.hbase.wal.WALFactory;
053import org.apache.hadoop.hbase.wal.WALKey;
054import org.apache.hadoop.hbase.wal.WALKeyImpl;
055import org.apache.hadoop.mapreduce.InputSplit;
056import org.apache.hadoop.mapreduce.MapReduceTestUtil;
057import org.junit.AfterClass;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * JUnit tests for the WALRecordReader
068 */
069@Category({ MapReduceTests.class, MediumTests.class })
070public class TestWALRecordReader {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestWALRecordReader.class);
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
077  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
078  private static Configuration conf;
079  private static FileSystem fs;
080  private static Path hbaseDir;
081  private static FileSystem walFs;
082  private static Path walRootDir;
083  // visible for TestHLogRecordReader
084  static final TableName tableName = TableName.valueOf(getName());
085  private static final byte [] rowName = tableName.getName();
086  // visible for TestHLogRecordReader
087  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
088  private static final byte[] family = Bytes.toBytes("column");
089  private static final byte[] value = Bytes.toBytes("value");
090  private static Path logDir;
091  protected MultiVersionConcurrencyControl mvcc;
092  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
093
094  private static String getName() {
095    return "TestWALRecordReader";
096  }
097
098  private static String getServerName() {
099    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
100    return serverName.toString();
101  }
102
103  @Before
104  public void setUp() throws Exception {
105    fs.delete(hbaseDir, true);
106    walFs.delete(walRootDir, true);
107    mvcc = new MultiVersionConcurrencyControl();
108  }
109
110  @BeforeClass
111  public static void setUpBeforeClass() throws Exception {
112    // Make block sizes small.
113    conf = TEST_UTIL.getConfiguration();
114    conf.setInt("dfs.blocksize", 1024 * 1024);
115    conf.setInt("dfs.replication", 1);
116    TEST_UTIL.startMiniDFSCluster(1);
117
118    conf = TEST_UTIL.getConfiguration();
119    fs = TEST_UTIL.getDFSCluster().getFileSystem();
120
121    hbaseDir = TEST_UTIL.createRootDir();
122    walRootDir = TEST_UTIL.createWALRootDir();
123    walFs = CommonFSUtils.getWALFileSystem(conf);
124    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
125  }
126
127  @AfterClass
128  public static void tearDownAfterClass() throws Exception {
129    fs.delete(hbaseDir, true);
130    walFs.delete(walRootDir, true);
131    TEST_UTIL.shutdownMiniCluster();
132  }
133
134  /**
135   * Test partial reads from the WALs based on passed time range.
136   */
137  @Test
138  public void testPartialRead() throws Exception {
139    final WALFactory walfactory = new WALFactory(conf, getName());
140    WAL log = walfactory.getWAL(info);
141    // This test depends on timestamp being millisecond based and the filename of the WAL also
142    // being millisecond based.
143    long ts = EnvironmentEdgeManager.currentTime();
144    WALEdit edit = new WALEdit();
145    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
146    log.appendData(info, getWalKeyImpl(ts, scopes), edit);
147    edit = new WALEdit();
148    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
149    log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
150    log.sync();
151    Threads.sleep(10);
152    LOG.info("Before 1st WAL roll " + log.toString());
153    log.rollWriter();
154    LOG.info("Past 1st WAL roll " + log.toString());
155
156    Thread.sleep(1);
157    long ts1 = EnvironmentEdgeManager.currentTime();
158
159    edit = new WALEdit();
160    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
161    log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit);
162    edit = new WALEdit();
163    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
164    log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit);
165    log.sync();
166    log.shutdown();
167    walfactory.shutdown();
168    LOG.info("Closed WAL " + log.toString());
169
170
171    WALInputFormat input = new WALInputFormat();
172    Configuration jobConf = new Configuration(conf);
173    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
174    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
175
176    // Only 1st file is considered, and only its 1st entry is in-range.
177    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
178    assertEquals(1, splits.size());
179    testSplit(splits.get(0), Bytes.toBytes("1"));
180
181    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
182    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
183    assertEquals(2, splits.size());
184    // Both entries from first file are in-range.
185    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
186    // Only the 1st entry from the 2nd file is in-range.
187    testSplit(splits.get(1), Bytes.toBytes("3"));
188
189    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
190    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
191    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
192    assertEquals(1, splits.size());
193    // Only the 1st entry from the 2nd file is in-range.
194    testSplit(splits.get(0), Bytes.toBytes("3"));
195  }
196
197  /**
198   * Test basic functionality
199   */
200  @Test
201  public void testWALRecordReader() throws Exception {
202    final WALFactory walfactory = new WALFactory(conf, getName());
203    WAL log = walfactory.getWAL(info);
204    byte [] value = Bytes.toBytes("value");
205    WALEdit edit = new WALEdit();
206    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
207      EnvironmentEdgeManager.currentTime(), value));
208    long txid = log.appendData(info,
209      getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
210    log.sync(txid);
211
212    Thread.sleep(1); // make sure 2nd log gets a later timestamp
213    long secondTs = EnvironmentEdgeManager.currentTime();
214    log.rollWriter();
215
216    edit = new WALEdit();
217    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
218      EnvironmentEdgeManager.currentTime(), value));
219    txid = log.appendData(info,
220      getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
221    log.sync(txid);
222    log.shutdown();
223    walfactory.shutdown();
224    long thirdTs = EnvironmentEdgeManager.currentTime();
225
226    // should have 2 log files now
227    WALInputFormat input = new WALInputFormat();
228    Configuration jobConf = new Configuration(conf);
229    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
230
231    // make sure both logs are found
232    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
233    assertEquals(2, splits.size());
234
235    // should return exactly one KV
236    testSplit(splits.get(0), Bytes.toBytes("1"));
237    // same for the 2nd split
238    testSplit(splits.get(1), Bytes.toBytes("2"));
239
240    // now test basic time ranges:
241
242    // set an endtime, the 2nd log file can be ignored completely.
243    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
244    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
245    assertEquals(1, splits.size());
246    testSplit(splits.get(0), Bytes.toBytes("1"));
247
248    // now set a start time
249    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
250    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
251    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
252    assertTrue(splits.isEmpty());
253  }
254
255  /**
256   * Test WALRecordReader tolerance to moving WAL from active
257   * to archive directory
258   * @throws Exception exception
259   */
260  @Test
261  public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
262    final WALFactory walfactory = new WALFactory(conf, getName());
263    WAL log = walfactory.getWAL(info);
264    byte [] value = Bytes.toBytes("value");
265    WALEdit edit = new WALEdit();
266    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
267      EnvironmentEdgeManager.currentTime(), value));
268    long txid = log.appendData(info,
269      getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
270    log.sync(txid);
271
272    Thread.sleep(10); // make sure 2nd edit gets a later timestamp
273
274    edit = new WALEdit();
275    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
276      EnvironmentEdgeManager.currentTime(), value));
277    txid = log.appendData(info,
278      getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
279    log.sync(txid);
280    log.shutdown();
281
282    // should have 2 log entries now
283    WALInputFormat input = new WALInputFormat();
284    Configuration jobConf = new Configuration(conf);
285    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
286    // make sure log is found
287    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
288    assertEquals(1, splits.size());
289    WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0);
290    LOG.debug("log="+logDir+" file="+ split.getLogFileName());
291
292    testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
293  }
294
295  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
296    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
297  }
298
299  private WALRecordReader<WALKey> getReader() {
300    return new WALKeyRecordReader();
301  }
302
303  /**
304   * Create a new reader from the split, and match the edits against the passed columns.
305   */
306  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
307    WALRecordReader<WALKey> reader = getReader();
308    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
309
310    for (byte[] column : columns) {
311      assertTrue(reader.nextKeyValue());
312      Cell cell = reader.getCurrentValue().getCells().get(0);
313      if (!Bytes.equals(column, 0, column.length, cell.getQualifierArray(),
314        cell.getQualifierOffset(), cell.getQualifierLength())) {
315        assertTrue(
316          "expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString(
317            cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
318          false);
319      }
320    }
321    assertFalse(reader.nextKeyValue());
322    reader.close();
323  }
324
325  /**
326   * Create a new reader from the split, match the edits against the passed columns,
327   * moving WAL to archive in between readings
328   */
329  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
330    WALRecordReader<WALKey> reader = getReader();
331    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
332
333    assertTrue(reader.nextKeyValue());
334    Cell cell = reader.getCurrentValue().getCells().get(0);
335    if (!Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(),
336      cell.getQualifierLength())) {
337      assertTrue(
338        "expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString(
339          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
340        false);
341    }
342    // Move log file to archive directory
343    // While WAL record reader is open
344    WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
345    Path logFile = new Path(split_.getLogFileName());
346    Path archivedLogDir = getWALArchiveDir(conf);
347    Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
348    assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
349
350    assertTrue(fs.rename(logFile, archivedLogLocation));
351    assertTrue(fs.exists(archivedLogDir));
352    assertFalse(fs.exists(logFile));
353    // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
354    // TODO: the archivedLogLocation to read next key value.
355    assertTrue(reader.nextKeyValue());
356    cell = reader.getCurrentValue().getCells().get(0);
357    if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
358      cell.getQualifierLength())) {
359      assertTrue(
360        "expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString(
361          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
362        false);
363    }
364    reader.close();
365  }
366
367  private Path getWALArchiveDir(Configuration conf) throws IOException {
368    Path rootDir = CommonFSUtils.getWALRootDir(conf);
369    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
370    return new Path(rootDir, archiveDir);
371  }
372}