001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
026import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
028
029import java.io.IOException;
030import java.util.List;
031import java.util.concurrent.CompletableFuture;
032import java.util.stream.Collectors;
033import org.apache.commons.lang3.mutable.MutableInt;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterId;
036import org.apache.hadoop.hbase.HRegionLocation;
037import org.apache.hadoop.hbase.RegionLocations;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.exceptions.DeserializationException;
040import org.apache.hadoop.hbase.master.RegionState;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
043import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
050
051/**
052 * Zookeeper based registry implementation.
053 */
054@InterfaceAudience.Private
055class ZKConnectionRegistry implements ConnectionRegistry {
056
057  private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);
058
059  private final ReadOnlyZKClient zk;
060
061  private final ZNodePaths znodePaths;
062
063  ZKConnectionRegistry(Configuration conf) {
064    this.znodePaths = new ZNodePaths(conf);
065    this.zk = new ReadOnlyZKClient(conf);
066  }
067
068  private interface Converter<T> {
069    T convert(byte[] data) throws Exception;
070  }
071
072  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
073    CompletableFuture<T> future = new CompletableFuture<>();
074    addListener(zk.get(path), (data, error) -> {
075      if (error != null) {
076        future.completeExceptionally(error);
077        return;
078      }
079      try {
080        future.complete(converter.convert(data));
081      } catch (Exception e) {
082        future.completeExceptionally(e);
083      }
084    });
085    return future;
086  }
087
088  private static String getClusterId(byte[] data) throws DeserializationException {
089    if (data == null || data.length == 0) {
090      return null;
091    }
092    data = removeMetaData(data);
093    return ClusterId.parseFrom(data).toString();
094  }
095
096  @Override
097  public CompletableFuture<String> getClusterId() {
098    return tracedFuture(
099      () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
100      "ZKConnectionRegistry.getClusterId");
101  }
102
103  ReadOnlyZKClient getZKClient() {
104    return zk;
105  }
106
107  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
108    if (data == null || data.length == 0) {
109      return null;
110    }
111    data = removeMetaData(data);
112    int prefixLen = lengthOfPBMagic();
113    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
114      data.length - prefixLen);
115  }
116
117  private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
118    CompletableFuture<RegionLocations> future) {
119    remaining.decrement();
120    if (remaining.intValue() > 0) {
121      return;
122    }
123    future.complete(new RegionLocations(locs));
124  }
125
126  private Pair<RegionState.State, ServerName>
127    getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
128    RegionState.State state;
129    if (proto.hasState()) {
130      state = RegionState.State.convert(proto.getState());
131    } else {
132      state = RegionState.State.OPEN;
133    }
134    HBaseProtos.ServerName snProto = proto.getServer();
135    return Pair.newPair(state,
136      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
137  }
138
139  private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
140    List<String> metaReplicaZNodes) {
141    if (metaReplicaZNodes.isEmpty()) {
142      future.completeExceptionally(new IOException("No meta znode available"));
143    }
144    HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
145    MutableInt remaining = new MutableInt(locs.length);
146    for (String metaReplicaZNode : metaReplicaZNodes) {
147      int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode);
148      String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
149      if (replicaId == DEFAULT_REPLICA_ID) {
150        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
151          if (error != null) {
152            future.completeExceptionally(error);
153            return;
154          }
155          if (proto == null) {
156            future.completeExceptionally(new IOException("Meta znode is null"));
157            return;
158          }
159          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
160          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
161            LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
162          }
163          locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
164            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
165          tryComplete(remaining, locs, future);
166        });
167      } else {
168        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
169          if (future.isDone()) {
170            return;
171          }
172          if (error != null) {
173            LOG.warn("Failed to fetch " + path, error);
174            locs[replicaId] = null;
175          } else if (proto == null) {
176            LOG.warn("Meta znode for replica " + replicaId + " is null");
177            locs[replicaId] = null;
178          } else {
179            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
180            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
181              LOG.warn("Meta region for replica " + replicaId + " is in state "
182                + stateAndServerName.getFirst());
183              locs[replicaId] = null;
184            } else {
185              locs[replicaId] =
186                new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
187                  stateAndServerName.getSecond());
188            }
189          }
190          tryComplete(remaining, locs, future);
191        });
192      }
193    }
194  }
195
196  @Override
197  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
198    return tracedFuture(() -> {
199      CompletableFuture<RegionLocations> future = new CompletableFuture<>();
200      addListener(
201        zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
202          .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
203        (metaReplicaZNodes, error) -> {
204          if (error != null) {
205            future.completeExceptionally(error);
206            return;
207          }
208          getMetaRegionLocation(future, metaReplicaZNodes);
209        });
210      return future;
211    }, "ZKConnectionRegistry.getMetaRegionLocations");
212  }
213
214  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
215    if (data == null || data.length == 0) {
216      return null;
217    }
218    data = removeMetaData(data);
219    int prefixLen = lengthOfPBMagic();
220    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
221  }
222
223  @Override
224  public CompletableFuture<ServerName> getActiveMaster() {
225    return tracedFuture(
226      () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
227        .thenApply(proto -> {
228          if (proto == null) {
229            return null;
230          }
231          HBaseProtos.ServerName snProto = proto.getMaster();
232          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
233            snProto.getStartCode());
234        }),
235      "ZKConnectionRegistry.getActiveMaster");
236  }
237
238  @Override
239  public String getConnectionString() {
240    final String serverList = zk.getConnectString();
241    final String baseZNode = znodePaths.baseZNode;
242    return serverList + ":" + baseZNode;
243  }
244
245  @Override
246  public void close() {
247    zk.close();
248  }
249}