001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.TableName;
023import org.apache.hadoop.hbase.util.Bytes;
024import org.apache.hadoop.hbase.util.Threads;
025import org.apache.hadoop.hbase.zookeeper.ZKListener;
026import org.apache.hadoop.hbase.zookeeper.ZKUtil;
027import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
028import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
029import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.zookeeper.KeeperException;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import java.io.Closeable;
036import java.io.IOException;
037import java.util.List;
038import java.util.concurrent.Callable;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.ExecutorService;
042import java.util.concurrent.Executors;
043import java.util.concurrent.Future;
044import java.util.concurrent.RejectedExecutionException;
045
046/**
047 * Handles synchronization of access control list entries and updates
048 * throughout all nodes in the cluster.  The {@link AccessController} instance
049 * on the {@code _acl_} table regions, creates a znode for each table as
050 * {@code /hbase/acl/tablename}, with the znode data containing a serialized
051 * list of the permissions granted for the table.  The {@code AccessController}
052 * instances on all other cluster hosts watch the znodes for updates, which
053 * trigger updates in the {@link AuthManager} permission cache.
054 */
055@InterfaceAudience.Private
056public class ZKPermissionWatcher extends ZKListener implements Closeable {
057  private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class);
058  // parent node for permissions lists
059  static final String ACL_NODE = "acl";
060  private final AuthManager authManager;
061  private final String aclZNode;
062  private final CountDownLatch initialized = new CountDownLatch(1);
063  private final ExecutorService executor;
064  private Future<?> childrenChangedFuture;
065
066  public ZKPermissionWatcher(ZKWatcher watcher,
067      AuthManager authManager, Configuration conf) {
068    super(watcher);
069    this.authManager = authManager;
070    String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
071    this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
072    executor = Executors.newSingleThreadExecutor(
073      new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d")
074        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
075  }
076
077  public void start() throws KeeperException {
078    try {
079      watcher.registerListener(this);
080      if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
081        try {
082          executor.submit(new Callable<Void>() {
083            @Override
084            public Void call() throws KeeperException {
085              List<ZKUtil.NodeAndData> existing =
086                  ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
087              if (existing != null) {
088                refreshNodes(existing);
089              }
090              return null;
091            }
092          }).get();
093        } catch (ExecutionException ex) {
094          if (ex.getCause() instanceof KeeperException) {
095            throw (KeeperException)ex.getCause();
096          } else {
097            throw new RuntimeException(ex.getCause());
098          }
099        } catch (InterruptedException ex) {
100          Thread.currentThread().interrupt();
101        }
102      }
103    } finally {
104      initialized.countDown();
105    }
106  }
107
108  @Override
109  public void close() {
110    executor.shutdown();
111  }
112
113  private void waitUntilStarted() {
114    try {
115      initialized.await();
116    } catch (InterruptedException e) {
117      LOG.warn("Interrupted while waiting for start", e);
118      Thread.currentThread().interrupt();
119    }
120  }
121
122  @Override
123  public void nodeCreated(String path) {
124    waitUntilStarted();
125    if (path.equals(aclZNode)) {
126      asyncProcessNodeUpdate(new Runnable() {
127        @Override
128        public void run() {
129          try {
130            List<ZKUtil.NodeAndData> nodes =
131                ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
132            refreshNodes(nodes);
133          } catch (KeeperException ke) {
134            LOG.error("Error reading data from zookeeper", ke);
135            // only option is to abort
136            watcher.abort("ZooKeeper error obtaining acl node children", ke);
137          }
138        }
139      });
140    }
141  }
142
143  @Override
144  public void nodeDeleted(final String path) {
145    waitUntilStarted();
146    if (aclZNode.equals(ZKUtil.getParent(path))) {
147      asyncProcessNodeUpdate(new Runnable() {
148        @Override
149        public void run() {
150          String table = ZKUtil.getNodeName(path);
151          if (PermissionStorage.isNamespaceEntry(table)) {
152            authManager.removeNamespace(Bytes.toBytes(table));
153          } else {
154            authManager.removeTable(TableName.valueOf(table));
155          }
156        }
157      });
158    }
159  }
160
161  @Override
162  public void nodeDataChanged(final String path) {
163    waitUntilStarted();
164    if (aclZNode.equals(ZKUtil.getParent(path))) {
165      asyncProcessNodeUpdate(new Runnable() {
166        @Override
167        public void run() {
168          // update cache on an existing table node
169          String entry = ZKUtil.getNodeName(path);
170          try {
171            byte[] data = ZKUtil.getDataAndWatch(watcher, path);
172            refreshAuthManager(entry, data);
173          } catch (KeeperException ke) {
174            LOG.error("Error reading data from zookeeper for node " + entry, ke);
175            // only option is to abort
176            watcher.abort("ZooKeeper error getting data for node " + entry, ke);
177          } catch (IOException ioe) {
178            LOG.error("Error reading permissions writables", ioe);
179          }
180        }
181      });
182    }
183  }
184
185
186  @Override
187  public void nodeChildrenChanged(final String path) {
188    waitUntilStarted();
189    if (path.equals(aclZNode)) {
190      // preempt any existing nodeChildrenChanged event processing
191      if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
192        boolean cancelled = childrenChangedFuture.cancel(true);
193        if (!cancelled) {
194          // task may have finished between our check and attempted cancel, this is fine.
195          if (!childrenChangedFuture.isDone()) {
196            LOG.warn("Could not cancel processing node children changed event, "
197              + "please file a JIRA and attach logs if possible.");
198          }
199        }
200      }
201      childrenChangedFuture = asyncProcessNodeUpdate(() -> {
202        try {
203          final List<ZKUtil.NodeAndData> nodeList =
204            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode, false);
205          refreshNodes(nodeList);
206        } catch (KeeperException ke) {
207          String msg = "ZooKeeper error while reading node children data for path " + path;
208          LOG.error(msg, ke);
209          watcher.abort(msg, ke);
210        }
211      });
212    }
213  }
214
215  private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
216    if (!executor.isShutdown()) {
217      try {
218        return executor.submit(runnable);
219      } catch (RejectedExecutionException e) {
220        if (executor.isShutdown()) {
221          LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
222        } else {
223          throw e;
224        }
225      }
226    }
227    return null; // No task launched so there will be nothing to cancel later
228  }
229
230  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
231    for (ZKUtil.NodeAndData n : nodes) {
232      if (Thread.interrupted()) {
233        // Use Thread.interrupted so that we clear interrupt status
234        break;
235      }
236      if (n.isEmpty()) continue;
237      String path = n.getNode();
238      String entry = (ZKUtil.getNodeName(path));
239      try {
240        refreshAuthManager(entry, n.getData());
241      } catch (IOException ioe) {
242        LOG.error("Failed parsing permissions for table '" + entry +
243            "' from zk", ioe);
244      }
245    }
246  }
247
248  private void refreshAuthManager(String entry, byte[] nodeData) throws IOException {
249    if (LOG.isDebugEnabled()) {
250      LOG.debug("Updating permissions cache from {} with data {}", entry,
251          Bytes.toStringBinary(nodeData));
252    }
253    if (PermissionStorage.isNamespaceEntry(entry)) {
254      authManager.refreshNamespaceCacheFromWritable(PermissionStorage.fromNamespaceEntry(entry),
255        nodeData);
256    } else {
257      authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData);
258    }
259  }
260
261  /***
262   * Write a table's access controls to the permissions mirror in zookeeper
263   * @param entry
264   * @param permsData
265   */
266  public void writeToZookeeper(byte[] entry, byte[] permsData) {
267    String entryName = Bytes.toString(entry);
268    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
269    zkNode = ZNodePaths.joinZNode(zkNode, entryName);
270
271    try {
272      ZKUtil.createWithParents(watcher, zkNode);
273      ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1);
274    } catch (KeeperException e) {
275      LOG.error("Failed updating permissions for entry '" +
276          entryName + "'", e);
277      watcher.abort("Failed writing node "+zkNode+" to zookeeper", e);
278    }
279  }
280
281  /***
282   * Delete the acl notify node of table
283   * @param tableName
284   */
285  public void deleteTableACLNode(final TableName tableName) {
286    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
287    zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString());
288
289    try {
290      ZKUtil.deleteNode(watcher, zkNode);
291    } catch (KeeperException.NoNodeException e) {
292      LOG.warn("No acl notify node of table '" + tableName + "'");
293    } catch (KeeperException e) {
294      LOG.error("Failed deleting acl node of table '" + tableName + "'", e);
295      watcher.abort("Failed deleting node " + zkNode, e);
296    }
297  }
298
299  /***
300   * Delete the acl notify node of namespace
301   */
302  public void deleteNamespaceACLNode(final String namespace) {
303    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
304    zkNode = ZNodePaths.joinZNode(zkNode, PermissionStorage.NAMESPACE_PREFIX + namespace);
305
306    try {
307      ZKUtil.deleteNode(watcher, zkNode);
308    } catch (KeeperException.NoNodeException e) {
309      LOG.warn("No acl notify node of namespace '" + namespace + "'");
310    } catch (KeeperException e) {
311      LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e);
312      watcher.abort("Failed deleting node " + zkNode, e);
313    }
314  }
315}