001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapred; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Matchers.anyObject; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.doReturn; 026import static org.mockito.Mockito.doThrow; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.spy; 029 030import java.io.IOException; 031import java.util.Arrays; 032import java.util.Map; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CompareOperator; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.NotServingRegionException; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.filter.Filter; 049import org.apache.hadoop.hbase.filter.RegexStringComparator; 050import org.apache.hadoop.hbase.filter.RowFilter; 051import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 052import org.apache.hadoop.hbase.testclassification.LargeTests; 053import org.apache.hadoop.hbase.testclassification.MapReduceTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.io.NullWritable; 056import org.apache.hadoop.mapred.InputFormat; 057import org.apache.hadoop.mapred.JobClient; 058import org.apache.hadoop.mapred.JobConf; 059import org.apache.hadoop.mapred.JobConfigurable; 060import org.apache.hadoop.mapred.OutputCollector; 061import org.apache.hadoop.mapred.Reporter; 062import org.apache.hadoop.mapred.RunningJob; 063import org.apache.hadoop.mapred.lib.NullOutputFormat; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.BeforeClass; 067import org.junit.ClassRule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.mockito.invocation.InvocationOnMock; 071import org.mockito.stubbing.Answer; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075/** 076 * This tests the TableInputFormat and its recovery semantics 077 */ 078@Category({ MapReduceTests.class, LargeTests.class }) 079public class TestTableInputFormat { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestTableInputFormat.class); 084 085 private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class); 086 087 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 088 089 static final byte[] FAMILY = Bytes.toBytes("family"); 090 091 private static final byte[][] columns = new byte[][] { FAMILY }; 092 093 @BeforeClass 094 public static void beforeClass() throws Exception { 095 UTIL.startMiniCluster(); 096 } 097 098 @AfterClass 099 public static void afterClass() throws Exception { 100 UTIL.shutdownMiniCluster(); 101 } 102 103 @Before 104 public void before() throws IOException { 105 LOG.info("before"); 106 UTIL.ensureSomeRegionServersAvailable(1); 107 LOG.info("before done"); 108 } 109 110 /** 111 * Setup a table with two rows and values. 112 * @param tableName the name of the table to create 113 * @return A Table instance for the created table. n 114 */ 115 public static Table createTable(byte[] tableName) throws IOException { 116 return createTable(tableName, new byte[][] { FAMILY }); 117 } 118 119 /** 120 * Setup a table with two rows and values per column family. n * @return A Table instance for the 121 * created table. n 122 */ 123 public static Table createTable(byte[] tableName, byte[][] families) throws IOException { 124 Table table = UTIL.createTable(TableName.valueOf(tableName), families); 125 Put p = new Put("aaa".getBytes()); 126 for (byte[] family : families) { 127 p.addColumn(family, null, "value aaa".getBytes()); 128 } 129 table.put(p); 130 p = new Put("bbb".getBytes()); 131 for (byte[] family : families) { 132 p.addColumn(family, null, "value bbb".getBytes()); 133 } 134 table.put(p); 135 return table; 136 } 137 138 /** 139 * Verify that the result and key have expected values. 140 * @param r single row result 141 * @param key the row key 142 * @param expectedKey the expected key 143 * @param expectedValue the expected value 144 * @return true if the result contains the expected key and value, false otherwise. 145 */ 146 static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey, 147 byte[] expectedValue) { 148 assertEquals(0, key.compareTo(expectedKey)); 149 Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY); 150 byte[] value = vals.values().iterator().next(); 151 assertTrue(Arrays.equals(value, expectedValue)); 152 return true; // if succeed 153 } 154 155 /** 156 * Create table data and run tests on specified htable using the o.a.h.hbase.mapred API. nn 157 */ 158 static void runTestMapred(Table table) throws IOException { 159 org.apache.hadoop.hbase.mapred.TableRecordReader trr = 160 new org.apache.hadoop.hbase.mapred.TableRecordReader(); 161 trr.setStartRow("aaa".getBytes()); 162 trr.setEndRow("zzz".getBytes()); 163 trr.setHTable(table); 164 trr.setInputColumns(columns); 165 166 trr.init(); 167 Result r = new Result(); 168 ImmutableBytesWritable key = new ImmutableBytesWritable(); 169 170 boolean more = trr.next(key, r); 171 assertTrue(more); 172 checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes()); 173 174 more = trr.next(key, r); 175 assertTrue(more); 176 checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes()); 177 178 // no more data 179 more = trr.next(key, r); 180 assertFalse(more); 181 } 182 183 /** 184 * Create a table that IOE's on first scanner next call n 185 */ 186 static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException { 187 // build up a mock scanner stuff to fail the first time 188 Answer<ResultScanner> a = new Answer<ResultScanner>() { 189 int cnt = 0; 190 191 @Override 192 public ResultScanner answer(InvocationOnMock invocation) throws Throwable { 193 // first invocation return the busted mock scanner 194 if (cnt++ < failCnt) { 195 // create mock ResultScanner that always fails. 196 Scan scan = mock(Scan.class); 197 doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe 198 ResultScanner scanner = mock(ResultScanner.class); 199 // simulate TimeoutException / IOException 200 doThrow(new IOException("Injected exception")).when(scanner).next(); 201 return scanner; 202 } 203 204 // otherwise return the real scanner. 205 return (ResultScanner) invocation.callRealMethod(); 206 } 207 }; 208 209 Table htable = spy(createTable(name)); 210 doAnswer(a).when(htable).getScanner((Scan) anyObject()); 211 return htable; 212 } 213 214 /** 215 * Create a table that throws a DoNoRetryIOException on first scanner next call n 216 */ 217 static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException { 218 // build up a mock scanner stuff to fail the first time 219 Answer<ResultScanner> a = new Answer<ResultScanner>() { 220 int cnt = 0; 221 222 @Override 223 public ResultScanner answer(InvocationOnMock invocation) throws Throwable { 224 // first invocation return the busted mock scanner 225 if (cnt++ < failCnt) { 226 // create mock ResultScanner that always fails. 227 Scan scan = mock(Scan.class); 228 doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe 229 ResultScanner scanner = mock(ResultScanner.class); 230 231 invocation.callRealMethod(); // simulate NotServingRegionException 232 doThrow(new NotServingRegionException("Injected simulated TimeoutException")) 233 .when(scanner).next(); 234 return scanner; 235 } 236 237 // otherwise return the real scanner. 238 return (ResultScanner) invocation.callRealMethod(); 239 } 240 }; 241 242 Table htable = spy(createTable(name)); 243 doAnswer(a).when(htable).getScanner((Scan) anyObject()); 244 return htable; 245 } 246 247 /** 248 * Run test assuming no errors using mapred api. n 249 */ 250 @Test 251 public void testTableRecordReader() throws IOException { 252 Table table = createTable("table1".getBytes()); 253 runTestMapred(table); 254 } 255 256 /** 257 * Run test assuming Scanner IOException failure using mapred api, n 258 */ 259 @Test 260 public void testTableRecordReaderScannerFail() throws IOException { 261 Table htable = createIOEScannerTable("table2".getBytes(), 1); 262 runTestMapred(htable); 263 } 264 265 /** 266 * Run test assuming Scanner IOException failure using mapred api, n 267 */ 268 @Test(expected = IOException.class) 269 public void testTableRecordReaderScannerFailTwice() throws IOException { 270 Table htable = createIOEScannerTable("table3".getBytes(), 2); 271 runTestMapred(htable); 272 } 273 274 /** 275 * Run test assuming NotServingRegionException using mapred api. 276 * @throws org.apache.hadoop.hbase.DoNotRetryIOException 277 */ 278 @Test 279 public void testTableRecordReaderScannerTimeout() throws IOException { 280 Table htable = createDNRIOEScannerTable("table4".getBytes(), 1); 281 runTestMapred(htable); 282 } 283 284 /** 285 * Run test assuming NotServingRegionException using mapred api. 286 * @throws org.apache.hadoop.hbase.DoNotRetryIOException 287 */ 288 @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class) 289 public void testTableRecordReaderScannerTimeoutTwice() throws IOException { 290 Table htable = createDNRIOEScannerTable("table5".getBytes(), 2); 291 runTestMapred(htable); 292 } 293 294 /** 295 * Verify the example we present in javadocs on TableInputFormatBase 296 */ 297 @Test 298 public void testExtensionOfTableInputFormatBase() throws IOException { 299 LOG.info("testing use of an InputFormat taht extends InputFormatBase"); 300 final Table table = createTable(Bytes.toBytes("exampleTable"), 301 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); 302 testInputFormat(ExampleTIF.class); 303 } 304 305 @Test 306 public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException { 307 LOG.info( 308 "testing use of an InputFormat taht extends InputFormatBase, " + "as it was given in 0.98."); 309 final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"), 310 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); 311 testInputFormat(ExampleDeprecatedTIF.class); 312 } 313 314 @Test 315 public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException { 316 LOG.info( 317 "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable."); 318 final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"), 319 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); 320 testInputFormat(ExampleJobConfigurableTIF.class); 321 } 322 323 void testInputFormat(Class<? extends InputFormat> clazz) throws IOException { 324 Configuration conf = UTIL.getConfiguration(); 325 final JobConf job = new JobConf(conf); 326 job.setInputFormat(clazz); 327 job.setOutputFormat(NullOutputFormat.class); 328 job.setMapperClass(ExampleVerifier.class); 329 job.setNumReduceTasks(0); 330 LOG.debug("submitting job."); 331 final RunningJob run = JobClient.runJob(job); 332 assertTrue("job failed!", run.isSuccessful()); 333 assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters() 334 .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter()); 335 assertEquals("Saw any instances of the filtered out row.", 0, run.getCounters() 336 .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter()); 337 assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters() 338 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter()); 339 assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters() 340 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter()); 341 assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters() 342 .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter()); 343 assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters() 344 .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter()); 345 } 346 347 public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> { 348 349 @Override 350 public void configure(JobConf conf) { 351 } 352 353 @Override 354 public void map(ImmutableBytesWritable key, Result value, 355 OutputCollector<NullWritable, NullWritable> output, Reporter reporter) throws IOException { 356 for (Cell cell : value.listCells()) { 357 reporter 358 .getCounter(TestTableInputFormat.class.getName() + ":row", 359 Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) 360 .increment(1l); 361 reporter 362 .getCounter(TestTableInputFormat.class.getName() + ":family", 363 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) 364 .increment(1l); 365 reporter 366 .getCounter(TestTableInputFormat.class.getName() + ":value", 367 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())) 368 .increment(1l); 369 } 370 } 371 372 @Override 373 public void close() { 374 } 375 376 } 377 378 public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable { 379 380 @Override 381 public void configure(JobConf job) { 382 try { 383 Connection connection = ConnectionFactory.createConnection(job); 384 Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable")); 385 // mandatory 386 initializeTable(connection, exampleTable.getName()); 387 byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }; 388 // mandatory 389 setInputColumns(inputColumns); 390 Filter exampleFilter = 391 new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*")); 392 // optional 393 setRowFilter(exampleFilter); 394 } catch (IOException exception) { 395 throw new RuntimeException("Failed to configure for job.", exception); 396 } 397 } 398 399 } 400 401 public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable { 402 403 @Override 404 public void configure(JobConf job) { 405 try { 406 initialize(job); 407 } catch (IOException exception) { 408 throw new RuntimeException("Failed to initialize.", exception); 409 } 410 } 411 412 @Override 413 protected void initialize(JobConf job) throws IOException { 414 initialize(job, "exampleJobConfigurableTable"); 415 } 416 } 417 418 public static class ExampleTIF extends TableInputFormatBase { 419 420 @Override 421 protected void initialize(JobConf job) throws IOException { 422 initialize(job, "exampleTable"); 423 } 424 425 protected void initialize(JobConf job, String table) throws IOException { 426 Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); 427 TableName tableName = TableName.valueOf(table); 428 // mandatory 429 initializeTable(connection, tableName); 430 byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }; 431 // mandatory 432 setInputColumns(inputColumns); 433 Filter exampleFilter = 434 new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*")); 435 // optional 436 setRowFilter(exampleFilter); 437 } 438 439 } 440 441}