001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
022
023import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
024import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
025import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
026import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
027
028import java.io.Closeable;
029import java.io.IOException;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.atomic.AtomicLong;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.AuthUtil;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.hadoop.hbase.exceptions.DeserializationException;
043import org.apache.hadoop.hbase.log.HBaseMarkers;
044import org.apache.hadoop.hbase.security.Superusers;
045import org.apache.hadoop.hbase.security.User;
046import org.apache.hadoop.hbase.security.UserProvider;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.zookeeper.KeeperException;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Performs authorization checks for a given user's assigned permissions
054 */
055@InterfaceAudience.Private
056public class TableAuthManager implements Closeable {
057  private static class PermissionCache<T extends Permission> {
058    /** Cache of user permissions */
059    private ListMultimap<String,T> userCache = ArrayListMultimap.create();
060    /** Cache of group permissions */
061    private ListMultimap<String,T> groupCache = ArrayListMultimap.create();
062
063    public List<T> getUser(String user) {
064      return userCache.get(user);
065    }
066
067    public void putUser(String user, T perm) {
068      userCache.put(user, perm);
069    }
070
071    public List<T> replaceUser(String user, Iterable<? extends T> perms) {
072      return userCache.replaceValues(user, perms);
073    }
074
075    public List<T> getGroup(String group) {
076      return groupCache.get(group);
077    }
078
079    public void putGroup(String group, T perm) {
080      groupCache.put(group, perm);
081    }
082
083    public List<T> replaceGroup(String group, Iterable<? extends T> perms) {
084      return groupCache.replaceValues(group, perms);
085    }
086
087    /**
088     * Returns a combined map of user and group permissions, with group names
089     * distinguished according to {@link AuthUtil#isGroupPrincipal(String)}.
090     */
091    public ListMultimap<String,T> getAllPermissions() {
092      ListMultimap<String,T> tmp = ArrayListMultimap.create();
093      tmp.putAll(userCache);
094      for (String group : groupCache.keySet()) {
095        tmp.putAll(AuthUtil.toGroupEntry(group), groupCache.get(group));
096      }
097      return tmp;
098    }
099  }
100
101  private static final Logger LOG = LoggerFactory.getLogger(TableAuthManager.class);
102
103  /** Cache of global permissions */
104  private volatile PermissionCache<Permission> globalCache;
105
106  private ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>> tableCache =
107      new ConcurrentSkipListMap<>();
108
109  private ConcurrentSkipListMap<String, PermissionCache<TablePermission>> nsCache =
110    new ConcurrentSkipListMap<>();
111
112  private Configuration conf;
113  private ZKPermissionWatcher zkperms;
114  private final AtomicLong mtime = new AtomicLong(0L);
115
116  private TableAuthManager(ZKWatcher watcher, Configuration conf)
117      throws IOException {
118    this.conf = conf;
119
120    // initialize global permissions based on configuration
121    globalCache = initGlobal(conf);
122
123    this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
124    try {
125      this.zkperms.start();
126    } catch (KeeperException ke) {
127      LOG.error("ZooKeeper initialization failed", ke);
128    }
129  }
130
131  @Override
132  public void close() {
133    this.zkperms.close();
134  }
135
136  /**
137   * Returns a new {@code PermissionCache} initialized with permission assignments
138   * from the {@code hbase.superuser} configuration key.
139   */
140  private PermissionCache<Permission> initGlobal(Configuration conf) throws IOException {
141    UserProvider userProvider = UserProvider.instantiate(conf);
142    User user = userProvider.getCurrent();
143    if (user == null) {
144      throw new IOException("Unable to obtain the current user, " +
145          "authorization checks for internal operations will not work correctly!");
146    }
147    PermissionCache<Permission> newCache = new PermissionCache<>();
148    String currentUser = user.getShortName();
149
150    // the system user is always included
151    List<String> superusers = Lists.asList(currentUser, conf.getStrings(
152        Superusers.SUPERUSER_CONF_KEY, new String[0]));
153    if (superusers != null) {
154      for (String name : superusers) {
155        if (AuthUtil.isGroupPrincipal(name)) {
156          newCache.putGroup(AuthUtil.getGroupName(name),
157              new Permission(Permission.Action.values()));
158        } else {
159          newCache.putUser(name, new Permission(Permission.Action.values()));
160        }
161      }
162    }
163    return newCache;
164  }
165
166  public ZKPermissionWatcher getZKPermissionWatcher() {
167    return this.zkperms;
168  }
169
170  public void refreshTableCacheFromWritable(TableName table,
171                                       byte[] data) throws IOException {
172    if (data != null && data.length > 0) {
173      ListMultimap<String,TablePermission> perms;
174      try {
175        perms = AccessControlLists.readPermissions(data, conf);
176      } catch (DeserializationException e) {
177        throw new IOException(e);
178      }
179
180      if (perms != null) {
181        if (Bytes.equals(table.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
182          updateGlobalCache(perms);
183        } else {
184          updateTableCache(table, perms);
185        }
186      }
187    } else {
188      LOG.debug("Skipping permission cache refresh because writable data is empty");
189    }
190  }
191
192  public void refreshNamespaceCacheFromWritable(String namespace, byte[] data) throws IOException {
193    if (data != null && data.length > 0) {
194      ListMultimap<String,TablePermission> perms;
195      try {
196        perms = AccessControlLists.readPermissions(data, conf);
197      } catch (DeserializationException e) {
198        throw new IOException(e);
199      }
200      if (perms != null) {
201        updateNsCache(namespace, perms);
202      }
203    } else {
204      LOG.debug("Skipping permission cache refresh because writable data is empty");
205    }
206  }
207
208  /**
209   * Updates the internal global permissions cache
210   *
211   * @param userPerms
212   */
213  private void updateGlobalCache(ListMultimap<String,TablePermission> userPerms) {
214    PermissionCache<Permission> newCache = null;
215    try {
216      newCache = initGlobal(conf);
217      for (Map.Entry<String,TablePermission> entry : userPerms.entries()) {
218        if (AuthUtil.isGroupPrincipal(entry.getKey())) {
219          newCache.putGroup(AuthUtil.getGroupName(entry.getKey()),
220              new Permission(entry.getValue().getActions()));
221        } else {
222          newCache.putUser(entry.getKey(), new Permission(entry.getValue().getActions()));
223        }
224      }
225      globalCache = newCache;
226      mtime.incrementAndGet();
227    } catch (IOException e) {
228      // Never happens
229      LOG.error("Error occurred while updating the global cache", e);
230    }
231  }
232
233  /**
234   * Updates the internal permissions cache for a single table, splitting
235   * the permissions listed into separate caches for users and groups to optimize
236   * group lookups.
237   *
238   * @param table
239   * @param tablePerms
240   */
241  private void updateTableCache(TableName table,
242                                ListMultimap<String,TablePermission> tablePerms) {
243    PermissionCache<TablePermission> newTablePerms = new PermissionCache<>();
244
245    for (Map.Entry<String,TablePermission> entry : tablePerms.entries()) {
246      if (AuthUtil.isGroupPrincipal(entry.getKey())) {
247        newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
248      } else {
249        newTablePerms.putUser(entry.getKey(), entry.getValue());
250      }
251    }
252
253    tableCache.put(table, newTablePerms);
254    mtime.incrementAndGet();
255  }
256
257  /**
258   * Updates the internal permissions cache for a single table, splitting
259   * the permissions listed into separate caches for users and groups to optimize
260   * group lookups.
261   *
262   * @param namespace
263   * @param tablePerms
264   */
265  private void updateNsCache(String namespace,
266                             ListMultimap<String, TablePermission> tablePerms) {
267    PermissionCache<TablePermission> newTablePerms = new PermissionCache<>();
268
269    for (Map.Entry<String, TablePermission> entry : tablePerms.entries()) {
270      if (AuthUtil.isGroupPrincipal(entry.getKey())) {
271        newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
272      } else {
273        newTablePerms.putUser(entry.getKey(), entry.getValue());
274      }
275    }
276
277    nsCache.put(namespace, newTablePerms);
278    mtime.incrementAndGet();
279  }
280
281  private PermissionCache<TablePermission> getTablePermissions(TableName table) {
282    return computeIfAbsent(tableCache, table, PermissionCache::new);
283  }
284
285  private PermissionCache<TablePermission> getNamespacePermissions(String namespace) {
286    return computeIfAbsent(nsCache, namespace, PermissionCache::new);
287  }
288
289  /**
290   * Authorizes a global permission
291   * @param perms
292   * @param action
293   * @return true if authorized, false otherwise
294   */
295  private boolean authorize(List<Permission> perms, Permission.Action action) {
296    if (perms != null) {
297      for (Permission p : perms) {
298        if (p.implies(action)) {
299          return true;
300        }
301      }
302    } else if (LOG.isDebugEnabled()) {
303      LOG.debug("No permissions found for " + action);
304    }
305
306    return false;
307  }
308
309  /**
310   * Authorize a global permission based on ACLs for the given user and the
311   * user's groups.
312   * @param user
313   * @param action
314   * @return true if known and authorized, false otherwise
315   */
316  public boolean authorize(User user, Permission.Action action) {
317    if (user == null) {
318      return false;
319    }
320
321    if (authorize(globalCache.getUser(user.getShortName()), action)) {
322      return true;
323    }
324
325    String[] groups = user.getGroupNames();
326    if (groups != null) {
327      for (String group : groups) {
328        if (authorize(globalCache.getGroup(group), action)) {
329          return true;
330        }
331      }
332    }
333    return false;
334  }
335
336  private boolean authorize(List<TablePermission> perms,
337                            TableName table, byte[] family,
338                            byte[] qualifier, Permission.Action action) {
339    if (perms != null) {
340      for (TablePermission p : perms) {
341        if (p.implies(table, family, qualifier, action)) {
342          return true;
343        }
344      }
345    } else if (LOG.isDebugEnabled()) {
346      LOG.debug("No permissions found for table="+table);
347    }
348    return false;
349  }
350
351  private boolean hasAccess(List<TablePermission> perms,
352                            TableName table, Permission.Action action) {
353    if (perms != null) {
354      for (TablePermission p : perms) {
355        if (p.implies(action)) {
356          return true;
357        }
358      }
359    } else if (LOG.isDebugEnabled()) {
360      LOG.debug("No permissions found for table="+table);
361    }
362    return false;
363  }
364
365  /**
366   * Authorize a user for a given KV. This is called from AccessControlFilter.
367   */
368  public boolean authorize(User user, TableName table, Cell cell, Permission.Action action) {
369    try {
370      List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
371      if (LOG.isTraceEnabled()) {
372        LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
373          (perms != null ? perms : ""));
374      }
375      if (perms != null) {
376        for (Permission p: perms) {
377          if (p.implies(action)) {
378            return true;
379          }
380        }
381      }
382    } catch (IOException e) {
383      // We failed to parse the KV tag
384      LOG.error("Failed parse of ACL tag in cell " + cell);
385      // Fall through to check with the table and CF perms we were able
386      // to collect regardless
387    }
388    return false;
389  }
390
391  public boolean authorize(User user, String namespace, Permission.Action action) {
392    // Global authorizations supercede namespace level
393    if (authorize(user, action)) {
394      return true;
395    }
396    // Check namespace permissions
397    PermissionCache<TablePermission> tablePerms = nsCache.get(namespace);
398    if (tablePerms != null) {
399      List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
400      if (authorize(userPerms, namespace, action)) {
401        return true;
402      }
403      String[] groupNames = user.getGroupNames();
404      if (groupNames != null) {
405        for (String group : groupNames) {
406          List<TablePermission> groupPerms = tablePerms.getGroup(group);
407          if (authorize(groupPerms, namespace, action)) {
408            return true;
409          }
410        }
411      }
412    }
413    return false;
414  }
415
416  private boolean authorize(List<TablePermission> perms, String namespace,
417                            Permission.Action action) {
418    if (perms != null) {
419      for (TablePermission p : perms) {
420        if (p.implies(namespace, action)) {
421          return true;
422        }
423      }
424    } else if (LOG.isDebugEnabled()) {
425      LOG.debug("No permissions for authorize() check, table=" + namespace);
426    }
427
428    return false;
429  }
430
431  /**
432   * Checks authorization to a given table and column family for a user, based on the
433   * stored user permissions.
434   *
435   * @param user
436   * @param table
437   * @param family
438   * @param action
439   * @return true if known and authorized, false otherwise
440   */
441  public boolean authorizeUser(User user, TableName table, byte[] family,
442      Permission.Action action) {
443    return authorizeUser(user, table, family, null, action);
444  }
445
446  public boolean authorizeUser(User user, TableName table, byte[] family,
447      byte[] qualifier, Permission.Action action) {
448    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
449    // Global and namespace authorizations supercede table level
450    if (authorize(user, table.getNamespaceAsString(), action)) {
451      return true;
452    }
453    // Check table permissions
454    return authorize(getTablePermissions(table).getUser(user.getShortName()), table, family,
455        qualifier, action);
456  }
457
458  /**
459   * Checks if the user has access to the full table or at least a family/qualifier
460   * for the specified action.
461   *
462   * @param user
463   * @param table
464   * @param action
465   * @return true if the user has access to the table, false otherwise
466   */
467  public boolean userHasAccess(User user, TableName table, Permission.Action action) {
468    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
469    // Global and namespace authorizations supercede table level
470    if (authorize(user, table.getNamespaceAsString(), action)) {
471      return true;
472    }
473    // Check table permissions
474    return hasAccess(getTablePermissions(table).getUser(user.getShortName()), table, action);
475  }
476
477  /**
478   * Checks global authorization for a given action for a group, based on the stored
479   * permissions.
480   */
481  public boolean authorizeGroup(String groupName, Permission.Action action) {
482    List<Permission> perms = globalCache.getGroup(groupName);
483    if (LOG.isDebugEnabled()) {
484      LOG.debug("authorizing " + (perms != null && !perms.isEmpty() ? perms.get(0) : "") +
485        " for " + action);
486    }
487    return authorize(perms, action);
488  }
489
490  /**
491   * Checks authorization to a given table, column family and column for a group, based
492   * on the stored permissions.
493   * @param groupName
494   * @param table
495   * @param family
496   * @param qualifier
497   * @param action
498   * @return true if known and authorized, false otherwise
499   */
500  public boolean authorizeGroup(String groupName, TableName table, byte[] family,
501      byte[] qualifier, Permission.Action action) {
502    // Global authorization supercedes table level
503    if (authorizeGroup(groupName, action)) {
504      return true;
505    }
506    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
507    // Namespace authorization supercedes table level
508    String namespace = table.getNamespaceAsString();
509    if (authorize(getNamespacePermissions(namespace).getGroup(groupName), namespace, action)) {
510      return true;
511    }
512    // Check table level
513    List<TablePermission> tblPerms = getTablePermissions(table).getGroup(groupName);
514    if (LOG.isDebugEnabled()) {
515      LOG.debug("authorizing " + (tblPerms != null && !tblPerms.isEmpty() ? tblPerms.get(0) : "") +
516        " for " +groupName + " on " + table + "." + Bytes.toString(family) + "." +
517        Bytes.toString(qualifier) + " with " + action);
518    }
519    return authorize(tblPerms, table, family, qualifier, action);
520  }
521
522  /**
523   * Checks if the user has access to the full table or at least a family/qualifier
524   * for the specified action.
525   * @param groupName
526   * @param table
527   * @param action
528   * @return true if the group has access to the table, false otherwise
529   */
530  public boolean groupHasAccess(String groupName, TableName table, Permission.Action action) {
531    // Global authorization supercedes table level
532    if (authorizeGroup(groupName, action)) {
533      return true;
534    }
535    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
536    // Namespace authorization supercedes table level
537    if (hasAccess(getNamespacePermissions(table.getNamespaceAsString()).getGroup(groupName),
538        table, action)) {
539      return true;
540    }
541    // Check table level
542    return hasAccess(getTablePermissions(table).getGroup(groupName), table, action);
543  }
544
545  public boolean authorize(User user, TableName table, byte[] family,
546      byte[] qualifier, Permission.Action action) {
547    if (authorizeUser(user, table, family, qualifier, action)) {
548      return true;
549    }
550
551    String[] groups = user.getGroupNames();
552    if (groups != null) {
553      for (String group : groups) {
554        if (authorizeGroup(group, table, family, qualifier, action)) {
555          return true;
556        }
557      }
558    }
559    return false;
560  }
561
562  public boolean hasAccess(User user, TableName table, Permission.Action action) {
563    if (userHasAccess(user, table, action)) {
564      return true;
565    }
566
567    String[] groups = user.getGroupNames();
568    if (groups != null) {
569      for (String group : groups) {
570        if (groupHasAccess(group, table, action)) {
571          return true;
572        }
573      }
574    }
575    return false;
576  }
577
578  public boolean authorize(User user, TableName table, byte[] family,
579      Permission.Action action) {
580    return authorize(user, table, family, null, action);
581  }
582
583  /**
584   * Returns true if the given user has a {@link TablePermission} matching up
585   * to the column family portion of a permission.  Note that this permission
586   * may be scoped to a given column qualifier and does not guarantee that
587   * authorize() on the same column family would return true.
588   */
589  public boolean matchPermission(User user,
590      TableName table, byte[] family, Permission.Action action) {
591    PermissionCache<TablePermission> tablePerms = tableCache.get(table);
592    if (tablePerms != null) {
593      List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
594      if (userPerms != null) {
595        for (TablePermission p : userPerms) {
596          if (p.matchesFamily(table, family, action)) {
597            return true;
598          }
599        }
600      }
601
602      String[] groups = user.getGroupNames();
603      if (groups != null) {
604        for (String group : groups) {
605          List<TablePermission> groupPerms = tablePerms.getGroup(group);
606          if (groupPerms != null) {
607            for (TablePermission p : groupPerms) {
608              if (p.matchesFamily(table, family, action)) {
609                return true;
610              }
611            }
612          }
613        }
614      }
615    }
616
617    return false;
618  }
619
620  public boolean matchPermission(User user,
621      TableName table, byte[] family, byte[] qualifier,
622      Permission.Action action) {
623    PermissionCache<TablePermission> tablePerms = tableCache.get(table);
624    if (tablePerms != null) {
625      List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
626      if (userPerms != null) {
627        for (TablePermission p : userPerms) {
628          if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
629            return true;
630          }
631        }
632      }
633
634      String[] groups = user.getGroupNames();
635      if (groups != null) {
636        for (String group : groups) {
637          List<TablePermission> groupPerms = tablePerms.getGroup(group);
638          if (groupPerms != null) {
639            for (TablePermission p : groupPerms) {
640              if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
641                return true;
642              }
643            }
644          }
645        }
646      }
647    }
648    return false;
649  }
650
651  public void removeNamespace(byte[] ns) {
652    nsCache.remove(Bytes.toString(ns));
653  }
654
655  public void removeTable(TableName table) {
656    tableCache.remove(table);
657  }
658
659  /**
660   * Overwrites the existing permission set for a given user for a table, and
661   * triggers an update for zookeeper synchronization.
662   * @param username
663   * @param table
664   * @param perms
665   */
666  public void setTableUserPermissions(String username, TableName table,
667      List<TablePermission> perms) {
668    PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
669    tablePerms.replaceUser(username, perms);
670    writeTableToZooKeeper(table, tablePerms);
671  }
672
673  /**
674   * Overwrites the existing permission set for a group and triggers an update
675   * for zookeeper synchronization.
676   * @param group
677   * @param table
678   * @param perms
679   */
680  public void setTableGroupPermissions(String group, TableName table,
681      List<TablePermission> perms) {
682    PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
683    tablePerms.replaceGroup(group, perms);
684    writeTableToZooKeeper(table, tablePerms);
685  }
686
687  /**
688   * Overwrites the existing permission set for a given user for a table, and
689   * triggers an update for zookeeper synchronization.
690   * @param username
691   * @param namespace
692   * @param perms
693   */
694  public void setNamespaceUserPermissions(String username, String namespace,
695      List<TablePermission> perms) {
696    PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
697    tablePerms.replaceUser(username, perms);
698    writeNamespaceToZooKeeper(namespace, tablePerms);
699  }
700
701  /**
702   * Overwrites the existing permission set for a group and triggers an update
703   * for zookeeper synchronization.
704   * @param group
705   * @param namespace
706   * @param perms
707   */
708  public void setNamespaceGroupPermissions(String group, String namespace,
709      List<TablePermission> perms) {
710    PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
711    tablePerms.replaceGroup(group, perms);
712    writeNamespaceToZooKeeper(namespace, tablePerms);
713  }
714
715  public void writeTableToZooKeeper(TableName table,
716      PermissionCache<TablePermission> tablePerms) {
717    byte[] serialized = new byte[0];
718    if (tablePerms != null) {
719      serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
720    }
721    zkperms.writeToZookeeper(table.getName(), serialized);
722  }
723
724  public void writeNamespaceToZooKeeper(String namespace,
725      PermissionCache<TablePermission> tablePerms) {
726    byte[] serialized = new byte[0];
727    if (tablePerms != null) {
728      serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
729    }
730    zkperms.writeToZookeeper(Bytes.toBytes(AccessControlLists.toNamespaceEntry(namespace)),
731        serialized);
732  }
733
734  public long getMTime() {
735    return mtime.get();
736  }
737
738  private static Map<ZKWatcher,TableAuthManager> managerMap = new HashMap<>();
739
740  private static Map<TableAuthManager, Integer> refCount = new HashMap<>();
741
742  /** Returns a TableAuthManager from the cache. If not cached, constructs a new one. Returned
743   * instance should be released back by calling {@link #release(TableAuthManager)}. */
744  public synchronized static TableAuthManager getOrCreate(
745          ZKWatcher watcher, Configuration conf) throws IOException {
746    TableAuthManager instance = managerMap.get(watcher);
747    if (instance == null) {
748      instance = new TableAuthManager(watcher, conf);
749      managerMap.put(watcher, instance);
750    }
751    int ref = refCount.get(instance) == null ? 0 : refCount.get(instance).intValue();
752    refCount.put(instance, ref + 1);
753    return instance;
754  }
755
756  @VisibleForTesting
757  public static int getTotalRefCount() {
758    int total = 0;
759    for (int count : refCount.values()) {
760      total += count;
761    }
762    return total;
763  }
764
765  /**
766   * Releases the resources for the given TableAuthManager if the reference count is down to 0.
767   * @param instance TableAuthManager to be released
768   */
769  public synchronized static void release(TableAuthManager instance) {
770    if (refCount.get(instance) == null || refCount.get(instance) < 1) {
771      String msg = "Something wrong with the TableAuthManager reference counting: " + instance
772          + " whose count is " + refCount.get(instance);
773      LOG.error(HBaseMarkers.FATAL, msg);
774      instance.close();
775      managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
776      instance.getZKPermissionWatcher().getWatcher().abort(msg, null);
777    } else {
778      int ref = refCount.get(instance);
779      refCount.put(instance, ref-1);
780      if (ref-1 == 0) {
781        instance.close();
782        managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
783        refCount.remove(instance);
784      }
785    }
786  }
787}