/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;

/**
 * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of a particular cell,
 * and write it back to the table. Implements common components between mapred and mapreduce
 * implementations.
 */
public abstract class TestTableMapReduceBase {
  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
  protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable");
  protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");

  /** The column families scanned during verification: input first, then output. */
  protected static final byte[][] columns = new byte[][] {
    INPUT_FAMILY,
    OUTPUT_FAMILY
  };

  /**
   * Retrieve my logger instance.
   * @return the logger of the concrete test class
   */
  protected abstract Logger getLog();

  /**
   * Handles API-specifics for setting up and executing the job.
   * @param table the table the job reads from and writes back to
   * @throws IOException if setting up or running the job fails
   */
  protected abstract void runTestOnTable(Table table) throws IOException;

  /**
   * Starts the mini cluster, creates and loads the multi-region table, and creates the table
   * used by negative tests.
   * @throws Exception if the cluster or either table cannot be set up
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    // The handle is only needed for loading, so release it as soon as the load is done.
    try (Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME,
        new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY })) {
      UTIL.loadTable(table, INPUT_FAMILY, false);
    }
    UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
  }

  /**
   * Deletes the negative-test table and shuts the mini cluster down.
   * @throws Exception if the table deletion or the shutdown fails
   */
  @AfterClass
  public static void afterClass() throws Exception {
    try {
      UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS);
    } finally {
      // Always tear the cluster down, even when the table could not be deleted.
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Test a map/reduce against a multi-region table
   * @throws IOException if the job cannot be run
   */
  @Test
  public void testMultiRegionTable() throws IOException {
    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
  }

  /**
   * Runs the job against the multi-region table with the intent of exercising a combiner.
   * @throws IOException if the job cannot be run
   */
  @Test
  public void testCombiner() throws IOException {
    Configuration conf = new Configuration(UTIL.getConfiguration());
    // force use of combiner for testing purposes
    // NOTE(review): 'conf' is a private copy that is never handed to runTestOnTable, so this
    // setting does not appear to reach the job from here - confirm against the subclasses.
    conf.setInt("mapreduce.map.combine.minspills", 1);
    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
  }

  /**
   * Implements mapper logic for use across APIs: reads the single input cell of a row and
   * produces a {@link Put} holding its reversed value in the output family.
   * @param key the row key; used as the row of the returned {@link Put}
   * @param value the row contents; must hold exactly one cell, in {@link #INPUT_FAMILY}
   * @return a {@link Put} writing the reversed value to {@link #OUTPUT_FAMILY} with a null
   *         qualifier
   * @throws IOException if the row does not hold exactly one cell, the input family is missing,
   *           or the expected input cell has no value
   */
  protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
    if (value.size() != 1) {
      throw new IOException("There should only be one input column");
    }
    Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
    if (!cf.containsKey(INPUT_FAMILY)) {
      throw new IOException("Wrong input columns. Missing: '" +
        Bytes.toString(INPUT_FAMILY) + "'.");
    }

    // Get the original value and reverse it. The input cell is looked up with the family name
    // doubling as the qualifier; fail with a clear message instead of an NPE when it is absent.
    byte[] rawValue = value.getValue(INPUT_FAMILY, INPUT_FAMILY);
    if (rawValue == null) {
      throw new IOException("No value found for column '" + Bytes.toString(INPUT_FAMILY) + ":" +
        Bytes.toString(INPUT_FAMILY) + "'.");
    }
    String reversedValue = new StringBuilder(Bytes.toString(rawValue)).reverse().toString();

    // Now set the value to be collected
    Put outval = new Put(key.get());
    outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(reversedValue));
    return outval;
  }

  /**
   * Verifies the job output in the given table, retrying while cells are still missing.
   * The number of attempts and the pause between them are read from the test configuration.
   * @param tableName the table holding both the input and the output cells
   * @throws IOException if the table cannot be opened or scanned, or a row holds too many cells
   */
  protected void verify(TableName tableName) throws IOException {
    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    boolean verified = false;
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int attempt = 0; attempt < numRetries; attempt++) {
        try {
          getLog().info("Verification attempt #{}", attempt);
          verifyAttempt(table);
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
          getLog().debug("Verification attempt failed: {}", e.getMessage());
        }
        if (attempt == numRetries - 1) {
          // Nothing left to retry, so do not pause after the final attempt.
          break;
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Preserve the interrupt for the caller and give up on further retries.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    assertTrue("Verification of " + tableName + " failed (up to " + numRetries + " attempts)",
      verified);
  }

  /**
   * Looks at every value of the mapreduce output and verifies that indeed
   * the values have been reversed.
   * @param table Table to scan.
   * @throws IOException if the scan fails or a row holds more than two cells
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    TableInputFormat.addColumns(scan, columns);
    try (ResultScanner scanner = table.getScanner(scan)) {
      Iterator<Result> itr = scanner.iterator();
      assertTrue("Scan returned no rows", itr.hasNext());
      while (itr.hasNext()) {
        verifyRow(itr.next());
      }
    }
  }

  /**
   * Checks a single row: its second cell value, reversed, must equal its first cell value.
   * @param r the row to check
   * @throws IOException if the row holds more than two cells
   * @throws NullPointerException if the first or the second cell value is missing
   */
  private static void verifyRow(Result r) throws IOException {
    // This check must not depend on the log level, so it is always performed.
    if (r.size() > 2) {
      throw new IOException("Too many results, expected 2 got " + r.size());
    }

    byte[] firstValue = null;
    byte[] secondValue = null;
    int count = 0;
    for (Cell kv : r.listCells()) {
      if (count == 0) {
        firstValue = CellUtil.cloneValue(kv);
      } else if (count == 1) {
        secondValue = CellUtil.cloneValue(kv);
      }
      count++;
      if (count == 2) {
        break;
      }
    }

    if (firstValue == null) {
      throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null");
    }
    if (secondValue == null) {
      throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null");
    }

    // Note: this reverses bytes whereas the mapper reverses characters; the two agree as long
    // as every character of the value encodes to a single byte.
    String first = Bytes.toString(firstValue);
    String second = Bytes.toString(reverse(secondValue));
    if (!first.equals(second)) {
      fail("second key is not the reverse of first. row=" +
        Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
        ", second value=" + second);
    }
  }

  /**
   * Returns a new array holding the bytes of {@code source} in reverse order.
   * @param source the bytes to reverse; left unmodified
   * @return a reversed copy of {@code source}
   */
  private static byte[] reverse(byte[] source) {
    byte[] reversed = new byte[source.length];
    for (int i = 0; i < source.length; i++) {
      reversed[i] = source[source.length - 1 - i];
    }
    return reversed;
  }
}