001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertThrows;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.doAnswer;
026import static org.mockito.Mockito.doReturn;
027import static org.mockito.Mockito.doThrow;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.spy;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.Map;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CompareOperator;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.NotServingRegionException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.filter.Filter;
049import org.apache.hadoop.hbase.filter.RegexStringComparator;
050import org.apache.hadoop.hbase.filter.RowFilter;
051import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
052import org.apache.hadoop.hbase.testclassification.LargeTests;
053import org.apache.hadoop.hbase.testclassification.MapReduceTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.io.NullWritable;
056import org.apache.hadoop.mapred.InputFormat;
057import org.apache.hadoop.mapred.JobClient;
058import org.apache.hadoop.mapred.JobConf;
059import org.apache.hadoop.mapred.JobConfigurable;
060import org.apache.hadoop.mapred.OutputCollector;
061import org.apache.hadoop.mapred.Reporter;
062import org.apache.hadoop.mapred.RunningJob;
063import org.apache.hadoop.mapred.lib.NullOutputFormat;
064import org.junit.jupiter.api.AfterAll;
065import org.junit.jupiter.api.BeforeAll;
066import org.junit.jupiter.api.BeforeEach;
067import org.junit.jupiter.api.Tag;
068import org.junit.jupiter.api.Test;
069import org.mockito.invocation.InvocationOnMock;
070import org.mockito.stubbing.Answer;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074/**
075 * This tests the TableInputFormat and its recovery semantics
076 */
077@Tag(MapReduceTests.TAG)
078@Tag(LargeTests.TAG)
079public class TestTableInputFormat {
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);
082
083  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
084
085  static final byte[] FAMILY = Bytes.toBytes("family");
086
087  private static final byte[][] columns = new byte[][] { FAMILY };
088
  @BeforeAll
  public static void beforeClass() throws Exception {
    // One mini HBase cluster shared by every test in this class; torn down in afterClass().
    UTIL.startMiniCluster();
  }
093
  @AfterAll
  public static void afterClass() throws Exception {
    // Stop the shared mini cluster started in beforeClass().
    UTIL.shutdownMiniCluster();
  }
