001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.NavigableMap; 024import java.util.NavigableSet; 025import java.util.concurrent.ConcurrentSkipListMap; 026 027import org.apache.hadoop.hbase.util.Bytes; 028import org.apache.hadoop.hbase.zookeeper.ZKListener; 029import org.apache.hadoop.hbase.zookeeper.ZKUtil; 030import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 031import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.apache.zookeeper.KeeperException; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 040 041/** 042 * Class servers two purposes: 043 * 044 * 1. Broadcast NamespaceDescriptor information via ZK 045 * (Done by the Master) 046 * 2. Consume broadcasted NamespaceDescriptor changes 047 * (Done by the RegionServers) 048 * 049 */ 050@InterfaceAudience.Private 051public class ZKNamespaceManager extends ZKListener { 052 private static final Logger LOG = LoggerFactory.getLogger(ZKNamespaceManager.class); 053 private final String nsZNode; 054 private final NavigableMap<String,NamespaceDescriptor> cache; 055 056 public ZKNamespaceManager(ZKWatcher zkw) throws IOException { 057 super(zkw); 058 nsZNode = zkw.znodePaths.namespaceZNode; 059 cache = new ConcurrentSkipListMap<>(); 060 } 061 062 public void start() throws IOException { 063 watcher.registerListener(this); 064 try { 065 if (ZKUtil.watchAndCheckExists(watcher, nsZNode)) { 066 List<ZKUtil.NodeAndData> existing = 067 ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode); 068 if (existing != null) { 069 refreshNodes(existing); 070 } 071 } else { 072 ZKUtil.createWithParents(watcher, nsZNode); 073 } 074 } catch (KeeperException e) { 075 throw new IOException("Failed to initialize ZKNamespaceManager", e); 076 } 077 } 078 079 public void stop() throws IOException { 080 this.watcher.unregisterListener(this); 081 } 082 083 public NamespaceDescriptor get(String name) { 084 return cache.get(name); 085 } 086 087 public void update(NamespaceDescriptor ns) throws IOException { 088 writeNamespace(ns); 089 cache.put(ns.getName(), ns); 090 } 091 092 public void remove(String name) throws IOException { 093 deleteNamespace(name); 094 cache.remove(name); 095 } 096 097 public NavigableSet<NamespaceDescriptor> list() throws IOException { 098 NavigableSet<NamespaceDescriptor> ret = 099 Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR); 100 for(NamespaceDescriptor ns: cache.values()) { 101 ret.add(ns); 102 } 103 return ret; 104 } 105 106 @Override 107 public void nodeCreated(String path) { 108 if (nsZNode.equals(path)) { 109 try { 110 List<ZKUtil.NodeAndData> nodes = 111 ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode); 112 refreshNodes(nodes); 113 } catch (KeeperException ke) { 114 String msg = "Error reading data from zookeeper"; 115 LOG.error(msg, ke); 116 watcher.abort(msg, ke); 117 } catch (IOException e) { 118 String msg = "Error parsing data from zookeeper"; 119 LOG.error(msg, e); 120 watcher.abort(msg, e); 121 } 122 } 123 } 124 125 @Override 126 public void nodeDeleted(String path) { 127 if (nsZNode.equals(ZKUtil.getParent(path))) { 128 String nsName = ZKUtil.getNodeName(path); 129 cache.remove(nsName); 130 } 131 } 132 133 @Override 134 public void nodeDataChanged(String path) { 135 if (nsZNode.equals(ZKUtil.getParent(path))) { 136 try { 137 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 138 NamespaceDescriptor ns = 139 ProtobufUtil.toNamespaceDescriptor( 140 HBaseProtos.NamespaceDescriptor.parseFrom(data)); 141 cache.put(ns.getName(), ns); 142 } catch (KeeperException ke) { 143 String msg = "Error reading data from zookeeper for node "+path; 144 LOG.error(msg, ke); 145 // only option is to abort 146 watcher.abort(msg, ke); 147 } catch (IOException ioe) { 148 String msg = "Error deserializing namespace: "+path; 149 LOG.error(msg, ioe); 150 watcher.abort(msg, ioe); 151 } 152 } 153 } 154 155 @Override 156 public void nodeChildrenChanged(String path) { 157 if (nsZNode.equals(path)) { 158 try { 159 List<ZKUtil.NodeAndData> nodes = 160 ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode); 161 refreshNodes(nodes); 162 } catch (KeeperException ke) { 163 LOG.error("Error reading data from zookeeper for path "+path, ke); 164 watcher.abort("ZooKeeper error get node children for path "+path, ke); 165 } catch (IOException e) { 166 LOG.error("Error deserializing namespace child from: "+path, e); 167 watcher.abort("Error deserializing namespace child from: " + path, e); 168 } 169 } 170 } 171 172 private void deleteNamespace(String name) throws IOException { 173 String zNode = ZNodePaths.joinZNode(nsZNode, name); 174 try { 175 ZKUtil.deleteNode(watcher, zNode); 176 } catch (KeeperException e) { 177 if (e instanceof KeeperException.NoNodeException) { 178 // If the node does not exist, it could be already deleted. Continue without fail. 179 LOG.warn("The ZNode " + zNode + " for namespace " + name + " does not exist."); 180 } else { 181 LOG.error("Failed updating permissions for namespace " + name, e); 182 throw new IOException("Failed updating permissions for namespace " + name, e); 183 } 184 } 185 } 186 187 private void writeNamespace(NamespaceDescriptor ns) throws IOException { 188 String zNode = ZNodePaths.joinZNode(nsZNode, ns.getName()); 189 try { 190 ZKUtil.createWithParents(watcher, zNode); 191 ZKUtil.updateExistingNodeData(watcher, zNode, 192 ProtobufUtil.toProtoNamespaceDescriptor(ns).toByteArray(), -1); 193 } catch (KeeperException e) { 194 LOG.error("Failed updating permissions for namespace "+ns.getName(), e); 195 throw new IOException("Failed updating permissions for namespace "+ns.getName(), e); 196 } 197 } 198 199 private void refreshNodes(List<ZKUtil.NodeAndData> nodes) throws IOException { 200 for (ZKUtil.NodeAndData n : nodes) { 201 if (n.isEmpty()) continue; 202 String path = n.getNode(); 203 String namespace = ZKUtil.getNodeName(path); 204 byte[] nodeData = n.getData(); 205 if (LOG.isTraceEnabled()) { 206 LOG.trace("Updating namespace cache from node " + namespace + " with data: " + 207 Bytes.toStringBinary(nodeData)); 208 } 209 NamespaceDescriptor ns = 210 ProtobufUtil.toNamespaceDescriptor( 211 HBaseProtos.NamespaceDescriptor.parseFrom(nodeData)); 212 cache.put(ns.getName(), ns); 213 } 214 } 215}