001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotEquals;
023
024import java.io.IOException;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.stream.Collectors;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.TableDescriptor;
038import org.apache.hadoop.hbase.quotas.QuotaUtil;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
044
045public final class BalancerConditionalsTestUtil {
046
047  private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionalsTestUtil.class);
048
049  private BalancerConditionalsTestUtil() {
050  }
051
052  static byte[][] generateSplits(int numRegions) {
053    byte[][] splitKeys = new byte[numRegions - 1][];
054    for (int i = 0; i < numRegions - 1; i++) {
055      splitKeys[i] =
056        Bytes.toBytes(String.format("%09d", (i + 1) * (Integer.MAX_VALUE / numRegions)));
057    }
058    return splitKeys;
059  }
060
061  static void printRegionLocations(Connection connection) throws IOException {
062    Admin admin = connection.getAdmin();
063
064    // Get all table names in the cluster
065    Set<TableName> tableNames = admin.listTableDescriptors(true).stream()
066      .map(TableDescriptor::getTableName).collect(Collectors.toSet());
067
068    // Group regions by server
069    Map<ServerName, Map<TableName, List<RegionInfo>>> serverToRegions =
070      admin.getClusterMetrics().getLiveServerMetrics().keySet().stream()
071        .collect(Collectors.toMap(server -> server, server -> {
072          try {
073            return listRegionsByTable(connection, server, tableNames);
074          } catch (IOException e) {
075            throw new RuntimeException(e);
076          }
077        }));
078
079    // Pretty print region locations
080    StringBuilder regionLocationOutput = new StringBuilder();
081    regionLocationOutput.append("Pretty printing region locations...\n");
082    serverToRegions.forEach((server, tableRegions) -> {
083      regionLocationOutput.append("Server: " + server.getServerName() + "\n");
084      tableRegions.forEach((table, regions) -> {
085        if (regions.isEmpty()) {
086          return;
087        }
088        regionLocationOutput.append("  Table: " + table.getNameAsString() + "\n");
089        regions.forEach(region -> regionLocationOutput
090          .append(String.format("    Region: %s, start: %s, end: %s, replica: %s\n",
091            region.getEncodedName(), Bytes.toString(region.getStartKey()),
092            Bytes.toString(region.getEndKey()), region.getReplicaId())));
093      });
094    });
095    LOG.info(regionLocationOutput.toString());
096  }
097
098  private static Map<TableName, List<RegionInfo>> listRegionsByTable(Connection connection,
099    ServerName server, Set<TableName> tableNames) throws IOException {
100    Admin admin = connection.getAdmin();
101
102    // Find regions for each table
103    return tableNames.stream().collect(Collectors.toMap(tableName -> tableName, tableName -> {
104      List<RegionInfo> allRegions = null;
105      try {
106        allRegions = admin.getRegions(server);
107      } catch (IOException e) {
108        throw new RuntimeException(e);
109      }
110      return allRegions.stream().filter(region -> region.getTable().equals(tableName))
111        .collect(Collectors.toList());
112    }));
113  }
114
115  static void validateReplicaDistribution(Connection connection, TableName tableName,
116    boolean shouldBeDistributed) {
117    Map<ServerName, List<RegionInfo>> serverToRegions = null;
118    try {
119      serverToRegions = connection.getRegionLocator(tableName).getAllRegionLocations().stream()
120        .collect(Collectors.groupingBy(location -> location.getServerName(),
121          Collectors.mapping(location -> location.getRegion(), Collectors.toList())));
122    } catch (IOException e) {
123      throw new RuntimeException(e);
124    }
125
126    if (shouldBeDistributed) {
127      // Ensure no server hosts more than one replica of any region
128      for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) {
129        List<RegionInfo> regionInfos = serverAndRegions.getValue();
130        Set<byte[]> startKeys = new HashSet<>();
131        for (RegionInfo regionInfo : regionInfos) {
132          // each region should have a distinct start key
133          assertFalse(startKeys.contains(regionInfo.getStartKey()),
134            "Each region should have its own start key, "
135              + "demonstrating it is not a replica of any others on this host");
136          startKeys.add(regionInfo.getStartKey());
137        }
138      }
139    } else {
140      // Ensure all replicas are on the same server
141      assertEquals(1, serverToRegions.size(), "All regions should share one server");
142    }
143  }
144
145  static void validateRegionLocations(Map<TableName, Set<ServerName>> tableToServers,
146    TableName productTableName, boolean shouldBeBalanced) {
147    ServerName metaServer =
148      tableToServers.get(TableName.META_TABLE_NAME).stream().findFirst().orElseThrow();
149    ServerName quotaServer =
150      tableToServers.get(QuotaUtil.QUOTA_TABLE_NAME).stream().findFirst().orElseThrow();
151    Set<ServerName> productServers = tableToServers.get(productTableName);
152
153    if (shouldBeBalanced) {
154      for (ServerName server : productServers) {
155        assertNotEquals(server, metaServer,
156          "Meta table and product table should not share servers");
157        assertNotEquals(server, quotaServer,
158          "Quota table and product table should not share servers");
159      }
160      assertNotEquals(metaServer, quotaServer,
161        "The meta server and quotas server should be different");
162    } else {
163      for (ServerName server : productServers) {
164        assertEquals(server, metaServer, "Meta table and product table must share servers");
165        assertEquals(server, quotaServer, "Quota table and product table must share servers");
166      }
167      assertEquals(metaServer, quotaServer, "The meta server and quotas server must be the same");
168    }
169  }
170
171  static Map<TableName, Set<ServerName>> getTableToServers(Connection connection,
172    Set<TableName> tableNames) {
173    return tableNames.stream().collect(Collectors.toMap(t -> t, t -> {
174      try {
175        return connection.getRegionLocator(t).getAllRegionLocations().stream()
176          .map(HRegionLocation::getServerName).collect(Collectors.toSet());
177      } catch (IOException e) {
178        throw new RuntimeException(e);
179      }
180    }));
181  }
182
183  @FunctionalInterface
184  interface AssertionRunnable {
185    void run() throws AssertionError;
186  }
187
188  static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure,
189    AssertionRunnable assertion) {
190    validateAssertionsWithRetries(testUtil, runBalancerOnFailure, ImmutableSet.of(assertion));
191  }
192
193  static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure,
194    Set<AssertionRunnable> assertions) {
195    int maxAttempts = 50;
196    for (int i = 0; i < maxAttempts; i++) {
197      try {
198        for (AssertionRunnable assertion : assertions) {
199          assertion.run();
200        }
201      } catch (AssertionError e) {
202        if (i == maxAttempts - 1) {
203          throw e;
204        }
205        try {
206          LOG.warn("Failed to validate region locations. Will retry", e);
207          Thread.sleep(1000);
208          BalancerConditionalsTestUtil.printRegionLocations(testUtil.getConnection());
209          if (runBalancerOnFailure) {
210            testUtil.getAdmin().balance();
211          }
212          Thread.sleep(1000);
213        } catch (Exception ex) {
214          throw new RuntimeException(ex);
215        }
216      }
217    }
218  }
219
220}