/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for MapReduce incremental-load (bulk load) tests: generates HFiles with
 * {@link HFileOutputFormat2} / {@link MultiTableHFileOutputFormat}, bulk-loads them with
 * {@link BulkLoadHFiles}, and verifies row contents, block locality and survival of a
 * region reopen.
 */
public class MRIncrementalLoadTestBase extends HFileOutputFormat2TestBase {

  private static final Logger LOG = LoggerFactory.getLogger(MRIncrementalLoadTestBase.class);

  /** Whether the mini cluster was started with locality-sensitive output enabled. */
  private static boolean SHOULD_KEEP_LOCALITY;

  /** Hostnames shared by the region servers and data nodes (needed for locality checks). */
  private static String[] HOSTNAMES;

  public boolean shouldChangeRegions;

  public boolean putSortReducer;

  public List<String> tableStr;

  private Map<String, Table> allTables;

  private List<HFileOutputFormat2.TableInfo> tableInfo;

  private Path testDir;

  /**
   * Starts a mini cluster whose region servers and data nodes share hostnames so that
   * HFile block locality can be asserted when {@code shouldKeepLocality} is true.
   */
  protected static void setupCluster(boolean shouldKeepLocality) throws Exception {
    SHOULD_KEEP_LOCALITY = shouldKeepLocality;
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
    // explicit hostnames parameter just like MiniDFSCluster does.
    int hostCount = shouldKeepLocality ? 3 : 1;

    HOSTNAMES = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      HOSTNAMES[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(HOSTNAMES).build();
    UTIL.getConfiguration().unset(HConstants.TEMPORARY_FS_DIRECTORY_KEY);
    UTIL.startMiniCluster(option);
  }

  @AfterAll
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Creates every table named in {@link #tableStr} with random split keys and records a
   * {@link RegionLocator} for each; the locators are closed in {@link #tearDown()}.
   */
  public void setUp() throws IOException {
    // More regions when checking locality so blocks spread over several hosts.
    int regionNum = SHOULD_KEEP_LOCALITY ? 20 : 5;
    allTables = new HashMap<>(tableStr.size());
    tableInfo = new ArrayList<>(tableStr.size());
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = UTIL.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = UTIL.getConnection().getRegionLocator(tableName);
      assertEquals(0, HBaseTestingUtil.countRows(table), "Should start with empty table");
      int numRegions = r.getStartKeys().length;
      // JUnit 5 takes the expected value first; regionNum is what we asked for.
      assertEquals(regionNum, numRegions, "Should make " + regionNum + " regions");

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
    }
    testDir = UTIL.getDataTestDirOnTestFS(tableStr.get(0));
  }

  /** Releases per-test state: closes region locators and drops the test tables. */
  public void tearDown() throws IOException {
    for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
      tableInfoSingle.getRegionLocator().close();
    }
    tableInfo.clear();
    allTables.clear();
    for (String tableStrSingle : tableStr) {
      UTIL.deleteTable(TableName.valueOf(tableStrSingle));
    }
  }

  /**
   * Runs one incremental-load scenario end to end, always tearing down the tables it
   * created even when the test body throws.
   *
   * @param shouldChangeRegions recreate one table with different splits before loading
   * @param putSortReducer      use the Put-based sort reducer (which emits no rows here)
   * @param tableStr            names of the tables to create and load
   */
  protected void runTest(boolean shouldChangeRegions, boolean putSortReducer, List<String> tableStr)
    throws Exception {
    this.shouldChangeRegions = shouldChangeRegions;
    this.putSortReducer = putSortReducer;
    this.tableStr = tableStr;
    setUp();
    try {
      doIncrementalLoadTest();
    } finally {
      tearDown();
    }
  }

  /**
   * Core scenario: write HFiles via the MR job, verify the output directory layout,
   * optionally recreate a table with new splits, bulk-load every table, then verify row
   * contents, block locality per host, and that data survives a disable/enable cycle.
   */
  public void doIncrementalLoadTest() throws Exception {
    boolean writeMultipleTables = tableStr.size() > 1;
    // Generate the bulk load files
    runIncrementalPELoad(UTIL.getConfiguration(), tableInfo, testDir, putSortReducer);
    if (writeMultipleTables) {
      // Multi-table output nests table dirs under the namespace ("default").
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals(0, HBaseTestingUtil.countRows(tableSingle),
        "HFOF should not touch actual table");
    }
    int numTableDirs = 0;
    FileStatus[] outputDirs = testDir.getFileSystem(UTIL.getConfiguration()).listStatus(testDir);
    for (FileStatus tf : outputDirs) {
      Path tablePath = testDir;
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      FileStatus[] familyDirs =
        tablePath.getFileSystem(UTIL.getConfiguration()).listStatus(tablePath);
      for (FileStatus f : familyDirs) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals(FAMILIES.length, dir, "Column family not found in FS.");
    }
    if (writeMultipleTables) {
      // Expected value first per JUnit 5 convention.
      assertEquals(allTables.size(), numTableDirs, "Dir for all input tables not created");
    }

    Admin admin = UTIL.getAdmin();

    // handle the split case
    if (shouldChangeRegions) {
      Table chosenTable = allTables.values().iterator().next();
      // Choose a semi-random table if multiple tables are available
      LOG.info("Changing regions in table {}", chosenTable.getName().getNameAsString());
      admin.disableTable(chosenTable.getName());
      UTIL.waitUntilNoRegionsInTransition();

      UTIL.deleteTable(chosenTable.getName());
      byte[][] newSplitKeys = generateRandomSplitKeys(14);
      UTIL.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
      UTIL.waitTableAvailable(chosenTable.getName());
    }

    // Perform the actual load
    for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
      Path tableDir = testDir;
      String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
      LOG.info("Running BulkLoadHFiles on table {}", tableNameStr);
      if (writeMultipleTables) {
        tableDir = new Path(testDir, tableNameStr);
      }
      Table currentTable = allTables.get(tableNameStr);
      TableName currentTableName = currentTable.getName();
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(currentTableName, tableDir);

      // Ensure data shows up
      int expectedRows = 0;
      if (putSortReducer) {
        // no rows should be extracted
        assertEquals(expectedRows, HBaseTestingUtil.countRows(currentTable),
          "BulkLoadHFiles should put expected data in table");
      } else {
        expectedRows = NMapInputFormat.getNumMapTasks(UTIL.getConfiguration()) * ROWSPERSPLIT;
        assertEquals(expectedRows, HBaseTestingUtil.countRows(currentTable),
          "BulkLoadHFiles should put expected data in table");
        // try-with-resources so the scanner is closed even if an assertion fails.
        try (ResultScanner results = currentTable.getScanner(new Scan())) {
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              // Every cell of a row carries the same row key and the same value.
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
        }
      }
      String tableDigestBefore = UTIL.checksumRows(currentTable);
      // Check region locality
      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
      for (HRegion region : UTIL.getHBaseCluster().getRegions(currentTableName)) {
        hbd.add(region.getHDFSBlocksDistribution());
      }
      for (String hostname : HOSTNAMES) {
        float locality = hbd.getBlockLocalityIndex(hostname);
        LOG.info("locality of [{}]: {}", hostname, locality);
        assertEquals(100, (int) (locality * 100));
      }

      // Cause regions to reopen
      admin.disableTable(currentTableName);
      while (!admin.isTableDisabled(currentTableName)) {
        Thread.sleep(200);
        LOG.info("Waiting for table to disable");
      }
      admin.enableTable(currentTableName);
      UTIL.waitTableAvailable(currentTableName);
      assertEquals(tableDigestBefore, UTIL.checksumRows(currentTable),
        "Data should remain after reopening of regions");
    }
  }
}