001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 026import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 028 029import java.io.IOException; 030import java.util.List; 031import java.util.concurrent.CompletableFuture; 032import java.util.stream.Collectors; 033import org.apache.commons.lang3.mutable.MutableInt; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ClusterId; 036import org.apache.hadoop.hbase.HBaseInterfaceAudience; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.RegionLocations; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.exceptions.DeserializationException; 041import org.apache.hadoop.hbase.master.RegionState; 042import org.apache.hadoop.hbase.security.User; 043import org.apache.hadoop.hbase.util.Pair; 044import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 045import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 052 053/** 054 * Zookeeper based registry implementation. 055 * @deprecated As of 2.6.0, replaced by {@link RpcConnectionRegistry}, which is the default 056 * connection mechanism as of 3.0.0. Expected to be removed in 4.0.0. 057 * @see <a href="https://issues.apache.org/jira/browse/HBASE-23324">HBASE-23324</a> and its parent 058 * ticket for details. 059 */ 060@Deprecated 061@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 062class ZKConnectionRegistry implements ConnectionRegistry { 063 064 private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class); 065 066 private static final Object WARN_LOCK = new Object(); 067 private static volatile boolean NEEDS_LOG_WARN = true; 068 069 private final ReadOnlyZKClient zk; 070 071 private final ZNodePaths znodePaths; 072 private final Configuration conf; 073 private final int zkRegistryAsyncTimeout; 074 public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout"; 075 public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 10000; // 10 sec 076 // User not used, but for rpc based registry we need it 077 078 ZKConnectionRegistry(Configuration conf, User ignored) { 079 this.znodePaths = new ZNodePaths(conf); 080 this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER); 081 this.conf = conf; 082 this.zkRegistryAsyncTimeout = 083 conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT); 084 if (NEEDS_LOG_WARN) { 085 synchronized (WARN_LOCK) { 086 if (NEEDS_LOG_WARN) { 087 LOG.warn( 088 "ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry"); 089 NEEDS_LOG_WARN = false; 090 } 091 } 092 } 093 } 094 095 private interface Converter<T> { 096 T convert(byte[] data) throws Exception; 097 } 098 099 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 100 CompletableFuture<T> future = new CompletableFuture<>(); 101 addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> { 102 if (error != null) { 103 future.completeExceptionally(error); 104 return; 105 } 106 try { 107 future.complete(converter.convert(data)); 108 } catch (Exception e) { 109 future.completeExceptionally(e); 110 } 111 }); 112 return future; 113 } 114 115 private static String getClusterId(byte[] data) throws DeserializationException { 116 if (data == null || data.length == 0) { 117 return null; 118 } 119 data = removeMetaData(data); 120 return ClusterId.parseFrom(data).toString(); 121 } 122 123 @Override 124 public CompletableFuture<String> getClusterId() { 125 return tracedFuture( 126 () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId), 127 "ZKConnectionRegistry.getClusterId"); 128 } 129 130 ReadOnlyZKClient getZKClient() { 131 return zk; 132 } 133 134 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 135 if (data == null || data.length == 0) { 136 return null; 137 } 138 data = removeMetaData(data); 139 int prefixLen = lengthOfPBMagic(); 140 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 141 data.length - prefixLen); 142 } 143 144 private static void tryComplete(MutableInt remaining, HRegionLocation[] locs, 145 CompletableFuture<RegionLocations> future) { 146 remaining.decrement(); 147 if (remaining.intValue() > 0) { 148 return; 149 } 150 future.complete(new RegionLocations(locs)); 151 } 152 153 private Pair<RegionState.State, ServerName> 154 getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) { 155 RegionState.State state; 156 if (proto.hasState()) { 157 state = RegionState.State.convert(proto.getState()); 158 } else { 159 state = RegionState.State.OPEN; 160 } 161 HBaseProtos.ServerName snProto = proto.getServer(); 162 return Pair.newPair(state, 163 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 164 } 165 166 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future, 167 List<String> metaReplicaZNodes) { 168 if (metaReplicaZNodes.isEmpty()) { 169 future.completeExceptionally(new IOException("No meta znode available")); 170 } 171 HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()]; 172 MutableInt remaining = new MutableInt(locs.length); 173 for (String metaReplicaZNode : metaReplicaZNodes) { 174 int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode); 175 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); 176 if (replicaId == DEFAULT_REPLICA_ID) { 177 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 178 if (error != null) { 179 future.completeExceptionally(error); 180 return; 181 } 182 if (proto == null) { 183 future.completeExceptionally(new IOException("Meta znode is null")); 184 return; 185 } 186 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 187 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 188 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 189 } 190 locs[DEFAULT_REPLICA_ID] = new HRegionLocation( 191 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()); 192 tryComplete(remaining, locs, future); 193 }); 194 } else { 195 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 196 if (future.isDone()) { 197 return; 198 } 199 if (error != null) { 200 LOG.warn("Failed to fetch " + path, error); 201 locs[replicaId] = null; 202 } else if (proto == null) { 203 LOG.warn("Meta znode for replica " + replicaId + " is null"); 204 locs[replicaId] = null; 205 } else { 206 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 207 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 208 LOG.warn("Meta region for replica " + replicaId + " is in state " 209 + stateAndServerName.getFirst()); 210 locs[replicaId] = null; 211 } else { 212 locs[replicaId] = 213 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 214 stateAndServerName.getSecond()); 215 } 216 } 217 tryComplete(remaining, locs, future); 218 }); 219 } 220 } 221 } 222 223 @Override 224 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 225 return tracedFuture(() -> { 226 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 227 addListener( 228 zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children 229 .stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), 230 (metaReplicaZNodes, error) -> { 231 if (error != null) { 232 future.completeExceptionally(error); 233 return; 234 } 235 getMetaRegionLocation(future, metaReplicaZNodes); 236 }); 237 return future; 238 }, "ZKConnectionRegistry.getMetaRegionLocations"); 239 } 240 241 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 242 if (data == null || data.length == 0) { 243 return null; 244 } 245 data = removeMetaData(data); 246 int prefixLen = lengthOfPBMagic(); 247 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 248 } 249 250 @Override 251 public CompletableFuture<ServerName> getActiveMaster() { 252 return tracedFuture( 253 () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) 254 .thenApply(proto -> { 255 if (proto == null) { 256 return null; 257 } 258 HBaseProtos.ServerName snProto = proto.getMaster(); 259 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 260 snProto.getStartCode()); 261 }), 262 "ZKConnectionRegistry.getActiveMaster"); 263 } 264 265 @Override 266 public String getConnectionString() { 267 final String serverList = zk.getConnectString(); 268 final String baseZNode = znodePaths.baseZNode; 269 return serverList + ":" + baseZNode; 270 } 271 272 @Override 273 public void close() { 274 zk.close(); 275 } 276}