/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

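  /**
   * Verifies that getBestLocations ranks hosts by cumulative block weight and returns only the
   * top hosts as split locations, re-ranking them as more block weight is added.
   */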
  @Test
  public void testGetBestLocations() throws IOException {
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertNull(TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  public enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtil.SeenRowTracker rowTracker =
        new HBaseTestingUtil.SeenRowTracker(bbb, yyy);
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
       Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      rowTracker.validate();
    }
  }

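  /**
   * Verifies that initTableSnapshotMapperJob configures the job for the default on-heap
   * LruBlockCache and does not enable the BucketCache.
   */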
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: would be better to examine the cache instance that results from this config
      // directly. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

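  /**
   * Runs the mocked MapReduce flow with split locality taken from the region location in meta
   * rather than from the HDFS block locations.
   */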
  @Test
  public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
    try {
      testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1, true);
    } finally {
      conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    Job job = Job.getInstance(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

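  /**
   * Creates a table and snapshot, configures a snapshot mapper job with the requested locality
   * and splits-per-region settings, and verifies the resulting splits without submitting a job.
   */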
  @Override
  public void testWithMockedMapReduce(HBaseTestingUtil util, String snapshotName,
      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
      throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
                numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

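  /**
   * 10 regions with 5 splits per region and a scan restricted to [bbc, yya); splits that fall
   * outside the scan range are expected to be dropped, leaving 40 of the 50 possible splits.
   */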
  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

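  /**
   * Same setup as above but with an unbounded scan, so all 50 splits (10 regions x 5 splits per
   * region) are expected.
   */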
  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan2 = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_START_ROW);

    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

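  /**
   * With SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT set to 10, the RowCounter job should
   * count exactly 10 rows per region.
   */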
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // put some stuff in the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      Assert.assertTrue(job.waitForCompletion(true));
      Assert.assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

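  /**
   * Takes a snapshot right after a region split, then loads and flushes new data into the live
   * table, and verifies that reading the snapshot still yields the expected rows and no
   * duplicates across the resulting splits.
   */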
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

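  /**
   * Scans with an explicitly set ReadType must keep it across serialization, while a DEFAULT
   * ReadType is rewritten to STREAM unless a different read type is forced via configuration.
   */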
  @Test
  public void testScannerReadTypeConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    // Explicitly set ReadTypes should persist
    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
      Scan scanWithReadType = new Scan();
      scanWithReadType.setReadType(readType);
      assertEquals(scanWithReadType.getReadType(),
          serializeAndReturn(conf, scanWithReadType).getReadType());
    }
    // We should only see the DEFAULT ReadType getting updated to STREAM.
    Scan scanWithoutReadType = new Scan();
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());

    // We should still be able to force a certain ReadType when DEFAULT is given.
    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
  }

  /**
   * Serializes and deserializes the given scan in the same manner that
   * TableSnapshotInputFormat does.
   */
  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
  }

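  /**
   * Computes the splits for the configured job, checks their locality and scan boundaries, then
   * reads every split with a record reader and confirms that all expected rows are seen.
   */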
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(startRow,
        stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled =
        job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
                                          SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean byRegionLoc =
      job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
                                        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        if (byRegionLoc) {
          // When the location comes from the region location in meta, the hostname is
          // "localhost"; when it comes from the HDFS block locations it is "127.0.0.1".
          Assert.assertEquals(1, split.getLocations().length);
          Assert.assertTrue("Not using region location!",
            split.getLocations()[0].equals("localhost"));
        } else {
          Assert.assertTrue("Not using block location!",
            split.getLocations()[0].equals("127.0.0.1"));
        }
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
          TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(startRow) + " should be <= "
            + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(stopRow) + " should be >= "
            + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should be < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
          tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtil util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // This is also called by the IntegrationTestTableSnapshotInputFormat.
  public static void doTestWithMapReduce(HBaseTestingUtil util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = Job.getInstance(util.getConfiguration());
      Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
              TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, true, tableDir, new RegionSplitter.UniformSplit(),
                numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

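  /** Full (non-mocked) MapReduce run over a snapshot with multiple splits per region. */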
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }

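  /**
   * The restore directory created by initTableSnapshotMapperJob should exist until
   * cleanRestoreDir is called and be gone afterwards.
   */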
  @Test
  public void testCleanRestoreDir() throws Exception {
    TableName tableName = TableName.valueOf("test_table");
    String snapshotName = "test_snapshot";
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
    Job job = Job.getInstance(UTIL.getConfiguration());
    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, workingDir);
    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
    Path restorePath = new Path(job.getConfiguration()
      .get("hbase.TableSnapshotInputFormat.restore.dir"));
    Assert.assertTrue(fs.exists(restorePath));
    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
    Assert.assertFalse(fs.exists(restorePath));
  }
}