/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
      Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }
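  /**
   * Mapper used by these tests: it verifies each row against the data written by
   * createTableAndSnapshot() and emits the row key with a NullWritable value. It is paired
   * with {@link TestTableSnapshotReducer}, which tracks the keys it receives and, on close(),
   * validates that every row between aaa (inclusive) and after_zzz (exclusive) was seen.
   */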
  static class TestTableSnapshotMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer extends MapReduceBase
      implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
          COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir);

      // TODO: it would be better to examine directly the cache instance that results from
      // this config. Currently this is not possible because BlockCache initialization is
      // static.
      Assert.assertEquals(
          "Snapshot job should be configured for the default LruBlockCache.",
          HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
          job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
          "Snapshot job should not use BucketCache.",
          0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  // TODO: mapred does not support limiting the input range by startrow and endrow.
  // Thus the following tests must override the parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(
        UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
    // It does not matter whether setLocalityEnabledTo is true or false,
    // because the value is not read in testWithMockedMapReduce().
  }
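  // Split-count arithmetic used by the tests below: with one split per region, a 10-region
  // snapshot is expected to yield exactly 10 splits; the ignored multiple-mappers test at the
  // bottom of this class asks for 5 splits per region over the same 10 regions, hence 50.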
  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
  }

  @Test
  @Override
  // run the MR job while HBase is offline
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    JobConf job = new JobConf(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int numSplitsPerRegion, int expectedNumSplits,
      boolean setLocalityEnabledTo) throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(
          util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case
      // where SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
      // and the default value is taken.
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
            COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
            NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
            numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
            COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
            NullWritable.class, job, false, tmpTableDir);
      }

      // mapred does not support restricting the scan by start and end keys, so verification
      // always covers the full row range.
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }
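  /**
   * Obtains the splits directly from {@link TableSnapshotInputFormat#getSplits}, checks the
   * expected split count and per-split locality information, then drives each
   * {@link RecordReader} with mocked mapred collaborators and asserts that every row between
   * startRow (inclusive) and stopRow (exclusive) is seen.
   */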
  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
    // so the default value is taken.
    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;

    for (int i = 0; i < splits.length; i++) {
      // validate input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
      if (localityEnabled) {
        // When localityEnabled is true, the intent is to verify split.getLocations() with:
        //   Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        // However, getLocations() of some splits can return an empty array,
        // so the length check is dropped.
        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
        Assert.assertTrue(split.getLocations() != null);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      // validate record reader
      OutputCollector collector = mock(OutputCollector.class);
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

      // validate we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
        numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }
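  // A minimal sketch of calling the end-to-end helper below directly, as the integration
  // test does; the table and snapshot names here are hypothetical, and the row range matches
  // this class's aaa/after_zzz bounds:
  //
  //   doTestWithMapReduce(UTIL, TableName.valueOf("exampleTable"), "exampleSnapshot",
  //       Bytes.toBytes("aaa"), Bytes.toBytes("zz{"),
  //       UTIL.getDataTestDirOnTestFS("exampleSnapshot"), 10, 1, 10, false);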
  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
          TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
            TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
            NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
            numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
            TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
            NullWritable.class, jobConf, true, tableDir);
      }

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Ignore // Ignored in the mapred package because it keeps failing, but allowed in the mapreduce package.
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}