001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
021import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
022import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
023
024import java.util.Iterator;
025import java.util.Map;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.ConcurrentNavigableMap;
029import java.util.concurrent.ConcurrentSkipListMap;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.function.IntSupplier;
032import org.apache.commons.lang3.builder.ToStringBuilder;
033import org.apache.commons.lang3.builder.ToStringStyle;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.ScheduledChore;
036import org.apache.hadoop.hbase.Stoppable;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
044
045/**
046 * <p>
047 * CatalogReplicaLoadBalanceReplicaSimpleSelector implements a simple catalog replica load balancing
048 * algorithm. It maintains a stale location cache for each table. Whenever client looks up location,
049 * it first check if the row is the stale location cache. If yes, the location from catalog replica
050 * is stale, it will go to the primary region to look up update-to-date location; otherwise, it will
051 * randomly pick up a replica region or primary region for lookup. When clients receive
052 * RegionNotServedException from region servers, it will add these region locations to the stale
053 * location cache. The stale cache will be cleaned up periodically by a chore.
054 * </p>
055 * It follows a simple algorithm to choose a meta replica region (including primary meta) to go:
056 * <ol>
057 * <li>If there is no stale location entry for rows it looks up, it will randomly pick a meta
058 * replica region (including primary meta) to do lookup.</li>
059 * <li>If the location from the replica region is stale, client gets RegionNotServedException from
060 * region server, in this case, it will create StaleLocationCacheEntry in
061 * CatalogReplicaLoadBalanceReplicaSimpleSelector.</li>
062 * <li>When client tries to do location lookup, it checks StaleLocationCache first for rows it tries
063 * to lookup, if entry exists, it will go with primary meta region to do lookup; otherwise, it will
064 * follow step 1.</li>
065 * <li>A chore will periodically run to clean up cache entries in the StaleLocationCache.</li>
066 * </ol>
067 */
068class CatalogReplicaLoadBalanceSimpleSelector
069  implements CatalogReplicaLoadBalanceSelector, Stoppable {
070  private static final Logger LOG =
071    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
072  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
073  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
074  private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute
075
076  /**
077   * StaleLocationCacheEntry is the entry when a stale location is reported by an client.
078   */
079  private static final class StaleLocationCacheEntry {
080    // timestamp in milliseconds
081    private final long timestamp;
082
083    private final byte[] endKey;
084
085    StaleLocationCacheEntry(final byte[] endKey) {
086      this.endKey = endKey;
087      timestamp = EnvironmentEdgeManager.currentTime();
088    }
089
090    public byte[] getEndKey() {
091      return this.endKey;
092    }
093
094    public long getTimestamp() {
095      return this.timestamp;
096    }
097
098    @Override
099    public String toString() {
100      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("endKey", endKey)
101        .append("timestamp", timestamp).toString();
102    }
103  }
104
105  private final ConcurrentMap<TableName,
106    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>> staleCache = new ConcurrentHashMap<>();
107  private volatile int numOfReplicas;
108  private final AsyncConnectionImpl conn;
109  private final TableName tableName;
110  private final IntSupplier getNumOfReplicas;
111  private volatile boolean isStopped = false;
112
113  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, AsyncConnectionImpl conn,
114    IntSupplier getNumOfReplicas) {
115    this.conn = conn;
116    this.tableName = tableName;
117    this.getNumOfReplicas = getNumOfReplicas;
118
119    // This numOfReplicas is going to be lazy initialized.
120    this.numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
121    // Start chores
122    this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this));
123    this.conn.getChoreService().scheduleChore(getRefreshReplicaCountChore(this));
124  }
125
126  /**
127   * When a client runs into RegionNotServingException, it will call this method to update
128   * Selector's internal state.
129   * @param loc the location which causes exception.
130   */
131  @Override
132  public void onError(HRegionLocation loc) {
133    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = computeIfAbsent(staleCache,
134      loc.getRegion().getTable(), () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
135    byte[] startKey = loc.getRegion().getStartKey();
136    tableCache.putIfAbsent(startKey, new StaleLocationCacheEntry(loc.getRegion().getEndKey()));
137    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
138      loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
139  }
140
141  /**
142   * Select an random replica id (including the primary replica id). In case there is no replica
143   * region configured, return the primary replica id.
144   * @return Replica id
145   */
146  private int getRandomReplicaId() {
147    int cachedNumOfReplicas = this.numOfReplicas;
148    if (cachedNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
149      cachedNumOfReplicas = refreshCatalogReplicaCount();
150      this.numOfReplicas = cachedNumOfReplicas;
151    }
152    // In case of no replica configured, return the primary region id.
153    if (cachedNumOfReplicas <= 1) {
154      return RegionInfo.DEFAULT_REPLICA_ID;
155    }
156    return ThreadLocalRandom.current().nextInt(cachedNumOfReplicas);
157  }
158
159  /**
160   * When it looks up a location, it will call this method to find a replica region to go. For a
161   * normal case, > 99% of region locations from catalog/meta replica will be up to date. In extreme
162   * cases such as region server crashes, it will depends on how fast replication catches up.
163   * @param tableName  table name it looks up
164   * @param row        key it looks up.
165   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
166   * @return catalog replica id
167   */
168  @Override
169  public int select(final TableName tableName, final byte[] row,
170    final RegionLocateType locateType) {
171    Preconditions.checkArgument(
172      locateType == RegionLocateType.BEFORE || locateType == RegionLocateType.CURRENT,
173      "Expected type BEFORE or CURRENT but got: %s", locateType);
174
175    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tableName);
176
177    // If there is no entry in StaleCache, select a random replica id.
178    if (tableCache == null) {
179      return getRandomReplicaId();
180    }
181
182    Map.Entry<byte[], StaleLocationCacheEntry> entry;
183    boolean isEmptyStopRow = isEmptyStopRow(row);
184    // Only BEFORE and CURRENT are passed in.
185    if (locateType == RegionLocateType.BEFORE) {
186      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
187    } else {
188      entry = tableCache.floorEntry(row);
189    }
190
191    // It is not in the stale cache, return a random replica id.
192    if (entry == null) {
193      return getRandomReplicaId();
194    }
195
196    // The entry here is a possible match for the location. Check if the entry times out first as
197    // long comparing is faster than comparing byte arrays(in most cases). It could remove
198    // stale entries faster. If the possible match entry does not time out, it will check if
199    // the entry is a match for the row passed in and select the replica id accordingly.
200    if (
201      (EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp())
202          >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
203    ) {
204      LOG.debug("Entry for table {} with startKey {}, {} times out", tableName, entry.getKey(),
205        entry);
206      tableCache.remove(entry.getKey());
207      return getRandomReplicaId();
208    }
209
210    byte[] endKey = entry.getValue().getEndKey();
211
212    // The following logic is borrowed from AsyncNonMetaRegionLocator.
213    if (isEmptyStopRow(endKey)) {
214      LOG.debug("Lookup {} goes to primary region", row);
215      return RegionInfo.DEFAULT_REPLICA_ID;
216    }
217
218    if (locateType == RegionLocateType.BEFORE) {
219      if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
220        LOG.debug("Lookup {} goes to primary meta", row);
221        return RegionInfo.DEFAULT_REPLICA_ID;
222      }
223    } else {
224      if (Bytes.compareTo(row, endKey) < 0) {
225        LOG.debug("Lookup {} goes to primary meta", row);
226        return RegionInfo.DEFAULT_REPLICA_ID;
227      }
228    }
229
230    // Not in stale cache, return a random replica id.
231    return getRandomReplicaId();
232  }
233
234  // This class implements the Stoppable interface as chores needs a Stopable object, there is
235  // no-op on this Stoppable object currently.
236  @Override
237  public void stop(String why) {
238    isStopped = true;
239  }
240
241  @Override
242  public boolean isStopped() {
243    return isStopped;
244  }
245
246  private void cleanupReplicaReplicaStaleCache() {
247    long curTimeInMills = EnvironmentEdgeManager.currentTime();
248    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
249      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it = tableCache.entrySet().iterator();
250      while (it.hasNext()) {
251        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
252        if (
253          curTimeInMills - entry.getValue().getTimestamp() >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
254        ) {
255          LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue());
256          it.remove();
257        }
258      }
259    }
260  }
261
262  private int refreshCatalogReplicaCount() {
263    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
264    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
265    // If the returned number of replicas is -1, it is caused by failure to fetch the
266    // replica count. Do not update the numOfReplicas in this case.
267    if (newNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
268      LOG.error("Failed to fetch Table {}'s region replica count", tableName);
269      return this.numOfReplicas;
270    }
271
272    int cachedNumOfReplicas = this.numOfReplicas;
273    if (
274      (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS)
275        || (cachedNumOfReplicas != newNumOfReplicas)
276    ) {
277      this.numOfReplicas = newNumOfReplicas;
278    }
279    return newNumOfReplicas;
280  }
281
282  private ScheduledChore
283    getCacheCleanupChore(final CatalogReplicaLoadBalanceSimpleSelector selector) {
284    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
285      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
286      @Override
287      protected void chore() {
288        selector.cleanupReplicaReplicaStaleCache();
289      }
290    };
291  }
292
293  private ScheduledChore
294    getRefreshReplicaCountChore(final CatalogReplicaLoadBalanceSimpleSelector selector) {
295    return new ScheduledChore("RefreshReplicaCountChore", this,
296      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
297      @Override
298      protected void chore() {
299        selector.refreshCatalogReplicaCount();
300      }
301    };
302  }
303}