/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit tests for the WALRecordReader: verifies that WAL files written through a real mini DFS
 * cluster are split and read back correctly by {@link WALInputFormat}, including time-range
 * filtering and tolerance to a WAL being moved to the archive directory mid-read.
 */
@Tag(MapReduceTests.TAG)
@Tag(MediumTests.TAG)
public class TestWALRecordReader {

  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  // The WAL may live on a separate filesystem/root dir from the hbase root dir.
  private static FileSystem walFs;
  private static Path walRootDir;
  // visible for TestHLogRecordReader
  static final TableName tableName = TableName.valueOf(getName());
  private static final byte[] rowName = tableName.getName();
  // visible for TestHLogRecordReader
  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
  private static final byte[] family = Bytes.toBytes("column");
  private static final byte[] value = Bytes.toBytes("value");
  private static Path logDir;
  protected MultiVersionConcurrencyControl mvcc;
  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  private static String getName() {
    return "TestWALRecordReader";
  }

  /** Server name used to derive the expected WAL archive directory. */
  private static String getServerName() {
    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
    return serverName.toString();
  }

  @BeforeEach
  public void setUp() throws Exception {
    // Start each test from a clean root and WAL dir so leftover files from a prior
    // test cannot be picked up as extra splits.
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    mvcc = new MultiVersionConcurrencyControl();
  }

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    // Make block sizes small.
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
    walRootDir = TEST_UTIL.createWALRootDir();
    walFs = CommonFSUtils.getWALFileSystem(conf);
    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test partial reads from the WALs based on passed time range.
   */
  @Test
  public void testPartialRead() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    // This test depends on timestamp being millisecond based and the filename of the WAL also
    // being millisecond based.
    long ts = EnvironmentEdgeManager.currentTime();
    WALEdit edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit,
      new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
    log.appendData(info, getWalKeyImpl(ts, scopes), edit);
    edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit,
      new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
    log.appendData(info, getWalKeyImpl(ts + 1, scopes), edit);
    log.sync();
    Threads.sleep(10);
    LOG.info("Before 1st WAL roll {}", log);
    log.rollWriter();
    LOG.info("Past 1st WAL roll {}", log);

    // Ensure the second WAL file gets a strictly later (millisecond) timestamp.
    Thread.sleep(1);
    long ts1 = EnvironmentEdgeManager.currentTime();

    edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit,
      new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
    log.appendData(info, getWalKeyImpl(ts1 + 1, scopes), edit);
    edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit,
      new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
    log.appendData(info, getWalKeyImpl(ts1 + 2, scopes), edit);
    log.sync();
    log.shutdown();
    walfactory.shutdown();
    LOG.info("Closed WAL {}", log);

    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);

    // Only 1st file is considered, and only its 1st entry is in-range.
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());
    // Both entries from first file are in-range.
    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
    // Only the 1st entry from the 2nd file is in-range.
    testSplit(splits.get(1), Bytes.toBytes("3"));

    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    // Only the 1st entry from the 2nd file is in-range.
    testSplit(splits.get(0), Bytes.toBytes("3"));
  }

  /**
   * Test basic functionality
   */
  @Test
  public void testWALRecordReader() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    // Uses the class-level 'value' constant; a shadowing local was removed here.
    WALEdit edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
      EnvironmentEdgeManager.currentTime(), value));
    long txid =
      log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
    log.sync(txid);

    Thread.sleep(1); // make sure 2nd log gets a later timestamp
    long secondTs = EnvironmentEdgeManager.currentTime();
    log.rollWriter();

    edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
      EnvironmentEdgeManager.currentTime(), value));
    txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
    log.sync(txid);
    log.shutdown();
    walfactory.shutdown();
    long thirdTs = EnvironmentEdgeManager.currentTime();

    // should have 2 log files now
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());

    // make sure both logs are found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());

    // should return exactly one KV
    testSplit(splits.get(0), Bytes.toBytes("1"));
    // same for the 2nd split
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // now test basic time ranges:

    // set an endtime, the 2nd log file can be ignored completely.
    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // now set a start time
    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // both log files are before the start time, so nothing qualifies
    assertTrue(splits.isEmpty());
  }

  /**
   * Test WALRecordReader tolerance to moving WAL from active to archive directory
   * @throws Exception exception
   */
  @Test
  public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, getName());
    WAL log = walfactory.getWAL(info);
    // Uses the class-level 'value' constant; a shadowing local was removed here.
    WALEdit edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
      EnvironmentEdgeManager.currentTime(), value));
    long txid =
      log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
    log.sync(txid);

    Thread.sleep(10); // make sure 2nd edit gets a later timestamp

    edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
      EnvironmentEdgeManager.currentTime(), value));
    txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
    log.sync(txid);
    log.shutdown();
    // Release factory resources as the other tests do; previously leaked here.
    walfactory.shutdown();

    // should have 2 log entries now
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    // make sure log is found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0);
    LOG.debug("log={} file={}", logDir, split.getLogFileName());

    testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
  }

  /** Builds a WAL key for this test's region/table at the given write time. */
  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
  }

  private WALRecordReader<WALKey> getReader() {
    return new WALKeyRecordReader();
  }

  /**
   * Create a new reader from the split, and match the edits against the passed columns.
   */
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    WALRecordReader<WALKey> reader = getReader();
    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

    for (byte[] column : columns) {
      assertTrue(reader.nextKeyValue());
      Cell cell = reader.getCurrentValue().getCells().get(0);
      if (
        !Bytes.equals(column, 0, column.length, cell.getQualifierArray(), cell.getQualifierOffset(),
          cell.getQualifierLength())
      ) {
        fail("expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString(
          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]");
      }
    }
    // Exactly the expected entries and no more.
    assertFalse(reader.nextKeyValue());
    reader.close();
  }

  /**
   * Create a new reader from the split, match the edits against the passed columns, moving WAL to
   * archive in between readings
   */
  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
    WALRecordReader<WALKey> reader = getReader();
    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

    assertTrue(reader.nextKeyValue());
    Cell cell = reader.getCurrentValue().getCells().get(0);
    if (
      !Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(),
        cell.getQualifierLength())
    ) {
      fail("expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString(
        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]");
    }
    // Move log file to archive directory
    // While WAL record reader is open
    WALInputFormat.WALSplit walSplit = (WALInputFormat.WALSplit) split;
    Path logFile = new Path(walSplit.getLogFileName());
    Path archivedLogDir = getWALArchiveDir(conf);
    Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
    assertNotEquals(walSplit.getLogFileName(), archivedLogLocation.toString());

    assertTrue(fs.rename(logFile, archivedLogLocation));
    assertTrue(fs.exists(archivedLogDir));
    assertFalse(fs.exists(logFile));
    // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
    // TODO: the archivedLogLocation to read next key value.
    assertTrue(reader.nextKeyValue());
    cell = reader.getCurrentValue().getCells().get(0);
    if (
      !Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
        cell.getQualifierLength())
    ) {
      fail("expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString(
        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]");
    }
    reader.close();
  }

  /** Resolves the archive directory a WAL is moved to for this test's server name. */
  private Path getWALArchiveDir(Configuration conf) throws IOException {
    Path rootDir = CommonFSUtils.getWALRootDir(conf);
    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
    return new Path(rootDir, archiveDir);
  }
}