001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.fail;
022
023import java.util.HashMap;
024import java.util.Map;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.io.MapFile;
036import org.junit.jupiter.api.AfterAll;
037import org.junit.jupiter.api.BeforeAll;
038import org.junit.jupiter.api.Tag;
039import org.junit.jupiter.api.Test;
040import org.junit.jupiter.api.TestInfo;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
045import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
046
047/**
048 * Basic test for the HashTable M/R tool
049 */
050@Tag(LargeTests.TAG)
051public class TestHashTable {
052
053  private static final Logger LOG = LoggerFactory.getLogger(TestHashTable.class);
054
055  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
056
057  @BeforeAll
058  public static void beforeClass() throws Exception {
059    TEST_UTIL.startMiniCluster(3);
060  }
061
062  @AfterAll
063  public static void afterClass() throws Exception {
064    TEST_UTIL.shutdownMiniCluster();
065  }
066
067  @Test
068  public void testHashTable(TestInfo testInfo) throws Exception {
069    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
070    final byte[] family = Bytes.toBytes("family");
071    final byte[] column1 = Bytes.toBytes("c1");
072    final byte[] column2 = Bytes.toBytes("c2");
073    final byte[] column3 = Bytes.toBytes("c3");
074
075    int numRows = 100;
076    int numRegions = 10;
077    int numHashFiles = 3;
078
079    byte[][] splitRows = new byte[numRegions - 1][];
080    for (int i = 1; i < numRegions; i++) {
081      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
082    }
083
084    long timestamp = 1430764183454L;
085    // put rows into the first table
086    Table t1 = TEST_UTIL.createTable(tableName, family, splitRows);
087    for (int i = 0; i < numRows; i++) {
088      Put p = new Put(Bytes.toBytes(i), timestamp);
089      p.addColumn(family, column1, column1);
090      p.addColumn(family, column2, column2);
091      p.addColumn(family, column3, column3);
092      t1.put(p);
093    }
094    t1.close();
095
096    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
097
098    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
099
100    long batchSize = 300;
101    int code =
102      hashTable.run(new String[] { "--batchsize=" + batchSize, "--numhashfiles=" + numHashFiles,
103        "--scanbatch=2", tableName.getNameAsString(), testDir.toString() });
104    assertEquals(0, code, "test job failed");
105
106    FileSystem fs = TEST_UTIL.getTestFileSystem();
107
108    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
109    assertEquals(tableName.getNameAsString(), tableHash.tableName);
110    assertEquals(batchSize, tableHash.batchSize);
111    assertEquals(numHashFiles, tableHash.numHashFiles);
112    assertEquals(numHashFiles - 1, tableHash.partitions.size());
113    for (ImmutableBytesWritable bytes : tableHash.partitions) {
114      LOG.debug("partition: " + Bytes.toInt(bytes.get()));
115    }
116
117    ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
118      ImmutableMap.<Integer, ImmutableBytesWritable> builder()
119        .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
120        .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
121        .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
122        .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
123        .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
124        .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
125        .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
126        .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
127        .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
128        .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
129        .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
130        .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
131        .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
132        .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
133        .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
134        .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
135        .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
136        .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
137        .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
138        .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
139        .build();
140
141    Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>();
142    Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
143    for (int i = 0; i < numHashFiles; i++) {
144      Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
145
146      MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
147      ImmutableBytesWritable key = new ImmutableBytesWritable();
148      ImmutableBytesWritable hash = new ImmutableBytesWritable();
149      while (reader.next(key, hash)) {
150        String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength());
151        LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16))
152          + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
153
154        int intKey = -1;
155        if (key.getLength() > 0) {
156          intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
157        }
158        if (actualHashes.containsKey(intKey)) {
159          fail("duplicate key in data files: " + intKey);
160        }
161        actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
162      }
163      reader.close();
164    }
165
166    FileStatus[] files = fs.listStatus(testDir);
167    for (FileStatus file : files) {
168      LOG.debug("Output file: " + file.getPath());
169    }
170
171    files = fs.listStatus(dataDir);
172    for (FileStatus file : files) {
173      LOG.debug("Data file: " + file.getPath());
174    }
175
176    if (!expectedHashes.equals(actualHashes)) {
177      LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
178    }
179    assertEquals(expectedHashes, actualHashes);
180
181    TEST_UTIL.deleteTable(tableName);
182    TEST_UTIL.cleanupDataTestDirOnTestFS();
183  }
184}