/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * Class serves two purposes:
 * <ol>
 * <li>Broadcast NamespaceDescriptor information via ZK (done by the Master)</li>
 * <li>Consume broadcasted NamespaceDescriptor changes (done by the RegionServers)</li>
 * </ol>
 * Descriptors are stored one per child znode under the namespace znode and mirrored in an
 * in-memory cache keyed by namespace name.
 */
@InterfaceAudience.Private
public class ZKNamespaceManager extends ZKListener {
  private static final Logger LOG = LoggerFactory.getLogger(ZKNamespaceManager.class);

  /** Parent znode whose children each hold one serialized namespace descriptor. */
  private final String nsZNode;

  /** Local view of the namespaces, keyed by namespace name. */
  private final NavigableMap<String, NamespaceDescriptor> cache;

  /**
   * Creates a manager bound to the given watcher. Nothing is read from or written to ZooKeeper
   * until {@link #start()} is called.
   * @param zkw watcher supplying the ZooKeeper connection and the namespace znode path
   * @throws IOException declared for callers; not thrown by this constructor itself
   */
  public ZKNamespaceManager(ZKWatcher zkw) throws IOException {
    super(zkw);
    nsZNode = zkw.getZNodePaths().namespaceZNode;
    cache = new ConcurrentSkipListMap<>();
  }

  /**
   * Registers this listener with the watcher and initializes the cache: if the namespace znode
   * exists its children are loaded (and watched), otherwise the znode is created.
   * @throws IOException if ZooKeeper access fails or a child's data cannot be deserialized
   */
  public void start() throws IOException {
    watcher.registerListener(this);
    try {
      if (ZKUtil.watchAndCheckExists(watcher, nsZNode)) {
        // refreshNodes tolerates a null list, so no separate null check is needed here.
        refreshNodes(ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode));
      } else {
        ZKUtil.createWithParents(watcher, nsZNode);
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to initialize ZKNamespaceManager", e);
    }
  }

  /**
   * Unregisters this listener from the watcher. The cache is left untouched.
   * @throws IOException declared for callers; not thrown by this method itself
   */
  public void stop() throws IOException {
    this.watcher.unregisterListener(this);
  }

  /**
   * Looks up a namespace in the local cache only; ZooKeeper is not consulted.
   * @param name namespace name
   * @return the cached descriptor, or {@code null} if the cache has no entry for {@code name}
   */
  public NamespaceDescriptor get(String name) {
    return cache.get(name);
  }

  /**
   * Writes the descriptor to its znode and then stores it in the local cache. The cache is only
   * updated if the ZooKeeper write succeeded.
   * @param ns descriptor to publish
   * @throws IOException if the ZooKeeper write fails
   */
  public void update(NamespaceDescriptor ns) throws IOException {
    writeNamespace(ns);
    cache.put(ns.getName(), ns);
  }

  /**
   * Deletes the namespace's znode and then drops it from the local cache. A znode that is already
   * missing is not treated as an error.
   * @param name namespace name
   * @throws IOException if the ZooKeeper delete fails for a reason other than a missing znode
   */
  public void remove(String name) throws IOException {
    deleteNamespace(name);
    cache.remove(name);
  }

  /**
   * Returns a new sorted set holding the descriptors currently in the cache; later cache changes
   * are not reflected in the returned set.
   * @return descriptors ordered by {@link NamespaceDescriptor#NAMESPACE_DESCRIPTOR_COMPARATOR}
   * @throws IOException declared for callers; not thrown by this method itself
   */
  public NavigableSet<NamespaceDescriptor> list() throws IOException {
    NavigableSet<NamespaceDescriptor> ret =
      Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
    ret.addAll(cache.values());
    return ret;
  }

