001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
027
028import java.io.IOException;
029import java.util.Collection;
030import java.util.List;
031import java.util.Map;
032import java.util.TreeMap;
033import java.util.concurrent.CompletableFuture;
034import java.util.stream.Collectors;
035import org.apache.commons.lang3.mutable.MutableInt;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.ClusterId;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.RegionLocations;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.exceptions.DeserializationException;
042import org.apache.hadoop.hbase.master.RegionState;
043import org.apache.hadoop.hbase.util.Pair;
044import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
045import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
054
055/**
056 * Zookeeper based registry implementation.
057 */
058@InterfaceAudience.Private
059class ZKConnectionRegistry implements ConnectionRegistry {
060
061  private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);
062
063  private final ReadOnlyZKClient zk;
064
065  private final ZNodePaths znodePaths;
066
067  ZKConnectionRegistry(Configuration conf) {
068    this.znodePaths = new ZNodePaths(conf);
069    this.zk = new ReadOnlyZKClient(conf);
070  }
071
072  private interface Converter<T> {
073    T convert(byte[] data) throws Exception;
074  }
075
076  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
077    CompletableFuture<T> future = new CompletableFuture<>();
078    addListener(zk.get(path), (data, error) -> {
079      if (error != null) {
080        future.completeExceptionally(error);
081        return;
082      }
083      try {
084        future.complete(converter.convert(data));
085      } catch (Exception e) {
086        future.completeExceptionally(e);
087      }
088    });
089    return future;
090  }
091
092  private static String getClusterId(byte[] data) throws DeserializationException {
093    if (data == null || data.length == 0) {
094      return null;
095    }
096    data = removeMetaData(data);
097    return ClusterId.parseFrom(data).toString();
098  }
099
100  @Override
101  public CompletableFuture<String> getClusterId() {
102    return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId);
103  }
104
105  ReadOnlyZKClient getZKClient() {
106    return zk;
107  }
108
109  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
110    if (data == null || data.length == 0) {
111      return null;
112    }
113    data = removeMetaData(data);
114    int prefixLen = lengthOfPBMagic();
115    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
116      data.length - prefixLen);
117  }
118
119  private static void tryComplete(MutableInt remaining, Collection<HRegionLocation> locs,
120      CompletableFuture<RegionLocations> future) {
121    remaining.decrement();
122    if (remaining.intValue() > 0) {
123      return;
124    }
125    future.complete(new RegionLocations(locs));
126  }
127
128  private Pair<RegionState.State, ServerName> getStateAndServerName(
129      ZooKeeperProtos.MetaRegionServer proto) {
130    RegionState.State state;
131    if (proto.hasState()) {
132      state = RegionState.State.convert(proto.getState());
133    } else {
134      state = RegionState.State.OPEN;
135    }
136    HBaseProtos.ServerName snProto = proto.getServer();
137    return Pair.newPair(state,
138      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
139  }
140
141  private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
142      List<String> metaReplicaZNodes) {
143    if (metaReplicaZNodes.isEmpty()) {
144      future.completeExceptionally(new IOException("No meta znode available"));
145    }
146    // Note, the list of metaReplicaZNodes may be discontiguous regards replicaId; i.e. we may have
147    // a znode for the default -- replicaId=0 -- and perhaps replicaId '2' but be could be missing
148    // znode for replicaId '1'. This is a transient condition. Because of this we are careful
149    // accumulating locations. We use a Map so retries overwrite rather than aggregate and the
150    // Map sorts just to be kind to further processing. The Map will retain the discontinuity on
151    // replicaIds but on completion (of the future), the Map values are passed to the
152    // RegionLocations constructor which knows how to deal with discontinuities.
153    final Map<Integer, HRegionLocation> locs = new TreeMap<>();
154    MutableInt remaining = new MutableInt(metaReplicaZNodes.size());
155    for (String metaReplicaZNode : metaReplicaZNodes) {
156      int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode);
157      String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
158      if (replicaId == DEFAULT_REPLICA_ID) {
159        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
160          if (error != null) {
161            future.completeExceptionally(error);
162            return;
163          }
164          if (proto == null) {
165            future.completeExceptionally(new IOException("Meta znode is null"));
166            return;
167          }
168          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
169          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
170            LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
171          }
172          locs.put(replicaId, new HRegionLocation(
173            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()));
174          tryComplete(remaining, locs.values(), future);
175        });
176      } else {
177        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
178          if (future.isDone()) {
179            return;
180          }
181          if (error != null) {
182            LOG.warn("Failed to fetch " + path, error);
183            locs.put(replicaId, null);
184          } else if (proto == null) {
185            LOG.warn("Meta znode for replica " + replicaId + " is null");
186            locs.put(replicaId, null);
187          } else {
188            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
189            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
190              LOG.warn("Meta region for replica " + replicaId + " is in state " +
191                stateAndServerName.getFirst());
192              locs.put(replicaId, null);
193            } else {
194              locs.put(replicaId, new HRegionLocation(
195                getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
196                  stateAndServerName.getSecond()));
197            }
198          }
199          tryComplete(remaining, locs.values(), future);
200        });
201      }
202    }
203  }
204
205  @Override
206  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
207    CompletableFuture<RegionLocations> future = new CompletableFuture<>();
208    addListener(
209      zk.list(znodePaths.baseZNode)
210        .thenApply(children -> children.stream()
211          .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
212      (metaReplicaZNodes, error) -> {
213        if (error != null) {
214          future.completeExceptionally(error);
215          return;
216        }
217        getMetaRegionLocation(future, metaReplicaZNodes);
218      });
219    return future;
220  }
221
222  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
223    if (data == null || data.length == 0) {
224      return null;
225    }
226    data = removeMetaData(data);
227    int prefixLen = lengthOfPBMagic();
228    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
229  }
230
231  @Override
232  public CompletableFuture<ServerName> getActiveMaster() {
233    return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
234        .thenApply(proto -> {
235          if (proto == null) {
236            return null;
237          }
238          HBaseProtos.ServerName snProto = proto.getMaster();
239          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
240            snProto.getStartCode());
241        });
242  }
243
244  @Override
245  public void close() {
246    zk.close();
247  }
248}