/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapFile;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Basic test for the HashTable M/R tool.
 * <p>
 * Each hashing test loads a fixed, deterministic data set (fixed row keys, column values and cell
 * timestamp), runs {@link HashTable} over it and compares the hashes found in the generated data
 * files against hard-coded expected digests.
 */
@Tag(LargeTests.TAG)
public class TestHashTable {

  private static final Logger LOG = LoggerFactory.getLogger(TestHashTable.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final byte[] COLUMN1 = Bytes.toBytes("c1");
  private static final byte[] COLUMN2 = Bytes.toBytes("c2");
  private static final byte[] COLUMN3 = Bytes.toBytes("c3");

  /** Number of rows written to the table under test; row keys are the ints 0..NUM_ROWS-1. */
  private static final int NUM_ROWS = 100;
  /** Number of regions the table under test is pre-split into. */
  private static final int NUM_REGIONS = 10;
  /** Value passed to the tool as {@code --numhashfiles}. */
  private static final int NUM_HASH_FILES = 3;
  /** Value passed to the tool as {@code --batchsize}. */
  private static final long BATCH_SIZE = 300;
  /** Fixed cell timestamp; the expected digests below were recorded with this value. */
  private static final long TIMESTAMP = 1430764183454L;

  @BeforeAll
  public static void beforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(3);
  }

  @AfterAll
  public static void afterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Runs HashTable without an explicit algorithm option and verifies both the manifest contents
   * and the per-batch hashes written to the data files.
   */
  @Test
  public void testHashTable(TestInfo testInfo) throws Exception {
    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    createAndLoadTable(tableName);
    // Clean up in finally so a failed assertion does not leave the table or the output
    // directory behind on the mini cluster shared by the other tests in this class.
    try {
      HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());

      Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());

      int code = hashTable
        .run(new String[] { "--batchsize=" + BATCH_SIZE, "--numhashfiles=" + NUM_HASH_FILES,
          "--scanbatch=2", tableName.getNameAsString(), testDir.toString() });
      assertEquals(0, code, "test job failed");

      FileSystem fs = TEST_UTIL.getTestFileSystem();

      HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
      assertEquals(tableName.getNameAsString(), tableHash.tableName);
      assertEquals(BATCH_SIZE, tableHash.batchSize);
      assertEquals(NUM_HASH_FILES, tableHash.numHashFiles);
      assertEquals(NUM_HASH_FILES - 1, tableHash.partitions.size());
      for (ImmutableBytesWritable bytes : tableHash.partitions) {
        LOG.debug("partition: {}", Bytes.toInt(bytes.get()));
      }

      ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
        ImmutableMap.<Integer, ImmutableBytesWritable> builder()
          .put(-1, hash("714cb10a9e3b5569852980edd8c6ca2f"))
          .put(5, hash("28d961d9252ce8f8d44a07b38d3e1d96"))
          .put(10, hash("f6bbc4a224d8fd929b783a92599eaffa"))
          .put(15, hash("522deb5d97f73a414ecc11457be46881"))
          .put(20, hash("b026f2611aaa46f7110116d807545352"))
          .put(25, hash("39ffc1a3094aa12a2e90ffd9cef2ce93"))
          .put(30, hash("f6b4d75727ce9a30ac29e4f08f601666"))
          .put(35, hash("422e2d2f1eb79a8f02171a705a42c090"))
          .put(40, hash("559ad61c900fffefea0a15abf8a97bc3"))
          .put(45, hash("23019084513eca41cee436b2a29611cb"))
          .put(50, hash("b40467d222ddb4949b142fe145ee9edc"))
          .put(55, hash("372bf89fcd8ca4b7ab3c1add9d07f7e4"))
          .put(60, hash("69ae0585e6255de27dce974e332b8f8b"))
          .put(65, hash("8029610044297aad0abdbecd485d8e59"))
          .put(70, hash("de5f784f7f78987b6e57ecfd81c8646f"))
          .put(75, hash("1cd757cc4e1715c8c3b1c24447a1ec56"))
          .put(80, hash("f9a53aacfeb6142b08066615e7038095"))
          .put(85, hash("89b872b7e639df32d3276b33928c0c91"))
          .put(90, hash("45eeac0646d46a474ea0484175faed38"))
          .put(95, hash("f57c447e32a08f4bf1abb2892839ac56")).build();

      Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
      Map<Integer, ImmutableBytesWritable> actualHashes = readHashes(fs, dataDir);

      logListing(fs, testDir, "Output file");
      logListing(fs, dataDir, "Data file");

