/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

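/**
 * mapred (old API) counterpart of the TableSnapshotInputFormat tests: creates a table, takes a
 * snapshot, and reads the snapshot back through {@link TableSnapshotInputFormat}, both with
 * mocked map-reduce machinery and with a real mini map-reduce job, verifying that every row
 * between the start row and end row is seen.
 */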
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }

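  // Mapper that verifies each row's contents via verifyRowFromMap() and emits the row key,
  // so the reducer can track which rows were seen.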
  static class TestTableSnapshotMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }

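  // Reducer that records every row key it receives; close() asserts that all rows between
  // aaa and after_zzz were seen.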
  public static class TestTableSnapshotReducer extends MapReduceBase
      implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

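  // Verifies that initTableSnapshotMapJob() leaves the block cache configuration at the
  // LruBlockCache default and does not enable BucketCache.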
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: it would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for the default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  // TODO: mapred does not support limiting the input range by start row and end row.
  // Thus the following tests must override parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    // It does not matter whether true or false is passed for setLocalityEnabledTo,
    // because the flag is never read in testWithMockedMapReduce().
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
  }

  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
  }

  // Run the MR job while HBase is offline.
  @Test
  @Override
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
  }

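  // Hook used by the base class test: configures a snapshot-backed job against tmpTableDir,
  // after which the base class verifies that restoring the snapshot created no back
  // reference links.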
  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    JobConf job = new JobConf(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
      COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

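  // Creates a table and snapshot, configures a snapshot map job (optionally splitting each
  // region into multiple input splits), and verifies the resulting splits and record readers
  // directly, without running an actual map-reduce job.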
  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
      throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case
      // where SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
      // and the default value is taken.
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
                COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
                numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
                COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, false, tmpTableDir);
      }

      // mapred does not support limiting the input range by start and end keys,
      // so verify against the full row range of the snapshot.
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }

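  // Asks TableSnapshotInputFormat for its splits and drives each record reader by hand,
  // checking the split count, the splits' locality metadata, and that every expected row
  // comes back.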
  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
    // so the default value is taken.
    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;

    for (int i = 0; i < splits.length; i++) {
      // validate the input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
      if (localityEnabled) {
        // When localityEnabled is true, the intent is to verify split.getLocations()
        // with the following statement:
        //   Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        // However, getLocations() of some splits could return an empty array (length 0),
        // so the length check is dropped.
        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
        Assert.assertTrue(split.getLocations() != null);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      // validate the record reader
      OutputCollector collector = mock(OutputCollector.class);
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

      // validate that we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate that all rows were seen
    rowTracker.validate();
  }

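  // Delegates to the static doTestWithMapReduce(), supplying this class's start and end rows.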
  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // This is also called by IntegrationTestTableSnapshotInputFormat.
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
                TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
                numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
                TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, jobConf, true, tableDir);
      }

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

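  // Runs the multiple-mappers-per-region variant through a real MR job:
  // 10 regions x 5 splits per region should yield 50 input splits.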
  @Ignore // Ignored in the mapred package because it keeps failing, but allowed in the mapreduce package.
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}