001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import java.util.List; 024import java.util.NavigableMap; 025import java.util.TreeMap; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.KeyValue; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.client.RegionInfoBuilder; 037import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader; 038import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader; 039import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 040import org.apache.hadoop.hbase.testclassification.MapReduceTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.CommonFSUtils; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 046import org.apache.hadoop.hbase.wal.WAL; 047import org.apache.hadoop.hbase.wal.WALEdit; 048import org.apache.hadoop.hbase.wal.WALFactory; 049import org.apache.hadoop.hbase.wal.WALKey; 050import org.apache.hadoop.hbase.wal.WALKeyImpl; 051import org.apache.hadoop.mapreduce.InputSplit; 052import org.apache.hadoop.mapreduce.MapReduceTestUtil; 053import org.junit.AfterClass; 054import org.junit.Before; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * JUnit tests for the WALRecordReader 064 */ 065@Category({ MapReduceTests.class, MediumTests.class }) 066public class TestWALRecordReader { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestWALRecordReader.class); 071 072 private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class); 073 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 074 private static Configuration conf; 075 private static FileSystem fs; 076 private static Path hbaseDir; 077 private static FileSystem walFs; 078 private static Path walRootDir; 079 // visible for TestHLogRecordReader 080 static final TableName tableName = TableName.valueOf(getName()); 081 private static final byte [] rowName = tableName.getName(); 082 // visible for TestHLogRecordReader 083 static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 084 private static final byte[] family = Bytes.toBytes("column"); 085 private static final byte[] value = Bytes.toBytes("value"); 086 private static Path logDir; 087 protected MultiVersionConcurrencyControl mvcc; 088 protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 089 090 private static String getName() { 091 return "TestWALRecordReader"; 092 } 093 094 @Before 095 public void setUp() throws Exception { 096 fs.delete(hbaseDir, true); 097 walFs.delete(walRootDir, true); 098 mvcc = new MultiVersionConcurrencyControl(); 099 } 100 101 @BeforeClass 102 public static void setUpBeforeClass() throws Exception { 103 // Make block sizes small. 104 conf = TEST_UTIL.getConfiguration(); 105 conf.setInt("dfs.blocksize", 1024 * 1024); 106 conf.setInt("dfs.replication", 1); 107 TEST_UTIL.startMiniDFSCluster(1); 108 109 conf = TEST_UTIL.getConfiguration(); 110 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 111 112 hbaseDir = TEST_UTIL.createRootDir(); 113 walRootDir = TEST_UTIL.createWALRootDir(); 114 walFs = CommonFSUtils.getWALFileSystem(conf); 115 logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); 116 } 117 118 @AfterClass 119 public static void tearDownAfterClass() throws Exception { 120 fs.delete(hbaseDir, true); 121 walFs.delete(walRootDir, true); 122 TEST_UTIL.shutdownMiniCluster(); 123 } 124 125 /** 126 * Test partial reads from the WALs based on passed time range. 127 */ 128 @Test 129 public void testPartialRead() throws Exception { 130 final WALFactory walfactory = new WALFactory(conf, getName()); 131 WAL log = walfactory.getWAL(info); 132 // This test depends on timestamp being millisecond based and the filename of the WAL also 133 // being millisecond based. 134 long ts = System.currentTimeMillis(); 135 WALEdit edit = new WALEdit(); 136 edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); 137 log.appendData(info, getWalKeyImpl(ts, scopes), edit); 138 edit = new WALEdit(); 139 edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); 140 log.appendData(info, getWalKeyImpl(ts+1, scopes), edit); 141 log.sync(); 142 Threads.sleep(10); 143 LOG.info("Before 1st WAL roll " + log.toString()); 144 log.rollWriter(); 145 LOG.info("Past 1st WAL roll " + log.toString()); 146 147 Thread.sleep(1); 148 long ts1 = System.currentTimeMillis(); 149 150 edit = new WALEdit(); 151 edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); 152 log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit); 153 edit = new WALEdit(); 154 edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); 155 log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit); 156 log.sync(); 157 log.shutdown(); 158 walfactory.shutdown(); 159 LOG.info("Closed WAL " + log.toString()); 160 161 162 WALInputFormat input = new WALInputFormat(); 163 Configuration jobConf = new Configuration(conf); 164 jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); 165 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts); 166 167 // Only 1st file is considered, and only its 1st entry is in-range. 168 List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 169 assertEquals(1, splits.size()); 170 testSplit(splits.get(0), Bytes.toBytes("1")); 171 172 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1); 173 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 174 assertEquals(2, splits.size()); 175 // Both entries from first file are in-range. 176 testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2")); 177 // Only the 1st entry from the 2nd file is in-range. 178 testSplit(splits.get(1), Bytes.toBytes("3")); 179 180 jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1); 181 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1); 182 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 183 assertEquals(1, splits.size()); 184 // Only the 1st entry from the 2nd file is in-range. 185 testSplit(splits.get(0), Bytes.toBytes("3")); 186 } 187 188 /** 189 * Test basic functionality 190 */ 191 @Test 192 public void testWALRecordReader() throws Exception { 193 final WALFactory walfactory = new WALFactory(conf, getName()); 194 WAL log = walfactory.getWAL(info); 195 byte [] value = Bytes.toBytes("value"); 196 WALEdit edit = new WALEdit(); 197 edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), 198 System.currentTimeMillis(), value)); 199 long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); 200 log.sync(txid); 201 202 Thread.sleep(1); // make sure 2nd log gets a later timestamp 203 long secondTs = System.currentTimeMillis(); 204 log.rollWriter(); 205 206 edit = new WALEdit(); 207 edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); 208 txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); 209 log.sync(txid); 210 log.shutdown(); 211 walfactory.shutdown(); 212 long thirdTs = System.currentTimeMillis(); 213 214 // should have 2 log files now 215 WALInputFormat input = new WALInputFormat(); 216 Configuration jobConf = new Configuration(conf); 217 jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); 218 219 // make sure both logs are found 220 List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 221 assertEquals(2, splits.size()); 222 223 // should return exactly one KV 224 testSplit(splits.get(0), Bytes.toBytes("1")); 225 // same for the 2nd split 226 testSplit(splits.get(1), Bytes.toBytes("2")); 227 228 // now test basic time ranges: 229 230 // set an endtime, the 2nd log file can be ignored completely. 231 jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1); 232 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 233 assertEquals(1, splits.size()); 234 testSplit(splits.get(0), Bytes.toBytes("1")); 235 236 // now set a start time 237 jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE); 238 jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs); 239 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 240 assertTrue(splits.isEmpty()); 241 } 242 243 protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) { 244 return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); 245 } 246 247 private WALRecordReader<WALKey> getReader() { 248 return new WALKeyRecordReader(); 249 } 250 251 /** 252 * Create a new reader from the split, and match the edits against the passed columns. 253 */ 254 private void testSplit(InputSplit split, byte[]... columns) throws Exception { 255 WALRecordReader<WALKey> reader = getReader(); 256 reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); 257 258 for (byte[] column : columns) { 259 assertTrue(reader.nextKeyValue()); 260 Cell cell = reader.getCurrentValue().getCells().get(0); 261 if (!Bytes.equals(column, 0, column.length, cell.getQualifierArray(), 262 cell.getQualifierOffset(), cell.getQualifierLength())) { 263 assertTrue( 264 "expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString( 265 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", 266 false); 267 } 268 } 269 assertFalse(reader.nextKeyValue()); 270 reader.close(); 271 } 272}