001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
044
045public final class BalancerConditionalsTestUtil {
046
047  private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionalsTestUtil.class);
048
049  private BalancerConditionalsTestUtil() {
050  }
051
052  static byte[][] generateSplits(int numRegions) {
053    byte[][] splitKeys = new byte[numRegions - 1][];
054    for (int i = 0; i < numRegions - 1; i++) {
055      splitKeys[i] =
056        Bytes.toBytes(String.format("%09d", (i + 1) * (Integer.MAX_VALUE / numRegions)));
057    }
058    return splitKeys;
059  }
060
061  static void printRegionLocations(Connection connection) throws IOException {
062    Admin admin = connection.getAdmin();
063
064    // Get all table names in the cluster
065    Set<TableName> tableNames = admin.listTableDescriptors(true).stream()
066      .map(TableDescriptor::getTableName).collect(Collectors.toSet());
067
068    // Group regions by server
069    Map<ServerName, Map<TableName, List<RegionInfo>>> serverToRegions =
070      admin.getClusterMetrics().getLiveServerMetrics().keySet().stream()
071        .collect(Collectors.toMap(server -> server, server -> {
072          try {
073            return listRegionsByTable(connection, server, tableNames);
074          } catch (IOException e) {
075            throw new RuntimeException(e);
076          }
077        }));
078
079    // Pretty print region locations
080    StringBuilder regionLocationOutput = new StringBuilder();
081    regionLocationOutput.append("Pretty printing region locations...\n");
082    serverToRegions.forEach((server, tableRegions) -> {
083      regionLocationOutput.append("Server: " + server.getServerName() + "\n");
084      tableRegions.forEach((table, regions) -> {
085        if (regions.isEmpty()) {
086          return;
087        }
088        regionLocationOutput.append("  Table: " + table.getNameAsString() + "\n");
089        regions.forEach(region -> regionLocationOutput
090          .append(String.format("    Region: %s, start: %s, end: %s, replica: %s\n",
091            region.getEncodedName(), Bytes.toString(region.getStartKey()),
092            Bytes.toString(region.getEndKey()), region.getReplicaId())));
093      });
094    });
095    LOG.info(regionLocationOutput.toString());
096  }
097
098  private static Map<TableName, List<RegionInfo>> listRegionsByTable(Connection connection,
099    ServerName server, Set<TableName> tableNames) throws IOException {
100    Admin admin = connection.getAdmin();
101
102    // Find regions for each table
103    return tableNames.stream().collect(Collectors.toMap(tableName -> tableName, tableName -> {
104      List<RegionInfo> allRegions = null;
105      try {
106        allRegions = admin.getRegions(server);
107      } catch (IOException e) {
108        throw new RuntimeException(e);
109      }
110      return allRegions.stream().filter(region -> region.getTable().equals(tableName))
111        .collect(Collectors.toList());
112    }));
113  }
114
115  static void validateReplicaDistribution(Connection connection, TableName tableName,
116    boolean shouldBeDistributed) {
117    Map<ServerName, List<RegionInfo>> serverToRegions = null;
118    try {
119      serverToRegions = connection.getRegionLocator(tableName).getAllRegionLocations().stream()
120        .collect(Collectors.groupingBy(location -> location.getServerName(),
121          Collectors.mapping(location -> location.getRegion(), Collectors.toList())));
122    } catch (IOException e) {
123      throw new RuntimeException(e);
124    }
125
126    if (shouldBeDistributed) {
127      // Ensure no server hosts more than one replica of any region
128      for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) {
129        List<RegionInfo> regionInfos = serverAndRegions.getValue();
130        Set<byte[]> startKeys = new HashSet<>();
131        for (RegionInfo regionInfo : regionInfos) {
132          // each region should have a distinct start key
133          assertFalse(
134            "Each region should have its own start key, "
135              + "demonstrating it is not a replica of any others on this host",
136            startKeys.contains(regionInfo.getStartKey()));
137          startKeys.add(regionInfo.getStartKey());
138        }
139      }
140    } else {
141      // Ensure all replicas are on the same server
142      assertEquals("All regions should share one server", 1, serverToRegions.size());
143    }
144  }
145
146  static void validateRegionLocations(Map<TableName, Set<ServerName>> tableToServers,
147    TableName productTableName, boolean shouldBeBalanced) {
148    ServerName metaServer =
149      tableToServers.get(TableName.META_TABLE_NAME).stream().findFirst().orElseThrow();
150    ServerName quotaServer =
151      tableToServers.get(QuotaUtil.QUOTA_TABLE_NAME).stream().findFirst().orElseThrow();
152    Set<ServerName> productServers = tableToServers.get(productTableName);
153
154    if (shouldBeBalanced) {
155      for (ServerName server : productServers) {
156        assertNotEquals("Meta table and product table should not share servers", server,
157          metaServer);
158        assertNotEquals("Quota table and product table should not share servers", server,
159          quotaServer);
160      }
161      assertNotEquals("The meta server and quotas server should be different", metaServer,
162        quotaServer);
163    } else {
164      for (ServerName server : productServers) {
165        assertEquals("Meta table and product table must share servers", server, metaServer);
166        assertEquals("Quota table and product table must share servers", server, quotaServer);
167      }
168      assertEquals("The meta server and quotas server must be the same", metaServer, quotaServer);
169    }
170  }
171
172  static Map<TableName, Set<ServerName>> getTableToServers(Connection connection,
173    Set<TableName> tableNames) {
174    return tableNames.stream().collect(Collectors.toMap(t -> t, t -> {
175      try {
176        return connection.getRegionLocator(t).getAllRegionLocations().stream()
177          .map(HRegionLocation::getServerName).collect(Collectors.toSet());
178      } catch (IOException e) {
179        throw new RuntimeException(e);
180      }
181    }));
182  }
183
184  @FunctionalInterface
185  interface AssertionRunnable {
186    void run() throws AssertionError;
187  }
188
189  static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure,
190    AssertionRunnable assertion) {
191    validateAssertionsWithRetries(testUtil, runBalancerOnFailure, ImmutableSet.of(assertion));
192  }
193
194  static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure,
195    Set<AssertionRunnable> assertions) {
196    int maxAttempts = 50;
197    for (int i = 0; i < maxAttempts; i++) {
198      try {
199        for (AssertionRunnable assertion : assertions) {
200          assertion.run();
201        }
202      } catch (AssertionError e) {
203        if (i == maxAttempts - 1) {
204          throw e;
205        }
206        try {
207          LOG.warn("Failed to validate region locations. Will retry", e);
208          Thread.sleep(1000);
209          BalancerConditionalsTestUtil.printRegionLocations(testUtil.getConnection());
210          if (runBalancerOnFailure) {
211            testUtil.getAdmin().balance();
212          }
213          Thread.sleep(1000);
214        } catch (Exception ex) {
215          throw new RuntimeException(ex);
216        }
217      }
218    }
219  }
220
221}