/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntSupplier;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * <p>CatalogReplicaLoadBalanceSimpleSelector implements a simple catalog replica load
 * balancing algorithm. It maintains a stale location cache for each table. Whenever a client
 * looks up a location, it first checks whether the row is in the stale location cache. If it is,
 * the location from the catalog replica is stale, so the lookup goes to the primary region to get
 * the up-to-date location; otherwise, a random (non-primary) replica region is picked for the
 * lookup. When clients receive RegionNotServedException from region servers, they add these
 * region locations to the stale location cache. The stale cache is cleaned up periodically by a
 * chore.</p>
 *
 * It follows a simple algorithm to choose a replica to go:
 *
 * <ol>
 *  <li>If there is no stale location entry for rows it looks up, it will randomly
 *     pick a replica region to do lookup. </li>
 *  <li>If the location from the replica region is stale, client gets RegionNotServedException
 *     from region server, in this case, it will create StaleLocationCacheEntry in
 *     CatalogReplicaLoadBalanceSimpleSelector.</li>
 *  <li>When client tries to do location lookup, it checks StaleLocationCache first for rows it
 *     tries to lookup, if entry exists, it will go with primary meta region to do lookup;
 *     otherwise, it will follow step 1.</li>
 *  <li>A chore will periodically run to clean up cache entries in the StaleLocationCache.</li>
 * </ol>
 */