  /**
   * When the namespace znode itself is created, loads all of its children into the cache. Aborts
   * the watcher if the children cannot be read or parsed.
   */
  @Override
  public void nodeCreated(String path) {
    if (!nsZNode.equals(path)) {
      return;
    }
    try {
      refreshNodes(ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode));
    } catch (KeeperException ke) {
      String msg = "Error reading data from zookeeper";
      LOG.error(msg, ke);
      watcher.abort(msg, ke);
    } catch (IOException e) {
      String msg = "Error parsing data from zookeeper";
      LOG.error(msg, e);
      watcher.abort(msg, e);
    }
  }

  /** When a child of the namespace znode is deleted, evicts that namespace from the cache. */
  @Override
  public void nodeDeleted(String path) {
    if (nsZNode.equals(ZKUtil.getParent(path))) {
      cache.remove(ZKUtil.getNodeName(path));
    }
  }

  /**
   * When a child of the namespace znode changes, re-reads it (re-arming the watch) and replaces
   * the cached descriptor. Aborts the watcher if the data cannot be read or parsed.
   */
  @Override
  public void nodeDataChanged(String path) {
    if (!nsZNode.equals(ZKUtil.getParent(path))) {
      return;
    }
    try {
      byte[] data = ZKUtil.getDataAndWatch(watcher, path);
      if (data == null || data.length == 0) {
        // Nothing to deserialize. Presumably the znode vanished between the event and this read
        // (TODO confirm against ZKUtil.getDataAndWatch); skip it like refreshNodes skips empty
        // nodes instead of failing inside the protobuf parser.
        LOG.debug("Ignoring data change for {}: znode has no data", path);
        return;
      }
      NamespaceDescriptor ns =
        ProtobufUtil.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(data));
      cache.put(ns.getName(), ns);
    } catch (KeeperException ke) {
      String msg = "Error reading data from zookeeper for node " + path;
      LOG.error(msg, ke);
      // only option is to abort
      watcher.abort(msg, ke);
    } catch (IOException ioe) {
      String msg = "Error deserializing namespace: " + path;
      LOG.error(msg, ioe);
      watcher.abort(msg, ioe);
    }
  }

  /**
   * When the set of children under the namespace znode changes, reloads all children into the
   * cache. Aborts the watcher if the children cannot be read or parsed.
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if (!nsZNode.equals(path)) {
      return;
    }
    try {
      refreshNodes(ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode));
    } catch (KeeperException ke) {
      LOG.error("Error reading data from zookeeper for path " + path, ke);
      watcher.abort("ZooKeeper error get node children for path " + path, ke);
    } catch (IOException e) {
      LOG.error("Error deserializing namespace child from: " + path, e);
      watcher.abort("Error deserializing namespace child from: " + path, e);
    }
  }

  /**
   * Deletes the znode of the given namespace.
   * @param name namespace name
   * @throws IOException if the delete fails for a reason other than the znode not existing
   */
  private void deleteNamespace(String name) throws IOException {
    String zNode = ZNodePaths.joinZNode(nsZNode, name);
    try {
      ZKUtil.deleteNode(watcher, zNode);
    } catch (KeeperException.NoNodeException e) {
      // If the node does not exist, it could be already deleted. Continue without fail.
      LOG.warn("The ZNode {} for namespace {} does not exist.", zNode, name);
    } catch (KeeperException e) {
      String msg = "Failed deleting namespace " + name;
      LOG.error(msg, e);
      throw new IOException(msg, e);
    }
  }

  /**
   * Ensures the namespace's znode exists and stores the serialized descriptor in it.
   * @param ns descriptor to write
   * @throws IOException if either ZooKeeper operation fails
   */
  private void writeNamespace(NamespaceDescriptor ns) throws IOException {
    String zNode = ZNodePaths.joinZNode(nsZNode, ns.getName());
    try {
      ZKUtil.createWithParents(watcher, zNode);
      ZKUtil.updateExistingNodeData(watcher, zNode,
        ProtobufUtil.toProtoNamespaceDescriptor(ns).toByteArray(), -1);
    } catch (KeeperException e) {
      String msg = "Failed updating namespace " + ns.getName();
      LOG.error(msg, e);
      throw new IOException(msg, e);
    }
  }

  /**
   * Deserializes each non-empty node and puts the resulting descriptor into the cache. Existing
   * cache entries that have no corresponding node are left in place; removal is handled by
   * {@link #nodeDeleted(String)}.
   * @param nodes child nodes with their data; {@code null} is treated as "nothing to refresh"
   * @throws IOException if a node's data cannot be parsed as a namespace descriptor
   */
  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) throws IOException {
    if (nodes == null) {
      // The child listing can be null (start() has always guarded against it); the event
      // callbacks share this path, so handle it once here.
      return;
    }
    for (ZKUtil.NodeAndData node : nodes) {
      if (node.isEmpty()) {
        continue;
      }
      String namespace = ZKUtil.getNodeName(node.getNode());
      byte[] nodeData = node.getData();
      if (LOG.isTraceEnabled()) {
        // Guard kept: Bytes.toStringBinary would otherwise run even with trace disabled.
        LOG.trace("Updating namespace cache from node {} with data: {}", namespace,
          Bytes.toStringBinary(nodeData));
      }
      NamespaceDescriptor ns =
        ProtobufUtil.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(nodeData));
      cache.put(ns.getName(), ns);
    }
  }
}