/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit tests for the WALRecordReader.
 * <p>
 * Each test writes a handful of single-cell edits to a WAL backed by a mini DFS cluster, asks
 * {@link WALInputFormat} for splits over the WAL directory and then checks, split by split, which
 * column qualifiers the record reader returns.
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestWALRecordReader {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALRecordReader.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  private static FileSystem walFs;
  private static Path walRootDir;
  // visible for TestHLogRecordReader
  static final TableName tableName = TableName.valueOf(getName());
  // The table name bytes double as the row key of every edit written by these tests.
  private static final byte[] rowName = tableName.getName();
  // visible for TestHLogRecordReader
  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
  private static final byte[] family = Bytes.toBytes("column");
  private static final byte[] value = Bytes.toBytes("value");
  private static Path logDir;
  protected MultiVersionConcurrencyControl mvcc;
  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  /** Returns the name shared by the test table and the WAL factory id. */
  private static String getName() {
    return "TestWALRecordReader";
  }

  /** Returns the string form of the server name used to resolve the WAL archive directory. */
  private static String getServerName() {
    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
    return serverName.toString();
  }

  /**
   * Wipes the HBase root and WAL root directories and creates a fresh MVCC instance so that every
   * test starts from an empty WAL directory.
   */
  @Before
  public void setUp() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    mvcc = new MultiVersionConcurrencyControl();
  }

  /**
   * Starts a single-node mini DFS cluster and resolves the file systems and directories the tests
   * write to.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
    walRootDir = TEST_UTIL.createWALRootDir();
    walFs = CommonFSUtils.getWALFileSystem(conf);
    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
  }

  /** Removes the directories created by the tests and shuts the mini cluster down. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test partial reads from the WALs based on passed time range.
   */
  @Test
  public void testPartialRead() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    // This test depends on timestamp being millisecond based and the filename of the WAL also
    // being millisecond based.
    long ts = EnvironmentEdgeManager.currentTime();
    appendEdit(log, "1", ts);
    appendEdit(log, "2", ts + 1);
    log.sync();
    Threads.sleep(10);
    LOG.info("Before 1st WAL roll {}", log);
    log.rollWriter();
    LOG.info("Past 1st WAL roll {}", log);

    Thread.sleep(1);
    long ts1 = EnvironmentEdgeManager.currentTime();

    appendEdit(log, "3", ts1 + 1);
    appendEdit(log, "4", ts1 + 2);
    log.sync();
    log.shutdown();
    walfactory.shutdown();
    LOG.info("Closed WAL {}", log);

    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = createJobConf();
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);

    // Only 1st file is considered, and only its 1st entry is in-range.
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());
    // Both entries from first file are in-range.
    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
    // Only the 1st entry from the 2nd file is in-range.
    testSplit(splits.get(1), Bytes.toBytes("3"));

    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    // Only the 1st entry from the 2nd file is in-range.
    testSplit(splits.get(0), Bytes.toBytes("3"));
  }

  /**
   * Test basic functionality
   */
  @Test
  public void testWALRecordReader() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    long txid = appendEdit(log, "1", EnvironmentEdgeManager.currentTime());
    log.sync(txid);

    Thread.sleep(1); // make sure 2nd log gets a later timestamp
    long secondTs = EnvironmentEdgeManager.currentTime();
    log.rollWriter();

    txid = appendEdit(log, "2", EnvironmentEdgeManager.currentTime());
    log.sync(txid);
    log.shutdown();
    walfactory.shutdown();
    long thirdTs = EnvironmentEdgeManager.currentTime();

    // should have 2 log files now
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = createJobConf();

    // make sure both logs are found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());

    // should return exactly one KV
    testSplit(splits.get(0), Bytes.toBytes("1"));
    // same for the 2nd split
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // now test basic time ranges:

    // set an endtime, the 2nd log file can be ignored completely.
    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // now set a start time
    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertTrue(splits.isEmpty());
  }

  /**
   * Test WALRecordReader tolerance to moving WAL from active to archive directory
   */
  @Test
  public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    long txid = appendEdit(log, "1", EnvironmentEdgeManager.currentTime());
    log.sync(txid);

    Thread.sleep(10); // make sure 2nd edit gets a later timestamp

    txid = appendEdit(log, "2", EnvironmentEdgeManager.currentTime());
    log.sync(txid);
    log.shutdown();
    // Release the factory as the sibling tests do, so this test does not leave it open.
    walfactory.shutdown();

    // should have 2 log entries now
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = createJobConf();
    // make sure log is found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0);
    LOG.debug("log={} file={}", logDir, split.getLogFileName());

    testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
  }

  /**
   * Builds the WAL key used for every edit appended by these tests.
   * @param time   write time recorded in the key
   * @param scopes replication scopes passed through to the key
   * @return a key for the test region and table, bound to this test's MVCC instance
   */
  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
  }

  /** Returns a new, not yet initialized record reader. */
  private WALRecordReader<WALKey> getReader() {
    return new WALKeyRecordReader();
  }

  /**
   * Appends a single-cell edit for the test row to the given WAL. The same timestamp is used for
   * the cell and for the WAL key.
   * @param log       WAL to append to
   * @param qualifier column qualifier identifying the edit
   * @param ts        timestamp for both the cell and the WAL key
   * @return the transaction id returned by the append
   */
  private long appendEdit(WAL log, String qualifier, long ts) throws IOException {
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes(qualifier), ts, value));
    return log.appendData(info, getWalKeyImpl(ts, scopes), edit);
  }

  /** Returns a copy of the test configuration whose input directory is the WAL directory. */
  private static Configuration createJobConf() {
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    return jobConf;
  }

  /**
   * Asserts that the qualifier of {@code cell} equals {@code expected}, comparing bytes and
   * reporting both values as strings on mismatch.
   */
  private static void assertQualifier(byte[] expected, Cell cell) {
    boolean matches = Bytes.equals(expected, 0, expected.length, cell.getQualifierArray(),
      cell.getQualifierOffset(), cell.getQualifierLength());
    assertTrue(
      "expected [" + Bytes.toString(expected) + "], actual [" + Bytes.toString(
        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
      matches);
  }

  /**
   * Create a new reader from the split, and match the edits against the passed columns. The split
   * must yield exactly the given columns, in order, and nothing more.
   */
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    // try-with-resources so the reader is closed even when an assertion fails.
    try (WALRecordReader<WALKey> reader = getReader()) {
      reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

      for (byte[] column : columns) {
        assertTrue(reader.nextKeyValue());
        assertQualifier(column, reader.getCurrentValue().getCells().get(0));
      }
      assertFalse(reader.nextKeyValue());
    }
  }

  /**
   * Create a new reader from the split, match the edits against the passed columns, moving WAL to
   * archive in between readings
   */
  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
    // try-with-resources so the reader is closed even when an assertion fails.
    try (WALRecordReader<WALKey> reader = getReader()) {
      reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

      assertTrue(reader.nextKeyValue());
      assertQualifier(col1, reader.getCurrentValue().getCells().get(0));

      // Move log file to archive directory
      // While WAL record reader is open
      WALInputFormat.WALSplit walSplit = (WALInputFormat.WALSplit) split;
      Path logFile = new Path(walSplit.getLogFileName());
      Path archivedLogDir = getWALArchiveDir(conf);
      Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
      assertNotEquals(walSplit.getLogFileName(), archivedLogLocation.toString());

      assertTrue(fs.rename(logFile, archivedLogLocation));
      assertTrue(fs.exists(archivedLogDir));
      assertFalse(fs.exists(logFile));
      // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
      // TODO: the archivedLogLocation to read next key value.
      assertTrue(reader.nextKeyValue());
      assertQualifier(col2, reader.getCurrentValue().getCells().get(0));
    }
  }

  /** Resolves the WAL archive directory for the test server under the WAL root directory. */
  private Path getWALArchiveDir(Configuration conf) throws IOException {
    Path rootDir = CommonFSUtils.getWALRootDir(conf);
    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
    return new Path(rootDir, archiveDir);
  }
}