/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
 */
@Category({MapReduceTests.class, LargeTests.class})
public class TestMultithreadedTableMapper {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMultithreadedTableMapper.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMultithreadedTableMapper.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
  static final int NUMBER_OF_THREADS = 10;

  @BeforeClass
  public static void beforeClass() throws Exception {
    // Up the handlers; this test needs more than usual.
    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    UTIL.startMiniCluster();
    Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME,
        new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
    UTIL.loadTable(table, INPUT_FAMILY, false);
    UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Pass the given key and processed record to reduce.
   */
  public static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Pass the key, and reversed value to reduce.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
            Bytes.toString(INPUT_FAMILY) + "'.");
      }
      // Get the original value and reverse it
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
      // Now set the value to be collected
      Put outval = new Put(key.get());
      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

  /**
   * Test MultithreadedTableMapper map/reduce against a multi-region table.
   */
  @Test
  public void testMultithreadedTableMapper()
      throws IOException, InterruptedException, ClassNotFoundException {
    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
  }

  private void runTestOnTable(Table table)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      job = Job.getInstance(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      TableMapReduceUtil.initTableMapperJob(table.getName(), scan,
          MultithreadedTableMapper.class, ImmutableBytesWritable.class, Put.class, job);
      MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
      MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
      TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(),
          IdentityTableReducer.class, job);
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + table.getName());
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");
      // verify map-reduce results
      verify(table.getName());
    } finally {
      table.close();
      if (job != null) {
        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  private void verify(TableName tableName) throws IOException {
    Table table = UTIL.getConnection().getTable(tableName);
    boolean verified = false;
    long pause =
        UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        LOG.info("Verification attempt #" + i);
        verifyAttempt(table);
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
        LOG.debug("Verification attempt failed: " + e.getMessage());
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    assertTrue(verified);
    table.close();
  }

  /**
   * Looks at every value of the mapreduce output and verifies that indeed
   * the values have been reversed.
   * @param table Table to scan.
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILY);
    scan.addFamily(OUTPUT_FAMILY);
    ResultScanner scanner = table.getScanner(scan);
    try {
      Iterator<Result> itr = scanner.iterator();
      assertTrue(itr.hasNext());
      while (itr.hasNext()) {
        Result r = itr.next();
        // Each row should carry at most two cells: the input and the output.
        if (r.size() > 2) {
          throw new IOException("Too many results, expected 2 got " + r.size());
        }
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
        for (Cell kv : r.listCells()) {
          if (count == 0) {
            firstValue = CellUtil.cloneValue(kv);
          } else if (count == 1) {
            secondValue = CellUtil.cloneValue(kv);
          } else if (count == 2) {
            break;
          }
          count++;
        }
        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null");
        }
        String first = Bytes.toString(firstValue);
        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null");
        }
        // Reverse the second value and check that it matches the first.
        byte[] secondReversed = new byte[secondValue.length];
        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
          secondReversed[i] = secondValue[j];
        }
        String second = Bytes.toString(secondReversed);
        if (first.compareTo(second) != 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("second key is not the reverse of first. row=" +
                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
                ", second value=" + second);
          }
          fail();
        }
      }
    } finally {
      scanner.close();
    }
  }

}