001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
021import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
022import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
023
024import java.util.Iterator;
025import java.util.Map;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.ConcurrentNavigableMap;
029import java.util.concurrent.ConcurrentSkipListMap;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.function.IntSupplier;
032import org.apache.commons.lang3.builder.ToStringBuilder;
033import org.apache.commons.lang3.builder.ToStringStyle;
034import org.apache.hadoop.hbase.ChoreService;
035import org.apache.hadoop.hbase.HRegionLocation;
036import org.apache.hadoop.hbase.ScheduledChore;
037import org.apache.hadoop.hbase.Stoppable;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
045
/**
 * <p>
 * CatalogReplicaLoadBalanceSimpleSelector implements a simple catalog replica load balancing
 * algorithm. It maintains a stale location cache for each table. Whenever a client looks up a
 * location, it first checks whether the row is in the stale location cache. If it is, the location
 * from the catalog replica is stale and the client goes to the primary region to look up the
 * up-to-date location; otherwise, it randomly picks a replica region or the primary region for
 * the lookup. When a client receives a NotServingRegionException from a region server, it adds
 * that region location to the stale location cache. The stale cache is cleaned up periodically
 * by a chore.
 * </p>
 * It follows a simple algorithm to choose a meta replica region (including primary meta) to go:
 * <ol>
 * <li>If there is no stale location entry for the rows it looks up, it randomly picks a meta
 * replica region (including primary meta) to do the lookup.</li>
 * <li>If the location from the replica region is stale, the client gets a
 * NotServingRegionException from the region server; in this case, it creates a
 * StaleLocationCacheEntry in CatalogReplicaLoadBalanceSimpleSelector.</li>
 * <li>When the client tries to do a location lookup, it checks the StaleLocationCache first for
 * the rows it tries to look up. If an entry exists, it goes to the primary meta region to do the
 * lookup; otherwise, it follows step 1.</li>
 * <li>A chore periodically runs to clean up cache entries in the StaleLocationCache.</li>
 * </ol>
 */
