001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 026import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 027 028import java.io.IOException; 029import java.util.concurrent.CompletableFuture; 030import org.apache.commons.lang3.mutable.MutableInt; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterId; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.RegionLocations; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.exceptions.DeserializationException; 037import org.apache.hadoop.hbase.master.RegionState; 038import org.apache.hadoop.hbase.util.Pair; 039import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 040import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 049 050/** 051 * Fetch the registry data from zookeeper. 052 */ 053@InterfaceAudience.Private 054class ZKAsyncRegistry implements AsyncRegistry { 055 056 private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class); 057 058 private final ReadOnlyZKClient zk; 059 060 private final ZNodePaths znodePaths; 061 062 ZKAsyncRegistry(Configuration conf) { 063 this.znodePaths = new ZNodePaths(conf); 064 this.zk = new ReadOnlyZKClient(conf); 065 } 066 067 private interface Converter<T> { 068 T convert(byte[] data) throws Exception; 069 } 070 071 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 072 CompletableFuture<T> future = new CompletableFuture<>(); 073 addListener(zk.get(path), (data, error) -> { 074 if (error != null) { 075 future.completeExceptionally(error); 076 return; 077 } 078 try { 079 future.complete(converter.convert(data)); 080 } catch (Exception e) { 081 future.completeExceptionally(e); 082 } 083 }); 084 return future; 085 } 086 087 private static String getClusterId(byte[] data) throws DeserializationException { 088 if (data == null || data.length == 0) { 089 return null; 090 } 091 data = removeMetaData(data); 092 return ClusterId.parseFrom(data).toString(); 093 } 094 095 @Override 096 public CompletableFuture<String> getClusterId() { 097 return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId); 098 } 099 100 @VisibleForTesting 101 ReadOnlyZKClient getZKClient() { 102 return zk; 103 } 104 105 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 106 if (data == null || data.length == 0) { 107 return null; 108 } 109 data = removeMetaData(data); 110 int prefixLen = lengthOfPBMagic(); 111 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 112 data.length - prefixLen); 113 } 114 115 private static void tryComplete(MutableInt remaining, HRegionLocation[] locs, 116 CompletableFuture<RegionLocations> future) { 117 remaining.decrement(); 118 if (remaining.intValue() > 0) { 119 return; 120 } 121 future.complete(new RegionLocations(locs)); 122 } 123 124 private Pair<RegionState.State, ServerName> getStateAndServerName( 125 ZooKeeperProtos.MetaRegionServer proto) { 126 RegionState.State state; 127 if (proto.hasState()) { 128 state = RegionState.State.convert(proto.getState()); 129 } else { 130 state = RegionState.State.OPEN; 131 } 132 HBaseProtos.ServerName snProto = proto.getServer(); 133 return Pair.newPair(state, 134 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 135 } 136 137 @Override 138 public CompletableFuture<RegionLocations> getMetaRegionLocation() { 139 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 140 HRegionLocation[] locs = new HRegionLocation[znodePaths.metaReplicaZNodes.size()]; 141 MutableInt remaining = new MutableInt(locs.length); 142 znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> { 143 if (replicaId == DEFAULT_REPLICA_ID) { 144 addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { 145 if (error != null) { 146 future.completeExceptionally(error); 147 return; 148 } 149 if (proto == null) { 150 future.completeExceptionally(new IOException("Meta znode is null")); 151 return; 152 } 153 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 154 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 155 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 156 } 157 locs[DEFAULT_REPLICA_ID] = new HRegionLocation( 158 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()); 159 tryComplete(remaining, locs, future); 160 }); 161 } else { 162 addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { 163 if (future.isDone()) { 164 return; 165 } 166 if (error != null) { 167 LOG.warn("Failed to fetch " + path, error); 168 locs[replicaId] = null; 169 } else if (proto == null) { 170 LOG.warn("Meta znode for replica " + replicaId + " is null"); 171 locs[replicaId] = null; 172 } else { 173 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 174 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 175 LOG.warn("Meta region for replica " + replicaId + " is in state " + 176 stateAndServerName.getFirst()); 177 locs[replicaId] = null; 178 } else { 179 locs[replicaId] = 180 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 181 stateAndServerName.getSecond()); 182 } 183 } 184 tryComplete(remaining, locs, future); 185 }); 186 } 187 }); 188 return future; 189 } 190 191 @Override 192 public CompletableFuture<Integer> getCurrentNrHRS() { 193 return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0); 194 } 195 196 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 197 if (data == null || data.length == 0) { 198 return null; 199 } 200 data = removeMetaData(data); 201 int prefixLen = lengthOfPBMagic(); 202 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 203 } 204 205 @Override 206 public CompletableFuture<ServerName> getMasterAddress() { 207 return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) 208 .thenApply(proto -> { 209 if (proto == null) { 210 return null; 211 } 212 HBaseProtos.ServerName snProto = proto.getMaster(); 213 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 214 snProto.getStartCode()); 215 }); 216 } 217 218 @Override 219 public CompletableFuture<Integer> getMasterInfoPort() { 220 return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) 221 .thenApply(proto -> proto != null ? proto.getInfoPort() : 0); 222 } 223 224 @Override 225 public void close() { 226 zk.close(); 227 } 228}