/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }

  static class TestTableSnapshotMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
      throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer extends MapReduceBase
    implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
      OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
      throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
        false, tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
        job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  // TODO: mapred does not support limiting the input range by startrow and endrow.
  // Thus the following tests must override parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
    // It does not matter whether true or false is given to setLocalityEnabledTo,
    // because it is not read in testWithMockedMapReduce().
  }

  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
  }

  @Test
  @Override
  // run the MR job while HBase is offline
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    JobConf job = new JobConf(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
      false, tmpTableDir);
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
    throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(),
        numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case that
      // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
      // and the default value is taken.
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      // mapred does not support start and end keys
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
    // so the default value is taken.
    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;

    for (int i = 0; i < splits.length; i++) {
      // validate input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
      if (localityEnabled) {
        // When localityEnabled is true, the intent is to verify split.getLocations()
        // with the following statement:
        // Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        // However, getLocations() of some splits could return an empty array (length 0),
        // so drop the verification on length.
        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
        Assert.assertTrue(split.getLocations() != null);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      // validate record reader
      OutputCollector collector = mock(OutputCollector.class);
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.getRecordReader(split, job, reporter);

      // validate we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
          jobConf, true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
          jobConf, true, tableDir);
      }

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Ignore // Ignored in the mapred package because it keeps failing, but allowed in mapreduce.
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}