/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This tests the TableInputFormat and its recovery semantics
 */
@Category(LargeTests.class)
public class TestTableInputFormat {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Sets up a table with two rows ("aaa" and "bbb") in the single column family {@link #FAMILY}.
   * @param tableName name of the table to create
   * @return A Table instance for the created table.
   * @throws IOException if the table cannot be created or populated
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Sets up a table with two rows ("aaa" and "bbb"). Every family gets a single cell with a null
   * qualifier whose value is "value aaa" / "value bbb" respectively.
   * @param tableName name of the table to create
   * @param families  column families to create and populate
   * @return A Table instance for the created table.
   * @throws IOException if the table cannot be created or populated
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    table.put(newRowPut("aaa", families));
    table.put(newRowPut("bbb", families));
    return table;
  }

  /**
   * Builds a Put for {@code row} holding {@code "value " + row} (null qualifier) in each family.
   */
  private static Put newRowPut(String row, byte[][] families) {
    Put put = new Put(Bytes.toBytes(row));
    byte[] value = Bytes.toBytes("value " + row);
    for (byte[] family : families) {
      put.addColumn(family, null, value);
    }
    return put;
  }

  /**
   * Verify that the result and key have expected values.
   * @param r             single row result
   * @param key           the row key
   * @param expectedKey   the expected key
   * @param expectedValue the expected value
   * @return true if the result contains the expected key and value; a mismatch fails the
   *         enclosing test through a JUnit assertion instead of returning false.
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
    byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true; // if succeed
  }

  /**
   * Reads the table created by {@link #createTable(byte[])} through a
   * {@link org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl} (the o.a.h.hbase.mapreduce
   * API) and asserts that exactly the rows "aaa" and "bbb" are returned, in that order.
   * @param table table to scan; may be a spy that injects scanner failures
   * @throws IOException          if the record reader gives up on a scanner failure
   * @throws InterruptedException if the record reader is interrupted
   */
  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
      new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
    Scan s = new Scan();
    s.withStartRow(Bytes.toBytes("aaa"));
    s.withStopRow(Bytes.toBytes("zzz"));
    s.addFamily(FAMILY);
    trr.setScan(s);
    trr.setHTable(table);

    trr.initialize(null, null);

