/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

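/**
 * Tests for {@link TableSnapshotInputFormat}: split generation, split locality, scan boundary
 * handling, and end-to-end MapReduce jobs that read their input from a table snapshot.
 */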
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

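  /**
   * Exercises {@link TableSnapshotInputFormatImpl#getBestLocations}. The assertions below assume
   * that the implementation returns at most three hosts, heaviest first, and drops any host whose
   * block weight falls below a cutoff fraction of the top host's weight (0.8 by default), which is
   * why, e.g., {h1=10, h2=7, h3=5, h4=1} yields only "h1".
   */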
  @Test
  public void testGetBestLocations() throws IOException {
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertNull(TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  public enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

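  /**
   * Collects the row keys emitted by {@link TestTableSnapshotMapper} and, on cleanup, verifies
   * that the whole expected key range [bbb, yyy) was seen.
   */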
  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
        job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

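  /**
   * When SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION is enabled, split locations should come
   * from the region location in meta ("localhost" on the mini cluster) rather than from HDFS block
   * locations ("127.0.0.1"); verifyWithMockedMapReduce below asserts exactly that.
   */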
  @Test
  public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
    try {
      testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1, true);
    } finally {
      conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    Job job = Job.getInstance(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);
  }

  @Override
  public void testWithMockedMapReduce(HBaseTestingUtil util, String snapshotName, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo) throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      // Keep a value a caller may already have set (e.g. the by-region-location test above);
      // only fall back to the default when the key is absent. Setting the default
      // unconditionally would clobber the caller's choice.
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
          SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT));
      Job job = Job.getInstance(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); // limit the scan

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

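  /**
   * Verifies SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT: with a limit of 10 rows per input
   * split and one split per region, the job should count exactly 10 * regionNum rows.
   */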
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // load some data into the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      Assert.assertTrue(job.waitForCompletion(true));
      Assert.assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

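  /**
   * Takes a snapshot right after a region split, then loads new values and flushes so the live
   * table diverges from the snapshot; reading the snapshot back through the input format must
   * still cover the scanned range without duplicate rows.
   */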
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // load some data into the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split into 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause a flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

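  /**
   * An explicitly set ReadType should survive serialization unchanged; a DEFAULT ReadType is
   * rewritten to STREAM by the input format unless SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE forces
   * another value.
   */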
  @Test
  public void testScannerReadTypeConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    // Explicitly set ReadTypes should persist
    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
      Scan scanWithReadType = new Scan();
      scanWithReadType.setReadType(readType);
      assertEquals(scanWithReadType.getReadType(),
        serializeAndReturn(conf, scanWithReadType).getReadType());
    }
    // We should only see the DEFAULT ReadType getting updated to STREAM.
    Scan scanWithoutReadType = new Scan();
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());

    // We should still be able to force a certain ReadType when DEFAULT is given.
    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
  }

  /**
   * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat
   * does.
   */
  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
  }

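  /**
   * Computes splits directly from the input format and checks the split count, the split
   * locations, and that each split's scan stays within [startRow, stopRow); then drives a record
   * reader over every split and validates that the expected row range is fully covered.
   */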
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(startRow,
      stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled = job.getConfiguration().getBoolean(
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean byRegionLoc =
      job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        if (byRegionLoc) {
          // When locality is based on the region location from meta, the hostname will be
          // "localhost"; locations derived from HDFS block locations use "127.0.0.1".
          Assert.assertEquals(1, split.getLocations().length);
          Assert.assertEquals("Not using region location!", "localhost",
            split.getLocations()[0]);
        } else {
          Assert.assertEquals("Not using region location!", "127.0.0.1",
            split.getLocations()[0]);
        }
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
        TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(Bytes.toStringBinary(startRow) + " should be <= "
          + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(Bytes.toStringBinary(stopRow) + " should be >= "
          + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should be < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtil util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtil util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = Job.getInstance(util.getConfiguration());
      Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }

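  /**
   * initTableSnapshotMapperJob restores the snapshot under the directory recorded in
   * "hbase.TableSnapshotInputFormat.restore.dir"; cleanRestoreDir must remove it again.
   */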
  @Test
  public void testCleanRestoreDir() throws Exception {
    TableName tableName = TableName.valueOf("test_table");
    String snapshotName = "test_snapshot";
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
    Job job = Job.getInstance(UTIL.getConfiguration());
    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      workingDir);
    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
    Path restorePath =
      new Path(job.getConfiguration().get("hbase.TableSnapshotInputFormat.restore.dir"));
    Assert.assertTrue(fs.exists(restorePath));
    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
    Assert.assertFalse(fs.exists(restorePath));
  }
}