001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.Collection; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.yetus.audience.InterfaceAudience; 023 024/** 025 * Lightweight cost function that mirrors TableSkewCostFunction but aggregates storefile sizes (in 026 * MB) per table using the CostFromRegionLoadFunction framework. For each table, it computes a 027 * per-server aggregated storefile size by summing the average storefile size for each region (if 028 * there are multiple load metrics, it averages them). The imbalance cost (as computed by 029 * DoubleArrayCost) is then used to drive the balancer to reduce differences between servers. 030 */ 031@InterfaceAudience.Private 032public class StoreFileTableSkewCostFunction extends CostFromRegionLoadFunction { 033 034 private static final String STOREFILE_TABLE_SKEW_COST_KEY = 035 "hbase.master.balancer.stochastic.storefileTableSkewCost"; 036 private static final float DEFAULT_STOREFILE_TABLE_SKEW_COST = 35; 037 038 // One DoubleArrayCost instance per table. 039 private DoubleArrayCost[] costsPerTable; 040 041 public StoreFileTableSkewCostFunction(Configuration conf) { 042 this.setMultiplier( 043 conf.getFloat(STOREFILE_TABLE_SKEW_COST_KEY, DEFAULT_STOREFILE_TABLE_SKEW_COST)); 044 } 045 046 @Override 047 public void prepare(BalancerClusterState cluster) { 048 // First, set the cluster state and allocate one DoubleArrayCost per table. 049 this.cluster = cluster; 050 costsPerTable = new DoubleArrayCost[cluster.numTables]; 051 for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) { 052 costsPerTable[tableIdx] = new DoubleArrayCost(); 053 costsPerTable[tableIdx].prepare(cluster.numServers); 054 final int tableIndex = tableIdx; 055 costsPerTable[tableIdx].applyCostsChange(costs -> { 056 // For each server, compute the aggregated storefile size for this table. 057 for (int server = 0; server < cluster.numServers; server++) { 058 double totalStorefileMB = 0; 059 // Sum over all regions on this server that belong to the given table. 060 for (int region : cluster.regionsPerServer[server]) { 061 if (cluster.regionIndexToTableIndex[region] == tableIndex) { 062 Collection<BalancerRegionLoad> loads = cluster.getRegionLoads()[region]; 063 double regionCost = 0; 064 if (loads != null && !loads.isEmpty()) { 065 // Average the storefile sizes if there are multiple measurements. 066 for (BalancerRegionLoad rl : loads) { 067 regionCost += getCostFromRl(rl); 068 } 069 regionCost /= loads.size(); 070 } 071 totalStorefileMB += regionCost; 072 } 073 } 074 costs[server] = totalStorefileMB; 075 } 076 }); 077 } 078 } 079 080 @Override 081 protected void regionMoved(int region, int oldServer, int newServer) { 082 // Determine the affected table. 083 int tableIdx = cluster.regionIndexToTableIndex[region]; 084 costsPerTable[tableIdx].applyCostsChange(costs -> { 085 // Recompute for the old server if applicable. 086 updateStoreFilePerServerPerTableCosts(oldServer, tableIdx, costs); 087 // Recompute for the new server. 088 updateStoreFilePerServerPerTableCosts(newServer, tableIdx, costs); 089 }); 090 } 091 092 private void updateStoreFilePerServerPerTableCosts(int newServer, int tableIdx, double[] costs) { 093 if (newServer >= 0) { 094 double totalStorefileMB = 0; 095 for (int r : cluster.regionsPerServer[newServer]) { 096 if (cluster.regionIndexToTableIndex[r] == tableIdx) { 097 Collection<BalancerRegionLoad> loads = cluster.getRegionLoads()[r]; 098 double regionCost = 0; 099 if (loads != null && !loads.isEmpty()) { 100 for (BalancerRegionLoad rl : loads) { 101 regionCost += getCostFromRl(rl); 102 } 103 regionCost /= loads.size(); 104 } 105 totalStorefileMB += regionCost; 106 } 107 } 108 costs[newServer] = totalStorefileMB; 109 } 110 } 111 112 @Override 113 protected double cost() { 114 double totalCost = 0; 115 // Sum the imbalance cost over all tables. 116 for (DoubleArrayCost dac : costsPerTable) { 117 totalCost += dac.cost(); 118 } 119 return totalCost; 120 } 121 122 @Override 123 protected double getCostFromRl(BalancerRegionLoad rl) { 124 // Use storefile size in MB as the metric. 125 return rl.getStorefileSizeMB(); 126 } 127}