/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

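/**
 * Tests the old ("mapred") API flavor of TableSnapshotInputFormat: jobs are configured against a
 * table snapshot via TableMapReduceUtil.initTableSnapshotMapJob and run either with mocked splits
 * or as real MapReduce jobs, and each test verifies that every row between getStartRow()
 * (inclusive) and getEndRow() (exclusive) is read back.
 *
 * A minimal sketch of the job wiring used throughout this test; conf, "someSnapshot", and
 * restoreDir below are placeholders, not fixed values:
 *
 * <pre>
 * JobConf job = new JobConf(conf);
 * Path restoreDir = new Path("/tmp/snapshot-restore"); // any scratch dir on the test filesystem
 * TableMapReduceUtil.initTableSnapshotMapJob("someSnapshot", COLUMNS,
 *   TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
 *   restoreDir);
 * </pre>
 */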
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }

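  /**
   * Mapper that verifies each row it receives against the expected snapshot contents and emits the
   * row key with a NullWritable value.
   */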
  static class TestTableSnapshotMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
      throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }

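  /**
   * Reducer that records every row key it receives and, on close(), validates that all rows in the
   * range [aaa, after_zzz) were seen.
   */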
  public static class TestTableSnapshotReducer extends MapReduceBase
    implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
      OutputCollector<NullWritable, NullWritable> collector, Reporter reporter) throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

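  /**
   * Verifies that initTableSnapshotMapJob leaves the job configured for the default LruBlockCache
   * and does not enable BucketCache.
   */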
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // TODO: it would be better to directly examine the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
        job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  // TODO: mapred does not support limiting the input range by startrow, endrow.
  // Thus the following tests must override the parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
    // It does not matter whether true or false is given to setLocalityEnabledTo,
    // because it is not read in testWithMockedMapReduce().
  }

  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
  }

  @Test
  @Override
  // run the MR job while HBase is offline
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
  }

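  /**
   * Init hook for the base class's test that restoring a snapshot does not create back-reference
   * links: it only needs the snapshot map job to be initialized against tmpTableDir.
   */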
  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    JobConf job = new JobConf(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, TestTableSnapshotMapper.class,
      ImmutableBytesWritable.class, NullWritable.class, job, false, tmpTableDir);
  }

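  /**
   * Creates a table and snapshot, initializes the snapshot map job (splitting each region into
   * numSplitsPerRegion splits when greater than one), and checks the resulting splits via
   * verifyWithMockedMapReduce().
   */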
  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
    throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case that
      // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
      // and the default value is taken.
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      // mapred does not support limiting the input by start and end keys, so verify against the
      // full row range.
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

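  /**
   * Computes the splits for the given job, asserts the expected split count and per-split locality
   * behavior, then drives a record reader over every split and validates that all rows in
   * [startRow, stopRow) are seen.
   */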
  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
    // so the default value is taken.
    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;

    for (int i = 0; i < splits.length; i++) {
      // validate input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
      if (localityEnabled) {
        // When locality is enabled, we would ideally assert that every split reports at least one
        // location:
        // Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        // However, getLocations() may return an empty array for some splits, so only assert that
        // it is non-null.
        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
        Assert.assertTrue(split.getLocations() != null);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      // validate the record reader
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

      // validate that we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate that all rows were seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  /**
   * Creates a table and snapshot, optionally shuts down the mini HBase cluster, and runs a real
   * MapReduce job over the snapshot. Also called by IntegrationTestTableSnapshotInputFormat.
   */
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, jobConf,
          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, jobConf,
          true, tableDir);
      }

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Ignore // Ignored in the mapred package because it keeps failing; the equivalent test is
          // enabled in the mapreduce package.
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}