001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
021import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
022import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
023import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
024import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
025import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
026import static org.junit.jupiter.api.Assertions.assertEquals;
027import static org.junit.jupiter.api.Assertions.assertFalse;
028import static org.junit.jupiter.api.Assertions.assertTrue;
029import static org.mockito.Mockito.mock;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.util.Arrays;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Set;
037import java.util.stream.Collectors;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HDFSBlocksDistribution;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.Admin;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Scan.ReadType;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
052import org.apache.hadoop.hbase.io.HFileLink;
053import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
054import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionRecordReader;
055import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
056import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
057import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.CommonFSUtils;
062import org.apache.hadoop.hbase.util.RegionSplitter;
063import org.apache.hadoop.io.NullWritable;
064import org.apache.hadoop.mapreduce.InputSplit;
065import org.apache.hadoop.mapreduce.Job;
066import org.apache.hadoop.mapreduce.RecordReader;
067import org.apache.hadoop.mapreduce.Reducer;
068import org.apache.hadoop.mapreduce.TaskAttemptContext;
069import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
070import org.junit.jupiter.api.BeforeEach;
071import org.junit.jupiter.api.Tag;
072import org.junit.jupiter.api.Test;
073import org.junit.jupiter.api.TestInfo;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
078
079@Tag(VerySlowMapReduceTests.TAG)
080@Tag(LargeTests.TAG)
081public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
082
  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  // Row-key bounds used throughout: table data is created over [bbb, yyy) (see getStartRow /
  // getEndRow); bbc and yya are used to exercise scans whose bounds fall strictly inside that
  // range.
  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  // Name of the currently running test method, captured in setUp(); used to derive per-test
  // table and snapshot names.
  private String name;
091
092  @BeforeEach
093  public void setUp(TestInfo testInfo) throws Exception {
094    name = testInfo.getTestMethod().get().getName();
095  }
096
  /** Returns the first row key ("bbb") of the data range used by these tests. */
  @Override
  protected byte[] getStartRow() {
    return bbb;
  }
101
  /** Returns the exclusive end row key ("yyy") of the data range used by these tests. */
  @Override
  protected byte[] getEndRow() {
    return yyy;
  }
