/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

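/**
 * Base class for the HFileOutputFormat2 incremental load tests: a MapReduce job writes HFiles
 * into a test directory and {@link BulkLoadHFiles} loads them into live tables. Concrete
 * subclasses are expected to run with JUnit's {@code Parameterized} runner, bind the three
 * {@code @Parameter} fields below, and start the cluster via {@link #setupCluster(boolean)}. A
 * minimal sketch of such a subclass (hypothetical; the class name and parameter values here are
 * made up for illustration and are not part of this file):
 *
 * <pre>
 * &#64;RunWith(Parameterized.class)
 * public class TestMRIncrementalLoadExample extends MRIncrementalLoadTestBase {
 *
 *   &#64;Parameters
 *   public static Collection&lt;Object[]&gt; params() {
 *     // { shouldChangeRegions, putSortReducer, tables }
 *     return Arrays.asList(new Object[][] { { false, false, Arrays.asList("table1") } });
 *   }
 *
 *   &#64;BeforeClass
 *   public static void setUpCluster() throws Exception {
 *     setupCluster(false);
 *   }
 * }
 * </pre>
 */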
public class MRIncrementalLoadTestBase extends HFileOutputFormat2TestBase {

  private static final Logger LOG = LoggerFactory.getLogger(MRIncrementalLoadTestBase.class);

  private static boolean SHOULD_KEEP_LOCALITY;

  private static String[] HOSTNAMES;

  @Parameter(0)
  public boolean shouldChangeRegions;

  @Parameter(1)
  public boolean putSortReducer;

  @Parameter(2)
  public List<String> tableStr;

  private Map<String, Table> allTables;

  private List<HFileOutputFormat2.TableInfo> tableInfo;

  private Path testDir;

  /**
   * Starts the mini cluster; subclasses call this once from a {@code @BeforeClass} method. With
   * {@code shouldKeepLocality} enabled, the cluster runs several region servers and data nodes
   * with fixed hostnames so HDFS block locality can be asserted after the bulk load.
   */
  protected static void setupCluster(boolean shouldKeepLocality) throws Exception {
    SHOULD_KEEP_LOCALITY = shouldKeepLocality;
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    // We should raise the host count above the HDFS replica count once MiniHBaseCluster supports
    // an explicit hostnames parameter, just like MiniDFSCluster does.
    int hostCount = shouldKeepLocality ? 3 : 1;

    HOSTNAMES = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      HOSTNAMES[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(HOSTNAMES).build();
    UTIL.getConfiguration().unset(HConstants.TEMPORARY_FS_DIRECTORY_KEY);
    UTIL.startMiniCluster(option);
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws IOException {
    int regionNum = SHOULD_KEEP_LOCALITY ? 20 : 5;
    allTables = new HashMap<>(tableStr.size());
    tableInfo = new ArrayList<>(tableStr.size());
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = UTIL.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = UTIL.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, HBaseTestingUtil.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
    }
    testDir = UTIL.getDataTestDirOnTestFS(tableStr.get(0));
  }

  @After
  public void tearDown() throws IOException {
    for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
      tableInfoSingle.getRegionLocator().close();
    }
    tableInfo.clear();
    allTables.clear();
    for (String tableStrSingle : tableStr) {
      UTIL.deleteTable(TableName.valueOf(tableStrSingle));
    }
  }
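  /**
   * Runs the full incremental-load round trip: generate HFiles with the MR job, check that the
   * job leaves the live tables untouched and creates one output directory per column family,
   * optionally replace one table with a differently-split copy (so the bulk load has to split
   * HFiles that no longer match the region boundaries), bulk load the files, then verify row
   * contents, HDFS block locality, and that the data survives disabling and re-enabling the
   * table.
   */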
  @Test
  public void doIncrementalLoadTest() throws Exception {
    boolean writeMultipleTables = tableStr.size() > 1;
    // Generate the bulk load files
    runIncrementalPELoad(UTIL.getConfiguration(), tableInfo, testDir, putSortReducer);
    if (writeMultipleTables) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, it just creates files
      assertEquals("HFOF should not touch actual table", 0,
        HBaseTestingUtil.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(UTIL.getConfiguration()).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(UTIL.getConfiguration()).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = UTIL.getAdmin();

    // Handle the split case: replace one table with a differently-split copy
    if (shouldChangeRegions) {
      // Choose a semi-random table if multiple tables are available
      Table chosenTable = allTables.values().iterator().next();
      LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
      admin.disableTable(chosenTable.getName());
      UTIL.waitUntilNoRegionsInTransition();

      UTIL.deleteTable(chosenTable.getName());
      byte[][] newSplitKeys = generateRandomSplitKeys(14);
      UTIL.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
      UTIL.waitTableAvailable(chosenTable.getName());
    }

    // Perform the actual load
    for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
      Path tableDir = testDir;
      String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
      LOG.info("Running BulkLoadHFiles on table " + tableNameStr);
      if (writeMultipleTables) {
        tableDir = new Path(testDir, tableNameStr);
      }
      Table currentTable = allTables.get(tableNameStr);
      TableName currentTableName = currentTable.getName();
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(currentTableName, tableDir);

      // Ensure data shows up
      int expectedRows = 0;
      if (putSortReducer) {
        // no rows should be extracted
        assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
          HBaseTestingUtil.countRows(currentTable));
      } else {
        expectedRows = NMapInputFormat.getNumMapTasks(UTIL.getConfiguration()) * ROWSPERSPLIT;
        assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
          HBaseTestingUtil.countRows(currentTable));
        Scan scan = new Scan();
        ResultScanner results = currentTable.getScanner(scan);
        for (Result res : results) {
          assertEquals(FAMILIES.length, res.rawCells().length);
          Cell first = res.rawCells()[0];
          for (Cell kv : res.rawCells()) {
            assertTrue(CellUtil.matchingRows(first, kv));
            assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
          }
        }
        results.close();
      }
      String tableDigestBefore = UTIL.checksumRows(currentTable);
      // Check region locality
      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
      for (HRegion region : UTIL.getHBaseCluster().getRegions(currentTableName)) {
        hbd.add(region.getHDFSBlocksDistribution());
      }
      for (String hostname : HOSTNAMES) {
        float locality = hbd.getBlockLocalityIndex(hostname);
        LOG.info("locality of [" + hostname + "]: " + locality);
        assertEquals(100, (int) (locality * 100));
      }

      // Cause regions to reopen
      admin.disableTable(currentTableName);
      while (!admin.isTableDisabled(currentTableName)) {
        Thread.sleep(200);
        LOG.info("Waiting for table to disable");
      }
      admin.enableTable(currentTableName);
      UTIL.waitTableAvailable(currentTableName);
      assertEquals("Data should remain after reopening of regions", tableDigestBefore,
        UTIL.checksumRows(currentTable));
    }
  }
}