001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.HashMap; 023import java.util.Map; 024import org.apache.hadoop.fs.FileStatus; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Put; 031import org.apache.hadoop.hbase.client.Table; 032import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 033import org.apache.hadoop.hbase.testclassification.LargeTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.io.MapFile; 036import org.junit.AfterClass; 037import org.junit.Assert; 038import org.junit.BeforeClass; 039import org.junit.ClassRule; 040import org.junit.Rule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.junit.rules.TestName; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 048import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 049 050/** 051 * Basic test for the HashTable M/R tool 052 */ 053@Category(LargeTests.class) 054public class TestHashTable { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestHashTable.class); 059 060 private static final Logger LOG = LoggerFactory.getLogger(TestHashTable.class); 061 062 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 063 064 @Rule 065 public TestName name = new TestName(); 066 067 @BeforeClass 068 public static void beforeClass() throws Exception { 069 TEST_UTIL.startMiniCluster(3); 070 } 071 072 @AfterClass 073 public static void afterClass() throws Exception { 074 TEST_UTIL.shutdownMiniCluster(); 075 } 076 077 @Test 078 public void testHashTable() throws Exception { 079 final TableName tableName = TableName.valueOf(name.getMethodName()); 080 final byte[] family = Bytes.toBytes("family"); 081 final byte[] column1 = Bytes.toBytes("c1"); 082 final byte[] column2 = Bytes.toBytes("c2"); 083 final byte[] column3 = Bytes.toBytes("c3"); 084 085 int numRows = 100; 086 int numRegions = 10; 087 int numHashFiles = 3; 088 089 byte[][] splitRows = new byte[numRegions-1][]; 090 for (int i = 1; i < numRegions; i++) { 091 splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions); 092 } 093 094 long timestamp = 1430764183454L; 095 // put rows into the first table 096 Table t1 = TEST_UTIL.createTable(tableName, family, splitRows); 097 for (int i = 0; i < numRows; i++) { 098 Put p = new Put(Bytes.toBytes(i), timestamp); 099 p.addColumn(family, column1, column1); 100 p.addColumn(family, column2, column2); 101 p.addColumn(family, column3, column3); 102 t1.put(p); 103 } 104 t1.close(); 105 106 HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration()); 107 108 Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString()); 109 110 long batchSize = 300; 111 int code = hashTable.run(new String[] { 112 "--batchsize=" + batchSize, 113 "--numhashfiles=" + numHashFiles, 114 "--scanbatch=2", 115 tableName.getNameAsString(), 116 testDir.toString()}); 117 assertEquals("test job failed", 0, code); 118 119 FileSystem fs = TEST_UTIL.getTestFileSystem(); 120 121 HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); 122 assertEquals(tableName.getNameAsString(), tableHash.tableName); 123 assertEquals(batchSize, tableHash.batchSize); 124 assertEquals(numHashFiles, tableHash.numHashFiles); 125 assertEquals(numHashFiles - 1, tableHash.partitions.size()); 126 for (ImmutableBytesWritable bytes : tableHash.partitions) { 127 LOG.debug("partition: " + Bytes.toInt(bytes.get())); 128 } 129 130 ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes 131 = ImmutableMap.<Integer, ImmutableBytesWritable>builder() 132 .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f"))) 133 .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96"))) 134 .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa"))) 135 .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881"))) 136 .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352"))) 137 .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93"))) 138 .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666"))) 139 .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090"))) 140 .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3"))) 141 .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb"))) 142 .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc"))) 143 .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4"))) 144 .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b"))) 145 .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59"))) 146 .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f"))) 147 .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56"))) 148 .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095"))) 149 .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91"))) 150 .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38"))) 151 .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56"))) 152 .build(); 153 154 Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>(); 155 Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR); 156 for (int i = 0; i < numHashFiles; i++) { 157 Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i)); 158 159 MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf()); 160 ImmutableBytesWritable key = new ImmutableBytesWritable(); 161 ImmutableBytesWritable hash = new ImmutableBytesWritable(); 162 while(reader.next(key, hash)) { 163 String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength()); 164 LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16)) 165 + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength())); 166 167 int intKey = -1; 168 if (key.getLength() > 0) { 169 intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength()); 170 } 171 if (actualHashes.containsKey(intKey)) { 172 Assert.fail("duplicate key in data files: " + intKey); 173 } 174 actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes())); 175 } 176 reader.close(); 177 } 178 179 FileStatus[] files = fs.listStatus(testDir); 180 for (FileStatus file : files) { 181 LOG.debug("Output file: " + file.getPath()); 182 } 183 184 files = fs.listStatus(dataDir); 185 for (FileStatus file : files) { 186 LOG.debug("Data file: " + file.getPath()); 187 } 188 189 if (!expectedHashes.equals(actualHashes)) { 190 LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes)); 191 } 192 Assert.assertEquals(expectedHashes, actualHashes); 193 194 TEST_UTIL.deleteTable(tableName); 195 TEST_UTIL.cleanupDataTestDirOnTestFS(); 196 } 197 198 199}