View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.ConcurrentSkipListMap;
27  import java.util.concurrent.ConcurrentSkipListSet;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.HRegionLocation;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.KeyValue.KVComparator;
36  import org.apache.hadoop.hbase.RegionLocations;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  /**
42   * A cache implementation for region locations from meta.
43   */
44  @InterfaceAudience.Private
45  public class MetaCache {
46  
47    private static final Log LOG = LogFactory.getLog(MetaCache.class);
48  
49    /**
50     * Map of table to table {@link HRegionLocation}s.
51     */
52    private final ConcurrentMap<TableName, ConcurrentSkipListMap<byte[], RegionLocations>>
53    cachedRegionLocations =
54    new ConcurrentHashMap<TableName, ConcurrentSkipListMap<byte[], RegionLocations>>();
55  
56    // The presence of a server in the map implies it's likely that there is an
57    // entry in cachedRegionLocations that map to this server; but the absence
58    // of a server in this map guarentees that there is no entry in cache that
59    // maps to the absent server.
60    // The access to this attribute must be protected by a lock on cachedRegionLocations
61    private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
62  
63    /**
64     * Search the cache for a location that fits our table and row key.
65     * Return null if no suitable region is located.
66     *
67     *
68     * @param tableName
69     * @param row
70     * @return Null or region location found in cache.
71     */
72    public RegionLocations getCachedLocation(final TableName tableName, final byte [] row) {
73      ConcurrentSkipListMap<byte[], RegionLocations> tableLocations =
74        getTableLocations(tableName);
75  
76      Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
77      if (e == null) {
78        return null;
79      }
80      RegionLocations possibleRegion = e.getValue();
81  
82      // make sure that the end key is greater than the row we're looking
83      // for, otherwise the row actually belongs in the next region, not
84      // this one. the exception case is when the endkey is
85      // HConstants.EMPTY_END_ROW, signifying that the region we're
86      // checking is actually the last region in the table.
87      byte[] endKey = possibleRegion.getRegionLocation().getRegionInfo().getEndKey();
88      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
89          getRowComparator(tableName).compareRows(
90              endKey, 0, endKey.length, row, 0, row.length) > 0) {
91        return possibleRegion;
92      }
93  
94      // Passed all the way through, so we got nothing - complete cache miss
95      return null;
96    }
97  
98    private KVComparator getRowComparator(TableName tableName) {
99      return TableName.META_TABLE_NAME.equals(tableName) ? KeyValue.META_COMPARATOR
100         : KeyValue.COMPARATOR;
101   }
102   /**
103    * Put a newly discovered HRegionLocation into the cache.
104    * @param tableName The table name.
105    * @param source the source of the new location
106    * @param location the new location
107    */
108   public void cacheLocation(final TableName tableName, final ServerName source,
109       final HRegionLocation location) {
110     assert source != null;
111     byte [] startKey = location.getRegionInfo().getStartKey();
112     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
113     RegionLocations locations = new RegionLocations(new HRegionLocation[] {location}) ;
114     RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
115     boolean isNewCacheEntry = (oldLocations == null);
116     if (isNewCacheEntry) {
117       if (LOG.isTraceEnabled()) {
118         LOG.trace("Cached location: " + location);
119       }
120       addToCachedServers(locations);
121       return;
122     }
123 
124     // If the server in cache sends us a redirect, assume it's always valid.
125     HRegionLocation oldLocation = oldLocations.getRegionLocation(
126       location.getRegionInfo().getReplicaId());
127     boolean force = oldLocation != null && oldLocation.getServerName() != null
128         && oldLocation.getServerName().equals(source);
129 
130     // For redirect if the number is equal to previous
131     // record, the most common case is that first the region was closed with seqNum, and then
132     // opened with the same seqNum; hence we will ignore the redirect.
133     // There are so many corner cases with various combinations of opens and closes that
134     // an additional counter on top of seqNum would be necessary to handle them all.
135     RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
136     if (oldLocations != updatedLocations) {
137       boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
138       if (replaced && LOG.isTraceEnabled()) {
139         LOG.trace("Changed cached location to: " + location);
140       }
141       addToCachedServers(updatedLocations);
142     }
143   }
144 
145   /**
146    * Put a newly discovered HRegionLocation into the cache.
147    * @param tableName The table name.
148    * @param locations the new locations
149    */
150   public void cacheLocation(final TableName tableName, final RegionLocations locations) {
151     byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
152     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
153     RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
154     boolean isNewCacheEntry = (oldLocation == null);
155     if (isNewCacheEntry) {
156       if (LOG.isTraceEnabled()) {
157         LOG.trace("Cached location: " + locations);
158       }
159       addToCachedServers(locations);
160       return;
161     }
162 
163     // merge old and new locations and add it to the cache
164     // Meta record might be stale - some (probably the same) server has closed the region
165     // with later seqNum and told us about the new location.
166     RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
167     boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
168     if (replaced && LOG.isTraceEnabled()) {
169       LOG.trace("Merged cached locations: " + mergedLocation);
170     }
171     addToCachedServers(locations);
172   }
173 
174   private void addToCachedServers(RegionLocations locations) {
175     for (HRegionLocation loc : locations.getRegionLocations()) {
176       if (loc != null) {
177         cachedServers.add(loc.getServerName());
178       }
179     }
180   }
181 
182   /**
183    * @param tableName
184    * @return Map of cached locations for passed <code>tableName</code>
185    */
186   private ConcurrentSkipListMap<byte[], RegionLocations>
187     getTableLocations(final TableName tableName) {
188     // find the map of cached locations for this table
189     ConcurrentSkipListMap<byte[], RegionLocations> result;
190     result = this.cachedRegionLocations.get(tableName);
191     // if tableLocations for this table isn't built yet, make one
192     if (result == null) {
193       result = new ConcurrentSkipListMap<byte[], RegionLocations>(Bytes.BYTES_COMPARATOR);
194       ConcurrentSkipListMap<byte[], RegionLocations> old =
195           this.cachedRegionLocations.putIfAbsent(tableName, result);
196       if (old != null) {
197         return old;
198       }
199     }
200     return result;
201   }
202 
203   /**
204    * Check the region cache to see whether a region is cached yet or not.
205    * @param tableName tableName
206    * @param row row
207    * @return Region cached or not.
208    */
209   public boolean isRegionCached(TableName tableName, final byte[] row) {
210     RegionLocations location = getCachedLocation(tableName, row);
211     return location != null;
212   }
213 
214   /**
215    * Return the number of cached region for a table. It will only be called
216    * from a unit test.
217    */
218   public int getNumberOfCachedRegionLocations(final TableName tableName) {
219     Map<byte[], RegionLocations> tableLocs = this.cachedRegionLocations.get(tableName);
220     if (tableLocs == null) {
221       return 0;
222     }
223     int numRegions = 0;
224     for (RegionLocations tableLoc : tableLocs.values()) {
225       numRegions += tableLoc.numNonNullElements();
226     }
227     return numRegions;
228   }
229 
230   /**
231    * Delete all cached entries.
232    */
233   public void clearCache() {
234     this.cachedRegionLocations.clear();
235     this.cachedServers.clear();
236   }
237 
238   /**
239    * Delete all cached entries of a server.
240    */
241   public void clearCache(final ServerName serverName) {
242     if (!this.cachedServers.contains(serverName)) {
243       return;
244     }
245 
246     boolean deletedSomething = false;
247     synchronized (this.cachedServers) {
248       // We block here, because if there is an error on a server, it's likely that multiple
249       //  threads will get the error  simultaneously. If there are hundreds of thousand of
250       //  region location to check, it's better to do this only once. A better pattern would
251       //  be to check if the server is dead when we get the region location.
252       if (!this.cachedServers.contains(serverName)) {
253         return;
254       }
255       for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()){
256         for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) {
257           RegionLocations regionLocations = e.getValue();
258           if (regionLocations != null) {
259             RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
260             if (updatedLocations != regionLocations) {
261               if (updatedLocations.isEmpty()) {
262                 deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
263               } else {
264                 deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
265               }
266             }
267           }
268         }
269       }
270       this.cachedServers.remove(serverName);
271     }
272     if (deletedSomething && LOG.isTraceEnabled()) {
273       LOG.trace("Removed all cached region locations that map to " + serverName);
274     }
275   }
276 
277   /**
278    * Delete all cached entries of a table.
279    */
280   public void clearCache(final TableName tableName) {
281     if (LOG.isTraceEnabled()) {
282       LOG.trace("Removed all cached region locations for table " + tableName);
283     }
284     this.cachedRegionLocations.remove(tableName);
285   }
286 
287   /**
288    * Delete a cached location, no matter what it is. Called when we were told to not use cache.
289    * @param tableName tableName
290    * @param row
291    */
292   public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
293     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
294 
295     boolean removed = false;
296     RegionLocations regionLocations = getCachedLocation(tableName, row);
297     if (regionLocations != null) {
298       HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
299       RegionLocations updatedLocations = regionLocations.remove(replicaId);
300       if (updatedLocations != regionLocations) {
301         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
302         if (updatedLocations.isEmpty()) {
303           removed = tableLocations.remove(startKey, regionLocations);
304         } else {
305           removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
306         }
307       }
308 
309       if (removed && LOG.isTraceEnabled() && toBeRemoved != null) {
310         LOG.trace("Removed " + toBeRemoved + " from cache");
311       }
312     }
313   }
314 
315   /**
316    * Delete a cached location, no matter what it is. Called when we were told to not use cache.
317    * @param tableName tableName
318    * @param row
319    */
320   public void clearCache(final TableName tableName, final byte [] row) {
321     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
322 
323     RegionLocations regionLocations = getCachedLocation(tableName, row);
324     if (regionLocations != null) {
325       byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
326       boolean removed = tableLocations.remove(startKey, regionLocations);
327       if (removed && LOG.isTraceEnabled()) {
328         LOG.trace("Removed " + regionLocations + " from cache");
329       }
330     }
331   }
332 
333   /**
334    * Delete a cached location for a table, row and server
335    */
336   public void clearCache(final TableName tableName, final byte [] row, ServerName serverName) {
337     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
338 
339     RegionLocations regionLocations = getCachedLocation(tableName, row);
340     if (regionLocations != null) {
341       RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
342       if (updatedLocations != regionLocations) {
343         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
344         boolean removed = false;
345         if (updatedLocations.isEmpty()) {
346           removed = tableLocations.remove(startKey, regionLocations);
347         } else {
348           removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
349         }
350         if (removed && LOG.isTraceEnabled()) {
351           LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row)
352             + " mapping to server: " + serverName + " from cache");
353         }
354       }
355     }
356   }
357 
358   /**
359    * Deletes the cached location of the region if necessary, based on some error from source.
360    * @param hri The region in question.
361    */
362   public void clearCache(HRegionInfo hri) {
363     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
364     RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
365     if (regionLocations != null) {
366       HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
367       if (oldLocation == null) return;
368       RegionLocations updatedLocations = regionLocations.remove(oldLocation);
369       boolean removed = false;
370       if (updatedLocations != regionLocations) {
371         if (updatedLocations.isEmpty()) {
372           removed = tableLocations.remove(hri.getStartKey(), regionLocations);
373         } else {
374           removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
375         }
376         if (removed && LOG.isTraceEnabled()) {
377           LOG.trace("Removed " + oldLocation + " from cache");
378         }
379       }
380     }
381   }
382 
383   public void clearCache(final HRegionLocation location) {
384     if (location == null) {
385       return;
386     }
387     TableName tableName = location.getRegionInfo().getTable();
388     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
389     RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
390     if (regionLocations != null) {
391       RegionLocations updatedLocations = regionLocations.remove(location);
392       boolean removed = false;
393       if (updatedLocations != regionLocations) {
394         if (updatedLocations.isEmpty()) {
395           removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
396         } else {
397           removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations);
398         }
399         if (removed && LOG.isTraceEnabled()) {
400           LOG.trace("Removed " + location + " from cache");
401         }
402       }
403     }
404   }
405 }