View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.DaemonThreadFactory;
26  import org.apache.hadoop.hbase.TableName;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
29  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
30  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
31  import org.apache.zookeeper.KeeperException;
32  
33  import java.io.Closeable;
34  import java.io.IOException;
35  import java.util.List;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.Executors;
41  import java.util.concurrent.atomic.AtomicReference;
42  
43  /**
44   * Handles synchronization of access control list entries and updates
45   * throughout all nodes in the cluster.  The {@link AccessController} instance
46   * on the {@code _acl_} table regions, creates a znode for each table as
47   * {@code /hbase/acl/tablename}, with the znode data containing a serialized
48   * list of the permissions granted for the table.  The {@code AccessController}
49   * instances on all other cluster hosts watch the znodes for updates, which
50   * trigger updates in the {@link TableAuthManager} permission cache.
51   */
52  @InterfaceAudience.Private
53  public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable {
54    private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
55    // parent node for permissions lists
56    static final String ACL_NODE = "acl";
57    TableAuthManager authManager;
58    String aclZNode;
59    CountDownLatch initialized = new CountDownLatch(1);
60    AtomicReference<List<ZKUtil.NodeAndData>> nodes =
61        new AtomicReference<List<ZKUtil.NodeAndData>>(null);
62    ExecutorService executor;
63  
64    public ZKPermissionWatcher(ZooKeeperWatcher watcher,
65        TableAuthManager authManager, Configuration conf) {
66      super(watcher);
67      this.authManager = authManager;
68      String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
69      this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent);
70      executor = Executors.newSingleThreadExecutor(
71        new DaemonThreadFactory("zk-permission-watcher"));
72    }
73  
74    public void start() throws KeeperException {
75      try {
76        watcher.registerListener(this);
77        if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
78          try {
79            executor.submit(new Callable<Void>() {
80              @Override
81              public Void call() throws KeeperException {
82                List<ZKUtil.NodeAndData> existing =
83                    ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
84                if (existing != null) {
85                  refreshNodes(existing, null);
86                }
87                return null;
88              }
89            }).get();
90          } catch (ExecutionException ex) {
91            if (ex.getCause() instanceof KeeperException) {
92              throw (KeeperException)ex.getCause();
93            } else {
94              throw new RuntimeException(ex.getCause());
95            }
96          } catch (InterruptedException ex) {
97            Thread.currentThread().interrupt();
98          }
99        }
100     } finally {
101       initialized.countDown();
102     }
103   }
104 
105   @Override
106   public void close() {
107     executor.shutdown();
108   }
109 
110   private void waitUntilStarted() {
111     try {
112       initialized.await();
113     } catch (InterruptedException e) {
114       LOG.warn("Interrupted while waiting for start", e);
115       Thread.currentThread().interrupt();
116     }
117   }
118 
119   @Override
120   public void nodeCreated(String path) {
121     waitUntilStarted();
122     if (path.equals(aclZNode)) {
123       executor.submit(new Runnable() {
124         @Override
125         public void run() {
126           try {
127             List<ZKUtil.NodeAndData> nodes =
128                 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
129             refreshNodes(nodes, null);
130           } catch (KeeperException ke) {
131             LOG.error("Error reading data from zookeeper", ke);
132             // only option is to abort
133             watcher.abort("Zookeeper error obtaining acl node children", ke);
134           }
135         }
136       });
137     }
138   }
139 
140   @Override
141   public void nodeDeleted(final String path) {
142     waitUntilStarted();
143     if (aclZNode.equals(ZKUtil.getParent(path))) {
144       executor.submit(new Runnable() {
145         @Override
146         public void run() {
147           String table = ZKUtil.getNodeName(path);
148           if(AccessControlLists.isNamespaceEntry(table)) {
149             authManager.removeNamespace(Bytes.toBytes(table));
150           } else {
151             authManager.removeTable(TableName.valueOf(table));
152           }
153         }
154       });
155     }
156   }
157 
158   @Override
159   public void nodeDataChanged(final String path) {
160     waitUntilStarted();
161     if (aclZNode.equals(ZKUtil.getParent(path))) {
162       executor.submit(new Runnable() {
163         @Override
164         public void run() {
165           // update cache on an existing table node
166           String entry = ZKUtil.getNodeName(path);
167           try {
168             byte[] data = ZKUtil.getDataAndWatch(watcher, path);
169             refreshAuthManager(entry, data);
170           } catch (KeeperException ke) {
171             LOG.error("Error reading data from zookeeper for node " + entry, ke);
172             // only option is to abort
173             watcher.abort("Zookeeper error getting data for node " + entry, ke);
174           } catch (IOException ioe) {
175             LOG.error("Error reading permissions writables", ioe);
176           }
177         }
178       });
179     }
180   }
181 
182   @Override
183   public void nodeChildrenChanged(final String path) {
184     waitUntilStarted();
185     if (path.equals(aclZNode)) {
186       try {
187         List<ZKUtil.NodeAndData> nodeList =
188             ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
189         while (!nodes.compareAndSet(null, nodeList)) {
190           try {
191             Thread.sleep(20);
192           } catch (InterruptedException e) {
193             LOG.warn("Interrupted while setting node list", e);
194             Thread.currentThread().interrupt();
195           }
196         }
197       } catch (KeeperException ke) {
198         LOG.error("Error reading data from zookeeper for path "+path, ke);
199         watcher.abort("Zookeeper error get node children for path "+path, ke);
200       }
201       executor.submit(new Runnable() {
202         // allows subsequent nodeChildrenChanged event to preempt current processing of
203         // nodeChildrenChanged event
204         @Override
205         public void run() {
206           List<ZKUtil.NodeAndData> nodeList = nodes.get();
207           nodes.set(null);
208           refreshNodes(nodeList, nodes);
209         }
210       });
211     }
212   }
213 
214   private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
215     for (ZKUtil.NodeAndData n : nodes) {
216       if (ref != null && ref.get() != null) {
217         // there is a newer list
218         break;
219       }
220       if (n.isEmpty()) continue;
221       String path = n.getNode();
222       String entry = (ZKUtil.getNodeName(path));
223       try {
224         refreshAuthManager(entry, n.getData());
225       } catch (IOException ioe) {
226         LOG.error("Failed parsing permissions for table '" + entry +
227             "' from zk", ioe);
228       }
229     }
230   }
231 
232   private void refreshAuthManager(String entry, byte[] nodeData) throws IOException {
233     if (LOG.isDebugEnabled()) {
234       LOG.debug("Updating permissions cache from node "+entry+" with data: "+
235           Bytes.toStringBinary(nodeData));
236     }
237     if(AccessControlLists.isNamespaceEntry(entry)) {
238       authManager.refreshNamespaceCacheFromWritable(
239           AccessControlLists.fromNamespaceEntry(entry), nodeData);
240     } else {
241       authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData);
242     }
243   }
244 
245   /***
246    * Write a table's access controls to the permissions mirror in zookeeper
247    * @param entry
248    * @param permsData
249    */
250   public void writeToZookeeper(byte[] entry, byte[] permsData) {
251     String entryName = Bytes.toString(entry);
252     String zkNode = ZKUtil.joinZNode(watcher.baseZNode, ACL_NODE);
253     zkNode = ZKUtil.joinZNode(zkNode, entryName);
254 
255     try {
256       ZKUtil.createWithParents(watcher, zkNode);
257       ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1);
258     } catch (KeeperException e) {
259       LOG.error("Failed updating permissions for entry '" +
260           entryName + "'", e);
261       watcher.abort("Failed writing node "+zkNode+" to zookeeper", e);
262     }
263   }
264 
265   /***
266    * Delete the acl notify node of table
267    * @param tableName
268    */
269   public void deleteTableACLNode(final TableName tableName) {
270     String zkNode = ZKUtil.joinZNode(watcher.baseZNode, ACL_NODE);
271     zkNode = ZKUtil.joinZNode(zkNode, tableName.getNameAsString());
272 
273     try {
274       ZKUtil.deleteNode(watcher, zkNode);
275     } catch (KeeperException.NoNodeException e) {
276       LOG.warn("No acl notify node of table '" + tableName + "'");
277     } catch (KeeperException e) {
278       LOG.error("Failed deleting acl node of table '" + tableName + "'", e);
279       watcher.abort("Failed deleting node " + zkNode, e);
280     }
281   }
282 
283   /***
284    * Delete the acl notify node of namespace
285    */
286   public void deleteNamespaceACLNode(final String namespace) {
287     String zkNode = ZKUtil.joinZNode(watcher.baseZNode, ACL_NODE);
288     zkNode = ZKUtil.joinZNode(zkNode, AccessControlLists.NAMESPACE_PREFIX + namespace);
289 
290     try {
291       ZKUtil.deleteNode(watcher, zkNode);
292     } catch (KeeperException.NoNodeException e) {
293       LOG.warn("No acl notify node of namespace '" + namespace + "'");
294     } catch (KeeperException e) {
295       LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e);
296       watcher.abort("Failed deleting node " + zkNode, e);
297     }
298   }
299 }