/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
 */
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableMapReduce extends TestTableMapReduceBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableMapReduce.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduce.class);

  /**
   * Supplies this class's logger through the {@code getLog()} hook it overrides.
   *
   * @return the logger of this test class
   */
  @Override
  protected Logger getLog() {
    return LOG;
  }

  /**
   * Mapper that passes each row key, together with a {@link Put} holding the
   * reversed cell value, on to the reduce phase.
   */
  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Reads the single input cell of the row, reverses its string value and
     * emits a {@link Put} that stores the reversed value under
     * {@code OUTPUT_FAMILY} with a {@code null} qualifier.
     *
     * @param key row key of the current row
     * @param value scan result for the row; must contain exactly one cell
     * @param context map context the output pair is written to
     * @throws IOException if the row does not hold exactly one cell, lacks
     *         {@code INPUT_FAMILY}, or has no value for the expected column
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
      }

      // The cell is looked up with the family bytes doubling as the qualifier.
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
      if (originalValue == null) {
        // Without this guard new StringBuilder(null) would fail with a bare NPE.
        String column = Bytes.toString(INPUT_FAMILY);
        throw new IOException("No value found for column '" + column + ":" + column + "'.");
      }

      // Reverse the original value and hand it to the reducer as a Put.
      String reversedValue = new StringBuilder(originalValue).reverse().toString();
      Put outval = new Put(key.get());
      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(reversedValue));
      context.write(key, outval);
    }
  }

  /**
   * Runs the reversing map/reduce job against the given table, then verifies
   * the table contents and the emitted job counters. The table is always
   * closed and the job's {@code hadoop.tmp.dir} directory is always removed,
   * whether or not the job succeeded.
   *
   * @param table table that is both the source and the sink of the job; it is
   *        closed before this method returns
   * @throws IOException if job setup, execution or verification fails; also
   *         wraps {@link InterruptedException} and {@link ClassNotFoundException}
   */
  @Override
  protected void runTestOnTable(Table table) throws IOException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
      job = Job.getInstance(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      String tableName = table.getName().getNameAsString();
      TableMapReduceUtil.initTableMapperJob(
        tableName, scan,
        ProcessContentsMapper.class, ImmutableBytesWritable.class,
        Put.class, job);
      TableMapReduceUtil.initTableReducerJob(
        tableName,
        IdentityTableReducer.class, job);
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started {}", tableName);
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");

      // verify map-reduce results
      verify(table.getName());

      verifyJobCountersAreEmitted(job);
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can still see it.
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      try {
        table.close();
      } finally {
        // Nested so the temp directory is removed even when close() throws.
        if (job != null) {
          String tmpDir = job.getConfiguration().get("hadoop.tmp.dir");
          if (tmpDir != null) {
            FileUtil.fullyDelete(new File(tmpDir));
          }
        }
      }
    }
  }

  /**
   * Verify scan counters are emitted from the job: the {@code RPC_CALLS}
   * counter of the HBase counter group must exist and be greater than zero.
   *
   * @param job the completed job whose counters are inspected
   * @throws IOException if the counters cannot be retrieved from the job
   */
  private void verifyJobCountersAreEmitted(Job job) throws IOException {
    Counters counters = job.getCounters();
    Counter counter
      = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
    assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
    assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
  }

  /**
   * Disables the table reserved for negative tests and expects the job to
   * fail with {@link TableNotEnabledException}.
   */
  @Test(expected = TableNotEnabledException.class)
  public void testWritingToDisabledTable() throws IOException {

    try (Admin admin = UTIL.getConnection().getAdmin();
      Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
      admin.disableTable(table.getName());
      runTestOnTable(table);
      fail("Should not have reached here, should have thrown an exception");
    }
  }

  /**
   * Runs the job against a table name that does not exist and expects it to
   * fail with {@link TableNotFoundException}.
   */
  @Test(expected = TableNotFoundException.class)
  public void testWritingToNonExistentTable() throws IOException {

    try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
      runTestOnTable(table);
      fail("Should not have reached here, should have thrown an exception");
    }
  }
}