View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Abortable;
31  import org.apache.hadoop.hbase.ServerName;
32  import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
35  import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
36  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
37  import org.apache.zookeeper.KeeperException;
38  
39  /**
40   * This class acts as a wrapper for all the objects used to identify and
41   * communicate with remote peers and is responsible for answering to expired
42   * sessions and re-establishing the ZK connections.
43   */
44  public class ReplicationPeer implements Abortable {
45    private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
46  
47    private final String clusterKey;
48    private final String id;
49    private List<ServerName> regionServers = new ArrayList<ServerName>(0);
50    private final AtomicBoolean peerEnabled = new AtomicBoolean();
51    // Cannot be final since a new object needs to be recreated when session fails
52    private ZooKeeperWatcher zkw;
53    private final Configuration conf;
54  
55    private PeerStateTracker peerStateTracker;
56  
57    /**
58     * Constructor that takes all the objects required to communicate with the
59     * specified peer, except for the region server addresses.
60     * @param conf configuration object to this peer
61     * @param key cluster key used to locate the peer
62     * @param id string representation of this peer's identifier
63     */
64    public ReplicationPeer(Configuration conf, String key,
65        String id) throws IOException {
66      this.conf = conf;
67      this.clusterKey = key;
68      this.id = id;
69      this.reloadZkWatcher();
70    }
71  
72    /**
73     * start a state tracker to check whether this peer is enabled or not
74     *
75     * @param zookeeper zk watcher for the local cluster
76     * @param peerStateNode path to zk node which stores peer state
77     * @throws KeeperException
78     */
79    public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
80        throws KeeperException {
81      if (ZKUtil.checkExists(zookeeper, peerStateNode) == -1) {
82        // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
83        // peer-state znode. This happens while adding a peer.
84        // The peer state data is set as "ENABLED" by default.
85        ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, peerStateNode,
86          Bytes.toBytes(PeerState.ENABLED.name()));
87      }
88      this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper,
89          this);
90      this.peerStateTracker.start();
91      this.readPeerStateZnode();
92    }
93  
94    private void readPeerStateZnode() {
95      String currentState = Bytes.toString(peerStateTracker.getData(false));
96      this.peerEnabled.set(PeerState.ENABLED.equals(PeerState
97          .valueOf(currentState)));
98    }
99  
100   /**
101    * Get the cluster key of that peer
102    * @return string consisting of zk ensemble addresses, client port
103    * and root znode
104    */
105   public String getClusterKey() {
106     return clusterKey;
107   }
108 
109   /**
110    * Get the state of this peer
111    * @return atomic boolean that holds the status
112    */
113   public AtomicBoolean getPeerEnabled() {
114     return peerEnabled;
115   }
116 
117   /**
118    * Get a list of all the addresses of all the region servers
119    * for this peer cluster
120    * @return list of addresses
121    */
122   public List<ServerName> getRegionServers() {
123     return regionServers;
124   }
125 
126   /**
127    * Set the list of region servers for that peer
128    * @param regionServers list of addresses for the region servers
129    */
130   public void setRegionServers(List<ServerName> regionServers) {
131     this.regionServers = regionServers;
132   }
133 
134   /**
135    * Get the ZK connection to this peer
136    * @return zk connection
137    */
138   public ZooKeeperWatcher getZkw() {
139     return zkw;
140   }
141 
142   /**
143    * Get the identifier of this peer
144    * @return string representation of the id (short)
145    */
146   public String getId() {
147     return id;
148   }
149 
150   /**
151    * Get the configuration object required to communicate with this peer
152    * @return configuration object
153    */
154   public Configuration getConfiguration() {
155     return conf;
156   }
157 
158   @Override
159   public void abort(String why, Throwable e) {
160     LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
161         + " was aborted for the following reason(s):" + why, e);
162   }
163 
164   /**
165    * Closes the current ZKW (if not null) and creates a new one
166    * @throws IOException If anything goes wrong connecting
167    */
168   public void reloadZkWatcher() throws IOException {
169     if (zkw != null) zkw.close();
170     zkw = new ZooKeeperWatcher(conf,
171         "connection to cluster: " + id, this);    
172   }
173 
174   @Override
175   public boolean isAborted() {
176     // Currently the replication peer is never "Aborted", we just log when the
177     // abort method is called.
178     return false;
179   }
180 
181   /**
182    * Tracker for state of this peer
183    */
184   public class PeerStateTracker extends ZooKeeperNodeTracker {
185 
186     public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
187         Abortable abortable) {
188       super(watcher, peerStateZNode, abortable);
189     }
190 
191     @Override
192     public synchronized void nodeDataChanged(String path) {
193       if (path.equals(node)) {
194         super.nodeDataChanged(path);
195         readPeerStateZnode();
196       }
197     }
198   }
199 }