098
  @BeforeEach
  public void before() throws IOException {
    LOG.info("before");
    // Earlier tests inject region-server failures; make sure at least one RS is up again
    // before the next test starts.
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }
105
106  /**
107   * Setup a table with two rows and values.
108   * @param tableName the name of the table to create
109   * @return A Table instance for the created table.
110   */
111  public static Table createTable(byte[] tableName) throws IOException {
112    return createTable(tableName, new byte[][] { FAMILY });
113  }
114
115  /**
116   * Setup a table with two rows and values per column family.
117   * @return A Table instance for the created table.
118   */
119  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
120    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
121    Put p = new Put(Bytes.toBytes("aaa"));
122    for (byte[] family : families) {
123      p.addColumn(family, null, Bytes.toBytes("value aaa"));
124    }
125    table.put(p);
126    p = new Put(Bytes.toBytes("bbb"));
127    for (byte[] family : families) {
128      p.addColumn(family, null, Bytes.toBytes("value bbb"));
129    }
130    table.put(p);
131    return table;
132  }
133
134  /**
135   * Verify that the result and key have expected values.
136   * @param r             single row result
137   * @param key           the row key
138   * @param expectedKey   the expected key
139   * @param expectedValue the expected value
140   * @return true if the result contains the expected key and value, false otherwise.
141   */
142  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
143    byte[] expectedValue) {
144    assertEquals(0, key.compareTo(expectedKey));
145    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
146    byte[] value = vals.values().iterator().next();
147    assertTrue(Arrays.equals(value, expectedValue));
148    return true; // if succeed
149  }
150
151  /**
152   * Create table data and run tests on specified htable using the o.a.h.hbase.mapred API.
153   */
154  static void runTestMapred(Table table) throws IOException {
155    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
156      new org.apache.hadoop.hbase.mapred.TableRecordReader();
157    trr.setStartRow(Bytes.toBytes("aaa"));
158    trr.setEndRow(Bytes.toBytes("zzz"));
159    trr.setHTable(table);
160    trr.setInputColumns(columns);
161
162    trr.init();
163    Result r = new Result();
164    ImmutableBytesWritable key = new ImmutableBytesWritable();
165
166    boolean more = trr.next(key, r);
167    assertTrue(more);
168    checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));
169
170    more = trr.next(key, r);
171    assertTrue(more);
172    checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));
173
174    // no more data
175    more = trr.next(key, r);
176    assertFalse(more);
177  }
178
179  /**
180   * Create a table that IOE's on first scanner next call
181   */
182  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
183    // build up a mock scanner stuff to fail the first time
184    Answer<ResultScanner> a = new Answer<ResultScanner>() {
185      int cnt = 0;
186
187      @Override
188      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
189        // first invocation return the busted mock scanner
190        if (cnt++ < failCnt) {
191          // create mock ResultScanner that always fails.
192          Scan scan = mock(Scan.class);
193          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
194          ResultScanner scanner = mock(ResultScanner.class);
195          // simulate TimeoutException / IOException
196          doThrow(new IOException("Injected exception")).when(scanner).next();
197          return scanner;
198        }
199
200        // otherwise return the real scanner.
201        return (ResultScanner) invocation.callRealMethod();
202      }
203    };
204
205    Table htable = spy(createTable(name));
206    doAnswer(a).when(htable).getScanner(any(Scan.class));
207    return htable;
208  }
209
210  /**
211   * Create a table that throws a DoNoRetryIOException on first scanner next call
212   */
213  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
214    // build up a mock scanner stuff to fail the first time
215    Answer<ResultScanner> a = new Answer<ResultScanner>() {
216      int cnt = 0;
217
218      @Override
219      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
220        // first invocation return the busted mock scanner
221        if (cnt++ < failCnt) {
222          // create mock ResultScanner that always fails.
223          Scan scan = mock(Scan.class);
224          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
225          ResultScanner scanner = mock(ResultScanner.class);
226
227          invocation.callRealMethod(); // simulate NotServingRegionException
228          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
229            .when(scanner).next();
230          return scanner;
231        }
232
233        // otherwise return the real scanner.
234        return (ResultScanner) invocation.callRealMethod();
235      }
236    };
237
238    Table htable = spy(createTable(name));
239    doAnswer(a).when(htable).getScanner(any(Scan.class));
240    return htable;
241  }
242
243  /**
244   * Run test assuming no errors using mapred api.
245   */
246  @Test
247  public void testTableRecordReader() throws IOException {
248    Table table = createTable(Bytes.toBytes("table1"));
249    runTestMapred(table);
250  }
251
252  /**
253   * Run test assuming Scanner IOException failure using mapred api,
254   */
255  @Test
256  public void testTableRecordReaderScannerFail() throws IOException {
257    Table htable = createIOEScannerTable(Bytes.toBytes("table2"), 1);
258    runTestMapred(htable);
259  }
260
261  /**
262   * Run test assuming Scanner IOException failure using mapred api,
263   */
264  @Test
265  public void testTableRecordReaderScannerFailTwice() throws IOException {
266    Table htable = createIOEScannerTable(Bytes.toBytes("table3"), 2);
267    assertThrows(IOException.class, () -> runTestMapred(htable));
268  }
269
270  /**
271   * Run test assuming NotServingRegionException using mapred api.
272   * @throws org.apache.hadoop.hbase.DoNotRetryIOException
273   */
274  @Test
275  public void testTableRecordReaderScannerTimeout() throws IOException {
276    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4"), 1);
277    runTestMapred(htable);
278  }
279
280  /**
281   * Run test assuming NotServingRegionException using mapred api.
282   * @throws org.apache.hadoop.hbase.DoNotRetryIOException
283   */
284  @Test
285  public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
286    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5"), 2);
287    assertThrows(org.apache.hadoop.hbase.NotServingRegionException.class,
288      () -> runTestMapred(htable));
289  }
290
291  /**
292   * Verify the example we present in javadocs on TableInputFormatBase
293   */
294  @Test
295  public void testExtensionOfTableInputFormatBase() throws IOException {
296    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
297    final Table table = createTable(Bytes.toBytes("exampleTable"),
298      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
299    testInputFormat(ExampleTIF.class);
300  }
301
302  @Test
303  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
304    LOG.info(
305      "testing use of an InputFormat taht extends InputFormatBase, " + "as it was given in 0.98.");
306    final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
307      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
308    testInputFormat(ExampleDeprecatedTIF.class);
309  }
310
311  @Test
312  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
313    LOG.info(
314      "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable.");
315    final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
316      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
317    testInputFormat(ExampleJobConfigurableTIF.class);
318  }
319
320  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
321    Configuration conf = UTIL.getConfiguration();
322    final JobConf job = new JobConf(conf);
323    job.setInputFormat(clazz);
324    job.setOutputFormat(NullOutputFormat.class);
325    job.setMapperClass(ExampleVerifier.class);
326    job.setNumReduceTasks(0);
327    LOG.debug("submitting job.");
328    final RunningJob run = JobClient.runJob(job);
329    assertTrue(run.isSuccessful(), "job failed!");
330    assertEquals(2, run.getCounters()
331      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter(),
332      "Saw the wrong number of instances of the filtered-for row.");
333    assertEquals(0, run.getCounters()
334      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter(),
335      "Saw any instances of the filtered out row.");
336    assertEquals(1, run.getCounters()
337      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter(),
338      "Saw the wrong number of instances of columnA.");
339    assertEquals(1, run.getCounters()
340      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter(),
341      "Saw the wrong number of instances of columnB.");
342    assertEquals(2, run.getCounters()
343      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter(),
344      "Saw the wrong count of values for the filtered-for row.");
345    assertEquals(0, run.getCounters()
346      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter(),
347      "Saw the wrong count of values for the filtered-out row.");
348  }
349
350  public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {
351
352    @Override
353    public void configure(JobConf conf) {
354    }
355
356    @Override
357    public void map(ImmutableBytesWritable key, Result value,
358      OutputCollector<NullWritable, NullWritable> output, Reporter reporter) throws IOException {
359      for (Cell cell : value.listCells()) {
360        reporter
361          .getCounter(TestTableInputFormat.class.getName() + ":row",
362            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
363          .increment(1l);
364        reporter
365          .getCounter(TestTableInputFormat.class.getName() + ":family",
366            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
367          .increment(1l);
368        reporter
369          .getCounter(TestTableInputFormat.class.getName() + ":value",
370            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
371          .increment(1l);
372      }
373    }
374
375    @Override
376    public void close() {
377    }
378
379  }
380
381  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
382
383    @Override
384    public void configure(JobConf job) {
385      try {
386        Connection connection = ConnectionFactory.createConnection(job);
387        Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"));
388        // mandatory
389        initializeTable(connection, exampleTable.getName());
390        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
391        // mandatory
392        setInputColumns(inputColumns);
393        Filter exampleFilter =
394          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
395        // optional
396        setRowFilter(exampleFilter);
397      } catch (IOException exception) {
398        throw new RuntimeException("Failed to configure for job.", exception);
399      }
400    }
401
402  }
403
  /**
   * Variant of {@link ExampleTIF} that triggers initialization through the mapred
   * {@link JobConfigurable} hook instead of relying on the base class calling
   * initialize(JobConf) itself.
   */
  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        initialize(job);
      } catch (IOException exception) {
        // configure() cannot throw checked exceptions, so wrap with the cause preserved.
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }

    @Override
    protected void initialize(JobConf job) throws IOException {
      initialize(job, "exampleJobConfigurableTable");
    }
  }
420
421  public static class ExampleTIF extends TableInputFormatBase {
422
423    @Override
424    protected void initialize(JobConf job) throws IOException {
425      initialize(job, "exampleTable");
426    }
427
428    protected void initialize(JobConf job, String table) throws IOException {
429      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
430      TableName tableName = TableName.valueOf(table);
431      // mandatory
432      initializeTable(connection, tableName);
433      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
434      // mandatory
435      setInputColumns(inputColumns);
436      Filter exampleFilter =
437        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
438      // optional
439      setRowFilter(exampleFilter);
440    }
441
442  }
443
444}