001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 026import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 028 029import java.io.IOException; 030import java.util.List; 031import java.util.concurrent.CompletableFuture; 032import java.util.stream.Collectors; 033import org.apache.commons.lang3.mutable.MutableInt; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ClusterId; 036import org.apache.hadoop.hbase.HRegionLocation; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.exceptions.DeserializationException; 040import org.apache.hadoop.hbase.master.RegionState; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 043import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 050 051/** 052 * Zookeeper based registry implementation. 053 */ 054@InterfaceAudience.Private 055class ZKConnectionRegistry implements ConnectionRegistry { 056 057 private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class); 058 059 private final ReadOnlyZKClient zk; 060 061 private final ZNodePaths znodePaths; 062 063 ZKConnectionRegistry(Configuration conf) { 064 this.znodePaths = new ZNodePaths(conf); 065 this.zk = new ReadOnlyZKClient(conf); 066 } 067 068 private interface Converter<T> { 069 T convert(byte[] data) throws Exception; 070 } 071 072 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 073 CompletableFuture<T> future = new CompletableFuture<>(); 074 addListener(zk.get(path), (data, error) -> { 075 if (error != null) { 076 future.completeExceptionally(error); 077 return; 078 } 079 try { 080 future.complete(converter.convert(data)); 081 } catch (Exception e) { 082 future.completeExceptionally(e); 083 } 084 }); 085 return future; 086 } 087 088 private static String getClusterId(byte[] data) throws DeserializationException { 089 if (data == null || data.length == 0) { 090 return null; 091 } 092 data = removeMetaData(data); 093 return ClusterId.parseFrom(data).toString(); 094 } 095 096 @Override 097 public CompletableFuture<String> getClusterId() { 098 return tracedFuture( 099 () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId), 100 "ZKConnectionRegistry.getClusterId"); 101 } 102 103 ReadOnlyZKClient getZKClient() { 104 return zk; 105 } 106 107 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 108 if (data == null || data.length == 0) { 109 return null; 110 } 111 data = removeMetaData(data); 112 int prefixLen = lengthOfPBMagic(); 113 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 114 data.length - prefixLen); 115 } 116 117 private static void tryComplete(MutableInt remaining, HRegionLocation[] locs, 118 CompletableFuture<RegionLocations> future) { 119 remaining.decrement(); 120 if (remaining.intValue() > 0) { 121 return; 122 } 123 future.complete(new RegionLocations(locs)); 124 } 125 126 private Pair<RegionState.State, ServerName> 127 getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) { 128 RegionState.State state; 129 if (proto.hasState()) { 130 state = RegionState.State.convert(proto.getState()); 131 } else { 132 state = RegionState.State.OPEN; 133 } 134 HBaseProtos.ServerName snProto = proto.getServer(); 135 return Pair.newPair(state, 136 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 137 } 138 139 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future, 140 List<String> metaReplicaZNodes) { 141 if (metaReplicaZNodes.isEmpty()) { 142 future.completeExceptionally(new IOException("No meta znode available")); 143 } 144 HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()]; 145 MutableInt remaining = new MutableInt(locs.length); 146 for (String metaReplicaZNode : metaReplicaZNodes) { 147 int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode); 148 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); 149 if (replicaId == DEFAULT_REPLICA_ID) { 150 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 151 if (error != null) { 152 future.completeExceptionally(error); 153 return; 154 } 155 if (proto == null) { 156 future.completeExceptionally(new IOException("Meta znode is null")); 157 return; 158 } 159 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 160 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 161 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 162 } 163 locs[DEFAULT_REPLICA_ID] = new HRegionLocation( 164 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()); 165 tryComplete(remaining, locs, future); 166 }); 167 } else { 168 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { 169 if (future.isDone()) { 170 return; 171 } 172 if (error != null) { 173 LOG.warn("Failed to fetch " + path, error); 174 locs[replicaId] = null; 175 } else if (proto == null) { 176 LOG.warn("Meta znode for replica " + replicaId + " is null"); 177 locs[replicaId] = null; 178 } else { 179 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 180 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 181 LOG.warn("Meta region for replica " + replicaId + " is in state " 182 + stateAndServerName.getFirst()); 183 locs[replicaId] = null; 184 } else { 185 locs[replicaId] = 186 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 187 stateAndServerName.getSecond()); 188 } 189 } 190 tryComplete(remaining, locs, future); 191 }); 192 } 193 } 194 } 195 196 @Override 197 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 198 return tracedFuture(() -> { 199 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 200 addListener( 201 zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() 202 .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), 203 (metaReplicaZNodes, error) -> { 204 if (error != null) { 205 future.completeExceptionally(error); 206 return; 207 } 208 getMetaRegionLocation(future, metaReplicaZNodes); 209 }); 210 return future; 211 }, "ZKConnectionRegistry.getMetaRegionLocations"); 212 } 213 214 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 215 if (data == null || data.length == 0) { 216 return null; 217 } 218 data = removeMetaData(data); 219 int prefixLen = lengthOfPBMagic(); 220 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 221 } 222 223 @Override 224 public CompletableFuture<ServerName> getActiveMaster() { 225 return tracedFuture( 226 () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) 227 .thenApply(proto -> { 228 if (proto == null) { 229 return null; 230 } 231 HBaseProtos.ServerName snProto = proto.getMaster(); 232 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 233 snProto.getStartCode()); 234 }), 235 "ZKConnectionRegistry.getActiveMaster"); 236 } 237 238 @Override 239 public String getConnectionString() { 240 final String serverList = zk.getConnectString(); 241 final String baseZNode = znodePaths.baseZNode; 242 return serverList + ":" + baseZNode; 243 } 244 245 @Override 246 public void close() { 247 zk.close(); 248 } 249}