001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
021import static org.mockito.Mockito.mock;
022
023import java.io.IOException;
024import java.util.Iterator;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtility;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
032import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.RegionSplitter;
037import org.apache.hadoop.io.NullWritable;
038import org.apache.hadoop.mapred.InputSplit;
039import org.apache.hadoop.mapred.JobClient;
040import org.apache.hadoop.mapred.JobConf;
041import org.apache.hadoop.mapred.MapReduceBase;
042import org.apache.hadoop.mapred.OutputCollector;
043import org.apache.hadoop.mapred.RecordReader;
044import org.apache.hadoop.mapred.Reducer;
045import org.apache.hadoop.mapred.Reporter;
046import org.apache.hadoop.mapred.RunningJob;
047import org.apache.hadoop.mapred.lib.NullOutputFormat;
048import org.junit.Assert;
049import org.junit.ClassRule;
050import org.junit.Ignore;
051import org.junit.Rule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.junit.rules.TestName;
055
056@Category({VerySlowMapReduceTests.class, LargeTests.class})
057public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061      HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);
062
063  private static final byte[] aaa = Bytes.toBytes("aaa");
064  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
065  private static final String COLUMNS =
066    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);
067
068  @Rule
069  public TestName name = new TestName();
070
071  @Override
072  protected byte[] getStartRow() {
073    return aaa;
074  }
075
076  @Override
077  protected byte[] getEndRow() {
078    return after_zzz;
079  }
080
081  static class TestTableSnapshotMapper extends MapReduceBase
082      implements TableMap<ImmutableBytesWritable, NullWritable> {
083    @Override
084    public void map(ImmutableBytesWritable key, Result value,
085        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
086        throws IOException {
087      verifyRowFromMap(key, value);
088      collector.collect(key, NullWritable.get());
089    }
090  }
091
092  public static class TestTableSnapshotReducer extends MapReduceBase
093      implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
094    HBaseTestingUtility.SeenRowTracker rowTracker =
095      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);
096
097    @Override
098    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
099        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
100        throws IOException {
101      rowTracker.addRow(key.get());
102    }
103
104    @Override
105    public void close() {
106      rowTracker.validate();
107    }
108  }
109
110  @Test
111  public void testInitTableSnapshotMapperJobConfig() throws Exception {
112    final TableName tableName = TableName.valueOf(name.getMethodName());
113    String snapshotName = "foo";
114
115    try {
116      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
117      JobConf job = new JobConf(UTIL.getConfiguration());
118      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
119
120      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
121        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
122        NullWritable.class, job, false, tmpTableDir);
123
124      // TODO: would be better to examine directly the cache instance that results from this
125      // config. Currently this is not possible because BlockCache initialization is static.
126      Assert.assertEquals(
127        "Snapshot job should be configured for default LruBlockCache.",
128        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
129        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
130      Assert.assertEquals(
131        "Snapshot job should not use BucketCache.",
132        0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
133    } finally {
134      UTIL.getAdmin().deleteSnapshot(snapshotName);
135      UTIL.deleteTable(tableName);
136    }
137  }
138
  // TODO: mapred does not support limiting input range by startrow, endrow.
  // Thus the following tests must override parameter verification.
141
142  @Test
143  @Override
144  public void testWithMockedMapReduceMultiRegion() throws Exception {
145    testWithMockedMapReduce(
146        UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
147        // It does not matter whether true or false is given to setLocalityEnabledTo,
148        // because it is not read in testWithMockedMapReduce().
149  }
150
151  @Test
152  @Override
153  public void testWithMapReduceMultiRegion() throws Exception {
154    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
155  }
156
157  @Test
158  @Override
159  // run the MR job while HBase is offline
160  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
161    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
162  }
163
164  @Override
165  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
166      String snapshotName, Path tmpTableDir) throws Exception {
167    JobConf job = new JobConf(UTIL.getConfiguration());
168    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
169      COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
170      NullWritable.class, job, false, tmpTableDir);
171  }
172
173  @Override
174  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
175      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
176      throws Exception {
177    final TableName tableName = TableName.valueOf(name.getMethodName());
178    try {
179      createTableAndSnapshot(
180        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
181
182      JobConf job = new JobConf(util.getConfiguration());
183      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case that
184      // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
185      // and the default value is taken.
186      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
187
188      if (numSplitsPerRegion > 1) {
189        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
190                COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
191                NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
192                numSplitsPerRegion);
193      } else {
194        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
195                COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
196                NullWritable.class, job, false, tmpTableDir);
197      }
198
199      // mapred doesn't support start and end keys? o.O
200      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
201
202    } finally {
203      util.getAdmin().deleteSnapshot(snapshotName);
204      util.deleteTable(tableName);
205    }
206  }
207
208  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
209      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
210    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
211    InputSplit[] splits = tsif.getSplits(job, 0);
212
213    Assert.assertEquals(expectedNumSplits, splits.length);
214
215    HBaseTestingUtility.SeenRowTracker rowTracker =
216      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
217
218    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
219    // so the default value is taken.
220    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
221
222    for (int i = 0; i < splits.length; i++) {
223      // validate input split
224      InputSplit split = splits[i];
225      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
226      if (localityEnabled) {
227        // When localityEnabled is true, meant to verify split.getLocations()
228        // by the following statement:
229        //   Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
230        // However, getLocations() of some splits could return an empty array (length is 0),
231        // so drop the verification on length.
232        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
233        Assert.assertTrue(split.getLocations() != null);
234      } else {
235        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
236      }
237
238      // validate record reader
239      OutputCollector collector = mock(OutputCollector.class);
240      Reporter reporter = mock(Reporter.class);
241      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);
242
243      // validate we can read all the data back
244      ImmutableBytesWritable key = rr.createKey();
245      Result value = rr.createValue();
246      while (rr.next(key, value)) {
247        verifyRowFromMap(key, value);
248        rowTracker.addRow(key.copyBytes());
249      }
250
251      rr.close();
252    }
253
254    // validate all rows are seen
255    rowTracker.validate();
256  }
257
258  @Override
259  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
260      String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
261      int expectedNumSplits, boolean shutdownCluster) throws Exception {
262    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
263      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
264  }
265
266  // this is also called by the IntegrationTestTableSnapshotInputFormat
267  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
268      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
269      int numSplitsPerRegion,int expectedNumSplits, boolean shutdownCluster) throws Exception {
270
271    //create the table and snapshot
272    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
273
274    if (shutdownCluster) {
275      util.shutdownMiniHBaseCluster();
276    }
277
278    try {
279      // create the job
280      JobConf jobConf = new JobConf(util.getConfiguration());
281
282      jobConf.setJarByClass(util.getClass());
283      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
284        TestTableSnapshotInputFormat.class);
285
286      if(numSplitsPerRegion > 1) {
287        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
288                TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
289                NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
290                numSplitsPerRegion);
291      } else {
292        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
293                TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
294                NullWritable.class, jobConf, true, tableDir);
295      }
296
297      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
298      jobConf.setNumReduceTasks(1);
299      jobConf.setOutputFormat(NullOutputFormat.class);
300
301      RunningJob job = JobClient.runJob(jobConf);
302      Assert.assertTrue(job.isSuccessful());
303    } finally {
304      if (!shutdownCluster) {
305        util.getAdmin().deleteSnapshot(snapshotName);
306        util.deleteTable(tableName);
307      }
308    }
309  }
310
311  @Ignore // Ignored in mapred package because it keeps failing but allowed in mapreduce package.
312  @Test
313  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
314    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
315  }
316}