/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit tests for the WALRecordReader.
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestWALRecordReader {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALRecordReader.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  private static FileSystem walFs;
  private static Path walRootDir;
  // visible for TestHLogRecordReader
  static final TableName tableName = TableName.valueOf(getName());
  private static final byte[] rowName = tableName.getName();
  // visible for TestHLogRecordReader
  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
  private static final byte[] family = Bytes.toBytes("column");
  private static final byte[] value = Bytes.toBytes("value");
  private static Path logDir;
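  // mvcc supplies the write/sequence numbers stamped on appended entries; scopes is left empty
  // because these tests do not exercise replication.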
  protected MultiVersionConcurrencyControl mvcc;
  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  private static String getName() {
    return "TestWALRecordReader";
  }

  @Before
  public void setUp() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    mvcc = new MultiVersionConcurrencyControl();
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
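    // The WAL root dir is created separately from the HBase root dir, so the test also covers
    // setups where WALs live on their own filesystem; logDir below is what the MR jobs use as
    // their input directory.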
    walRootDir = TEST_UTIL.createWALRootDir();
    walFs = FSUtils.getWALFileSystem(conf);
    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test partial reads from the log based on the passed time range.
   * @throws Exception
   */
  @Test
  public void testPartialRead() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    // This test depends on timestamp being millisecond based and the filename of the WAL also
    // being millisecond based.
    long ts = System.currentTimeMillis();
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
    log.append(info, getWalKeyImpl(ts, scopes), edit, true);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
    log.append(info, getWalKeyImpl(ts + 1, scopes), edit, true);
    log.sync();
    LOG.info("Before 1st WAL roll " + log.toString());
    log.rollWriter();
    LOG.info("Past 1st WAL roll " + log.toString());

    Thread.sleep(1);
    long ts1 = System.currentTimeMillis();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
    log.append(info, getWalKeyImpl(ts1 + 1, scopes), edit, true);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
    log.append(info, getWalKeyImpl(ts1 + 2, scopes), edit, true);
    log.sync();
    log.shutdown();
    walfactory.shutdown();
    LOG.info("Closed WAL " + log.toString());

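    // The input format picks WAL files using the millisecond timestamp embedded in each file
    // name (see the comment above), and the record reader then filters individual entries by
    // their WALKey write time, so the time-range options are verified at both levels below.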
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);

    // only 1st file is considered, and only its 1st entry is used
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));

    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // both files need to be considered
    assertEquals(2, splits.size());
    // only the 2nd entry from the 1st file is used
    testSplit(splits.get(0), Bytes.toBytes("2"));
    // only the 1st entry from the 2nd file is used
    testSplit(splits.get(1), Bytes.toBytes("3"));
  }

  /**
   * Test basic functionality.
   * @throws Exception
   */
  @Test
  public void testWALRecordReader() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
        System.currentTimeMillis(), value));
    long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
    log.sync(txid);

    Thread.sleep(1); // make sure 2nd log gets a later timestamp
    long secondTs = System.currentTimeMillis();
    log.rollWriter();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
        System.currentTimeMillis(), value));
    txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
    log.sync(txid);
    log.shutdown();
    walfactory.shutdown();
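    // Capture a timestamp after all edits have been written and the WAL is closed; using it as
    // the start time below should make both readers skip every entry.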
    long thirdTs = System.currentTimeMillis();

    // should have 2 log files now
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());

    // make sure both logs are found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());

    // should return exactly one KV
    testSplit(splits.get(0), Bytes.toBytes("1"));
    // same for the 2nd split
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // now test basic time ranges:

    // with an end time set, the 2nd log file can be ignored entirely
    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // now set a start time
    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // both logs need to be considered
    assertEquals(2, splits.size());
    // but both readers skip all edits
    testSplit(splits.get(0));
    testSplit(splits.get(1));
  }

  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
  }

  private WALRecordReader<WALKey> getReader() {
    return new WALKeyRecordReader();
  }

  /**
   * Create a new reader from the split, and match the edits against the passed columns.
   */
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    WALRecordReader<WALKey> reader = getReader();
    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

    for (byte[] column : columns) {
      assertTrue(reader.nextKeyValue());
      Cell cell = reader.getCurrentValue().getCells().get(0);
      if (!Bytes.equals(column, 0, column.length, cell.getQualifierArray(),
          cell.getQualifierOffset(), cell.getQualifierLength())) {
        fail("expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString(
            cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]");
      }
    }
    assertFalse(reader.nextKeyValue());
    reader.close();
  }
}