View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.ConcurrentNavigableMap;
26  import java.util.concurrent.CopyOnWriteArraySet;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.KeyValue.KVComparator;
35  import org.apache.hadoop.hbase.RegionLocations;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  /**
42   * A cache implementation for region locations from meta.
43   */
44  @InterfaceAudience.Private
45  public class MetaCache {
46  
47    private static final Log LOG = LogFactory.getLog(MetaCache.class);
48  
49    /**
50     * Map of table to table {@link HRegionLocation}s.
51     */
52    private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], RegionLocations>>
53    cachedRegionLocations =
54    new CopyOnWriteArrayMap<>();
55  
56    // The presence of a server in the map implies it's likely that there is an
57    // entry in cachedRegionLocations that map to this server; but the absence
58    // of a server in this map guarentees that there is no entry in cache that
59    // maps to the absent server.
60    // The access to this attribute must be protected by a lock on cachedRegionLocations
61    private final Set<ServerName> cachedServers = new CopyOnWriteArraySet<>();
62  
63    private final MetricsConnection metrics;
64  
65    public MetaCache(MetricsConnection metrics) {
66      this.metrics = metrics;
67    }
68  
69    /**
70     * Search the cache for a location that fits our table and row key.
71     * Return null if no suitable region is located.
72     *
73     * @return Null or region location found in cache.
74     */
75    public RegionLocations getCachedLocation(final TableName tableName, final byte [] row) {
76      ConcurrentNavigableMap<byte[], RegionLocations> tableLocations =
77        getTableLocations(tableName);
78  
79      Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
80      if (e == null) {
81        if (metrics!= null) metrics.incrMetaCacheMiss();
82        return null;
83      }
84      RegionLocations possibleRegion = e.getValue();
85  
86      // make sure that the end key is greater than the row we're looking
87      // for, otherwise the row actually belongs in the next region, not
88      // this one. the exception case is when the endkey is
89      // HConstants.EMPTY_END_ROW, signifying that the region we're
90      // checking is actually the last region in the table.
91      byte[] endKey = possibleRegion.getRegionLocation().getRegionInfo().getEndKey();
92      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
93          getRowComparator(tableName).compareRows(
94              endKey, 0, endKey.length, row, 0, row.length) > 0) {
95        if (metrics != null) metrics.incrMetaCacheHit();
96        return possibleRegion;
97      }
98  
99      // Passed all the way through, so we got nothing - complete cache miss
100     if (metrics != null) metrics.incrMetaCacheMiss();
101     return null;
102   }
103 
104   private KVComparator getRowComparator(TableName tableName) {
105     return TableName.META_TABLE_NAME.equals(tableName) ? KeyValue.META_COMPARATOR
106         : KeyValue.COMPARATOR;
107   }
108   /**
109    * Put a newly discovered HRegionLocation into the cache.
110    * @param tableName The table name.
111    * @param source the source of the new location
112    * @param location the new location
113    */
114   public void cacheLocation(final TableName tableName, final ServerName source,
115       final HRegionLocation location) {
116     assert source != null;
117     byte [] startKey = location.getRegionInfo().getStartKey();
118     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
119     RegionLocations locations = new RegionLocations(new HRegionLocation[] {location}) ;
120     RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
121     boolean isNewCacheEntry = (oldLocations == null);
122     if (isNewCacheEntry) {
123       if (LOG.isTraceEnabled()) {
124         LOG.trace("Cached location: " + location);
125       }
126       addToCachedServers(locations);
127       return;
128     }
129 
130     // If the server in cache sends us a redirect, assume it's always valid.
131     HRegionLocation oldLocation = oldLocations.getRegionLocation(
132       location.getRegionInfo().getReplicaId());
133     boolean force = oldLocation != null && oldLocation.getServerName() != null
134         && oldLocation.getServerName().equals(source);
135 
136     // For redirect if the number is equal to previous
137     // record, the most common case is that first the region was closed with seqNum, and then
138     // opened with the same seqNum; hence we will ignore the redirect.
139     // There are so many corner cases with various combinations of opens and closes that
140     // an additional counter on top of seqNum would be necessary to handle them all.
141     RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
142     if (oldLocations != updatedLocations) {
143       boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
144       if (replaced && LOG.isTraceEnabled()) {
145         LOG.trace("Changed cached location to: " + location);
146       }
147       addToCachedServers(updatedLocations);
148     }
149   }
150 
151   /**
152    * Put a newly discovered HRegionLocation into the cache.
153    * @param tableName The table name.
154    * @param locations the new locations
155    */
156   public void cacheLocation(final TableName tableName, final RegionLocations locations) {
157     byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
158     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
159     RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
160     boolean isNewCacheEntry = (oldLocation == null);
161     if (isNewCacheEntry) {
162       if (LOG.isTraceEnabled()) {
163         LOG.trace("Cached location: " + locations);
164       }
165       addToCachedServers(locations);
166       return;
167     }
168 
169     // merge old and new locations and add it to the cache
170     // Meta record might be stale - some (probably the same) server has closed the region
171     // with later seqNum and told us about the new location.
172     RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
173     boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
174     if (replaced && LOG.isTraceEnabled()) {
175       LOG.trace("Merged cached locations: " + mergedLocation);
176     }
177     addToCachedServers(locations);
178   }
179 
180   private void addToCachedServers(RegionLocations locations) {
181     for (HRegionLocation loc : locations.getRegionLocations()) {
182       if (loc != null) {
183         cachedServers.add(loc.getServerName());
184       }
185     }
186   }
187 
188   /**
189    * @param tableName
190    * @return Map of cached locations for passed <code>tableName</code>
191    */
192   private ConcurrentNavigableMap<byte[], RegionLocations>
193     getTableLocations(final TableName tableName) {
194     // find the map of cached locations for this table
195     ConcurrentNavigableMap<byte[], RegionLocations> result;
196     result = this.cachedRegionLocations.get(tableName);
197     // if tableLocations for this table isn't built yet, make one
198     if (result == null) {
199       result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR);
200       ConcurrentNavigableMap<byte[], RegionLocations> old =
201           this.cachedRegionLocations.putIfAbsent(tableName, result);
202       if (old != null) {
203         return old;
204       }
205     }
206     return result;
207   }
208 
209   /**
210    * Check the region cache to see whether a region is cached yet or not.
211    * @param tableName tableName
212    * @param row row
213    * @return Region cached or not.
214    */
215   public boolean isRegionCached(TableName tableName, final byte[] row) {
216     RegionLocations location = getCachedLocation(tableName, row);
217     return location != null;
218   }
219 
220   /**
221    * Return the number of cached region for a table. It will only be called
222    * from a unit test.
223    */
224   public int getNumberOfCachedRegionLocations(final TableName tableName) {
225     Map<byte[], RegionLocations> tableLocs = this.cachedRegionLocations.get(tableName);
226     if (tableLocs == null) {
227       return 0;
228     }
229     int numRegions = 0;
230     for (RegionLocations tableLoc : tableLocs.values()) {
231       numRegions += tableLoc.numNonNullElements();
232     }
233     return numRegions;
234   }
235 
236   /**
237    * Delete all cached entries.
238    */
239   public void clearCache() {
240     this.cachedRegionLocations.clear();
241     this.cachedServers.clear();
242   }
243 
244   /**
245    * Delete all cached entries of a server.
246    */
247   public void clearCache(final ServerName serverName) {
248     if (!this.cachedServers.contains(serverName)) {
249       return;
250     }
251 
252     boolean deletedSomething = false;
253     synchronized (this.cachedServers) {
254       // We block here, because if there is an error on a server, it's likely that multiple
255       //  threads will get the error  simultaneously. If there are hundreds of thousand of
256       //  region location to check, it's better to do this only once. A better pattern would
257       //  be to check if the server is dead when we get the region location.
258       if (!this.cachedServers.contains(serverName)) {
259         return;
260       }
261       for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()){
262         for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) {
263           RegionLocations regionLocations = e.getValue();
264           if (regionLocations != null) {
265             RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
266             if (updatedLocations != regionLocations) {
267               if (updatedLocations.isEmpty()) {
268                 deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
269               } else {
270                 deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
271               }
272             }
273           }
274         }
275       }
276       this.cachedServers.remove(serverName);
277     }
278     if (deletedSomething && LOG.isTraceEnabled()) {
279       LOG.trace("Removed all cached region locations that map to " + serverName);
280     }
281   }
282 
283   /**
284    * Delete all cached entries of a table.
285    */
286   public void clearCache(final TableName tableName) {
287     if (LOG.isTraceEnabled()) {
288       LOG.trace("Removed all cached region locations for table " + tableName);
289     }
290     this.cachedRegionLocations.remove(tableName);
291   }
292 
293   /**
294    * Delete a cached location, no matter what it is. Called when we were told to not use cache.
295    * @param tableName tableName
296    * @param row
297    */
298   public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
299     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
300 
301     boolean removed = false;
302     RegionLocations regionLocations = getCachedLocation(tableName, row);
303     if (regionLocations != null) {
304       HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
305       RegionLocations updatedLocations = regionLocations.remove(replicaId);
306       if (updatedLocations != regionLocations) {
307         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
308         if (updatedLocations.isEmpty()) {
309           removed = tableLocations.remove(startKey, regionLocations);
310         } else {
311           removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
312         }
313       }
314 
315       if (removed && LOG.isTraceEnabled() && toBeRemoved != null) {
316         LOG.trace("Removed " + toBeRemoved + " from cache");
317       }
318     }
319   }
320 
321   /**
322    * Delete a cached location, no matter what it is. Called when we were told to not use cache.
323    * @param tableName tableName
324    * @param row
325    */
326   public void clearCache(final TableName tableName, final byte [] row) {
327     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
328 
329     RegionLocations regionLocations = getCachedLocation(tableName, row);
330     if (regionLocations != null) {
331       byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
332       boolean removed = tableLocations.remove(startKey, regionLocations);
333       if (removed && LOG.isTraceEnabled()) {
334         LOG.trace("Removed " + regionLocations + " from cache");
335       }
336     }
337   }
338 
339   /**
340    * Delete a cached location for a table, row and server
341    */
342   public void clearCache(final TableName tableName, final byte [] row, ServerName serverName) {
343     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
344 
345     RegionLocations regionLocations = getCachedLocation(tableName, row);
346     if (regionLocations != null) {
347       RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
348       if (updatedLocations != regionLocations) {
349         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
350         boolean removed = false;
351         if (updatedLocations.isEmpty()) {
352           removed = tableLocations.remove(startKey, regionLocations);
353         } else {
354           removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
355         }
356         if (removed && LOG.isTraceEnabled()) {
357           LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row)
358             + " mapping to server: " + serverName + " from cache");
359         }
360       }
361     }
362   }
363 
364   /**
365    * Deletes the cached location of the region if necessary, based on some error from source.
366    * @param hri The region in question.
367    */
368   public void clearCache(HRegionInfo hri) {
369     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
370     RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
371     if (regionLocations != null) {
372       HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
373       if (oldLocation == null) return;
374       RegionLocations updatedLocations = regionLocations.remove(oldLocation);
375       boolean removed = false;
376       if (updatedLocations != regionLocations) {
377         if (updatedLocations.isEmpty()) {
378           removed = tableLocations.remove(hri.getStartKey(), regionLocations);
379         } else {
380           removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
381         }
382         if (removed && LOG.isTraceEnabled()) {
383           LOG.trace("Removed " + oldLocation + " from cache");
384         }
385       }
386     }
387   }
388 
389   public void clearCache(final HRegionLocation location) {
390     if (location == null) {
391       return;
392     }
393     TableName tableName = location.getRegionInfo().getTable();
394     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
395     RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
396     if (regionLocations != null) {
397       RegionLocations updatedLocations = regionLocations.remove(location);
398       boolean removed = false;
399       if (updatedLocations != regionLocations) {
400         if (updatedLocations.isEmpty()) {
401           removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
402         } else {
403           removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations);
404         }
405         if (removed && LOG.isTraceEnabled()) {
406           LOG.trace("Removed " + location + " from cache");
407         }
408       }
409     }
410   }
411 }