001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
021import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
022import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.ConcurrentNavigableMap;
028import java.util.concurrent.ConcurrentSkipListMap;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.function.IntSupplier;
031import org.apache.commons.lang3.builder.ToStringBuilder;
032import org.apache.commons.lang3.builder.ToStringStyle;
033import org.apache.hadoop.hbase.ChoreService;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.ScheduledChore;
036import org.apache.hadoop.hbase.Stoppable;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
043
044/**
045 * <p>CatalogReplicaLoadBalanceReplicaSimpleSelector implements a simple catalog replica load
046 * balancing algorithm. It maintains a stale location cache for each table. Whenever client looks
047 * up location, it first check if the row is the stale location cache. If yes, the location from
048 * catalog replica is stale, it will go to the primary region to look up update-to-date location;
049 * otherwise, it will randomly pick up a replica region for lookup. When clients receive
050 * RegionNotServedException from region servers, it will add these region locations to the stale
051 * location cache. The stale cache will be cleaned up periodically by a chore.</p>
052 *
053 * It follows a simple algorithm to choose a replica to go:
054 *
055 * <ol>
056 *  <li>If there is no stale location entry for rows it looks up, it will randomly
057 *     pick a replica region to do lookup. </li>
058 *  <li>If the location from the replica region is stale, client gets RegionNotServedException
059 *     from region server, in this case, it will create StaleLocationCacheEntry in
060 *     CatalogReplicaLoadBalanceReplicaSimpleSelector.</li>
061 *  <li>When client tries to do location lookup, it checks StaleLocationCache first for rows it
062 *     tries to lookup, if entry exists, it will go with primary meta region to do lookup;
063 *     otherwise, it will follow step 1.</li>
064 *  <li>A chore will periodically run to clean up cache entries in the StaleLocationCache.</li>
065 * </ol>
066 */
067class CatalogReplicaLoadBalanceSimpleSelector implements
068  CatalogReplicaLoadBalanceSelector, Stoppable {
069  private static final Logger LOG =
070    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
071  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
072  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
073  private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute
074
075  /**
076   * StaleLocationCacheEntry is the entry when a stale location is reported by an client.
077   */
078  private static final class StaleLocationCacheEntry {
079    // timestamp in milliseconds
080    private final long timestamp;
081
082    private final byte[] endKey;
083
084    StaleLocationCacheEntry(final byte[] endKey) {
085      this.endKey = endKey;
086      timestamp = EnvironmentEdgeManager.currentTime();
087    }
088
089    public byte[] getEndKey() {
090      return this.endKey;
091    }
092
093    public long getTimestamp() {
094      return this.timestamp;
095    }
096
097    @Override
098    public String toString() {
099      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
100        .append("endKey", endKey)
101        .append("timestamp", timestamp)
102        .toString();
103    }
104  }
105
106  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>>
107    staleCache = new ConcurrentHashMap<>();
108  private volatile int numOfReplicas;
109  private final ChoreService choreService;
110  private final TableName tableName;
111  private final IntSupplier getNumOfReplicas;
112  private volatile boolean isStopped = false;
113  private final static int UNINITIALIZED_NUM_OF_REPLICAS = -1;
114
115  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, ChoreService choreService,
116    IntSupplier getNumOfReplicas) {
117    this.choreService = choreService;
118    this.tableName = tableName;
119    this.getNumOfReplicas = getNumOfReplicas;
120
121    // This numOfReplicas is going to be lazy initialized.
122    this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS;
123    // Start chores
124    this.choreService.scheduleChore(getCacheCleanupChore(this));
125    this.choreService.scheduleChore(getRefreshReplicaCountChore(this));
126  }
127
128  /**
129   * When a client runs into RegionNotServingException, it will call this method to
130   * update Selector's internal state.
131   * @param loc the location which causes exception.
132   */
133  public void onError(HRegionLocation loc) {
134    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache =
135      computeIfAbsent(staleCache, loc.getRegion().getTable(),
136        () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
137    byte[] startKey = loc.getRegion().getStartKey();
138    tableCache.putIfAbsent(startKey,
139      new StaleLocationCacheEntry(loc.getRegion().getEndKey()));
140    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
141      loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
142  }
143
144  /**
145   * Select an random replica id. In case there is no replica region configured, return
146   * the primary replica id.
147   * @return Replica id
148   */
149  private int getRandomReplicaId() {
150    int cachedNumOfReplicas = this.numOfReplicas;
151    if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) {
152      cachedNumOfReplicas = refreshCatalogReplicaCount();
153      this.numOfReplicas = cachedNumOfReplicas;
154    }
155    // In case of no replica configured, return the primary region id.
156    if (cachedNumOfReplicas <= 1) {
157      return RegionInfo.DEFAULT_REPLICA_ID;
158    }
159    return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1);
160  }
161
162  /**
163   * When it looks up a location, it will call this method to find a replica region to go.
164   * For a normal case, > 99% of region locations from catalog/meta replica will be up to date.
165   * In extreme cases such as region server crashes, it will depends on how fast replication
166   * catches up.
167   *
168   * @param tablename table name it looks up
169   * @param row key it looks up.
170   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
171   * @return catalog replica id
172   */
173  public int select(final TableName tablename, final byte[] row,
174    final RegionLocateType locateType) {
175    Preconditions.checkArgument(locateType == RegionLocateType.BEFORE ||
176        locateType == RegionLocateType.CURRENT,
177      "Expected type BEFORE or CURRENT but got: %s", locateType);
178
179    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tablename);
180
181    // If there is no entry in StaleCache, select a random replica id.
182    if (tableCache == null) {
183      return getRandomReplicaId();
184    }
185
186    Map.Entry<byte[], StaleLocationCacheEntry> entry;
187    boolean isEmptyStopRow = isEmptyStopRow(row);
188    // Only BEFORE and CURRENT are passed in.
189    if (locateType == RegionLocateType.BEFORE) {
190      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
191    } else {
192      entry = tableCache.floorEntry(row);
193    }
194
195    // It is not in the stale cache, return a random replica id.
196    if (entry == null) {
197      return getRandomReplicaId();
198    }
199
200    // The entry here is a possible match for the location. Check if the entry times out first as
201    // long comparing is faster than comparing byte arrays(in most cases). It could remove
202    // stale entries faster. If the possible match entry does not time out, it will check if
203    // the entry is a match for the row passed in and select the replica id accordingly.
204    if ((EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp()) >=
205      STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
206      LOG.debug("Entry for table {} with startKey {}, {} times out", tablename, entry.getKey(),
207        entry);
208      tableCache.remove(entry.getKey());
209      return getRandomReplicaId();
210    }
211
212    byte[] endKey =  entry.getValue().getEndKey();
213
214    // The following logic is borrowed from AsyncNonMetaRegionLocator.
215    if (isEmptyStopRow(endKey)) {
216      LOG.debug("Lookup {} goes to primary region", row);
217      return RegionInfo.DEFAULT_REPLICA_ID;
218    }
219
220    if (locateType == RegionLocateType.BEFORE) {
221      if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
222        LOG.debug("Lookup {} goes to primary meta", row);
223        return RegionInfo.DEFAULT_REPLICA_ID;
224      }
225    } else {
226      if (Bytes.compareTo(row, endKey) < 0) {
227        LOG.debug("Lookup {} goes to primary meta", row);
228        return RegionInfo.DEFAULT_REPLICA_ID;
229      }
230    }
231
232    // Not in stale cache, return a random replica id.
233    return getRandomReplicaId();
234  }
235
236  // This class implements the Stoppable interface as chores needs a Stopable object, there is
237  // no-op on this Stoppable object currently.
238  @Override
239  public void stop(String why) {
240    isStopped = true;
241  }
242
243  @Override
244  public boolean isStopped() {
245    return isStopped;
246  }
247
248  private void cleanupReplicaReplicaStaleCache() {
249    long curTimeInMills = EnvironmentEdgeManager.currentTime();
250    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
251      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it =
252        tableCache.entrySet().iterator();
253      while (it.hasNext()) {
254        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
255        if (curTimeInMills - entry.getValue().getTimestamp() >=
256          STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
257          LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue());
258          it.remove();
259        }
260      }
261    }
262  }
263
264  private int refreshCatalogReplicaCount() {
265    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
266    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
267    if (newNumOfReplicas == 1) {
268      LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to "
269        + "fetch the replica count", tableName);
270    }
271    int cachedNumOfReplicas = this.numOfReplicas;
272
273    // If the returned number of replicas is 1, it is mostly caused by failure to fetch the
274    // replica count. Do not update the numOfReplicas in this case.
275    if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) ||
276      ((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) {
277      this.numOfReplicas = newNumOfReplicas;
278    }
279    return newNumOfReplicas;
280  }
281
282  private ScheduledChore getCacheCleanupChore(
283    final CatalogReplicaLoadBalanceSimpleSelector selector) {
284    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
285      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
286      @Override
287      protected void chore() {
288        selector.cleanupReplicaReplicaStaleCache();
289      }
290    };
291  }
292
293  private ScheduledChore getRefreshReplicaCountChore(
294    final CatalogReplicaLoadBalanceSimpleSelector selector) {
295    return new ScheduledChore("RefreshReplicaCountChore", this,
296      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
297      @Override
298      protected void chore() {
299        selector.refreshCatalogReplicaCount();
300      }
301    };
302  }
303}