001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.Iterator; 025import java.util.Map; 026import java.util.NavigableMap; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.ResultScanner; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.junit.AfterClass; 041import org.junit.BeforeClass; 042import org.junit.Test; 043import org.slf4j.Logger; 044 045/** 046 * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing on 047 * our tables is simple - take every row in the table, reverse the value of a particular cell, and 048 * write it back to the table. Implements common components between mapred and mapreduce 049 * implementations. 050 */ 051public abstract class TestTableMapReduceBase { 052 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 053 protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest"); 054 protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable"); 055 protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); 056 protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); 057 058 protected static final byte[][] columns = new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY }; 059 060 /** 061 * Retrieve my logger instance. 062 */ 063 protected abstract Logger getLog(); 064 065 /** 066 * Handles API-specifics for setting up and executing the job. 067 */ 068 protected abstract void runTestOnTable(Table table) throws IOException; 069 070 @BeforeClass 071 public static void beforeClass() throws Exception { 072 UTIL.startMiniCluster(); 073 Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, 074 new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY }); 075 UTIL.loadTable(table, INPUT_FAMILY, false); 076 UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY }); 077 } 078 079 @AfterClass 080 public static void afterClass() throws Exception { 081 UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS); 082 UTIL.shutdownMiniCluster(); 083 } 084 085 /** 086 * Test a map/reduce against a multi-region table n 087 */ 088 @Test 089 public void testMultiRegionTable() throws IOException { 090 runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME)); 091 } 092 093 @Test 094 public void testCombiner() throws IOException { 095 Configuration conf = new Configuration(UTIL.getConfiguration()); 096 // force use of combiner for testing purposes 097 conf.setInt("mapreduce.map.combine.minspills", 1); 098 runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME)); 099 } 100 101 /** 102 * Implements mapper logic for use across APIs. 103 */ 104 protected static Put map(ImmutableBytesWritable key, Result value) throws IOException { 105 if (value.size() != 1) { 106 throw new IOException("There should only be one input column"); 107 } 108 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap(); 109 if (!cf.containsKey(INPUT_FAMILY)) { 110 throw new IOException( 111 "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'."); 112 } 113 114 // Get the original value and reverse it 115 116 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); 117 StringBuilder newValue = new StringBuilder(originalValue); 118 newValue.reverse(); 119 120 // Now set the value to be collected 121 122 Put outval = new Put(key.get()); 123 outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); 124 return outval; 125 } 126 127 protected void verify(TableName tableName) throws IOException { 128 Table table = UTIL.getConnection().getTable(tableName); 129 boolean verified = false; 130 long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); 131 int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 132 for (int i = 0; i < numRetries; i++) { 133 try { 134 getLog().info("Verification attempt #" + i); 135 verifyAttempt(table); 136 verified = true; 137 break; 138 } catch (NullPointerException e) { 139 // If here, a cell was empty. Presume its because updates came in 140 // after the scanner had been opened. Wait a while and retry. 141 getLog().debug("Verification attempt failed: " + e.getMessage()); 142 } 143 try { 144 Thread.sleep(pause); 145 } catch (InterruptedException e) { 146 // continue 147 } 148 } 149 assertTrue(verified); 150 } 151 152 /** 153 * Looks at every value of the mapreduce output and verifies that indeed the values have been 154 * reversed. 155 * @param table Table to scan. n * @throws NullPointerException if we failed to find a cell value 156 */ 157 private void verifyAttempt(final Table table) throws IOException, NullPointerException { 158 Scan scan = new Scan(); 159 TableInputFormat.addColumns(scan, columns); 160 ResultScanner scanner = table.getScanner(scan); 161 try { 162 Iterator<Result> itr = scanner.iterator(); 163 assertTrue(itr.hasNext()); 164 while (itr.hasNext()) { 165 Result r = itr.next(); 166 if (getLog().isDebugEnabled()) { 167 if (r.size() > 2) { 168 throw new IOException("Too many results, expected 2 got " + r.size()); 169 } 170 } 171 byte[] firstValue = null; 172 byte[] secondValue = null; 173 int count = 0; 174 for (Cell kv : r.listCells()) { 175 if (count == 0) { 176 firstValue = CellUtil.cloneValue(kv); 177 } 178 if (count == 1) { 179 secondValue = CellUtil.cloneValue(kv); 180 } 181 count++; 182 if (count == 2) { 183 break; 184 } 185 } 186 187 if (firstValue == null) { 188 throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null"); 189 } 190 String first = Bytes.toString(firstValue); 191 192 if (secondValue == null) { 193 throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null"); 194 } 195 byte[] secondReversed = new byte[secondValue.length]; 196 for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { 197 secondReversed[i] = secondValue[j]; 198 } 199 String second = Bytes.toString(secondReversed); 200 201 if (first.compareTo(second) != 0) { 202 if (getLog().isDebugEnabled()) { 203 getLog().debug( 204 "second key is not the reverse of first. row=" + Bytes.toStringBinary(r.getRow()) 205 + ", first value=" + first + ", second value=" + second); 206 } 207 fail(); 208 } 209 } 210 } finally { 211 scanner.close(); 212 } 213 } 214}