/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
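
/**
 * Tests the mapred (org.apache.hadoop.mapred) flavor of TableSnapshotInputFormat: jobs are
 * configured against a table snapshot, and splits, record readers, and full-row coverage are
 * verified both with mocked map-reduce and against jobs run on a mini cluster.
 */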
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }

  static class TestTableSnapshotMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
      throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer extends MapReduceBase
    implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
      OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
      throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for the default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  // TODO: mapred does not support limiting the input range by startrow and endrow.
  // Thus the following tests must override parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
    // It does not matter whether true or false is given to setLocalityEnabledTo,
    // because it is not read in testWithMockedMapReduce().
  }

  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
  }

  @Test
  @Override
  // run the MR job while HBase is offline
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    JobConf job = new JobConf(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
      COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
    throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case
      // that SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
      // and the default value is taken.
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
          COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
          numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
          COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir);
      }

      // mapred does not support limiting the input range by start and end keys (see TODO above)
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }
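
  // Pulls splits straight from TableSnapshotInputFormat, checks the locality metadata on each
  // split, then drives each record reader and asserts that every row in [startRow, stopRow)
  // is seen.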
  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
    // so the default value is taken.
    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;

    for (int i = 0; i < splits.length; i++) {
      // validate input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
      if (localityEnabled) {
        // When localityEnabled is true, the intent is to verify split.getLocations() with:
        //   Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        // However, getLocations() of some splits could return an empty array (length 0),
        // so drop the verification of the length.
        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
        Assert.assertTrue(split.getLocations() != null);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      // validate record reader
      OutputCollector collector = mock(OutputCollector.class);
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

      // validate we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
          numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, jobConf, true, tableDir);
      }

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
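      // Clean up only while the mini cluster is still running; after a shutdown the admin
      // calls below would fail.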
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Ignore // Ignored in mapred package because it keeps failing but allowed in mapreduce package.
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}