001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapred; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.io.IOException; 023import java.util.Iterator; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.client.Result; 028import org.apache.hadoop.hbase.client.Scan; 029import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 030import org.apache.hadoop.hbase.testclassification.LargeTests; 031import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 032import org.apache.hadoop.io.NullWritable; 033import org.apache.hadoop.mapred.FileOutputFormat; 034import org.apache.hadoop.mapred.JobClient; 035import org.apache.hadoop.mapred.JobConf; 036import org.apache.hadoop.mapred.OutputCollector; 037import org.apache.hadoop.mapred.Reporter; 038import org.apache.hadoop.mapred.RunningJob; 039import org.junit.jupiter.api.Tag; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 044 045@Tag(VerySlowMapReduceTests.TAG) 046@Tag(LargeTests.TAG) 047public class TestMultiTableSnapshotInputFormat 048 extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat { 049 050 private static final Logger LOG = 051 LoggerFactory.getLogger(TestMultiTableSnapshotInputFormat.class); 052 053 @Override 054 protected void runJob(String jobName, Configuration c, List<Scan> scans) 055 throws IOException, InterruptedException, ClassNotFoundException { 056 JobConf job = new JobConf(TEST_UTIL.getConfiguration()); 057 058 job.setJobName(jobName); 059 job.setMapperClass(Mapper.class); 060 job.setReducerClass(Reducer.class); 061 062 TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class, 063 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir); 064 065 TableMapReduceUtil.addDependencyJars(job); 066 067 job.setReducerClass(Reducer.class); 068 job.setNumReduceTasks(1); // one to get final "first" and "last" key 069 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 070 LOG.info("Started " + job.getJobName()); 071 072 RunningJob runningJob = JobClient.runJob(job); 073 runningJob.waitForCompletion(); 074 assertTrue(runningJob.isSuccessful()); 075 LOG.info("After map/reduce completion - job " + jobName); 076 } 077 078 public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper 079 implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> { 080 081 @Override 082 public void map(ImmutableBytesWritable key, Result value, 083 OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector, 084 Reporter reporter) throws IOException { 085 makeAssertions(key, value); 086 outputCollector.collect(key, key); 087 } 088 089 /** 090 * Closes this stream and releases any system resources associated with it. If the stream is 091 * already closed then invoking this method has no effect. 092 * @throws IOException if an I/O error occurs 093 */ 094 @Override 095 public void close() throws IOException { 096 } 097 098 @Override 099 public void configure(JobConf jobConf) { 100 101 } 102 } 103 104 public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer 105 implements org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable, 106 NullWritable, NullWritable> { 107 108 private JobConf jobConf; 109 110 @Override 111 public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values, 112 OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter) 113 throws IOException { 114 makeAssertions(key, Lists.newArrayList(values)); 115 } 116 117 /** 118 * Closes this stream and releases any system resources associated with it. If the stream is 119 * already closed then invoking this method has no effect. 120 * @throws IOException if an I/O error occurs 121 */ 122 @Override 123 public void close() throws IOException { 124 super.cleanup(this.jobConf); 125 } 126 127 @Override 128 public void configure(JobConf jobConf) { 129 this.jobConf = jobConf; 130 } 131 } 132}