106
107  @Test
108  public void testGetBestLocations() throws IOException {
109    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
110    Configuration conf = UTIL.getConfiguration();
111
112    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
113    assertEquals(null, TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
114
115    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
116    assertEquals(Lists.newArrayList("h1"),
117      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
118
119    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
120    assertEquals(Lists.newArrayList("h1"),
121      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
122
123    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 1);
124    assertEquals(Lists.newArrayList("h1"),
125      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
126
127    blockDistribution = new HDFSBlocksDistribution();
128    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 10);
129    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 7);
130    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 5);
131    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 1);
132    assertEquals(Lists.newArrayList("h1"),
133      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
134
135    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 2);
136    assertEquals(Lists.newArrayList("h1", "h2"),
137      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
138
139    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 3);
140    assertEquals(Lists.newArrayList("h2", "h1"),
141      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
142
143    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 6);
144    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 9);
145
146    assertEquals(Lists.newArrayList("h2", "h3", "h4"),
147      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
148  }
149
  /**
   * Verifies the {@code getFilesRead()} contract of {@link TableSnapshotRegionRecordReader}: the
   * returned set is empty before {@code close()} — even after rows have been read — and after
   * close it matches exactly the store files of the restored region (link names resolved back to
   * the referenced hfile names).
   */
  @Test
  public void testTableSnapshotRegionRecordReaderGetFilesRead() throws Exception {
    final TableName tableName = TableName.valueOf(name);
    String snapshotName = name + "_snapshot";
    try {
      // Setup: create table, load data, snapshot, and configure job with restore dir
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);

      Configuration conf = UTIL.getConfiguration();
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // Get splits (one per region) and extract delegate split for restore path and region info
      TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
      List<InputSplit> splits = tsif.getSplits(job);
      assertEquals(1, splits.size());

      InputSplit split = splits.get(0);
      assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      TableSnapshotInputFormatImpl.InputSplit implSplit = snapshotRegionSplit.getDelegate();

      // Collect expected store file paths from the restored region directory
      Set<String> expectedFiles = new HashSet<>();
      Path restorePath = new Path(implSplit.getRestoreDir());
      FileSystem fs = restorePath.getFileSystem(conf);
      Path tableDir =
        CommonFSUtils.getTableDir(restorePath, implSplit.getTableDescriptor().getTableName());
      Path regionPath = new Path(tableDir, implSplit.getRegionInfo().getEncodedName());
      FileStatus[] familyDirs = fs.listStatus(regionPath);
      if (familyDirs != null) {
        for (FileStatus fam : familyDirs) {
          if (fam.isDirectory()) {
            FileStatus[] files = fs.listStatus(fam.getPath());
            if (files != null) {
              for (FileStatus f : files) {
                if (f.isFile()) {
                  // Files in a restored region are expected to be links/references; resolve each
                  // back to the referenced hfile name for comparison with the reader's report.
                  String referenceFileName = f.getPath().getName();
                  expectedFiles.add(HFileLink.getReferencedHFileName(referenceFileName));
                }
              }
            }
          }
        }
      }
      assertFalse(expectedFiles.isEmpty(),
        "Should have at least one store file after snapshot restore");

      // Create record reader, initialize with split (opens underlying ClientSideRegionScanner)
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());

      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      assertTrue(rr instanceof TableSnapshotRegionRecordReader);
      TableSnapshotRegionRecordReader recordReader = (TableSnapshotRegionRecordReader) rr;
      recordReader.initialize(split, taskAttemptContext);

      // Before close: getFilesRead() must be empty
      Set<Path> filesReadBeforeClose = recordReader.getFilesRead();
      assertTrue(filesReadBeforeClose.isEmpty(), "Should return empty set before closing");

      // Read a few key-values; getFilesRead() must still be empty until close
      int count = 0;
      while (count < 3 && recordReader.nextKeyValue()) {
        count++;
      }

      filesReadBeforeClose = recordReader.getFilesRead();
      assertTrue(filesReadBeforeClose.isEmpty(),
        "Should return empty set before closing even after reading");

      // Close reader so underlying scanner reports files successfully read
      recordReader.close();

      // After close: getFilesRead() must match expected store file set
      Set<String> filesReadAfterClose =
        recordReader.getFilesRead().stream().map(Path::getName).collect(Collectors.toSet());

      assertEquals(expectedFiles, filesReadAfterClose, "Should contain all expected file paths");
    } finally {
      // Cleanup regardless of outcome so later tests can reuse the table/snapshot names.
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }
239
  /** Counters available to snapshot MR test jobs for reporting validation failures. */
  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }
243
  /**
   * Mapper that validates each row read from the snapshot (via {@code verifyRowFromMap}) and
   * emits the row key with a null value.
   */
  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }
254
  /**
   * Reducer that records every row key it receives and, in {@code cleanup}, validates via
   * {@code SeenRowTracker} that the job covered the full [bbb, yyy) row range.
   */
  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
      // Only the key matters; values are NullWritable placeholders.
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      // Fails the task if any expected row was not seen.
      rowTracker.validate();
    }
  }
270
271  @Test
272  public void testInitTableSnapshotMapperJobConfig() throws Exception {
273    final TableName tableName = TableName.valueOf(name);
274    String snapshotName = "foo";
275
276    try {
277      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
278      Job job = new Job(UTIL.getConfiguration());
279      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
280
281      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
282        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
283        tmpTableDir);
284
285      // TODO: would be better to examine directly the cache instance that results from this
286      // config. Currently this is not possible because BlockCache initialization is static.
287      assertEquals(HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
288        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01,
289        "Snapshot job should be configured for default LruBlockCache.");
290      assertEquals(0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01,
291        "Snapshot job should not use BucketCache.");
292    } finally {
293      UTIL.getAdmin().deleteSnapshot(snapshotName);
294      UTIL.deleteTable(tableName);
295    }
296  }
297
  /**
   * Runs the single-region mocked-MR scenario with split locality taken from the region location
   * (SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION) instead of HDFS block locations.
   */
  @Test
  public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
    try {
      testWithMockedMapReduce(UTIL, name + "Snapshot", 1, 1, 1, true);
    } finally {
      // Restore the shared test configuration so other tests see the default behavior.
      conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
    }
  }
