/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Variant of the {@code org.apache.hadoop.hbase.mapreduce} multi-table snapshot input format test
 * that overrides {@link #runJob(String, Configuration, List)} so the job is built and executed
 * through the {@code org.apache.hadoop.mapred} API ({@link JobConf}, {@link JobClient}).
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestMultiTableSnapshotInputFormat
  extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMultiTableSnapshotInputFormat.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestMultiTableSnapshotInputFormat.class);

  /**
   * Configures a {@link JobConf} over the snapshot/scan mapping derived from {@code scans}, runs it
   * with {@link JobClient#runJob(JobConf)} and asserts that the job reports success.
   * @param jobName name given to the job; also used as the (relative) output path
   * @param c       configuration supplied by the caller (see the review note in the body)
   * @param scans   scans handed to {@code getSnapshotScanMapping} to build the job input
   * @throws IOException            if configuring or running the job fails
   * @throws InterruptedException   declared by the overridden method
   * @throws ClassNotFoundException declared by the overridden method
   */
  @Override
  protected void runJob(String jobName, Configuration c, List<Scan> scans)
    throws IOException, InterruptedException, ClassNotFoundException {
    // NOTE(review): the job is seeded from TEST_UTIL's configuration and the 'c' argument is never
    // read here, so any settings the caller placed only in 'c' do not reach the job - confirm
    // whether that is intentional.
    JobConf job = new JobConf(TEST_UTIL.getConfiguration());

    job.setJobName(jobName);
    job.setMapperClass(Mapper.class);

    TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);

    TableMapReduceUtil.addDependencyJars(job);

    // The reducer is registered once, after the mapper-side initialisation above.
    job.setReducerClass(Reducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started {}", job.getJobName());

    RunningJob runningJob = JobClient.runJob(job);
    runningJob.waitForCompletion();
    assertTrue("Job " + jobName + " did not complete successfully", runningJob.isSuccessful());
    LOG.info("After map/reduce completion - job {}", jobName);
  }

  /**
   * {@code mapred}-API mapper: runs the inherited {@code makeAssertions} check on every row and
   * emits the row key as both output key and output value.
   */
  public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
    implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {

    /**
     * Checks the incoming row and forwards its key to the collector.
     * @param key             row key of the current record
     * @param value           row contents for {@code key}
     * @param outputCollector receives {@code (key, key)}
     * @param reporter        not used by this implementation
     * @throws IOException if the check or the collector fails
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
      Reporter reporter) throws IOException {
      makeAssertions(key, value);
      outputCollector.collect(key, key);
    }

    /**
     * Does nothing; this mapper has nothing to release.
     * @throws IOException never thrown by this implementation
     */
    @Override
    public void close() throws IOException {
    }

    /**
     * Does nothing; this mapper reads no settings from the job configuration.
     * @param jobConf ignored
     */
    @Override
    public void configure(JobConf jobConf) {
    }
  }

  /**
   * {@code mapred}-API reducer: runs the inherited {@code makeAssertions} check on each key group
   * and, on {@link #close()}, invokes the inherited {@code cleanup} with the job configuration
   * captured in {@link #configure(JobConf)}. It emits no output records.
   */
  public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer
    implements org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
      NullWritable, NullWritable> {

    /** Job configuration stored by {@link #configure(JobConf)} for later use in {@link #close()}. */
    private JobConf jobConf;

    /**
     * Materialises the values for {@code key} into a list and passes both to the inherited check.
     * @param key             key of the current group
     * @param values          values grouped under {@code key}; fully consumed by this call
     * @param outputCollector not used; nothing is emitted
     * @param reporter        not used by this implementation
     * @throws IOException if the check fails with an I/O error
     */
    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
      OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
      throws IOException {
      makeAssertions(key, Lists.newArrayList(values));
    }

    /**
     * Runs the inherited {@code cleanup} step against the configuration stored by
     * {@link #configure(JobConf)}.
     * @throws IOException if the cleanup step fails with an I/O error
     */
    @Override
    public void close() throws IOException {
      super.cleanup(this.jobConf);
    }

    /**
     * Remembers the job configuration so {@link #close()} can hand it to the cleanup step.
     * @param jobConf configuration of the running job
     */
    @Override
    public void configure(JobConf jobConf) {
      this.jobConf = jobConf;
    }
  }
}