001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 026import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 028 029import java.io.IOException; 030import java.util.List; 031import java.util.concurrent.CompletableFuture; 032import java.util.stream.Collectors; 033import org.apache.commons.lang3.mutable.MutableInt; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ClusterId; 036import org.apache.hadoop.hbase.HBaseInterfaceAudience; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.RegionLocations; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.exceptions.DeserializationException; 041import org.apache.hadoop.hbase.master.RegionState; 042import org.apache.hadoop.hbase.security.User; 043import org.apache.hadoop.hbase.util.Pair; 044import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 045import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 052 053/** 054 * Zookeeper based registry implementation. 055 * @deprecated As of 2.6.0, replaced by {@link RpcConnectionRegistry}, which is the default 056 * connection mechanism as of 3.0.0. Expected to be removed in 4.0.0. 057 * @see <a href="https://issues.apache.org/jira/browse/HBASE-23324">HBASE-23324</a> and its parent 058 * ticket for details. 059 */ 060@Deprecated 061@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 062class ZKConnectionRegistry implements ConnectionRegistry { 063 064 private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class); 065 066 private static final Object WARN_LOCK = new Object(); 067 private static volatile boolean NEEDS_LOG_WARN = true; 068 069 private final ReadOnlyZKClient zk; 070 071 private final ZNodePaths znodePaths; 072 073 // User not used, but for rpc based registry we need it 074 ZKConnectionRegistry(Configuration conf, User ignored) { 075 this.znodePaths = new ZNodePaths(conf); 076 this.zk = new ReadOnlyZKClient(conf); 077 if (NEEDS_LOG_WARN) { 078 synchronized (WARN_LOCK) { 079 if (NEEDS_LOG_WARN) { 080 LOG.warn( 081 "ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry"); 082 NEEDS_LOG_WARN = false; 083 } 084 } 085 } 086 } 087 088 private interface Converter<T> { 089 T convert(byte[] data) throws Exception; 090 } 091 092 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 093 CompletableFuture<T> future = new CompletableFuture<>(); 094 addListener(zk.get(path), (data, error) -> { 095 if (error != null) { 096 future.completeExceptionally(error); 097 return; 098 } 099 try { 100 future.complete(converter.convert(data)); 101 } catch (Exception e) { 102 future.completeExceptionally(e); 103 } 104 }); 105 return future; 106 } 107 108 private static String getClusterId(byte[] data) throws DeserializationException { 109 if (data == null || data.length == 0) { 110 return null; 111 } 112 data = removeMetaData(data); 113 return ClusterId.parseFrom(data).toString(); 114 } 115 116 @Override 117 public CompletableFuture<String> getClusterId() { 118 return tracedFuture( 119 () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId), 120 "ZKConnectionRegistry.getClusterId"); 121 } 122 123 ReadOnlyZKClient getZKClient() { 124 return zk; 125 } 126 127 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 128 if (data == null || data.length == 0) { 129 return null; 130 } 131 data = removeMetaData(data); 132 int prefixLen = lengthOfPBMagic(); 133 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 134 data.length - prefixLen); 135 } 136 137 private static void tryComplete(MutableInt remaining, HRegionLocation[] locs, 138 CompletableFuture<RegionLocations> future) { 139 remaining.decrement(); 140 if (remaining.intValue() > 0) { 141 return; 142 } 143 future.complete(new RegionLocations(locs)); 144 } 145 146 private Pair<RegionState.State, ServerName> 147 getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) { 148 RegionState.State state; 149 if (proto.hasState()) { 150 state = RegionState.State.convert(proto.getState()); 151 } else { 152 state = RegionState.State.OPEN; 153 } 154 HBaseProtos.ServerName snProto = proto.getServer(); 155 return Pair.newPair(state, 156 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 157 } 158 159 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future, 160 List<String> metaReplicaZNodes) { 161 if (metaReplicaZNodes.isEmpty()) { 162 future.completeExceptionally(new IOException("No meta znode available")); 163 } 164 HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()]; 165 MutableInt remaining = new MutableInt(locs.length); 166 for (String metaReplicaZNode : metaReplicaZNodes) { 167 int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode); 168 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); 169 if (replicaId == DEFAULT_REPLICA_ID) { 170 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 171 if (error != null) { 172 future.completeExceptionally(error); 173 return; 174 } 175 if (proto == null) { 176 future.completeExceptionally(new IOException("Meta znode is null")); 177 return; 178 } 179 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 180 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 181 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 182 } 183 locs[DEFAULT_REPLICA_ID] = new HRegionLocation( 184 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()); 185 tryComplete(remaining, locs, future); 186 }); 187 } else { 188 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 189 if (future.isDone()) { 190 return; 191 } 192 if (error != null) { 193 LOG.warn("Failed to fetch " + path, error); 194 locs[replicaId] = null; 195 } else if (proto == null) { 196 LOG.warn("Meta znode for replica " + replicaId + " is null"); 197 locs[replicaId] = null; 198 } else { 199 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 200 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 201 LOG.warn("Meta region for replica " + replicaId + " is in state " 202 + stateAndServerName.getFirst()); 203 locs[replicaId] = null; 204 } else { 205 locs[replicaId] = 206 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 207 stateAndServerName.getSecond()); 208 } 209 } 210 tryComplete(remaining, locs, future); 211 }); 212 } 213 } 214 } 215 216 @Override 217 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 218 return tracedFuture(() -> { 219 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 220 addListener( 221 zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() 222 .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), 223 (metaReplicaZNodes, error) -> { 224 if (error != null) { 225 future.completeExceptionally(error); 226 return; 227 } 228 getMetaRegionLocation(future, metaReplicaZNodes); 229 }); 230 return future; 231 }, "ZKConnectionRegistry.getMetaRegionLocations"); 232 } 233 234 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 235 if (data == null || data.length == 0) { 236 return null; 237 } 238 data = removeMetaData(data); 239 int prefixLen = lengthOfPBMagic(); 240 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 241 } 242 243 @Override 244 public CompletableFuture<ServerName> getActiveMaster() { 245 return tracedFuture( 246 () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) 247 .thenApply(proto -> { 248 if (proto == null) { 249 return null; 250 } 251 HBaseProtos.ServerName snProto = proto.getMaster(); 252 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 253 snProto.getStartCode()); 254 }), 255 "ZKConnectionRegistry.getActiveMaster"); 256 } 257 258 @Override 259 public String getConnectionString() { 260 final String serverList = zk.getConnectString(); 261 final String baseZNode = znodePaths.baseZNode; 262 return serverList + ":" + baseZNode; 263 } 264 265 @Override 266 public void close() { 267 zk.close(); 268 } 269}