001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Matchers.anyObject;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.doReturn;
026import static org.mockito.Mockito.doThrow;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.spy;
029
030import java.io.IOException;
031import java.util.Arrays;
032import java.util.Map;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.NotServingRegionException;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.RegexStringComparator;
049import org.apache.hadoop.hbase.filter.RowFilter;
050import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.io.NullWritable;
054import org.apache.hadoop.mapred.JobConf;
055import org.apache.hadoop.mapred.JobConfigurable;
056import org.apache.hadoop.mapred.MiniMRCluster;
057import org.apache.hadoop.mapreduce.InputFormat;
058import org.apache.hadoop.mapreduce.Job;
059import org.apache.hadoop.mapreduce.JobContext;
060import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.mockito.invocation.InvocationOnMock;
068import org.mockito.stubbing.Answer;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072/**
073 * This tests the TableInputFormat and its recovery semantics
074 */
075@Category(LargeTests.class)
076public class TestTableInputFormat {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestTableInputFormat.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);
083
084  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
085  private static MiniMRCluster mrCluster;
086  static final byte[] FAMILY = Bytes.toBytes("family");
087
088  private static final byte[][] columns = new byte[][] { FAMILY };
089
  @BeforeClass
  public static void beforeClass() throws Exception {
    // Start a single-process HBase cluster shared by every test in this class.
    UTIL.startMiniCluster();
  }
094
  @AfterClass
  public static void afterClass() throws Exception {
    // Tear down the mini cluster started in beforeClass().
    UTIL.shutdownMiniCluster();
  }
