/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntSupplier;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * <p>
 * CatalogReplicaLoadBalanceSimpleSelector implements a simple catalog replica load balancing
 * algorithm. It maintains a stale location cache for each table. Whenever a client looks up a
 * location, it first checks whether the row is covered by the stale location cache. If yes, the
 * location from the catalog replica is stale and the lookup goes to the primary region to get the
 * up-to-date location; otherwise, a replica region or the primary region is picked at random for
 * the lookup. When a client receives a not-serving exception from a region server, it reports the
 * region location through {@link #onError(HRegionLocation)}, which adds it to the stale location
 * cache. The stale cache is cleaned up periodically by a chore.
 * </p>
 * It follows a simple algorithm to choose a meta replica region (including primary meta) to go:
 * <ol>
 * <li>If there is no stale location entry for the row being looked up, it randomly picks a meta
 * replica region (including primary meta) to do the lookup.</li>
 * <li>If the location from the replica region is stale, the client gets a not-serving exception
 * from the region server; in this case, a StaleLocationCacheEntry is created in
 * CatalogReplicaLoadBalanceSimpleSelector.</li>
 * <li>When the client tries to do a location lookup, it checks the stale location cache first for
 * the row it tries to look up; if a live entry exists, it goes to the primary meta region to do
 * the lookup; otherwise, it follows step 1.</li>
 * <li>A chore periodically runs to clean up expired entries in the stale location cache.</li>
 * </ol>
 */
class CatalogReplicaLoadBalanceSimpleSelector
  implements CatalogReplicaLoadBalanceSelector, Stoppable {
  private static final Logger LOG =
    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
  /** How long a stale location entry stays effective after it is created. */
  private static final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
  /** Period of the chore that evicts expired stale location entries. */
  private static final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
  /** Period of the chore that re-reads the number of catalog replicas. */
  private static final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute

  /**
   * StaleLocationCacheEntry is the entry created when a stale location is reported by a client.
   * It records the end key of the reported region and the creation time. The class does not
   * override equals(), so two entries are equal only when they are the same instance; the
   * conditional removals in the enclosing class rely on that.
   */
  private static final class StaleLocationCacheEntry {
    // timestamp in milliseconds
    private final long timestamp;

    private final byte[] endKey;

    StaleLocationCacheEntry(final byte[] endKey) {
      this.endKey = endKey;
      timestamp = EnvironmentEdgeManager.currentTime();
    }

    public byte[] getEndKey() {
      return this.endKey;
    }

    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public String toString() {
      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("endKey", endKey)
        .append("timestamp", timestamp).toString();
    }
  }

  // table name -> (region start key -> stale entry), start keys ordered by BYTES_COMPARATOR.
  private final ConcurrentMap<TableName,
    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>> staleCache = new ConcurrentHashMap<>();
  private volatile int numOfReplicas;
  private final AsyncConnectionImpl conn;
  private final TableName tableName;
  private final IntSupplier getNumOfReplicas;
  private volatile boolean isStopped = false;

  /**
   * Creates the selector and schedules its two chores (stale cache cleanup and replica count
   * refresh) on the connection's chore service.
   * @param tableName        the catalog table this selector balances; used in log messages
   * @param conn             connection whose chore service runs the chores
   * @param getNumOfReplicas supplier of the current replica count; a return value of
   *                         {@link CatalogReplicaLoadBalanceSelector#UNINITIALIZED_NUM_OF_REPLICAS}
   *                         is treated as a failed fetch
   */
  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, AsyncConnectionImpl conn,
    IntSupplier getNumOfReplicas) {
    this.conn = conn;
    this.tableName = tableName;
    this.getNumOfReplicas = getNumOfReplicas;

    // This numOfReplicas is going to be lazy initialized.
    this.numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
    // Start chores
    this.conn.getChoreService().scheduleChore(getCacheCleanupChore());
    this.conn.getChoreService().scheduleChore(getRefreshReplicaCountChore());
  }

  /**
   * When a client runs into a not-serving exception, it will call this method to update the
   * selector's internal state. An entry keyed by the region's start key is added to the stale
   * cache of the region's table unless one is already present.
   * @param loc the location which causes exception.
   */
  @Override
  public void onError(HRegionLocation loc) {
    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = computeIfAbsent(staleCache,
      loc.getRegion().getTable(), () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
    byte[] startKey = loc.getRegion().getStartKey();
    tableCache.putIfAbsent(startKey, new StaleLocationCacheEntry(loc.getRegion().getEndKey()));
    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
      loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
  }

  /**
   * Select a random replica id (including the primary replica id). In case there is no replica
   * region configured, or the replica count could not be fetched yet, return the primary replica
   * id.
   * @return Replica id
   */
  private int getRandomReplicaId() {
    int cachedNumOfReplicas = this.numOfReplicas;
    if (cachedNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
      // refreshCatalogReplicaCount() publishes a successfully fetched count itself. Writing the
      // returned value back here would be redundant and could overwrite a newer count stored
      // concurrently by the refresh chore.
      cachedNumOfReplicas = refreshCatalogReplicaCount();
    }
    // In case of no replica configured (or a still unknown count, which is negative), return the
    // primary region id.
    if (cachedNumOfReplicas <= 1) {
      return RegionInfo.DEFAULT_REPLICA_ID;
    }
    return ThreadLocalRandom.current().nextInt(cachedNumOfReplicas);
  }

  /**
   * When it looks up a location, it will call this method to find a replica region to go. For a
   * normal case, more than 99% of region locations from catalog/meta replica will be up to date.
   * In extreme cases such as region server crashes, it will depend on how fast replication
   * catches up.
   * @param tableName  table name it looks up
   * @param row        key it looks up.
   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
   * @return catalog replica id
   * @throws IllegalArgumentException if locateType is neither BEFORE nor CURRENT
   */
  @Override
  public int select(final TableName tableName, final byte[] row,
    final RegionLocateType locateType) {
    Preconditions.checkArgument(
      locateType == RegionLocateType.BEFORE || locateType == RegionLocateType.CURRENT,
      "Expected type BEFORE or CURRENT but got: %s", locateType);

    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tableName);

    // If there is no entry in StaleCache, select a random replica id.
    if (tableCache == null) {
      return getRandomReplicaId();
    }

    Map.Entry<byte[], StaleLocationCacheEntry> entry;
    boolean isEmptyStopRow = isEmptyStopRow(row);
    // Only BEFORE and CURRENT are passed in.
    if (locateType == RegionLocateType.BEFORE) {
      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
    } else {
      entry = tableCache.floorEntry(row);
    }

    // It is not in the stale cache, return a random replica id.
    if (entry == null) {
      return getRandomReplicaId();
    }

    // The entry here is a possible match for the location. Check if the entry times out first as
    // long comparing is faster than comparing byte arrays(in most cases). It could remove
    // stale entries faster. If the possible match entry does not time out, it will check if
    // the entry is a match for the row passed in and select the replica id accordingly.
    if (
      (EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp())
          >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
    ) {
      LOG.debug("Entry for table {} with startKey {}, {} times out", tableName, entry.getKey(),
        entry);
      // Conditional remove: only drop the exact entry that was observed as expired. A plain
      // remove(key) could discard a fresh entry that onError() stored for the same start key
      // after this (expired) one had already been evicted by another thread.
      tableCache.remove(entry.getKey(), entry.getValue());
      return getRandomReplicaId();
    }

    byte[] endKey = entry.getValue().getEndKey();

    // The following logic is borrowed from AsyncNonMetaRegionLocator.
    if (isEmptyStopRow(endKey)) {
      LOG.debug("Lookup {} goes to primary region", row);
      return RegionInfo.DEFAULT_REPLICA_ID;
    }

    if (locateType == RegionLocateType.BEFORE) {
      if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
        LOG.debug("Lookup {} goes to primary meta", row);
        return RegionInfo.DEFAULT_REPLICA_ID;
      }
    } else {
      if (Bytes.compareTo(row, endKey) < 0) {
        LOG.debug("Lookup {} goes to primary meta", row);
        return RegionInfo.DEFAULT_REPLICA_ID;
      }
    }

    // Not in stale cache, return a random replica id.
    return getRandomReplicaId();
  }

  // This class implements the Stoppable interface because the chores are constructed with a
  // Stoppable object; stopping only records the flag reported by isStopped().
  // NOTE(review): presumably the chores stop running once isStopped() returns true - confirm
  // against ScheduledChore.
  @Override
  public void stop(String why) {
    isStopped = true;
  }

  @Override
  public boolean isStopped() {
    return isStopped;
  }

  /**
   * Evicts every stale cache entry, across all tables, whose age has reached
   * {@link #STALE_CACHE_TIMEOUT_IN_MILLISECONDS}. Invoked periodically by the cleanup chore.
   */
  private void cleanupStaleCache() {
    long curTimeInMills = EnvironmentEdgeManager.currentTime();
    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it = tableCache.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
        if (
          curTimeInMills - entry.getValue().getTimestamp() >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
        ) {
          LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue());
          // Conditional remove instead of it.remove(): the iterator removes by key only, which
          // could evict a fresh entry that replaced this expired one since it was read.
          tableCache.remove(entry.getKey(), entry.getValue());
        }
      }
    }
  }

  /**
   * Fetches the replica count from the supplier and publishes it to {@link #numOfReplicas} when
   * the fetch succeeds and the value changed.
   * @return the freshly fetched count, or the currently cached count (possibly still
   *         uninitialized) when the fetch failed
   */
  private int refreshCatalogReplicaCount() {
    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
    // If the returned number of replicas is -1, it is caused by failure to fetch the
    // replica count. Do not update the numOfReplicas in this case.
    if (newNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
      LOG.error("Failed to fetch Table {}'s region replica count", tableName);
      return this.numOfReplicas;
    }

    // newNumOfReplicas is a valid count here, so a cached "uninitialized" value is necessarily
    // different from it; a single inequality check covers both cases.
    if (this.numOfReplicas != newNumOfReplicas) {
      this.numOfReplicas = newNumOfReplicas;
    }
    return newNumOfReplicas;
  }

  /** Builds the chore that periodically evicts expired stale cache entries. */
  private ScheduledChore getCacheCleanupChore() {
    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        CatalogReplicaLoadBalanceSimpleSelector.this.cleanupStaleCache();
      }
    };
  }

  /** Builds the chore that periodically refreshes the cached replica count. */
  private ScheduledChore getRefreshReplicaCountChore() {
    return new ScheduledChore("RefreshReplicaCountChore", this,
      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        CatalogReplicaLoadBalanceSimpleSelector.this.refreshCatalogReplicaCount();
      }
    };
  }
}