001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapred; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Matchers.anyObject; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.doReturn; 026import static org.mockito.Mockito.doThrow; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.spy; 029 030import java.io.IOException; 031import java.util.Arrays; 032import java.util.Map; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CompareOperator; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.NotServingRegionException; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.filter.Filter; 049import org.apache.hadoop.hbase.filter.RegexStringComparator; 050import org.apache.hadoop.hbase.filter.RowFilter; 051import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 052import org.apache.hadoop.hbase.testclassification.LargeTests; 053import org.apache.hadoop.hbase.testclassification.MapReduceTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.io.NullWritable; 056import org.apache.hadoop.mapred.InputFormat; 057import org.apache.hadoop.mapred.JobClient; 058import org.apache.hadoop.mapred.JobConf; 059import org.apache.hadoop.mapred.JobConfigurable; 060import org.apache.hadoop.mapred.OutputCollector; 061import org.apache.hadoop.mapred.Reporter; 062import org.apache.hadoop.mapred.RunningJob; 063import org.apache.hadoop.mapred.lib.NullOutputFormat; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.BeforeClass; 067import org.junit.ClassRule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.mockito.invocation.InvocationOnMock; 071import org.mockito.stubbing.Answer; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075/** 076 * This tests the TableInputFormat and its recovery semantics 077 */ 078@Category({MapReduceTests.class, LargeTests.class}) 079public class TestTableInputFormat { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestTableInputFormat.class); 084 085 private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class); 086 087 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 088 089 static final byte[] FAMILY = Bytes.toBytes("family"); 090 091 private static final byte[][] columns = new byte[][] { FAMILY }; 092 093 @BeforeClass 094 public static void beforeClass() throws Exception { 095 UTIL.startMiniCluster(); 096 } 097 098 @AfterClass 099 public static void afterClass() throws Exception { 100 UTIL.shutdownMiniCluster(); 101 } 102 103 @Before 104 public void before() throws IOException { 105 LOG.info("before"); 106 UTIL.ensureSomeRegionServersAvailable(1); 107 LOG.info("before done"); 108 } 109 110 /** 111 * Setup a table with two rows and values. 112 * 113 * @param tableName 114 * @return 115 * @throws IOException 116 */ 117 public static Table createTable(byte[] tableName) throws IOException { 118 return createTable(tableName, new byte[][] { FAMILY }); 119 } 120 121 /** 122 * Setup a table with two rows and values per column family. 123 * 124 * @param tableName 125 * @return 126 * @throws IOException 127 */ 128 public static Table createTable(byte[] tableName, byte[][] families) throws IOException { 129 Table table = UTIL.createTable(TableName.valueOf(tableName), families); 130 Put p = new Put("aaa".getBytes()); 131 for (byte[] family : families) { 132 p.addColumn(family, null, "value aaa".getBytes()); 133 } 134 table.put(p); 135 p = new Put("bbb".getBytes()); 136 for (byte[] family : families) { 137 p.addColumn(family, null, "value bbb".getBytes()); 138 } 139 table.put(p); 140 return table; 141 } 142 143 /** 144 * Verify that the result and key have expected values. 145 * 146 * @param r single row result 147 * @param key the row key 148 * @param expectedKey the expected key 149 * @param expectedValue the expected value 150 * @return true if the result contains the expected key and value, false otherwise. 151 */ 152 static boolean checkResult(Result r, ImmutableBytesWritable key, 153 byte[] expectedKey, byte[] expectedValue) { 154 assertEquals(0, key.compareTo(expectedKey)); 155 Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY); 156 byte[] value = vals.values().iterator().next(); 157 assertTrue(Arrays.equals(value, expectedValue)); 158 return true; // if succeed 159 } 160 161 /** 162 * Create table data and run tests on specified htable using the 163 * o.a.h.hbase.mapred API. 164 * 165 * @param table 166 * @throws IOException 167 */ 168 static void runTestMapred(Table table) throws IOException { 169 org.apache.hadoop.hbase.mapred.TableRecordReader trr = 170 new org.apache.hadoop.hbase.mapred.TableRecordReader(); 171 trr.setStartRow("aaa".getBytes()); 172 trr.setEndRow("zzz".getBytes()); 173 trr.setHTable(table); 174 trr.setInputColumns(columns); 175 176 trr.init(); 177 Result r = new Result(); 178 ImmutableBytesWritable key = new ImmutableBytesWritable(); 179 180 boolean more = trr.next(key, r); 181 assertTrue(more); 182 checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes()); 183 184 more = trr.next(key, r); 185 assertTrue(more); 186 checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes()); 187 188 // no more data 189 more = trr.next(key, r); 190 assertFalse(more); 191 } 192 193 /** 194 * Create a table that IOE's on first scanner next call 195 * 196 * @throws IOException 197 */ 198 static Table createIOEScannerTable(byte[] name, final int failCnt) 199 throws IOException { 200 // build up a mock scanner stuff to fail the first time 201 Answer<ResultScanner> a = new Answer<ResultScanner>() { 202 int cnt = 0; 203 204 @Override 205 public ResultScanner answer(InvocationOnMock invocation) throws Throwable { 206 // first invocation return the busted mock scanner 207 if (cnt++ < failCnt) { 208 // create mock ResultScanner that always fails. 209 Scan scan = mock(Scan.class); 210 doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe 211 ResultScanner scanner = mock(ResultScanner.class); 212 // simulate TimeoutException / IOException 213 doThrow(new IOException("Injected exception")).when(scanner).next(); 214 return scanner; 215 } 216 217 // otherwise return the real scanner. 218 return (ResultScanner) invocation.callRealMethod(); 219 } 220 }; 221 222 Table htable = spy(createTable(name)); 223 doAnswer(a).when(htable).getScanner((Scan) anyObject()); 224 return htable; 225 } 226 227 /** 228 * Create a table that throws a DoNoRetryIOException on first scanner next 229 * call 230 * 231 * @throws IOException 232 */ 233 static Table createDNRIOEScannerTable(byte[] name, final int failCnt) 234 throws IOException { 235 // build up a mock scanner stuff to fail the first time 236 Answer<ResultScanner> a = new Answer<ResultScanner>() { 237 int cnt = 0; 238 239 @Override 240 public ResultScanner answer(InvocationOnMock invocation) throws Throwable { 241 // first invocation return the busted mock scanner 242 if (cnt++ < failCnt) { 243 // create mock ResultScanner that always fails. 244 Scan scan = mock(Scan.class); 245 doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe 246 ResultScanner scanner = mock(ResultScanner.class); 247 248 invocation.callRealMethod(); // simulate NotServingRegionException 249 doThrow( 250 new NotServingRegionException("Injected simulated TimeoutException")) 251 .when(scanner).next(); 252 return scanner; 253 } 254 255 // otherwise return the real scanner. 256 return (ResultScanner) invocation.callRealMethod(); 257 } 258 }; 259 260 Table htable = spy(createTable(name)); 261 doAnswer(a).when(htable).getScanner((Scan) anyObject()); 262 return htable; 263 } 264 265 /** 266 * Run test assuming no errors using mapred api. 267 * 268 * @throws IOException 269 */ 270 @Test 271 public void testTableRecordReader() throws IOException { 272 Table table = createTable("table1".getBytes()); 273 runTestMapred(table); 274 } 275 276 /** 277 * Run test assuming Scanner IOException failure using mapred api, 278 * 279 * @throws IOException 280 */ 281 @Test 282 public void testTableRecordReaderScannerFail() throws IOException { 283 Table htable = createIOEScannerTable("table2".getBytes(), 1); 284 runTestMapred(htable); 285 } 286 287 /** 288 * Run test assuming Scanner IOException failure using mapred api, 289 * 290 * @throws IOException 291 */ 292 @Test(expected = IOException.class) 293 public void testTableRecordReaderScannerFailTwice() throws IOException { 294 Table htable = createIOEScannerTable("table3".getBytes(), 2); 295 runTestMapred(htable); 296 } 297 298 /** 299 * Run test assuming NotServingRegionException using mapred api. 300 * 301 * @throws org.apache.hadoop.hbase.DoNotRetryIOException 302 */ 303 @Test 304 public void testTableRecordReaderScannerTimeout() throws IOException { 305 Table htable = createDNRIOEScannerTable("table4".getBytes(), 1); 306 runTestMapred(htable); 307 } 308 309 /** 310 * Run test assuming NotServingRegionException using mapred api. 311 * 312 * @throws org.apache.hadoop.hbase.DoNotRetryIOException 313 */ 314 @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class) 315 public void testTableRecordReaderScannerTimeoutTwice() throws IOException { 316 Table htable = createDNRIOEScannerTable("table5".getBytes(), 2); 317 runTestMapred(htable); 318 } 319 320 /** 321 * Verify the example we present in javadocs on TableInputFormatBase 322 */ 323 @Test 324 public void testExtensionOfTableInputFormatBase() throws IOException { 325 LOG.info("testing use of an InputFormat taht extends InputFormatBase"); 326 final Table table = createTable(Bytes.toBytes("exampleTable"), 327 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); 328 testInputFormat(ExampleTIF.class); 329 } 330 331 @Test 332 public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException { 333 LOG.info("testing use of an InputFormat taht extends InputFormatBase, " 334 + "as it was given in 0.98."); 335 final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"), 336 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); 337 testInputFormat(ExampleDeprecatedTIF.class); 338 } 339 340 @Test 341 public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException { 342 LOG.info("testing use of an InputFormat taht extends InputFormatBase, " 343 + "using JobConfigurable."); 344 final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"), 345 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); 346 testInputFormat(ExampleJobConfigurableTIF.class); 347 } 348 349 void testInputFormat(Class<? extends InputFormat> clazz) throws IOException { 350 Configuration conf = UTIL.getConfiguration(); 351 final JobConf job = new JobConf(conf); 352 job.setInputFormat(clazz); 353 job.setOutputFormat(NullOutputFormat.class); 354 job.setMapperClass(ExampleVerifier.class); 355 job.setNumReduceTasks(0); 356 LOG.debug("submitting job."); 357 final RunningJob run = JobClient.runJob(job); 358 assertTrue("job failed!", run.isSuccessful()); 359 assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters() 360 .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter()); 361 assertEquals("Saw any instances of the filtered out row.", 0, run.getCounters() 362 .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter()); 363 assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters() 364 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter()); 365 assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters() 366 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter()); 367 assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters() 368 .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter()); 369 assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters() 370 .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter()); 371 } 372 373 public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> { 374 375 @Override 376 public void configure(JobConf conf) { 377 } 378 379 @Override 380 public void map(ImmutableBytesWritable key, Result value, 381 OutputCollector<NullWritable,NullWritable> output, 382 Reporter reporter) throws IOException { 383 for (Cell cell : value.listCells()) { 384 reporter.getCounter(TestTableInputFormat.class.getName() + ":row", 385 Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) 386 .increment(1l); 387 reporter.getCounter(TestTableInputFormat.class.getName() + ":family", 388 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) 389 .increment(1l); 390 reporter.getCounter(TestTableInputFormat.class.getName() + ":value", 391 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())) 392 .increment(1l); 393 } 394 } 395 396 @Override 397 public void close() { 398 } 399 400 } 401 402 public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable { 403 404 @Override 405 public void configure(JobConf job) { 406 try { 407 Connection connection = ConnectionFactory.createConnection(job); 408 Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable")); 409 // mandatory 410 initializeTable(connection, exampleTable.getName()); 411 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), 412 Bytes.toBytes("columnB") }; 413 // mandatory 414 setInputColumns(inputColumns); 415 Filter exampleFilter = 416 new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*")); 417 // optional 418 setRowFilter(exampleFilter); 419 } catch (IOException exception) { 420 throw new RuntimeException("Failed to configure for job.", exception); 421 } 422 } 423 424 } 425 426 public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable { 427 428 @Override 429 public void configure(JobConf job) { 430 try { 431 initialize(job); 432 } catch (IOException exception) { 433 throw new RuntimeException("Failed to initialize.", exception); 434 } 435 } 436 437 @Override 438 protected void initialize(JobConf job) throws IOException { 439 initialize(job, "exampleJobConfigurableTable"); 440 } 441 } 442 443 444 public static class ExampleTIF extends TableInputFormatBase { 445 446 @Override 447 protected void initialize(JobConf job) throws IOException { 448 initialize(job, "exampleTable"); 449 } 450 451 protected void initialize(JobConf job, String table) throws IOException { 452 Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); 453 TableName tableName = TableName.valueOf(table); 454 // mandatory 455 initializeTable(connection, tableName); 456 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), 457 Bytes.toBytes("columnB") }; 458 // mandatory 459 setInputColumns(inputColumns); 460 Filter exampleFilter = 461 new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*")); 462 // optional 463 setRowFilter(exampleFilter); 464 } 465 466 } 467 468} 469