001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.NavigableMap; 028import org.apache.hadoop.fs.FileUtil; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.apache.hadoop.hbase.testclassification.MapReduceTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.mapreduce.Job; 046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 047import org.junit.AfterClass; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is 057 * simple - take every row in the table, reverse the value of a particular cell, and write it back 058 * to the table. 059 */ 060@Category({ MapReduceTests.class, LargeTests.class }) 061public class TestMultithreadedTableMapper { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestMultithreadedTableMapper.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestMultithreadedTableMapper.class); 068 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 069 static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest"); 070 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); 071 static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); 072 static final int NUMBER_OF_THREADS = 10; 073 074 @BeforeClass 075 public static void beforeClass() throws Exception { 076 // Up the handlers; this test needs more than usual. 077 UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 078 UTIL.startMiniCluster(); 079 Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, 080 new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY }); 081 UTIL.loadTable(table, INPUT_FAMILY, false); 082 UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME); 083 } 084 085 @AfterClass 086 public static void afterClass() throws Exception { 087 UTIL.shutdownMiniCluster(); 088 } 089 090 /** 091 * Pass the given key and processed record reduce 092 */ 093 public static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> { 094 095 /** 096 * Pass the key, and reversed value to reduce nnnn 097 */ 098 @Override 099 public void map(ImmutableBytesWritable key, Result value, Context context) 100 throws IOException, InterruptedException { 101 if (value.size() != 1) { 102 throw new IOException("There should only be one input column"); 103 } 104 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap(); 105 if (!cf.containsKey(INPUT_FAMILY)) { 106 throw new IOException( 107 "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'."); 108 } 109 // Get the original value and reverse it 110 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); 111 StringBuilder newValue = new StringBuilder(originalValue); 112 newValue.reverse(); 113 // Now set the value to be collected 114 Put outval = new Put(key.get()); 115 outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); 116 context.write(key, outval); 117 } 118 } 119 120 /** 121 * Test multithreadedTableMappper map/reduce against a multi-region table nnn 122 */ 123 @Test 124 public void testMultithreadedTableMapper() 125 throws IOException, InterruptedException, ClassNotFoundException { 126 runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME)); 127 } 128 129 private void runTestOnTable(Table table) 130 throws IOException, InterruptedException, ClassNotFoundException { 131 Job job = null; 132 try { 133 LOG.info("Before map/reduce startup"); 134 job = new Job(table.getConfiguration(), "process column contents"); 135 job.setNumReduceTasks(1); 136 Scan scan = new Scan(); 137 scan.addFamily(INPUT_FAMILY); 138 TableMapReduceUtil.initTableMapperJob(table.getName(), scan, MultithreadedTableMapper.class, 139 ImmutableBytesWritable.class, Put.class, job); 140 MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class); 141 MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS); 142 TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(), 143 IdentityTableReducer.class, job); 144 FileOutputFormat.setOutputPath(job, new Path("test")); 145 LOG.info("Started " + table.getName()); 146 assertTrue(job.waitForCompletion(true)); 147 LOG.info("After map/reduce completion"); 148 // verify map-reduce results 149 verify(table.getName()); 150 } finally { 151 table.close(); 152 if (job != null) { 153 FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir"))); 154 } 155 } 156 } 157 158 private void verify(TableName tableName) throws IOException { 159 Table table = UTIL.getConnection().getTable(tableName); 160 boolean verified = false; 161 long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); 162 int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 163 for (int i = 0; i < numRetries; i++) { 164 try { 165 LOG.info("Verification attempt #" + i); 166 verifyAttempt(table); 167 verified = true; 168 break; 169 } catch (NullPointerException e) { 170 // If here, a cell was empty. Presume its because updates came in 171 // after the scanner had been opened. Wait a while and retry. 172 LOG.debug("Verification attempt failed: " + e.getMessage()); 173 } 174 try { 175 Thread.sleep(pause); 176 } catch (InterruptedException e) { 177 // continue 178 } 179 } 180 assertTrue(verified); 181 table.close(); 182 } 183 184 /** 185 * Looks at every value of the mapreduce output and verifies that indeed the values have been 186 * reversed. 187 * @param table Table to scan. n * @throws NullPointerException if we failed to find a cell value 188 */ 189 private void verifyAttempt(final Table table) throws IOException, NullPointerException { 190 Scan scan = new Scan(); 191 scan.addFamily(INPUT_FAMILY); 192 scan.addFamily(OUTPUT_FAMILY); 193 ResultScanner scanner = table.getScanner(scan); 194 try { 195 Iterator<Result> itr = scanner.iterator(); 196 assertTrue(itr.hasNext()); 197 while (itr.hasNext()) { 198 Result r = itr.next(); 199 if (LOG.isDebugEnabled()) { 200 if (r.size() > 2) { 201 throw new IOException("Too many results, expected 2 got " + r.size()); 202 } 203 } 204 byte[] firstValue = null; 205 byte[] secondValue = null; 206 int count = 0; 207 for (Cell kv : r.listCells()) { 208 if (count == 0) { 209 firstValue = CellUtil.cloneValue(kv); 210 } else if (count == 1) { 211 secondValue = CellUtil.cloneValue(kv); 212 } else if (count == 2) { 213 break; 214 } 215 count++; 216 } 217 String first = ""; 218 if (firstValue == null) { 219 throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null"); 220 } 221 first = Bytes.toString(firstValue); 222 String second = ""; 223 if (secondValue == null) { 224 throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null"); 225 } 226 byte[] secondReversed = new byte[secondValue.length]; 227 for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { 228 secondReversed[i] = secondValue[j]; 229 } 230 second = Bytes.toString(secondReversed); 231 if (first.compareTo(second) != 0) { 232 if (LOG.isDebugEnabled()) { 233 LOG.debug( 234 "second key is not the reverse of first. row=" + Bytes.toStringBinary(r.getRow()) 235 + ", first value=" + first + ", second value=" + second); 236 } 237 fail(); 238 } 239 } 240 } finally { 241 scanner.close(); 242 } 243 } 244 245}