069class CatalogReplicaLoadBalanceSimpleSelector
070  implements CatalogReplicaLoadBalanceSelector, Stoppable {
071  private static final Logger LOG =
072    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
073  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
074  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
075  private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute
076
077  /**
078   * StaleLocationCacheEntry is the entry when a stale location is reported by an client.
079   */
080  private static final class StaleLocationCacheEntry {
081    // timestamp in milliseconds
082    private final long timestamp;
083
084    private final byte[] endKey;
085
086    StaleLocationCacheEntry(final byte[] endKey) {
087      this.endKey = endKey;
088      timestamp = EnvironmentEdgeManager.currentTime();
089    }
090
091    public byte[] getEndKey() {
092      return this.endKey;
093    }
094
095    public long getTimestamp() {
096      return this.timestamp;
097    }
098
099    @Override
100    public String toString() {
101      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("endKey", endKey)
102        .append("timestamp", timestamp).toString();
103    }
104  }
105
106  private final ConcurrentMap<TableName,
107    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>> staleCache = new ConcurrentHashMap<>();
108  private volatile int numOfReplicas;
109  private final ChoreService choreService;
110  private final TableName tableName;
111  private final IntSupplier getNumOfReplicas;
112  private volatile boolean isStopped = false;
113
114  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, ChoreService choreService,
115    IntSupplier getNumOfReplicas) {
116    this.choreService = choreService;
117    this.tableName = tableName;
118    this.getNumOfReplicas = getNumOfReplicas;
119
120    // This numOfReplicas is going to be lazy initialized.
121    this.numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
122    // Start chores
123    this.choreService.scheduleChore(getCacheCleanupChore(this));
124    this.choreService.scheduleChore(getRefreshReplicaCountChore(this));
125  }
126
127  /**
128   * When a client runs into RegionNotServingException, it will call this method to update
129   * Selector's internal state.
130   * @param loc the location which causes exception.
131   */
132  @Override
133  public void onError(HRegionLocation loc) {
134    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = computeIfAbsent(staleCache,
135      loc.getRegion().getTable(), () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
136    byte[] startKey = loc.getRegion().getStartKey();
137    tableCache.putIfAbsent(startKey, new StaleLocationCacheEntry(loc.getRegion().getEndKey()));
138    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
139      loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
140  }
141
142  /**
143   * Select an random replica id (including the primary replica id). In case there is no replica
144   * region configured, return the primary replica id.
145   * @return Replica id
146   */
147  private int getRandomReplicaId() {
148    int cachedNumOfReplicas = this.numOfReplicas;
149    if (cachedNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
150      cachedNumOfReplicas = refreshCatalogReplicaCount();
151      this.numOfReplicas = cachedNumOfReplicas;
152    }
153    // In case of no replica configured, return the primary region id.
154    if (cachedNumOfReplicas <= 1) {
155      return RegionInfo.DEFAULT_REPLICA_ID;
156    }
157    return ThreadLocalRandom.current().nextInt(cachedNumOfReplicas);
158  }
159
160  /**
161   * When it looks up a location, it will call this method to find a replica region to go. For a
162   * normal case, > 99% of region locations from catalog/meta replica will be up to date. In extreme
163   * cases such as region server crashes, it will depends on how fast replication catches up.
164   * @param tableName  table name it looks up
165   * @param row        key it looks up.
166   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
167   * @return catalog replica id
168   */
169  @Override
170  public int select(final TableName tableName, final byte[] row,
171    final RegionLocateType locateType) {
172    Preconditions.checkArgument(
173      locateType == RegionLocateType.BEFORE || locateType == RegionLocateType.CURRENT,
174      "Expected type BEFORE or CURRENT but got: %s", locateType);
175
176    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tableName);
177
178    // If there is no entry in StaleCache, select a random replica id.
179    if (tableCache == null) {
180      return getRandomReplicaId();
181    }
182
183    Map.Entry<byte[], StaleLocationCacheEntry> entry;
184    boolean isEmptyStopRow = isEmptyStopRow(row);
185    // Only BEFORE and CURRENT are passed in.
186    if (locateType == RegionLocateType.BEFORE) {
187      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
188    } else {
189      entry = tableCache.floorEntry(row);
190    }
191
192    // It is not in the stale cache, return a random replica id.
193    if (entry == null) {
194      return getRandomReplicaId();
195    }
196
197    // The entry here is a possible match for the location. Check if the entry times out first as
198    // long comparing is faster than comparing byte arrays(in most cases). It could remove
199    // stale entries faster. If the possible match entry does not time out, it will check if
200    // the entry is a match for the row passed in and select the replica id accordingly.
201    if (
202      (EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp())
203          >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
204    ) {
205      LOG.debug("Entry for table {} with startKey {}, {} times out", tableName, entry.getKey(),
206        entry);
207      tableCache.remove(entry.getKey());
208      return getRandomReplicaId();
209    }
210
211    byte[] endKey = entry.getValue().getEndKey();
212
213    // The following logic is borrowed from AsyncNonMetaRegionLocator.
214    if (isEmptyStopRow(endKey)) {
215      LOG.debug("Lookup {} goes to primary region", row);
216      return RegionInfo.DEFAULT_REPLICA_ID;
217    }
218
219    if (locateType == RegionLocateType.BEFORE) {
220      if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
221        LOG.debug("Lookup {} goes to primary meta", row);
222        return RegionInfo.DEFAULT_REPLICA_ID;
223      }
224    } else {
225      if (Bytes.compareTo(row, endKey) < 0) {
226        LOG.debug("Lookup {} goes to primary meta", row);
227        return RegionInfo.DEFAULT_REPLICA_ID;
228      }
229    }
230
231    // Not in stale cache, return a random replica id.
232    return getRandomReplicaId();
233  }
234
235  // This class implements the Stoppable interface as chores needs a Stopable object, there is
236  // no-op on this Stoppable object currently.
237  @Override
238  public void stop(String why) {
239    isStopped = true;
240  }
241
242  @Override
243  public boolean isStopped() {
244    return isStopped;
245  }
246
247  private void cleanupReplicaReplicaStaleCache() {
248    long curTimeInMills = EnvironmentEdgeManager.currentTime();
249    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
250      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it = tableCache.entrySet().iterator();
251      while (it.hasNext()) {
252        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
253        if (
254          curTimeInMills - entry.getValue().getTimestamp() >= STALE_CACHE_TIMEOUT_IN_MILLISECONDS
255        ) {
256          LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue());
257          it.remove();
258        }
259      }
260    }
261  }
262
263  private int refreshCatalogReplicaCount() {
264    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
265    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
266    // If the returned number of replicas is -1, it is caused by failure to fetch the
267    // replica count. Do not update the numOfReplicas in this case.
268    if (newNumOfReplicas == CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS) {
269      LOG.error("Failed to fetch Table {}'s region replica count", tableName);
270      return this.numOfReplicas;
271    }
272
273    int cachedNumOfReplicas = this.numOfReplicas;
274    if (
275      (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS)
276        || (cachedNumOfReplicas != newNumOfReplicas)
277    ) {
278      this.numOfReplicas = newNumOfReplicas;
279    }
280    return newNumOfReplicas;
281  }
282
283  private ScheduledChore
284    getCacheCleanupChore(final CatalogReplicaLoadBalanceSimpleSelector selector) {
285    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
286      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
287      @Override
288      protected void chore() {
289        selector.cleanupReplicaReplicaStaleCache();
290      }
291    };
292  }
293
294  private ScheduledChore
295    getRefreshReplicaCountChore(final CatalogReplicaLoadBalanceSimpleSelector selector) {
296    return new ScheduledChore("RefreshReplicaCountChore", this,
297      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
298      @Override
299      protected void chore() {
300        selector.refreshCatalogReplicaCount();
301      }
302    };
303  }
304}