001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileStatus; 026import org.apache.hadoop.fs.LocatedFileStatus; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.regionserver.HRegionServer; 032import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 033import org.apache.hadoop.hbase.testclassification.MapReduceTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.CommonFSUtils; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.mapreduce.InputSplit; 038import org.apache.hadoop.mapreduce.Job; 039import org.apache.hadoop.mapreduce.JobContext; 040import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.mockito.Mockito; 046 047@Category({ MapReduceTests.class, MediumTests.class }) 048public class TestWALInputFormat { 049 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 050 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestWALInputFormat.class); 054 055 @BeforeClass 056 public static void setupClass() throws Exception { 057 TEST_UTIL.startMiniCluster(); 058 TEST_UTIL.createWALRootDir(); 059 } 060 061 /** 062 * Test the primitive start/end time filtering. 063 */ 064 @Test 065 public void testAddFile() { 066 List<FileStatus> lfss = new ArrayList<>(); 067 LocatedFileStatus lfs = Mockito.mock(LocatedFileStatus.class); 068 long now = EnvironmentEdgeManager.currentTime(); 069 Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now)); 070 WALInputFormat.addFile(lfss, lfs, now, now); 071 assertEquals(1, lfss.size()); 072 WALInputFormat.addFile(lfss, lfs, now - 1, now - 1); 073 assertEquals(1, lfss.size()); 074 WALInputFormat.addFile(lfss, lfs, now - 2, now - 1); 075 assertEquals(1, lfss.size()); 076 WALInputFormat.addFile(lfss, lfs, now - 2, now); 077 assertEquals(2, lfss.size()); 078 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, now); 079 assertEquals(3, lfss.size()); 080 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE); 081 assertEquals(4, lfss.size()); 082 WALInputFormat.addFile(lfss, lfs, now, now + 2); 083 assertEquals(5, lfss.size()); 084 WALInputFormat.addFile(lfss, lfs, now + 1, now + 2); 085 assertEquals(5, lfss.size()); 086 Mockito.when(lfs.getPath()).thenReturn(new Path("/name")); 087 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE); 088 assertEquals(6, lfss.size()); 089 Mockito.when(lfs.getPath()).thenReturn(new Path("/name.123")); 090 WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE); 091 assertEquals(7, lfss.size()); 092 Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now + ".meta")); 093 WALInputFormat.addFile(lfss, lfs, now, now); 094 assertEquals(8, lfss.size()); 095 } 096 097 @Test 098 public void testHandlesArchivedWALFiles() throws Exception { 099 Configuration conf = TEST_UTIL.getConfiguration(); 100 JobContext ctx = Mockito.mock(JobContext.class); 101 Mockito.when(ctx.getConfiguration()).thenReturn(conf); 102 Job job = Job.getInstance(conf); 103 TableMapReduceUtil.initCredentialsForCluster(job, conf); 104 Mockito.when(ctx.getCredentials()).thenReturn(job.getCredentials()); 105 106 // Setup WAL file, then archive it 107 HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0); 108 AbstractFSWAL wal = (AbstractFSWAL) rs.getWALs().get(0); 109 Path walPath = wal.getCurrentFileName(); 110 TEST_UTIL.getConfiguration().set(FileInputFormat.INPUT_DIR, walPath.toString()); 111 TEST_UTIL.getConfiguration().set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); 112 113 Path rootDir = CommonFSUtils.getWALRootDir(conf); 114 Path archiveWal = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 115 archiveWal = new Path(archiveWal, walPath.getName()); 116 TEST_UTIL.getTestFileSystem().delete(walPath, true); 117 TEST_UTIL.getTestFileSystem().mkdirs(archiveWal.getParent()); 118 TEST_UTIL.getTestFileSystem().create(archiveWal).close(); 119 120 // Test for that we can read from the archived WAL file 121 WALInputFormat wif = new WALInputFormat(); 122 List<InputSplit> splits = wif.getSplits(ctx); 123 assertEquals(1, splits.size()); 124 WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0); 125 assertEquals(archiveWal.toString(), split.getLogFileName()); 126 } 127 128}