View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.IOException;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentSkipListMap;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.AuthUtil;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.exceptions.DeserializationException;
36  import org.apache.hadoop.hbase.security.Superusers;
37  import org.apache.hadoop.hbase.security.User;
38  import org.apache.hadoop.hbase.security.UserProvider;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41  import org.apache.zookeeper.KeeperException;
42  
43  import com.google.common.collect.ArrayListMultimap;
44  import com.google.common.collect.ListMultimap;
45  import com.google.common.collect.Lists;
46  
47  /**
48   * Performs authorization checks for a given user's assigned permissions
49   */
50  @InterfaceAudience.Private
51  public class TableAuthManager {
52    private static class PermissionCache<T extends Permission> {
53      /** Cache of user permissions */
54      private ListMultimap<String,T> userCache = ArrayListMultimap.create();
55      /** Cache of group permissions */
56      private ListMultimap<String,T> groupCache = ArrayListMultimap.create();
57  
58      public List<T> getUser(String user) {
59        return userCache.get(user);
60      }
61  
62      public void putUser(String user, T perm) {
63        userCache.put(user, perm);
64      }
65  
66      public List<T> replaceUser(String user, Iterable<? extends T> perms) {
67        return userCache.replaceValues(user, perms);
68      }
69  
70      public List<T> getGroup(String group) {
71        return groupCache.get(group);
72      }
73  
74      public void putGroup(String group, T perm) {
75        groupCache.put(group, perm);
76      }
77  
78      public List<T> replaceGroup(String group, Iterable<? extends T> perms) {
79        return groupCache.replaceValues(group, perms);
80      }
81  
82      /**
83       * Returns a combined map of user and group permissions, with group names prefixed by
84       * {@link AuthUtil#GROUP_PREFIX}.
85       */
86      public ListMultimap<String,T> getAllPermissions() {
87        ListMultimap<String,T> tmp = ArrayListMultimap.create();
88        tmp.putAll(userCache);
89        for (String group : groupCache.keySet()) {
90          tmp.putAll(AuthUtil.toGroupEntry(group), groupCache.get(group));
91        }
92        return tmp;
93      }
94    }
95  
96    private static final Log LOG = LogFactory.getLog(TableAuthManager.class);
97  
98    private static TableAuthManager instance;
99  
100   /** Cache of global permissions */
101   private volatile PermissionCache<Permission> globalCache;
102 
103   private ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>> tableCache =
104       new ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>>();
105 
106   private ConcurrentSkipListMap<String, PermissionCache<TablePermission>> nsCache =
107     new ConcurrentSkipListMap<String, PermissionCache<TablePermission>>();
108 
109   private Configuration conf;
110   private ZKPermissionWatcher zkperms;
111   private AtomicLong mtime = new AtomicLong(0);
112 
113   private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
114       throws IOException {
115     this.conf = conf;
116 
117     // initialize global permissions based on configuration
118     globalCache = initGlobal(conf);
119 
120     this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
121     try {
122       this.zkperms.start();
123     } catch (KeeperException ke) {
124       LOG.error("ZooKeeper initialization failed", ke);
125     }
126   }
127 
128   /**
129    * Returns a new {@code PermissionCache} initialized with permission assignments
130    * from the {@code hbase.superuser} configuration key.
131    */
132   private PermissionCache<Permission> initGlobal(Configuration conf) throws IOException {
133     UserProvider userProvider = UserProvider.instantiate(conf);
134     User user = userProvider.getCurrent();
135     if (user == null) {
136       throw new IOException("Unable to obtain the current user, " +
137           "authorization checks for internal operations will not work correctly!");
138     }
139     PermissionCache<Permission> newCache = new PermissionCache<Permission>();
140     String currentUser = user.getShortName();
141 
142     // the system user is always included
143     List<String> superusers = Lists.asList(currentUser, conf.getStrings(
144         Superusers.SUPERUSER_CONF_KEY, new String[0]));
145     if (superusers != null) {
146       for (String name : superusers) {
147         if (AuthUtil.isGroupPrincipal(name)) {
148           newCache.putGroup(AuthUtil.getGroupName(name),
149               new Permission(Permission.Action.values()));
150         } else {
151           newCache.putUser(name, new Permission(Permission.Action.values()));
152         }
153       }
154     }
155     return newCache;
156   }
157 
158   public ZKPermissionWatcher getZKPermissionWatcher() {
159     return this.zkperms;
160   }
161 
162   public void refreshTableCacheFromWritable(TableName table,
163                                        byte[] data) throws IOException {
164     if (data != null && data.length > 0) {
165       ListMultimap<String,TablePermission> perms;
166       try {
167         perms = AccessControlLists.readPermissions(data, conf);
168       } catch (DeserializationException e) {
169         throw new IOException(e);
170       }
171 
172       if (perms != null) {
173         if (Bytes.equals(table.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
174           updateGlobalCache(perms);
175         } else {
176           updateTableCache(table, perms);
177         }
178       }
179     } else {
180       LOG.debug("Skipping permission cache refresh because writable data is empty");
181     }
182   }
183 
184   public void refreshNamespaceCacheFromWritable(String namespace, byte[] data) throws IOException {
185     if (data != null && data.length > 0) {
186       ListMultimap<String,TablePermission> perms;
187       try {
188         perms = AccessControlLists.readPermissions(data, conf);
189       } catch (DeserializationException e) {
190         throw new IOException(e);
191       }
192       if (perms != null) {
193         updateNsCache(namespace, perms);
194       }
195     } else {
196       LOG.debug("Skipping permission cache refresh because writable data is empty");
197     }
198   }
199 
200   /**
201    * Updates the internal global permissions cache
202    *
203    * @param userPerms
204    */
205   private void updateGlobalCache(ListMultimap<String,TablePermission> userPerms) {
206     PermissionCache<Permission> newCache = null;
207     try {
208       newCache = initGlobal(conf);
209       for (Map.Entry<String,TablePermission> entry : userPerms.entries()) {
210         if (AuthUtil.isGroupPrincipal(entry.getKey())) {
211           newCache.putGroup(AuthUtil.getGroupName(entry.getKey()),
212               new Permission(entry.getValue().getActions()));
213         } else {
214           newCache.putUser(entry.getKey(), new Permission(entry.getValue().getActions()));
215         }
216       }
217       globalCache = newCache;
218       mtime.incrementAndGet();
219     } catch (IOException e) {
220       // Never happens
221       LOG.error("Error occured while updating the global cache", e);
222     }
223   }
224 
225   /**
226    * Updates the internal permissions cache for a single table, splitting
227    * the permissions listed into separate caches for users and groups to optimize
228    * group lookups.
229    *
230    * @param table
231    * @param tablePerms
232    */
233   private void updateTableCache(TableName table,
234                                 ListMultimap<String,TablePermission> tablePerms) {
235     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
236 
237     for (Map.Entry<String,TablePermission> entry : tablePerms.entries()) {
238       if (AuthUtil.isGroupPrincipal(entry.getKey())) {
239         newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
240       } else {
241         newTablePerms.putUser(entry.getKey(), entry.getValue());
242       }
243     }
244 
245     tableCache.put(table, newTablePerms);
246     mtime.incrementAndGet();
247   }
248 
249   /**
250    * Updates the internal permissions cache for a single table, splitting
251    * the permissions listed into separate caches for users and groups to optimize
252    * group lookups.
253    *
254    * @param namespace
255    * @param tablePerms
256    */
257   private void updateNsCache(String namespace,
258                              ListMultimap<String, TablePermission> tablePerms) {
259     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
260 
261     for (Map.Entry<String, TablePermission> entry : tablePerms.entries()) {
262       if (AuthUtil.isGroupPrincipal(entry.getKey())) {
263         newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
264       } else {
265         newTablePerms.putUser(entry.getKey(), entry.getValue());
266       }
267     }
268 
269     nsCache.put(namespace, newTablePerms);
270     mtime.incrementAndGet();
271   }
272 
273   private PermissionCache<TablePermission> getTablePermissions(TableName table) {
274     if (!tableCache.containsKey(table)) {
275       tableCache.putIfAbsent(table, new PermissionCache<TablePermission>());
276     }
277     return tableCache.get(table);
278   }
279 
280   private PermissionCache<TablePermission> getNamespacePermissions(String namespace) {
281     if (!nsCache.containsKey(namespace)) {
282       nsCache.putIfAbsent(namespace, new PermissionCache<TablePermission>());
283     }
284     return nsCache.get(namespace);
285   }
286 
287   /**
288    * Authorizes a global permission
289    * @param perms
290    * @param action
291    * @return true if authorized, false otherwise
292    */
293   private boolean authorize(List<Permission> perms, Permission.Action action) {
294     if (perms != null) {
295       for (Permission p : perms) {
296         if (p.implies(action)) {
297           return true;
298         }
299       }
300     } else if (LOG.isDebugEnabled()) {
301       LOG.debug("No permissions found for " + action);
302     }
303 
304     return false;
305   }
306 
307   /**
308    * Authorize a global permission based on ACLs for the given user and the
309    * user's groups.
310    * @param user
311    * @param action
312    * @return true if known and authorized, false otherwise
313    */
314   public boolean authorize(User user, Permission.Action action) {
315     if (user == null) {
316       return false;
317     }
318 
319     if (authorize(globalCache.getUser(user.getShortName()), action)) {
320       return true;
321     }
322 
323     String[] groups = user.getGroupNames();
324     if (groups != null) {
325       for (String group : groups) {
326         if (authorize(globalCache.getGroup(group), action)) {
327           return true;
328         }
329       }
330     }
331     return false;
332   }
333 
334   private boolean authorize(List<TablePermission> perms,
335                             TableName table, byte[] family,
336                             byte[] qualifier, Permission.Action action) {
337     if (perms != null) {
338       for (TablePermission p : perms) {
339         if (p.implies(table, family, qualifier, action)) {
340           return true;
341         }
342       }
343     } else if (LOG.isDebugEnabled()) {
344       LOG.debug("No permissions found for table="+table);
345     }
346     return false;
347   }
348 
349   private boolean hasAccess(List<TablePermission> perms,
350                             TableName table, Permission.Action action) {
351     if (perms != null) {
352       for (TablePermission p : perms) {
353         if (p.implies(action)) {
354           return true;
355         }
356       }
357     } else if (LOG.isDebugEnabled()) {
358       LOG.debug("No permissions found for table="+table);
359     }
360     return false;
361   }
362 
363   /**
364    * Authorize a user for a given KV. This is called from AccessControlFilter.
365    */
366   public boolean authorize(User user, TableName table, Cell cell, Permission.Action action) {
367     try {
368       List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
369       if (LOG.isTraceEnabled()) {
370         LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
371           (perms != null ? perms : ""));
372       }
373       if (perms != null) {
374         for (Permission p: perms) {
375           if (p.implies(action)) {
376             return true;
377           }
378         }
379       }
380     } catch (IOException e) {
381       // We failed to parse the KV tag
382       LOG.error("Failed parse of ACL tag in cell " + cell);
383       // Fall through to check with the table and CF perms we were able
384       // to collect regardless
385     }
386     return false;
387   }
388 
389   public boolean authorize(User user, String namespace, Permission.Action action) {
390     // Global authorizations supercede namespace level
391     if (authorize(user, action)) {
392       return true;
393     }
394     // Check namespace permissions
395     PermissionCache<TablePermission> tablePerms = nsCache.get(namespace);
396     if (tablePerms != null) {
397       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
398       if (authorize(userPerms, namespace, action)) {
399         return true;
400       }
401       String[] groupNames = user.getGroupNames();
402       if (groupNames != null) {
403         for (String group : groupNames) {
404           List<TablePermission> groupPerms = tablePerms.getGroup(group);
405           if (authorize(groupPerms, namespace, action)) {
406             return true;
407           }
408         }
409       }
410     }
411     return false;
412   }
413 
414   private boolean authorize(List<TablePermission> perms, String namespace,
415                             Permission.Action action) {
416     if (perms != null) {
417       for (TablePermission p : perms) {
418         if (p.implies(namespace, action)) {
419           return true;
420         }
421       }
422     } else if (LOG.isDebugEnabled()) {
423       LOG.debug("No permissions for authorize() check, table=" + namespace);
424     }
425 
426     return false;
427   }
428 
429   /**
430    * Checks authorization to a given table and column family for a user, based on the
431    * stored user permissions.
432    *
433    * @param user
434    * @param table
435    * @param family
436    * @param action
437    * @return true if known and authorized, false otherwise
438    */
439   public boolean authorizeUser(User user, TableName table, byte[] family,
440       Permission.Action action) {
441     return authorizeUser(user, table, family, null, action);
442   }
443 
444   public boolean authorizeUser(User user, TableName table, byte[] family,
445       byte[] qualifier, Permission.Action action) {
446     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
447     // Global and namespace authorizations supercede table level
448     if (authorize(user, table.getNamespaceAsString(), action)) {
449       return true;
450     }
451     // Check table permissions
452     return authorize(getTablePermissions(table).getUser(user.getShortName()), table, family,
453         qualifier, action);
454   }
455 
456   /**
457    * Checks if the user has access to the full table or at least a family/qualifier
458    * for the specified action.
459    *
460    * @param user
461    * @param table
462    * @param action
463    * @return true if the user has access to the table, false otherwise
464    */
465   public boolean userHasAccess(User user, TableName table, Permission.Action action) {
466     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
467     // Global and namespace authorizations supercede table level
468     if (authorize(user, table.getNamespaceAsString(), action)) {
469       return true;
470     }
471     // Check table permissions
472     return hasAccess(getTablePermissions(table).getUser(user.getShortName()), table, action);
473   }
474 
475   /**
476    * Checks global authorization for a given action for a group, based on the stored
477    * permissions.
478    */
479   public boolean authorizeGroup(String groupName, Permission.Action action) {
480     List<Permission> perms = globalCache.getGroup(groupName);
481     if (LOG.isDebugEnabled()) {
482       LOG.debug("authorizing " + (perms != null && !perms.isEmpty() ? perms.get(0) : "") +
483         " for " + action);
484     }
485     return authorize(perms, action);
486   }
487 
488   /**
489    * Checks authorization to a given table, column family and column for a group, based
490    * on the stored permissions.
491    * @param groupName
492    * @param table
493    * @param family
494    * @param qualifier
495    * @param action
496    * @return true if known and authorized, false otherwise
497    */
498   public boolean authorizeGroup(String groupName, TableName table, byte[] family,
499       byte[] qualifier, Permission.Action action) {
500     // Global authorization supercedes table level
501     if (authorizeGroup(groupName, action)) {
502       return true;
503     }
504     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
505     // Namespace authorization supercedes table level
506     String namespace = table.getNamespaceAsString();
507     if (authorize(getNamespacePermissions(namespace).getGroup(groupName), namespace, action)) {
508       return true;
509     }
510     // Check table level
511     List<TablePermission> tblPerms = getTablePermissions(table).getGroup(groupName);
512     if (LOG.isDebugEnabled()) {
513       LOG.debug("authorizing " + (tblPerms != null && !tblPerms.isEmpty() ? tblPerms.get(0) : "") +
514         " for " +groupName + " on " + table + "." + Bytes.toString(family) + "." +
515         Bytes.toString(qualifier) + " with " + action);
516     }
517     return authorize(tblPerms, table, family, qualifier, action);
518   }
519 
520   /**
521    * Checks if the user has access to the full table or at least a family/qualifier
522    * for the specified action.
523    * @param groupName
524    * @param table
525    * @param action
526    * @return true if the group has access to the table, false otherwise
527    */
528   public boolean groupHasAccess(String groupName, TableName table, Permission.Action action) {
529     // Global authorization supercedes table level
530     if (authorizeGroup(groupName, action)) {
531       return true;
532     }
533     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
534     // Namespace authorization supercedes table level
535     if (hasAccess(getNamespacePermissions(table.getNamespaceAsString()).getGroup(groupName),
536         table, action)) {
537       return true;
538     }
539     // Check table level
540     return hasAccess(getTablePermissions(table).getGroup(groupName), table, action);
541   }
542 
543   public boolean authorize(User user, TableName table, byte[] family,
544       byte[] qualifier, Permission.Action action) {
545     if (authorizeUser(user, table, family, qualifier, action)) {
546       return true;
547     }
548 
549     String[] groups = user.getGroupNames();
550     if (groups != null) {
551       for (String group : groups) {
552         if (authorizeGroup(group, table, family, qualifier, action)) {
553           return true;
554         }
555       }
556     }
557     return false;
558   }
559 
560   public boolean hasAccess(User user, TableName table, Permission.Action action) {
561     if (userHasAccess(user, table, action)) {
562       return true;
563     }
564 
565     String[] groups = user.getGroupNames();
566     if (groups != null) {
567       for (String group : groups) {
568         if (groupHasAccess(group, table, action)) {
569           return true;
570         }
571       }
572     }
573     return false;
574   }
575 
576   public boolean authorize(User user, TableName table, byte[] family,
577       Permission.Action action) {
578     return authorize(user, table, family, null, action);
579   }
580 
581   /**
582    * Returns true if the given user has a {@link TablePermission} matching up
583    * to the column family portion of a permission.  Note that this permission
584    * may be scoped to a given column qualifier and does not guarantee that
585    * authorize() on the same column family would return true.
586    */
587   public boolean matchPermission(User user,
588       TableName table, byte[] family, Permission.Action action) {
589     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
590     if (tablePerms != null) {
591       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
592       if (userPerms != null) {
593         for (TablePermission p : userPerms) {
594           if (p.matchesFamily(table, family, action)) {
595             return true;
596           }
597         }
598       }
599 
600       String[] groups = user.getGroupNames();
601       if (groups != null) {
602         for (String group : groups) {
603           List<TablePermission> groupPerms = tablePerms.getGroup(group);
604           if (groupPerms != null) {
605             for (TablePermission p : groupPerms) {
606               if (p.matchesFamily(table, family, action)) {
607                 return true;
608               }
609             }
610           }
611         }
612       }
613     }
614 
615     return false;
616   }
617 
618   public boolean matchPermission(User user,
619       TableName table, byte[] family, byte[] qualifier,
620       Permission.Action action) {
621     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
622     if (tablePerms != null) {
623       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
624       if (userPerms != null) {
625         for (TablePermission p : userPerms) {
626           if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
627             return true;
628           }
629         }
630       }
631 
632       String[] groups = user.getGroupNames();
633       if (groups != null) {
634         for (String group : groups) {
635           List<TablePermission> groupPerms = tablePerms.getGroup(group);
636           if (groupPerms != null) {
637             for (TablePermission p : groupPerms) {
638               if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
639                 return true;
640               }
641             }
642           }
643         }
644       }
645     }
646     return false;
647   }
648 
649   public void removeNamespace(byte[] ns) {
650     nsCache.remove(Bytes.toString(ns));
651   }
652 
653   public void removeTable(TableName table) {
654     tableCache.remove(table);
655   }
656 
657   /**
658    * Overwrites the existing permission set for a given user for a table, and
659    * triggers an update for zookeeper synchronization.
660    * @param username
661    * @param table
662    * @param perms
663    */
664   public void setTableUserPermissions(String username, TableName table,
665       List<TablePermission> perms) {
666     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
667     tablePerms.replaceUser(username, perms);
668     writeTableToZooKeeper(table, tablePerms);
669   }
670 
671   /**
672    * Overwrites the existing permission set for a group and triggers an update
673    * for zookeeper synchronization.
674    * @param group
675    * @param table
676    * @param perms
677    */
678   public void setTableGroupPermissions(String group, TableName table,
679       List<TablePermission> perms) {
680     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
681     tablePerms.replaceGroup(group, perms);
682     writeTableToZooKeeper(table, tablePerms);
683   }
684 
685   /**
686    * Overwrites the existing permission set for a given user for a table, and
687    * triggers an update for zookeeper synchronization.
688    * @param username
689    * @param namespace
690    * @param perms
691    */
692   public void setNamespaceUserPermissions(String username, String namespace,
693       List<TablePermission> perms) {
694     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
695     tablePerms.replaceUser(username, perms);
696     writeNamespaceToZooKeeper(namespace, tablePerms);
697   }
698 
699   /**
700    * Overwrites the existing permission set for a group and triggers an update
701    * for zookeeper synchronization.
702    * @param group
703    * @param namespace
704    * @param perms
705    */
706   public void setNamespaceGroupPermissions(String group, String namespace,
707       List<TablePermission> perms) {
708     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
709     tablePerms.replaceGroup(group, perms);
710     writeNamespaceToZooKeeper(namespace, tablePerms);
711   }
712 
713   public void writeTableToZooKeeper(TableName table,
714       PermissionCache<TablePermission> tablePerms) {
715     byte[] serialized = new byte[0];
716     if (tablePerms != null) {
717       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
718     }
719     zkperms.writeToZookeeper(table.getName(), serialized);
720   }
721 
722   public void writeNamespaceToZooKeeper(String namespace,
723       PermissionCache<TablePermission> tablePerms) {
724     byte[] serialized = new byte[0];
725     if (tablePerms != null) {
726       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
727     }
728     zkperms.writeToZookeeper(Bytes.toBytes(AccessControlLists.toNamespaceEntry(namespace)),
729         serialized);
730   }
731 
732   public long getMTime() {
733     return mtime.get();
734   }
735 
736   static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
737     new HashMap<ZooKeeperWatcher,TableAuthManager>();
738 
739   public synchronized static TableAuthManager get(
740       ZooKeeperWatcher watcher, Configuration conf) throws IOException {
741     instance = managerMap.get(watcher);
742     if (instance == null) {
743       instance = new TableAuthManager(watcher, conf);
744       managerMap.put(watcher, instance);
745     }
746     return instance;
747   }
748 }