1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import com.google.protobuf.InvalidProtocolBufferException;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.Abortable;
26  import org.apache.hadoop.hbase.exceptions.DeserializationException;
27  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
28  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
29  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
30  import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
31  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32  import org.apache.zookeeper.KeeperException;
33  
34  import java.io.IOException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  /**
38   * ReplicationStateImpl is responsible for maintaining the replication state
39   * znode.
40   */
41  public class ReplicationStateImpl extends ReplicationStateZKBase implements
42      ReplicationStateInterface {
43  
44    private final ReplicationStateTracker stateTracker;
45    private final AtomicBoolean replicating;
46  
47    private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
48  
49    public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
50        final Abortable abortable, final AtomicBoolean replicating) {
51      super(zk, conf, abortable);
52      this.replicating = replicating;
53  
54      // Set a tracker on replicationStateNode
55      this.stateTracker =
56          new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
57      stateTracker.start();
58      readReplicationStateZnode();
59    }
60  
61    public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
62        final Abortable abortable) {
63      this(zk, conf, abortable, new AtomicBoolean());
64    }
65  
66    @Override
67    public boolean getState() throws KeeperException {
68      return getReplication();
69    }
70  
71    @Override
72    public void setState(boolean newState) throws KeeperException {
73      setReplicating(newState);
74    }
75  
76    @Override
77    public void close() throws IOException {
78      if (stateTracker != null) stateTracker.stop();
79    }
80  
81    /**
82     * @param bytes
83     * @return True if the passed in <code>bytes</code> are those of a pb
84     *         serialized ENABLED state.
85     * @throws DeserializationException
86     */
87    private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
88      ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
89      return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
90    }
91  
92    /**
93     * @param bytes Content of a state znode.
94     * @return State parsed from the passed bytes.
95     * @throws DeserializationException
96     */
97    private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
98        throws DeserializationException {
99      ProtobufUtil.expectPBMagicPrefix(bytes);
100     int pblen = ProtobufUtil.lengthOfPBMagic();
101     ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
102         .newBuilder();
103     ZooKeeperProtos.ReplicationState state;
104     try {
105       state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
106       return state.getState();
107     } catch (InvalidProtocolBufferException e) {
108       throw new DeserializationException(e);
109     }
110   }
111 
112   /**
113    * Set the new replication state for this cluster
114    * @param newState
115    */
116   private void setReplicating(boolean newState) throws KeeperException {
117     ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
118     byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
119         : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
120     ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
121   }
122 
123   /**
124    * Get the replication status of this cluster. If the state znode doesn't
125    * exist it will also create it and set it true.
126    * @return returns true when it's enabled, else false
127    * @throws KeeperException
128    */
129   private boolean getReplication() throws KeeperException {
130     byte[] data = this.stateTracker.getData(false);
131     if (data == null || data.length == 0) {
132       setReplicating(true);
133       return true;
134     }
135     try {
136       return isStateEnabled(data);
137     } catch (DeserializationException e) {
138       throw ZKUtil.convert(e);
139     }
140   }
141 
142   /**
143    * This reads the state znode for replication and sets the atomic boolean
144    */
145   private void readReplicationStateZnode() {
146     try {
147       this.replicating.set(getReplication());
148       LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
149     } catch (KeeperException e) {
150       this.abortable.abort("Failed getting data on from " + this.stateZNode, e);
151     }
152   }
153 
154   /**
155    * Tracker for status of the replication
156    */
157   private class ReplicationStateTracker extends ZooKeeperNodeTracker {
158     public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
159       super(watcher, stateZnode, abortable);
160     }
161 
162     @Override
163     public synchronized void nodeDataChanged(String path) {
164       if (path.equals(node)) {
165         super.nodeDataChanged(path);
166         readReplicationStateZnode();
167       }
168     }
169   }
170 }