/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An integration test for {@link TableSnapshotInputFormat}, which enables reading directly from
 * snapshot files without going through the HBase servers. This test creates a table, loads it with
 * rows ranging from 'aaa' to 'zzz', and for each row sets the columns f1:(null) and f2:(null) to
 * the same value as the row key.
 *
 * <pre>
 * aaa, f1: =&gt; aaa
 * aaa, f2: =&gt; aaa
 * aab, f1: =&gt; aab
 * ....
 * zzz, f2: =&gt; zzz
 * </pre>
 *
 * The test then creates a snapshot from this table and overwrites the values in the original table
 * with the value 'after_snapshot_value'. Next, it runs a mapreduce job over the snapshot with a
 * scan start row of 'bbb' and stop row of 'yyy'. The data is saved in a single reduce output file
 * and inspected later to verify that the MR job has seen all the values from the snapshot.
 * <p>
 * These parameters can be used to configure the job: <br>
 * "IntegrationTestTableSnapshotInputFormat.table" =&gt; the name of the table <br>
 * "IntegrationTestTableSnapshotInputFormat.snapshot" =&gt; the name of the snapshot <br>
 * "IntegrationTestTableSnapshotInputFormat.numRegions" =&gt; number of regions in the table to be
 * created (default: 32) <br>
 * "IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory to restore the
 * snapshot files
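 * <p>
 * As a sketch (assuming the standard {@code hbase} launcher script, which can run any class with a
 * {@code main} method, and an illustrative table name), the job can be launched from the command
 * line along these lines:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestTableSnapshotInputFormat \
 *   -D IntegrationTestTableSnapshotInputFormat.table=myTestTable \
 *   -D IntegrationTestTableSnapshotInputFormat.numRegions=32
 * </pre>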
 */
@Category(IntegrationTests.class)
// Not runnable as a unit test. See TestTableSnapshotInputFormat
public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase {
  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestTableSnapshotInputFormat.class);

  private static final String TABLE_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.table";
  private static final String DEFAULT_TABLE_NAME = "IntegrationTestTableSnapshotInputFormat";

  private static final String SNAPSHOT_NAME_KEY =
    "IntegrationTestTableSnapshotInputFormat.snapshot";
  private static final String NUM_REGIONS_KEY =
    "IntegrationTestTableSnapshotInputFormat.numRegions";

  private static final String MR_IMPLEMENTATION_KEY = "IntegrationTestTableSnapshotInputFormat.API";
  private static final String MAPRED_IMPLEMENTATION = "mapred";
  private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce";

  private static final int DEFAULT_NUM_REGIONS = 32;
  private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";

  private static final byte[] START_ROW = Bytes.toBytes("bbb");
  private static final byte[] END_ROW = Bytes.toBytes("yyy");

  // The mapred API is missing feature parity with mapreduce. See comments in
  // mapred.TestTableSnapshotInputFormat
  private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
  private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    util = getTestingUtil(conf);
  }

  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    util = getTestingUtil(getConf());
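    // Starts an in-process mini cluster with one slave, or initializes the state of an
    // already-deployed distributed cluster when one has been configured.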
    util.initializeCluster(1);
    this.setConf(util.getConfiguration());
  }

  @Override
  @After
  public void cleanUp() throws Exception {
    util.restoreCluster();
  }

  @Override
  public void setUpCluster() throws Exception {
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Configuration conf = getConf();
    TableName tableName = TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY,
      tableName.getQualifierAsString() + "_snapshot_" + EnvironmentEdgeManager.currentTime());
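    // The default snapshot name is suffixed with the current time so that repeated runs against
    // the same table do not collide on an existing snapshot.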
    int numRegions = conf.getInt(NUM_REGIONS_KEY, DEFAULT_NUM_REGIONS);
    String tableDirStr = conf.get(TABLE_DIR_KEY);
    Path tableDir;
    if (tableDirStr == null) {
      tableDir = util.getDataTestDirOnTestFS(tableName.getQualifierAsString());
    } else {
      tableDir = new Path(tableDirStr);
    }

    final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION);
    if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) {
      /*
       * We create the table using HBaseAdmin#createTable(), which creates it with the desired
       * number of regions. We pass bbb as the startKey and yyy as the endKey, so if numRegions is
       * > 2, we get the boundary regions [empty, bbb) and [yyy, empty) plus numRegions - 2 regions
       * between bbb and yyy. The test uses a Scan with startRow bbb and stopRow yyy, so we expect
       * the first and last regions to be filtered out by the input format, leaving numRegions - 2
       * splits between bbb and yyy.
       */
      LOG.debug("Running job with mapreduce API.");
      int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
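      // e.g. the default 32 regions give 32 - 2 = 30 expected splits; with 2 or fewer regions the
      // first/last filtering does not apply, so every region is expected to yield a split.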

      org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
        tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, 1, expectedNumSplits,
        false);
    } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
      /*
       * Similar considerations to the above. The difference is that the mapred API does not
       * support specifying start/end rows (or a scan object at all), so the first and last regions
       * are not omitted. See comments in mapred.TestTableSnapshotInputFormat for details of how
       * that test works around the problem. This feature should be added in follow-on work.
       */
      LOG.debug("Running job with mapred API.");
      int expectedNumSplits = numRegions;
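      // Every region, including the first and last, yields a split here because the mapred API
      // cannot restrict the job to the [bbb, yyy) range.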

      org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
        tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, 1,
        expectedNumSplits, false);
    } else {
      throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr + ".");
    }

    return 0;
  }

  @Override // Chaos Monkey is not intended to be run with this test
  public TableName getTablename() {
    return null;
  }

  @Override // Chaos Monkey is not intended to be run with this test
  protected Set<String> getColumnFamilies() {
    return null;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
    System.exit(ret);
  }

}