/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This tests the TableInputFormat and its recovery semantics
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestTableInputFormat {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();

  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeAll
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterAll
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @BeforeEach
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Setup a table with two rows and values.
   * @param tableName the name of the table to create
   * @return A Table instance for the created table.
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Setup a table with two rows ("aaa" and "bbb") and one value per column family.
   * @param tableName the name of the table to create
   * @param families  the column families to create on the table
   * @return A Table instance for the created table.
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    Put p = new Put(Bytes.toBytes("aaa"));
    for (byte[] family : families) {
      p.addColumn(family, null, Bytes.toBytes("value aaa"));
    }
    table.put(p);
    p = new Put(Bytes.toBytes("bbb"));
    for (byte[] family : families) {
      p.addColumn(family, null, Bytes.toBytes("value bbb"));
    }
    table.put(p);
    return table;
  }

  /**
   * Verify that the result and key have expected values.
   * @param r             single row result
   * @param key           the row key
   * @param expectedKey   the expected key
   * @param expectedValue the expected value
   * @return true if the result contains the expected key and value, false otherwise.
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
    byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    // assertArrayEquals reports the differing bytes on failure, unlike
    // assertTrue(Arrays.equals(...)) which only says "expected true".
    assertArrayEquals(expectedValue, value);
    return true; // all checks passed
  }

  /**
   * Create table data and run tests on specified htable using the o.a.h.hbase.mapred API.
   * Reads the two expected rows through a TableRecordReader and verifies key/value pairs
   * plus end-of-data.
   */
  static void runTestMapred(Table table) throws IOException {
    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
      new org.apache.hadoop.hbase.mapred.TableRecordReader();
    trr.setStartRow(Bytes.toBytes("aaa"));
    trr.setEndRow(Bytes.toBytes("zzz"));
    trr.setHTable(table);
    trr.setInputColumns(columns);

    trr.init();
    Result r = new Result();
    ImmutableBytesWritable key = new ImmutableBytesWritable();

    boolean more = trr.next(key, r);
    assertTrue(more);
    checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));

    more = trr.next(key, r);
    assertTrue(more);
    checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));

    // no more data
    more = trr.next(key, r);
    assertFalse(more);
  }

  /**
   * Create a table that IOE's on first scanner next call
   * @param name    the name of the table to create
   * @param failCnt the number of getScanner() invocations that return a broken scanner
   *                before real scanners are handed out
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // the first failCnt invocations return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails,
          // simulating a TimeoutException / IOException
          ResultScanner scanner = mock(ResultScanner.class);
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner(any(Scan.class));
    return htable;
  }

  /**
   * Create a table that throws a DoNotRetryIOException on first scanner next call
   * @param name    the name of the table to create
   * @param failCnt the number of getScanner() invocations that return a broken scanner
   *                before real scanners are handed out
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // the first failCnt invocations return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          ResultScanner scanner = mock(ResultScanner.class);

          invocation.callRealMethod(); // simulate NotServingRegionException
          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
            .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner(any(Scan.class));
    return htable;
  }

  /**
   * Run test assuming no errors using mapred api.
   */
  @Test
  public void testTableRecordReader() throws IOException {
    Table table = createTable(Bytes.toBytes("table1"));
    runTestMapred(table);
  }

  /**
   * Run test assuming Scanner IOException failure using mapred api,
   */
  @Test
  public void testTableRecordReaderScannerFail() throws IOException {
    Table htable = createIOEScannerTable(Bytes.toBytes("table2"), 1);
    runTestMapred(htable);
  }

  /**
   * Run test assuming Scanner IOException failure using mapred api,
   */
  @Test
  public void testTableRecordReaderScannerFailTwice() throws IOException {
    Table htable = createIOEScannerTable(Bytes.toBytes("table3"), 2);
    assertThrows(IOException.class, () -> runTestMapred(htable));
  }

  /**
   * Run test assuming NotServingRegionException using mapred api.
   * @throws org.apache.hadoop.hbase.DoNotRetryIOException
   */
  @Test
  public void testTableRecordReaderScannerTimeout() throws IOException {
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4"), 1);
    runTestMapred(htable);
  }

  /**
   * Run test assuming NotServingRegionException using mapred api.
   * @throws org.apache.hadoop.hbase.DoNotRetryIOException
   */
  @Test
  public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5"), 2);
    assertThrows(org.apache.hadoop.hbase.NotServingRegionException.class,
      () -> runTestMapred(htable));
  }

  /**
   * Verify the example we present in javadocs on TableInputFormatBase
   */
  @Test
  public void testExtensionOfTableInputFormatBase() throws IOException {
    LOG.info("testing use of an InputFormat that extends InputFormatBase");
    // created for its data; the Table handle itself is not needed by the job
    createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleTIF.class);
  }

  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
    LOG.info(
      "testing use of an InputFormat that extends InputFormatBase, " + "as it was given in 0.98.");
    // created for its data; the Table handle itself is not needed by the job
    createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
    LOG.info(
      "testing use of an InputFormat that extends InputFormatBase, " + "using JobConfigurable.");
    // created for its data; the Table handle itself is not needed by the job
    createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  /**
   * Run a map-only job with the given InputFormat and verify, via counters, that the
   * row filter let exactly the "aaa" row (both families) through and dropped "bbb".
   */
  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
    Configuration conf = UTIL.getConfiguration();
    final JobConf job = new JobConf(conf);
    job.setInputFormat(clazz);
    job.setOutputFormat(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);
    LOG.debug("submitting job.");
    final RunningJob run = JobClient.runJob(job);
    assertTrue(run.isSuccessful(), "job failed!");
    assertEquals(2, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter(),
      "Saw the wrong number of instances of the filtered-for row.");
    assertEquals(0, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter(),
      "Saw any instances of the filtered out row.");
    assertEquals(1, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter(),
      "Saw the wrong number of instances of columnA.");
    assertEquals(1, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter(),
      "Saw the wrong number of instances of columnB.");
    assertEquals(2, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter(),
      "Saw the wrong count of values for the filtered-for row.");
    assertEquals(0, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter(),
      "Saw the wrong count of values for the filtered-out row.");
  }

  /**
   * Mapper that does no output; it only bumps per-row, per-family and per-value counters so
   * {@link #testInputFormat(Class)} can assert on what the InputFormat delivered.
   */
  public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {

    @Override
    public void configure(JobConf conf) {
    }

    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<NullWritable, NullWritable> output, Reporter reporter) throws IOException {
      for (Cell cell : value.listCells()) {
        reporter
          .getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
          .increment(1L);
        reporter
          .getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
          .increment(1L);
        reporter
          .getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
          .increment(1L);
      }
    }

    @Override
    public void close() {
    }

  }

  /**
   * Example extension of TableInputFormatBase configured via the deprecated (0.98-era)
   * JobConfigurable route.
   */
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"));
        // mandatory
        initializeTable(connection, exampleTable.getName());
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // mandatory
        setInputColumns(inputColumns);
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        // optional
        setRowFilter(exampleFilter);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

  /**
   * Example extension of ExampleTIF that triggers initialization through JobConfigurable.
   */
  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        initialize(job);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }

    @Override
    protected void initialize(JobConf job) throws IOException {
      initialize(job, "exampleJobConfigurableTable");
    }
  }

  /**
   * Example extension of TableInputFormatBase as presented in the TableInputFormatBase javadocs.
   */
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobConf job) throws IOException {
      initialize(job, "exampleTable");
    }

    protected void initialize(JobConf job, String table) throws IOException {
      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
      TableName tableName = TableName.valueOf(table);
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
      // mandatory
      setInputColumns(inputColumns);
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      // optional
      setRowFilter(exampleFilter);
    }

  }

}