001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
021import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
022import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
023import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
024import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
027import java.io.IOException;
028import java.util.List;
029import java.util.concurrent.CompletableFuture;
030import java.util.stream.Collectors;
031import org.apache.commons.lang3.mutable.MutableInt;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterId;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.RegionLocations;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.exceptions.DeserializationException;
038import org.apache.hadoop.hbase.master.RegionState;
039import org.apache.hadoop.hbase.util.Pair;
040import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
041import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
048
049/**
050 * Zookeeper based registry implementation.
051 */
052@InterfaceAudience.Private
053class ZKAsyncRegistry implements AsyncRegistry {
054
055  private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class);
056
057  private final ReadOnlyZKClient zk;
058
059  private final ZNodePaths znodePaths;
060
061  ZKAsyncRegistry(Configuration conf) {
062    this.znodePaths = new ZNodePaths(conf);
063    this.zk = new ReadOnlyZKClient(conf);
064  }
065
066  private interface Converter<T> {
067    T convert(byte[] data) throws Exception;
068  }
069
070  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
071    CompletableFuture<T> future = new CompletableFuture<>();
072    addListener(zk.get(path), (data, error) -> {
073      if (error != null) {
074        future.completeExceptionally(error);
075        return;
076      }
077      try {
078        future.complete(converter.convert(data));
079      } catch (Exception e) {
080        future.completeExceptionally(e);
081      }
082    });
083    return future;
084  }
085
086  private static String getClusterId(byte[] data) throws DeserializationException {
087    if (data == null || data.length == 0) {
088      return null;
089    }
090    data = removeMetaData(data);
091    return ClusterId.parseFrom(data).toString();
092  }
093
094  @Override
095  public CompletableFuture<String> getClusterId() {
096    return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
097  }
098
099  @VisibleForTesting
100  ReadOnlyZKClient getZKClient() {
101    return zk;
102  }
103
104  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
105    if (data == null || data.length == 0) {
106      return null;
107    }
108    data = removeMetaData(data);
109    int prefixLen = lengthOfPBMagic();
110    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
111      data.length - prefixLen);
112  }
113
114  private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
115      CompletableFuture<RegionLocations> future) {
116    remaining.decrement();
117    if (remaining.intValue() > 0) {
118      return;
119    }
120    future.complete(new RegionLocations(locs));
121  }
122
123  private Pair<RegionState.State, ServerName> getStateAndServerName(
124      ZooKeeperProtos.MetaRegionServer proto) {
125    RegionState.State state;
126    if (proto.hasState()) {
127      state = RegionState.State.convert(proto.getState());
128    } else {
129      state = RegionState.State.OPEN;
130    }
131    HBaseProtos.ServerName snProto = proto.getServer();
132    return Pair.newPair(state,
133      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
134  }
135
136  private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
137      List<String> metaReplicaZNodes) {
138    if (metaReplicaZNodes.isEmpty()) {
139      future.completeExceptionally(new IOException("No meta znode available"));
140    }
141    HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
142    MutableInt remaining = new MutableInt(locs.length);
143    for (String metaReplicaZNode : metaReplicaZNodes) {
144      int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode);
145      String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
146      if (replicaId == DEFAULT_REPLICA_ID) {
147        addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
148          if (error != null) {
149            future.completeExceptionally(error);
150            return;
151          }
152          if (proto == null) {
153            future.completeExceptionally(new IOException("Meta znode is null"));
154            return;
155          }
156          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
157          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
158            LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
159          }
160          locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
161            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
162          tryComplete(remaining, locs, future);
163        });
164      } else {
165        addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
166          if (future.isDone()) {
167            return;
168          }
169          if (error != null) {
170            LOG.warn("Failed to fetch " + path, error);
171            locs[replicaId] = null;
172          } else if (proto == null) {
173            LOG.warn("Meta znode for replica " + replicaId + " is null");
174            locs[replicaId] = null;
175          } else {
176            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
177            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
178              LOG.warn("Meta region for replica " + replicaId + " is in state " +
179                stateAndServerName.getFirst());
180              locs[replicaId] = null;
181            } else {
182              locs[replicaId] =
183                new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
184                  stateAndServerName.getSecond());
185            }
186          }
187          tryComplete(remaining, locs, future);
188        });
189      }
190    }
191  }
192
193  @Override
194  public CompletableFuture<RegionLocations> getMetaRegionLocation() {
195    CompletableFuture<RegionLocations> future = new CompletableFuture<>();
196    addListener(
197      zk.list(znodePaths.baseZNode)
198        .thenApply(children -> children.stream()
199          .filter(c -> c.startsWith(znodePaths.metaZNodePrefix)).collect(Collectors.toList())),
200      (metaReplicaZNodes, error) -> {
201        if (error != null) {
202          future.completeExceptionally(error);
203          return;
204        }
205        getMetaRegionLocation(future, metaReplicaZNodes);
206      });
207    return future;
208  }
209
210  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
211    if (data == null || data.length == 0) {
212      return null;
213    }
214    data = removeMetaData(data);
215    int prefixLen = lengthOfPBMagic();
216    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
217  }
218
219  @Override
220  public CompletableFuture<ServerName> getMasterAddress() {
221    return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
222        .thenApply(proto -> {
223          if (proto == null) {
224            return null;
225          }
226          HBaseProtos.ServerName snProto = proto.getMaster();
227          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
228            snProto.getStartCode());
229        });
230  }
231
232  @Override
233  public void close() {
234    zk.close();
235  }
236}