/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is
 * simple - take every row in the table, reverse the value of a particular cell, and write it back
 * to the table.
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestMultithreadedTableMapper {

  private static final Logger LOG = LoggerFactory.getLogger(TestMultithreadedTableMapper.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
  static final int NUMBER_OF_THREADS = 10;

  /**
   * Starts a mini cluster and loads a multi-region table that the map/reduce job will process.
   */
  @BeforeAll
  public static void beforeClass() throws Exception {
    // Up the handlers; this test needs more than usual.
    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    UTIL.startMiniCluster();
    Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME,
      new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
    UTIL.loadTable(table, INPUT_FAMILY, false);
    UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME);
  }

  @AfterAll
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Pass the given key and processed record reduce
   */
  public static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Pass the key, and reversed value to reduce
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY)
          + "'.");
      }
      // Get the original value and reverse it
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
      // Now set the value to be collected
      Put outval = new Put(key.get());
      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

  /**
   * Test multithreadedTableMappper map/reduce against a multi-region table
   */
  @Test
  public void testMultithreadedTableMapper()
    throws IOException, InterruptedException, ClassNotFoundException {
    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
  }

  /**
   * Runs the reverse-the-cell map/reduce job against the given table, waits for completion, and
   * verifies the output. Closes the table and deletes the job's temp directory when done.
   */
  private void runTestOnTable(Table table)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      // Job.getInstance is the supported factory; the Job(Configuration, String)
      // constructor is deprecated.
      job = Job.getInstance(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      TableMapReduceUtil.initTableMapperJob(table.getName(), scan, MultithreadedTableMapper.class,
        ImmutableBytesWritable.class, Put.class, job);
      MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
      MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
      TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(),
        IdentityTableReducer.class, job);
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + table.getName());
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");
      // verify map-reduce results
      verify(table.getName());
    } finally {
      table.close();
      if (job != null) {
        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  /**
   * Scans the table and checks the job's output, retrying a configurable number of times because
   * updates may still be trickling in when the first scan opens.
   * @param tableName table the job wrote to
   */
  private void verify(TableName tableName) throws IOException {
    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    boolean verified = false;
    // try-with-resources so the table is closed even when verification ultimately fails;
    // the original closed it only after the assert, leaking it on failure.
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          LOG.info("Verification attempt #" + i);
          verifyAttempt(table);
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
          LOG.debug("Verification attempt failed: " + e.getMessage());
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Restore the interrupt status instead of swallowing it, and stop retrying.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    assertTrue(verified);
  }

  /**
   * Looks at every value of the mapreduce output and verifies that indeed the values have been
   * reversed.
   * @param table Table to scan.
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILY);
    scan.addFamily(OUTPUT_FAMILY);
    try (ResultScanner scanner = table.getScanner(scan)) {
      Iterator<Result> itr = scanner.iterator();
      assertTrue(itr.hasNext());
      while (itr.hasNext()) {
        Result r = itr.next();
        // Correctness check; must NOT be gated on LOG.isDebugEnabled() as it was before,
        // otherwise the check silently vanishes when debug logging is off.
        if (r.size() > 2) {
          throw new IOException("Too many results, expected 2 got " + r.size());
        }
        // First cell is the input value, second is the job's output; anything beyond is ignored.
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
        for (Cell kv : r.listCells()) {
          if (count == 0) {
            firstValue = CellUtil.cloneValue(kv);
          } else if (count == 1) {
            secondValue = CellUtil.cloneValue(kv);
          } else {
            break;
          }
          count++;
        }
        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null");
        }
        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null");
        }
        String first = Bytes.toString(firstValue);
        // Reverse the second value; it should then equal the first.
        byte[] secondReversed = new byte[secondValue.length];
        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
          secondReversed[i] = secondValue[j];
        }
        String second = Bytes.toString(secondReversed);
        if (!first.equals(second)) {
          // Fail with a descriptive message regardless of log level (previously the detail
          // was only visible when debug logging was enabled, followed by a bare fail()).
          fail("second value is not the reverse of first. row=" + Bytes.toStringBinary(r.getRow())
            + ", first value=" + first + ", second value=" + second);
        }
      }
    }
  }

}