001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
021import static org.apache.hadoop.hbase.HConstants.NINES;
022import static org.apache.hadoop.hbase.HConstants.ZEROES;
023import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
024import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
025import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
030import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
031import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
032import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
033
034import java.io.IOException;
035import java.util.Arrays;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.LinkedHashMap;
039import java.util.Map;
040import java.util.Optional;
041import java.util.Set;
042import java.util.concurrent.CompletableFuture;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ConcurrentMap;
045import java.util.concurrent.ConcurrentNavigableMap;
046import java.util.concurrent.ConcurrentSkipListMap;
047import org.apache.commons.lang3.ObjectUtils;
048import org.apache.hadoop.hbase.HBaseIOException;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.HRegionLocation;
051import org.apache.hadoop.hbase.MetaTableAccessor;
052import org.apache.hadoop.hbase.RegionLocations;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.TableNotFoundException;
055import org.apache.hadoop.hbase.client.Scan.ReadType;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
062import org.apache.hbase.thirdparty.com.google.common.base.Objects;
063
064/**
065 * The asynchronous locator for regions other than meta.
066 */
067@InterfaceAudience.Private
068class AsyncNonMetaRegionLocator {
069
070  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
071
072  @VisibleForTesting
073  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
074    "hbase.client.meta.max.concurrent.locate.per.table";
075
076  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
077
078  @VisibleForTesting
079  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
080
081  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
082
083  private final AsyncConnectionImpl conn;
084
085  private final int maxConcurrentLocateRequestPerTable;
086
087  private final int locatePrefetchLimit;
088
089  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
090
091  private static final class LocateRequest {
092
093    private final byte[] row;
094
095    private final RegionLocateType locateType;
096
097    public LocateRequest(byte[] row, RegionLocateType locateType) {
098      this.row = row;
099      this.locateType = locateType;
100    }
101
102    @Override
103    public int hashCode() {
104      return Bytes.hashCode(row) ^ locateType.hashCode();
105    }
106
107    @Override
108    public boolean equals(Object obj) {
109      if (obj == null || obj.getClass() != LocateRequest.class) {
110        return false;
111      }
112      LocateRequest that = (LocateRequest) obj;
113      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
114    }
115  }
116
117  private static final class TableCache {
118
119    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
120      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
121
122    private final Set<LocateRequest> pendingRequests = new HashSet<>();
123
124    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
125      new LinkedHashMap<>();
126
127    public boolean hasQuota(int max) {
128      return pendingRequests.size() < max;
129    }
130
131    public boolean isPending(LocateRequest req) {
132      return pendingRequests.contains(req);
133    }
134
135    public void send(LocateRequest req) {
136      pendingRequests.add(req);
137    }
138
139    public Optional<LocateRequest> getCandidate() {
140      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
141    }
142
143    public void clearCompletedRequests(RegionLocations locations) {
144      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
145        allRequests.entrySet().iterator(); iter.hasNext();) {
146        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
147        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
148          iter.remove();
149        }
150      }
151    }
152
153    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
154        RegionLocations locations) {
155      if (future.isDone()) {
156        return true;
157      }
158      if (locations == null) {
159        return false;
160      }
161      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
162      // we should at least have one location available, otherwise the request should fail and
163      // should not arrive here
164      assert loc != null;
165      boolean completed;
166      if (req.locateType.equals(RegionLocateType.BEFORE)) {
167        // for locating the row before current row, the common case is to find the previous region
168        // in reverse scan, so we check the endKey first. In general, the condition should be
169        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
170        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
171        // endKey.
172        byte[] endKey = loc.getRegion().getEndKey();
173        int c = Bytes.compareTo(endKey, req.row);
174        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
175          Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
176      } else {
177        completed = loc.getRegion().containsRow(req.row);
178      }
179      if (completed) {
180        future.complete(locations);
181        return true;
182      } else {
183        return false;
184      }
185    }
186  }
187
188  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
189    this.conn = conn;
190    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
191      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
192    this.locatePrefetchLimit =
193      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
194  }
195
196  private TableCache getTableCache(TableName tableName) {
197    return computeIfAbsent(cache, tableName, TableCache::new);
198  }
199
200  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
201    HRegionLocation[] locArr1 = locs1.getRegionLocations();
202    HRegionLocation[] locArr2 = locs2.getRegionLocations();
203    if (locArr1.length != locArr2.length) {
204      return false;
205    }
206    for (int i = 0; i < locArr1.length; i++) {
207      // do not need to compare region info
208      HRegionLocation loc1 = locArr1[i];
209      HRegionLocation loc2 = locArr2[i];
210      if (loc1 == null) {
211        if (loc2 != null) {
212          return false;
213        }
214      } else {
215        if (loc2 == null) {
216          return false;
217        }
218        if (loc1.getSeqNum() != loc2.getSeqNum()) {
219          return false;
220        }
221        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
222          return false;
223        }
224      }
225    }
226    return true;
227  }
228
229  // if we successfully add the locations to cache, return the locations, otherwise return the one
230  // which prevents us being added. The upper layer can use this value to complete pending requests.
231  private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
232    LOG.trace("Try adding {} to cache", locs);
233    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
234    for (;;) {
235      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
236      if (oldLocs == null) {
237        return locs;
238      }
239      // check whether the regions are the same, this usually happens when table is split/merged, or
240      // deleted and recreated again.
241      RegionInfo region = locs.getRegionLocation().getRegion();
242      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
243      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
244        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
245        if (isEqual(mergedLocs, oldLocs)) {
246          // the merged one is the same with the old one, give up
247          LOG.trace("Will not add {} to cache because the old value {} " +
248            " is newer than us or has the same server name." +
249            " Maybe it is updated before we replace it", locs, oldLocs);
250          return oldLocs;
251        }
252        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
253          return mergedLocs;
254        }
255      } else {
256        // the region is different, here we trust the one we fetched. This maybe wrong but finally
257        // the upper layer can detect this and trigger removal of the wrong locations
258        if (LOG.isDebugEnabled()) {
259          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
260            " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
261        }
262        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
263          return locs;
264        }
265      }
266    }
267  }
268
269  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
270      Throwable error) {
271    if (error != null) {
272      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
273        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
274    }
275    Optional<LocateRequest> toSend = Optional.empty();
276    TableCache tableCache = getTableCache(tableName);
277    if (locs != null) {
278      RegionLocations addedLocs = addToCache(tableCache, locs);
279      synchronized (tableCache) {
280        tableCache.pendingRequests.remove(req);
281        tableCache.clearCompletedRequests(addedLocs);
282        // Remove a complete locate request in a synchronized block, so the table cache must have
283        // quota to send a candidate request.
284        toSend = tableCache.getCandidate();
285        toSend.ifPresent(r -> tableCache.send(r));
286      }
287      toSend.ifPresent(r -> locateInMeta(tableName, r));
288    } else {
289      // we meet an error
290      assert error != null;
291      synchronized (tableCache) {
292        tableCache.pendingRequests.remove(req);
293        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
294        // already retried several times
295        CompletableFuture<?> future = tableCache.allRequests.remove(req);
296        if (future != null) {
297          future.completeExceptionally(error);
298        }
299        tableCache.clearCompletedRequests(null);
300        // Remove a complete locate request in a synchronized block, so the table cache must have
301        // quota to send a candidate request.
302        toSend = tableCache.getCandidate();
303        toSend.ifPresent(r -> tableCache.send(r));
304      }
305      toSend.ifPresent(r -> locateInMeta(tableName, r));
306    }
307  }
308
309  // return whether we should stop the scan
310  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
311    RegionLocations locs = MetaTableAccessor.getRegionLocations(result);
312    if (LOG.isDebugEnabled()) {
313      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
314        Bytes.toStringBinary(req.row), req.locateType, locs);
315    }
316    // remove HRegionLocation with null location, i.e, getServerName returns null.
317    if (locs != null) {
318      locs = locs.removeElementsWithNullLocation();
319    }
320
321    // the default region location should always be presented when fetching from meta, otherwise
322    // let's fail the request.
323    if (locs == null || locs.getDefaultRegionLocation() == null) {
324      complete(tableName, req, null,
325        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
326          tableName, Bytes.toStringBinary(req.row), req.locateType)));
327      return true;
328    }
329    HRegionLocation loc = locs.getDefaultRegionLocation();
330    RegionInfo info = loc.getRegion();
331    if (info == null) {
332      complete(tableName, req, null,
333        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
334          tableName, Bytes.toStringBinary(req.row), req.locateType)));
335      return true;
336    }
337    if (info.isSplitParent()) {
338      return false;
339    }
340    complete(tableName, req, locs, null);
341    return true;
342  }
343
344  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
345      int replicaId) {
346    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
347    if (entry == null) {
348      return null;
349    }
350    RegionLocations locs = entry.getValue();
351    HRegionLocation loc = locs.getRegionLocation(replicaId);
352    if (loc == null) {
353      return null;
354    }
355    byte[] endKey = loc.getRegion().getEndKey();
356    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
357      if (LOG.isTraceEnabled()) {
358        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
359          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
360      }
361      return locs;
362    } else {
363      return null;
364    }
365  }
366
367  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
368      byte[] row, int replicaId) {
369    boolean isEmptyStopRow = isEmptyStopRow(row);
370    Map.Entry<byte[], RegionLocations> entry =
371      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
372    if (entry == null) {
373      return null;
374    }
375    RegionLocations locs = entry.getValue();
376    HRegionLocation loc = locs.getRegionLocation(replicaId);
377    if (loc == null) {
378      return null;
379    }
380    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
381      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
382      if (LOG.isTraceEnabled()) {
383        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
384          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
385      }
386      return locs;
387    } else {
388      return null;
389    }
390  }
391
392  private void locateInMeta(TableName tableName, LocateRequest req) {
393    if (LOG.isTraceEnabled()) {
394      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
395        "', locateType=" + req.locateType + " in meta");
396    }
397    byte[] metaStartKey;
398    if (req.locateType.equals(RegionLocateType.BEFORE)) {
399      if (isEmptyStopRow(req.row)) {
400        byte[] binaryTableName = tableName.getName();
401        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
402      } else {
403        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
404      }
405    } else {
406      metaStartKey = createRegionName(tableName, req.row, NINES, false);
407    }
408    byte[] metaStopKey =
409      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
410    conn.getTable(META_TABLE_NAME)
411      .scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
412        .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
413        .setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() {
414
415          private boolean completeNormally = false;
416
417          private boolean tableNotFound = true;
418
419          @Override
420          public void onError(Throwable error) {
421            complete(tableName, req, null, error);
422          }
423
424          @Override
425          public void onComplete() {
426            if (tableNotFound) {
427              complete(tableName, req, null, new TableNotFoundException(tableName));
428            } else if (!completeNormally) {
429              complete(tableName, req, null, new IOException("Unable to find region for '" +
430                Bytes.toStringBinary(req.row) + "' in " + tableName));
431            }
432          }
433
434          @Override
435          public void onNext(Result[] results, ScanController controller) {
436            if (results.length == 0) {
437              return;
438            }
439            tableNotFound = false;
440            int i = 0;
441            for (; i < results.length; i++) {
442              if (onScanNext(tableName, req, results[i])) {
443                completeNormally = true;
444                controller.terminate();
445                i++;
446                break;
447              }
448            }
449            // Add the remaining results into cache
450            if (i < results.length) {
451              TableCache tableCache = getTableCache(tableName);
452              for (; i < results.length; i++) {
453                RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
454                if (locs == null) {
455                  continue;
456                }
457                HRegionLocation loc = locs.getDefaultRegionLocation();
458                if (loc == null) {
459                  continue;
460                }
461                RegionInfo info = loc.getRegion();
462                if (info == null || info.isOffline() || info.isSplitParent()) {
463                  continue;
464                }
465                RegionLocations addedLocs = addToCache(tableCache, locs);
466                synchronized (tableCache) {
467                  tableCache.clearCompletedRequests(addedLocs);
468                }
469              }
470            }
471          }
472        });
473  }
474
475  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
476      int replicaId, RegionLocateType locateType) {
477    return locateType.equals(RegionLocateType.BEFORE)
478      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
479      : locateRowInCache(tableCache, tableName, row, replicaId);
480  }
481
482  // locateToPrevious is true means we will use the start key of a region to locate the region
483  // placed before it. Used for reverse scan. See the comment of
484  // AsyncRegionLocator.getPreviousRegionLocation.
485  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
486      byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
487    // AFTER should be convert to CURRENT before calling this method
488    assert !locateType.equals(RegionLocateType.AFTER);
489    TableCache tableCache = getTableCache(tableName);
490    if (!reload) {
491      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
492      if (isGood(locs, replicaId)) {
493        return CompletableFuture.completedFuture(locs);
494      }
495    }
496    CompletableFuture<RegionLocations> future;
497    LocateRequest req;
498    boolean sendRequest = false;
499    synchronized (tableCache) {
500      // check again
501      if (!reload) {
502        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
503        if (isGood(locs, replicaId)) {
504          return CompletableFuture.completedFuture(locs);
505        }
506      }
507      req = new LocateRequest(row, locateType);
508      future = tableCache.allRequests.get(req);
509      if (future == null) {
510        future = new CompletableFuture<>();
511        tableCache.allRequests.put(req, future);
512        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
513          tableCache.send(req);
514          sendRequest = true;
515        }
516      }
517    }
518    if (sendRequest) {
519      locateInMeta(tableName, req);
520    }
521    return future;
522  }
523
524  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
525      int replicaId, RegionLocateType locateType, boolean reload) {
526    // as we know the exact row after us, so we can just create the new row, and use the same
527    // algorithm to locate it.
528    if (locateType.equals(RegionLocateType.AFTER)) {
529      row = createClosestRowAfter(row);
530      locateType = RegionLocateType.CURRENT;
531    }
532    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
533  }
534
535  private void removeLocationFromCache(HRegionLocation loc) {
536    TableCache tableCache = cache.get(loc.getRegion().getTable());
537    if (tableCache == null) {
538      return;
539    }
540    byte[] startKey = loc.getRegion().getStartKey();
541    for (;;) {
542      RegionLocations oldLocs = tableCache.cache.get(startKey);
543      if (oldLocs == null) {
544        return;
545      }
546      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
547      if (!canUpdateOnError(loc, oldLoc)) {
548        return;
549      }
550      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
551      if (newLocs == null) {
552        if (tableCache.cache.remove(startKey, oldLocs)) {
553          return;
554        }
555      } else {
556        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
557          return;
558        }
559      }
560    }
561  }
562
563  private void addLocationToCache(HRegionLocation loc) {
564    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
565  }
566
567  private HRegionLocation getCachedLocation(HRegionLocation loc) {
568    TableCache tableCache = cache.get(loc.getRegion().getTable());
569    if (tableCache == null) {
570      return null;
571    }
572    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
573    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
574  }
575
576  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
577    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
578      this::addLocationToCache, this::removeLocationFromCache);
579  }
580
581  void clearCache(TableName tableName) {
582    TableCache tableCache = cache.remove(tableName);
583    if (tableCache == null) {
584      return;
585    }
586    synchronized (tableCache) {
587      if (!tableCache.allRequests.isEmpty()) {
588        IOException error = new IOException("Cache cleared");
589        tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
590      }
591    }
592  }
593
594  // only used for testing whether we have cached the location for a region.
595  @VisibleForTesting
596  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
597    TableCache tableCache = cache.get(tableName);
598    if (tableCache == null) {
599      return null;
600    }
601    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
602  }
603}