/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This tests the TableInputFormat and its recovery semantics.
 * <p>
 * The record-reader tests drive a {@link TableRecordReaderImpl} directly against a real table (or
 * a Mockito spy of one whose first scanners fail). The input-format tests submit a small
 * map-only job that uses one of the example {@link TableInputFormatBase} subclasses declared at
 * the bottom of this file and then verify the counters produced by {@link ExampleVerifier}.
 */
@Tag(LargeTests.TAG)
public class TestTableInputFormat {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  /** Starts the shared mini cluster once for all tests in this class. */
  @BeforeAll
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  /** Shuts the shared mini cluster down after the last test. */
  @AfterAll
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /** Makes sure at least one region server is available before each test. */
  @BeforeEach
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Setup a table with two rows and values, using the single column family {@link #FAMILY}.
   * @param tableName name of the table to create
   * @return A Table instance for the created table. The caller is responsible for closing it.
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Setup a table with two rows and values per column family. Row {@code aaa} holds
   * {@code "value aaa"} and row {@code bbb} holds {@code "value bbb"} in every given family (null
   * qualifier).
   * @param tableName name of the table to create
   * @param families  column families to create and populate
   * @return A Table instance for the created table. The caller is responsible for closing it.
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    putRow(table, families, "aaa");
    putRow(table, families, "bbb");
    return table;
  }

  /** Writes one row whose value in every family is {@code "value " + row}. */
  private static void putRow(Table table, byte[][] families, String row) throws IOException {
    byte[] value = Bytes.toBytes("value " + row);
    Put put = new Put(Bytes.toBytes(row));
    for (byte[] family : families) {
      put.addColumn(family, null, value);
    }
    table.put(put);
  }

