001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
027
028import java.io.IOException;
029import java.util.concurrent.CompletableFuture;
030import org.apache.commons.lang3.mutable.MutableInt;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterId;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.master.RegionState;
038import org.apache.hadoop.hbase.util.Pair;
039import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
040import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
049
050/**
051 * Fetch the registry data from zookeeper.
052 */
053@InterfaceAudience.Private
054class ZKAsyncRegistry implements AsyncRegistry {
055
056  private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class);
057
058  private final ReadOnlyZKClient zk;
059
060  private final ZNodePaths znodePaths;
061
062  ZKAsyncRegistry(Configuration conf) {
063    this.znodePaths = new ZNodePaths(conf);
064    this.zk = new ReadOnlyZKClient(conf);
065  }
066
067  private interface Converter<T> {
068    T convert(byte[] data) throws Exception;
069  }
070
071  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
072    CompletableFuture<T> future = new CompletableFuture<>();
073    addListener(zk.get(path), (data, error) -> {
074      if (error != null) {
075        future.completeExceptionally(error);
076        return;
077      }
078      try {
079        future.complete(converter.convert(data));
080      } catch (Exception e) {
081        future.completeExceptionally(e);
082      }
083    });
084    return future;
085  }
086
087  private static String getClusterId(byte[] data) throws DeserializationException {
088    if (data == null || data.length == 0) {
089      return null;
090    }
091    data = removeMetaData(data);
092    return ClusterId.parseFrom(data).toString();
093  }
094
095  @Override
096  public CompletableFuture<String> getClusterId() {
097    return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
098  }
099
100  @VisibleForTesting
101  ReadOnlyZKClient getZKClient() {
102    return zk;
103  }
104
105  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
106    if (data == null || data.length == 0) {
107      return null;
108    }
109    data = removeMetaData(data);
110    int prefixLen = lengthOfPBMagic();
111    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
112      data.length - prefixLen);
113  }
114
115  private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
116      CompletableFuture<RegionLocations> future) {
117    remaining.decrement();
118    if (remaining.intValue() > 0) {
119      return;
120    }
121    future.complete(new RegionLocations(locs));
122  }
123
124  private Pair<RegionState.State, ServerName> getStateAndServerName(
125      ZooKeeperProtos.MetaRegionServer proto) {
126    RegionState.State state;
127    if (proto.hasState()) {
128      state = RegionState.State.convert(proto.getState());
129    } else {
130      state = RegionState.State.OPEN;
131    }
132    HBaseProtos.ServerName snProto = proto.getServer();
133    return Pair.newPair(state,
134      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
135  }
136
137  @Override
138  public CompletableFuture<RegionLocations> getMetaRegionLocation() {
139    CompletableFuture<RegionLocations> future = new CompletableFuture<>();
140    HRegionLocation[] locs = new HRegionLocation[znodePaths.metaReplicaZNodes.size()];
141    MutableInt remaining = new MutableInt(locs.length);
142    znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
143      if (replicaId == DEFAULT_REPLICA_ID) {
144        addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
145          if (error != null) {
146            future.completeExceptionally(error);
147            return;
148          }
149          if (proto == null) {
150            future.completeExceptionally(new IOException("Meta znode is null"));
151            return;
152          }
153          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
154          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
155            LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
156          }
157          locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
158            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
159          tryComplete(remaining, locs, future);
160        });
161      } else {
162        addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
163          if (future.isDone()) {
164            return;
165          }
166          if (error != null) {
167            LOG.warn("Failed to fetch " + path, error);
168            locs[replicaId] = null;
169          } else if (proto == null) {
170            LOG.warn("Meta znode for replica " + replicaId + " is null");
171            locs[replicaId] = null;
172          } else {
173            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
174            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
175              LOG.warn("Meta region for replica " + replicaId + " is in state " +
176                stateAndServerName.getFirst());
177              locs[replicaId] = null;
178            } else {
179              locs[replicaId] =
180                new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
181                  stateAndServerName.getSecond());
182            }
183          }
184          tryComplete(remaining, locs, future);
185        });
186      }
187    });
188    return future;
189  }
190
191  @Override
192  public CompletableFuture<Integer> getCurrentNrHRS() {
193    return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0);
194  }
195
196  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
197    if (data == null || data.length == 0) {
198      return null;
199    }
200    data = removeMetaData(data);
201    int prefixLen = lengthOfPBMagic();
202    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
203  }
204
205  @Override
206  public CompletableFuture<ServerName> getMasterAddress() {
207    return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
208        .thenApply(proto -> {
209          if (proto == null) {
210            return null;
211          }
212          HBaseProtos.ServerName snProto = proto.getMaster();
213          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
214            snProto.getStartCode());
215        });
216  }
217
218  @Override
219  public CompletableFuture<Integer> getMasterInfoPort() {
220    return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
221        .thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
222  }
223
224  @Override
225  public void close() {
226    zk.close();
227  }
228}