001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; 021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; 022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; 023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; 024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; 025import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 026import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; 027import java.io.IOException; 028import java.util.List; 029import java.util.concurrent.CompletableFuture; 030import java.util.stream.Collectors; 031import org.apache.commons.lang3.mutable.MutableInt; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.ClusterId; 034import org.apache.hadoop.hbase.HRegionLocation; 035import org.apache.hadoop.hbase.RegionLocations; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.exceptions.DeserializationException; 038import org.apache.hadoop.hbase.master.RegionState; 039import org.apache.hadoop.hbase.util.Pair; 040import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; 041import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; 048 049/** 050 * Zookeeper based registry implementation. 051 */ 052@InterfaceAudience.Private 053class ZKAsyncRegistry implements AsyncRegistry { 054 055 private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class); 056 057 private final ReadOnlyZKClient zk; 058 059 private final ZNodePaths znodePaths; 060 061 ZKAsyncRegistry(Configuration conf) { 062 this.znodePaths = new ZNodePaths(conf); 063 this.zk = new ReadOnlyZKClient(conf); 064 } 065 066 private interface Converter<T> { 067 T convert(byte[] data) throws Exception; 068 } 069 070 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { 071 CompletableFuture<T> future = new CompletableFuture<>(); 072 addListener(zk.get(path), (data, error) -> { 073 if (error != null) { 074 future.completeExceptionally(error); 075 return; 076 } 077 try { 078 future.complete(converter.convert(data)); 079 } catch (Exception e) { 080 future.completeExceptionally(e); 081 } 082 }); 083 return future; 084 } 085 086 private static String getClusterId(byte[] data) throws DeserializationException { 087 if (data == null || data.length == 0) { 088 return null; 089 } 090 data = removeMetaData(data); 091 return ClusterId.parseFrom(data).toString(); 092 } 093 094 @Override 095 public CompletableFuture<String> getClusterId() { 096 return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId); 097 } 098 099 @VisibleForTesting 100 ReadOnlyZKClient getZKClient() { 101 return zk; 102 } 103 104 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException { 105 if (data == null || data.length == 0) { 106 return null; 107 } 108 data = removeMetaData(data); 109 int prefixLen = lengthOfPBMagic(); 110 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, 111 data.length - prefixLen); 112 } 113 114 private static void tryComplete(MutableInt remaining, HRegionLocation[] locs, 115 CompletableFuture<RegionLocations> future) { 116 remaining.decrement(); 117 if (remaining.intValue() > 0) { 118 return; 119 } 120 future.complete(new RegionLocations(locs)); 121 } 122 123 private Pair<RegionState.State, ServerName> getStateAndServerName( 124 ZooKeeperProtos.MetaRegionServer proto) { 125 RegionState.State state; 126 if (proto.hasState()) { 127 state = RegionState.State.convert(proto.getState()); 128 } else { 129 state = RegionState.State.OPEN; 130 } 131 HBaseProtos.ServerName snProto = proto.getServer(); 132 return Pair.newPair(state, 133 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode())); 134 } 135 136 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future, 137 List<String> metaReplicaZNodes) { 138 if (metaReplicaZNodes.isEmpty()) { 139 future.completeExceptionally(new IOException("No meta znode available")); 140 } 141 HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()]; 142 MutableInt remaining = new MutableInt(locs.length); 143 for (String metaReplicaZNode : metaReplicaZNodes) { 144 int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode); 145 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); 146 if (replicaId == DEFAULT_REPLICA_ID) { 147 addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { 148 if (error != null) { 149 future.completeExceptionally(error); 150 return; 151 } 152 if (proto == null) { 153 future.completeExceptionally(new IOException("Meta znode is null")); 154 return; 155 } 156 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 157 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 158 LOG.warn("Meta region is in state " + stateAndServerName.getFirst()); 159 } 160 locs[DEFAULT_REPLICA_ID] = new HRegionLocation( 161 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()); 162 tryComplete(remaining, locs, future); 163 }); 164 } else { 165 addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { 166 if (future.isDone()) { 167 return; 168 } 169 if (error != null) { 170 LOG.warn("Failed to fetch " + path, error); 171 locs[replicaId] = null; 172 } else if (proto == null) { 173 LOG.warn("Meta znode for replica " + replicaId + " is null"); 174 locs[replicaId] = null; 175 } else { 176 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); 177 if (stateAndServerName.getFirst() != RegionState.State.OPEN) { 178 LOG.warn("Meta region for replica " + replicaId + " is in state " + 179 stateAndServerName.getFirst()); 180 locs[replicaId] = null; 181 } else { 182 locs[replicaId] = 183 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), 184 stateAndServerName.getSecond()); 185 } 186 } 187 tryComplete(remaining, locs, future); 188 }); 189 } 190 } 191 } 192 193 @Override 194 public CompletableFuture<RegionLocations> getMetaRegionLocation() { 195 CompletableFuture<RegionLocations> future = new CompletableFuture<>(); 196 addListener( 197 zk.list(znodePaths.baseZNode) 198 .thenApply(children -> children.stream() 199 .filter(c -> c.startsWith(znodePaths.metaZNodePrefix)).collect(Collectors.toList())), 200 (metaReplicaZNodes, error) -> { 201 if (error != null) { 202 future.completeExceptionally(error); 203 return; 204 } 205 getMetaRegionLocation(future, metaReplicaZNodes); 206 }); 207 return future; 208 } 209 210 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { 211 if (data == null || data.length == 0) { 212 return null; 213 } 214 data = removeMetaData(data); 215 int prefixLen = lengthOfPBMagic(); 216 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); 217 } 218 219 @Override 220 public CompletableFuture<ServerName> getMasterAddress() { 221 return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) 222 .thenApply(proto -> { 223 if (proto == null) { 224 return null; 225 } 226 HBaseProtos.ServerName snProto = proto.getMaster(); 227 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), 228 snProto.getStartCode()); 229 }); 230 } 231 232 @Override 233 public void close() { 234 zk.close(); 235 } 236}