001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Matchers.anyObject;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.doReturn;
026import static org.mockito.Mockito.doThrow;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.spy;
029
030import java.io.IOException;
031import java.util.Arrays;
032import java.util.Map;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.NotServingRegionException;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.RegexStringComparator;
049import org.apache.hadoop.hbase.filter.RowFilter;
050import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.io.NullWritable;
054import org.apache.hadoop.mapred.JobConf;
055import org.apache.hadoop.mapred.JobConfigurable;
056import org.apache.hadoop.mapred.MiniMRCluster;
057import org.apache.hadoop.mapreduce.InputFormat;
058import org.apache.hadoop.mapreduce.Job;
059import org.apache.hadoop.mapreduce.JobContext;
060import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.mockito.invocation.InvocationOnMock;
068import org.mockito.stubbing.Answer;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072/**
073 * This tests the TableInputFormat and its recovery semantics
074 */
075@Category(LargeTests.class)
076public class TestTableInputFormat {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestTableInputFormat.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);
083
084  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
085  private static MiniMRCluster mrCluster;
086  static final byte[] FAMILY = Bytes.toBytes("family");
087
088  private static final byte[][] columns = new byte[][] { FAMILY };
089
  /**
   * Starts the mini cluster managed by {@code UTIL} once, before any test in this class runs.
   * @throws Exception if {@code UTIL.startMiniCluster()} fails
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }
094
  /**
   * Shuts down the mini cluster managed by {@code UTIL} once all tests in this class have run.
   * @throws Exception if {@code UTIL.shutdownMiniCluster()} fails
   */
  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }
