001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
021import static org.apache.hadoop.hbase.HConstants.NINES;
022import static org.apache.hadoop.hbase.HConstants.ZEROES;
023import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
024import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
025import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
030import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
031import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
032import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
033
034import java.io.IOException;
035import java.util.Arrays;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.LinkedHashMap;
039import java.util.Map;
040import java.util.Optional;
041import java.util.Set;
042import java.util.concurrent.CompletableFuture;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ConcurrentMap;
045import java.util.concurrent.ConcurrentNavigableMap;
046import java.util.concurrent.ConcurrentSkipListMap;
047import org.apache.commons.lang3.ObjectUtils;
048import org.apache.hadoop.hbase.HBaseIOException;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.HRegionLocation;
051import org.apache.hadoop.hbase.MetaTableAccessor;
052import org.apache.hadoop.hbase.RegionLocations;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.TableNotFoundException;
055import org.apache.hadoop.hbase.client.Scan.ReadType;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
062import org.apache.hbase.thirdparty.com.google.common.base.Objects;
063
064/**
065 * The asynchronous locator for regions other than meta.
066 */
067@InterfaceAudience.Private
068class AsyncNonMetaRegionLocator {
069
070  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
071
072  @VisibleForTesting
073  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
074    "hbase.client.meta.max.concurrent.locate.per.table";
075
076  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
077
078  @VisibleForTesting
079  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
080
081  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
082
083  private final AsyncConnectionImpl conn;
084
085  private final int maxConcurrentLocateRequestPerTable;
086
087  private final int locatePrefetchLimit;
088
089  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
090
091  private static final class LocateRequest {
092
093    private final byte[] row;
094
095    private final RegionLocateType locateType;
096
097    public LocateRequest(byte[] row, RegionLocateType locateType) {
098      this.row = row;
099      this.locateType = locateType;
100    }
101
102    @Override
103    public int hashCode() {
104      return Bytes.hashCode(row) ^ locateType.hashCode();
105    }
106
107    @Override
108    public boolean equals(Object obj) {
109      if (obj == null || obj.getClass() != LocateRequest.class) {
110        return false;
111      }
112      LocateRequest that = (LocateRequest) obj;
113      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
114    }
115  }
116
117  private static final class TableCache {
118
119    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
120      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
121
122    private final Set<LocateRequest> pendingRequests = new HashSet<>();
123
124    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
125      new LinkedHashMap<>();
126
127    public boolean hasQuota(int max) {
128      return pendingRequests.size() < max;
129    }
130
131    public boolean isPending(LocateRequest req) {
132      return pendingRequests.contains(req);
133    }
134
135    public void send(LocateRequest req) {
136      pendingRequests.add(req);
137    }
138
139    public Optional<LocateRequest> getCandidate() {
140      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
141    }
142
143    public void clearCompletedRequests(Optional<RegionLocations> locations) {
144      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
145        allRequests.entrySet().iterator(); iter.hasNext();) {
146        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
147        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
148          iter.remove();
149        }
150      }
151    }
152
153    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
154        Optional<RegionLocations> locations) {
155      if (future.isDone()) {
156        return true;
157      }
158      if (!locations.isPresent()) {
159        return false;
160      }
161      RegionLocations locs = locations.get();
162      HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
163      // we should at least have one location available, otherwise the request should fail and
164      // should not arrive here
165      assert loc != null;
166      boolean completed;
167      if (req.locateType.equals(RegionLocateType.BEFORE)) {
168        // for locating the row before current row, the common case is to find the previous region
169        // in reverse scan, so we check the endKey first. In general, the condition should be
170        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
171        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
172        // endKey.
173        byte[] endKey = loc.getRegion().getEndKey();
174        int c = Bytes.compareTo(endKey, req.row);
175        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
176          Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
177      } else {
178        completed = loc.getRegion().containsRow(req.row);
179      }
180      if (completed) {
181        future.complete(locs);
182        return true;
183      } else {
184        return false;
185      }
186    }
187  }
188
189  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
190    this.conn = conn;
191    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
192      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
193    this.locatePrefetchLimit =
194      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
195  }
196
197  private TableCache getTableCache(TableName tableName) {
198    return computeIfAbsent(cache, tableName, TableCache::new);
199  }
200
201  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
202    HRegionLocation[] locArr1 = locs1.getRegionLocations();
203    HRegionLocation[] locArr2 = locs2.getRegionLocations();
204    if (locArr1.length != locArr2.length) {
205      return false;
206    }
207    for (int i = 0; i < locArr1.length; i++) {
208      // do not need to compare region info
209      HRegionLocation loc1 = locArr1[i];
210      HRegionLocation loc2 = locArr2[i];
211      if (loc1 == null) {
212        if (loc2 != null) {
213          return false;
214        }
215      } else {
216        if (loc2 == null) {
217          return false;
218        }
219        if (loc1.getSeqNum() != loc2.getSeqNum()) {
220          return false;
221        }
222        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
223          return false;
224        }
225      }
226    }
227    return true;
228  }
229
230  // if we successfully add the locations to cache, return the locations, otherwise return the one
231  // which prevents us being added. The upper layer can use this value to complete pending requests.
232  private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
233    LOG.trace("Try adding {} to cache", locs);
234    byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey();
235    for (;;) {
236      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
237      if (oldLocs == null) {
238        return locs;
239      }
240      // check whether the regions are the same, this usually happens when table is split/merged, or
241      // deleted and recreated again.
242      RegionInfo region = locs.getRegionLocation().getRegion();
243      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
244      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
245        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
246        if (isEqual(mergedLocs, oldLocs)) {
247          // the merged one is the same with the old one, give up
248          LOG.trace("Will not add {} to cache because the old value {} " +
249            " is newer than us or has the same server name." +
250            " Maybe it is updated before we replace it", locs, oldLocs);
251          return oldLocs;
252        }
253        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
254          return mergedLocs;
255        }
256      } else {
257        // the region is different, here we trust the one we fetched. This maybe wrong but finally
258        // the upper layer can detect this and trigger removal of the wrong locations
259        if (LOG.isDebugEnabled()) {
260          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
261            " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
262        }
263        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
264          return locs;
265        }
266      }
267    }
268  }
269
270  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
271      Throwable error) {
272    if (error != null) {
273      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
274        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
275    }
276    Optional<LocateRequest> toSend = Optional.empty();
277    TableCache tableCache = getTableCache(tableName);
278    if (locs != null) {
279      RegionLocations addedLocs = addToCache(tableCache, locs);
280      synchronized (tableCache) {
281        tableCache.pendingRequests.remove(req);
282        tableCache.clearCompletedRequests(Optional.of(addedLocs));
283        // Remove a complete locate request in a synchronized block, so the table cache must have
284        // quota to send a candidate request.
285        toSend = tableCache.getCandidate();
286        toSend.ifPresent(r -> tableCache.send(r));
287      }
288      toSend.ifPresent(r -> locateInMeta(tableName, r));
289    } else {
290      // we meet an error
291      assert error != null;
292      synchronized (tableCache) {
293        tableCache.pendingRequests.remove(req);
294        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
295        // already retried several times
296        CompletableFuture<?> future = tableCache.allRequests.remove(req);
297        if (future != null) {
298          future.completeExceptionally(error);
299        }
300        tableCache.clearCompletedRequests(Optional.empty());
301        // Remove a complete locate request in a synchronized block, so the table cache must have
302        // quota to send a candidate request.
303        toSend = tableCache.getCandidate();
304        toSend.ifPresent(r -> tableCache.send(r));
305      }
306      toSend.ifPresent(r -> locateInMeta(tableName, r));
307    }
308  }
309
310  // return whether we should stop the scan
311  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
312    RegionLocations locs = MetaTableAccessor.getRegionLocations(result);
313    if (LOG.isDebugEnabled()) {
314      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
315        Bytes.toStringBinary(req.row), req.locateType, locs);
316    }
317
318    // the default region location should always be presented when fetching from meta, otherwise
319    // let's fail the request.
320    if (locs == null || locs.getDefaultRegionLocation() == null) {
321      complete(tableName, req, null,
322        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
323          tableName, Bytes.toStringBinary(req.row), req.locateType)));
324      return true;
325    }
326    HRegionLocation loc = locs.getDefaultRegionLocation();
327    RegionInfo info = loc.getRegion();
328    if (info == null) {
329      complete(tableName, req, null,
330        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
331          tableName, Bytes.toStringBinary(req.row), req.locateType)));
332      return true;
333    }
334    if (info.isSplitParent()) {
335      return false;
336    }
337    complete(tableName, req, locs, null);
338    return true;
339  }
340
341  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
342      int replicaId) {
343    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
344    if (entry == null) {
345      return null;
346    }
347    RegionLocations locs = entry.getValue();
348    HRegionLocation loc = locs.getRegionLocation(replicaId);
349    if (loc == null) {
350      return null;
351    }
352    byte[] endKey = loc.getRegion().getEndKey();
353    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
354      if (LOG.isTraceEnabled()) {
355        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
356          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
357      }
358      return locs;
359    } else {
360      return null;
361    }
362  }
363
364  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
365      byte[] row, int replicaId) {
366    boolean isEmptyStopRow = isEmptyStopRow(row);
367    Map.Entry<byte[], RegionLocations> entry =
368      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
369    if (entry == null) {
370      return null;
371    }
372    RegionLocations locs = entry.getValue();
373    HRegionLocation loc = locs.getRegionLocation(replicaId);
374    if (loc == null) {
375      return null;
376    }
377    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
378      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
379      if (LOG.isTraceEnabled()) {
380        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
381          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
382      }
383      return locs;
384    } else {
385      return null;
386    }
387  }
388
389  private void locateInMeta(TableName tableName, LocateRequest req) {
390    if (LOG.isTraceEnabled()) {
391      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
392        "', locateType=" + req.locateType + " in meta");
393    }
394    byte[] metaStartKey;
395    if (req.locateType.equals(RegionLocateType.BEFORE)) {
396      if (isEmptyStopRow(req.row)) {
397        byte[] binaryTableName = tableName.getName();
398        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
399      } else {
400        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
401      }
402    } else {
403      metaStartKey = createRegionName(tableName, req.row, NINES, false);
404    }
405    byte[] metaStopKey =
406      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
407    conn.getTable(META_TABLE_NAME)
408      .scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
409        .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
410        .setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() {
411
412          private boolean completeNormally = false;
413
414          private boolean tableNotFound = true;
415
416          @Override
417          public void onError(Throwable error) {
418            complete(tableName, req, null, error);
419          }
420
421          @Override
422          public void onComplete() {
423            if (tableNotFound) {
424              complete(tableName, req, null, new TableNotFoundException(tableName));
425            } else if (!completeNormally) {
426              complete(tableName, req, null, new IOException("Unable to find region for '" +
427                Bytes.toStringBinary(req.row) + "' in " + tableName));
428            }
429          }
430
431          @Override
432          public void onNext(Result[] results, ScanController controller) {
433            if (results.length == 0) {
434              return;
435            }
436            tableNotFound = false;
437            int i = 0;
438            for (; i < results.length; i++) {
439              if (onScanNext(tableName, req, results[i])) {
440                completeNormally = true;
441                controller.terminate();
442                i++;
443                break;
444              }
445            }
446            // Add the remaining results into cache
447            if (i < results.length) {
448              TableCache tableCache = getTableCache(tableName);
449              for (; i < results.length; i++) {
450                RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
451                if (locs == null) {
452                  continue;
453                }
454                HRegionLocation loc = locs.getDefaultRegionLocation();
455                if (loc == null) {
456                  continue;
457                }
458                RegionInfo info = loc.getRegion();
459                if (info == null || info.isOffline() || info.isSplitParent()) {
460                  continue;
461                }
462                RegionLocations addedLocs = addToCache(tableCache, locs);
463                synchronized (tableCache) {
464                  tableCache.clearCompletedRequests(Optional.of(addedLocs));
465                }
466              }
467            }
468          }
469        });
470  }
471
472  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
473      int replicaId, RegionLocateType locateType) {
474    return locateType.equals(RegionLocateType.BEFORE)
475      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
476      : locateRowInCache(tableCache, tableName, row, replicaId);
477  }
478
479  // locateToPrevious is true means we will use the start key of a region to locate the region
480  // placed before it. Used for reverse scan. See the comment of
481  // AsyncRegionLocator.getPreviousRegionLocation.
482  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
483      byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
484    // AFTER should be convert to CURRENT before calling this method
485    assert !locateType.equals(RegionLocateType.AFTER);
486    TableCache tableCache = getTableCache(tableName);
487    if (!reload) {
488      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
489      if (isGood(locs, replicaId)) {
490        return CompletableFuture.completedFuture(locs);
491      }
492    }
493    CompletableFuture<RegionLocations> future;
494    LocateRequest req;
495    boolean sendRequest = false;
496    synchronized (tableCache) {
497      // check again
498      if (!reload) {
499        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
500        if (isGood(locs, replicaId)) {
501          return CompletableFuture.completedFuture(locs);
502        }
503      }
504      req = new LocateRequest(row, locateType);
505      future = tableCache.allRequests.get(req);
506      if (future == null) {
507        future = new CompletableFuture<>();
508        tableCache.allRequests.put(req, future);
509        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
510          tableCache.send(req);
511          sendRequest = true;
512        }
513      }
514    }
515    if (sendRequest) {
516      locateInMeta(tableName, req);
517    }
518    return future;
519  }
520
521  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
522      int replicaId, RegionLocateType locateType, boolean reload) {
523    // as we know the exact row after us, so we can just create the new row, and use the same
524    // algorithm to locate it.
525    if (locateType.equals(RegionLocateType.AFTER)) {
526      row = createClosestRowAfter(row);
527      locateType = RegionLocateType.CURRENT;
528    }
529    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
530  }
531
532  private void removeLocationFromCache(HRegionLocation loc) {
533    TableCache tableCache = cache.get(loc.getRegion().getTable());
534    if (tableCache == null) {
535      return;
536    }
537    byte[] startKey = loc.getRegion().getStartKey();
538    for (;;) {
539      RegionLocations oldLocs = tableCache.cache.get(startKey);
540      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
541      if (!canUpdateOnError(loc, oldLoc)) {
542        return;
543      }
544      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
545      if (newLocs == null) {
546        if (tableCache.cache.remove(startKey, oldLocs)) {
547          return;
548        }
549      } else {
550        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
551          return;
552        }
553      }
554    }
555  }
556
557  private void addLocationToCache(HRegionLocation loc) {
558    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
559  }
560
561  private HRegionLocation getCachedLocation(HRegionLocation loc) {
562    TableCache tableCache = cache.get(loc.getRegion().getTable());
563    if (tableCache == null) {
564      return null;
565    }
566    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
567    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
568  }
569
570  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
571    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
572      this::addLocationToCache, this::removeLocationFromCache);
573  }
574
575  void clearCache(TableName tableName) {
576    TableCache tableCache = cache.remove(tableName);
577    if (tableCache == null) {
578      return;
579    }
580    synchronized (tableCache) {
581      if (!tableCache.allRequests.isEmpty()) {
582        IOException error = new IOException("Cache cleared");
583        tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
584      }
585    }
586  }
587
588  // only used for testing whether we have cached the location for a region.
589  @VisibleForTesting
590  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
591    TableCache tableCache = cache.get(tableName);
592    if (tableCache == null) {
593      return null;
594    }
595    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
596  }
597}