001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileStatus; 026import org.apache.hadoop.fs.LocatedFileStatus; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 032import org.apache.hadoop.hbase.testclassification.MapReduceTests; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.util.CommonFSUtils; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.mapreduce.InputSplit; 037import org.apache.hadoop.mapreduce.Job; 038import org.apache.hadoop.mapreduce.JobContext; 039import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 040import org.junit.jupiter.api.BeforeAll; 041import org.junit.jupiter.api.Tag; 042import org.junit.jupiter.api.Test; 043import org.mockito.Mockito; 044 045@Tag(MapReduceTests.TAG) 046@Tag(MediumTests.TAG) 047public class TestWALInputFormat { 048 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 049 050 @BeforeAll 051 public static void setupClass() throws Exception { 052 TEST_UTIL.startMiniCluster(); 053 TEST_UTIL.createWALRootDir(); 054 } 055 056 /** 057 * Test the primitive start/end time filtering. 058 */ 059 @Test 060 public void testAddFile() { 061 List<FileStatus> lfss = new ArrayList<>(); 062 LocatedFileStatus lfs = Mockito.mock(LocatedFileStatus.class); 063 long now = EnvironmentEdgeManager.currentTime(); 064 Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now)); 065 WALInputFormat.addFile(lfss, lfs, now, now); 066 assertEquals(1, lfss.size()); 067 WALInputFormat.addFile(lfss, lfs, now - 1, now - 1); 068 assertEquals(1, lfss.size()); 069 WALInputFormat.addFile(lfss, lfs, now - 2, now - 1); 070 assertEquals(1, lfss.size()); 071 WALInputFormat.addFile(lfss, lfs, now - 2, now); 072 assertEquals(2, lfss.size()); 073 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, now); 074 assertEquals(3, lfss.size()); 075 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE); 076 assertEquals(4, lfss.size()); 077 WALInputFormat.addFile(lfss, lfs, now, now + 2); 078 assertEquals(5, lfss.size()); 079 WALInputFormat.addFile(lfss, lfs, now + 1, now + 2); 080 assertEquals(5, lfss.size()); 081 Mockito.when(lfs.getPath()).thenReturn(new Path("/name")); 082 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE); 083 assertEquals(6, lfss.size()); 084 Mockito.when(lfs.getPath()).thenReturn(new Path("/name.123")); 085 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE); 086 assertEquals(7, lfss.size()); 087 Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now + ".meta")); 088 WALInputFormat.addFile(lfss, lfs, now, now); 089 assertEquals(8, lfss.size()); 090 } 091 092 @Test 093 public void testHandlesArchivedWALFiles() throws Exception { 094 Configuration conf = TEST_UTIL.getConfiguration(); 095 JobContext ctx = Mockito.mock(JobContext.class); 096 Mockito.when(ctx.getConfiguration()).thenReturn(conf); 097 Job job = Job.getInstance(conf); 098 TableMapReduceUtil.initCredentialsForCluster(job, conf); 099 Mockito.when(ctx.getCredentials()).thenReturn(job.getCredentials()); 100 101 // Setup WAL file, then archive it 102 HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0); 103 AbstractFSWAL wal = (AbstractFSWAL) rs.getWALs().get(0); 104 Path walPath = wal.getCurrentFileName(); 105 TEST_UTIL.getConfiguration().set(FileInputFormat.INPUT_DIR, walPath.toString()); 106 TEST_UTIL.getConfiguration().set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); 107 108 Path rootDir = CommonFSUtils.getWALRootDir(conf); 109 Path archiveWal = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 110 archiveWal = new Path(archiveWal, walPath.getName()); 111 TEST_UTIL.getTestFileSystem().delete(walPath, true); 112 TEST_UTIL.getTestFileSystem().mkdirs(archiveWal.getParent()); 113 TEST_UTIL.getTestFileSystem().create(archiveWal).close(); 114 115 // Test for that we can read from the archived WAL file 116 WALInputFormat wif = new WALInputFormat(); 117 List<InputSplit> splits = wif.getSplits(ctx); 118 assertEquals(1, splits.size()); 119 WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0); 120 assertEquals(archiveWal.toString(), split.getLogFileName()); 121 } 122 123}