099
  @Before
  public void before() throws IOException {
    LOG.info("before");
    // Block until at least one region server is serving, so table operations in the
    // test body do not race cluster startup or a prior test's server kill.
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }
106
107  /**
108   * Setup a table with two rows and values.
109   * @return A Table instance for the created table.
110   */
111  public static Table createTable(byte[] tableName) throws IOException {
112    return createTable(tableName, new byte[][] { FAMILY });
113  }
114
115  /**
116   * Setup a table with two rows and values per column family.
117   * @return A Table instance for the created table.
118   */
119  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
120    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
121    Put p = new Put(Bytes.toBytes("aaa"));
122    for (byte[] family : families) {
123      p.addColumn(family, null, Bytes.toBytes("value aaa"));
124    }
125    table.put(p);
126    p = new Put(Bytes.toBytes("bbb"));
127    for (byte[] family : families) {
128      p.addColumn(family, null, Bytes.toBytes("value bbb"));
129    }
130    table.put(p);
131    return table;
132  }
133
134  /**
135   * Verify that the result and key have expected values.
136   * @param r             single row result
137   * @param key           the row key
138   * @param expectedKey   the expected key
139   * @param expectedValue the expected value
140   * @return true if the result contains the expected key and value, false otherwise.
141   */
142  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
143    byte[] expectedValue) {
144    assertEquals(0, key.compareTo(expectedKey));
145    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
146    byte[] value = vals.values().iterator().next();
147    assertTrue(Arrays.equals(value, expectedValue));
148    return true; // if succeed
149  }
150
151  /**
152   * Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API.
153   */
154  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
155    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
156      new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
157    Scan s = new Scan();
158    s.withStartRow(Bytes.toBytes("aaa"));
159    s.withStopRow(Bytes.toBytes("zzz"));
160    s.addFamily(FAMILY);
161    trr.setScan(s);
162    trr.setHTable(table);
163
164    trr.initialize(null, null);
165    Result r = new Result();
166    ImmutableBytesWritable key = new ImmutableBytesWritable();
167
168    boolean more = trr.nextKeyValue();
169    assertTrue(more);
170    key = trr.getCurrentKey();
171    r = trr.getCurrentValue();
172    checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));
173
174    more = trr.nextKeyValue();
175    assertTrue(more);
176    key = trr.getCurrentKey();
177    r = trr.getCurrentValue();
178    checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));
179
180    // no more data
181    more = trr.nextKeyValue();
182    assertFalse(more);
183  }
184
185  /**
186   * Create a table that IOE's on first scanner next call
187   */
188  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
189    // build up a mock scanner stuff to fail the first time
190    Answer<ResultScanner> a = new Answer<ResultScanner>() {
191      int cnt = 0;
192
193      @Override
194      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
195        // first invocation return the busted mock scanner
196        if (cnt++ < failCnt) {
197          // create mock ResultScanner that always fails.
198          Scan scan = mock(Scan.class);
199          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
200          ResultScanner scanner = mock(ResultScanner.class);
201          // simulate TimeoutException / IOException
202          doThrow(new IOException("Injected exception")).when(scanner).next();
203          return scanner;
204        }
205
206        // otherwise return the real scanner.
207        return (ResultScanner) invocation.callRealMethod();
208      }
209    };
210
211    Table htable = spy(createTable(name));
212    doAnswer(a).when(htable).getScanner((Scan) anyObject());
213    return htable;
214  }
215
216  /**
217   * Create a table that throws a NotServingRegionException on first scanner next call
218   */
219  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
220    // build up a mock scanner stuff to fail the first time
221    Answer<ResultScanner> a = new Answer<ResultScanner>() {
222      int cnt = 0;
223
224      @Override
225      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
226        // first invocation return the busted mock scanner
227        if (cnt++ < failCnt) {
228          // create mock ResultScanner that always fails.
229          Scan scan = mock(Scan.class);
230          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
231          ResultScanner scanner = mock(ResultScanner.class);
232
233          invocation.callRealMethod(); // simulate NotServingRegionException
234          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
235            .when(scanner).next();
236          return scanner;
237        }
238
239        // otherwise return the real scanner.
240        return (ResultScanner) invocation.callRealMethod();
241      }
242    };
243
244    Table htable = spy(createTable(name));
245    doAnswer(a).when(htable).getScanner((Scan) anyObject());
246    return htable;
247  }
248
249  /**
250   * Run test assuming no errors using newer mapreduce api
251   */
252  @Test
253  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
254    Table table = createTable(Bytes.toBytes("table1-mr"));
255    runTestMapreduce(table);
256  }
257
258  /**
259   * Run test assuming Scanner IOException failure using newer mapreduce api
260   */
261  @Test
262  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
263    Table htable = createIOEScannerTable(Bytes.toBytes("table2-mr"), 1);
264    runTestMapreduce(htable);
265  }
266
267  /**
268   * Run test assuming Scanner IOException failure using newer mapreduce api
269   */
270  @Test(expected = IOException.class)
271  public void testTableRecordReaderScannerFailMapreduceTwice()
272    throws IOException, InterruptedException {
273    Table htable = createIOEScannerTable(Bytes.toBytes("table3-mr"), 2);
274    runTestMapreduce(htable);
275  }
276
277  /**
278   * Run test assuming NotServingRegionException using newer mapreduce api
279   */
280  @Test
281  public void testTableRecordReaderScannerTimeoutMapreduce()
282    throws IOException, InterruptedException {
283    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4-mr"), 1);
284    runTestMapreduce(htable);
285  }
286
287  /**
288   * Run test assuming NotServingRegionException using newer mapreduce api
289   */
290  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
291  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
292    throws IOException, InterruptedException {
293    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5-mr"), 2);
294    runTestMapreduce(htable);
295  }
296
297  /**
298   * Verify the example we present in javadocs on TableInputFormatBase
299   */
300  @Test
301  public void testExtensionOfTableInputFormatBase()
302    throws IOException, InterruptedException, ClassNotFoundException {
303    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
304    final Table htable = createTable(Bytes.toBytes("exampleTable"),
305      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
306    testInputFormat(ExampleTIF.class);
307  }
308
309  @Test
310  public void testJobConfigurableExtensionOfTableInputFormatBase()
311    throws IOException, InterruptedException, ClassNotFoundException {
312    LOG.info(
313      "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable.");
314    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
315      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
316    testInputFormat(ExampleJobConfigurableTIF.class);
317  }
318
319  @Test
320  public void testDeprecatedExtensionOfTableInputFormatBase()
321    throws IOException, InterruptedException, ClassNotFoundException {
322    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
323      + "using the approach documented in 0.98.");
324    final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
325      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
326    testInputFormat(ExampleDeprecatedTIF.class);
327  }
328
329  void testInputFormat(Class<? extends InputFormat> clazz)
330    throws IOException, InterruptedException, ClassNotFoundException {
331    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
332    job.setInputFormatClass(clazz);
333    job.setOutputFormatClass(NullOutputFormat.class);
334    job.setMapperClass(ExampleVerifier.class);
335    job.setNumReduceTasks(0);
336
337    LOG.debug("submitting job.");
338    assertTrue("job failed!", job.waitForCompletion(true));
339    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
340      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
341    assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
342      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
343    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
344      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
345    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
346      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
347    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
348      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
349    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
350      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
351  }
352
353  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {
354
355    @Override
356    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
357      for (Cell cell : value.listCells()) {
358        context
359          .getCounter(TestTableInputFormat.class.getName() + ":row",
360            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
361          .increment(1l);
362        context
363          .getCounter(TestTableInputFormat.class.getName() + ":family",
364            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
365          .increment(1l);
366        context
367          .getCounter(TestTableInputFormat.class.getName() + ":value",
368            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
369          .increment(1l);
370      }
371    }
372
373  }
374
375  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
376
377    @Override
378    public void configure(JobConf job) {
379      try {
380        Connection connection = ConnectionFactory.createConnection(job);
381        Table exampleTable = connection.getTable(TableName.valueOf(("exampleDeprecatedTable")));
382        // mandatory
383        initializeTable(connection, exampleTable.getName());
384        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
385        // optional
386        Scan scan = new Scan();
387        for (byte[] family : inputColumns) {
388          scan.addFamily(family);
389        }
390        Filter exampleFilter =
391          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
392        scan.setFilter(exampleFilter);
393        setScan(scan);
394      } catch (IOException exception) {
395        throw new RuntimeException("Failed to configure for job.", exception);
396      }
397    }
398
399  }
400
401  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
402    implements JobConfigurable {
403
404    @Override
405    public void configure(JobConf job) {
406      try {
407        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
408        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
409        // mandatory
410        initializeTable(connection, tableName);
411        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
412        // optional
413        Scan scan = new Scan();
414        for (byte[] family : inputColumns) {
415          scan.addFamily(family);
416        }
417        Filter exampleFilter =
418          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
419        scan.setFilter(exampleFilter);
420        setScan(scan);
421      } catch (IOException exception) {
422        throw new RuntimeException("Failed to initialize.", exception);
423      }
424    }
425  }
426
427  public static class ExampleTIF extends TableInputFormatBase {
428
429    @Override
430    protected void initialize(JobContext job) throws IOException {
431      Connection connection =
432        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
433      TableName tableName = TableName.valueOf("exampleTable");
434      // mandatory
435      initializeTable(connection, tableName);
436      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
437      // optional
438      Scan scan = new Scan();
439      for (byte[] family : inputColumns) {
440        scan.addFamily(family);
441      }
442      Filter exampleFilter =
443        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
444      scan.setFilter(exampleFilter);
445      setScan(scan);
446    }
447
448  }
449}