308
  /**
   * Base-class hook: initializes a snapshot mapper job against the given snapshot/restore dir so
   * the base test can assert that no back-reference links are created.
   */
  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);
  }
317
  /**
   * Creates a table and snapshot, configures a snapshot mapper job (optionally splitting each
   * region with UniformSplit), and verifies split count, locality, scan bounds and row coverage
   * via {@link #verifyWithMockedMapReduce}.
   */
  @Override
  public void testWithMockedMapReduce(HBaseTestingUtil util, String snapshotName, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo) throws Exception {
    final TableName tableName = TableName.valueOf(name);
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      // Mutates the shared test configuration; locality-by-region-location is reset to default.
      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
      Job job = new Job(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); // limit the scan

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }
350
351  @Test
352  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
353    String snapshotName = "testWithMockedMapReduceMultiRegion";
354    final TableName tableName = TableName.valueOf(name);
355    try {
356      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
357
358      Configuration conf = UTIL.getConfiguration();
359      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
360      Job job = new Job(conf);
361      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
362      // test scan with startRow and stopRow
363      Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);
364
365      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
366        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
367        tmpTableDir, new RegionSplitter.UniformSplit(), 5);
368
369      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
370    } finally {
371      UTIL.getAdmin().deleteSnapshot(snapshotName);
372      UTIL.deleteTable(tableName);
373    }
374  }
375
376  @Test
377  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
378    String snapshotName = "testWithMockedMapReduceMultiRegion";
379    final TableName tableName = TableName.valueOf(name);
380    try {
381      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
382
383      Configuration conf = UTIL.getConfiguration();
384      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
385      Job job = new Job(conf);
386      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
387      // test scan without startRow and stopRow
388      Scan scan2 = new Scan();
389
390      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
391        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
392        tmpTableDir, new RegionSplitter.UniformSplit(), 5);
393
394      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
395        HConstants.EMPTY_START_ROW);
396
397    } finally {
398      UTIL.getAdmin().deleteSnapshot(snapshotName);
399      UTIL.deleteTable(tableName);
400    }
401  }
402
  /**
   * With SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT set to 10, a RowCounter job over the
   * snapshot must report exactly 10 rows per region (one input split per region here).
   */
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name);
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      // Pre-split on bbb/yyy so the table has multiple regions.
      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // put some stuff in the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      assertTrue(job.waitForCompletion(true));
      // Row limit applies per input split; with the defaults here that is one split per region.
      assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      // Unset the shared-config row limit before other tests run.
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }
450
  /**
   * Splits the table to two regions, snapshots it, then overwrites the data and flushes new store
   * files. Scanning the snapshot via TableSnapshotInputFormat must return only pre-snapshot data,
   * with no row duplicated across the split regions.
   */
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // 2 regions -> 2 splits; row validation inside catches any post-snapshot values.
      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }
500
501  @Test
502  public void testScannerReadTypeConfiguration() throws IOException {
503    Configuration conf = new Configuration(false);
504    // Explicitly set ReadTypes should persist
505    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
506      Scan scanWithReadType = new Scan();
507      scanWithReadType.setReadType(readType);
508      assertEquals(scanWithReadType.getReadType(),
509        serializeAndReturn(conf, scanWithReadType).getReadType());
510    }
511    // We should only see the DEFAULT ReadType getting updated to STREAM.
512    Scan scanWithoutReadType = new Scan();
513    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
514    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());
515
516    // We should still be able to force a certain ReadType when DEFAULT is given.
517    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
518    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
519    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
520  }
521
522  /**
523   * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat
524   * does.
525   */
526  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
527    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
528    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
529  }
530
  /**
   * Computes the splits for the given job and checks: the expected split count, split locality
   * ("localhost" when locality comes from the region location in meta, "127.0.0.1" when it comes
   * from HDFS block locations), that each split's scan stays within [startRow, stopRow), and that
   * reading every split back covers exactly the expected row range.
   * <p>
   * NOTE(review): the {@code numRegions} parameter is currently unused in the body; it is kept
   * for the callers' readability / future use.
   */
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    assertEquals(expectedNumSplits, splits.size());

    // An empty stopRow means "to the end of the table"; use a large sentinel for the tracker.
    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(startRow,
      stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled = job.getConfiguration().getBoolean(
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean byRegionLoc =
      job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        if (byRegionLoc) {
          // When it uses region location from meta, the hostname will be "localhost",
          // the location from hdfs block location is "127.0.0.1".
          assertEquals(1, split.getLocations().length);
          assertTrue(split.getLocations()[0].equals("localhost"), "Not using region location!");
        } else {
          assertTrue(split.getLocations()[0].equals("127.0.0.1"), "Not using region location!");
        }
      } else {
        // Locality disabled: splits must carry no locations at all.
        assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      // Each split's scan must be clamped inside the requested [startRow, stopRow) range.
      Scan scan =
        TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        assertTrue(Bytes.compareTo(startRow, scan.getStartRow()) <= 0,
          Bytes.toStringBinary(startRow) + " should <= "
            + Bytes.toStringBinary(scan.getStartRow()));
      }
      if (stopRow.length > 0) {
        assertTrue(Bytes.compareTo(stopRow, scan.getStopRow()) >= 0,
          Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()));
      }
      assertTrue(Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0,
        "startRow should < stopRow");

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }
600
  /** Bridges the base-class hook to {@link #doTestWithMapReduce} using this test's row range. */
  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtil util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }
608
  // this is also called by the IntegrationTestTableSnapshotInputFormat
  /**
   * End-to-end map-reduce run over a snapshot: creates the table and snapshot, optionally shuts
   * down the mini HBase cluster first (so the job can only read the snapshot files), then runs a
   * real job whose mapper validates every row and whose reducer checks full row coverage.
   */
  public static void doTestWithMapReduce(HBaseTestingUtil util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir);
      }

      // Single reducer so TestTableSnapshotReducer sees every emitted row key.
      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        // Skip cleanup when the cluster was shut down — the admin is no longer reachable.
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }
655
  /** Full MR run: 10 regions with 5 splits per region, expecting 50 splits in total. */
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
660
  /**
   * Verifies that initTableSnapshotMapperJob records a restore directory under the
   * "hbase.TableSnapshotInputFormat.restore.dir" key, that the directory exists afterwards, and
   * that {@code TableSnapshotInputFormat.cleanRestoreDir} removes it.
   */
  @Test
  public void testCleanRestoreDir() throws Exception {
    TableName tableName = TableName.valueOf("test_table");
    String snapshotName = "test_snapshot";
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
    Job job = Job.getInstance(UTIL.getConfiguration());
    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      workingDir);
    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
    Path restorePath =
      new Path(job.getConfiguration().get("hbase.TableSnapshotInputFormat.restore.dir"));
    assertTrue(fs.exists(restorePath));
    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
    assertFalse(fs.exists(restorePath));
  }
