/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This tests the TableInputFormat and its recovery semantics.
 * <p>
 * The record-reader tests drive a {@link TableRecordReaderImpl} directly against a table (or a
 * Mockito spy of one whose first scanners fail). The input-format tests submit a map-only job
 * that uses one of the example {@link TableInputFormatBase} subclasses declared at the bottom of
 * this file and then verify the counters written by {@link ExampleVerifier}.
 */
@Category(LargeTests.class)
public class TestTableInputFormat {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  /** Starts the shared mini cluster once for every test in this class. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  /** Shuts the shared mini cluster down after the last test. */
  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /** Makes sure at least one region server is available before each test. */
  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Setup a table with two rows and values, using the single column family {@link #FAMILY}.
   * @param tableName name of the table to create
   * @return A Table instance for the created table.
   * @throws IOException if the table cannot be created or written to
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Setup a table with two rows and values per column family. Row {@code "aaa"} holds
   * {@code "value aaa"} and row {@code "bbb"} holds {@code "value bbb"} under a null qualifier in
   * every given family.
   * @param tableName name of the table to create
   * @param families  column families to create and populate
   * @return A Table instance for the created table.
   * @throws IOException if the table cannot be created or written to
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    Put p = new Put(Bytes.toBytes("aaa"));
    for (byte[] family : families) {
      p.addColumn(family, null, Bytes.toBytes("value aaa"));
    }
    table.put(p);
    p = new Put(Bytes.toBytes("bbb"));
    for (byte[] family : families) {
      p.addColumn(family, null, Bytes.toBytes("value bbb"));
    }
    table.put(p);
    return table;
  }

