View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.IOException;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentSkipListMap;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.exceptions.DeserializationException;
35  import org.apache.hadoop.hbase.security.User;
36  import org.apache.hadoop.hbase.security.UserProvider;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
39  import org.apache.zookeeper.KeeperException;
40  
41  import com.google.common.collect.ArrayListMultimap;
42  import com.google.common.collect.ListMultimap;
43  import com.google.common.collect.Lists;
44  
45  /**
46   * Performs authorization checks for a given user's assigned permissions
47   */
48  @InterfaceAudience.Private
49  public class TableAuthManager {
50    private static class PermissionCache<T extends Permission> {
51      /** Cache of user permissions */
52      private ListMultimap<String,T> userCache = ArrayListMultimap.create();
53      /** Cache of group permissions */
54      private ListMultimap<String,T> groupCache = ArrayListMultimap.create();
55  
56      public List<T> getUser(String user) {
57        return userCache.get(user);
58      }
59  
60      public void putUser(String user, T perm) {
61        userCache.put(user, perm);
62      }
63  
64      public List<T> replaceUser(String user, Iterable<? extends T> perms) {
65        return userCache.replaceValues(user, perms);
66      }
67  
68      public List<T> getGroup(String group) {
69        return groupCache.get(group);
70      }
71  
72      public void putGroup(String group, T perm) {
73        groupCache.put(group, perm);
74      }
75  
76      public List<T> replaceGroup(String group, Iterable<? extends T> perms) {
77        return groupCache.replaceValues(group, perms);
78      }
79  
80      /**
81       * Returns a combined map of user and group permissions, with group names prefixed by
82       * {@link AccessControlLists#GROUP_PREFIX}.
83       */
84      public ListMultimap<String,T> getAllPermissions() {
85        ListMultimap<String,T> tmp = ArrayListMultimap.create();
86        tmp.putAll(userCache);
87        for (String group : groupCache.keySet()) {
88          tmp.putAll(AccessControlLists.GROUP_PREFIX + group, groupCache.get(group));
89        }
90        return tmp;
91      }
92    }
93  
94    private static Log LOG = LogFactory.getLog(TableAuthManager.class);
95  
96    private static TableAuthManager instance;
97  
98    /** Cache of global permissions */
99    private volatile PermissionCache<Permission> globalCache;
100 
101   private ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>> tableCache =
102       new ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>>();
103 
104   private ConcurrentSkipListMap<String, PermissionCache<TablePermission>> nsCache =
105     new ConcurrentSkipListMap<String, PermissionCache<TablePermission>>();
106 
107   private Configuration conf;
108   private ZKPermissionWatcher zkperms;
109   private final AtomicLong mtime = new AtomicLong(0L);
110 
111   private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
112       throws IOException {
113     this.conf = conf;
114 
115     // initialize global permissions based on configuration
116     globalCache = initGlobal(conf);
117 
118     this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
119     try {
120       this.zkperms.start();
121     } catch (KeeperException ke) {
122       LOG.error("ZooKeeper initialization failed", ke);
123     }
124   }
125 
126   /**
127    * Returns a new {@code PermissionCache} initialized with permission assignments
128    * from the {@code hbase.superuser} configuration key.
129    */
130   private PermissionCache<Permission> initGlobal(Configuration conf) throws IOException {
131     UserProvider userProvider = UserProvider.instantiate(conf);
132     User user = userProvider.getCurrent();
133     if (user == null) {
134       throw new IOException("Unable to obtain the current user, " +
135           "authorization checks for internal operations will not work correctly!");
136     }
137     PermissionCache<Permission> newCache = new PermissionCache<Permission>();
138     String currentUser = user.getShortName();
139 
140     // the system user is always included
141     List<String> superusers = Lists.asList(currentUser, conf.getStrings(
142         AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
143     if (superusers != null) {
144       for (String name : superusers) {
145         if (AccessControlLists.isGroupPrincipal(name)) {
146           newCache.putGroup(AccessControlLists.getGroupName(name),
147               new Permission(Permission.Action.values()));
148         } else {
149           newCache.putUser(name, new Permission(Permission.Action.values()));
150         }
151       }
152     }
153     return newCache;
154   }
155 
156   public ZKPermissionWatcher getZKPermissionWatcher() {
157     return this.zkperms;
158   }
159 
160   public void refreshTableCacheFromWritable(TableName table,
161                                        byte[] data) throws IOException {
162     if (data != null && data.length > 0) {
163       ListMultimap<String,TablePermission> perms;
164       try {
165         perms = AccessControlLists.readPermissions(data, conf);
166       } catch (DeserializationException e) {
167         throw new IOException(e);
168       }
169 
170       if (perms != null) {
171         if (Bytes.equals(table.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
172           updateGlobalCache(perms);
173         } else {
174           updateTableCache(table, perms);
175         }
176       }
177     } else {
178       LOG.debug("Skipping permission cache refresh because writable data is empty");
179     }
180   }
181 
182   public void refreshNamespaceCacheFromWritable(String namespace, byte[] data) throws IOException {
183     if (data != null && data.length > 0) {
184       ListMultimap<String,TablePermission> perms;
185       try {
186         perms = AccessControlLists.readPermissions(data, conf);
187       } catch (DeserializationException e) {
188         throw new IOException(e);
189       }
190       if (perms != null) {
191         updateNsCache(namespace, perms);
192       }
193     } else {
194       LOG.debug("Skipping permission cache refresh because writable data is empty");
195     }
196   }
197 
198   /**
199    * Updates the internal global permissions cache
200    *
201    * @param userPerms
202    */
203   private void updateGlobalCache(ListMultimap<String,TablePermission> userPerms) {
204     PermissionCache<Permission> newCache = null;
205     try {
206       newCache = initGlobal(conf);
207       for (Map.Entry<String,TablePermission> entry : userPerms.entries()) {
208         if (AccessControlLists.isGroupPrincipal(entry.getKey())) {
209           newCache.putGroup(AccessControlLists.getGroupName(entry.getKey()),
210               new Permission(entry.getValue().getActions()));
211         } else {
212           newCache.putUser(entry.getKey(), new Permission(entry.getValue().getActions()));
213         }
214       }
215       globalCache = newCache;
216       mtime.incrementAndGet();
217     } catch (IOException e) {
218       // Never happens
219       LOG.error("Error occured while updating the global cache", e);
220     }
221   }
222 
223   /**
224    * Updates the internal permissions cache for a single table, splitting
225    * the permissions listed into separate caches for users and groups to optimize
226    * group lookups.
227    *
228    * @param table
229    * @param tablePerms
230    */
231   private void updateTableCache(TableName table,
232                                 ListMultimap<String,TablePermission> tablePerms) {
233     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
234 
235     for (Map.Entry<String,TablePermission> entry : tablePerms.entries()) {
236       if (AccessControlLists.isGroupPrincipal(entry.getKey())) {
237         newTablePerms.putGroup(AccessControlLists.getGroupName(entry.getKey()), entry.getValue());
238       } else {
239         newTablePerms.putUser(entry.getKey(), entry.getValue());
240       }
241     }
242 
243     tableCache.put(table, newTablePerms);
244     mtime.incrementAndGet();
245   }
246 
247   /**
248    * Updates the internal permissions cache for a single table, splitting
249    * the permissions listed into separate caches for users and groups to optimize
250    * group lookups.
251    *
252    * @param namespace
253    * @param tablePerms
254    */
255   private void updateNsCache(String namespace,
256                              ListMultimap<String, TablePermission> tablePerms) {
257     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
258 
259     for (Map.Entry<String, TablePermission> entry : tablePerms.entries()) {
260       if (AccessControlLists.isGroupPrincipal(entry.getKey())) {
261         newTablePerms.putGroup(AccessControlLists.getGroupName(entry.getKey()), entry.getValue());
262       } else {
263         newTablePerms.putUser(entry.getKey(), entry.getValue());
264       }
265     }
266 
267     nsCache.put(namespace, newTablePerms);
268     mtime.incrementAndGet();
269   }
270 
271   private PermissionCache<TablePermission> getTablePermissions(TableName table) {
272     if (!tableCache.containsKey(table)) {
273       tableCache.putIfAbsent(table, new PermissionCache<TablePermission>());
274     }
275     return tableCache.get(table);
276   }
277 
278   private PermissionCache<TablePermission> getNamespacePermissions(String namespace) {
279     if (!nsCache.containsKey(namespace)) {
280       nsCache.putIfAbsent(namespace, new PermissionCache<TablePermission>());
281     }
282     return nsCache.get(namespace);
283   }
284 
285   /**
286    * Authorizes a global permission
287    * @param perms
288    * @param action
289    * @return true if authorized, false otherwise
290    */
291   private boolean authorize(List<Permission> perms, Permission.Action action) {
292     if (perms != null) {
293       for (Permission p : perms) {
294         if (p.implies(action)) {
295           return true;
296         }
297       }
298     } else if (LOG.isDebugEnabled()) {
299       LOG.debug("No permissions found for " + action);
300     }
301 
302     return false;
303   }
304 
305   /**
306    * Authorize a global permission based on ACLs for the given user and the
307    * user's groups.
308    * @param user
309    * @param action
310    * @return true if known and authorized, false otherwise
311    */
312   public boolean authorize(User user, Permission.Action action) {
313     if (user == null) {
314       return false;
315     }
316 
317     if (authorize(globalCache.getUser(user.getShortName()), action)) {
318       return true;
319     }
320 
321     String[] groups = user.getGroupNames();
322     if (groups != null) {
323       for (String group : groups) {
324         if (authorize(globalCache.getGroup(group), action)) {
325           return true;
326         }
327       }
328     }
329     return false;
330   }
331 
332   private boolean authorize(List<TablePermission> perms,
333                             TableName table, byte[] family,
334                             Permission.Action action) {
335     return authorize(perms, table, family, null, action);
336   }
337 
338   private boolean authorize(List<TablePermission> perms,
339                             TableName table, byte[] family,
340                             byte[] qualifier, Permission.Action action) {
341     if (perms != null) {
342       for (TablePermission p : perms) {
343         if (p.implies(table, family, qualifier, action)) {
344           return true;
345         }
346       }
347     } else if (LOG.isDebugEnabled()) {
348       LOG.debug("No permissions found for table="+table);
349     }
350     return false;
351   }
352 
353   private boolean hasAccess(List<TablePermission> perms,
354                             TableName table, Permission.Action action) {
355     if (perms != null) {
356       for (TablePermission p : perms) {
357         if (p.implies(action)) {
358           return true;
359         }
360       }
361     } else if (LOG.isDebugEnabled()) {
362       LOG.debug("No permissions found for table="+table);
363     }
364     return false;
365   }
366 
367   /**
368    * Authorize a user for a given KV. This is called from AccessControlFilter.
369    */
370   public boolean authorize(User user, TableName table, Cell cell, Permission.Action action) {
371     try {
372       List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
373       if (LOG.isTraceEnabled()) {
374         LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
375           (perms != null ? perms : ""));
376       }
377       if (perms != null) {
378         for (Permission p: perms) {
379           if (p.implies(action)) {
380             return true;
381           }
382         }
383       }
384     } catch (IOException e) {
385       // We failed to parse the KV tag
386       LOG.error("Failed parse of ACL tag in cell " + cell);
387       // Fall through to check with the table and CF perms we were able
388       // to collect regardless
389     }
390     return false;
391   }
392 
393   public boolean authorize(User user, String namespace, Permission.Action action) {
394     // Global authorizations supercede namespace level
395     if (authorizeUser(user, action)) {
396       return true;
397     }
398     // Check namespace permissions
399     PermissionCache<TablePermission> tablePerms = nsCache.get(namespace);
400     if (tablePerms != null) {
401       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
402       if (authorize(userPerms, namespace, action)) {
403         return true;
404       }
405       String[] groupNames = user.getGroupNames();
406       if (groupNames != null) {
407         for (String group : groupNames) {
408           List<TablePermission> groupPerms = tablePerms.getGroup(group);
409           if (authorize(groupPerms, namespace, action)) {
410             return true;
411           }
412         }
413       }
414     }
415     return false;
416   }
417 
418   private boolean authorize(List<TablePermission> perms, String namespace,
419                             Permission.Action action) {
420     if (perms != null) {
421       for (TablePermission p : perms) {
422         if (p.implies(namespace, action)) {
423           return true;
424         }
425       }
426     } else if (LOG.isDebugEnabled()) {
427       LOG.debug("No permissions for authorize() check, table=" + namespace);
428     }
429 
430     return false;
431   }
432 
433   /**
434    * Checks global authorization for a specific action for a user, based on the
435    * stored user permissions.
436    */
437   public boolean authorizeUser(User user, Permission.Action action) {
438     return authorize(globalCache.getUser(user.getShortName()), action);
439   }
440 
441   /**
442    * Checks authorization to a given table and column family for a user, based on the
443    * stored user permissions.
444    *
445    * @param user
446    * @param table
447    * @param family
448    * @param action
449    * @return true if known and authorized, false otherwise
450    */
451   public boolean authorizeUser(User user, TableName table, byte[] family,
452       Permission.Action action) {
453     return authorizeUser(user, table, family, null, action);
454   }
455 
456   public boolean authorizeUser(User user, TableName table, byte[] family,
457       byte[] qualifier, Permission.Action action) {
458     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
459     // Global and namespace authorizations supercede table level
460     if (authorize(user, table.getNamespaceAsString(), action)) {
461       return true;
462     }
463     // Check table permissions
464     return authorize(getTablePermissions(table).getUser(user.getShortName()), table, family,
465         qualifier, action);
466   }
467 
468   /**
469    * Checks if the user has access to the full table or at least a family/qualifier
470    * for the specified action.
471    *
472    * @param user
473    * @param table
474    * @param action
475    * @return true if the user has access to the table, false otherwise
476    */
477   public boolean userHasAccess(User user, TableName table, Permission.Action action) {
478     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
479     // Global and namespace authorizations supercede table level
480     if (authorize(user, table.getNamespaceAsString(), action)) {
481       return true;
482     }
483     // Check table permissions
484     return hasAccess(getTablePermissions(table).getUser(user.getShortName()), table, action);
485   }
486 
487   /**
488    * Checks global authorization for a given action for a group, based on the stored
489    * permissions.
490    */
491   public boolean authorizeGroup(String groupName, Permission.Action action) {
492     List<Permission> perms = globalCache.getGroup(groupName);
493     if (LOG.isDebugEnabled()) {
494       LOG.debug("authorizing " + (perms != null && !perms.isEmpty() ? perms.get(0) : "") +
495         " for " + action);
496     }
497     return authorize(perms, action);
498   }
499 
500   /**
501    * Checks authorization to a given table, column family and column for a group, based
502    * on the stored permissions.
503    * @param groupName
504    * @param table
505    * @param family
506    * @param qualifier
507    * @param action
508    * @return true if known and authorized, false otherwise
509    */
510   public boolean authorizeGroup(String groupName, TableName table, byte[] family,
511       byte[] qualifier, Permission.Action action) {
512     // Global authorization supercedes table level
513     if (authorizeGroup(groupName, action)) {
514       return true;
515     }
516     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
517     // Namespace authorization supercedes table level
518     String namespace = table.getNamespaceAsString();
519     if (authorize(getNamespacePermissions(namespace).getGroup(groupName), namespace, action)) {
520       return true;
521     }
522     // Check table level
523     List<TablePermission> tblPerms = getTablePermissions(table).getGroup(groupName);
524     if (LOG.isDebugEnabled()) {
525       LOG.debug("authorizing " + (tblPerms != null && !tblPerms.isEmpty() ? tblPerms.get(0) : "") +
526         " for " +groupName + " on " + table + "." + Bytes.toString(family) + "." +
527         Bytes.toString(qualifier) + " with " + action);
528     }
529     return authorize(tblPerms, table, family, qualifier, action);
530   }
531 
532   /**
533    * Checks if the user has access to the full table or at least a family/qualifier
534    * for the specified action.
535    * @param groupName
536    * @param table
537    * @param action
538    * @return true if the group has access to the table, false otherwise
539    */
540   public boolean groupHasAccess(String groupName, TableName table, Permission.Action action) {
541     // Global authorization supercedes table level
542     if (authorizeGroup(groupName, action)) {
543       return true;
544     }
545     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
546     // Namespace authorization supercedes table level
547     if (hasAccess(getNamespacePermissions(table.getNamespaceAsString()).getGroup(groupName),
548         table, action)) {
549       return true;
550     }
551     // Check table level
552     return hasAccess(getTablePermissions(table).getGroup(groupName), table, action);
553   }
554 
555   public boolean authorize(User user, TableName table, byte[] family,
556       byte[] qualifier, Permission.Action action) {
557     if (authorizeUser(user, table, family, qualifier, action)) {
558       return true;
559     }
560 
561     String[] groups = user.getGroupNames();
562     if (groups != null) {
563       for (String group : groups) {
564         if (authorizeGroup(group, table, family, qualifier, action)) {
565           return true;
566         }
567       }
568     }
569     return false;
570   }
571 
572   public boolean hasAccess(User user, TableName table, Permission.Action action) {
573     if (userHasAccess(user, table, action)) {
574       return true;
575     }
576 
577     String[] groups = user.getGroupNames();
578     if (groups != null) {
579       for (String group : groups) {
580         if (groupHasAccess(group, table, action)) {
581           return true;
582         }
583       }
584     }
585     return false;
586   }
587 
588   public boolean authorize(User user, TableName table, byte[] family,
589       Permission.Action action) {
590     return authorize(user, table, family, null, action);
591   }
592 
593   /**
594    * Returns true if the given user has a {@link TablePermission} matching up
595    * to the column family portion of a permission.  Note that this permission
596    * may be scoped to a given column qualifier and does not guarantee that
597    * authorize() on the same column family would return true.
598    */
599   public boolean matchPermission(User user,
600       TableName table, byte[] family, Permission.Action action) {
601     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
602     if (tablePerms != null) {
603       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
604       if (userPerms != null) {
605         for (TablePermission p : userPerms) {
606           if (p.matchesFamily(table, family, action)) {
607             return true;
608           }
609         }
610       }
611 
612       String[] groups = user.getGroupNames();
613       if (groups != null) {
614         for (String group : groups) {
615           List<TablePermission> groupPerms = tablePerms.getGroup(group);
616           if (groupPerms != null) {
617             for (TablePermission p : groupPerms) {
618               if (p.matchesFamily(table, family, action)) {
619                 return true;
620               }
621             }
622           }
623         }
624       }
625     }
626 
627     return false;
628   }
629 
630   public boolean matchPermission(User user,
631       TableName table, byte[] family, byte[] qualifier,
632       Permission.Action action) {
633     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
634     if (tablePerms != null) {
635       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
636       if (userPerms != null) {
637         for (TablePermission p : userPerms) {
638           if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
639             return true;
640           }
641         }
642       }
643 
644       String[] groups = user.getGroupNames();
645       if (groups != null) {
646         for (String group : groups) {
647           List<TablePermission> groupPerms = tablePerms.getGroup(group);
648           if (groupPerms != null) {
649             for (TablePermission p : groupPerms) {
650               if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
651                 return true;
652               }
653             }
654           }
655         }
656       }
657     }
658     return false;
659   }
660 
661   public void removeNamespace(byte[] ns) {
662     nsCache.remove(Bytes.toString(ns));
663   }
664 
665   public void removeTable(TableName table) {
666     tableCache.remove(table);
667   }
668 
669   /**
670    * Overwrites the existing permission set for a given user for a table, and
671    * triggers an update for zookeeper synchronization.
672    * @param username
673    * @param table
674    * @param perms
675    */
676   public void setTableUserPermissions(String username, TableName table,
677       List<TablePermission> perms) {
678     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
679     tablePerms.replaceUser(username, perms);
680     writeTableToZooKeeper(table, tablePerms);
681   }
682 
683   /**
684    * Overwrites the existing permission set for a group and triggers an update
685    * for zookeeper synchronization.
686    * @param group
687    * @param table
688    * @param perms
689    */
690   public void setTableGroupPermissions(String group, TableName table,
691       List<TablePermission> perms) {
692     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
693     tablePerms.replaceGroup(group, perms);
694     writeTableToZooKeeper(table, tablePerms);
695   }
696 
697   /**
698    * Overwrites the existing permission set for a given user for a table, and
699    * triggers an update for zookeeper synchronization.
700    * @param username
701    * @param namespace
702    * @param perms
703    */
704   public void setNamespaceUserPermissions(String username, String namespace,
705       List<TablePermission> perms) {
706     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
707     tablePerms.replaceUser(username, perms);
708     writeNamespaceToZooKeeper(namespace, tablePerms);
709   }
710 
711   /**
712    * Overwrites the existing permission set for a group and triggers an update
713    * for zookeeper synchronization.
714    * @param group
715    * @param namespace
716    * @param perms
717    */
718   public void setNamespaceGroupPermissions(String group, String namespace,
719       List<TablePermission> perms) {
720     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
721     tablePerms.replaceGroup(group, perms);
722     writeNamespaceToZooKeeper(namespace, tablePerms);
723   }
724 
725   public void writeTableToZooKeeper(TableName table,
726       PermissionCache<TablePermission> tablePerms) {
727     byte[] serialized = new byte[0];
728     if (tablePerms != null) {
729       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
730     }
731     zkperms.writeToZookeeper(table.getName(), serialized);
732   }
733 
734   public void writeNamespaceToZooKeeper(String namespace,
735       PermissionCache<TablePermission> tablePerms) {
736     byte[] serialized = new byte[0];
737     if (tablePerms != null) {
738       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
739     }
740     zkperms.writeToZookeeper(Bytes.toBytes(AccessControlLists.toNamespaceEntry(namespace)),
741         serialized);
742   }
743 
744   public long getMTime() {
745     return mtime.get();
746   }
747 
748   static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
749     new HashMap<ZooKeeperWatcher,TableAuthManager>();
750 
751   public synchronized static TableAuthManager get(
752       ZooKeeperWatcher watcher, Configuration conf) throws IOException {
753     instance = managerMap.get(watcher);
754     if (instance == null) {
755       instance = new TableAuthManager(watcher, conf);
756       managerMap.put(watcher, instance);
757     }
758     return instance;
759   }
760 }