/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This tests the TableInputFormat and its recovery semantics
 */
@Category({ MapReduceTests.class, LargeTests.class })
public class TestTableInputFormat {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();

  static final byte[] FAMILY = Bytes.toBytes("family");

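  // Input columns handed to TableRecordReader.setInputColumns in runTestMapred below.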
  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Set up a table with two rows and values.
   * @param tableName the name of the table to create
   * @return A Table instance for the created table.
   * @throws IOException if the table cannot be created or populated
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Set up a table with two rows and values per column family.
   * @param tableName the name of the table to create
   * @param families  the column families to create in the table
   * @return A Table instance for the created table.
   * @throws IOException if the table cannot be created or populated
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    Put p = new Put("aaa".getBytes());
    for (byte[] family : families) {
      p.addColumn(family, null, "value aaa".getBytes());
    }
    table.put(p);
    p = new Put("bbb".getBytes());
    for (byte[] family : families) {
      p.addColumn(family, null, "value bbb".getBytes());
    }
    table.put(p);
    return table;
  }

  /**
   * Verify that the result and key have expected values.
   * @param r             single row result
   * @param key           the row key
   * @param expectedKey   the expected key
   * @param expectedValue the expected value
   * @return true if the result contains the expected key and value, false otherwise.
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
    byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true; // reached only if the assertions pass
  }

  /**
   * Run tests on the specified table using the o.a.h.hbase.mapred API and verify that both
   * expected rows are returned in order.
   * @param table a table holding the two rows created by {@link #createTable(byte[])}
   * @throws IOException if reading from the table fails
   */
  static void runTestMapred(Table table) throws IOException {
    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
      new org.apache.hadoop.hbase.mapred.TableRecordReader();
    trr.setStartRow("aaa".getBytes());
    trr.setEndRow("zzz".getBytes());
    trr.setHTable(table);
    trr.setInputColumns(columns);

    trr.init();
    Result r = new Result();
    ImmutableBytesWritable key = new ImmutableBytesWritable();

    boolean more = trr.next(key, r);
    assertTrue(more);
    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());

    more = trr.next(key, r);
    assertTrue(more);
    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());

    // no more data
    more = trr.next(key, r);
    assertFalse(more);
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw an IOException from
   * {@code ResultScanner.next()}.
   * @param name    the table name
   * @param failCnt how many getScanner calls should hand back a failing scanner
   * @return a spied Table wired with the failure-injecting answer
   * @throws IOException if the underlying table cannot be created
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up mock scanner plumbing that fails the first failCnt getScanner calls
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
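      // number of getScanner calls seen so far; the first failCnt calls get a failing scanner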
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // for the first failCnt invocations, return the busted mock scanner
        if (cnt++ < failCnt) {
          // create a mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid NPE
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate TimeoutException / IOException
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw a NotServingRegionException (a
   * DoNotRetryIOException) from {@code ResultScanner.next()}.
   * @param name    the table name
   * @param failCnt how many getScanner calls should hand back a failing scanner
   * @return a spied Table wired with the failure-injecting answer
   * @throws IOException if the underlying table cannot be created
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up mock scanner plumbing that fails the first failCnt getScanner calls
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
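      // number of getScanner calls seen so far; the first failCnt calls get a failing scanner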
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // for the first failCnt invocations, return the busted mock scanner
        if (cnt++ < failCnt) {
          // create a mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid NPE
          ResultScanner scanner = mock(ResultScanner.class);

          invocation.callRealMethod(); // simulate NotServingRegionException
          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
            .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Run the test assuming no errors, using the mapred API.
   */
  @Test
  public void testTableRecordReader() throws IOException {
    Table table = createTable("table1".getBytes());
    runTestMapred(table);
  }

  /**
   * Run the test assuming a single scanner IOException failure, using the mapred API; the reader
   * should recover and still return all rows.
   */
  @Test
  public void testTableRecordReaderScannerFail() throws IOException {
    Table htable = createIOEScannerTable("table2".getBytes(), 1);
    runTestMapred(htable);
  }

  /**
   * Run the test assuming two scanner IOException failures, using the mapred API; the second
   * failure should surface as an IOException.
   */
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailTwice() throws IOException {
    Table htable = createIOEScannerTable("table3".getBytes(), 2);
    runTestMapred(htable);
  }

  /**
   * Run the test assuming a single NotServingRegionException, using the mapred API; the reader
   * should recover and still return all rows.
   */
  @Test
  public void testTableRecordReaderScannerTimeout() throws IOException {
    Table htable = createDNRIOEScannerTable("table4".getBytes(), 1);
    runTestMapred(htable);
  }

  /**
   * Run the test assuming two NotServingRegionExceptions, using the mapred API; the second
   * failure should surface as a NotServingRegionException.
   */
  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
    Table htable = createDNRIOEScannerTable("table5".getBytes(), 2);
    runTestMapred(htable);
  }

  /**
   * Verify the example we present in the javadocs on TableInputFormatBase.
   */
  @Test
  public void testExtensionOfTableInputFormatBase() throws IOException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase");
    final Table table = createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleTIF.class);
  }

  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
    LOG.info(
      "testing use of an InputFormat that extends TableInputFormatBase, as it was given in 0.98.");
    final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
    LOG.info(
      "testing use of an InputFormat that extends TableInputFormatBase, using JobConfigurable.");
    final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
    Configuration conf = UTIL.getConfiguration();
    final JobConf job = new JobConf(conf);
    job.setInputFormat(clazz);
    job.setOutputFormat(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);
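    // The example input formats install a RowFilter matching "aa.*", so only row 'aaa' (one
    // cell in each of columnA and columnB) should reach ExampleVerifier. The counter checks
    // below assert exactly that.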
    LOG.debug("submitting job.");
    final RunningJob run = JobClient.runJob(job);
    assertTrue("job failed!", run.isSuccessful());
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
    assertEquals("Saw instances of the filtered-out row.", 0, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
    assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
    assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
  }

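  /**
   * Mapper that tallies every row key, column family, and value it sees into job counters grouped
   * under this test class's name; testInputFormat reads those counters back from the completed
   * job to verify what the InputFormat delivered.
   */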
  public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {

    @Override
    public void configure(JobConf conf) {
    }

    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<NullWritable, NullWritable> output, Reporter reporter) throws IOException {
      for (Cell cell : value.listCells()) {
        reporter
          .getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
          .increment(1L);
        reporter
          .getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
          .increment(1L);
        reporter
          .getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
          .increment(1L);
      }
    }

    @Override
    public void close() {
    }

  }

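  /**
   * Example extension of TableInputFormatBase in the style it was given in 0.98: all setup
   * (initializeTable, setInputColumns, setRowFilter) happens in
   * {@link JobConfigurable#configure(JobConf)}.
   */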
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"));
        // mandatory
        initializeTable(connection, exampleTable.getName());
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // mandatory
        setInputColumns(inputColumns);
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        // optional
        setRowFilter(exampleFilter);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

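  /**
   * Variant of {@link ExampleTIF} that triggers its initialization from
   * {@link JobConfigurable#configure(JobConf)} by calling initialize(JobConf) there.
   */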
  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        initialize(job);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }

    @Override
    protected void initialize(JobConf job) throws IOException {
      initialize(job, "exampleJobConfigurableTable");
    }
  }

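  /**
   * Example extension of TableInputFormatBase matching the usage shown in that class's javadoc:
   * initialize(JobConf) opens a Connection, calls initializeTable and setInputColumns (both
   * mandatory), and installs an optional RowFilter.
   */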
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobConf job) throws IOException {
      initialize(job, "exampleTable");
    }

    protected void initialize(JobConf job, String table) throws IOException {
      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
      TableName tableName = TableName.valueOf(table);
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
      // mandatory
      setInputColumns(inputColumns);
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      // optional
      setRowFilter(exampleFilter);
    }

  }

}