001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Matchers.anyObject;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.doReturn;
026import static org.mockito.Mockito.doThrow;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.spy;
029
030import java.io.IOException;
031import java.util.Arrays;
032import java.util.Map;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.NotServingRegionException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.filter.Filter;
049import org.apache.hadoop.hbase.filter.RegexStringComparator;
050import org.apache.hadoop.hbase.filter.RowFilter;
051import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
052import org.apache.hadoop.hbase.testclassification.LargeTests;
053import org.apache.hadoop.hbase.testclassification.MapReduceTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.io.NullWritable;
056import org.apache.hadoop.mapred.InputFormat;
057import org.apache.hadoop.mapred.JobClient;
058import org.apache.hadoop.mapred.JobConf;
059import org.apache.hadoop.mapred.JobConfigurable;
060import org.apache.hadoop.mapred.OutputCollector;
061import org.apache.hadoop.mapred.Reporter;
062import org.apache.hadoop.mapred.RunningJob;
063import org.apache.hadoop.mapred.lib.NullOutputFormat;
064import org.junit.AfterClass;
065import org.junit.Before;
066import org.junit.BeforeClass;
067import org.junit.ClassRule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.mockito.invocation.InvocationOnMock;
071import org.mockito.stubbing.Answer;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
/**
 * Tests the mapred {@code TableInputFormat} support classes, including how record reading
 * recovers when a scanner fails.
 */
