/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  @After
  public void tearDown() throws Exception {
  }

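  /**
   * getBestLocations() ranks the hosts serving a region's blocks and keeps only those whose
   * weight is close to the best host's. The assertions below encode that behaviour: locations
   * come back best-first, a host well below the top weight (e.g. 7 vs 10) is dropped, and, as
   * the final assertion shows, the returned list is capped at three hosts.
   */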
  @Test
  public void testGetBestLocations() throws IOException {
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertNull(TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  public enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

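  /**
   * Mapper used by the jobs below: verifies each row read from the snapshot against the data
   * written by createTableAndSnapshot() and emits the row key.
   */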
  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

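  /**
   * Reducer counterpart: records every row key it receives and, in cleanup(), validates via
   * SeenRowTracker that the whole [bbb, yyy) range was covered.
   */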
  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
       Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      rowTracker.validate();
    }
  }

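  /**
   * initTableSnapshotMapperJob() must rewrite the cache settings in the job configuration so
   * that restored snapshot files are read through the default LruBlockCache and never through
   * a BucketCache.
   */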
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: it would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for the default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    Job job = Job.getInstance(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

  @Override
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
      throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
                numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }

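  /**
   * With 10 regions and 5 splits per region a full-table scan would produce 50 splits; the
   * narrower [bbc, yya) scan below should drop the splits that fall entirely outside that
   * range, which is why only 40 splits are expected.
   */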
  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    setupCluster();
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test a scan with a startRow and a stopRow
      Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

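  /**
   * Same layout as above, but with an unbounded scan, every one of the 10 x 5 = 50 splits
   * must be produced.
   */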
  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    setupCluster();
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test a scan without a startRow or a stopRow
      Scan scan = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW);

    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

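  /**
   * Regression test for reading a snapshot taken right after a region split, while the
   * daughter regions still hold reference files to the parent's HFiles: every pre-snapshot
   * row must come back exactly once, i.e. not be duplicated across the two splits.
   */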
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split into 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause a flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

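  /**
   * Drives the InputFormat the way the framework would, without running an actual job: checks
   * the split count, each split's locality and scan range, then reads every split through its
   * RecordReader and validates that together they cover [startRow, stopRow).
   */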
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
        stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled =
        job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
                                          SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    for (int i = 0; i < splits.size(); i++) {
      // validate the input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
          TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(startRow) + " should be <= "
              + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(stopRow) + " should be >= "
              + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should be < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate the record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
          tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate that we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate that all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down the hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = Job.getInstance(util.getConfiguration());
      // limit the scan
      Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow);

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
              TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, true, tableDir, new RegionSplitter.UniformSplit(),
                numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
                scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
                NullWritable.class, job, true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

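  // End-to-end variant of the mocked tests above: 10 regions x 5 splits per region = 50
  // splits, run as a real MapReduce job against the mini cluster.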
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}