001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.Collection;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.yetus.audience.InterfaceAudience;
023
024/**
025 * Lightweight cost function that mirrors TableSkewCostFunction but aggregates storefile sizes (in
026 * MB) per table using the CostFromRegionLoadFunction framework. For each table, it computes a
027 * per-server aggregated storefile size by summing the average storefile size for each region (if
028 * there are multiple load metrics, it averages them). The imbalance cost (as computed by
029 * DoubleArrayCost) is then used to drive the balancer to reduce differences between servers.
030 */
031@InterfaceAudience.Private
032public class StoreFileTableSkewCostFunction extends CostFromRegionLoadFunction {
033
034  private static final String STOREFILE_TABLE_SKEW_COST_KEY =
035    "hbase.master.balancer.stochastic.storefileTableSkewCost";
036  private static final float DEFAULT_STOREFILE_TABLE_SKEW_COST = 35;
037
038  // One DoubleArrayCost instance per table.
039  private DoubleArrayCost[] costsPerTable;
040
041  public StoreFileTableSkewCostFunction(Configuration conf) {
042    this.setMultiplier(
043      conf.getFloat(STOREFILE_TABLE_SKEW_COST_KEY, DEFAULT_STOREFILE_TABLE_SKEW_COST));
044  }
045
046  @Override
047  public void prepare(BalancerClusterState cluster) {
048    // First, set the cluster state and allocate one DoubleArrayCost per table.
049    this.cluster = cluster;
050    costsPerTable = new DoubleArrayCost[cluster.numTables];
051    for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
052      costsPerTable[tableIdx] = new DoubleArrayCost();
053      costsPerTable[tableIdx].prepare(cluster.numServers);
054      final int tableIndex = tableIdx;
055      costsPerTable[tableIdx].applyCostsChange(costs -> {
056        // For each server, compute the aggregated storefile size for this table.
057        for (int server = 0; server < cluster.numServers; server++) {
058          double totalStorefileMB = 0;
059          // Sum over all regions on this server that belong to the given table.
060          for (int region : cluster.regionsPerServer[server]) {
061            if (cluster.regionIndexToTableIndex[region] == tableIndex) {
062              Collection<BalancerRegionLoad> loads = cluster.getRegionLoads()[region];
063              double regionCost = 0;
064              if (loads != null && !loads.isEmpty()) {
065                // Average the storefile sizes if there are multiple measurements.
066                for (BalancerRegionLoad rl : loads) {
067                  regionCost += getCostFromRl(rl);
068                }
069                regionCost /= loads.size();
070              }
071              totalStorefileMB += regionCost;
072            }
073          }
074          costs[server] = totalStorefileMB;
075        }
076      });
077    }
078  }
079
080  @Override
081  protected void regionMoved(int region, int oldServer, int newServer) {
082    // Determine the affected table.
083    int tableIdx = cluster.regionIndexToTableIndex[region];
084    costsPerTable[tableIdx].applyCostsChange(costs -> {
085      // Recompute for the old server if applicable.
086      updateStoreFilePerServerPerTableCosts(oldServer, tableIdx, costs);
087      // Recompute for the new server.
088      updateStoreFilePerServerPerTableCosts(newServer, tableIdx, costs);
089    });
090  }
091
092  private void updateStoreFilePerServerPerTableCosts(int newServer, int tableIdx, double[] costs) {
093    if (newServer >= 0) {
094      double totalStorefileMB = 0;
095      for (int r : cluster.regionsPerServer[newServer]) {
096        if (cluster.regionIndexToTableIndex[r] == tableIdx) {
097          Collection<BalancerRegionLoad> loads = cluster.getRegionLoads()[r];
098          double regionCost = 0;
099          if (loads != null && !loads.isEmpty()) {
100            for (BalancerRegionLoad rl : loads) {
101              regionCost += getCostFromRl(rl);
102            }
103            regionCost /= loads.size();
104          }
105          totalStorefileMB += regionCost;
106        }
107      }
108      costs[newServer] = totalStorefileMB;
109    }
110  }
111
112  @Override
113  protected double cost() {
114    double totalCost = 0;
115    // Sum the imbalance cost over all tables.
116    for (DoubleArrayCost dac : costsPerTable) {
117      totalCost += dac.cost();
118    }
119    return totalCost;
120  }
121
122  @Override
123  protected double getCostFromRl(BalancerRegionLoad rl) {
124    // Use storefile size in MB as the metric.
125    return rl.getStorefileSizeMB();
126  }
127}