001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.DaemonThreadFactory;
023import org.apache.hadoop.hbase.TableName;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.hadoop.hbase.zookeeper.ZKListener;
026import org.apache.hadoop.hbase.zookeeper.ZKUtil;
027import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
028import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.zookeeper.KeeperException;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import java.io.Closeable;
035import java.io.IOException;
036import java.util.List;
037import java.util.concurrent.Callable;
038import java.util.concurrent.CountDownLatch;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.Executors;
042import java.util.concurrent.Future;
043import java.util.concurrent.RejectedExecutionException;
044
045/**
046 * Handles synchronization of access control list entries and updates
047 * throughout all nodes in the cluster.  The {@link AccessController} instance
048 * on the {@code _acl_} table regions, creates a znode for each table as
049 * {@code /hbase/acl/tablename}, with the znode data containing a serialized
050 * list of the permissions granted for the table.  The {@code AccessController}
051 * instances on all other cluster hosts watch the znodes for updates, which
052 * trigger updates in the {@link AuthManager} permission cache.
053 */
054@InterfaceAudience.Private
055public class ZKPermissionWatcher extends ZKListener implements Closeable {
056  private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class);
057  // parent node for permissions lists
058  static final String ACL_NODE = "acl";
059  private final AuthManager authManager;
060  private final String aclZNode;
061  private final CountDownLatch initialized = new CountDownLatch(1);
062  private final ExecutorService executor;
063  private Future<?> childrenChangedFuture;
064
065  public ZKPermissionWatcher(ZKWatcher watcher,
066      AuthManager authManager, Configuration conf) {
067    super(watcher);
068    this.authManager = authManager;
069    String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
070    this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
071    executor = Executors.newSingleThreadExecutor(
072      new DaemonThreadFactory("zk-permission-watcher"));
073  }
074
075  public void start() throws KeeperException {
076    try {
077      watcher.registerListener(this);
078      if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
079        try {
080          executor.submit(new Callable<Void>() {
081            @Override
082            public Void call() throws KeeperException {
083              List<ZKUtil.NodeAndData> existing =
084                  ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
085              if (existing != null) {
086                refreshNodes(existing);
087              }
088              return null;
089            }
090          }).get();
091        } catch (ExecutionException ex) {
092          if (ex.getCause() instanceof KeeperException) {
093            throw (KeeperException)ex.getCause();
094          } else {
095            throw new RuntimeException(ex.getCause());
096          }
097        } catch (InterruptedException ex) {
098          Thread.currentThread().interrupt();
099        }
100      }
101    } finally {
102      initialized.countDown();
103    }
104  }
105
106  @Override
107  public void close() {
108    executor.shutdown();
109  }
110
111  private void waitUntilStarted() {
112    try {
113      initialized.await();
114    } catch (InterruptedException e) {
115      LOG.warn("Interrupted while waiting for start", e);
116      Thread.currentThread().interrupt();
117    }
118  }
119
120  @Override
121  public void nodeCreated(String path) {
122    waitUntilStarted();
123    if (path.equals(aclZNode)) {
124      asyncProcessNodeUpdate(new Runnable() {
125        @Override
126        public void run() {
127          try {
128            List<ZKUtil.NodeAndData> nodes =
129                ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
130            refreshNodes(nodes);
131          } catch (KeeperException ke) {
132            LOG.error("Error reading data from zookeeper", ke);
133            // only option is to abort
134            watcher.abort("ZooKeeper error obtaining acl node children", ke);
135          }
136        }
137      });
138    }
139  }
140
141  @Override
142  public void nodeDeleted(final String path) {
143    waitUntilStarted();
144    if (aclZNode.equals(ZKUtil.getParent(path))) {
145      asyncProcessNodeUpdate(new Runnable() {
146        @Override
147        public void run() {
148          String table = ZKUtil.getNodeName(path);
149          if(AccessControlLists.isNamespaceEntry(table)) {
150            authManager.removeNamespace(Bytes.toBytes(table));
151          } else {
152            authManager.removeTable(TableName.valueOf(table));
153          }
154        }
155      });
156    }
157  }
158
159  @Override
160  public void nodeDataChanged(final String path) {
161    waitUntilStarted();
162    if (aclZNode.equals(ZKUtil.getParent(path))) {
163      asyncProcessNodeUpdate(new Runnable() {
164        @Override
165        public void run() {
166          // update cache on an existing table node
167          String entry = ZKUtil.getNodeName(path);
168          try {
169            byte[] data = ZKUtil.getDataAndWatch(watcher, path);
170            refreshAuthManager(entry, data);
171          } catch (KeeperException ke) {
172            LOG.error("Error reading data from zookeeper for node " + entry, ke);
173            // only option is to abort
174            watcher.abort("ZooKeeper error getting data for node " + entry, ke);
175          } catch (IOException ioe) {
176            LOG.error("Error reading permissions writables", ioe);
177          }
178        }
179      });
180    }
181  }
182
183
184  @Override
185  public void nodeChildrenChanged(final String path) {
186    waitUntilStarted();
187    if (path.equals(aclZNode)) {
188      try {
189        final List<ZKUtil.NodeAndData> nodeList =
190            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
191        // preempt any existing nodeChildrenChanged event processing
192        if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
193          boolean cancelled = childrenChangedFuture.cancel(true);
194          if (!cancelled) {
195            // task may have finished between our check and attempted cancel, this is fine.
196            if (! childrenChangedFuture.isDone()) {
197              LOG.warn("Could not cancel processing node children changed event, " +
198                  "please file a JIRA and attach logs if possible.");
199            }
200          }
201        }
202        childrenChangedFuture = asyncProcessNodeUpdate(() -> refreshNodes(nodeList));
203      } catch (KeeperException ke) {
204        LOG.error("Error reading data from zookeeper for path "+path, ke);
205        watcher.abort("ZooKeeper error get node children for path "+path, ke);
206      }
207    }
208  }
209
210  private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
211    if (!executor.isShutdown()) {
212      try {
213        return executor.submit(runnable);
214      } catch (RejectedExecutionException e) {
215        if (executor.isShutdown()) {
216          LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
217        } else {
218          throw e;
219        }
220      }
221    }
222    return null; // No task launched so there will be nothing to cancel later
223  }
224
225  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
226    for (ZKUtil.NodeAndData n : nodes) {
227      if (Thread.interrupted()) {
228        // Use Thread.interrupted so that we clear interrupt status
229        break;
230      }
231      if (n.isEmpty()) continue;
232      String path = n.getNode();
233      String entry = (ZKUtil.getNodeName(path));
234      try {
235        refreshAuthManager(entry, n.getData());
236      } catch (IOException ioe) {
237        LOG.error("Failed parsing permissions for table '" + entry +
238            "' from zk", ioe);
239      }
240    }
241  }
242
243  private void refreshAuthManager(String entry, byte[] nodeData) throws IOException {
244    if (LOG.isDebugEnabled()) {
245      LOG.debug("Updating permissions cache from {} with data {}", entry,
246          Bytes.toStringBinary(nodeData));
247    }
248    if(AccessControlLists.isNamespaceEntry(entry)) {
249      authManager.refreshNamespaceCacheFromWritable(
250          AccessControlLists.fromNamespaceEntry(entry), nodeData);
251    } else {
252      authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData);
253    }
254  }
255
256  /***
257   * Write a table's access controls to the permissions mirror in zookeeper
258   * @param entry
259   * @param permsData
260   */
261  public void writeToZookeeper(byte[] entry, byte[] permsData) {
262    String entryName = Bytes.toString(entry);
263    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
264    zkNode = ZNodePaths.joinZNode(zkNode, entryName);
265
266    try {
267      ZKUtil.createWithParents(watcher, zkNode);
268      ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1);
269    } catch (KeeperException e) {
270      LOG.error("Failed updating permissions for entry '" +
271          entryName + "'", e);
272      watcher.abort("Failed writing node "+zkNode+" to zookeeper", e);
273    }
274  }
275
276  /***
277   * Delete the acl notify node of table
278   * @param tableName
279   */
280  public void deleteTableACLNode(final TableName tableName) {
281    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
282    zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString());
283
284    try {
285      ZKUtil.deleteNode(watcher, zkNode);
286    } catch (KeeperException.NoNodeException e) {
287      LOG.warn("No acl notify node of table '" + tableName + "'");
288    } catch (KeeperException e) {
289      LOG.error("Failed deleting acl node of table '" + tableName + "'", e);
290      watcher.abort("Failed deleting node " + zkNode, e);
291    }
292  }
293
294  /***
295   * Delete the acl notify node of namespace
296   */
297  public void deleteNamespaceACLNode(final String namespace) {
298    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
299    zkNode = ZNodePaths.joinZNode(zkNode, AccessControlLists.NAMESPACE_PREFIX + namespace);
300
301    try {
302      ZKUtil.deleteNode(watcher, zkNode);
303    } catch (KeeperException.NoNodeException e) {
304      LOG.warn("No acl notify node of namespace '" + namespace + "'");
305    } catch (KeeperException e) {
306      LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e);
307      watcher.abort("Failed deleting node " + zkNode, e);
308    }
309  }
310}