/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is
 * simple: take every row in the table, reverse the value of a particular cell, and write it back
 * to the table.
 */
@Tag(VerySlowMapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestTableMapReduce extends TestTableMapReduceBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduce.class);

  @Override
  protected Logger getLog() {
    return LOG;
  }

  /**
   * Pass the given key and the processed record to reduce.
   */
  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Pass the key and the reversed value to reduce.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException(
          "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
      }

      // Get the original value and reverse it
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
      // Now set the value to be collected
      Put outval = new Put(key.get());
      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

  @Override
  protected void runTestOnTable(Table table) throws IOException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      job = Job.getInstance(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      TableMapReduceUtil.initTableMapperJob(table.getName().getNameAsString(), scan,
        ProcessContentsMapper.class, ImmutableBytesWritable.class, Put.class, job);
      TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(),
        IdentityTableReducer.class, job);
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + table.getName().getNameAsString());
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");

      // verify map-reduce results
      verify(table.getName());

      verifyJobCountersAreEmitted(job);
    } catch (InterruptedException | ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      table.close();
      if (job != null) {
        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  /**
   * Verify scan counters are emitted from the job.
   */
  private void verifyJobCountersAreEmitted(Job job) throws IOException {
    Counters counters = job.getCounters();
    Counter counter =
      counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
    assertNotNull(counter, "Unable to find Job counter for HBase scan metrics, RPC_CALLS");
    assertTrue(counter.getValue() > 0, "Counter value for RPC_CALLS should be larger than 0");
  }

  /**
   * Writing to a disabled table should surface a TableNotEnabledException.
   */
  @Test
  public void testWritingToDisabledTable() throws IOException {
    assertThrows(TableNotEnabledException.class, () -> {
      try (Admin admin = UTIL.getConnection().getAdmin();
        Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
        admin.disableTable(table.getName());
        runTestOnTable(table);
      }
    });
  }

  /**
   * Writing to a table that does not exist should surface a TableNotFoundException.
   */
  @Test
  public void testWritingToNonExistentTable() throws IOException {
    assertThrows(TableNotFoundException.class, () -> {
      try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
        runTestOnTable(table);
      }
    });
  }
}