001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
022
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.Set;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.ConcurrentNavigableMap;
028import java.util.concurrent.CopyOnWriteArraySet;
029
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.RegionLocations;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A cache implementation for region locations from meta.
043 */
044@InterfaceAudience.Private
045public class MetaCache {
046
047  private static final Logger LOG = LoggerFactory.getLogger(MetaCache.class);
048
049  /**
050   * Map of table to table {@link HRegionLocation}s.
051   */
052  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], RegionLocations>>
053    cachedRegionLocations = new CopyOnWriteArrayMap<>();
054
055  // The presence of a server in the map implies it's likely that there is an
056  // entry in cachedRegionLocations that map to this server; but the absence
057  // of a server in this map guarantees that there is no entry in cache that
058  // maps to the absent server.
059  // The access to this attribute must be protected by a lock on cachedRegionLocations
060  private final Set<ServerName> cachedServers = new CopyOnWriteArraySet<>();
061
062  private final MetricsConnection metrics;
063
064  public MetaCache(MetricsConnection metrics) {
065    this.metrics = metrics;
066  }
067
068  /**
069   * Search the cache for a location that fits our table and row key.
070   * Return null if no suitable region is located.
071   *
072   * @return Null or region location found in cache.
073   */
074  public RegionLocations getCachedLocation(final TableName tableName, final byte [] row) {
075    ConcurrentNavigableMap<byte[], RegionLocations> tableLocations =
076      getTableLocations(tableName);
077
078    Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
079    if (e == null) {
080      if (metrics != null) metrics.incrMetaCacheMiss();
081      return null;
082    }
083    RegionLocations possibleRegion = e.getValue();
084
085    // make sure that the end key is greater than the row we're looking
086    // for, otherwise the row actually belongs in the next region, not
087    // this one. the exception case is when the endkey is
088    // HConstants.EMPTY_END_ROW, signifying that the region we're
089    // checking is actually the last region in the table.
090    byte[] endKey = possibleRegion.getRegionLocation().getRegion().getEndKey();
091    // Here we do direct Bytes.compareTo and not doing CellComparator/MetaCellComparator path.
092    // MetaCellComparator is for comparing against data in META table which need special handling.
093    // Not doing that is ok for this case because
094    // 1. We are getting the Region location for the given row in non META tables only. The compare
095    // checks the given row is within the end key of the found region. So META regions are not
096    // coming in here.
097    // 2. Even if META region comes in, its end key will be empty byte[] and so Bytes.equals(endKey,
098    // HConstants.EMPTY_END_ROW) check itself will pass.
099    if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
100        Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) {
101      if (metrics != null) metrics.incrMetaCacheHit();
102      return possibleRegion;
103    }
104
105    // Passed all the way through, so we got nothing - complete cache miss
106    if (metrics != null) metrics.incrMetaCacheMiss();
107    return null;
108  }
109
110  /**
111   * Put a newly discovered HRegionLocation into the cache.
112   * @param tableName The table name.
113   * @param source the source of the new location
114   * @param location the new location
115   */
116  public void cacheLocation(final TableName tableName, final ServerName source,
117      final HRegionLocation location) {
118    assert source != null;
119    byte [] startKey = location.getRegion().getStartKey();
120    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
121    RegionLocations locations = new RegionLocations(new HRegionLocation[] {location}) ;
122    RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
123    boolean isNewCacheEntry = (oldLocations == null);
124    if (isNewCacheEntry) {
125      if (LOG.isTraceEnabled()) {
126        LOG.trace("Cached location: " + location);
127      }
128      addToCachedServers(locations);
129      return;
130    }
131
132    // If the server in cache sends us a redirect, assume it's always valid.
133    HRegionLocation oldLocation = oldLocations.getRegionLocation(
134      location.getRegion().getReplicaId());
135    boolean force = oldLocation != null && oldLocation.getServerName() != null
136        && oldLocation.getServerName().equals(source);
137
138    // For redirect if the number is equal to previous
139    // record, the most common case is that first the region was closed with seqNum, and then
140    // opened with the same seqNum; hence we will ignore the redirect.
141    // There are so many corner cases with various combinations of opens and closes that
142    // an additional counter on top of seqNum would be necessary to handle them all.
143    RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
144    if (oldLocations != updatedLocations) {
145      boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
146      if (replaced && LOG.isTraceEnabled()) {
147        LOG.trace("Changed cached location to: " + location);
148      }
149      addToCachedServers(updatedLocations);
150    }
151  }
152
153  /**
154   * Put a newly discovered HRegionLocation into the cache.
155   * @param tableName The table name.
156   * @param locations the new locations
157   */
158  public void cacheLocation(final TableName tableName, final RegionLocations locations) {
159    byte [] startKey = locations.getRegionLocation().getRegion().getStartKey();
160    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
161    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
162    boolean isNewCacheEntry = (oldLocation == null);
163    if (isNewCacheEntry) {
164      if (LOG.isTraceEnabled()) {
165        LOG.trace("Cached location: " + locations);
166      }
167      addToCachedServers(locations);
168      return;
169    }
170
171    // merge old and new locations and add it to the cache
172    // Meta record might be stale - some (probably the same) server has closed the region
173    // with later seqNum and told us about the new location.
174    RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
175    boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
176    if (replaced && LOG.isTraceEnabled()) {
177      LOG.trace("Merged cached locations: " + mergedLocation);
178    }
179    addToCachedServers(locations);
180  }
181
182  private void addToCachedServers(RegionLocations locations) {
183    for (HRegionLocation loc : locations.getRegionLocations()) {
184      if (loc != null) {
185        cachedServers.add(loc.getServerName());
186      }
187    }
188  }
189
190  /**
191   * @param tableName
192   * @return Map of cached locations for passed <code>tableName</code>
193   */
194  private ConcurrentNavigableMap<byte[], RegionLocations> getTableLocations(
195      final TableName tableName) {
196    // find the map of cached locations for this table
197    return computeIfAbsent(cachedRegionLocations, tableName,
198      () -> new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR));
199  }
200
201  /**
202   * Check the region cache to see whether a region is cached yet or not.
203   * @param tableName tableName
204   * @param row row
205   * @return Region cached or not.
206   */
207  public boolean isRegionCached(TableName tableName, final byte[] row) {
208    RegionLocations location = getCachedLocation(tableName, row);
209    return location != null;
210  }
211
212  /**
213   * Return the number of cached region for a table. It will only be called
214   * from a unit test.
215   */
216  public int getNumberOfCachedRegionLocations(final TableName tableName) {
217    Map<byte[], RegionLocations> tableLocs = this.cachedRegionLocations.get(tableName);
218    if (tableLocs == null) {
219      return 0;
220    }
221    int numRegions = 0;
222    for (RegionLocations tableLoc : tableLocs.values()) {
223      numRegions += tableLoc.numNonNullElements();
224    }
225    return numRegions;
226  }
227
228  /**
229   * Delete all cached entries.
230   */
231  public void clearCache() {
232    this.cachedRegionLocations.clear();
233    this.cachedServers.clear();
234  }
235
236  /**
237   * Delete all cached entries of a server.
238   */
239  public void clearCache(final ServerName serverName) {
240    if (!this.cachedServers.contains(serverName)) {
241      return;
242    }
243
244    boolean deletedSomething = false;
245    synchronized (this.cachedServers) {
246      // We block here, because if there is an error on a server, it's likely that multiple
247      //  threads will get the error  simultaneously. If there are hundreds of thousand of
248      //  region location to check, it's better to do this only once. A better pattern would
249      //  be to check if the server is dead when we get the region location.
250      if (!this.cachedServers.contains(serverName)) {
251        return;
252      }
253      for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()){
254        for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) {
255          RegionLocations regionLocations = e.getValue();
256          if (regionLocations != null) {
257            RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
258            if (updatedLocations != regionLocations) {
259              if (updatedLocations.isEmpty()) {
260                deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
261              } else {
262                deletedSomething |= tableLocations.replace(e.getKey(), regionLocations,
263                    updatedLocations);
264              }
265            }
266          }
267        }
268      }
269      this.cachedServers.remove(serverName);
270    }
271    if (deletedSomething) {
272      if (metrics != null) {
273        metrics.incrMetaCacheNumClearServer();
274      }
275      if (LOG.isTraceEnabled()) {
276        LOG.trace("Removed all cached region locations that map to " + serverName);
277      }
278    }
279  }
280
281  /**
282   * Delete all cached entries of a table.
283   */
284  public void clearCache(final TableName tableName) {
285    if (LOG.isTraceEnabled()) {
286      LOG.trace("Removed all cached region locations for table " + tableName);
287    }
288    this.cachedRegionLocations.remove(tableName);
289  }
290
291  /**
292   * Delete a cached location, no matter what it is. Called when we were told to not use cache.
293   * @param tableName tableName
294   * @param row
295   */
296  public void clearCache(final TableName tableName, final byte [] row) {
297    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
298
299    RegionLocations regionLocations = getCachedLocation(tableName, row);
300    if (regionLocations != null) {
301      byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
302      boolean removed = tableLocations.remove(startKey, regionLocations);
303      if (removed) {
304        if (metrics != null) {
305          metrics.incrMetaCacheNumClearRegion();
306        }
307        if (LOG.isTraceEnabled()) {
308          LOG.trace("Removed " + regionLocations + " from cache");
309        }
310      }
311    }
312  }
313
314  /**
315   * Delete a cached location with specific replicaId.
316   * @param tableName tableName
317   * @param row row key
318   * @param replicaId region replica id
319   */
320  public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
321    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
322
323    RegionLocations regionLocations = getCachedLocation(tableName, row);
324    if (regionLocations != null) {
325      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
326      if (toBeRemoved != null) {
327        RegionLocations updatedLocations = regionLocations.remove(replicaId);
328        byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
329        boolean removed;
330        if (updatedLocations.isEmpty()) {
331          removed = tableLocations.remove(startKey, regionLocations);
332        } else {
333          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
334        }
335
336        if (removed) {
337          if (metrics != null) {
338            metrics.incrMetaCacheNumClearRegion();
339          }
340          if (LOG.isTraceEnabled()) {
341            LOG.trace("Removed " + toBeRemoved + " from cache");
342          }
343        }
344      }
345    }
346  }
347
348  /**
349   * Delete a cached location for a table, row and server
350   */
351  public void clearCache(final TableName tableName, final byte [] row, ServerName serverName) {
352    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
353
354    RegionLocations regionLocations = getCachedLocation(tableName, row);
355    if (regionLocations != null) {
356      RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
357      if (updatedLocations != regionLocations) {
358        byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
359        boolean removed = false;
360        if (updatedLocations.isEmpty()) {
361          removed = tableLocations.remove(startKey, regionLocations);
362        } else {
363          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
364        }
365        if (removed) {
366          if (metrics != null) {
367            metrics.incrMetaCacheNumClearRegion();
368          }
369          if (LOG.isTraceEnabled()) {
370            LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row)
371              + " mapping to server: " + serverName + " from cache");
372          }
373        }
374      }
375    }
376  }
377
378  /**
379   * Deletes the cached location of the region if necessary, based on some error from source.
380   * @param hri The region in question.
381   */
382  public void clearCache(RegionInfo hri) {
383    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
384    RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
385    if (regionLocations != null) {
386      HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
387      if (oldLocation == null) return;
388      RegionLocations updatedLocations = regionLocations.remove(oldLocation);
389      boolean removed;
390      if (updatedLocations != regionLocations) {
391        if (updatedLocations.isEmpty()) {
392          removed = tableLocations.remove(hri.getStartKey(), regionLocations);
393        } else {
394          removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
395        }
396        if (removed) {
397          if (metrics != null) {
398            metrics.incrMetaCacheNumClearRegion();
399          }
400          if (LOG.isTraceEnabled()) {
401            LOG.trace("Removed " + oldLocation + " from cache");
402          }
403        }
404      }
405    }
406  }
407
408  public void clearCache(final HRegionLocation location) {
409    if (location == null) {
410      return;
411    }
412    TableName tableName = location.getRegion().getTable();
413    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
414    RegionLocations regionLocations = tableLocations.get(location.getRegion().getStartKey());
415    if (regionLocations != null) {
416      RegionLocations updatedLocations = regionLocations.remove(location);
417      boolean removed;
418      if (updatedLocations != regionLocations) {
419        if (updatedLocations.isEmpty()) {
420          removed = tableLocations.remove(location.getRegion().getStartKey(), regionLocations);
421        } else {
422          removed = tableLocations.replace(location.getRegion().getStartKey(), regionLocations,
423              updatedLocations);
424        }
425        if (removed) {
426          if (metrics != null) {
427            metrics.incrMetaCacheNumClearRegion();
428          }
429          if (LOG.isTraceEnabled()) {
430            LOG.trace("Removed " + location + " from cache");
431          }
432        }
433      }
434    }
435  }
436}