/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

/**
 * This is a base class for maintaining replication state in zookeeper.
 * <p>
 * It resolves the znode names used for replication from the supplied {@link Configuration}
 * (falling back to built-in defaults) and exposes helpers for building peer-related znode paths.
 */
@InterfaceAudience.Private
public abstract class ReplicationStateZKBase {

  /**
   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
   * cluster.
   */
  protected final String peerStateNodeName;
  /** The name of the base znode that contains all replication state. */
  protected final String replicationZNode;
  /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
  protected final String peersZNode;
  /** The name of the znode that contains all replication queues */
  protected final String queuesZNode;
  /** The name of the znode that contains queues of hfile references to be replicated */
  protected final String hfileRefsZNode;
  /** The cluster key of the local cluster */
  protected final String ourClusterKey;
  /** The name of the znode that contains tableCFs */
  protected final String tableCFsNodeName;

  protected final ZKWatcher zookeeper;
  protected final Configuration conf;
  protected final Abortable abortable;

  // Public for testing
  public static final byte[] ENABLED_ZNODE_BYTES =
      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
  public static final byte[] DISABLED_ZNODE_BYTES =
      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
      "zookeeper.znode.replication.hfile.refs";
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  /**
   * Stores the collaborators and derives every replication znode path from the configuration.
   * <p>
   * The replication root is placed under the watcher's base znode; the peers, queues and
   * hfile-refs znodes are placed directly under the replication root.
   * @param zookeeper watcher whose {@code znodePaths.baseZNode} anchors the replication znodes
   * @param conf configuration consulted for znode names and the local cluster key
   * @param abortable notified when listing the replicators fails
   */
  public ReplicationStateZKBase(ZKWatcher zookeeper, Configuration conf,
      Abortable abortable) {
    this.zookeeper = zookeeper;
    this.conf = conf;
    this.abortable = abortable;

    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
    this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
      replicationZNodeName);
    this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
  }

  /**
   * Lists the children of the replication queues znode without setting a watch.
   * <p>
   * Subclasses that use ZK explicitly can just call this directly while classes
   * that are trying to hide internal details of storage can wrap the KeeperException
   * into a ReplicationException or something else.
   * @return the result of {@link ZKUtil#listChildrenNoWatch} on the queues znode
   * @throws KeeperException if the listing fails; {@link Abortable#abort} is invoked first and
   *           the same exception is then rethrown
   */
  protected List<String> getListOfReplicatorsZK() throws KeeperException {
    try {
      return ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
    } catch (KeeperException e) {
      // Signal the abortable before propagating so the failure is never silently dropped.
      this.abortable.abort("Failed to get list of replicators", e);
      throw e;
    }
  }

  /**
   * Serializes a replication state for storage in a peer-state znode.
   * @param state the replication state to serialize
   * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
   *         use as content of a peer-state znode under a peer cluster id as in
   *         /hbase/replication/peers/PEER_ID/peer-state.
   * @throws RuntimeException wrapping the {@link IOException} if writing the message fails
   */
  protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
    ReplicationProtos.ReplicationState msg =
        ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
    // The message only carries the state field, so a 16-byte coded-stream buffer is used.
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
      msg.writeTo(cos);
      // Push the coded stream's buffered bytes into baos before reading it back. The byte array
      // stream itself needs no flush.
      cos.flush();
      return ProtobufUtil.prependPBMagic(baos.toByteArray());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Checks whether a znode for the given peer exists under the peers znode.
   * @param id the peer id, used as the child name under the peers znode
   * @return true if {@link ZKUtil#checkExists} returns a non-negative value for the peer's znode
   *         (presumably the znode version, with a negative value meaning absent; confirm against
   *         ZKUtil)
   * @throws KeeperException if the zookeeper lookup fails
   */
  protected boolean peerExists(String id) throws KeeperException {
    return ZKUtil.checkExists(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id)) >= 0;
  }

  /**
   * Determine if a ZK path points to a peer node.
   * <p>
   * Only the depth is compared: the path qualifies when it has exactly one more "/"-separated
   * component than the peers znode. The prefix itself is not checked here.
   * NOTE(review): callers presumably verify that the path lies under the peers znode before
   * relying on this result; confirm at the call sites.
   * @param path path to be checked
   * @return true if the path points to a peer node, otherwise false
   */
  protected boolean isPeerPath(String path) {
    return path.split("/").length == peersZNode.split("/").length + 1;
  }

  /**
   * Builds the path of the tableCFs znode of a peer.
   * @param id the peer id
   * @return the peers znode joined with {@code id} and the tableCFs node name
   */
  @VisibleForTesting
  protected String getTableCFsNode(String id) {
    return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.tableCFsNodeName));
  }

  /**
   * Builds the path of the peer-state znode of a peer.
   * @param id the peer id
   * @return the peers znode joined with {@code id} and the peer-state node name
   */
  @VisibleForTesting
  protected String getPeerStateNode(String id) {
    return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.peerStateNodeName));
  }

  /**
   * Builds the path of the znode of a peer.
   * @param id the peer id
   * @return the peers znode joined with {@code id}
   */
  @VisibleForTesting
  protected String getPeerNode(String id) {
    return ZNodePaths.joinZNode(this.peersZNode, id);
  }
}