001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.Iterator;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.client.Result;
028import org.apache.hadoop.hbase.client.Scan;
029import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
030import org.apache.hadoop.hbase.testclassification.LargeTests;
031import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
032import org.apache.hadoop.io.NullWritable;
033import org.apache.hadoop.mapred.FileOutputFormat;
034import org.apache.hadoop.mapred.JobClient;
035import org.apache.hadoop.mapred.JobConf;
036import org.apache.hadoop.mapred.OutputCollector;
037import org.apache.hadoop.mapred.Reporter;
038import org.apache.hadoop.mapred.RunningJob;
039import org.junit.jupiter.api.Tag;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
044
045@Tag(VerySlowMapReduceTests.TAG)
046@Tag(LargeTests.TAG)
047public class TestMultiTableSnapshotInputFormat
048  extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
049
050  private static final Logger LOG =
051    LoggerFactory.getLogger(TestMultiTableSnapshotInputFormat.class);
052
053  @Override
054  protected void runJob(String jobName, Configuration c, List<Scan> scans)
055    throws IOException, InterruptedException, ClassNotFoundException {
056    JobConf job = new JobConf(TEST_UTIL.getConfiguration());
057
058    job.setJobName(jobName);
059    job.setMapperClass(Mapper.class);
060    job.setReducerClass(Reducer.class);
061
062    TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
063      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
064
065    TableMapReduceUtil.addDependencyJars(job);
066
067    job.setReducerClass(Reducer.class);
068    job.setNumReduceTasks(1); // one to get final "first" and "last" key
069    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
070    LOG.info("Started " + job.getJobName());
071
072    RunningJob runningJob = JobClient.runJob(job);
073    runningJob.waitForCompletion();
074    assertTrue(runningJob.isSuccessful());
075    LOG.info("After map/reduce completion - job " + jobName);
076  }
077
078  public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
079    implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
080
081    @Override
082    public void map(ImmutableBytesWritable key, Result value,
083      OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
084      Reporter reporter) throws IOException {
085      makeAssertions(key, value);
086      outputCollector.collect(key, key);
087    }
088
089    /**
090     * Closes this stream and releases any system resources associated with it. If the stream is
091     * already closed then invoking this method has no effect.
092     * @throws IOException if an I/O error occurs
093     */
094    @Override
095    public void close() throws IOException {
096    }
097
098    @Override
099    public void configure(JobConf jobConf) {
100
101    }
102  }
103
104  public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer
105    implements org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
106      NullWritable, NullWritable> {
107
108    private JobConf jobConf;
109
110    @Override
111    public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
112      OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
113      throws IOException {
114      makeAssertions(key, Lists.newArrayList(values));
115    }
116
117    /**
118     * Closes this stream and releases any system resources associated with it. If the stream is
119     * already closed then invoking this method has no effect.
120     * @throws IOException if an I/O error occurs
121     */
122    @Override
123    public void close() throws IOException {
124      super.cleanup(this.jobConf);
125    }
126
127    @Override
128    public void configure(JobConf jobConf) {
129      this.jobConf = jobConf;
130    }
131  }
132}