/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

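  /**
   * Sanity-checks TableSnapshotInputFormatImpl.getBestLocations: hosts should come back ordered
   * by descending block weight, and hosts holding only a small share of the blocks should be
   * dropped from the split locations.
   */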
  @Test
  public void testGetBestLocations() throws IOException {
    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertEquals(null,
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

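  /**
   * Reducer that records every row key it receives and, in cleanup, asserts that every row in the
   * [bbb, yyy) range was seen.
   */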
  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      rowTracker.validate();
    }
  }

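  /**
   * Verifies that initTableSnapshotMapperJob leaves the job configured for the default on-heap
   * LruBlockCache and does not enable a BucketCache.
   */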
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
        job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

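  /**
   * Runs the mocked MapReduce test with split locality taken from the region location in meta
   * rather than from HDFS block locations.
   */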
  @Test
  public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
    try {
      testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1, true);
    } finally {
      conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
    }
  }

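  /**
   * Hook invoked by the base class's restore-snapshot/back-reference-link test: it only needs to
   * initialize a snapshot mapper job against tmpTableDir.
   */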
  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);
  }

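  /**
   * Creates a table with the requested number of regions, snapshots it, configures the locality
   * settings under test, and then verifies that getSplits produces the expected splits over the
   * [startRow, endRow) range.
   */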
  @Override
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo) throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
      Job job = new Job(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

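  /**
   * Uses 5 splits per region over 10 regions, but restricts the scan to [bbc, yya) so splits that
   * fall entirely outside that range are not generated; 40 of the 50 candidate splits are
   * expected.
   */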
  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan(bbc, yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

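  /**
   * Same setup as above, but with an unbounded scan, so all 50 splits (10 regions x 5 splits per
   * region) are expected.
   */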
  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan2 = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_START_ROW);

    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

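  /**
   * Verifies SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT: with a 10-row limit per split, a
   * RowCounter job over the snapshot should count exactly 10 rows per region.
   */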
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // put some stuff in the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      Assert.assertTrue(job.waitForCompletion(true));
      Assert.assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

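  /**
   * Regression test for reading a snapshot taken after a region split: the snapshot scan should
   * return each original row exactly once and must not see the values written and flushed after
   * the snapshot was taken.
   */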
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

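  /**
   * Checks how the snapshot input format resolves the Scan ReadType: explicitly set PREAD/STREAM
   * survive serialization, an unset (DEFAULT) ReadType is promoted to STREAM, and
   * SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE can override what DEFAULT resolves to.
   */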
  @Test
  public void testScannerReadTypeConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    // Explicitly set ReadTypes should persist
    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
      Scan scanWithReadType = new Scan();
      scanWithReadType.setReadType(readType);
      assertEquals(scanWithReadType.getReadType(),
        serializeAndReturn(conf, scanWithReadType).getReadType());
    }
    // We should only see the DEFAULT ReadType getting updated to STREAM.
    Scan scanWithoutReadType = new Scan();
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());

    // We should still be able to force a certain ReadType when DEFAULT is given.
    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
  }

  /**
   * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat
   * does.
   */
  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
  }

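  /**
   * Computes the splits for the job, checks the split count and locality expectations, then
   * drives a record reader over every split, verifying each row and confirming that the whole
   * [startRow, stopRow) range is covered.
   */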
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
      stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled = job.getConfiguration().getBoolean(
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean byRegionLoc =
      job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        if (byRegionLoc) {
          // When it uses region location from meta, the hostname will be "localhost",
          // the location from hdfs block location is "127.0.0.1".
          Assert.assertEquals(1, split.getLocations().length);
          Assert.assertTrue("Not using region location!",
            split.getLocations()[0].equals("localhost"));
        } else {
          Assert.assertTrue("Not using region location!",
            split.getLocations()[0].equals("127.0.0.1"));
        }
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
        TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan(startRow, endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }

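  /**
   * Verifies that cleanRestoreDir removes the temporary restore directory that
   * initTableSnapshotMapperJob recorded under hbase.TableSnapshotInputFormat.restore.dir.
   */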
  @Test
  public void testCleanRestoreDir() throws Exception {
    TableName tableName = TableName.valueOf("test_table");
    String snapshotName = "test_snapshot";
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
    Job job = Job.getInstance(UTIL.getConfiguration());
    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      workingDir);
    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
    Path restorePath =
      new Path(job.getConfiguration().get("hbase.TableSnapshotInputFormat.restore.dir"));
    Assert.assertTrue(fs.exists(restorePath));
    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
    Assert.assertFalse(fs.exists(restorePath));
  }
}