001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertThrows;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.mockito.Mockito.*;
025import static org.mockito.Mockito.doReturn;
026import static org.mockito.Mockito.doThrow;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.spy;
029
030import java.io.IOException;
031import java.util.Arrays;
032import java.util.Map;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.ResultScanner;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.filter.Filter;
047import org.apache.hadoop.hbase.filter.RegexStringComparator;
048import org.apache.hadoop.hbase.filter.RowFilter;
049import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.io.NullWritable;
053import org.apache.hadoop.mapred.JobConf;
054import org.apache.hadoop.mapred.JobConfigurable;
055import org.apache.hadoop.mapred.MiniMRCluster;
056import org.apache.hadoop.mapreduce.InputFormat;
057import org.apache.hadoop.mapreduce.Job;
058import org.apache.hadoop.mapreduce.JobContext;
059import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
060import org.junit.jupiter.api.AfterAll;
061import org.junit.jupiter.api.BeforeAll;
062import org.junit.jupiter.api.BeforeEach;
063import org.junit.jupiter.api.Tag;
064import org.junit.jupiter.api.Test;
065import org.mockito.invocation.InvocationOnMock;
066import org.mockito.stubbing.Answer;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070/**
071 * This tests the TableInputFormat and its recovery semantics
072 */
073@Tag(LargeTests.TAG)
074public class TestTableInputFormat {
075
  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  // Shared HBase mini-cluster for every test in this class (started in beforeClass()).
  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  // NOTE(review): never assigned or read anywhere in this file — looks like dead state left over
  // from an older MR-based version of the test; confirm before removing.
  private static MiniMRCluster mrCluster;
  // Single column family used by all tables this test creates.
  static final byte[] FAMILY = Bytes.toBytes("family");

  // NOTE(review): not referenced in this file; possibly kept for subclasses/history — confirm.
  private static final byte[][] columns = new byte[][] { FAMILY };
083
  @BeforeAll
  public static void beforeClass() throws Exception {
    // Bring up the shared HBase mini-cluster once for all tests in this class.
    UTIL.startMiniCluster();
  }
088
  @AfterAll
  public static void afterClass() throws Exception {
    // Tear down the mini-cluster started in beforeClass().
    UTIL.shutdownMiniCluster();
  }
093
  @BeforeEach
  public void before() throws IOException {
    LOG.info("before");
    // Every test needs at least one live region server to host its table.
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }
100
101  /**
102   * Setup a table with two rows and values.
103   * @return A Table instance for the created table.
104   */
105  public static Table createTable(byte[] tableName) throws IOException {
106    return createTable(tableName, new byte[][] { FAMILY });
107  }
108
109  /**
110   * Setup a table with two rows and values per column family.
111   * @return A Table instance for the created table.
112   */
113  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
114    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
115    Put p = new Put(Bytes.toBytes("aaa"));
116    for (byte[] family : families) {
117      p.addColumn(family, null, Bytes.toBytes("value aaa"));
118    }
119    table.put(p);
120    p = new Put(Bytes.toBytes("bbb"));
121    for (byte[] family : families) {
122      p.addColumn(family, null, Bytes.toBytes("value bbb"));
123    }
124    table.put(p);
125    return table;
126  }
127
128  /**
129   * Verify that the result and key have expected values.
130   * @param r             single row result
131   * @param key           the row key
132   * @param expectedKey   the expected key
133   * @param expectedValue the expected value
134   * @return true if the result contains the expected key and value, false otherwise.
135   */
136  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
137    byte[] expectedValue) {
138    assertEquals(0, key.compareTo(expectedKey));
139    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
140    byte[] value = vals.values().iterator().next();
141    assertTrue(Arrays.equals(value, expectedValue));
142    return true; // if succeed
143  }
144
145  /**
146   * Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API.
147   */
148  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
149    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
150      new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
151    Scan s = new Scan();
152    s.withStartRow(Bytes.toBytes("aaa"));
153    s.withStopRow(Bytes.toBytes("zzz"));
154    s.addFamily(FAMILY);
155    trr.setScan(s);
156    trr.setHTable(table);
157
158    trr.initialize(null, null);
159    Result r = new Result();
160    ImmutableBytesWritable key = new ImmutableBytesWritable();
161
162    boolean more = trr.nextKeyValue();
163    assertTrue(more);
164    key = trr.getCurrentKey();
165    r = trr.getCurrentValue();
166    checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));
167
168    more = trr.nextKeyValue();
169    assertTrue(more);
170    key = trr.getCurrentKey();
171    r = trr.getCurrentValue();
172    checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));
173
174    // no more data
175    more = trr.nextKeyValue();
176    assertFalse(more);
177  }
178
179  /**
180   * Create a table that IOE's on first scanner next call
181   */
182  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
183    // build up a mock scanner stuff to fail the first time
184    Answer<ResultScanner> a = new Answer<ResultScanner>() {
185      int cnt = 0;
186
187      @Override
188      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
189        // first invocation return the busted mock scanner
190        if (cnt++ < failCnt) {
191          // create mock ResultScanner that always fails.
192          Scan scan = mock(Scan.class);
193          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
194          ResultScanner scanner = mock(ResultScanner.class);
195          // simulate TimeoutException / IOException
196          doThrow(new IOException("Injected exception")).when(scanner).next();
197          return scanner;
198        }
199
200        // otherwise return the real scanner.
201        return (ResultScanner) invocation.callRealMethod();
202      }
203    };
204
205    Table htable = spy(createTable(name));
206    doAnswer(a).when(htable).getScanner(any(Scan.class));
207    return htable;
208  }
209
210  /**
211   * Create a table that throws a NotServingRegionException on first scanner next call
212   */
213  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
214    // build up a mock scanner stuff to fail the first time
215    Answer<ResultScanner> a = new Answer<ResultScanner>() {
216      int cnt = 0;
217
218      @Override
219      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
220        // first invocation return the busted mock scanner
221        if (cnt++ < failCnt) {
222          // create mock ResultScanner that always fails.
223          Scan scan = mock(Scan.class);
224          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
225          ResultScanner scanner = mock(ResultScanner.class);
226
227          invocation.callRealMethod(); // simulate NotServingRegionException
228          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
229            .when(scanner).next();
230          return scanner;
231        }
232
233        // otherwise return the real scanner.
234        return (ResultScanner) invocation.callRealMethod();
235      }
236    };
237
238    Table htable = spy(createTable(name));
239    doAnswer(a).when(htable).getScanner(any(Scan.class));
240    return htable;
241  }
242
243  /**
244   * Run test assuming no errors using newer mapreduce api
245   */
246  @Test
247  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
248    Table table = createTable(Bytes.toBytes("table1-mr"));
249    runTestMapreduce(table);
250  }
251
252  /**
253   * Run test assuming Scanner IOException failure using newer mapreduce api
254   */
255  @Test
256  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
257    Table htable = createIOEScannerTable(Bytes.toBytes("table2-mr"), 1);
258    runTestMapreduce(htable);
259  }
260
261  /**
262   * Run test assuming Scanner IOException failure using newer mapreduce api
263   */
264  @Test
265  public void testTableRecordReaderScannerFailMapreduceTwice()
266    throws IOException, InterruptedException {
267    Table htable = createIOEScannerTable(Bytes.toBytes("table3-mr"), 2);
268    assertThrows(IOException.class, () -> runTestMapreduce(htable));
269  }
270
271  /**
272   * Run test assuming NotServingRegionException using newer mapreduce api
273   */
274  @Test
275  public void testTableRecordReaderScannerTimeoutMapreduce()
276    throws IOException, InterruptedException {
277    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4-mr"), 1);
278    runTestMapreduce(htable);
279  }
280
281  /**
282   * Run test assuming NotServingRegionException using newer mapreduce api
283   */
284  @Test
285  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
286    throws IOException, InterruptedException {
287    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5-mr"), 2);
288    assertThrows(org.apache.hadoop.hbase.NotServingRegionException.class,
289      () -> runTestMapreduce(htable));
290  }
291
292  /**
293   * Verify the example we present in javadocs on TableInputFormatBase
294   */
295  @Test
296  public void testExtensionOfTableInputFormatBase()
297    throws IOException, InterruptedException, ClassNotFoundException {
298    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
299    final Table htable = createTable(Bytes.toBytes("exampleTable"),
300      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
301    testInputFormat(ExampleTIF.class);
302  }
303
304  @Test
305  public void testJobConfigurableExtensionOfTableInputFormatBase()
306    throws IOException, InterruptedException, ClassNotFoundException {
307    LOG.info(
308      "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable.");
309    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
310      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
311    testInputFormat(ExampleJobConfigurableTIF.class);
312  }
313
314  @Test
315  public void testDeprecatedExtensionOfTableInputFormatBase()
316    throws IOException, InterruptedException, ClassNotFoundException {
317    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
318      + "using the approach documented in 0.98.");
319    final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
320      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
321    testInputFormat(ExampleDeprecatedTIF.class);
322  }
323
324  void testInputFormat(Class<? extends InputFormat> clazz)
325    throws IOException, InterruptedException, ClassNotFoundException {
326    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
327    job.setInputFormatClass(clazz);
328    job.setOutputFormatClass(NullOutputFormat.class);
329    job.setMapperClass(ExampleVerifier.class);
330    job.setNumReduceTasks(0);
331
332    LOG.debug("submitting job.");
333    assertTrue(job.waitForCompletion(true), "job failed!");
334    assertEquals(2, job.getCounters()
335      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue(),
336      "Saw the wrong number of instances of the filtered-for row.");
337    assertEquals(0, job.getCounters()
338      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue(),
339      "Saw any instances of the filtered out row.");
340    assertEquals(1, job.getCounters()
341      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue(),
342      "Saw the wrong number of instances of columnA.");
343    assertEquals(1, job.getCounters()
344      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue(),
345      "Saw the wrong number of instances of columnB.");
346    assertEquals(2, job.getCounters()
347      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue(),
348      "Saw the wrong count of values for the filtered-for row.");
349    assertEquals(0, job.getCounters()
350      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue(),
351      "Saw the wrong count of values for the filtered-out row.");
352  }
353
354  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {
355
356    @Override
357    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
358      for (Cell cell : value.listCells()) {
359        context
360          .getCounter(TestTableInputFormat.class.getName() + ":row",
361            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
362          .increment(1l);
363        context
364          .getCounter(TestTableInputFormat.class.getName() + ":family",
365            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
366          .increment(1l);
367        context
368          .getCounter(TestTableInputFormat.class.getName() + ":value",
369            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
370          .increment(1l);
371      }
372    }
373
374  }
375
376  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
377
378    @Override
379    public void configure(JobConf job) {
380      try {
381        Connection connection = ConnectionFactory.createConnection(job);
382        Table exampleTable = connection.getTable(TableName.valueOf(("exampleDeprecatedTable")));
383        // mandatory
384        initializeTable(connection, exampleTable.getName());
385        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
386        // optional
387        Scan scan = new Scan();
388        for (byte[] family : inputColumns) {
389          scan.addFamily(family);
390        }
391        Filter exampleFilter =
392          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
393        scan.setFilter(exampleFilter);
394        setScan(scan);
395      } catch (IOException exception) {
396        throw new RuntimeException("Failed to configure for job.", exception);
397      }
398    }
399
400  }
401
402  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
403    implements JobConfigurable {
404
405    @Override
406    public void configure(JobConf job) {
407      try {
408        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
409        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
410        // mandatory
411        initializeTable(connection, tableName);
412        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
413        // optional
414        Scan scan = new Scan();
415        for (byte[] family : inputColumns) {
416          scan.addFamily(family);
417        }
418        Filter exampleFilter =
419          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
420        scan.setFilter(exampleFilter);
421        setScan(scan);
422      } catch (IOException exception) {
423        throw new RuntimeException("Failed to initialize.", exception);
424      }
425    }
426  }
427
428  public static class ExampleTIF extends TableInputFormatBase {
429
430    @Override
431    protected void initialize(JobContext job) throws IOException {
432      Connection connection =
433        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
434      TableName tableName = TableName.valueOf("exampleTable");
435      // mandatory
436      initializeTable(connection, tableName);
437      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
438      // optional
439      Scan scan = new Scan();
440      for (byte[] family : inputColumns) {
441        scan.addFamily(family);
442      }
443      Filter exampleFilter =
444        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
445      scan.setFilter(exampleFilter);
446      setScan(scan);
447    }
448
449  }
450}