078@Category({MapReduceTests.class, LargeTests.class})
079public class TestTableInputFormat {
080
081  @ClassRule
082  public static final HBaseClassTestRule CLASS_RULE =
083      HBaseClassTestRule.forClass(TestTableInputFormat.class);
084
085  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);
086
087  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
088
089  static final byte[] FAMILY = Bytes.toBytes("family");
090
091  private static final byte[][] columns = new byte[][] { FAMILY };
092
093  @BeforeClass
094  public static void beforeClass() throws Exception {
095    UTIL.startMiniCluster();
096  }
097
098  @AfterClass
099  public static void afterClass() throws Exception {
100    UTIL.shutdownMiniCluster();
101  }
102
103  @Before
104  public void before() throws IOException {
105    LOG.info("before");
106    UTIL.ensureSomeRegionServersAvailable(1);
107    LOG.info("before done");
108  }
109
110  /**
111   * Setup a table with two rows and values.
112   *
113   * @param tableName
114   * @return
115   * @throws IOException
116   */
117  public static Table createTable(byte[] tableName) throws IOException {
118    return createTable(tableName, new byte[][] { FAMILY });
119  }
120
121  /**
122   * Setup a table with two rows and values per column family.
123   *
124   * @param tableName
125   * @return
126   * @throws IOException
127   */
128  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
129    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
130    Put p = new Put("aaa".getBytes());
131    for (byte[] family : families) {
132      p.addColumn(family, null, "value aaa".getBytes());
133    }
134    table.put(p);
135    p = new Put("bbb".getBytes());
136    for (byte[] family : families) {
137      p.addColumn(family, null, "value bbb".getBytes());
138    }
139    table.put(p);
140    return table;
141  }
142
143  /**
144   * Verify that the result and key have expected values.
145   *
146   * @param r single row result
147   * @param key the row key
148   * @param expectedKey the expected key
149   * @param expectedValue the expected value
150   * @return true if the result contains the expected key and value, false otherwise.
151   */
152  static boolean checkResult(Result r, ImmutableBytesWritable key,
153      byte[] expectedKey, byte[] expectedValue) {
154    assertEquals(0, key.compareTo(expectedKey));
155    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
156    byte[] value = vals.values().iterator().next();
157    assertTrue(Arrays.equals(value, expectedValue));
158    return true; // if succeed
159  }
160
161  /**
162   * Create table data and run tests on specified htable using the
163   * o.a.h.hbase.mapred API.
164   *
165   * @param table
166   * @throws IOException
167   */
168  static void runTestMapred(Table table) throws IOException {
169    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
170        new org.apache.hadoop.hbase.mapred.TableRecordReader();
171    trr.setStartRow("aaa".getBytes());
172    trr.setEndRow("zzz".getBytes());
173    trr.setHTable(table);
174    trr.setInputColumns(columns);
175
176    trr.init();
177    Result r = new Result();
178    ImmutableBytesWritable key = new ImmutableBytesWritable();
179
180    boolean more = trr.next(key, r);
181    assertTrue(more);
182    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
183
184    more = trr.next(key, r);
185    assertTrue(more);
186    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
187
188    // no more data
189    more = trr.next(key, r);
190    assertFalse(more);
191  }
192
193  /**
194   * Create a table that IOE's on first scanner next call
195   *
196   * @throws IOException
197   */
198  static Table createIOEScannerTable(byte[] name, final int failCnt)
199      throws IOException {
200    // build up a mock scanner stuff to fail the first time
201    Answer<ResultScanner> a = new Answer<ResultScanner>() {
202      int cnt = 0;
203
204      @Override
205      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
206        // first invocation return the busted mock scanner
207        if (cnt++ < failCnt) {
208          // create mock ResultScanner that always fails.
209          Scan scan = mock(Scan.class);
210          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
211          ResultScanner scanner = mock(ResultScanner.class);
212          // simulate TimeoutException / IOException
213          doThrow(new IOException("Injected exception")).when(scanner).next();
214          return scanner;
215        }
216
217        // otherwise return the real scanner.
218        return (ResultScanner) invocation.callRealMethod();
219      }
220    };
221
222    Table htable = spy(createTable(name));
223    doAnswer(a).when(htable).getScanner((Scan) anyObject());
224    return htable;
225  }
226
227  /**
228   * Create a table that throws a DoNoRetryIOException on first scanner next
229   * call
230   *
231   * @throws IOException
232   */
233  static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
234      throws IOException {
235    // build up a mock scanner stuff to fail the first time
236    Answer<ResultScanner> a = new Answer<ResultScanner>() {
237      int cnt = 0;
238
239      @Override
240      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
241        // first invocation return the busted mock scanner
242        if (cnt++ < failCnt) {
243          // create mock ResultScanner that always fails.
244          Scan scan = mock(Scan.class);
245          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
246          ResultScanner scanner = mock(ResultScanner.class);
247
248          invocation.callRealMethod(); // simulate NotServingRegionException
249          doThrow(
250              new NotServingRegionException("Injected simulated TimeoutException"))
251              .when(scanner).next();
252          return scanner;
253        }
254
255        // otherwise return the real scanner.
256        return (ResultScanner) invocation.callRealMethod();
257      }
258    };
259
260    Table htable = spy(createTable(name));
261    doAnswer(a).when(htable).getScanner((Scan) anyObject());
262    return htable;
263  }
264
265  /**
266   * Run test assuming no errors using mapred api.
267   *
268   * @throws IOException
269   */
270  @Test
271  public void testTableRecordReader() throws IOException {
272    Table table = createTable("table1".getBytes());
273    runTestMapred(table);
274  }
275
276  /**
277   * Run test assuming Scanner IOException failure using mapred api,
278   *
279   * @throws IOException
280   */
281  @Test
282  public void testTableRecordReaderScannerFail() throws IOException {
283    Table htable = createIOEScannerTable("table2".getBytes(), 1);
284    runTestMapred(htable);
285  }
286
287  /**
288   * Run test assuming Scanner IOException failure using mapred api,
289   *
290   * @throws IOException
291   */
292  @Test(expected = IOException.class)
293  public void testTableRecordReaderScannerFailTwice() throws IOException {
294    Table htable = createIOEScannerTable("table3".getBytes(), 2);
295    runTestMapred(htable);
296  }
297
298  /**
299   * Run test assuming NotServingRegionException using mapred api.
300   *
301   * @throws org.apache.hadoop.hbase.DoNotRetryIOException
302   */
303  @Test
304  public void testTableRecordReaderScannerTimeout() throws IOException {
305    Table htable = createDNRIOEScannerTable("table4".getBytes(), 1);
306    runTestMapred(htable);
307  }
308
309  /**
310   * Run test assuming NotServingRegionException using mapred api.
311   *
312   * @throws org.apache.hadoop.hbase.DoNotRetryIOException
313   */
314  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
315  public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
316    Table htable = createDNRIOEScannerTable("table5".getBytes(), 2);
317    runTestMapred(htable);
318  }
319
320  /**
321   * Verify the example we present in javadocs on TableInputFormatBase
322   */
323  @Test
324  public void testExtensionOfTableInputFormatBase() throws IOException {
325    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
326    final Table table = createTable(Bytes.toBytes("exampleTable"),
327      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
328    testInputFormat(ExampleTIF.class);
329  }
330
331  @Test
332  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
333    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
334        + "as it was given in 0.98.");
335    final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
336      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
337    testInputFormat(ExampleDeprecatedTIF.class);
338  }
339
340  @Test
341  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
342    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
343        + "using JobConfigurable.");
344    final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
345      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
346    testInputFormat(ExampleJobConfigurableTIF.class);
347  }
348
349  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
350    Configuration conf = UTIL.getConfiguration();
351    final JobConf job = new JobConf(conf);
352    job.setInputFormat(clazz);
353    job.setOutputFormat(NullOutputFormat.class);
354    job.setMapperClass(ExampleVerifier.class);
355    job.setNumReduceTasks(0);
356    LOG.debug("submitting job.");
357    final RunningJob run = JobClient.runJob(job);
358    assertTrue("job failed!", run.isSuccessful());
359    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
360        .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
361    assertEquals("Saw any instances of the filtered out row.", 0, run.getCounters()
362        .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
363    assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
364        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
365    assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
366        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
367    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
368        .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
369    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
370        .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
371  }
372
373  public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {
374
375    @Override
376    public void configure(JobConf conf) {
377    }
378
379    @Override
380    public void map(ImmutableBytesWritable key, Result value,
381        OutputCollector<NullWritable,NullWritable> output,
382        Reporter reporter) throws IOException {
383      for (Cell cell : value.listCells()) {
384        reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
385            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
386            .increment(1l);
387        reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
388            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
389            .increment(1l);
390        reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
391            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
392            .increment(1l);
393      }
394    }
395
396    @Override
397    public void close() {
398    }
399
400  }
401
402  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
403
404    @Override
405    public void configure(JobConf job) {
406      try {
407        Connection connection = ConnectionFactory.createConnection(job);
408        Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"));
409        // mandatory
410        initializeTable(connection, exampleTable.getName());
411        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
412          Bytes.toBytes("columnB") };
413        // mandatory
414        setInputColumns(inputColumns);
415        Filter exampleFilter =
416          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
417        // optional
418        setRowFilter(exampleFilter);
419      } catch (IOException exception) {
420        throw new RuntimeException("Failed to configure for job.", exception);
421      }
422    }
423
424  }
425
426  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
427
428    @Override
429    public void configure(JobConf job) {
430      try {
431        initialize(job);
432      } catch (IOException exception) {
433        throw new RuntimeException("Failed to initialize.", exception);
434      }
435    }
436
437    @Override
438    protected void initialize(JobConf job) throws IOException {
439      initialize(job, "exampleJobConfigurableTable");
440    }
441  }
442
443
444  public static class ExampleTIF extends TableInputFormatBase {
445
446    @Override
447    protected void initialize(JobConf job) throws IOException {
448      initialize(job, "exampleTable");
449    }
450
451    protected void initialize(JobConf job, String table) throws IOException {
452      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
453      TableName tableName = TableName.valueOf(table);
454      // mandatory
455      initializeTable(connection, tableName);
456      byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
457        Bytes.toBytes("columnB") };
458      // mandatory
459      setInputColumns(inputColumns);
460      Filter exampleFilter =
461        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
462      // optional
463      setRowFilter(exampleFilter);
464    }
465
466  }
467
468}
469