/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple: take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
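 * <p>
 * A minimal sketch of that per-row transformation (the real logic lives in
 * {@code TestTableMapReduceBase.map(key, value)}; the {@code OUTPUT_FAMILY}
 * constant and the {@code null} qualifier below are illustrative assumptions,
 * not necessarily what the base class uses):
 * <pre>{@code
 * // Hypothetical stand-in for TestTableMapReduceBase.map(key, value):
 * static Put reverse(ImmutableBytesWritable key, Result value) {
 *   byte[] bytes = value.getValue(INPUT_FAMILY, null); // qualifier assumed null
 *   byte[] reversed = new byte[bytes.length];
 *   for (int i = 0; i < bytes.length; i++) {
 *     reversed[i] = bytes[bytes.length - 1 - i];        // reverse the cell value
 *   }
 *   Put put = new Put(key.get());                       // same row key
 *   put.addColumn(OUTPUT_FAMILY, null, reversed);       // OUTPUT_FAMILY assumed
 *   return put;
 * }
 * }</pre>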
 */
@Category({ MapReduceTests.class, LargeTests.class })
@SuppressWarnings("deprecation")
public class TestTableMapReduce extends TestTableMapReduceBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduce.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduce.class);

  @Override
  protected Logger getLog() {
    return LOG;
  }

  /**
   * Pass the given key and the processed record on to reduce.
   */
  static class ProcessContentsMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Put> {

    /**
     * Pass the key and the reversed value to reduce.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
        throws IOException {
      output.collect(key, TestTableMapReduceBase.map(key, value));
    }
  }

  @Override
  protected void runTestOnTable(Table table) throws IOException {
    JobConf jobConf = null;
    try {
      LOG.info("Before map/reduce startup");
      jobConf = new JobConf(UTIL.getConfiguration(), TestTableMapReduce.class);
      jobConf.setJobName("process column contents");
      jobConf.setNumReduceTasks(1);
      // Map phase: scan every row of the table and emit a Put with the reversed value.
      TableMapReduceUtil.initTableMapJob(table.getName().getNameAsString(),
        Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
        ImmutableBytesWritable.class, Put.class, jobConf);
      // Reduce phase: write the emitted Puts back to the same table unchanged.
      TableMapReduceUtil.initTableReduceJob(table.getName().getNameAsString(),
        IdentityTableReduce.class, jobConf);

      LOG.info("Started " + table.getName());
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
      LOG.info("After map/reduce completion");

      // Verify that every row now holds the reversed value.
      verify(table.getName());
    } finally {
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }
}