678
679  /**
680   * Test that explicitly restores a snapshot to a temp directory and reads the restored regions via
681   * ClientSideRegionScanner through a MapReduce job.
682   * <p>
683   * This test verifies the full workflow: 1. Create and load a table with data 2. Create a snapshot
684   * and restore the snapshot to a temporary directory 3. Configure a job to read the restored
685   * regions via ClientSideRegionScanner using TableSnapshotInputFormat and verify that it succeeds
686   * 4. Delete restored temporary directory 5. Configure a new job and verify that it fails
687   */
688  @Test
689  public void testReadFromRestoredSnapshotViaMR() throws Exception {
690    final TableName tableName = TableName.valueOf(name);
691    final String snapshotName = tableName + "_snapshot";
692    try {
693      if (UTIL.getAdmin().tableExists(tableName)) {
694        UTIL.deleteTable(tableName);
695      }
696      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });
697
698      Admin admin = UTIL.getAdmin();
699      int regionNum = admin.getRegions(tableName).size();
700      LOG.info("Created table with {} regions", regionNum);
701
702      Table table = UTIL.getConnection().getTable(tableName);
703      UTIL.loadTable(table, FAMILIES);
704      table.close();
705
706      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
707      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
708      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
709        null, snapshotName, rootDir, fs, true);
710      Path tempRestoreDir = UTIL.getDataTestDirOnTestFS("restore_" + snapshotName);
711      RestoreSnapshotHelper.copySnapshotForScanner(UTIL.getConfiguration(), fs, rootDir,
712        tempRestoreDir, snapshotName);
713      assertTrue(fs.exists(tempRestoreDir), "Restore directory should exist");
714
715      Job job = Job.getInstance(UTIL.getConfiguration());
716      job.setJarByClass(TestTableSnapshotInputFormat.class);
717      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
718        TestTableSnapshotInputFormat.class);
719      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
720      Configuration conf = job.getConfiguration();
721      conf.set("hbase.TableSnapshotInputFormat.snapshot.name", snapshotName);
722      conf.set("hbase.TableSnapshotInputFormat.restore.dir", tempRestoreDir.toString());
723      conf.setInt("hbase.mapreduce.splits.per.region", 1);
724      job.setReducerClass(TestTableSnapshotReducer.class);
725      job.setNumReduceTasks(1);
726      job.setOutputFormatClass(NullOutputFormat.class);
727      TableMapReduceUtil.initTableMapperJob(snapshotName, // table name (snapshot name in this case)
728        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
729        false, false, TableSnapshotInputFormat.class);
730      TableMapReduceUtil.resetCacheConfig(conf);
731      assertTrue(job.waitForCompletion(true));
732      assertTrue(job.isSuccessful());
733
734      // Now verify that job fails when restore directory is deleted
735      assertTrue(fs.delete(tempRestoreDir, true));
736      assertFalse(fs.exists(tempRestoreDir), "Restore directory should not exist after deletion");
737      Job failureJob = Job.getInstance(UTIL.getConfiguration());
738      failureJob.setJarByClass(TestTableSnapshotInputFormat.class);
739      TableMapReduceUtil.addDependencyJarsForClasses(failureJob.getConfiguration(),
740        TestTableSnapshotInputFormat.class);
741      Configuration failureConf = failureJob.getConfiguration();
742      // Configure job to use the deleted restore directory
743      failureConf.set("hbase.TableSnapshotInputFormat.snapshot.name", snapshotName);
744      failureConf.set("hbase.TableSnapshotInputFormat.restore.dir", tempRestoreDir.toString());
745      failureConf.setInt("hbase.mapreduce.splits.per.region", 1);
746      failureJob.setReducerClass(TestTableSnapshotReducer.class);
747      failureJob.setNumReduceTasks(1);
748      failureJob.setOutputFormatClass(NullOutputFormat.class);
749
750      TableMapReduceUtil.initTableMapperJob(snapshotName, scan, TestTableSnapshotMapper.class,
751        ImmutableBytesWritable.class, NullWritable.class, failureJob, false, false,
752        TableSnapshotInputFormat.class);
753      TableMapReduceUtil.resetCacheConfig(failureConf);
754
755      assertFalse(fs.exists(tempRestoreDir),
756        "Restore directory should not exist before job execution");
757      failureJob.waitForCompletion(true);
758
759      assertFalse(failureJob.isSuccessful(),
760        "Job should fail since the restored snapshot directory is deleted");
761
762    } finally {
763      try {
764        if (UTIL.getAdmin().tableExists(tableName)) {
765          UTIL.deleteTable(tableName);
766        }
767      } catch (Exception e) {
768        LOG.warn("Error deleting table", e);
769      }
770      try {
771        UTIL.getAdmin().deleteSnapshot(snapshotName);
772      } catch (Exception e) {
773        LOG.warn("Error deleting snapshot", e);
774      }
775    }
776  }
777}