  /**
   * Verify that the result and key have expected values. A mismatch is reported through a failed
   * assertion rather than through the return value.
   * @param r             single row result
   * @param key           the row key
   * @param expectedKey   the expected key
   * @param expectedValue the expected value
   * @return always {@code true}; the method only returns when both assertions passed.
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
    byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue),
      "unexpected value: " + Bytes.toStringBinary(value));
    return true; // if succeed
  }

  /**
   * Run tests on specified htable using the o.a.h.hbase.mapreduce API: scan {@link #FAMILY} from
   * row {@code aaa} (inclusive) up to {@code zzz} and expect exactly the rows {@code aaa} and
   * {@code bbb}, in that order, with the values written by {@link #createTable(byte[])}.
   * @param table table (or spy of a table) to read from; it is not closed by this method
   */
  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("aaa"));
    scan.withStopRow(Bytes.toBytes("zzz"));
    scan.addFamily(FAMILY);

    TableRecordReaderImpl trr = new TableRecordReaderImpl();
    trr.setScan(scan);
    trr.setHTable(table);
    trr.initialize(null, null);

    assertTrue(trr.nextKeyValue());
    checkResult(trr.getCurrentValue(), trr.getCurrentKey(), Bytes.toBytes("aaa"),
      Bytes.toBytes("value aaa"));

    assertTrue(trr.nextKeyValue());
    checkResult(trr.getCurrentValue(), trr.getCurrentKey(), Bytes.toBytes("bbb"),
      Bytes.toBytes("value bbb"));

    // no more data
    assertFalse(trr.nextKeyValue());
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw an {@link IOException} from
   * {@code next()}; scanners requested after that are the real ones.
   * @param name    name of the table to create
   * @param failCnt number of {@code getScanner} calls that hand out a failing scanner
   * @return a Mockito spy of the created table
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation(s) return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate TimeoutException / IOException
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner(any(Scan.class));
    return htable;
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw a
   * {@link NotServingRegionException} from {@code next()}; scanners requested after that are the
   * real ones.
   * @param name    name of the table to create
   * @param failCnt number of {@code getScanner} calls that hand out a failing scanner
   * @return a Mockito spy of the created table
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation(s) return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          ResultScanner scanner = mock(ResultScanner.class);

          // The real getScanner() is still invoked here, as this test has always done, but the
          // scanner it opens is released immediately instead of being leaked; the caller only
          // ever sees the failing mock below.
          ((ResultScanner) invocation.callRealMethod()).close();
          // simulate NotServingRegionException
          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
            .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner(any(Scan.class));
    return htable;
  }

  /**
   * Run test assuming no errors using newer mapreduce api
   */
  @Test
  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
    try (Table table = createTable(Bytes.toBytes("table1-mr"))) {
      runTestMapreduce(table);
    }
  }

  /**
   * Run test assuming Scanner IOException failure using newer mapreduce api. A single failing
   * scanner must not prevent both rows from being read.
   */
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
    try (Table htable = createIOEScannerTable(Bytes.toBytes("table2-mr"), 1)) {
      runTestMapreduce(htable);
    }
  }

  /**
   * Run test assuming Scanner IOException failure using newer mapreduce api. Two failing scanners
   * in a row are expected to surface the {@link IOException} to the caller.
   */
  @Test
  public void testTableRecordReaderScannerFailMapreduceTwice()
    throws IOException, InterruptedException {
    try (Table htable = createIOEScannerTable(Bytes.toBytes("table3-mr"), 2)) {
      assertThrows(IOException.class, () -> runTestMapreduce(htable));
    }
  }

  /**
   * Run test assuming NotServingRegionException using newer mapreduce api. A single failing
   * scanner must not prevent both rows from being read.
   */
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
    throws IOException, InterruptedException {
    try (Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4-mr"), 1)) {
      runTestMapreduce(htable);
    }
  }

  /**
   * Run test assuming NotServingRegionException using newer mapreduce api. Two failing scanners
   * in a row are expected to surface the {@link NotServingRegionException} to the caller.
   */
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
    throws IOException, InterruptedException {
    try (Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5-mr"), 2)) {
      assertThrows(NotServingRegionException.class, () -> runTestMapreduce(htable));
    }
  }

  /**
   * Verify the example we present in javadocs on TableInputFormatBase
   */
  @Test
  public void testExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
    // The job opens its own connection, so the handle used to load the data is released at once.
    createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleTIF.class);
  }

  /**
   * Same as {@link #testExtensionOfTableInputFormatBase()} but with an input format that is set
   * up through {@link JobConfigurable#configure(JobConf)}.
   */
  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info(
      "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable.");
    // The job opens its own connection, so the handle used to load the data is released at once.
    createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  /**
   * Same as {@link #testExtensionOfTableInputFormatBase()} but with {@link ExampleDeprecatedTIF},
   * which follows the approach documented in 0.98.
   */
  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
      + "using the approach documented in 0.98.");
    // The job opens its own connection, so the handle used to load the data is released at once.
    createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  /**
   * Runs a map-only job that reads through the given input format and checks the counters written
   * by {@link ExampleVerifier}. Each example input format filters rows with the regex
   * {@code aa.*} over the families {@code columnA} and {@code columnB}, so row {@code aaa} must be
   * seen once per family and row {@code bbb} not at all.
   * @param clazz the input format under test
   */
  void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(clazz);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

    LOG.debug("submitting job.");
    assertTrue(job.waitForCompletion(true), "job failed!");
    assertEquals(2, counterValue(job, ":row", "aaa"),
      "Saw the wrong number of instances of the filtered-for row.");
    assertEquals(0, counterValue(job, ":row", "bbb"),
      "Saw any instances of the filtered out row.");
    assertEquals(1, counterValue(job, ":family", "columnA"),
      "Saw the wrong number of instances of columnA.");
    assertEquals(1, counterValue(job, ":family", "columnB"),
      "Saw the wrong number of instances of columnB.");
    assertEquals(2, counterValue(job, ":value", "value aaa"),
      "Saw the wrong count of values for the filtered-for row.");
    assertEquals(0, counterValue(job, ":value", "value bbb"),
      "Saw the wrong count of values for the filtered-out row.");
  }

  /**
   * Reads one counter written by {@link ExampleVerifier}.
   * @param job         the completed job
   * @param groupSuffix suffix appended to this class' name to form the counter group
   * @param counterName name of the counter within the group
   * @return the counter's current value
   */
  private static long counterValue(Job job, String groupSuffix, String counterName)
    throws IOException {
    return job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + groupSuffix, counterName).getValue();
  }

  /**
   * Mapper that emits nothing and instead bumps one counter per cell for the cell's row, family
   * and value, so that the test can verify what the input format delivered.
   */
  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
      for (Cell cell : value.listCells()) {
        String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
        String family =
          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
        String cellValue =
          Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

        context.getCounter(TestTableInputFormat.class.getName() + ":row", row).increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":family", family)
          .increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":value", cellValue)
          .increment(1L);
      }
    }

  }

  /**
   * Example input format that resolves the table name through a {@link Table} handle obtained in
   * {@link #configure(JobConf)}, scanning {@code columnA} and {@code columnB} of
   * {@code exampleDeprecatedTable} for rows matching {@code aa.*}.
   */
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        TableName tableName;
        // The handle is only needed to resolve the table name; release it right away.
        try (Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"))) {
          tableName = exampleTable.getName();
        }
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

  /**
   * Example input format configured through {@link #configure(JobConf)}, scanning {@code columnA}
   * and {@code columnB} of {@code exampleJobConfigurableTable} for rows matching {@code aa.*}.
   */
  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
    implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }
  }

  /**
   * Example input format configured through {@link #initialize(JobContext)}, scanning
   * {@code columnA} and {@code columnB} of {@code exampleTable} for rows matching {@code aa.*}.
   */
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobContext job) throws IOException {
      Connection connection =
        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
      TableName tableName = TableName.valueOf("exampleTable");
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
      // optional
      Scan scan = new Scan();
      for (byte[] family : inputColumns) {
        scan.addFamily(family);
      }
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      scan.setFilter(exampleFilter);
      setScan(scan);
    }

  }
}