/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This tests the TableInputFormat and its recovery semantics.
 */
@Category(LargeTests.class)
public class TestTableInputFormat {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Set up a table with two rows and values.
   *
   * @param tableName the name of the table to create
   * @return a table populated with the rows "aaa" and "bbb"
   * @throws IOException if table creation or the puts fail
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Set up a table with two rows and values per column family.
   *
   * @param tableName the name of the table to create
   * @param families the column families to create on the table
   * @return a table populated with the rows "aaa" and "bbb" in each family
   * @throws IOException if table creation or the puts fail
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    Put p = new Put("aaa".getBytes());
    for (byte[] family : families) {
      p.addColumn(family, null, "value aaa".getBytes());
    }
    table.put(p);
    p = new Put("bbb".getBytes());
    for (byte[] family : families) {
      p.addColumn(family, null, "value bbb".getBytes());
    }
    table.put(p);
    return table;
  }

  /**
   * Verify that the result and key have expected values.
   *
   * @param r single row result
   * @param key the row key
   * @param expectedKey the expected key
   * @param expectedValue the expected value
   * @return true if the result contains the expected key and value, false otherwise.
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key,
      byte[] expectedKey, byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true; // success; the assertions above fail otherwise
  }

  /**
   * Run tests on the specified table using the o.a.h.hbase.mapreduce API.
   *
   * @param table the table to scan
   * @throws IOException if the scan fails
   * @throws InterruptedException if the record reader is interrupted
   */
  static void runTestMapreduce(Table table) throws IOException,
      InterruptedException {
    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
        new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
    Scan s = new Scan();
    s.setStartRow("aaa".getBytes());
    s.setStopRow("zzz".getBytes());
    s.addFamily(FAMILY);
    trr.setScan(s);
    trr.setHTable(table);

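    // Drive the record reader by hand; no InputSplit or TaskAttemptContext is needed because
    // the scan and table were handed to the reader explicitly above.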
    trr.initialize(null, null);
    Result r = new Result();
    ImmutableBytesWritable key = new ImmutableBytesWritable();

    boolean more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());

    more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());

    // no more data
    more = trr.nextKeyValue();
    assertFalse(more);
  }

  /**
   * Create a table spy whose scanner throws an IOException from next() for the first
   * {@code failCnt} scanners returned by getScanner().
   *
   * @param name the table name
   * @param failCnt how many scanners should fail before a real one is returned
   * @throws IOException if the underlying table cannot be created
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // set up mock scanner behavior that fails for the first failCnt invocations
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // for the first failCnt invocations, return a broken mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate TimeoutException / IOException
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

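    // Spy on a real, populated table so that only getScanner() is intercepted by the Answer
    // above; all other calls still hit the live table.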
    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Create a table spy whose scanner throws a NotServingRegionException from next() for the
   * first {@code failCnt} scanners returned by getScanner().
   *
   * @param name the table name
   * @param failCnt how many scanners should fail before a real one is returned
   * @throws IOException if the underlying table cannot be created
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // set up mock scanner behavior that fails for the first failCnt invocations
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // for the first failCnt invocations, return a broken mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);

          invocation.callRealMethod(); // also invoke the real getScanner(); its result is discarded
          doThrow(
              new NotServingRegionException("Injected simulated TimeoutException"))
              .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

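    // Same spy pattern as createIOEScannerTable: only getScanner() is overridden.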
    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Run test assuming no errors using newer mapreduce api.
   */
  @Test
  public void testTableRecordReaderMapreduce() throws IOException,
      InterruptedException {
    Table table = createTable("table1-mr".getBytes());
    runTestMapreduce(table);
  }

  /**
   * Run test assuming Scanner IOException failure using newer mapreduce api.
   */
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException,
      InterruptedException {
    Table htable = createIOEScannerTable("table2-mr".getBytes(), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming Scanner IOException failure using newer mapreduce api. Two failures in a
   * row are not recovered from, so the IOException surfaces to the caller.
   */
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
      InterruptedException {
    Table htable = createIOEScannerTable("table3-mr".getBytes(), 2);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming NotServingRegionException using newer mapreduce api. A single failure is
   * recovered from transparently.
   */
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
      throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming NotServingRegionException using newer mapreduce api. Two failures in a row
   * are not recovered from, so the NotServingRegionException surfaces to the caller.
   */
  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
      throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
    runTestMapreduce(htable);
  }

  /**
   * Verify the example we present in javadocs on TableInputFormatBase
   */
  @Test
  public void testExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase");
    final Table htable = createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleTIF.class);
  }

  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase, " +
        "using JobConfigurable.");
    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase, " +
        "using the approach documented in 0.98.");
    final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  void testInputFormat(Class<? extends InputFormat> clazz)
      throws IOException, InterruptedException, ClassNotFoundException {
    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(clazz);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

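    // Verification is entirely counter based: ExampleVerifier bumps one counter per row key,
    // column family, and cell value it sees, and the row filter in each example input format
    // should only let the "aaa" row through.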
    LOG.debug("submitting job.");
    assertTrue("job failed!", job.waitForCompletion(true));
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
    assertEquals("Saw instances of the filtered-out row.", 0, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
  }

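  /**
   * Map-only verifier that records, via job counters, every row key, column family, and cell
   * value it receives.
   */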
  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException {
      for (Cell cell : value.listCells()) {
        context.getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
            .increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
            .increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
            .increment(1L);
      }
    }

  }

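  /**
   * Example extension of TableInputFormatBase wired up through
   * {@link JobConfigurable#configure(JobConf)}, following the approach documented for 0.98.
   */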
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"));
        // mandatory
        initializeTable(connection, exampleTable.getName());
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
          Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

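  /**
   * Example extension of TableInputFormatBase that builds an HBase configuration from the
   * {@link JobConf} handed to {@link JobConfigurable#configure(JobConf)}.
   */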
  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
      implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
          Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }
  }

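  /**
   * Example extension of TableInputFormatBase that sets up its connection and scan in
   * {@link #initialize(JobContext)}, mirroring the example in the TableInputFormatBase javadoc.
   */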
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobContext job) throws IOException {
      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
          job.getConfiguration()));
      TableName tableName = TableName.valueOf("exampleTable");
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
        Bytes.toBytes("columnB") };
      // optional
      Scan scan = new Scan();
      for (byte[] family : inputColumns) {
        scan.addFamily(family);
      }
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      scan.setFilter(exampleFilter);
      setScan(scan);
    }

  }
}