001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.fail;
022
023import java.io.OutputStream;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Properties;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.io.MapFile;
038import org.junit.jupiter.api.AfterAll;
039import org.junit.jupiter.api.BeforeAll;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.junit.jupiter.api.TestInfo;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
047import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
048
049/**
050 * Basic test for the HashTable M/R tool
051 */
052@Tag(LargeTests.TAG)
053public class TestHashTable {
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestHashTable.class);
056
057  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
058
059  @BeforeAll
060  public static void beforeClass() throws Exception {
061    TEST_UTIL.startMiniCluster(3);
062  }
063
064  @AfterAll
065  public static void afterClass() throws Exception {
066    TEST_UTIL.shutdownMiniCluster();
067  }
068
069  @Test
070  public void testHashTable(TestInfo testInfo) throws Exception {
071    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
072    final byte[] family = Bytes.toBytes("family");
073    final byte[] column1 = Bytes.toBytes("c1");
074    final byte[] column2 = Bytes.toBytes("c2");
075    final byte[] column3 = Bytes.toBytes("c3");
076
077    int numRows = 100;
078    int numRegions = 10;
079    int numHashFiles = 3;
080
081    byte[][] splitRows = new byte[numRegions - 1][];
082    for (int i = 1; i < numRegions; i++) {
083      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
084    }
085
086    long timestamp = 1430764183454L;
087    // put rows into the first table
088    Table t1 = TEST_UTIL.createTable(tableName, family, splitRows);
089    for (int i = 0; i < numRows; i++) {
090      Put p = new Put(Bytes.toBytes(i), timestamp);
091      p.addColumn(family, column1, column1);
092      p.addColumn(family, column2, column2);
093      p.addColumn(family, column3, column3);
094      t1.put(p);
095    }
096    t1.close();
097
098    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
099
100    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
101
102    long batchSize = 300;
103    int code =
104      hashTable.run(new String[] { "--batchsize=" + batchSize, "--numhashfiles=" + numHashFiles,
105        "--scanbatch=2", tableName.getNameAsString(), testDir.toString() });
106    assertEquals(0, code, "test job failed");
107
108    FileSystem fs = TEST_UTIL.getTestFileSystem();
109
110    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
111    assertEquals(tableName.getNameAsString(), tableHash.tableName);
112    assertEquals(batchSize, tableHash.batchSize);
113    assertEquals(numHashFiles, tableHash.numHashFiles);
114    assertEquals(numHashFiles - 1, tableHash.partitions.size());
115    for (ImmutableBytesWritable bytes : tableHash.partitions) {
116      LOG.debug("partition: " + Bytes.toInt(bytes.get()));
117    }
118
119    ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
120      ImmutableMap.<Integer, ImmutableBytesWritable> builder()
121        .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
122        .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
123        .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
124        .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
125        .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
126        .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
127        .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
128        .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
129        .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
130        .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
131        .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
132        .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
133        .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
134        .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
135        .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
136        .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
137        .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
138        .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
139        .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
140        .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
141        .build();
142
143    Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>();
144    Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
145    for (int i = 0; i < numHashFiles; i++) {
146      Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
147
148      MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
149      ImmutableBytesWritable key = new ImmutableBytesWritable();
150      ImmutableBytesWritable hash = new ImmutableBytesWritable();
151      while (reader.next(key, hash)) {
152        String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength());
153        LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16))
154          + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
155
156        int intKey = -1;
157        if (key.getLength() > 0) {
158          intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
159        }
160        if (actualHashes.containsKey(intKey)) {
161          fail("duplicate key in data files: " + intKey);
162        }
163        actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
164      }
165      reader.close();
166    }
167
168    FileStatus[] files = fs.listStatus(testDir);
169    for (FileStatus file : files) {
170      LOG.debug("Output file: " + file.getPath());
171    }
172
173    files = fs.listStatus(dataDir);
174    for (FileStatus file : files) {
175      LOG.debug("Data file: " + file.getPath());
176    }
177
178    if (!expectedHashes.equals(actualHashes)) {
179      LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
180    }
181    assertEquals(expectedHashes, actualHashes);
182
183    TEST_UTIL.deleteTable(tableName);
184    TEST_UTIL.cleanupDataTestDirOnTestFS();
185  }
186
187  @Test
188  public void testHashTableWithSha256(TestInfo testInfo) throws Exception {
189    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
190    final byte[] family = Bytes.toBytes("family");
191    final byte[] column1 = Bytes.toBytes("c1");
192    final byte[] column2 = Bytes.toBytes("c2");
193    final byte[] column3 = Bytes.toBytes("c3");
194
195    int numRows = 100;
196    int numRegions = 10;
197    int numHashFiles = 3;
198
199    byte[][] splitRows = new byte[numRegions - 1][];
200    for (int i = 1; i < numRegions; i++) {
201      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
202    }
203
204    long timestamp = 1430764183454L;
205    Table t1 = TEST_UTIL.createTable(tableName, family, splitRows);
206    for (int i = 0; i < numRows; i++) {
207      Put p = new Put(Bytes.toBytes(i), timestamp);
208      p.addColumn(family, column1, column1);
209      p.addColumn(family, column2, column2);
210      p.addColumn(family, column3, column3);
211      t1.put(p);
212    }
213    t1.close();
214
215    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
216    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
217
218    long batchSize = 300;
219    int code = hashTable.run(
220      new String[] { "--batchsize=" + batchSize, "--numhashfiles=" + numHashFiles, "--scanbatch=2",
221        "--hashAlgorithm=SHA-256", tableName.getNameAsString(), testDir.toString() });
222    assertEquals(0, code, "test job failed");
223
224    FileSystem fs = TEST_UTIL.getTestFileSystem();
225    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
226    assertEquals("SHA-256", tableHash.hashAlgorithm,
227      "manifest must record the algorithm used to produce the digests");
228
229    ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
230      ImmutableMap.<Integer, ImmutableBytesWritable> builder()
231        .put(-1,
232          new ImmutableBytesWritable(
233            Bytes.fromHex("e1452041ec73beb9b5677c0b74ed73a9118ca502d9b2b9abe62ba18d92fc51be")))
234        .put(5,
235          new ImmutableBytesWritable(
236            Bytes.fromHex("6c55999354be6571a8912b7781d37359e88153173cb5fcf143211682d7ac06c5")))
237        .put(10,
238          new ImmutableBytesWritable(
239            Bytes.fromHex("44e9c6aedd838e9a9c78d1983ce5139fda3ef3b0b8a3812893512e7c6f05c51f")))
240        .put(15,
241          new ImmutableBytesWritable(
242            Bytes.fromHex("57f18d831a22057155fb9cfb9bf4706a1c2cf134fc1e92262c8748ca545b58a1")))
243        .put(20,
244          new ImmutableBytesWritable(
245            Bytes.fromHex("83119dfb3deec8901f69cccac31c1039c624870de0cff4b85946147359966d42")))
246        .put(25,
247          new ImmutableBytesWritable(
248            Bytes.fromHex("933eabcc837b7e7b24be2b553b1ec50fb00e95cbf3d8f898f59f5f8bbace0a60")))
249        .put(30,
250          new ImmutableBytesWritable(
251            Bytes.fromHex("b6a3752581d74f362f64b59d56a96ad52763b3245dd5bfc85f6fe9261f2d03f1")))
252        .put(35,
253          new ImmutableBytesWritable(
254            Bytes.fromHex("d3784bac940584dbc0754eff73bc39cce4f9c4aec87939747fff4b0ecc6a0617")))
255        .put(40,
256          new ImmutableBytesWritable(
257            Bytes.fromHex("87f4b810b751abd64e9c22cb7b40b5ce600965e4b8eda2c0eae075d5623088c2")))
258        .put(45,
259          new ImmutableBytesWritable(
260            Bytes.fromHex("ce1f422fcdbe0f926e10b68cb3ead497066560235a1341d29151a9e1847deaab")))
261        .put(50,
262          new ImmutableBytesWritable(
263            Bytes.fromHex("118c771b1eeabe8523f1ad96fb5bf16537d76e0b3855d84c3dbac864de726229")))
264        .put(55,
265          new ImmutableBytesWritable(
266            Bytes.fromHex("00dfe840a275aca3de9268ea61699881a441d47fea93071bca69c39bf7845dac")))
267        .put(60,
268          new ImmutableBytesWritable(
269            Bytes.fromHex("062239ede0306fd9046eb5a3a2f66d997b37c8c1a4defc35789644e66930fff1")))
270        .put(65,
271          new ImmutableBytesWritable(
272            Bytes.fromHex("09a63a94681e75edf975f9b46fe94f1e592840a627cac728a77728b7f9f695aa")))
273        .put(70,
274          new ImmutableBytesWritable(
275            Bytes.fromHex("e634097804d269cbaeef49ce7a009a1388e6f636700badcab05fe20759f6043f")))
276        .put(75,
277          new ImmutableBytesWritable(
278            Bytes.fromHex("69f614ccc16a9c651538681525be1b2e40859c9833a55d9009d77ef39abaffcd")))
279        .put(80,
280          new ImmutableBytesWritable(
281            Bytes.fromHex("6530b957c8064fc043620bee89647960de0d27a0f986b40f183f5347093a12d2")))
282        .put(85,
283          new ImmutableBytesWritable(
284            Bytes.fromHex("403ed0417cd8ab955cbd4c8fe84218cd152b95da9237300050e9b7c90c809faf")))
285        .put(90,
286          new ImmutableBytesWritable(
287            Bytes.fromHex("e27fb9193ae3363fec70a148e62df7c57d514dd7de74a6a332fbda002af67efb")))
288        .put(95,
289          new ImmutableBytesWritable(
290            Bytes.fromHex("a31cb9d55e37f17c773a6eee757f15d6d7fe52d77cd1037fcb7ee00ed2bef6c9")))
291        .build();
292
293    Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>();
294    Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
295    for (int i = 0; i < numHashFiles; i++) {
296      Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
297      try (MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf())) {
298        ImmutableBytesWritable key = new ImmutableBytesWritable();
299        ImmutableBytesWritable hash = new ImmutableBytesWritable();
300        while (reader.next(key, hash)) {
301          int intKey = -1;
302          if (key.getLength() > 0) {
303            intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
304          }
305          if (actualHashes.containsKey(intKey)) {
306            fail("duplicate key in data files: " + intKey);
307          }
308          actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
309        }
310      }
311    }
312
313    if (!expectedHashes.equals(actualHashes)) {
314      LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
315    }
316    assertEquals(expectedHashes, actualHashes);
317
318    TEST_UTIL.deleteTable(tableName);
319    TEST_UTIL.cleanupDataTestDirOnTestFS();
320  }
321
322  /**
323   * A manifest written by an older HashTable does not carry the hashAlgorithm property. Reading
324   * such a manifest must default to MD5 so existing on-disk hash data stays usable.
325   */
326  @Test
327  public void testManifestWithoutAlgorithmDefaultsToMd5(TestInfo testInfo) throws Exception {
328    Path testDir =
329      TEST_UTIL.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName() + "_legacy");
330    FileSystem fs = TEST_UTIL.getTestFileSystem();
331    fs.mkdirs(testDir);
332
333    // hand-craft a legacy manifest with no hashAlgorithm property
334    Properties p = new Properties();
335    p.setProperty("table", "legacy");
336    p.setProperty("targetBatchSize", "8000");
337    p.setProperty("numHashFiles", "1");
338    p.setProperty("rawScan", "false");
339    Path manifest = new Path(testDir, HashTable.MANIFEST_FILE_NAME);
340    try (OutputStream out = fs.create(manifest)) {
341      p.store(out, null);
342    }
343
344    // write an empty partitions file so TableHash.read() succeeds
345    HashTable.TableHash empty = new HashTable.TableHash();
346    empty.partitions = new java.util.ArrayList<>();
347    empty.writePartitionFile(fs.getConf(), new Path(testDir, HashTable.PARTITIONS_FILE_NAME));
348
349    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
350    assertEquals(HashTable.DEFAULT_HASH_ALGORITHM, tableHash.hashAlgorithm,
351      "Manifests without an algorithm property must default to MD5 for back-compat");
352
353    TEST_UTIL.cleanupDataTestDirOnTestFS();
354  }
355}