    assertTrue(trr.nextKeyValue());
    ImmutableBytesWritable key = trr.getCurrentKey();
    Result r = trr.getCurrentValue();
    checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));

    assertTrue(trr.nextKeyValue());
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));

    // no more data
    assertFalse(trr.nextKeyValue());
  }

  /**
   * Creates a table whose first {@code failCnt} {@code getScanner} calls return a scanner that
   * throws an IOException on {@code next()}; later calls return a real scanner.
   * @param name    name of the table to create
   * @param failCnt number of leading getScanner calls that hand back a failing scanner
   * @return a Mockito spy over the created table
   * @throws IOException if the table cannot be created
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        if (cnt++ < failCnt) {
          // Mock scanner that always fails, simulating a TimeoutException / IOException.
          ResultScanner scanner = mock(ResultScanner.class);
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }
        // Otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Creates a table whose first {@code failCnt} {@code getScanner} calls return a scanner that
   * throws a NotServingRegionException on {@code next()}; later calls return a real scanner.
   * @param name    name of the table to create
   * @param failCnt number of leading getScanner calls that hand back a failing scanner
   * @return a Mockito spy over the created table
   * @throws IOException if the table cannot be created
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        if (cnt++ < failCnt) {
          // Open a real scanner and release it right away.
          // NOTE(review): unclear whether the test depends on this side effect; it is kept to
          // preserve the existing behavior, but closed so the real scanner does not leak.
          ((ResultScanner) invocation.callRealMethod()).close();
          // Mock scanner that always fails with NotServingRegionException.
          ResultScanner scanner = mock(ResultScanner.class);
          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
            .when(scanner).next();
          return scanner;
        }
        // Otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Run test assuming no errors using newer mapreduce api.
   */
  @Test
  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
    try (Table table = createTable(Bytes.toBytes("table1-mr"))) {
      runTestMapreduce(table);
    }
  }

  /**
   * Run test assuming a single Scanner IOException failure using newer mapreduce api; the record
   * reader is expected to recover.
   */
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
    try (Table htable = createIOEScannerTable(Bytes.toBytes("table2-mr"), 1)) {
      runTestMapreduce(htable);
    }
  }

  /**
   * Run test assuming two consecutive Scanner IOException failures using newer mapreduce api; the
   * IOException is expected to propagate.
   */
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailMapreduceTwice()
    throws IOException, InterruptedException {
    try (Table htable = createIOEScannerTable(Bytes.toBytes("table3-mr"), 2)) {
      runTestMapreduce(htable);
    }
  }

  /**
   * Run test assuming a single NotServingRegionException using newer mapreduce api; the record
   * reader is expected to recover.
   */
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
    throws IOException, InterruptedException {
    try (Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4-mr"), 1)) {
      runTestMapreduce(htable);
    }
  }

  /**
   * Run test assuming two consecutive NotServingRegionExceptions using newer mapreduce api; the
   * NotServingRegionException is expected to propagate.
   */
  @Test(expected = NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
    throws IOException, InterruptedException {
    try (Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5-mr"), 2)) {
      runTestMapreduce(htable);
    }
  }

  /**
   * Verify the example we present in javadocs on TableInputFormatBase
   */
  @Test
  public void testExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase");
    try (Table htable = createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") })) {
      testInputFormat(ExampleTIF.class);
    }
  }

  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info(
      "testing use of an InputFormat that extends TableInputFormatBase, using JobConfigurable.");
    try (Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") })) {
      testInputFormat(ExampleJobConfigurableTIF.class);
    }
  }

  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase, "
      + "using the approach documented in 0.98.");
    try (Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") })) {
      testInputFormat(ExampleDeprecatedTIF.class);
    }
  }

  /**
   * Runs a map-only job over the given input format with {@link ExampleVerifier} as the mapper.
   * Asserts, via the counters the mapper emits, that only row "aaa" was read, that each of
   * columnA and columnB was seen once, and that row "bbb" was filtered out.
   * @param clazz input format under test
   */
  void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(clazz);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

    LOG.debug("submitting job.");
    assertTrue("job failed!", job.waitForCompletion(true));
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2,
      getCounterValue(job, "row", "aaa"));
    assertEquals("Saw any instances of the filtered out row.", 0,
      getCounterValue(job, "row", "bbb"));
    assertEquals("Saw the wrong number of instances of columnA.", 1,
      getCounterValue(job, "family", "columnA"));
    assertEquals("Saw the wrong number of instances of columnB.", 1,
      getCounterValue(job, "family", "columnB"));
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2,
      getCounterValue(job, "value", "value aaa"));
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0,
      getCounterValue(job, "value", "value bbb"));
  }

  /**
   * Reads one of the counters incremented by {@link ExampleVerifier}.
   * @param job  the completed job
   * @param kind counter group suffix: "row", "family" or "value"
   * @param name counter name within that group
   */
  private static long getCounterValue(Job job, String kind, String name) throws IOException {
    return job.getCounters().findCounter(TestTableInputFormat.class.getName() + ":" + kind, name)
      .getValue();
  }

  /**
   * Mapper that, for every cell it sees, increments one counter each for the cell's row, family
   * and value, grouped under "&lt;this class&gt;:row", ":family" and ":value".
   */
  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
      for (Cell cell : value.listCells()) {
        context
          .getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
          .increment(1L);
        context
          .getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
          .increment(1L);
        context
          .getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
          .increment(1L);
      }
    }

  }

  /**
   * Input format configured through {@link JobConfigurable#configure(JobConf)}, following the
   * approach documented in 0.98: a Table is opened to obtain the table name.
   */
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        // The Table is only needed for its name, so release it once the base class is initialized.
        try (Table exampleTable =
          connection.getTable(TableName.valueOf(("exampleDeprecatedTable")))) {
          // mandatory
          initializeTable(connection, exampleTable.getName());
        }
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

  /**
   * Input format configured through {@link JobConfigurable#configure(JobConf)} using a
   * {@link TableName} directly.
   */
  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
    implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }
  }

  /**
   * Input format configured by overriding {@link TableInputFormatBase#initialize(JobContext)}.
   */
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobContext job) throws IOException {
      Connection connection =
        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
      TableName tableName = TableName.valueOf("exampleTable");
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
      // optional
      Scan scan = new Scan();
      for (byte[] family : inputColumns) {
        scan.addFamily(family);
      }
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      scan.setFilter(exampleFilter);
      setScan(scan);
    }

  }
}