/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is
 * simple - take every row in the table, reverse the value of a particular cell, and write it back
 * to the table.
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
@SuppressWarnings("deprecation")
public class TestTableMapReduce extends TestTableMapReduceBase {

  // SLF4J idiom: pass the Class directly rather than Class#getName().
  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduce.class);

  @Override
  protected Logger getLog() {
    return LOG;
  }

  /**
   * Pass the given key and processed record reduce
   */
  static class ProcessContentsMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Put> {

    /**
     * Pass the key, and reversed value to reduce
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException {
      // Delegate the actual cell-reversal to the shared base-class helper.
      output.collect(key, TestTableMapReduceBase.map(key, value));
    }
  }

  /**
   * Runs the map/reduce job against the given table: reverse the value of a particular cell in
   * every row, write it back, then verify the results.
   * @param table the table to run the job over
   * @throws IOException if job submission or verification fails
   */
  @Override
  protected void runTestOnTable(Table table) throws IOException {
    JobConf jobConf = null;
    try {
      LOG.info("Before map/reduce startup");
      jobConf = new JobConf(UTIL.getConfiguration(), TestTableMapReduce.class);
      jobConf.setJobName("process column contents");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(table.getName().getNameAsString(),
        Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class, ImmutableBytesWritable.class,
        Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(table.getName().getNameAsString(),
        IdentityTableReduce.class, jobConf);

      LOG.info("Started {}", table.getName());
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
      LOG.info("After map/reduce completion");

      // verify map-reduce results
      verify(table.getName());
    } finally {
      if (jobConf != null) {
        // Guard against a missing property: new File(null) would throw NPE.
        String tmpDir = jobConf.get("hadoop.tmp.dir");
        if (tmpDir != null) {
          FileUtil.fullyDelete(new File(tmpDir));
        }
      }
    }
  }
}