001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 026import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 027 028import java.io.IOException; 029import java.util.Collection; 030import java.util.List; 031import java.util.Map; 032import java.util.TreeMap; 033import java.util.concurrent.CompletableFuture; 034import java.util.stream.Collectors; 035import org.apache.commons.lang3.mutable.MutableInt; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.ClusterId; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.RegionLocations; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.exceptions.DeserializationException; 042import org.apache.hadoop.hbase.master.RegionState; 043import org.apache.hadoop.hbase.util.Pair; 044import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 045import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 054 055/** 056 * Zookeeper based registry implementation. 057 */ 058@InterfaceAudience.Private 059class ZKConnectionRegistry implements ConnectionRegistry { 060 061 private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class); 062 063 private final ReadOnlyZKClient zk; 064 065 private final ZNodePaths znodePaths; 066 067 ZKConnectionRegistry(Configuration conf) { 068 this.znodePaths = new ZNodePaths(conf); 069 this.zk = new ReadOnlyZKClient(conf); 070 } 071 072 private interface Converter<T> { 073 T convert(byte[] data) throws Exception; 074 } 075 076 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 077 CompletableFuture<T> future = new CompletableFuture<>(); 078 addListener(zk.get(path), (data, error) -> { 079 if (error != null) { 080 future.completeExceptionally(error); 081 return; 082 } 083 try { 084 future.complete(converter.convert(data)); 085 } catch (Exception e) { 086 future.completeExceptionally(e); 087 } 088 }); 089 return future; 090 } 091 092 private static String getClusterId(byte[] data) throws DeserializationException { 093 if (data == null || data.length == 0) { 094 return null; 095 } 096 data = removeMetaData(data); 097 return ClusterId.parseFrom(data).toString(); 098 } 099 100 @Override 101 public CompletableFuture<String> getClusterId() { 102 return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId); 103 } 104 105 ReadOnlyZKClient getZKClient() { 106 return zk; 107 } 108 109 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 110 if (data == null || data.length == 0) { 111 return null; 112 } 113 data = removeMetaData(data); 114 int prefixLen = lengthOfPBMagic(); 115 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 116 data.length - prefixLen); 117 } 118 119 private static void tryComplete(MutableInt remaining, Collection<HRegionLocation> locs, 120 CompletableFuture<RegionLocations> future) { 121 remaining.decrement(); 122 if (remaining.intValue() > 0) { 123 return; 124 } 125 future.complete(new RegionLocations(locs)); 126 } 127 128 private Pair<RegionState.State, ServerName> getStateAndServerName( 129 ZooKeeperProtos.MetaRegionServer proto) { 130 RegionState.State state; 131 if (proto.hasState()) { 132 state = RegionState.State.convert(proto.getState()); 133 } else { 134 state = RegionState.State.OPEN; 135 } 136 HBaseProtos.ServerName snProto = proto.getServer(); 137 return Pair.newPair(state, 138 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 139 } 140 141 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future, 142 List<String> metaReplicaZNodes) { 143 if (metaReplicaZNodes.isEmpty()) { 144 future.completeExceptionally(new IOException("No meta znode available")); 145 } 146 // Note, the list of metaReplicaZNodes may be discontiguous regards replicaId; i.e. we may have 147 // a znode for the default -- replicaId=0 -- and perhaps replicaId '2' but be could be missing 148 // znode for replicaId '1'. This is a transient condition. Because of this we are careful 149 // accumulating locations. We use a Map so retries overwrite rather than aggregate and the 150 // Map sorts just to be kind to further processing. The Map will retain the discontinuity on 151 // replicaIds but on completion (of the future), the Map values are passed to the 152 // RegionLocations constructor which knows how to deal with discontinuities. 153 final Map<Integer, HRegionLocation> locs = new TreeMap<>(); 154 MutableInt remaining = new MutableInt(metaReplicaZNodes.size()); 155 for (String metaReplicaZNode : metaReplicaZNodes) { 156 int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode); 157 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); 158 if (replicaId == DEFAULT_REPLICA_ID) { 159 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 160 if (error != null) { 161 future.completeExceptionally(error); 162 return; 163 } 164 if (proto == null) { 165 future.completeExceptionally(new IOException("Meta znode is null")); 166 return; 167 } 168 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 169 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 170 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 171 } 172 locs.put(replicaId, new HRegionLocation( 173 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond())); 174 tryComplete(remaining, locs.values(), future); 175 }); 176 } else { 177 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 178 if (future.isDone()) { 179 return; 180 } 181 if (error != null) { 182 LOG.warn("Failed to fetch " + path, error); 183 locs.put(replicaId, null); 184 } else if (proto == null) { 185 LOG.warn("Meta znode for replica " + replicaId + " is null"); 186 locs.put(replicaId, null); 187 } else { 188 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 189 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 190 LOG.warn("Meta region for replica " + replicaId + " is in state " + 191 stateAndServerName.getFirst()); 192 locs.put(replicaId, null); 193 } else { 194 locs.put(replicaId, new HRegionLocation( 195 getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 196 stateAndServerName.getSecond())); 197 } 198 } 199 tryComplete(remaining, locs.values(), future); 200 }); 201 } 202 } 203 } 204 205 @Override 206 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 207 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 208 addListener( 209 zk.list(znodePaths.baseZNode) 210 .thenApply(children -> children.stream() 211 .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), 212 (metaReplicaZNodes, error) -> { 213 if (error != null) { 214 future.completeExceptionally(error); 215 return; 216 } 217 getMetaRegionLocation(future, metaReplicaZNodes); 218 }); 219 return future; 220 } 221 222 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 223 if (data == null || data.length == 0) { 224 return null; 225 } 226 data = removeMetaData(data); 227 int prefixLen = lengthOfPBMagic(); 228 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 229 } 230 231 @Override 232 public CompletableFuture<ServerName> getActiveMaster() { 233 return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) 234 .thenApply(proto -> { 235 if (proto == null) { 236 return null; 237 } 238 HBaseProtos.ServerName snProto = proto.getMaster(); 239 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 240 snProto.getStartCode()); 241 }); 242 } 243 244 @Override 245 public void close() { 246 zk.close(); 247 } 248}