001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.io.IOException;
022import java.util.Collection;
023import java.util.List;
024import java.util.Map;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
029import org.apache.hadoop.hbase.logging.Log4jUtils;
030import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
031import org.apache.hadoop.hbase.testclassification.LargeTests;
032import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.CommonFSUtils;
035import org.apache.hadoop.mapreduce.Job;
036import org.junit.jupiter.api.BeforeAll;
037import org.junit.jupiter.api.BeforeEach;
038import org.junit.jupiter.api.Tag;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Function;
041import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
042import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
043
044@Tag(VerySlowMapReduceTests.TAG)
045@Tag(LargeTests.TAG)
046public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
047
048  protected Path restoreDir;
049
050  @BeforeAll
051  public static void setUpSnapshots() throws Exception {
052    Log4jUtils.enableDebug(MultiTableSnapshotInputFormat.class);
053    Log4jUtils.enableDebug(MultiTableSnapshotInputFormatImpl.class);
054
055    // take a snapshot of every table we have.
056    for (String tableName : TABLES) {
057      SnapshotTestingUtils.createSnapshotAndValidate(TEST_UTIL.getAdmin(),
058        TableName.valueOf(tableName), ImmutableList.of(INPUT_FAMILY), null,
059        snapshotNameForTable(tableName), CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
060        TEST_UTIL.getTestFileSystem(), true);
061    }
062  }
063
064  @BeforeEach
065  public void setUp() throws Exception {
066    this.restoreDir = TEST_UTIL.getRandomDir();
067  }
068
069  @Override
070  protected void initJob(List<Scan> scans, Job job) throws IOException {
071    TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans),
072      ScanMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true,
073      restoreDir);
074  }
075
076  protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
077    return Multimaps.index(scans, new Function<Scan, String>() {
078      @Nullable
079      @Override
080      public String apply(Scan input) {
081        return snapshotNameForTable(
082          Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
083      }
084    }).asMap();
085  }
086
087  public static String snapshotNameForTable(String tableName) {
088    return tableName + "_snapshot";
089  }
090
091}