001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.Optional;
021import java.util.concurrent.ThreadLocalRandom;
022import org.apache.yetus.audience.InterfaceAudience;
023
024@InterfaceAudience.Private
025class LocalityBasedCandidateGenerator extends CandidateGenerator {
026
027  @Override
028  BalanceAction generate(BalancerClusterState cluster) {
029    // iterate through regions until you find one that is not on ideal host
030    // start from a random point to avoid always balance the regions in front
031    if (cluster.numRegions > 0) {
032      int startIndex = ThreadLocalRandom.current().nextInt(cluster.numRegions);
033      for (int i = 0; i < cluster.numRegions; i++) {
034        int region = (startIndex + i) % cluster.numRegions;
035        int currentServer = cluster.regionIndexToServerIndex[region];
036        if (
037          currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
038            BalancerClusterState.LocalityType.SERVER)[region]
039        ) {
040          Optional<BalanceAction> potential = tryMoveOrSwap(cluster, currentServer, region,
041            cluster.getOrComputeRegionsToMostLocalEntities(
042              BalancerClusterState.LocalityType.SERVER)[region]);
043          if (potential.isPresent()) {
044            return potential.get();
045          }
046        }
047      }
048    }
049    return BalanceAction.NULL_ACTION;
050  }
051
052  private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster, int fromServer,
053    int fromRegion, int toServer) {
054    // Try move first. We know apriori fromRegion has the highest locality on toServer
055    if (cluster.serverHasTooFewRegions(toServer)) {
056      return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
057    }
058    // Compare locality gain/loss from swapping fromRegion with regions on toServer
059    double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
060      - getWeightedLocality(cluster, fromRegion, fromServer);
061    int toServertotalRegions = cluster.regionsPerServer[toServer].length;
062    if (toServertotalRegions > 0) {
063      int startIndex = ThreadLocalRandom.current().nextInt(toServertotalRegions);
064      for (int i = 0; i < toServertotalRegions; i++) {
065        int toRegionIndex = (startIndex + i) % toServertotalRegions;
066        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
067        double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer)
068          - getWeightedLocality(cluster, toRegion, toServer);
069        // If locality would remain neutral or improve, attempt the swap
070        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
071          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
072        }
073      }
074    }
075    return Optional.empty();
076  }
077
078  private double getWeightedLocality(BalancerClusterState cluster, int region, int server) {
079    return cluster.getOrComputeWeightedLocality(region, server,
080      BalancerClusterState.LocalityType.SERVER);
081  }
082}