/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests the TableInputFormat family of classes and the recovery semantics of the
 * mapreduce {@code TableRecordReaderImpl} when the underlying scanner fails.
 */
@Category(LargeTests.class)
public class TestTableInputFormat {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  /** Starts the shared mini cluster used by every test in this class. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  /** Shuts the shared mini cluster down once all tests have run. */
  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /** Makes sure at least one region server is available before each test. */
  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Sets up a table with the single column family {@link #FAMILY} and two rows
   * ({@code aaa} and {@code bbb}).
   *
   * @param tableName name of the table to create
   * @return a handle on the newly created and populated table
   * @throws IOException if the table cannot be created or populated
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, columns);
  }

  /**
   * Sets up a table with two rows ({@code aaa} and {@code bbb}); each row carries one value
   * ({@code "value aaa"} / {@code "value bbb"}) per column family.
   *
   * @param tableName name of the table to create
   * @param families column families to create and populate
   * @return a handle on the newly created and populated table
   * @throws IOException if the table cannot be created or populated
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    table.put(newRow("aaa", families));
    table.put(newRow("bbb", families));
    return table;
  }

  /** Builds a Put for {@code row} that stores {@code "value <row>"} in every given family. */
  private static Put newRow(String row, byte[][] families) {
    Put put = new Put(Bytes.toBytes(row));
    byte[] value = Bytes.toBytes("value " + row);
    for (byte[] family : families) {
      put.addColumn(family, null, value);
    }
    return put;
  }

  /**
   * Verifies that the result and key have the expected values.
   *
   * @param r single row result
   * @param key the row key
   * @param expectedKey the expected key
   * @param expectedValue the expected value
   * @return always {@code true}; a mismatch is reported through a failed assertion instead
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key,
      byte[] expectedKey, byte[] expectedValue) {
    assertEquals("Unexpected row key", 0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue("Unexpected value " + Bytes.toStringBinary(value),
      Arrays.equals(value, expectedValue));
    return true; // if succeed
  }

  /**
   * Reads the given table through the o.a.h.hbase.mapreduce record reader and asserts that
   * exactly the rows {@code aaa} and {@code bbb} come back, in that order, with their values.
   *
   * @param table table to scan; expected to have been populated by {@code createTable}
   * @throws IOException if the record reader fails and cannot recover
   * @throws InterruptedException if the record reader is interrupted
   */
  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
    TableRecordReaderImpl trr = new TableRecordReaderImpl();
    Scan s = new Scan();
    s.setStartRow(Bytes.toBytes("aaa"));
    s.setStopRow(Bytes.toBytes("zzz"));
    s.addFamily(FAMILY);
    trr.setScan(s);
    trr.setHTable(table);

    try {
      trr.initialize(null, null);

      assertTrue(trr.nextKeyValue());
      checkResult(trr.getCurrentValue(), trr.getCurrentKey(),
        Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));

      assertTrue(trr.nextKeyValue());
      checkResult(trr.getCurrentValue(), trr.getCurrentKey(),
        Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));

      // no more data
      assertFalse(trr.nextKeyValue());
    } finally {
      // Release the reader on both the success and the failure path.
      // NOTE(review): close() is expected to release the scanner and the table handle - confirm.
      trr.close();
    }
  }

  /**
   * Creates a populated table whose first {@code failCnt} {@code getScanner} calls hand back a
   * mock scanner that throws an {@link IOException} from {@code next()}; later calls are
   * delegated to the real table.
   *
   * @param name name of the table to create
   * @param failCnt number of {@code getScanner} calls that return the failing scanner
   * @return a Mockito spy wrapping the real table
   * @throws IOException if the table cannot be created or populated
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          // NOTE(review): this Scan mock is never handed to anything; looks like dead setup.
          Scan scan = mock(Scan.class);
          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate TimeoutException / IOException
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Creates a populated table whose first {@code failCnt} {@code getScanner} calls hand back a
   * mock scanner that throws a {@link NotServingRegionException} from {@code next()}; later
   * calls are delegated to the real table.
   *
   * @param name name of the table to create
   * @param failCnt number of {@code getScanner} calls that return the failing scanner
   * @return a Mockito spy wrapping the real table
   * @throws IOException if the table cannot be created or populated
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          // NOTE(review): this Scan mock is never handed to anything; looks like dead setup.
          Scan scan = mock(Scan.class);
          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);

          // The real scanner is still opened, as before, but it is now closed right away
          // instead of being dropped on the floor.
          ((ResultScanner) invocation.callRealMethod()).close();
          // simulate NotServingRegionException
          doThrow(
              new NotServingRegionException("Injected simulated TimeoutException"))
              .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Run test assuming no errors using newer mapreduce api
   *
   * @throws IOException if reading the table fails
   * @throws InterruptedException if the record reader is interrupted
   */
  @Test
  public void testTableRecordReaderMapreduce() throws IOException,
      InterruptedException {
    Table table = createTable(Bytes.toBytes("table1-mr"));
    runTestMapreduce(table);
  }

  /**
   * Run test assuming a single Scanner IOException failure using newer mapreduce api; the
   * read is expected to succeed.
   *
   * @throws IOException if reading the table fails
   * @throws InterruptedException if the record reader is interrupted
   */
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException,
      InterruptedException {
    Table htable = createIOEScannerTable(Bytes.toBytes("table2-mr"), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming two consecutive Scanner IOException failures using newer mapreduce api;
   * the injected exception is expected to propagate.
   *
   * @throws IOException expected, once the second injected failure is hit
   * @throws InterruptedException if the record reader is interrupted
   */
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
      InterruptedException {
    Table htable = createIOEScannerTable(Bytes.toBytes("table3-mr"), 2);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming a single NotServingRegionException using newer mapreduce api; the read
   * is expected to succeed.
   *
   * @throws IOException if reading the table fails
   * @throws InterruptedException if the record reader is interrupted
   */
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
      throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4-mr"), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming two consecutive NotServingRegionExceptions using newer mapreduce api;
   * the injected exception is expected to propagate.
   *
   * @throws IOException expected, as a {@link NotServingRegionException}
   * @throws InterruptedException if the record reader is interrupted
   */
  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
      throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5-mr"), 2);
    runTestMapreduce(htable);
  }

  /**
   * Verify the example we present in javadocs on TableInputFormatBase
   */
  @Test
  public void testExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
    // Only the created-and-populated table matters; the client handle is released immediately.
    createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleTIF.class);
  }

  /**
   * Verify an input format that sets itself up through {@link JobConfigurable}.
   */
  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
        "using JobConfigurable.");
    // Only the created-and-populated table matters; the client handle is released immediately.
    createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  /**
   * Verify an input format written in the older style described as documented in 0.98.
   */
  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
        "using the approach documented in 0.98.");
    // Only the created-and-populated table matters; the client handle is released immediately.
    createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  /**
   * Runs a map-only job with the given input format and {@link ExampleVerifier} as the mapper,
   * then checks the counters the mapper produced: two cells for row {@code aaa} (one per
   * family), none for row {@code bbb}.
   *
   * @param clazz input format under test
   */
  void testInputFormat(Class<? extends InputFormat> clazz)
      throws IOException, InterruptedException, ClassNotFoundException {
    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(clazz);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

    LOG.debug("submitting job.");
    assertTrue("job failed!", job.waitForCompletion(true));
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2,
      counterValue(job, "row", "aaa"));
    assertEquals("Saw any instances of the filtered out row.", 0,
      counterValue(job, "row", "bbb"));
    assertEquals("Saw the wrong number of instances of columnA.", 1,
      counterValue(job, "family", "columnA"));
    assertEquals("Saw the wrong number of instances of columnB.", 1,
      counterValue(job, "family", "columnB"));
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2,
      counterValue(job, "value", "value aaa"));
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0,
      counterValue(job, "value", "value bbb"));
  }

  /** Reads the counter {@code name} from the group {@code <this class name>:<group>}. */
  private static long counterValue(Job job, String group, String name) throws IOException {
    return job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":" + group, name).getValue();
  }

  /**
   * Mapper that bumps one counter per cell for the cell's row, family and value, so that the
   * test can check what the input format delivered.
   */
  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException {
      final String prefix = TestTableInputFormat.class.getName();
      for (Cell cell : value.listCells()) {
        context.getCounter(prefix + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
            .increment(1L);
        context.getCounter(prefix + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
            .increment(1L);
        context.getCounter(prefix + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
            .increment(1L);
      }
    }

  }

  /**
   * Example input format that configures itself in {@link JobConfigurable#configure(JobConf)}
   * and obtains the table name through a {@link Table} handle.
   */
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        TableName tableName;
        // The Table handle is only needed for its name, so it is released right away.
        try (Table exampleTable =
            connection.getTable(TableName.valueOf(("exampleDeprecatedTable")))) {
          tableName = exampleTable.getName();
        }
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
          Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }


  /**
   * Example input format that configures itself in {@link JobConfigurable#configure(JobConf)}
   * using a connection built from the job configuration.
   */
  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
      implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
          Bytes.toBytes("columnB") };
        //optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }
  }


  /**
   * Example input format that configures itself by overriding {@code initialize(JobContext)}.
   */
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobContext job) throws IOException {
      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
          job.getConfiguration()));
      TableName tableName = TableName.valueOf("exampleTable");
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
        Bytes.toBytes("columnB") };
      //optional
      Scan scan = new Scan();
      for (byte[] family : inputColumns) {
        scan.addFamily(family);
      }
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      scan.setFilter(exampleFilter);
      setScan(scan);
    }

  }
}