001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.List;
027import java.util.NavigableMap;
028import java.util.TreeMap;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
042import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
043import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
044import org.apache.hadoop.hbase.testclassification.MapReduceTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.Threads;
050import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
051import org.apache.hadoop.hbase.wal.WAL;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.apache.hadoop.hbase.wal.WALFactory;
054import org.apache.hadoop.hbase.wal.WALKey;
055import org.apache.hadoop.hbase.wal.WALKeyImpl;
056import org.apache.hadoop.mapreduce.InputSplit;
057import org.apache.hadoop.mapreduce.MapReduceTestUtil;
058import org.junit.AfterClass;
059import org.junit.Before;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * JUnit tests for the WALRecordReader
069 */
070@Category({ MapReduceTests.class, MediumTests.class })
071public class TestWALRecordReader {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestWALRecordReader.class);
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
078  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
079  private static Configuration conf;
080  private static FileSystem fs;
081  private static Path hbaseDir;
082  private static FileSystem walFs;
083  private static Path walRootDir;
084  // visible for TestHLogRecordReader
085  static final TableName tableName = TableName.valueOf(getName());
086  private static final byte[] rowName = tableName.getName();
087  // visible for TestHLogRecordReader
088  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
089  private static final byte[] family = Bytes.toBytes("column");
090  private static final byte[] value = Bytes.toBytes("value");
091  private static Path logDir;
092  protected MultiVersionConcurrencyControl mvcc;
093  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
094
095  private static String getName() {
096    return "TestWALRecordReader";
097  }
098
099  private static String getServerName() {
100    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
101    return serverName.toString();
102  }
103
104  @Before
105  public void setUp() throws Exception {
106    fs.delete(hbaseDir, true);
107    walFs.delete(walRootDir, true);
108    mvcc = new MultiVersionConcurrencyControl();
109  }
110
111  @BeforeClass
112  public static void setUpBeforeClass() throws Exception {
113    // Make block sizes small.
114    conf = TEST_UTIL.getConfiguration();
115    conf.setInt("dfs.blocksize", 1024 * 1024);
116    conf.setInt("dfs.replication", 1);
117    TEST_UTIL.startMiniDFSCluster(1);
118
119    conf = TEST_UTIL.getConfiguration();
120    fs = TEST_UTIL.getDFSCluster().getFileSystem();
121
122    hbaseDir = TEST_UTIL.createRootDir();
123    walRootDir = TEST_UTIL.createWALRootDir();
124    walFs = CommonFSUtils.getWALFileSystem(conf);
125    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
126  }
127
128  @AfterClass
129  public static void tearDownAfterClass() throws Exception {
130    fs.delete(hbaseDir, true);
131    walFs.delete(walRootDir, true);
132    TEST_UTIL.shutdownMiniCluster();
133  }
134
135  /**
136   * Test partial reads from the WALs based on passed time range.
137   */
138  @Test
139  public void testPartialRead() throws Exception {
140    final WALFactory walfactory = new WALFactory(conf, getName());
141    WAL log = walfactory.getWAL(info);
142    // This test depends on timestamp being millisecond based and the filename of the WAL also
143    // being millisecond based.
144    long ts = EnvironmentEdgeManager.currentTime();
145    WALEdit edit = new WALEdit();
146    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
147    log.appendData(info, getWalKeyImpl(ts, scopes), edit);
148    edit = new WALEdit();
149    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
150    log.appendData(info, getWalKeyImpl(ts + 1, scopes), edit);
151    log.sync();
152    Threads.sleep(10);
153    LOG.info("Before 1st WAL roll " + log.toString());
154    log.rollWriter();
155    LOG.info("Past 1st WAL roll " + log.toString());
156
157    Thread.sleep(1);
158    long ts1 = EnvironmentEdgeManager.currentTime();
159
160    edit = new WALEdit();
161    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
162    log.appendData(info, getWalKeyImpl(ts1 + 1, scopes), edit);
163    edit = new WALEdit();
164    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
165    log.appendData(info, getWalKeyImpl(ts1 + 2, scopes), edit);
166    log.sync();
167    log.shutdown();
168    walfactory.shutdown();
169    LOG.info("Closed WAL " + log.toString());
170
171    WALInputFormat input = new WALInputFormat();
172    Configuration jobConf = new Configuration(conf);
173    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
174    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
175
176    // Only 1st file is considered, and only its 1st entry is in-range.
177    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
178    assertEquals(1, splits.size());
179    testSplit(splits.get(0), Bytes.toBytes("1"));
180
181    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
182    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
183    assertEquals(2, splits.size());
184    // Both entries from first file are in-range.
185    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
186    // Only the 1st entry from the 2nd file is in-range.
187    testSplit(splits.get(1), Bytes.toBytes("3"));
188
189    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
190    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
191    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
192    assertEquals(1, splits.size());
193    // Only the 1st entry from the 2nd file is in-range.
194    testSplit(splits.get(0), Bytes.toBytes("3"));
195  }
196
197  /**
198   * Test basic functionality
199   */
200  @Test
201  public void testWALRecordReader() throws Exception {
202    final WALFactory walfactory = new WALFactory(conf, getName());
203    WAL log = walfactory.getWAL(info);
204    byte[] value = Bytes.toBytes("value");
205    WALEdit edit = new WALEdit();
206    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), EnvironmentEdgeManager.currentTime(),
207      value));
208    long txid =
209      log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
210    log.sync(txid);
211
212    Thread.sleep(1); // make sure 2nd log gets a later timestamp
213    long secondTs = EnvironmentEdgeManager.currentTime();
214    log.rollWriter();
215
216    edit = new WALEdit();
217    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), EnvironmentEdgeManager.currentTime(),
218      value));
219    txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
220    log.sync(txid);
221    log.shutdown();
222    walfactory.shutdown();
223    long thirdTs = EnvironmentEdgeManager.currentTime();
224
225    // should have 2 log files now
226    WALInputFormat input = new WALInputFormat();
227    Configuration jobConf = new Configuration(conf);
228    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
229
230    // make sure both logs are found
231    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
232    assertEquals(2, splits.size());
233
234    // should return exactly one KV
235    testSplit(splits.get(0), Bytes.toBytes("1"));
236    // same for the 2nd split
237    testSplit(splits.get(1), Bytes.toBytes("2"));
238
239    // now test basic time ranges:
240
241    // set an endtime, the 2nd log file can be ignored completely.
242    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
243    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
244    assertEquals(1, splits.size());
245    testSplit(splits.get(0), Bytes.toBytes("1"));
246
247    // now set a start time
248    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
249    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
250    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
251    assertTrue(splits.isEmpty());
252  }
253
254  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
255    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
256  }
257
258  private WALRecordReader<WALKey> getReader() {
259    return new WALKeyRecordReader();
260  }
261
262  /**
263   * Create a new reader from the split, and match the edits against the passed columns.
264   */
265  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
266    WALRecordReader<WALKey> reader = getReader();
267    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
268
269    for (byte[] column : columns) {
270      assertTrue(reader.nextKeyValue());
271      Cell cell = reader.getCurrentValue().getCells().get(0);
272      if (
273        !Bytes.equals(column, 0, column.length, cell.getQualifierArray(), cell.getQualifierOffset(),
274          cell.getQualifierLength())
275      ) {
276        assertTrue(
277          "expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString(
278            cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
279          false);
280      }
281    }
282    assertFalse(reader.nextKeyValue());
283    reader.close();
284  }
285
286  /**
287   * Create a new reader from the split, match the edits against the passed columns, moving WAL to
288   * archive in between readings
289   */
290  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
291    WALRecordReader<WALKey> reader = getReader();
292    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
293
294    assertTrue(reader.nextKeyValue());
295    Cell cell = reader.getCurrentValue().getCells().get(0);
296    if (
297      !Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(),
298        cell.getQualifierLength())
299    ) {
300      assertTrue(
301        "expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString(
302          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
303        false);
304    }
305    // Move log file to archive directory
306    // While WAL record reader is open
307    WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
308    Path logFile = new Path(split_.getLogFileName());
309    Path archivedLogDir = getWALArchiveDir(conf);
310    Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
311    assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
312
313    assertTrue(fs.rename(logFile, archivedLogLocation));
314    assertTrue(fs.exists(archivedLogDir));
315    assertFalse(fs.exists(logFile));
316    // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
317    // TODO: the archivedLogLocation to read next key value.
318    assertTrue(reader.nextKeyValue());
319    cell = reader.getCurrentValue().getCells().get(0);
320    if (
321      !Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
322        cell.getQualifierLength())
323    ) {
324      assertTrue(
325        "expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString(
326          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
327        false);
328    }
329    reader.close();
330  }
331
332  private Path getWALArchiveDir(Configuration conf) throws IOException {
333    Path rootDir = CommonFSUtils.getWALRootDir(conf);
334    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
335    return new Path(rootDir, archiveDir);
336  }
337
338}