/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapFile;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Basic test for the HashTable M/R tool.
 * <p>
 * Starts a three-node mini cluster once for the class, runs {@link HashTable} against a small
 * pre-split table, and compares the per-batch hashes written to the output directory with a fixed
 * set of expected values.
 */
@Tag(LargeTests.TAG)
public class TestHashTable {

  private static final Logger LOG = LoggerFactory.getLogger(TestHashTable.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  /** Starts the shared three-node mini cluster before any test in this class runs. */
  @BeforeAll
  public static void beforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(3);
  }

  /** Shuts the shared mini cluster down after all tests in this class have run. */
  @AfterAll
  public static void afterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Writes 100 rows (three columns each, all at one fixed timestamp) into a table pre-split into
   * 10 regions, runs the HashTable job with a batch size of 300 and 3 hash files, and then checks:
   * <ul>
   * <li>the job exit code is 0;</li>
   * <li>the {@link HashTable.TableHash} manifest read back from the output directory reports the
   * table name, batch size, hash-file count and partition count that were requested;</li>
   * <li>the (start key, hash) pairs read from the hash data files equal the expected map, with no
   * key appearing more than once across the files.</li>
   * </ul>
   * @param testInfo supplies the test method name, which is used as the table name
   * @throws Exception if cluster, job or file system operations fail
   */
  @Test
  public void testHashTable(TestInfo testInfo) throws Exception {
    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    final byte[] family = Bytes.toBytes("family");
    final byte[] column1 = Bytes.toBytes("c1");
    final byte[] column2 = Bytes.toBytes("c2");
    final byte[] column3 = Bytes.toBytes("c3");

    int numRows = 100;
    int numRegions = 10;
    int numHashFiles = 3;

    // Evenly spaced split points over the integer row-key range [0, numRows).
    byte[][] splitRows = new byte[numRegions - 1][];
    for (int i = 1; i < numRegions; i++) {
      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
    }

    // A fixed timestamp keeps the cell contents, and therefore the hashes, reproducible.
    long timestamp = 1430764183454L;
    // put rows into the first table; try-with-resources closes the table even if a put fails
    try (Table t1 = TEST_UTIL.createTable(tableName, family, splitRows)) {
      for (int i = 0; i < numRows; i++) {
        Put p = new Put(Bytes.toBytes(i), timestamp);
        p.addColumn(family, column1, column1);
        p.addColumn(family, column2, column2);
        p.addColumn(family, column3, column3);
        t1.put(p);
      }
    }

    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());

    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());

    long batchSize = 300;
    int code =
      hashTable.run(new String[] { "--batchsize=" + batchSize, "--numhashfiles=" + numHashFiles,
        "--scanbatch=2", tableName.getNameAsString(), testDir.toString() });
    assertEquals(0, code, "test job failed");

    FileSystem fs = TEST_UTIL.getTestFileSystem();

    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
    assertEquals(tableName.getNameAsString(), tableHash.tableName);
    assertEquals(batchSize, tableHash.batchSize);
    assertEquals(numHashFiles, tableHash.numHashFiles);
    assertEquals(numHashFiles - 1, tableHash.partitions.size());
    for (ImmutableBytesWritable bytes : tableHash.partitions) {
      // Honor the writable's offset/length rather than assuming it owns its whole backing array.
      LOG.debug("partition: {}",
        Bytes.toInt(bytes.get(), bytes.getOffset(), bytes.getLength()));
    }

    // Expected batch hashes keyed by batch start row; -1 stands for the empty start key.
    ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
      ImmutableMap.<Integer, ImmutableBytesWritable> builder()
        .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
        .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
        .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
        .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
        .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
        .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
        .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
        .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
        .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
        .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
        .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
        .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
        .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
        .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
        .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
        .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
        .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
        .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
        .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
        .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
        .build();

    Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>();
    Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
    for (int i = 0; i < numHashFiles; i++) {
      Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));

      // try-with-resources so the reader is released even when a read or an assertion fails
      try (MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf())) {
        ImmutableBytesWritable key = new ImmutableBytesWritable();
        ImmutableBytesWritable hash = new ImmutableBytesWritable();
        while (reader.next(key, hash)) {
          // An empty key is recorded as -1; any other key is decoded as an int row key.
          int intKey = -1;
          if (key.getLength() > 0) {
            intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
          }
          LOG.debug("Key: {} Hash: {}", intKey,
            Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));

          if (actualHashes.containsKey(intKey)) {
            fail("duplicate key in data files: " + intKey);
          }
          // The reader reuses the writable between calls to next(), so store a copy of the bytes.
          actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
        }
      }
    }

    FileStatus[] files = fs.listStatus(testDir);
    for (FileStatus file : files) {
      LOG.debug("Output file: {}", file.getPath());
    }

    files = fs.listStatus(dataDir);
    for (FileStatus file : files) {
      LOG.debug("Data file: {}", file.getPath());
    }

    // Log a readable diff before asserting so a failure is easy to diagnose from the test log.
    if (!expectedHashes.equals(actualHashes)) {
      LOG.error("Diff: {}", Maps.difference(expectedHashes, actualHashes));
    }
    assertEquals(expectedHashes, actualHashes);

    TEST_UTIL.deleteTable(tableName);
    TEST_UTIL.cleanupDataTestDirOnTestFS();
  }
}