001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.hadoop.fs.FileStatus;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.io.MapFile;
036import org.junit.AfterClass;
037import org.junit.Assert;
038import org.junit.BeforeClass;
039import org.junit.ClassRule;
040import org.junit.Rule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.junit.rules.TestName;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
048import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
049
050/**
051 * Basic test for the HashTable M/R tool
052 */
053@Category(LargeTests.class)
054public class TestHashTable {
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057      HBaseClassTestRule.forClass(TestHashTable.class);
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestHashTable.class);
060
061  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062
063  @Rule
064  public TestName name = new TestName();
065
066  @BeforeClass
067  public static void beforeClass() throws Exception {
068    TEST_UTIL.startMiniCluster(3);
069  }
070
071  @AfterClass
072  public static void afterClass() throws Exception {
073    TEST_UTIL.shutdownMiniCluster();
074  }
075
076  @Test
077  public void testHashTable() throws Exception {
078    final TableName tableName = TableName.valueOf(name.getMethodName());
079    final byte[] family = Bytes.toBytes("family");
080    final byte[] column1 = Bytes.toBytes("c1");
081    final byte[] column2 = Bytes.toBytes("c2");
082    final byte[] column3 = Bytes.toBytes("c3");
083
084    int numRows = 100;
085    int numRegions = 10;
086    int numHashFiles = 3;
087
088    byte[][] splitRows = new byte[numRegions-1][];
089    for (int i = 1; i < numRegions; i++) {
090      splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
091    }
092
093    long timestamp = 1430764183454L;
094    // put rows into the first table
095    Table t1 = TEST_UTIL.createTable(tableName, family, splitRows);
096    for (int i = 0; i < numRows; i++) {
097      Put p = new Put(Bytes.toBytes(i), timestamp);
098      p.addColumn(family, column1, column1);
099      p.addColumn(family, column2, column2);
100      p.addColumn(family, column3, column3);
101      t1.put(p);
102    }
103    t1.close();
104
105    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
106
107    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
108
109    long batchSize = 300;
110    int code = hashTable.run(new String[] {
111      "--batchsize=" + batchSize,
112      "--numhashfiles=" + numHashFiles,
113      "--scanbatch=2",
114      tableName.getNameAsString(),
115      testDir.toString()
116    });
117    assertEquals("test job failed", 0, code);
118
119    FileSystem fs = TEST_UTIL.getTestFileSystem();
120
121    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
122    assertEquals(tableName.getNameAsString(), tableHash.tableName);
123    assertEquals(batchSize, tableHash.batchSize);
124    assertEquals(numHashFiles, tableHash.numHashFiles);
125    assertEquals(numHashFiles - 1, tableHash.partitions.size());
126    for (ImmutableBytesWritable bytes : tableHash.partitions) {
127      LOG.debug("partition: " + Bytes.toInt(bytes.get()));
128    }
129
130    ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes
131      = ImmutableMap.<Integer, ImmutableBytesWritable>builder()
132      .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
133      .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
134      .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
135      .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
136      .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
137      .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
138      .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
139      .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
140      .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
141      .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
142      .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
143      .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
144      .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
145      .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
146      .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
147      .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
148      .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
149      .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
150      .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
151      .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
152      .build();
153
154    Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>();
155    Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
156    for (int i = 0; i < numHashFiles; i++) {
157      Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
158
159      MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
160      ImmutableBytesWritable key = new ImmutableBytesWritable();
161      ImmutableBytesWritable hash = new ImmutableBytesWritable();
162      while (reader.next(key, hash)) {
163        String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength());
164        LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16))
165            + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
166
167        int intKey = -1;
168        if (key.getLength() > 0) {
169          intKey = Bytes.toInt(key.get(),  key.getOffset(), key.getLength());
170        }
171        if (actualHashes.containsKey(intKey)) {
172          Assert.fail("duplicate key in data files: " + intKey);
173        }
174        actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
175      }
176      reader.close();
177    }
178
179    FileStatus[] files = fs.listStatus(testDir);
180    for (FileStatus file : files) {
181      LOG.debug("Output file: " + file.getPath());
182    }
183
184    files = fs.listStatus(dataDir);
185    for (FileStatus file : files) {
186      LOG.debug("Data file: " + file.getPath());
187    }
188
189    if (!expectedHashes.equals(actualHashes)) {
190      LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
191    }
192    Assert.assertEquals(expectedHashes, actualHashes);
193
194    TEST_UTIL.deleteTable(tableName);
195    TEST_UTIL.cleanupDataTestDirOnTestFS();
196  }
197}