class CatalogReplicaLoadBalanceSimpleSelector implements
    CatalogReplicaLoadBalanceSelector, Stoppable {
  private static final Logger LOG =
    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
  // Entries older than this are ignored (and removed) by select() and purged by the cleanup chore.
  private static final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
  private static final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
  private static final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute
  private static final int UNINITIALIZED_NUM_OF_REPLICAS = -1;

  /**
   * StaleLocationCacheEntry is the entry recorded when a stale location is reported by a client.
   * It keeps the end key of the reported region and the time the entry was created.
   * <p>
   * It deliberately does not override {@code equals}, so conditional map removals
   * ({@code remove(key, value)}) match only this exact instance.
   */
  private static final class StaleLocationCacheEntry {
    // timestamp in milliseconds, taken from EnvironmentEdgeManager when the entry is created
    private final long timestamp;

    // end key of the region whose location was reported as stale
    private final byte[] endKey;

    StaleLocationCacheEntry(final byte[] endKey) {
      this.endKey = endKey;
      timestamp = EnvironmentEdgeManager.currentTime();
    }

    public byte[] getEndKey() {
      return this.endKey;
    }

    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public String toString() {
      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
        .append("endKey", Bytes.toStringBinary(endKey))
        .append("timestamp", timestamp)
        .toString();
    }
  }

  // Per table: region start key -> stale entry, ordered by BYTES_COMPARATOR so floor/lower
  // lookups find the candidate region for a row.
  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>>
    staleCache = new ConcurrentHashMap<>();
  private volatile int numOfReplicas;
  private final ChoreService choreService;
  private final TableName tableName;
  private final IntSupplier getNumOfReplicas;
  private volatile boolean isStopped = false;

  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, ChoreService choreService,
    IntSupplier getNumOfReplicas) {
    this.choreService = choreService;
    this.tableName = tableName;
    this.getNumOfReplicas = getNumOfReplicas;

    // This numOfReplicas is going to be lazy initialized.
    this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS;
    // Start chores
    this.choreService.scheduleChore(getCacheCleanupChore(this));
    this.choreService.scheduleChore(getRefreshReplicaCountChore(this));
  }

  /**
   * When a client runs into RegionNotServingException, it will call this method to
   * update Selector's internal state.
   * <p>
   * The region's start key is recorded with its end key and the current time. If an entry already
   * exists for the same start key it is kept unchanged (its timestamp is not refreshed).
   * @param loc the location which causes exception.
   */
  public void onError(HRegionLocation loc) {
    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache =
      computeIfAbsent(staleCache, loc.getRegion().getTable(),
        () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
    byte[] startKey = loc.getRegion().getStartKey();
    byte[] endKey = loc.getRegion().getEndKey();
    StaleLocationCacheEntry previous =
      tableCache.putIfAbsent(startKey, new StaleLocationCacheEntry(endKey));
    // Only report an insertion when one actually happened.
    if (previous == null && LOG.isDebugEnabled()) {
      LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
        loc.getRegion().getTable(), Bytes.toStringBinary(startKey),
        Bytes.toStringBinary(endKey));
    }
  }

  /**
   * Select a random non-primary replica id (in {@code [1, numOfReplicas - 1]}). In case there is
   * no replica region configured (count &lt;= 1 or unknown), return the primary replica id.
   * @return Replica id
   */
  private int getRandomReplicaId() {
    int cachedNumOfReplicas = this.numOfReplicas;
    if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) {
      // Lazy initialization. refreshCatalogReplicaCount() already stores the count and returns
      // the value actually in effect, so a fetched value it rejected is never written back here.
      cachedNumOfReplicas = refreshCatalogReplicaCount();
    }
    // In case of no replica configured, return the primary region id.
    if (cachedNumOfReplicas <= 1) {
      return RegionInfo.DEFAULT_REPLICA_ID;
    }
    return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1);
  }

  /**
   * When it looks up a location, it will call this method to find a replica region to go.
   * For a normal case, &gt; 99% of region locations from catalog/meta replica will be up to date.
   * In extreme cases such as region server crashes, it will depend on how fast replication
   * catches up.
   *
   * @param tablename table name it looks up
   * @param row key it looks up.
   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
   * @return catalog replica id
   * @throws IllegalArgumentException if locateType is neither BEFORE nor CURRENT
   */
  public int select(final TableName tablename, final byte[] row,
    final RegionLocateType locateType) {
    Preconditions.checkArgument(locateType == RegionLocateType.BEFORE ||
        locateType == RegionLocateType.CURRENT,
      "Expected type BEFORE or CURRENT but got: %s", locateType);

    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tablename);

    // If there is no entry in StaleCache, select a random replica id.
    if (tableCache == null) {
      return getRandomReplicaId();
    }

    Map.Entry<byte[], StaleLocationCacheEntry> entry;
    boolean emptyStopRow = isEmptyStopRow(row);
    // Only BEFORE and CURRENT are passed in.
    if (locateType == RegionLocateType.BEFORE) {
      // BEFORE: candidate is the region whose start key is strictly below the row (or the last
      // region when the row is the empty stop row).
      entry = emptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
    } else {
      // CURRENT: candidate is the region whose start key is <= row.
      entry = tableCache.floorEntry(row);
    }

    // It is not in the stale cache, return a random replica id.
    if (entry == null) {
      return getRandomReplicaId();
    }

    StaleLocationCacheEntry staleEntry = entry.getValue();

    // The entry here is a possible match for the location. Check if the entry times out first as
    // long comparing is faster than comparing byte arrays(in most cases). It could remove
    // stale entries faster. If the possible match entry does not time out, it will check if
    // the entry is a match for the row passed in and select the replica id accordingly.
    if ((EnvironmentEdgeManager.currentTime() - staleEntry.getTimestamp()) >=
      STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Entry for table {} with startKey {}, {} times out", tablename,
          Bytes.toStringBinary(entry.getKey()), staleEntry);
      }
      // Conditional remove: drop only this expired instance. If the cleanup chore already removed
      // it and onError() re-added a fresh entry for the same start key, the fresh one survives.
      tableCache.remove(entry.getKey(), staleEntry);
      return getRandomReplicaId();
    }

    byte[] endKey = staleEntry.getEndKey();

    // The following logic is borrowed from AsyncNonMetaRegionLocator.
    // An empty end key means the stale region extends to the end of the table, so it covers
    // the row for both locate types.
    if (isEmptyStopRow(endKey)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Lookup {} goes to primary region", Bytes.toStringBinary(row));
      }
      return RegionInfo.DEFAULT_REPLICA_ID;
    }

    if (locateType == RegionLocateType.BEFORE) {
      // Start key < row here, so the stale region covers the lookup when row <= endKey. An
      // empty-stop-row lookup can only be covered by an empty end key, handled above.
      if (!emptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Lookup {} goes to primary meta", Bytes.toStringBinary(row));
        }
        return RegionInfo.DEFAULT_REPLICA_ID;
      }
    } else {
      // Start key <= row here, so the stale region covers the row when row < endKey.
      if (Bytes.compareTo(row, endKey) < 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Lookup {} goes to primary meta", Bytes.toStringBinary(row));
        }
        return RegionInfo.DEFAULT_REPLICA_ID;
      }
    }

    // Not in stale cache, return a random replica id.
    return getRandomReplicaId();
  }

  // This class implements the Stoppable interface because chores need a Stoppable object; stop()
  // only flips the flag that the scheduled chores observe through isStopped().
  @Override
  public void stop(String why) {
    isStopped = true;
  }

  @Override
  public boolean isStopped() {
    return isStopped;
  }

  /**
   * Remove every stale cache entry, across all tables, that is at least
   * STALE_CACHE_TIMEOUT_IN_MILLISECONDS old. Run periodically by the cleanup chore.
   */
  private void cleanupReplicaStaleCache() {
    long curTimeInMills = EnvironmentEdgeManager.currentTime();
    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it =
        tableCache.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
        StaleLocationCacheEntry staleEntry = entry.getValue();
        if (curTimeInMills - staleEntry.getTimestamp() >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("clean entry {}, {} from stale cache", Bytes.toStringBinary(entry.getKey()),
              staleEntry);
          }
          // Not it.remove(): that removes by key only and could drop a fresh entry that was
          // re-added for the same start key after this iterator observed the expired one.
          tableCache.remove(entry.getKey(), staleEntry);
        }
      }
    }
  }

  /**
   * Fetch the replica count from the supplier and update numOfReplicas from it. A fetched count
   * of 1 is most likely a failure to fetch, so it only replaces an uninitialized value.
   * @return the replica count in effect after this refresh (which may be the previously cached
   *         value when the fetched one was rejected)
   */
  private int refreshCatalogReplicaCount() {
    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
    if (newNumOfReplicas == 1) {
      LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to "
        + "fetch the replica count", tableName);
    }
    int cachedNumOfReplicas = this.numOfReplicas;

    // If the returned number of replicas is 1, it is mostly caused by failure to fetch the
    // replica count. Do not update the numOfReplicas in this case.
    if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) ||
      ((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) {
      this.numOfReplicas = newNumOfReplicas;
      return newNumOfReplicas;
    }
    return cachedNumOfReplicas;
  }

  /** Build the chore that periodically purges expired entries from the stale cache. */
  private ScheduledChore getCacheCleanupChore(
    final CatalogReplicaLoadBalanceSimpleSelector selector) {
    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        selector.cleanupReplicaStaleCache();
      }
    };
  }

  /** Build the chore that periodically refreshes the cached catalog replica count. */
  private ScheduledChore getRefreshReplicaCountChore(
    final CatalogReplicaLoadBalanceSimpleSelector selector) {
    return new ScheduledChore("RefreshReplicaCountChore", this,
      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        selector.refreshCatalogReplicaCount();
      }
    };
  }
}