      assertHashesMatch(expectedHashes, actualHashes);
    } finally {
      TEST_UTIL.deleteTable(tableName);
      TEST_UTIL.cleanupDataTestDirOnTestFS();
    }
  }

  /**
   * Runs HashTable with {@code --hashAlgorithm=SHA-256} and verifies that the manifest records
   * the algorithm and that the data files contain the expected SHA-256 digests.
   */
  @Test
  public void testHashTableWithSha256(TestInfo testInfo) throws Exception {
    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    createAndLoadTable(tableName);
    // Clean up in finally so a failed assertion does not leave the table or the output
    // directory behind on the mini cluster shared by the other tests in this class.
    try {
      HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
      Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());

      int code = hashTable.run(new String[] { "--batchsize=" + BATCH_SIZE,
        "--numhashfiles=" + NUM_HASH_FILES, "--scanbatch=2", "--hashAlgorithm=SHA-256",
        tableName.getNameAsString(), testDir.toString() });
      assertEquals(0, code, "test job failed");

      FileSystem fs = TEST_UTIL.getTestFileSystem();
      HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
      assertEquals("SHA-256", tableHash.hashAlgorithm,
        "manifest must record the algorithm used to produce the digests");

      ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
        ImmutableMap.<Integer, ImmutableBytesWritable> builder()
          .put(-1, hash("e1452041ec73beb9b5677c0b74ed73a9118ca502d9b2b9abe62ba18d92fc51be"))
          .put(5, hash("6c55999354be6571a8912b7781d37359e88153173cb5fcf143211682d7ac06c5"))
          .put(10, hash("44e9c6aedd838e9a9c78d1983ce5139fda3ef3b0b8a3812893512e7c6f05c51f"))
          .put(15, hash("57f18d831a22057155fb9cfb9bf4706a1c2cf134fc1e92262c8748ca545b58a1"))
          .put(20, hash("83119dfb3deec8901f69cccac31c1039c624870de0cff4b85946147359966d42"))
          .put(25, hash("933eabcc837b7e7b24be2b553b1ec50fb00e95cbf3d8f898f59f5f8bbace0a60"))
          .put(30, hash("b6a3752581d74f362f64b59d56a96ad52763b3245dd5bfc85f6fe9261f2d03f1"))
          .put(35, hash("d3784bac940584dbc0754eff73bc39cce4f9c4aec87939747fff4b0ecc6a0617"))
          .put(40, hash("87f4b810b751abd64e9c22cb7b40b5ce600965e4b8eda2c0eae075d5623088c2"))
          .put(45, hash("ce1f422fcdbe0f926e10b68cb3ead497066560235a1341d29151a9e1847deaab"))
          .put(50, hash("118c771b1eeabe8523f1ad96fb5bf16537d76e0b3855d84c3dbac864de726229"))
          .put(55, hash("00dfe840a275aca3de9268ea61699881a441d47fea93071bca69c39bf7845dac"))
          .put(60, hash("062239ede0306fd9046eb5a3a2f66d997b37c8c1a4defc35789644e66930fff1"))
          .put(65, hash("09a63a94681e75edf975f9b46fe94f1e592840a627cac728a77728b7f9f695aa"))
          .put(70, hash("e634097804d269cbaeef49ce7a009a1388e6f636700badcab05fe20759f6043f"))
          .put(75, hash("69f614ccc16a9c651538681525be1b2e40859c9833a55d9009d77ef39abaffcd"))
          .put(80, hash("6530b957c8064fc043620bee89647960de0d27a0f986b40f183f5347093a12d2"))
          .put(85, hash("403ed0417cd8ab955cbd4c8fe84218cd152b95da9237300050e9b7c90c809faf"))
          .put(90, hash("e27fb9193ae3363fec70a148e62df7c57d514dd7de74a6a332fbda002af67efb"))
          .put(95, hash("a31cb9d55e37f17c773a6eee757f15d6d7fe52d77cd1037fcb7ee00ed2bef6c9"))
          .build();

      Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
      Map<Integer, ImmutableBytesWritable> actualHashes = readHashes(fs, dataDir);

      assertHashesMatch(expectedHashes, actualHashes);
    } finally {
      TEST_UTIL.deleteTable(tableName);
      TEST_UTIL.cleanupDataTestDirOnTestFS();
    }
  }

  /**
   * A manifest written by an older HashTable does not carry the hashAlgorithm property. Reading
   * such a manifest must default to MD5 so existing on-disk hash data stays usable.
   */
  @Test
  public void testManifestWithoutAlgorithmDefaultsToMd5(TestInfo testInfo) throws Exception {
    Path testDir =
      TEST_UTIL.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName() + "_legacy");
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    // Clean up in finally so a failed read or assertion does not leave the hand-crafted files
    // behind on the test file system.
    try {
      fs.mkdirs(testDir);

      // hand-craft a legacy manifest with no hashAlgorithm property
      Properties p = new Properties();
      p.setProperty("table", "legacy");
      p.setProperty("targetBatchSize", "8000");
      p.setProperty("numHashFiles", "1");
      p.setProperty("rawScan", "false");
      Path manifest = new Path(testDir, HashTable.MANIFEST_FILE_NAME);
      try (OutputStream out = fs.create(manifest)) {
        p.store(out, null);
      }

      // write an empty partitions file so TableHash.read() succeeds
      HashTable.TableHash empty = new HashTable.TableHash();
      empty.partitions = new java.util.ArrayList<>();
      empty.writePartitionFile(fs.getConf(), new Path(testDir, HashTable.PARTITIONS_FILE_NAME));

      HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
      assertEquals(HashTable.DEFAULT_HASH_ALGORITHM, tableHash.hashAlgorithm,
        "Manifests without an algorithm property must default to MD5 for back-compat");
    } finally {
      TEST_UTIL.cleanupDataTestDirOnTestFS();
    }
  }

  /**
   * Creates {@code tableName} pre-split into {@link #NUM_REGIONS} regions and writes
   * {@link #NUM_ROWS} rows to it. Row keys are the serialized ints {@code 0..NUM_ROWS-1}; every
   * row gets three cells (c1, c2, c3) whose value equals the qualifier, all stamped with
   * {@link #TIMESTAMP}.
   */
  private static void createAndLoadTable(TableName tableName) throws Exception {
    byte[][] splitRows = new byte[NUM_REGIONS - 1][];
    for (int i = 1; i < NUM_REGIONS; i++) {
      splitRows[i - 1] = Bytes.toBytes(NUM_ROWS * i / NUM_REGIONS);
    }

    // try-with-resources: the table handle is released even if a put fails
    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, splitRows)) {
      for (int i = 0; i < NUM_ROWS; i++) {
        Put p = new Put(Bytes.toBytes(i), TIMESTAMP);
        p.addColumn(FAMILY, COLUMN1, COLUMN1);
        p.addColumn(FAMILY, COLUMN2, COLUMN2);
        p.addColumn(FAMILY, COLUMN3, COLUMN3);
        table.put(p);
      }
    }
  }

  /**
   * Reads every entry of the {@link #NUM_HASH_FILES} hash data files under {@code dataDir} into a
   * map keyed by the int value of the entry key. An empty key is recorded as {@code -1}. Fails
   * the test if the same key shows up more than once.
   */
  private static Map<Integer, ImmutableBytesWritable> readHashes(FileSystem fs, Path dataDir)
    throws Exception {
    Map<Integer, ImmutableBytesWritable> hashes = new HashMap<>();
    for (int i = 0; i < NUM_HASH_FILES; i++) {
      Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
      // try-with-resources: the reader is closed even if decoding or fail() throws
      try (MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf())) {
        ImmutableBytesWritable key = new ImmutableBytesWritable();
        ImmutableBytesWritable hash = new ImmutableBytesWritable();
        while (reader.next(key, hash)) {
          int intKey = -1;
          if (key.getLength() > 0) {
            intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Key: {} Hash: {}", intKey,
              Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
          }
          if (hashes.containsKey(intKey)) {
            fail("duplicate key in data files: " + intKey);
          }
          // the reader reuses the 'hash' instance between calls, so store a copy of its bytes
          hashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
        }
      }
    }
    return hashes;
  }

  /** Logs, at debug level, the path of every entry directly under {@code dir}. */
  private static void logListing(FileSystem fs, Path dir, String label) throws Exception {
    for (FileStatus file : fs.listStatus(dir)) {
      LOG.debug("{}: {}", label, file.getPath());
    }
  }

  /** Asserts the two hash maps are equal, logging the difference first when they are not. */
  private static void assertHashesMatch(Map<Integer, ImmutableBytesWritable> expected,
    Map<Integer, ImmutableBytesWritable> actual) {
    if (!expected.equals(actual)) {
      LOG.error("Diff: {}", Maps.difference(expected, actual));
    }
    assertEquals(expected, actual);
  }

  /** Wraps the bytes decoded from a hex digest string in an {@link ImmutableBytesWritable}. */
  private static ImmutableBytesWritable hash(String hex) {
    return new ImmutableBytesWritable(Bytes.fromHex(hex));
  }
}