View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Abortable;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.exceptions.DeserializationException;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.zookeeper.KeeperException;
43  import org.apache.zookeeper.KeeperException.NodeExistsException;
44  
45  import com.google.protobuf.InvalidProtocolBufferException;
46  
47  /**
48   * This class acts as a wrapper for all the objects used to identify and
49   * communicate with remote peers and is responsible for answering to expired
50   * sessions and re-establishing the ZK connections.
51   */
52  @InterfaceAudience.Private
53  public class ReplicationPeer implements Abortable, Closeable {
54    private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
55  
56    private final String clusterKey;
57    private final String id;
58    private List<ServerName> regionServers = new ArrayList<ServerName>(0);
59    private final AtomicBoolean peerEnabled = new AtomicBoolean();
60    private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
61    // Cannot be final since a new object needs to be recreated when session fails
62    private ZooKeeperWatcher zkw;
63    private final Configuration conf;
64    private long lastRegionserverUpdate;
65  
66    private PeerStateTracker peerStateTracker;
67    private TableCFsTracker tableCFsTracker;
68  
69    /**
70     * Constructor that takes all the objects required to communicate with the
71     * specified peer, except for the region server addresses.
72     * @param conf configuration object to this peer
73     * @param key cluster key used to locate the peer
74     * @param id string representation of this peer's identifier
75     */
76    public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
77      this.conf = conf;
78      this.clusterKey = key;
79      this.id = id;
80      try {
81        this.reloadZkWatcher();
82      } catch (IOException e) {
83        throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
84      }
85    }
86  
87    /**
88     * start a state tracker to check whether this peer is enabled or not
89     *
90     * @param zookeeper zk watcher for the local cluster
91     * @param peerStateNode path to zk node which stores peer state
92     * @throws KeeperException
93     */
94    public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
95        throws KeeperException {
96      ensurePeerEnabled(zookeeper, peerStateNode);
97      this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
98      this.peerStateTracker.start();
99      try {
100       this.readPeerStateZnode();
101     } catch (DeserializationException e) {
102       throw ZKUtil.convert(e);
103     }
104   }
105 
106   private void readPeerStateZnode() throws DeserializationException {
107     this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
108   }
109 
110   /**
111    * start a table-cfs tracker to listen the (table, cf-list) map change
112    *
113    * @param zookeeper zk watcher for the local cluster
114    * @param tableCFsNode path to zk node which stores table-cfs
115    * @throws KeeperException
116    */
117   public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
118     throws KeeperException {
119     this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
120         this);
121     this.tableCFsTracker.start();
122     this.readTableCFsZnode();
123   }
124 
125   static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
126     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
127       return null;
128     }
129 
130     Map<String, List<String>> tableCFsMap = null;
131 
132     // parse out (table, cf-list) pairs from tableCFsConfig
133     // format: "table1:cf1,cf2;table2:cfA,cfB"
134     String[] tables = tableCFsConfig.split(";");
135     for (String tab : tables) {
136       // 1 ignore empty table config
137       tab = tab.trim();
138       if (tab.length() == 0) {
139         continue;
140       }
141       // 2 split to "table" and "cf1,cf2"
142       //   for each table: "table:cf1,cf2" or "table"
143       String[] pair = tab.split(":");
144       String tabName = pair[0].trim();
145       if (pair.length > 2 || tabName.length() == 0) {
146         LOG.error("ignore invalid tableCFs setting: " + tab);
147         continue;
148       }
149 
150       // 3 parse "cf1,cf2" part to List<cf>
151       List<String> cfs = null;
152       if (pair.length == 2) {
153         String[] cfsList = pair[1].split(",");
154         for (String cf : cfsList) {
155           String cfName = cf.trim();
156           if (cfName.length() > 0) {
157             if (cfs == null) {
158               cfs = new ArrayList<String>();
159             }
160             cfs.add(cfName);
161           }
162         }
163       }
164 
165       // 4 put <table, List<cf>> to map
166       if (tableCFsMap == null) {
167         tableCFsMap = new HashMap<String, List<String>>();
168       }
169       tableCFsMap.put(tabName, cfs);
170     }
171 
172     return tableCFsMap;
173   }
174 
175   private void readTableCFsZnode() {
176     String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
177     this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
178   }
179 
180   /**
181    * Get the cluster key of that peer
182    * @return string consisting of zk ensemble addresses, client port
183    * and root znode
184    */
185   public String getClusterKey() {
186     return clusterKey;
187   }
188 
189   /**
190    * Get the state of this peer
191    * @return atomic boolean that holds the status
192    */
193   public AtomicBoolean getPeerEnabled() {
194     return peerEnabled;
195   }
196 
197   /**
198    * Get replicable (table, cf-list) map of this peer
199    * @return the replicable (table, cf-list) map
200    */
201   public Map<String, List<String>> getTableCFs() {
202     return this.tableCFs;
203   }
204 
205   /**
206    * Get a list of all the addresses of all the region servers
207    * for this peer cluster
208    * @return list of addresses
209    */
210   public List<ServerName> getRegionServers() {
211     return regionServers;
212   }
213 
214   /**
215    * Set the list of region servers for that peer
216    * @param regionServers list of addresses for the region servers
217    */
218   public void setRegionServers(List<ServerName> regionServers) {
219     this.regionServers = regionServers;
220     lastRegionserverUpdate = System.currentTimeMillis();
221   }
222 
223   /**
224    * Get the ZK connection to this peer
225    * @return zk connection
226    */
227   public ZooKeeperWatcher getZkw() {
228     return zkw;
229   }
230 
231   /**
232    * Get the timestamp at which the last change occurred to the list of region servers to replicate
233    * to.
234    * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
235    */
236   public long getLastRegionserverUpdate() {
237     return lastRegionserverUpdate;
238   }
239 
240   /**
241    * Get the identifier of this peer
242    * @return string representation of the id (short)
243    */
244   public String getId() {
245     return id;
246   }
247 
248   /**
249    * Get the configuration object required to communicate with this peer
250    * @return configuration object
251    */
252   public Configuration getConfiguration() {
253     return conf;
254   }
255 
256   @Override
257   public void abort(String why, Throwable e) {
258     LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
259         + " was aborted for the following reason(s):" + why, e);
260   }
261 
262   /**
263    * Closes the current ZKW (if not null) and creates a new one
264    * @throws IOException If anything goes wrong connecting
265    */
266   public void reloadZkWatcher() throws IOException {
267     if (zkw != null) zkw.close();
268     zkw = new ZooKeeperWatcher(conf,
269         "connection to cluster: " + id, this);
270   }
271 
272   @Override
273   public boolean isAborted() {
274     // Currently the replication peer is never "Aborted", we just log when the
275     // abort method is called.
276     return false;
277   }
278 
279   @Override
280   public void close() throws IOException {
281     if (zkw != null){
282       zkw.close();
283     }
284   }
285 
286   /**
287    * Parse the raw data from ZK to get a peer's state
288    * @param bytes raw ZK data
289    * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
290    * @throws DeserializationException
291    */
292   public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
293     ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
294     return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
295   }
296 
297   /**
298    * @param bytes Content of a state znode.
299    * @return State parsed from the passed bytes.
300    * @throws DeserializationException
301    */
302   private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
303       throws DeserializationException {
304     ProtobufUtil.expectPBMagicPrefix(bytes);
305     int pblen = ProtobufUtil.lengthOfPBMagic();
306     ZooKeeperProtos.ReplicationState.Builder builder =
307         ZooKeeperProtos.ReplicationState.newBuilder();
308     ZooKeeperProtos.ReplicationState state;
309     try {
310       state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
311       return state.getState();
312     } catch (InvalidProtocolBufferException e) {
313       throw new DeserializationException(e);
314     }
315   }
316 
317   /**
318    * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
319    * @param zookeeper
320    * @param path Path to znode to check
321    * @return True if we created the znode.
322    * @throws NodeExistsException
323    * @throws KeeperException
324    */
325   private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
326       throws NodeExistsException, KeeperException {
327     if (ZKUtil.checkExists(zookeeper, path) == -1) {
328       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
329       // peer-state znode. This happens while adding a peer.
330       // The peer state data is set as "ENABLED" by default.
331       ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
332         ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
333       return true;
334     }
335     return false;
336   }
337 
338   /**
339    * Tracker for state of this peer
340    */
341   public class PeerStateTracker extends ZooKeeperNodeTracker {
342 
343     public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
344         Abortable abortable) {
345       super(watcher, peerStateZNode, abortable);
346     }
347 
348     @Override
349     public synchronized void nodeDataChanged(String path) {
350       if (path.equals(node)) {
351         super.nodeDataChanged(path);
352         try {
353           readPeerStateZnode();
354         } catch (DeserializationException e) {
355           LOG.warn("Failed deserializing the content of " + path, e);
356         }
357       }
358     }
359   }
360 
361   /**
362    * Tracker for (table, cf-list) map of this peer
363    */
364   public class TableCFsTracker extends ZooKeeperNodeTracker {
365 
366     public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
367         Abortable abortable) {
368       super(watcher, tableCFsZNode, abortable);
369     }
370 
371     @Override
372     public synchronized void nodeDataChanged(String path) {
373       if (path.equals(node)) {
374         super.nodeDataChanged(path);
375         readTableCFsZnode();
376       }
377     }
378   }
379 }