099
  /**
   * Runs before each test: calls {@code UTIL.ensureSomeRegionServersAvailable(1)} and logs before
   * and after the call.
   * @throws IOException if the call on {@code UTIL} fails
   */
  @Before
  public void before() throws IOException {
    LOG.info("before");
    // Presumably guarantees a minimum of one live region server -- confirm against the utility.
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }
106
107  /**
108   * Setup a table with two rows and values. n * @return A Table instance for the created table. n
109   */
110  public static Table createTable(byte[] tableName) throws IOException {
111    return createTable(tableName, new byte[][] { FAMILY });
112  }
113
114  /**
115   * Setup a table with two rows and values per column family. n * @return A Table instance for the
116   * created table. n
117   */
118  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
119    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
120    Put p = new Put("aaa".getBytes());
121    for (byte[] family : families) {
122      p.addColumn(family, null, "value aaa".getBytes());
123    }
124    table.put(p);
125    p = new Put("bbb".getBytes());
126    for (byte[] family : families) {
127      p.addColumn(family, null, "value bbb".getBytes());
128    }
129    table.put(p);
130    return table;
131  }
132
133  /**
134   * Verify that the result and key have expected values.
135   * @param r             single row result
136   * @param key           the row key
137   * @param expectedKey   the expected key
138   * @param expectedValue the expected value
139   * @return true if the result contains the expected key and value, false otherwise.
140   */
141  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
142    byte[] expectedValue) {
143    assertEquals(0, key.compareTo(expectedKey));
144    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
145    byte[] value = vals.values().iterator().next();
146    assertTrue(Arrays.equals(value, expectedValue));
147    return true; // if succeed
148  }
149
150  /**
151   * Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API. nnn
152   */
153  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
154    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
155      new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
156    Scan s = new Scan();
157    s.setStartRow("aaa".getBytes());
158    s.setStopRow("zzz".getBytes());
159    s.addFamily(FAMILY);
160    trr.setScan(s);
161    trr.setHTable(table);
162
163    trr.initialize(null, null);
164    Result r = new Result();
165    ImmutableBytesWritable key = new ImmutableBytesWritable();
166
167    boolean more = trr.nextKeyValue();
168    assertTrue(more);
169    key = trr.getCurrentKey();
170    r = trr.getCurrentValue();
171    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
172
173    more = trr.nextKeyValue();
174    assertTrue(more);
175    key = trr.getCurrentKey();
176    r = trr.getCurrentValue();
177    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
178
179    // no more data
180    more = trr.nextKeyValue();
181    assertFalse(more);
182  }
183
184  /**
185   * Create a table that IOE's on first scanner next call n
186   */
187  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
188    // build up a mock scanner stuff to fail the first time
189    Answer<ResultScanner> a = new Answer<ResultScanner>() {
190      int cnt = 0;
191
192      @Override
193      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
194        // first invocation return the busted mock scanner
195        if (cnt++ < failCnt) {
196          // create mock ResultScanner that always fails.
197          Scan scan = mock(Scan.class);
198          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
199          ResultScanner scanner = mock(ResultScanner.class);
200          // simulate TimeoutException / IOException
201          doThrow(new IOException("Injected exception")).when(scanner).next();
202          return scanner;
203        }
204
205        // otherwise return the real scanner.
206        return (ResultScanner) invocation.callRealMethod();
207      }
208    };
209
210    Table htable = spy(createTable(name));
211    doAnswer(a).when(htable).getScanner((Scan) anyObject());
212    return htable;
213  }
214
215  /**
216   * Create a table that throws a NotServingRegionException on first scanner next call n
217   */
218  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
219    // build up a mock scanner stuff to fail the first time
220    Answer<ResultScanner> a = new Answer<ResultScanner>() {
221      int cnt = 0;
222
223      @Override
224      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
225        // first invocation return the busted mock scanner
226        if (cnt++ < failCnt) {
227          // create mock ResultScanner that always fails.
228          Scan scan = mock(Scan.class);
229          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
230          ResultScanner scanner = mock(ResultScanner.class);
231
232          invocation.callRealMethod(); // simulate NotServingRegionException
233          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
234            .when(scanner).next();
235          return scanner;
236        }
237
238        // otherwise return the real scanner.
239        return (ResultScanner) invocation.callRealMethod();
240      }
241    };
242
243    Table htable = spy(createTable(name));
244    doAnswer(a).when(htable).getScanner((Scan) anyObject());
245    return htable;
246  }
247
248  /**
249   * Run test assuming no errors using newer mapreduce api nn
250   */
251  @Test
252  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
253    Table table = createTable("table1-mr".getBytes());
254    runTestMapreduce(table);
255  }
256
257  /**
258   * Run test assuming Scanner IOException failure using newer mapreduce api nn
259   */
260  @Test
261  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
262    Table htable = createIOEScannerTable("table2-mr".getBytes(), 1);
263    runTestMapreduce(htable);
264  }
265
266  /**
267   * Run test assuming Scanner IOException failure using newer mapreduce api nn
268   */
269  @Test(expected = IOException.class)
270  public void testTableRecordReaderScannerFailMapreduceTwice()
271    throws IOException, InterruptedException {
272    Table htable = createIOEScannerTable("table3-mr".getBytes(), 2);
273    runTestMapreduce(htable);
274  }
275
276  /**
277   * Run test assuming NotServingRegionException using newer mapreduce api n * @throws
278   * org.apache.hadoop.hbase.DoNotRetryIOException
279   */
280  @Test
281  public void testTableRecordReaderScannerTimeoutMapreduce()
282    throws IOException, InterruptedException {
283    Table htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
284    runTestMapreduce(htable);
285  }
286
287  /**
288   * Run test assuming NotServingRegionException using newer mapreduce api n * @throws
289   * org.apache.hadoop.hbase.NotServingRegionException
290   */
291  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
292  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
293    throws IOException, InterruptedException {
294    Table htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
295    runTestMapreduce(htable);
296  }
297
298  /**
299   * Verify the example we present in javadocs on TableInputFormatBase
300   */
301  @Test
302  public void testExtensionOfTableInputFormatBase()
303    throws IOException, InterruptedException, ClassNotFoundException {
304    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
305    final Table htable = createTable(Bytes.toBytes("exampleTable"),
306      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
307    testInputFormat(ExampleTIF.class);
308  }
309
310  @Test
311  public void testJobConfigurableExtensionOfTableInputFormatBase()
312    throws IOException, InterruptedException, ClassNotFoundException {
313    LOG.info(
314      "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable.");
315    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
316      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
317    testInputFormat(ExampleJobConfigurableTIF.class);
318  }
319
320  @Test
321  public void testDeprecatedExtensionOfTableInputFormatBase()
322    throws IOException, InterruptedException, ClassNotFoundException {
323    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
324      + "using the approach documented in 0.98.");
325    final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
326      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
327    testInputFormat(ExampleDeprecatedTIF.class);
328  }
329
330  void testInputFormat(Class<? extends InputFormat> clazz)
331    throws IOException, InterruptedException, ClassNotFoundException {
332    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
333    job.setInputFormatClass(clazz);
334    job.setOutputFormatClass(NullOutputFormat.class);
335    job.setMapperClass(ExampleVerifier.class);
336    job.setNumReduceTasks(0);
337
338    LOG.debug("submitting job.");
339    assertTrue("job failed!", job.waitForCompletion(true));
340    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
341      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
342    assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
343      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
344    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
345      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
346    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
347      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
348    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
349      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
350    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
351      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
352  }
353
354  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {
355
356    @Override
357    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
358      for (Cell cell : value.listCells()) {
359        context
360          .getCounter(TestTableInputFormat.class.getName() + ":row",
361            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
362          .increment(1l);
363        context
364          .getCounter(TestTableInputFormat.class.getName() + ":family",
365            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
366          .increment(1l);
367        context
368          .getCounter(TestTableInputFormat.class.getName() + ":value",
369            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
370          .increment(1l);
371      }
372    }
373
374  }
375
376  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
377
378    @Override
379    public void configure(JobConf job) {
380      try {
381        Connection connection = ConnectionFactory.createConnection(job);
382        Table exampleTable = connection.getTable(TableName.valueOf(("exampleDeprecatedTable")));
383        // mandatory
384        initializeTable(connection, exampleTable.getName());
385        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
386        // optional
387        Scan scan = new Scan();
388        for (byte[] family : inputColumns) {
389          scan.addFamily(family);
390        }
391        Filter exampleFilter =
392          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
393        scan.setFilter(exampleFilter);
394        setScan(scan);
395      } catch (IOException exception) {
396        throw new RuntimeException("Failed to configure for job.", exception);
397      }
398    }
399
400  }
401
402  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
403    implements JobConfigurable {
404
405    @Override
406    public void configure(JobConf job) {
407      try {
408        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
409        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
410        // mandatory
411        initializeTable(connection, tableName);
412        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
413        // optional
414        Scan scan = new Scan();
415        for (byte[] family : inputColumns) {
416          scan.addFamily(family);
417        }
418        Filter exampleFilter =
419          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
420        scan.setFilter(exampleFilter);
421        setScan(scan);
422      } catch (IOException exception) {
423        throw new RuntimeException("Failed to initialize.", exception);
424      }
425    }
426  }
427
428  public static class ExampleTIF extends TableInputFormatBase {
429
430    @Override
431    protected void initialize(JobContext job) throws IOException {
432      Connection connection =
433        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
434      TableName tableName = TableName.valueOf("exampleTable");
435      // mandatory
436      initializeTable(connection, tableName);
437      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
438      // optional
439      Scan scan = new Scan();
440      for (byte[] family : inputColumns) {
441        scan.addFamily(family);
442      }
443      Filter exampleFilter =
444        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
445      scan.setFilter(exampleFilter);
446      setScan(scan);
447    }
448
449  }
450}