View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.IOException;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentSkipListMap;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.AuthUtil;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.exceptions.DeserializationException;
36  import org.apache.hadoop.hbase.security.Superusers;
37  import org.apache.hadoop.hbase.security.User;
38  import org.apache.hadoop.hbase.security.UserProvider;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41  import org.apache.zookeeper.KeeperException;
42  
43  import com.google.common.collect.ArrayListMultimap;
44  import com.google.common.collect.ListMultimap;
45  import com.google.common.collect.Lists;
46  
47  /**
48   * Performs authorization checks for a given user's assigned permissions
49   */
50  @InterfaceAudience.Private
51  public class TableAuthManager {
52    private static class PermissionCache<T extends Permission> {
53      /** Cache of user permissions */
54      private ListMultimap<String,T> userCache = ArrayListMultimap.create();
55      /** Cache of group permissions */
56      private ListMultimap<String,T> groupCache = ArrayListMultimap.create();
57  
58      public List<T> getUser(String user) {
59        return userCache.get(user);
60      }
61  
62      public void putUser(String user, T perm) {
63        userCache.put(user, perm);
64      }
65  
66      public List<T> replaceUser(String user, Iterable<? extends T> perms) {
67        return userCache.replaceValues(user, perms);
68      }
69  
70      public List<T> getGroup(String group) {
71        return groupCache.get(group);
72      }
73  
74      public void putGroup(String group, T perm) {
75        groupCache.put(group, perm);
76      }
77  
78      public List<T> replaceGroup(String group, Iterable<? extends T> perms) {
79        return groupCache.replaceValues(group, perms);
80      }
81  
82      /**
83       * Returns a combined map of user and group permissions, with group names prefixed by
84       * {@link AuthUtil#GROUP_PREFIX}.
85       */
86      public ListMultimap<String,T> getAllPermissions() {
87        ListMultimap<String,T> tmp = ArrayListMultimap.create();
88        tmp.putAll(userCache);
89        for (String group : groupCache.keySet()) {
90          tmp.putAll(AuthUtil.toGroupEntry(group), groupCache.get(group));
91        }
92        return tmp;
93      }
94    }
95  
96    private static final Log LOG = LogFactory.getLog(TableAuthManager.class);
97  
98    private static TableAuthManager instance;
99  
100   /** Cache of global permissions */
101   private volatile PermissionCache<Permission> globalCache;
102 
103   private ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>> tableCache =
104       new ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>>();
105 
106   private ConcurrentSkipListMap<String, PermissionCache<TablePermission>> nsCache =
107     new ConcurrentSkipListMap<String, PermissionCache<TablePermission>>();
108 
109   private Configuration conf;
110   private ZKPermissionWatcher zkperms;
111   private final AtomicLong mtime = new AtomicLong(0L);
112 
113   private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
114       throws IOException {
115     this.conf = conf;
116 
117     // initialize global permissions based on configuration
118     globalCache = initGlobal(conf);
119 
120     this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
121     try {
122       this.zkperms.start();
123     } catch (KeeperException ke) {
124       LOG.error("ZooKeeper initialization failed", ke);
125     }
126   }
127 
128   /**
129    * Returns a new {@code PermissionCache} initialized with permission assignments
130    * from the {@code hbase.superuser} configuration key.
131    */
132   private PermissionCache<Permission> initGlobal(Configuration conf) throws IOException {
133     UserProvider userProvider = UserProvider.instantiate(conf);
134     User user = userProvider.getCurrent();
135     if (user == null) {
136       throw new IOException("Unable to obtain the current user, " +
137           "authorization checks for internal operations will not work correctly!");
138     }
139     PermissionCache<Permission> newCache = new PermissionCache<Permission>();
140     String currentUser = user.getShortName();
141 
142     // the system user is always included
143     List<String> superusers = Lists.asList(currentUser, conf.getStrings(
144         Superusers.SUPERUSER_CONF_KEY, new String[0]));
145     if (superusers != null) {
146       for (String name : superusers) {
147         if (AuthUtil.isGroupPrincipal(name)) {
148           newCache.putGroup(AuthUtil.getGroupName(name),
149               new Permission(Permission.Action.values()));
150         } else {
151           newCache.putUser(name, new Permission(Permission.Action.values()));
152         }
153       }
154     }
155     return newCache;
156   }
157 
158   public ZKPermissionWatcher getZKPermissionWatcher() {
159     return this.zkperms;
160   }
161 
162   public void refreshTableCacheFromWritable(TableName table,
163                                        byte[] data) throws IOException {
164     if (data != null && data.length > 0) {
165       ListMultimap<String,TablePermission> perms;
166       try {
167         perms = AccessControlLists.readPermissions(data, conf);
168       } catch (DeserializationException e) {
169         throw new IOException(e);
170       }
171 
172       if (perms != null) {
173         if (Bytes.equals(table.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
174           updateGlobalCache(perms);
175         } else {
176           updateTableCache(table, perms);
177         }
178       }
179     } else {
180       LOG.debug("Skipping permission cache refresh because writable data is empty");
181     }
182   }
183 
184   public void refreshNamespaceCacheFromWritable(String namespace, byte[] data) throws IOException {
185     if (data != null && data.length > 0) {
186       ListMultimap<String,TablePermission> perms;
187       try {
188         perms = AccessControlLists.readPermissions(data, conf);
189       } catch (DeserializationException e) {
190         throw new IOException(e);
191       }
192       if (perms != null) {
193         updateNsCache(namespace, perms);
194       }
195     } else {
196       LOG.debug("Skipping permission cache refresh because writable data is empty");
197     }
198   }
199 
200   /**
201    * Updates the internal global permissions cache
202    *
203    * @param userPerms
204    */
205   private void updateGlobalCache(ListMultimap<String,TablePermission> userPerms) {
206     PermissionCache<Permission> newCache = null;
207     try {
208       newCache = initGlobal(conf);
209       for (Map.Entry<String,TablePermission> entry : userPerms.entries()) {
210         if (AuthUtil.isGroupPrincipal(entry.getKey())) {
211           newCache.putGroup(AuthUtil.getGroupName(entry.getKey()),
212               new Permission(entry.getValue().getActions()));
213         } else {
214           newCache.putUser(entry.getKey(), new Permission(entry.getValue().getActions()));
215         }
216       }
217       globalCache = newCache;
218       mtime.incrementAndGet();
219     } catch (IOException e) {
220       // Never happens
221       LOG.error("Error occured while updating the global cache", e);
222     }
223   }
224 
225   /**
226    * Updates the internal permissions cache for a single table, splitting
227    * the permissions listed into separate caches for users and groups to optimize
228    * group lookups.
229    *
230    * @param table
231    * @param tablePerms
232    */
233   private void updateTableCache(TableName table,
234                                 ListMultimap<String,TablePermission> tablePerms) {
235     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
236 
237     for (Map.Entry<String,TablePermission> entry : tablePerms.entries()) {
238       if (AuthUtil.isGroupPrincipal(entry.getKey())) {
239         newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
240       } else {
241         newTablePerms.putUser(entry.getKey(), entry.getValue());
242       }
243     }
244 
245     tableCache.put(table, newTablePerms);
246     mtime.incrementAndGet();
247   }
248 
249   /**
250    * Updates the internal permissions cache for a single table, splitting
251    * the permissions listed into separate caches for users and groups to optimize
252    * group lookups.
253    *
254    * @param namespace
255    * @param tablePerms
256    */
257   private void updateNsCache(String namespace,
258                              ListMultimap<String, TablePermission> tablePerms) {
259     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
260 
261     for (Map.Entry<String, TablePermission> entry : tablePerms.entries()) {
262       if (AuthUtil.isGroupPrincipal(entry.getKey())) {
263         newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
264       } else {
265         newTablePerms.putUser(entry.getKey(), entry.getValue());
266       }
267     }
268 
269     nsCache.put(namespace, newTablePerms);
270     mtime.incrementAndGet();
271   }
272 
273   private PermissionCache<TablePermission> getTablePermissions(TableName table) {
274     if (!tableCache.containsKey(table)) {
275       tableCache.putIfAbsent(table, new PermissionCache<TablePermission>());
276     }
277     return tableCache.get(table);
278   }
279 
280   private PermissionCache<TablePermission> getNamespacePermissions(String namespace) {
281     if (!nsCache.containsKey(namespace)) {
282       nsCache.putIfAbsent(namespace, new PermissionCache<TablePermission>());
283     }
284     return nsCache.get(namespace);
285   }
286 
287   /**
288    * Authorizes a global permission
289    * @param perms
290    * @param action
291    * @return true if authorized, false otherwise
292    */
293   private boolean authorize(List<Permission> perms, Permission.Action action) {
294     if (perms != null) {
295       for (Permission p : perms) {
296         if (p.implies(action)) {
297           return true;
298         }
299       }
300     } else if (LOG.isDebugEnabled()) {
301       LOG.debug("No permissions found for " + action);
302     }
303 
304     return false;
305   }
306 
307   /**
308    * Authorize a global permission based on ACLs for the given user and the
309    * user's groups.
310    * @param user
311    * @param action
312    * @return true if known and authorized, false otherwise
313    */
314   public boolean authorize(User user, Permission.Action action) {
315     if (user == null) {
316       return false;
317     }
318 
319     if (authorize(globalCache.getUser(user.getShortName()), action)) {
320       return true;
321     }
322 
323     String[] groups = user.getGroupNames();
324     if (groups != null) {
325       for (String group : groups) {
326         if (authorize(globalCache.getGroup(group), action)) {
327           return true;
328         }
329       }
330     }
331     return false;
332   }
333 
334   private boolean authorize(List<TablePermission> perms,
335                             TableName table, byte[] family,
336                             Permission.Action action) {
337     return authorize(perms, table, family, null, action);
338   }
339 
340   private boolean authorize(List<TablePermission> perms,
341                             TableName table, byte[] family,
342                             byte[] qualifier, Permission.Action action) {
343     if (perms != null) {
344       for (TablePermission p : perms) {
345         if (p.implies(table, family, qualifier, action)) {
346           return true;
347         }
348       }
349     } else if (LOG.isDebugEnabled()) {
350       LOG.debug("No permissions found for table="+table);
351     }
352     return false;
353   }
354 
355   private boolean hasAccess(List<TablePermission> perms,
356                             TableName table, Permission.Action action) {
357     if (perms != null) {
358       for (TablePermission p : perms) {
359         if (p.implies(action)) {
360           return true;
361         }
362       }
363     } else if (LOG.isDebugEnabled()) {
364       LOG.debug("No permissions found for table="+table);
365     }
366     return false;
367   }
368 
369   /**
370    * Authorize a user for a given KV. This is called from AccessControlFilter.
371    */
372   public boolean authorize(User user, TableName table, Cell cell, Permission.Action action) {
373     try {
374       List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
375       if (LOG.isTraceEnabled()) {
376         LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
377           (perms != null ? perms : ""));
378       }
379       if (perms != null) {
380         for (Permission p: perms) {
381           if (p.implies(action)) {
382             return true;
383           }
384         }
385       }
386     } catch (IOException e) {
387       // We failed to parse the KV tag
388       LOG.error("Failed parse of ACL tag in cell " + cell);
389       // Fall through to check with the table and CF perms we were able
390       // to collect regardless
391     }
392     return false;
393   }
394 
395   public boolean authorize(User user, String namespace, Permission.Action action) {
396     // Global authorizations supercede namespace level
397     if (authorize(user, action)) {
398       return true;
399     }
400     // Check namespace permissions
401     PermissionCache<TablePermission> tablePerms = nsCache.get(namespace);
402     if (tablePerms != null) {
403       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
404       if (authorize(userPerms, namespace, action)) {
405         return true;
406       }
407       String[] groupNames = user.getGroupNames();
408       if (groupNames != null) {
409         for (String group : groupNames) {
410           List<TablePermission> groupPerms = tablePerms.getGroup(group);
411           if (authorize(groupPerms, namespace, action)) {
412             return true;
413           }
414         }
415       }
416     }
417     return false;
418   }
419 
420   private boolean authorize(List<TablePermission> perms, String namespace,
421                             Permission.Action action) {
422     if (perms != null) {
423       for (TablePermission p : perms) {
424         if (p.implies(namespace, action)) {
425           return true;
426         }
427       }
428     } else if (LOG.isDebugEnabled()) {
429       LOG.debug("No permissions for authorize() check, table=" + namespace);
430     }
431 
432     return false;
433   }
434 
435   /**
436    * Checks authorization to a given table and column family for a user, based on the
437    * stored user permissions.
438    *
439    * @param user
440    * @param table
441    * @param family
442    * @param action
443    * @return true if known and authorized, false otherwise
444    */
445   public boolean authorizeUser(User user, TableName table, byte[] family,
446       Permission.Action action) {
447     return authorizeUser(user, table, family, null, action);
448   }
449 
450   public boolean authorizeUser(User user, TableName table, byte[] family,
451       byte[] qualifier, Permission.Action action) {
452     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
453     // Global and namespace authorizations supercede table level
454     if (authorize(user, table.getNamespaceAsString(), action)) {
455       return true;
456     }
457     // Check table permissions
458     return authorize(getTablePermissions(table).getUser(user.getShortName()), table, family,
459         qualifier, action);
460   }
461 
462   /**
463    * Checks if the user has access to the full table or at least a family/qualifier
464    * for the specified action.
465    *
466    * @param user
467    * @param table
468    * @param action
469    * @return true if the user has access to the table, false otherwise
470    */
471   public boolean userHasAccess(User user, TableName table, Permission.Action action) {
472     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
473     // Global and namespace authorizations supercede table level
474     if (authorize(user, table.getNamespaceAsString(), action)) {
475       return true;
476     }
477     // Check table permissions
478     return hasAccess(getTablePermissions(table).getUser(user.getShortName()), table, action);
479   }
480 
481   /**
482    * Checks global authorization for a given action for a group, based on the stored
483    * permissions.
484    */
485   public boolean authorizeGroup(String groupName, Permission.Action action) {
486     List<Permission> perms = globalCache.getGroup(groupName);
487     if (LOG.isDebugEnabled()) {
488       LOG.debug("authorizing " + (perms != null && !perms.isEmpty() ? perms.get(0) : "") +
489         " for " + action);
490     }
491     return authorize(perms, action);
492   }
493 
494   /**
495    * Checks authorization to a given table, column family and column for a group, based
496    * on the stored permissions.
497    * @param groupName
498    * @param table
499    * @param family
500    * @param qualifier
501    * @param action
502    * @return true if known and authorized, false otherwise
503    */
504   public boolean authorizeGroup(String groupName, TableName table, byte[] family,
505       byte[] qualifier, Permission.Action action) {
506     // Global authorization supercedes table level
507     if (authorizeGroup(groupName, action)) {
508       return true;
509     }
510     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
511     // Namespace authorization supercedes table level
512     String namespace = table.getNamespaceAsString();
513     if (authorize(getNamespacePermissions(namespace).getGroup(groupName), namespace, action)) {
514       return true;
515     }
516     // Check table level
517     List<TablePermission> tblPerms = getTablePermissions(table).getGroup(groupName);
518     if (LOG.isDebugEnabled()) {
519       LOG.debug("authorizing " + (tblPerms != null && !tblPerms.isEmpty() ? tblPerms.get(0) : "") +
520         " for " +groupName + " on " + table + "." + Bytes.toString(family) + "." +
521         Bytes.toString(qualifier) + " with " + action);
522     }
523     return authorize(tblPerms, table, family, qualifier, action);
524   }
525 
526   /**
527    * Checks if the user has access to the full table or at least a family/qualifier
528    * for the specified action.
529    * @param groupName
530    * @param table
531    * @param action
532    * @return true if the group has access to the table, false otherwise
533    */
534   public boolean groupHasAccess(String groupName, TableName table, Permission.Action action) {
535     // Global authorization supercedes table level
536     if (authorizeGroup(groupName, action)) {
537       return true;
538     }
539     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
540     // Namespace authorization supercedes table level
541     if (hasAccess(getNamespacePermissions(table.getNamespaceAsString()).getGroup(groupName),
542         table, action)) {
543       return true;
544     }
545     // Check table level
546     return hasAccess(getTablePermissions(table).getGroup(groupName), table, action);
547   }
548 
549   public boolean authorize(User user, TableName table, byte[] family,
550       byte[] qualifier, Permission.Action action) {
551     if (authorizeUser(user, table, family, qualifier, action)) {
552       return true;
553     }
554 
555     String[] groups = user.getGroupNames();
556     if (groups != null) {
557       for (String group : groups) {
558         if (authorizeGroup(group, table, family, qualifier, action)) {
559           return true;
560         }
561       }
562     }
563     return false;
564   }
565 
566   public boolean hasAccess(User user, TableName table, Permission.Action action) {
567     if (userHasAccess(user, table, action)) {
568       return true;
569     }
570 
571     String[] groups = user.getGroupNames();
572     if (groups != null) {
573       for (String group : groups) {
574         if (groupHasAccess(group, table, action)) {
575           return true;
576         }
577       }
578     }
579     return false;
580   }
581 
582   public boolean authorize(User user, TableName table, byte[] family,
583       Permission.Action action) {
584     return authorize(user, table, family, null, action);
585   }
586 
587   /**
588    * Returns true if the given user has a {@link TablePermission} matching up
589    * to the column family portion of a permission.  Note that this permission
590    * may be scoped to a given column qualifier and does not guarantee that
591    * authorize() on the same column family would return true.
592    */
593   public boolean matchPermission(User user,
594       TableName table, byte[] family, Permission.Action action) {
595     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
596     if (tablePerms != null) {
597       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
598       if (userPerms != null) {
599         for (TablePermission p : userPerms) {
600           if (p.matchesFamily(table, family, action)) {
601             return true;
602           }
603         }
604       }
605 
606       String[] groups = user.getGroupNames();
607       if (groups != null) {
608         for (String group : groups) {
609           List<TablePermission> groupPerms = tablePerms.getGroup(group);
610           if (groupPerms != null) {
611             for (TablePermission p : groupPerms) {
612               if (p.matchesFamily(table, family, action)) {
613                 return true;
614               }
615             }
616           }
617         }
618       }
619     }
620 
621     return false;
622   }
623 
624   public boolean matchPermission(User user,
625       TableName table, byte[] family, byte[] qualifier,
626       Permission.Action action) {
627     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
628     if (tablePerms != null) {
629       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
630       if (userPerms != null) {
631         for (TablePermission p : userPerms) {
632           if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
633             return true;
634           }
635         }
636       }
637 
638       String[] groups = user.getGroupNames();
639       if (groups != null) {
640         for (String group : groups) {
641           List<TablePermission> groupPerms = tablePerms.getGroup(group);
642           if (groupPerms != null) {
643             for (TablePermission p : groupPerms) {
644               if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
645                 return true;
646               }
647             }
648           }
649         }
650       }
651     }
652     return false;
653   }
654 
655   public void removeNamespace(byte[] ns) {
656     nsCache.remove(Bytes.toString(ns));
657   }
658 
659   public void removeTable(TableName table) {
660     tableCache.remove(table);
661   }
662 
663   /**
664    * Overwrites the existing permission set for a given user for a table, and
665    * triggers an update for zookeeper synchronization.
666    * @param username
667    * @param table
668    * @param perms
669    */
670   public void setTableUserPermissions(String username, TableName table,
671       List<TablePermission> perms) {
672     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
673     tablePerms.replaceUser(username, perms);
674     writeTableToZooKeeper(table, tablePerms);
675   }
676 
677   /**
678    * Overwrites the existing permission set for a group and triggers an update
679    * for zookeeper synchronization.
680    * @param group
681    * @param table
682    * @param perms
683    */
684   public void setTableGroupPermissions(String group, TableName table,
685       List<TablePermission> perms) {
686     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
687     tablePerms.replaceGroup(group, perms);
688     writeTableToZooKeeper(table, tablePerms);
689   }
690 
691   /**
692    * Overwrites the existing permission set for a given user for a table, and
693    * triggers an update for zookeeper synchronization.
694    * @param username
695    * @param namespace
696    * @param perms
697    */
698   public void setNamespaceUserPermissions(String username, String namespace,
699       List<TablePermission> perms) {
700     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
701     tablePerms.replaceUser(username, perms);
702     writeNamespaceToZooKeeper(namespace, tablePerms);
703   }
704 
705   /**
706    * Overwrites the existing permission set for a group and triggers an update
707    * for zookeeper synchronization.
708    * @param group
709    * @param namespace
710    * @param perms
711    */
712   public void setNamespaceGroupPermissions(String group, String namespace,
713       List<TablePermission> perms) {
714     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
715     tablePerms.replaceGroup(group, perms);
716     writeNamespaceToZooKeeper(namespace, tablePerms);
717   }
718 
719   public void writeTableToZooKeeper(TableName table,
720       PermissionCache<TablePermission> tablePerms) {
721     byte[] serialized = new byte[0];
722     if (tablePerms != null) {
723       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
724     }
725     zkperms.writeToZookeeper(table.getName(), serialized);
726   }
727 
728   public void writeNamespaceToZooKeeper(String namespace,
729       PermissionCache<TablePermission> tablePerms) {
730     byte[] serialized = new byte[0];
731     if (tablePerms != null) {
732       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
733     }
734     zkperms.writeToZookeeper(Bytes.toBytes(AccessControlLists.toNamespaceEntry(namespace)),
735         serialized);
736   }
737 
738   public long getMTime() {
739     return mtime.get();
740   }
741 
742   static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
743     new HashMap<ZooKeeperWatcher,TableAuthManager>();
744 
745   public synchronized static TableAuthManager get(
746       ZooKeeperWatcher watcher, Configuration conf) throws IOException {
747     instance = managerMap.get(watcher);
748     if (instance == null) {
749       instance = new TableAuthManager(watcher, conf);
750       managerMap.put(watcher, instance);
751     }
752     return instance;
753   }
754 }