/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit tests for the WALRecordReader
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestWALRecordReader {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALRecordReader.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  private static FileSystem walFs;
  private static Path walRootDir;
  // visible for TestHLogRecordReader
  static final TableName tableName = TableName.valueOf(getName());
  private static final byte[] rowName = tableName.getName();
  // visible for TestHLogRecordReader
  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
  private static final byte[] family = Bytes.toBytes("column");
  private static final byte[] value = Bytes.toBytes("value");
  private static Path logDir;
  protected MultiVersionConcurrencyControl mvcc;
  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  private static String getName() {
    return "TestWALRecordReader";
  }

  @Before
  public void setUp() throws Exception {
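    // Wipe the data and WAL directories left by the previous test method and reset the MVCC.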
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    mvcc = new MultiVersionConcurrencyControl();
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
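    // The WAL root directory is created separately from the HBase root directory.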
    walRootDir = TEST_UTIL.createWALRootDir();
    walFs = CommonFSUtils.getWALFileSystem(conf);
    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test partial reads from the WALs based on passed time range.
   */
  @Test
  public void testPartialRead() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    // This test depends on timestamp being millisecond based and the filename of the WAL also
    // being millisecond based.
    long ts = System.currentTimeMillis();
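    // Write two edits (qualifiers "1" and "2") at ts and ts + 1 into the first WAL file.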
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
    log.appendData(info, getWalKeyImpl(ts, scopes), edit);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
    log.appendData(info, getWalKeyImpl(ts + 1, scopes), edit);
    log.sync();
    Threads.sleep(10);
    LOG.info("Before 1st WAL roll " + log.toString());
    log.rollWriter();
    LOG.info("Past 1st WAL roll " + log.toString());

    Thread.sleep(1);
    long ts1 = System.currentTimeMillis();

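    // Write two more edits (qualifiers "3" and "4") at ts1 + 1 and ts1 + 2 into the second WAL file.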
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
    log.appendData(info, getWalKeyImpl(ts1 + 1, scopes), edit);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
    log.appendData(info, getWalKeyImpl(ts1 + 2, scopes), edit);
    log.sync();
    log.shutdown();
    walfactory.shutdown();
    LOG.info("Closed WAL " + log.toString());

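    // Read the logs back through WALInputFormat, restricting the time range via the job configuration.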
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);

    // Only 1st file is considered, and only its 1st entry is in-range.
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());
    // Both entries from first file are in-range.
    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
    // Only the 1st entry from the 2nd file is in-range.
    testSplit(splits.get(1), Bytes.toBytes("3"));

    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    // Only the 1st entry from the 2nd file is in-range.
    testSplit(splits.get(0), Bytes.toBytes("3"));
  }

  /**
   * Test basic functionality: read the entries of both log files, then apply simple time ranges.
   */
  @Test
  public void testWALRecordReader() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
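    // Write a single edit (qualifier "1") into the first log file.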
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
        System.currentTimeMillis(), value));
    long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
    log.sync(txid);

    Thread.sleep(1); // make sure 2nd log gets a later timestamp
    long secondTs = System.currentTimeMillis();
    log.rollWriter();

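    // Write a second edit (qualifier "2") into the rolled, second log file.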
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
    txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
    log.sync(txid);
    log.shutdown();
    walfactory.shutdown();
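    // thirdTs is later than every edit written above; a start time of thirdTs should yield no splits.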
    long thirdTs = System.currentTimeMillis();

    // should have 2 log files now
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());

    // make sure both logs are found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());

    // should return exactly one KV
    testSplit(splits.get(0), Bytes.toBytes("1"));
    // same for the 2nd split
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // now test basic time ranges:

    // with an end time before the second log's timestamp, the 2nd log file can be ignored completely.
    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // now set a start time
    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertTrue(splits.isEmpty());
  }

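  /** Builds the WAL key used for each appended edit, carrying the given write time. */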
  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
  }

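  /** Returns the record reader implementation exercised by this test. */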
  private WALRecordReader<WALKey> getReader() {
    return new WALKeyRecordReader();
  }

  /**
   * Create a new reader from the split, and match the edits against the passed columns.
   */
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    WALRecordReader<WALKey> reader = getReader();
    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

    for (byte[] column : columns) {
      assertTrue(reader.nextKeyValue());
      Cell cell = reader.getCurrentValue().getCells().get(0);
      assertTrue(
        "expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString(
          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
        Bytes.equals(column, 0, column.length, cell.getQualifierArray(), cell.getQualifierOffset(),
          cell.getQualifierLength()));
    }
    assertFalse(reader.nextKeyValue());
    reader.close();
  }
}