001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.Set;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.ConcurrentNavigableMap;
027import java.util.concurrent.CopyOnWriteArraySet;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.HRegionLocation;
030import org.apache.hadoop.hbase.RegionLocations;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * A cache implementation for region locations from meta.
041 */
042@InterfaceAudience.Private
043public class MetaCache {
044
  /** Class logger; the messages emitted by this class are all at TRACE level. */
  private static final Logger LOG = LoggerFactory.getLogger(MetaCache.class);

  /**
   * Map of table to table {@link HRegionLocation}s. Each per-table map is keyed by region start
   * key and ordered with {@link Bytes#BYTES_COMPARATOR}, so the candidate region for a row can be
   * found with a floor lookup on the row key.
   */
  private final ConcurrentMap<TableName,
    ConcurrentNavigableMap<byte[], RegionLocations>> cachedRegionLocations =
      new CopyOnWriteArrayMap<>();

  // The presence of a server in this set implies it's likely that there is an
  // entry in cachedRegionLocations that maps to this server; the absence
  // of a server in this set is meant to guarantee that there is no entry in the cache that
  // maps to the absent server.
  // The set is a CopyOnWriteArraySet, and additions and lookups in this class are done without
  // any extra locking. clearCache(ServerName) synchronizes on this set while it scans the cache
  // and removes the server, so that concurrent callers do that scan one at a time.
  // NOTE(review): this comment used to require a lock on cachedRegionLocations, but no such lock
  // is taken anywhere in this class - confirm which locking discipline is intended.
  private final Set<ServerName> cachedServers = new CopyOnWriteArraySet<>();

  /** Metrics to update on cache hits, misses and clears; every use is guarded by a null check. */
  private final MetricsConnection metrics;

  /**
   * Creates an empty cache.
   * @param metrics metrics object to update; may be null, in which case no metrics are recorded
   */
  public MetaCache(MetricsConnection metrics) {
    this.metrics = metrics;
  }
066
067  /**
068   * Search the cache for a location that fits our table and row key. Return null if no suitable
069   * region is located.
070   * @return Null or region location found in cache.
071   */
072  public RegionLocations getCachedLocation(final TableName tableName, final byte[] row) {
073    ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
074
075    Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
076    if (e == null) {
077      if (metrics != null) metrics.incrMetaCacheMiss();
078      return null;
079    }
080    RegionLocations possibleRegion = e.getValue();
081
082    // make sure that the end key is greater than the row we're looking
083    // for, otherwise the row actually belongs in the next region, not
084    // this one. the exception case is when the endkey is
085    // HConstants.EMPTY_END_ROW, signifying that the region we're
086    // checking is actually the last region in the table.
087    byte[] endKey = possibleRegion.getRegionLocation().getRegion().getEndKey();
088    // Here we do direct Bytes.compareTo and not doing CellComparator/MetaCellComparator path.
089    // MetaCellComparator is for comparing against data in META table which need special handling.
090    // Not doing that is ok for this case because
091    // 1. We are getting the Region location for the given row in non META tables only. The compare
092    // checks the given row is within the end key of the found region. So META regions are not
093    // coming in here.
094    // 2. Even if META region comes in, its end key will be empty byte[] and so Bytes.equals(endKey,
095    // HConstants.EMPTY_END_ROW) check itself will pass.
096    if (
097      Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
098        || Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0
099    ) {
100      if (metrics != null) metrics.incrMetaCacheHit();
101      return possibleRegion;
102    }
103
104    // Passed all the way through, so we got nothing - complete cache miss
105    if (metrics != null) metrics.incrMetaCacheMiss();
106    return null;
107  }
108
109  /**
110   * Put a newly discovered HRegionLocation into the cache.
111   * @param tableName The table name.
112   * @param source    the source of the new location
113   * @param location  the new location
114   */
115  public void cacheLocation(final TableName tableName, final ServerName source,
116    final HRegionLocation location) {
117    assert source != null;
118    byte[] startKey = location.getRegion().getStartKey();
119    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
120    RegionLocations locations = new RegionLocations(new HRegionLocation[] { location });
121    RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
122    boolean isNewCacheEntry = (oldLocations == null);
123    if (isNewCacheEntry) {
124      if (LOG.isTraceEnabled()) {
125        LOG.trace("Cached location: " + location);
126      }
127      addToCachedServers(locations);
128      return;
129    }
130
131    // If the server in cache sends us a redirect, assume it's always valid.
132    HRegionLocation oldLocation =
133      oldLocations.getRegionLocation(location.getRegion().getReplicaId());
134    boolean force = oldLocation != null && oldLocation.getServerName() != null
135      && oldLocation.getServerName().equals(source);
136
137    // For redirect if the number is equal to previous
138    // record, the most common case is that first the region was closed with seqNum, and then
139    // opened with the same seqNum; hence we will ignore the redirect.
140    // There are so many corner cases with various combinations of opens and closes that
141    // an additional counter on top of seqNum would be necessary to handle them all.
142    RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
143    if (oldLocations != updatedLocations) {
144      boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
145      if (replaced && LOG.isTraceEnabled()) {
146        LOG.trace("Changed cached location to: " + location);
147      }
148      addToCachedServers(updatedLocations);
149    }
150  }
151
152  /**
153   * Put a newly discovered HRegionLocation into the cache.
154   * @param tableName The table name.
155   * @param locations the new locations
156   */
157  public void cacheLocation(final TableName tableName, final RegionLocations locations) {
158    byte[] startKey = locations.getRegionLocation().getRegion().getStartKey();
159    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
160    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
161    boolean isNewCacheEntry = (oldLocation == null);
162    if (isNewCacheEntry) {
163      if (LOG.isTraceEnabled()) {
164        LOG.trace("Cached location: " + locations);
165      }
166      addToCachedServers(locations);
167      return;
168    }
169
170    // merge old and new locations and add it to the cache
171    // Meta record might be stale - some (probably the same) server has closed the region
172    // with later seqNum and told us about the new location.
173    RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
174    boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
175    if (replaced && LOG.isTraceEnabled()) {
176      LOG.trace("Merged cached locations: " + mergedLocation);
177    }
178    addToCachedServers(locations);
179  }
180
181  private void addToCachedServers(RegionLocations locations) {
182    for (HRegionLocation loc : locations.getRegionLocations()) {
183      if (loc != null) {
184        cachedServers.add(loc.getServerName());
185      }
186    }
187  }
188
189  /**
190   * n * @return Map of cached locations for passed <code>tableName</code>
191   */
192  private ConcurrentNavigableMap<byte[], RegionLocations>
193    getTableLocations(final TableName tableName) {
194    // find the map of cached locations for this table
195    return computeIfAbsent(cachedRegionLocations, tableName,
196      () -> new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR));
197  }
198
199  /**
200   * Check the region cache to see whether a region is cached yet or not.
201   * @param tableName tableName
202   * @param row       row
203   * @return Region cached or not.
204   */
205  public boolean isRegionCached(TableName tableName, final byte[] row) {
206    RegionLocations location = getCachedLocation(tableName, row);
207    return location != null;
208  }
209
210  /**
211   * Return the number of cached region for a table. It will only be called from a unit test.
212   */
213  public int getNumberOfCachedRegionLocations(final TableName tableName) {
214    Map<byte[], RegionLocations> tableLocs = this.cachedRegionLocations.get(tableName);
215    if (tableLocs == null) {
216      return 0;
217    }
218    int numRegions = 0;
219    for (RegionLocations tableLoc : tableLocs.values()) {
220      numRegions += tableLoc.numNonNullElements();
221    }
222    return numRegions;
223  }
224
225  /**
226   * Delete all cached entries.
227   */
228  public void clearCache() {
229    this.cachedRegionLocations.clear();
230    this.cachedServers.clear();
231  }
232
233  /**
234   * Delete all cached entries of a server.
235   */
236  public void clearCache(final ServerName serverName) {
237    if (!this.cachedServers.contains(serverName)) {
238      return;
239    }
240
241    boolean deletedSomething = false;
242    synchronized (this.cachedServers) {
243      // We block here, because if there is an error on a server, it's likely that multiple
244      // threads will get the error simultaneously. If there are hundreds of thousand of
245      // region location to check, it's better to do this only once. A better pattern would
246      // be to check if the server is dead when we get the region location.
247      if (!this.cachedServers.contains(serverName)) {
248        return;
249      }
250      for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()) {
251        for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) {
252          RegionLocations regionLocations = e.getValue();
253          if (regionLocations != null) {
254            RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
255            if (updatedLocations != regionLocations) {
256              if (updatedLocations.isEmpty()) {
257                deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
258              } else {
259                deletedSomething |=
260                  tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
261              }
262            }
263          }
264        }
265      }
266      this.cachedServers.remove(serverName);
267    }
268    if (deletedSomething) {
269      if (metrics != null) {
270        metrics.incrMetaCacheNumClearServer();
271      }
272      if (LOG.isTraceEnabled()) {
273        LOG.trace("Removed all cached region locations that map to " + serverName);
274      }
275    }
276  }
277
278  /**
279   * Delete all cached entries of a table.
280   */
281  public void clearCache(final TableName tableName) {
282    if (LOG.isTraceEnabled()) {
283      LOG.trace("Removed all cached region locations for table " + tableName);
284    }
285    this.cachedRegionLocations.remove(tableName);
286  }
287
288  /**
289   * Delete a cached location, no matter what it is. Called when we were told to not use cache.
290   * @param tableName tableName n
291   */
292  public void clearCache(final TableName tableName, final byte[] row) {
293    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
294
295    RegionLocations regionLocations = getCachedLocation(tableName, row);
296    if (regionLocations != null) {
297      byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
298      boolean removed = tableLocations.remove(startKey, regionLocations);
299      if (removed) {
300        if (metrics != null) {
301          metrics.incrMetaCacheNumClearRegion();
302        }
303        if (LOG.isTraceEnabled()) {
304          LOG.trace("Removed " + regionLocations + " from cache");
305        }
306      }
307    }
308  }
309
310  /**
311   * Delete a cached location with specific replicaId.
312   * @param tableName tableName
313   * @param row       row key
314   * @param replicaId region replica id
315   */
316  public void clearCache(final TableName tableName, final byte[] row, int replicaId) {
317    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
318
319    RegionLocations regionLocations = getCachedLocation(tableName, row);
320    if (regionLocations != null) {
321      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
322      if (toBeRemoved != null) {
323        RegionLocations updatedLocations = regionLocations.remove(replicaId);
324        byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
325        boolean removed;
326        if (updatedLocations.isEmpty()) {
327          removed = tableLocations.remove(startKey, regionLocations);
328        } else {
329          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
330        }
331
332        if (removed) {
333          if (metrics != null) {
334            metrics.incrMetaCacheNumClearRegion();
335          }
336          if (LOG.isTraceEnabled()) {
337            LOG.trace("Removed " + toBeRemoved + " from cache");
338          }
339        }
340      }
341    }
342  }
343
344  /**
345   * Delete a cached location for a table, row and server
346   */
347  public void clearCache(final TableName tableName, final byte[] row, ServerName serverName) {
348    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
349
350    RegionLocations regionLocations = getCachedLocation(tableName, row);
351    if (regionLocations != null) {
352      RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
353      if (updatedLocations != regionLocations) {
354        byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
355        boolean removed = false;
356        if (updatedLocations.isEmpty()) {
357          removed = tableLocations.remove(startKey, regionLocations);
358        } else {
359          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
360        }
361        if (removed) {
362          if (metrics != null) {
363            metrics.incrMetaCacheNumClearRegion();
364          }
365          if (LOG.isTraceEnabled()) {
366            LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row)
367              + " mapping to server: " + serverName + " from cache");
368          }
369        }
370      }
371    }
372  }
373
374  /**
375   * Deletes the cached location of the region if necessary, based on some error from source.
376   * @param hri The region in question.
377   */
378  public void clearCache(RegionInfo hri) {
379    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
380    RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
381    if (regionLocations != null) {
382      HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
383      if (oldLocation == null) return;
384      RegionLocations updatedLocations = regionLocations.remove(oldLocation);
385      boolean removed;
386      if (updatedLocations != regionLocations) {
387        if (updatedLocations.isEmpty()) {
388          removed = tableLocations.remove(hri.getStartKey(), regionLocations);
389        } else {
390          removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
391        }
392        if (removed) {
393          if (metrics != null) {
394            metrics.incrMetaCacheNumClearRegion();
395          }
396          if (LOG.isTraceEnabled()) {
397            LOG.trace("Removed " + oldLocation + " from cache");
398          }
399        }
400      }
401    }
402  }
403
404  public void clearCache(final HRegionLocation location) {
405    if (location == null) {
406      return;
407    }
408    TableName tableName = location.getRegion().getTable();
409    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
410    RegionLocations regionLocations = tableLocations.get(location.getRegion().getStartKey());
411    if (regionLocations != null) {
412      RegionLocations updatedLocations = regionLocations.remove(location);
413      boolean removed;
414      if (updatedLocations != regionLocations) {
415        if (updatedLocations.isEmpty()) {
416          removed = tableLocations.remove(location.getRegion().getStartKey(), regionLocations);
417        } else {
418          removed = tableLocations.replace(location.getRegion().getStartKey(), regionLocations,
419            updatedLocations);
420        }
421        if (removed) {
422          if (metrics != null) {
423            metrics.incrMetaCacheNumClearRegion();
424          }
425          if (LOG.isTraceEnabled()) {
426            LOG.trace("Removed " + location + " from cache");
427          }
428        }
429      }
430    }
431  }
432}