001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 026import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 028 029import java.io.IOException; 030import java.util.Collection; 031import java.util.List; 032import java.util.Map; 033import java.util.TreeMap; 034import java.util.concurrent.CompletableFuture; 035import java.util.stream.Collectors; 036import org.apache.commons.lang3.mutable.MutableInt; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.ClusterId; 039import org.apache.hadoop.hbase.HRegionLocation; 040import org.apache.hadoop.hbase.RegionLocations; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.exceptions.DeserializationException; 043import org.apache.hadoop.hbase.master.RegionState; 044import org.apache.hadoop.hbase.util.Pair; 045import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 046import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 053 054/** 055 * Zookeeper based registry implementation. 056 */ 057@InterfaceAudience.Private 058class ZKConnectionRegistry implements ConnectionRegistry { 059 060 private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class); 061 062 private final ReadOnlyZKClient zk; 063 064 private final ZNodePaths znodePaths; 065 066 ZKConnectionRegistry(Configuration conf) { 067 this.znodePaths = new ZNodePaths(conf); 068 this.zk = new ReadOnlyZKClient(conf); 069 } 070 071 private interface Converter<T> { 072 T convert(byte[] data) throws Exception; 073 } 074 075 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 076 CompletableFuture<T> future = new CompletableFuture<>(); 077 addListener(zk.get(path), (data, error) -> { 078 if (error != null) { 079 future.completeExceptionally(error); 080 return; 081 } 082 try { 083 future.complete(converter.convert(data)); 084 } catch (Exception e) { 085 future.completeExceptionally(e); 086 } 087 }); 088 return future; 089 } 090 091 private static String getClusterId(byte[] data) throws DeserializationException { 092 if (data == null || data.length == 0) { 093 return null; 094 } 095 data = removeMetaData(data); 096 return ClusterId.parseFrom(data).toString(); 097 } 098 099 @Override 100 public CompletableFuture<String> getClusterId() { 101 return tracedFuture( 102 () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId), 103 "ZKConnectionRegistry.getClusterId"); 104 } 105 106 ReadOnlyZKClient getZKClient() { 107 return zk; 108 } 109 110 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 111 if (data == null || data.length == 0) { 112 return null; 113 } 114 data = removeMetaData(data); 115 int prefixLen = lengthOfPBMagic(); 116 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 117 data.length - prefixLen); 118 } 119 120 private static void tryComplete(MutableInt remaining, Collection<HRegionLocation> locs, 121 CompletableFuture<RegionLocations> future) { 122 remaining.decrement(); 123 if (remaining.intValue() > 0) { 124 return; 125 } 126 future.complete(new RegionLocations(locs)); 127 } 128 129 private Pair<RegionState.State, ServerName> 130 getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) { 131 RegionState.State state; 132 if (proto.hasState()) { 133 state = RegionState.State.convert(proto.getState()); 134 } else { 135 state = RegionState.State.OPEN; 136 } 137 HBaseProtos.ServerName snProto = proto.getServer(); 138 return Pair.newPair(state, 139 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 140 } 141 142 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future, 143 List<String> metaReplicaZNodes) { 144 if (metaReplicaZNodes.isEmpty()) { 145 future.completeExceptionally(new IOException("No meta znode available")); 146 } 147 // Note, the list of metaReplicaZNodes may be discontiguous regards replicaId; i.e. we may have 148 // a znode for the default -- replicaId=0 -- and perhaps replicaId '2' but be could be missing 149 // znode for replicaId '1'. This is a transient condition. Because of this we are careful 150 // accumulating locations. We use a Map so retries overwrite rather than aggregate and the 151 // Map sorts just to be kind to further processing. The Map will retain the discontinuity on 152 // replicaIds but on completion (of the future), the Map values are passed to the 153 // RegionLocations constructor which knows how to deal with discontinuities. 154 final Map<Integer, HRegionLocation> locs = new TreeMap<>(); 155 MutableInt remaining = new MutableInt(metaReplicaZNodes.size()); 156 for (String metaReplicaZNode : metaReplicaZNodes) { 157 int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode); 158 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); 159 if (replicaId == DEFAULT_REPLICA_ID) { 160 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 161 if (error != null) { 162 future.completeExceptionally(error); 163 return; 164 } 165 if (proto == null) { 166 future.completeExceptionally(new IOException("Meta znode is null")); 167 return; 168 } 169 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 170 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 171 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 172 } 173 locs.put(replicaId, new HRegionLocation( 174 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond())); 175 tryComplete(remaining, locs.values(), future); 176 }); 177 } else { 178 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 179 if (future.isDone()) { 180 return; 181 } 182 if (error != null) { 183 LOG.warn("Failed to fetch " + path, error); 184 locs.put(replicaId, null); 185 } else if (proto == null) { 186 LOG.warn("Meta znode for replica " + replicaId + " is null"); 187 locs.put(replicaId, null); 188 } else { 189 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 190 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 191 LOG.warn("Meta region for replica " + replicaId + " is in state " 192 + stateAndServerName.getFirst()); 193 locs.put(replicaId, null); 194 } else { 195 locs.put(replicaId, 196 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 197 stateAndServerName.getSecond())); 198 } 199 } 200 tryComplete(remaining, locs.values(), future); 201 }); 202 } 203 } 204 } 205 206 @Override 207 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 208 return tracedFuture(() -> { 209 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 210 addListener( 211 zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() 212 .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), 213 (metaReplicaZNodes, error) -> { 214 if (error != null) { 215 future.completeExceptionally(error); 216 return; 217 } 218 getMetaRegionLocation(future, metaReplicaZNodes); 219 }); 220 return future; 221 }, "ZKConnectionRegistry.getMetaRegionLocations"); 222 } 223 224 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 225 if (data == null || data.length == 0) { 226 return null; 227 } 228 data = removeMetaData(data); 229 int prefixLen = lengthOfPBMagic(); 230 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 231 } 232 233 @Override 234 public CompletableFuture<ServerName> getActiveMaster() { 235 return tracedFuture( 236 () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) 237 .thenApply(proto -> { 238 if (proto == null) { 239 return null; 240 } 241 HBaseProtos.ServerName snProto = proto.getMaster(); 242 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 243 snProto.getStartCode()); 244 }), 245 "ZKConnectionRegistry.getActiveMaster"); 246 } 247 248 @Override 249 public String getConnectionString() { 250 final String serverList = zk.getConnectString(); 251 final String baseZNode = znodePaths.baseZNode; 252 return serverList + ":" + baseZNode; 253 } 254 255 @Override 256 public void close() { 257 zk.close(); 258 } 259}