001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
025import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
026import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
027import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
028
029import java.io.IOException;
030import java.util.Collection;
031import java.util.List;
032import java.util.Map;
033import java.util.TreeMap;
034import java.util.concurrent.CompletableFuture;
035import java.util.stream.Collectors;
036import org.apache.commons.lang3.mutable.MutableInt;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.ClusterId;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.RegionLocations;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.exceptions.DeserializationException;
043import org.apache.hadoop.hbase.master.RegionState;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
046import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
053
054/**
055 * Zookeeper based registry implementation.
056 */
057@InterfaceAudience.Private
058class ZKConnectionRegistry implements ConnectionRegistry {
059
060  private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);
061
062  private final ReadOnlyZKClient zk;
063
064  private final ZNodePaths znodePaths;
065
066  ZKConnectionRegistry(Configuration conf) {
067    this.znodePaths = new ZNodePaths(conf);
068    this.zk = new ReadOnlyZKClient(conf);
069  }
070
071  private interface Converter<T> {
072    T convert(byte[] data) throws Exception;
073  }
074
075  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
076    CompletableFuture<T> future = new CompletableFuture<>();
077    addListener(zk.get(path), (data, error) -> {
078      if (error != null) {
079        future.completeExceptionally(error);
080        return;
081      }
082      try {
083        future.complete(converter.convert(data));
084      } catch (Exception e) {
085        future.completeExceptionally(e);
086      }
087    });
088    return future;
089  }
090
091  private static String getClusterId(byte[] data) throws DeserializationException {
092    if (data == null || data.length == 0) {
093      return null;
094    }
095    data = removeMetaData(data);
096    return ClusterId.parseFrom(data).toString();
097  }
098
099  @Override
100  public CompletableFuture<String> getClusterId() {
101    return tracedFuture(
102      () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
103      "ZKConnectionRegistry.getClusterId");
104  }
105
106  ReadOnlyZKClient getZKClient() {
107    return zk;
108  }
109
110  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
111    if (data == null || data.length == 0) {
112      return null;
113    }
114    data = removeMetaData(data);
115    int prefixLen = lengthOfPBMagic();
116    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
117      data.length - prefixLen);
118  }
119
120  private static void tryComplete(MutableInt remaining, Collection<HRegionLocation> locs,
121    CompletableFuture<RegionLocations> future) {
122    remaining.decrement();
123    if (remaining.intValue() > 0) {
124      return;
125    }
126    future.complete(new RegionLocations(locs));
127  }
128
129  private Pair<RegionState.State, ServerName>
130    getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
131    RegionState.State state;
132    if (proto.hasState()) {
133      state = RegionState.State.convert(proto.getState());
134    } else {
135      state = RegionState.State.OPEN;
136    }
137    HBaseProtos.ServerName snProto = proto.getServer();
138    return Pair.newPair(state,
139      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
140  }
141
142  private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
143    List<String> metaReplicaZNodes) {
144    if (metaReplicaZNodes.isEmpty()) {
145      future.completeExceptionally(new IOException("No meta znode available"));
146    }
147    // Note, the list of metaReplicaZNodes may be discontiguous regards replicaId; i.e. we may have
148    // a znode for the default -- replicaId=0 -- and perhaps replicaId '2' but be could be missing
149    // znode for replicaId '1'. This is a transient condition. Because of this we are careful
150    // accumulating locations. We use a Map so retries overwrite rather than aggregate and the
151    // Map sorts just to be kind to further processing. The Map will retain the discontinuity on
152    // replicaIds but on completion (of the future), the Map values are passed to the
153    // RegionLocations constructor which knows how to deal with discontinuities.
154    final Map<Integer, HRegionLocation> locs = new TreeMap<>();
155    MutableInt remaining = new MutableInt(metaReplicaZNodes.size());
156    for (String metaReplicaZNode : metaReplicaZNodes) {
157      int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode);
158      String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
159      if (replicaId == DEFAULT_REPLICA_ID) {
160        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
161          if (error != null) {
162            future.completeExceptionally(error);
163            return;
164          }
165          if (proto == null) {
166            future.completeExceptionally(new IOException("Meta znode is null"));
167            return;
168          }
169          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
170          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
171            LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
172          }
173          locs.put(replicaId, new HRegionLocation(
174            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()));
175          tryComplete(remaining, locs.values(), future);
176        });
177      } else {
178        addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
179          if (future.isDone()) {
180            return;
181          }
182          if (error != null) {
183            LOG.warn("Failed to fetch " + path, error);
184            locs.put(replicaId, null);
185          } else if (proto == null) {
186            LOG.warn("Meta znode for replica " + replicaId + " is null");
187            locs.put(replicaId, null);
188          } else {
189            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
190            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
191              LOG.warn("Meta region for replica " + replicaId + " is in state "
192                + stateAndServerName.getFirst());
193              locs.put(replicaId, null);
194            } else {
195              locs.put(replicaId,
196                new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
197                  stateAndServerName.getSecond()));
198            }
199          }
200          tryComplete(remaining, locs.values(), future);
201        });
202      }
203    }
204  }
205
206  @Override
207  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
208    return tracedFuture(() -> {
209      CompletableFuture<RegionLocations> future = new CompletableFuture<>();
210      addListener(
211        zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
212          .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
213        (metaReplicaZNodes, error) -> {
214          if (error != null) {
215            future.completeExceptionally(error);
216            return;
217          }
218          getMetaRegionLocation(future, metaReplicaZNodes);
219        });
220      return future;
221    }, "ZKConnectionRegistry.getMetaRegionLocations");
222  }
223
224  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
225    if (data == null || data.length == 0) {
226      return null;
227    }
228    data = removeMetaData(data);
229    int prefixLen = lengthOfPBMagic();
230    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
231  }
232
233  @Override
234  public CompletableFuture<ServerName> getActiveMaster() {
235    return tracedFuture(
236      () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
237        .thenApply(proto -> {
238          if (proto == null) {
239            return null;
240          }
241          HBaseProtos.ServerName snProto = proto.getMaster();
242          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
243            snProto.getStartCode());
244        }),
245      "ZKConnectionRegistry.getActiveMaster");
246  }
247
248  @Override
249  public String getConnectionString() {
250    final String serverList = zk.getConnectString();
251    final String baseZNode = znodePaths.baseZNode;
252    return serverList + ":" + baseZNode;
253  }
254
255  @Override
256  public void close() {
257    zk.close();
258  }
259}