001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
021import static org.mockito.Mockito.mock;
022
023import java.io.IOException;
024import java.util.Iterator;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
032import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.RegionSplitter;
037import org.apache.hadoop.io.NullWritable;
038import org.apache.hadoop.mapred.InputSplit;
039import org.apache.hadoop.mapred.JobClient;
040import org.apache.hadoop.mapred.JobConf;
041import org.apache.hadoop.mapred.MapReduceBase;
042import org.apache.hadoop.mapred.OutputCollector;
043import org.apache.hadoop.mapred.RecordReader;
044import org.apache.hadoop.mapred.Reducer;
045import org.apache.hadoop.mapred.Reporter;
046import org.apache.hadoop.mapred.RunningJob;
047import org.apache.hadoop.mapred.lib.NullOutputFormat;
048import org.junit.Assert;
049import org.junit.ClassRule;
050import org.junit.Ignore;
051import org.junit.Rule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.junit.rules.TestName;
055
056@Category({ VerySlowMapReduceTests.class, LargeTests.class })
057public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
058
  /** JUnit class-level rule built for this test class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  /** Row key returned by {@link #getStartRow()} and used as the reducer tracker's first bound. */
  private static final byte[] aaa = Bytes.toBytes("aaa");
  /**
   * Row key returned by {@link #getEndRow()} and used as the reducer tracker's second bound.
   * "zz{" sorts after "zzz" because '{' is the character that follows 'z'.
   */
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  /** The first two entries of FAMILIES joined by a single space; passed as the job's columns. */
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  /** Supplies the current test method's name, which the tests turn into a table name. */
  @Rule
  public TestName name = new TestName();
070
  /**
   * Returns the start row used by the tests in this class.
   * @return the bytes of "aaa"
   */
  @Override
  protected byte[] getStartRow() {
    return aaa;
  }
075
  /**
   * Returns the end row used by the tests in this class.
   * @return the bytes of "zz{", which sorts after "zzz"
   */
  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }
080
081  static class TestTableSnapshotMapper extends MapReduceBase
082    implements TableMap<ImmutableBytesWritable, NullWritable> {
083    @Override
084    public void map(ImmutableBytesWritable key, Result value,
085      OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
086      throws IOException {
087      verifyRowFromMap(key, value);
088      collector.collect(key, NullWritable.get());
089    }
090  }
091
092  public static class TestTableSnapshotReducer extends MapReduceBase
093    implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
094    HBaseTestingUtil.SeenRowTracker rowTracker =
095      new HBaseTestingUtil.SeenRowTracker(aaa, after_zzz);
096
097    @Override
098    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
099      OutputCollector<NullWritable, NullWritable> collector, Reporter reporter) throws IOException {
100      rowTracker.addRow(key.get());
101    }
102
103    @Override
104    public void close() {
105      rowTracker.validate();
106    }
107  }
108
109  @Test
110  public void testInitTableSnapshotMapperJobConfig() throws Exception {
111    final TableName tableName = TableName.valueOf(name.getMethodName());
112    String snapshotName = "foo";
113
114    try {
115      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
116      JobConf job = new JobConf(UTIL.getConfiguration());
117      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
118
119      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
120        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
121        tmpTableDir);
122
123      // TODO: would be better to examine directly the cache instance that results from this
124      // config. Currently this is not possible because BlockCache initialization is static.
125      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
126        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
127        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
128      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
129        job.getFloat("hbase.bucketcache.size", -1), 0.01);
130    } finally {
131      UTIL.getAdmin().deleteSnapshot(snapshotName);
132      UTIL.deleteTable(tableName);
133    }
134  }
135
  // TODO: mapred does not support limiting the input range by start row and end row.
  // Thus the following tests must override parameter verification.