  /**
   * Verify that the result and key have expected values. Only the first value found in
   * {@link #FAMILY} is compared.
   * @param r             single row result
   * @param key           the row key
   * @param expectedKey   the expected key
   * @param expectedValue the expected value
   * @return true if the result contains the expected key and value; a mismatch fails the JUnit
   *         assertion instead of returning false.
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
    byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true; // if succeed
  }

  /**
   * Run the record-reader checks on the specified table using the o.a.h.hbase.mapreduce API:
   * scan {@link #FAMILY} from row {@code "aaa"} up to {@code "zzz"} and expect exactly the rows
   * {@code "aaa"} and {@code "bbb"} with their values, in that order.
   * <p>
   * The reader is closed when the method exits, whether the checks pass or an exception
   * propagates.
   * @param table the table (or spy of a table) to read from; must already contain the test rows
   * @throws IOException          if reading fails and the reader does not recover
   * @throws InterruptedException if the reader is interrupted
   */
  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
    TableRecordReaderImpl trr = new TableRecordReaderImpl();
    try {
      Scan s = new Scan();
      s.withStartRow(Bytes.toBytes("aaa"));
      s.withStopRow(Bytes.toBytes("zzz"));
      s.addFamily(FAMILY);
      trr.setScan(s);
      trr.setHTable(table);

      trr.initialize(null, null);

      boolean more = trr.nextKeyValue();
      assertTrue(more);
      ImmutableBytesWritable key = trr.getCurrentKey();
      Result r = trr.getCurrentValue();
      checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));

      more = trr.nextKeyValue();
      assertTrue(more);
      key = trr.getCurrentKey();
      r = trr.getCurrentValue();
      checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));

      // no more data
      more = trr.nextKeyValue();
      assertFalse(more);
    } finally {
      // Release whatever the reader holds even when an assertion or an injected failure fires.
      trr.close();
    }
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw an {@link IOException} on
   * {@code next()}; later {@code getScanner} calls are served by the real table.
   * @param name    name of the table to create
   * @param failCnt how many {@code getScanner} invocations hand out a failing scanner
   * @return a Mockito spy wrapping the created table
   * @throws IOException if the table cannot be created
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          // NOTE(review): this Scan mock is never handed to anything in this method; it looks
          // like a leftover and is a candidate for removal.
          Scan scan = mock(Scan.class);
          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate TimeoutException / IOException
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw a
   * {@link NotServingRegionException} on {@code next()}; later {@code getScanner} calls are
   * served by the real table.
   * @param name    name of the table to create
   * @param failCnt how many {@code getScanner} invocations hand out a failing scanner
   * @return a Mockito spy wrapping the created table
   * @throws IOException if the table cannot be created
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          // NOTE(review): this Scan mock is never handed to anything in this method; it looks
          // like a leftover and is a candidate for removal.
          Scan scan = mock(Scan.class);
          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);

          // simulate NotServingRegionException: the real method is still invoked, but its
          // scanner is not the one handed back, so close it here instead of leaking it.
          ResultScanner realScanner = (ResultScanner) invocation.callRealMethod();
          if (realScanner != null) {
            realScanner.close();
          }
          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
            .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Run test assuming no errors using newer mapreduce api
   */
  @Test
  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
    Table table = createTable(Bytes.toBytes("table1-mr"));
    runTestMapreduce(table);
  }

  /**
   * Run test assuming a single Scanner IOException failure using newer mapreduce api; the read
   * is expected to succeed.
   */
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
    Table htable = createIOEScannerTable(Bytes.toBytes("table2-mr"), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming two consecutive Scanner IOException failures using newer mapreduce api;
   * the exception is expected to reach the caller.
   */
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailMapreduceTwice()
    throws IOException, InterruptedException {
    Table htable = createIOEScannerTable(Bytes.toBytes("table3-mr"), 2);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming a single NotServingRegionException using newer mapreduce api; the read is
   * expected to succeed.
   */
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
    throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4-mr"), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming two consecutive NotServingRegionExceptions using newer mapreduce api; the
   * exception is expected to reach the caller.
   */
  @Test(expected = NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
    throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5-mr"), 2);
    runTestMapreduce(htable);
  }

  /**
   * Verify the example we present in javadocs on TableInputFormatBase
   */
  @Test
  public void testExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
    // The handle is only needed to create and populate the table; the job opens its own.
    createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleTIF.class);
  }

  /**
   * Verify an input format that extends TableInputFormatBase and is set up through
   * {@link JobConfigurable#configure(JobConf)}.
   */
  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info(
      "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable.");
    // The handle is only needed to create and populate the table; the job opens its own.
    createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  /**
   * Verify an input format that extends TableInputFormatBase using the older set-up style that
   * first obtains a {@link Table} from the connection.
   */
  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
      + "using the approach documented in 0.98.");
    // The handle is only needed to create and populate the table; the job opens its own.
    createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }).close();
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  /**
   * Submit a map-only job that reads through {@code clazz} and counts cells with
   * {@link ExampleVerifier}, then check the counters: the example input formats filter on rows
   * matching {@code "aa.*"} over two families, so row {@code "aaa"} must be seen twice (once per
   * family) and row {@code "bbb"} never.
   * @param clazz the input format under test
   */
  void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(clazz);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

    LOG.debug("submitting job.");
    assertTrue("job failed!", job.waitForCompletion(true));
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
    assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
  }

  /**
   * Mapper that emits nothing and instead bumps one job counter per cell for the cell's row,
   * family and value, so the test can assert on what the input format delivered.
   */
  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
      for (Cell cell : value.listCells()) {
        context
          .getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
          .increment(1L);
        context
          .getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
          .increment(1L);
        context
          .getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
          .increment(1L);
      }
    }

  }

  /**
   * Example input format configured through {@link JobConfigurable} that looks the table name up
   * via a {@link Table} obtained from the connection. Scans {@code columnA} and {@code columnB}
   * of {@code exampleDeprecatedTable}, keeping only rows matching {@code "aa.*"}.
   */
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        // The Table handle is used only to read the table name, so close it straight away
        // rather than leaking it; the connection itself is handed to initializeTable.
        try (Table exampleTable =
          connection.getTable(TableName.valueOf(("exampleDeprecatedTable")))) {
          // mandatory
          initializeTable(connection, exampleTable.getName());
        }
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

  /**
   * Example input format configured through {@link JobConfigurable}. Scans {@code columnA} and
   * {@code columnB} of {@code exampleJobConfigurableTable}, keeping only rows matching
   * {@code "aa.*"}.
   */
  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
    implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }
  }

  /**
   * Example input format that sets itself up in {@link #initialize(JobContext)}. Scans
   * {@code columnA} and {@code columnB} of {@code exampleTable}, keeping only rows matching
   * {@code "aa.*"}.
   */
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobContext job) throws IOException {
      Connection connection =
        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
      TableName tableName = TableName.valueOf("exampleTable");
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
      // optional
      Scan scan = new Scan();
      for (byte[] family : inputColumns) {
        scan.addFamily(family);
      }
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      scan.setFilter(exampleFilter);
      setScan(scan);
    }

  }
}