001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023 024import java.io.IOException; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.stream.Collectors; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HRegionLocation; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.TableDescriptor; 038import org.apache.hadoop.hbase.quotas.QuotaUtil; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 044 045public final class BalancerConditionalsTestUtil { 046 047 private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionalsTestUtil.class); 048 049 private BalancerConditionalsTestUtil() { 050 } 051 052 static byte[][] generateSplits(int numRegions) { 053 byte[][] splitKeys = new byte[numRegions - 1][]; 054 for (int i = 0; i < numRegions - 1; i++) { 055 splitKeys[i] = 056 Bytes.toBytes(String.format("%09d", (i + 1) * (Integer.MAX_VALUE / numRegions))); 057 } 058 return splitKeys; 059 } 060 061 static void printRegionLocations(Connection connection) throws IOException { 062 Admin admin = connection.getAdmin(); 063 064 // Get all table names in the cluster 065 Set<TableName> tableNames = admin.listTableDescriptors(true).stream() 066 .map(TableDescriptor::getTableName).collect(Collectors.toSet()); 067 068 // Group regions by server 069 Map<ServerName, Map<TableName, List<RegionInfo>>> serverToRegions = 070 admin.getClusterMetrics().getLiveServerMetrics().keySet().stream() 071 .collect(Collectors.toMap(server -> server, server -> { 072 try { 073 return listRegionsByTable(connection, server, tableNames); 074 } catch (IOException e) { 075 throw new RuntimeException(e); 076 } 077 })); 078 079 // Pretty print region locations 080 StringBuilder regionLocationOutput = new StringBuilder(); 081 regionLocationOutput.append("Pretty printing region locations...\n"); 082 serverToRegions.forEach((server, tableRegions) -> { 083 regionLocationOutput.append("Server: " + server.getServerName() + "\n"); 084 tableRegions.forEach((table, regions) -> { 085 if (regions.isEmpty()) { 086 return; 087 } 088 regionLocationOutput.append(" Table: " + table.getNameAsString() + "\n"); 089 regions.forEach(region -> regionLocationOutput 090 .append(String.format(" Region: %s, start: %s, end: %s, replica: %s\n", 091 region.getEncodedName(), Bytes.toString(region.getStartKey()), 092 Bytes.toString(region.getEndKey()), region.getReplicaId()))); 093 }); 094 }); 095 LOG.info(regionLocationOutput.toString()); 096 } 097 098 private static Map<TableName, List<RegionInfo>> listRegionsByTable(Connection connection, 099 ServerName server, Set<TableName> tableNames) throws IOException { 100 Admin admin = connection.getAdmin(); 101 102 // Find regions for each table 103 return tableNames.stream().collect(Collectors.toMap(tableName -> tableName, tableName -> { 104 List<RegionInfo> allRegions = null; 105 try { 106 allRegions = admin.getRegions(server); 107 } catch (IOException e) { 108 throw new RuntimeException(e); 109 } 110 return allRegions.stream().filter(region -> region.getTable().equals(tableName)) 111 .collect(Collectors.toList()); 112 })); 113 } 114 115 static void validateReplicaDistribution(Connection connection, TableName tableName, 116 boolean shouldBeDistributed) { 117 Map<ServerName, List<RegionInfo>> serverToRegions = null; 118 try { 119 serverToRegions = connection.getRegionLocator(tableName).getAllRegionLocations().stream() 120 .collect(Collectors.groupingBy(location -> location.getServerName(), 121 Collectors.mapping(location -> location.getRegion(), Collectors.toList()))); 122 } catch (IOException e) { 123 throw new RuntimeException(e); 124 } 125 126 if (shouldBeDistributed) { 127 // Ensure no server hosts more than one replica of any region 128 for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) { 129 List<RegionInfo> regionInfos = serverAndRegions.getValue(); 130 Set<byte[]> startKeys = new HashSet<>(); 131 for (RegionInfo regionInfo : regionInfos) { 132 // each region should have a distinct start key 133 assertFalse( 134 "Each region should have its own start key, " 135 + "demonstrating it is not a replica of any others on this host", 136 startKeys.contains(regionInfo.getStartKey())); 137 startKeys.add(regionInfo.getStartKey()); 138 } 139 } 140 } else { 141 // Ensure all replicas are on the same server 142 assertEquals("All regions should share one server", 1, serverToRegions.size()); 143 } 144 } 145 146 static void validateRegionLocations(Map<TableName, Set<ServerName>> tableToServers, 147 TableName productTableName, boolean shouldBeBalanced) { 148 ServerName metaServer = 149 tableToServers.get(TableName.META_TABLE_NAME).stream().findFirst().orElseThrow(); 150 ServerName quotaServer = 151 tableToServers.get(QuotaUtil.QUOTA_TABLE_NAME).stream().findFirst().orElseThrow(); 152 Set<ServerName> productServers = tableToServers.get(productTableName); 153 154 if (shouldBeBalanced) { 155 for (ServerName server : productServers) { 156 assertNotEquals("Meta table and product table should not share servers", server, 157 metaServer); 158 assertNotEquals("Quota table and product table should not share servers", server, 159 quotaServer); 160 } 161 assertNotEquals("The meta server and quotas server should be different", metaServer, 162 quotaServer); 163 } else { 164 for (ServerName server : productServers) { 165 assertEquals("Meta table and product table must share servers", server, metaServer); 166 assertEquals("Quota table and product table must share servers", server, quotaServer); 167 } 168 assertEquals("The meta server and quotas server must be the same", metaServer, quotaServer); 169 } 170 } 171 172 static Map<TableName, Set<ServerName>> getTableToServers(Connection connection, 173 Set<TableName> tableNames) { 174 return tableNames.stream().collect(Collectors.toMap(t -> t, t -> { 175 try { 176 return connection.getRegionLocator(t).getAllRegionLocations().stream() 177 .map(HRegionLocation::getServerName).collect(Collectors.toSet()); 178 } catch (IOException e) { 179 throw new RuntimeException(e); 180 } 181 })); 182 } 183 184 @FunctionalInterface 185 interface AssertionRunnable { 186 void run() throws AssertionError; 187 } 188 189 static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure, 190 AssertionRunnable assertion) { 191 validateAssertionsWithRetries(testUtil, runBalancerOnFailure, ImmutableSet.of(assertion)); 192 } 193 194 static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure, 195 Set<AssertionRunnable> assertions) { 196 int maxAttempts = 50; 197 for (int i = 0; i < maxAttempts; i++) { 198 try { 199 for (AssertionRunnable assertion : assertions) { 200 assertion.run(); 201 } 202 } catch (AssertionError e) { 203 if (i == maxAttempts - 1) { 204 throw e; 205 } 206 try { 207 LOG.warn("Failed to validate region locations. Will retry", e); 208 Thread.sleep(1000); 209 BalancerConditionalsTestUtil.printRegionLocations(testUtil.getConnection()); 210 if (runBalancerOnFailure) { 211 testUtil.getAdmin().balance(); 212 } 213 Thread.sleep(1000); 214 } catch (Exception ex) { 215 throw new RuntimeException(ex); 216 } 217 } 218 } 219 } 220 221}