138
139  @Test
140  @Override
141  public void testWithMockedMapReduceMultiRegion() throws Exception {
142    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
143    // It does not matter whether true or false is given to setLocalityEnabledTo,
144    // because it is not read in testWithMockedMapReduce().
145  }
146
147  @Test
148  @Override
149  public void testWithMapReduceMultiRegion() throws Exception {
150    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
151  }
152
153  @Test
154  @Override
155  // run the MR job while HBase is offline
156  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
157    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
158  }
159
160  @Override
161  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
162    String snapshotName, Path tmpTableDir) throws Exception {
163    JobConf job = new JobConf(UTIL.getConfiguration());
164    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, TestTableSnapshotMapper.class,
165      ImmutableBytesWritable.class, NullWritable.class, job, false, tmpTableDir);
166  }
167
168  @Override
169  protected void testWithMockedMapReduce(HBaseTestingUtil util, String snapshotName, int numRegions,
170    int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo) throws Exception {
171    final TableName tableName = TableName.valueOf(name.getMethodName());
172    try {
173      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
174
175      JobConf job = new JobConf(util.getConfiguration());
176      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case that
177      // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
178      // and the default value is taken.
179      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
180
181      if (numSplitsPerRegion > 1) {
182        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
183          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
184          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
185      } else {
186        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
187          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
188          false, tmpTableDir);
189      }
190
191      // mapred doesn't support start and end keys? o.O
192      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
193
194    } finally {
195      util.getAdmin().deleteSnapshot(snapshotName);
196      util.deleteTable(tableName);
197    }
198  }
199
200  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
201    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
202    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
203    InputSplit[] splits = tsif.getSplits(job, 0);
204
205    Assert.assertEquals(expectedNumSplits, splits.length);
206
207    HBaseTestingUtil.SeenRowTracker rowTracker =
208      new HBaseTestingUtil.SeenRowTracker(startRow, stopRow);
209
210    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
211    // so the default value is taken.
212    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
213
214    for (int i = 0; i < splits.length; i++) {
215      // validate input split
216      InputSplit split = splits[i];
217      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
218      if (localityEnabled) {
219        // When localityEnabled is true, meant to verify split.getLocations()
220        // by the following statement:
221        // Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
222        // However, getLocations() of some splits could return an empty array (length is 0),
223        // so drop the verification on length.
224        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
225        Assert.assertTrue(split.getLocations() != null);
226      } else {
227        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
228      }
229
230      // validate record reader
231      OutputCollector collector = mock(OutputCollector.class);
232      Reporter reporter = mock(Reporter.class);
233      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);
234
235      // validate we can read all the data back
236      ImmutableBytesWritable key = rr.createKey();
237      Result value = rr.createValue();
238      while (rr.next(key, value)) {
239        verifyRowFromMap(key, value);
240        rowTracker.addRow(key.copyBytes());
241      }
242
243      rr.close();
244    }
245
246    // validate all rows are seen
247    rowTracker.validate();
248  }
249
250  @Override
251  protected void testWithMapReduceImpl(HBaseTestingUtil util, TableName tableName,
252    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
253    int expectedNumSplits, boolean shutdownCluster) throws Exception {
254    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
255      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
256  }
257
258  // this is also called by the IntegrationTestTableSnapshotInputFormat
259  public static void doTestWithMapReduce(HBaseTestingUtil util, TableName tableName,
260    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
261    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
262
263    // create the table and snapshot
264    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
265
266    if (shutdownCluster) {
267      util.shutdownMiniHBaseCluster();
268    }
269
270    try {
271      // create the job
272      JobConf jobConf = new JobConf(util.getConfiguration());
273
274      jobConf.setJarByClass(util.getClass());
275      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
276        TestTableSnapshotInputFormat.class);
277
278      if (numSplitsPerRegion > 1) {
279        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
280          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, jobConf,
281          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
282      } else {
283        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
284          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, jobConf,
285          true, tableDir);
286      }
287
288      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
289      jobConf.setNumReduceTasks(1);
290      jobConf.setOutputFormat(NullOutputFormat.class);
291
292      RunningJob job = JobClient.runJob(jobConf);
293      Assert.assertTrue(job.isSuccessful());
294    } finally {
295      if (!shutdownCluster) {
296        util.getAdmin().deleteSnapshot(snapshotName);
297        util.deleteTable(tableName);
298      }
299    }
300  }
301
302  @Ignore // Ignored in mapred package because it keeps failing but allowed in mapreduce package.
303  @Test
304  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
305    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
306  }
307}