View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Abortable;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
34  import org.apache.hadoop.hbase.exceptions.DeserializationException;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39  import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41  import org.apache.zookeeper.KeeperException;
42  import org.apache.zookeeper.KeeperException.NodeExistsException;
43  
44  @InterfaceAudience.Private
45  public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
46    private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
47  
48    private final ReplicationPeerConfig peerConfig;
49    private final String id;
50    private volatile PeerState peerState;
51    private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
52    private final Configuration conf;
53  
54    private PeerStateTracker peerStateTracker;
55    private TableCFsTracker tableCFsTracker;
56  
57    /**
58     * Constructor that takes all the objects required to communicate with the
59     * specified peer, except for the region server addresses.
60     * @param conf configuration object to this peer
61     * @param id string representation of this peer's identifier
62     * @param peerConfig configuration for the replication peer
63     */
64    public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
65        throws ReplicationException {
66      this.conf = conf;
67      this.peerConfig = peerConfig;
68      this.id = id;
69    }
70  
71    /**
72     * start a state tracker to check whether this peer is enabled or not
73     *
74     * @param zookeeper zk watcher for the local cluster
75     * @param peerStateNode path to zk node which stores peer state
76     * @throws KeeperException
77     */
78    public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
79        throws KeeperException {
80      ensurePeerEnabled(zookeeper, peerStateNode);
81      this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
82      this.peerStateTracker.start();
83      try {
84        this.readPeerStateZnode();
85      } catch (DeserializationException e) {
86        throw ZKUtil.convert(e);
87      }
88    }
89  
90    private void readPeerStateZnode() throws DeserializationException {
91      this.peerState =
92          isStateEnabled(this.peerStateTracker.getData(false))
93            ? PeerState.ENABLED
94            : PeerState.DISABLED;
95    }
96  
97    /**
98     * start a table-cfs tracker to listen the (table, cf-list) map change
99     *
100    * @param zookeeper zk watcher for the local cluster
101    * @param tableCFsNode path to zk node which stores table-cfs
102    * @throws KeeperException
103    */
104   public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
105     throws KeeperException {
106     this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
107         this);
108     this.tableCFsTracker.start();
109     this.readTableCFsZnode();
110   }
111 
112   private void readTableCFsZnode() {
113     String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
114     this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
115   }
116 
117   @Override
118   public PeerState getPeerState() {
119     return peerState;
120   }
121 
122   /**
123    * Get the identifier of this peer
124    * @return string representation of the id (short)
125    */
126   @Override
127   public String getId() {
128     return id;
129   }
130 
131   /**
132    * Get the peer config object
133    * @return the ReplicationPeerConfig for this peer
134    */
135   @Override
136   public ReplicationPeerConfig getPeerConfig() {
137     return peerConfig;
138   }
139 
140   /**
141    * Get the configuration object required to communicate with this peer
142    * @return configuration object
143    */
144   @Override
145   public Configuration getConfiguration() {
146     return conf;
147   }
148 
149   /**
150    * Get replicable (table, cf-list) map of this peer
151    * @return the replicable (table, cf-list) map
152    */
153   @Override
154   public Map<TableName, List<String>> getTableCFs() {
155     return this.tableCFs;
156   }
157 
158   @Override
159   public void abort(String why, Throwable e) {
160     LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
161         + " was aborted for the following reason(s):" + why, e);
162   }
163 
164   @Override
165   public boolean isAborted() {
166     // Currently the replication peer is never "Aborted", we just log when the
167     // abort method is called.
168     return false;
169   }
170 
171   @Override
172   public void close() throws IOException {
173     // TODO: stop zkw?
174   }
175 
176   /**
177    * Parse the raw data from ZK to get a peer's state
178    * @param bytes raw ZK data
179    * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
180    * @throws DeserializationException
181    */
182   public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
183     ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
184     return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
185   }
186 
187   /**
188    * @param bytes Content of a state znode.
189    * @return State parsed from the passed bytes.
190    * @throws DeserializationException
191    */
192   private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
193       throws DeserializationException {
194     ProtobufUtil.expectPBMagicPrefix(bytes);
195     int pblen = ProtobufUtil.lengthOfPBMagic();
196     ZooKeeperProtos.ReplicationState.Builder builder =
197         ZooKeeperProtos.ReplicationState.newBuilder();
198     ZooKeeperProtos.ReplicationState state;
199     try {
200       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
201       state = builder.build();
202       return state.getState();
203     } catch (IOException e) {
204       throw new DeserializationException(e);
205     }
206   }
207 
208   /**
209    * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
210    * @param zookeeper
211    * @param path Path to znode to check
212    * @return True if we created the znode.
213    * @throws NodeExistsException
214    * @throws KeeperException
215    */
216   private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
217       throws NodeExistsException, KeeperException {
218     if (ZKUtil.checkExists(zookeeper, path) == -1) {
219       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
220       // peer-state znode. This happens while adding a peer.
221       // The peer state data is set as "ENABLED" by default.
222       ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
223         ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
224       return true;
225     }
226     return false;
227   }
228 
229   /**
230    * Tracker for state of this peer
231    */
232   public class PeerStateTracker extends ZooKeeperNodeTracker {
233 
234     public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
235         Abortable abortable) {
236       super(watcher, peerStateZNode, abortable);
237     }
238 
239     @Override
240     public synchronized void nodeDataChanged(String path) {
241       if (path.equals(node)) {
242         super.nodeDataChanged(path);
243         try {
244           readPeerStateZnode();
245         } catch (DeserializationException e) {
246           LOG.warn("Failed deserializing the content of " + path, e);
247         }
248       }
249     }
250   }
251 
252   /**
253    * Tracker for (table, cf-list) map of this peer
254    */
255   public class TableCFsTracker extends ZooKeeperNodeTracker {
256 
257     public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
258         Abortable abortable) {
259       super(watcher, tableCFsZNode, abortable);
260     }
261     
262     @Override
263     public synchronized void nodeCreated(String path) {
264       if (path.equals(node)) {
265         super.nodeCreated(path);
266         readTableCFsZnode();
267       }
268     }
269 
270     @Override
271     public synchronized void nodeDataChanged(String path) {
272       if (path.equals(node)) {
273         super.nodeDataChanged(path);
274       }
275     }
276   }
277 }