001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication;
020
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.util.List;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.Abortable;
027import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
028import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
029import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
031import org.apache.hadoop.hbase.zookeeper.ZKConfig;
032import org.apache.hadoop.hbase.zookeeper.ZKUtil;
033import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
034import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.zookeeper.KeeperException;
037
038/**
039 * This is a base class for maintaining replication state in zookeeper.
040 */
041@InterfaceAudience.Private
042public abstract class ReplicationStateZKBase {
043
044  /**
045   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
046   * cluster.
047   */
048  protected final String peerStateNodeName;
049  /** The name of the base znode that contains all replication state. */
050  protected final String replicationZNode;
051  /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
052  protected final String peersZNode;
053  /** The name of the znode that contains all replication queues */
054  protected final String queuesZNode;
055  /** The name of the znode that contains queues of hfile references to be replicated */
056  protected final String hfileRefsZNode;
057  /** The cluster key of the local cluster */
058  protected final String ourClusterKey;
059  /** The name of the znode that contains tableCFs */
060  protected final String tableCFsNodeName;
061
062  protected final ZKWatcher zookeeper;
063  protected final Configuration conf;
064  protected final Abortable abortable;
065
066  // Public for testing
067  public static final byte[] ENABLED_ZNODE_BYTES =
068      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
069  public static final byte[] DISABLED_ZNODE_BYTES =
070      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
071  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
072      "zookeeper.znode.replication.hfile.refs";
073  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
074
075  public ReplicationStateZKBase(ZKWatcher zookeeper, Configuration conf,
076                                Abortable abortable) {
077    this.zookeeper = zookeeper;
078    this.conf = conf;
079    this.abortable = abortable;
080
081    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
082    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
083    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
084    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
085      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
086    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
087    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
088    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
089    this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
090      replicationZNodeName);
091    this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
092    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
093    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
094  }
095
096  /**
097   * Subclasses that use ZK explicitly can just call this directly while classes
098   * that are trying to hide internal details of storage can wrap the KeeperException
099   * into a ReplicationException or something else.
100   */
101  protected List<String> getListOfReplicatorsZK() throws KeeperException {
102    List<String> result = null;
103    try {
104      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
105    } catch (KeeperException e) {
106      this.abortable.abort("Failed to get list of replicators", e);
107      throw e;
108    }
109    return result;
110  }
111
112  /**
113   * @param state
114   * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
115   *         use as content of a peer-state znode under a peer cluster id as in
116   *         /hbase/replication/peers/PEER_ID/peer-state.
117   */
118  protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
119    ReplicationProtos.ReplicationState msg =
120        ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
121    // There is no toByteArray on this pb Message?
122    // 32 bytes is default which seems fair enough here.
123    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
124      CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
125      msg.writeTo(cos);
126      cos.flush();
127      baos.flush();
128      return ProtobufUtil.prependPBMagic(baos.toByteArray());
129    } catch (IOException e) {
130      throw new RuntimeException(e);
131    }
132  }
133
134  protected boolean peerExists(String id) throws KeeperException {
135    return ZKUtil.checkExists(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id)) >= 0;
136  }
137
138  /**
139   * Determine if a ZK path points to a peer node.
140   * @param path path to be checked
141   * @return true if the path points to a peer node, otherwise false
142   */
143  protected boolean isPeerPath(String path) {
144    return path.split("/").length == peersZNode.split("/").length + 1;
145  }
146
147  @VisibleForTesting
148  protected String getTableCFsNode(String id) {
149    return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.tableCFsNodeName));
150  }
151
152  @VisibleForTesting
153  protected String getPeerStateNode(String id) {
154    return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.peerStateNodeName));
155  }
156  @VisibleForTesting
157  protected String getPeerNode(String id) {
158    return ZNodePaths.joinZNode(this.peersZNode, id);
159  }
160}