1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Abortable;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
34 import org.apache.hadoop.hbase.exceptions.DeserializationException;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.zookeeper.KeeperException;
42 import org.apache.zookeeper.KeeperException.NodeExistsException;
43
44 @InterfaceAudience.Private
45 public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
46 private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
47
48 private final ReplicationPeerConfig peerConfig;
49 private final String id;
50 private volatile PeerState peerState;
51 private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
52 private final Configuration conf;
53
54 private PeerStateTracker peerStateTracker;
55 private TableCFsTracker tableCFsTracker;
56
57
58
59
60
61
62
63
64 public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
65 throws ReplicationException {
66 this.conf = conf;
67 this.peerConfig = peerConfig;
68 this.id = id;
69 }
70
71
72
73
74
75
76
77
78 public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
79 throws KeeperException {
80 ensurePeerEnabled(zookeeper, peerStateNode);
81 this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
82 this.peerStateTracker.start();
83 try {
84 this.readPeerStateZnode();
85 } catch (DeserializationException e) {
86 throw ZKUtil.convert(e);
87 }
88 }
89
90 private void readPeerStateZnode() throws DeserializationException {
91 this.peerState =
92 isStateEnabled(this.peerStateTracker.getData(false))
93 ? PeerState.ENABLED
94 : PeerState.DISABLED;
95 }
96
97
98
99
100
101
102
103
104 public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
105 throws KeeperException {
106 this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
107 this);
108 this.tableCFsTracker.start();
109 this.readTableCFsZnode();
110 }
111
112 private void readTableCFsZnode() {
113 String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
114 this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
115 }
116
117 @Override
118 public PeerState getPeerState() {
119 return peerState;
120 }
121
122
123
124
125
126 @Override
127 public String getId() {
128 return id;
129 }
130
131
132
133
134
135 @Override
136 public ReplicationPeerConfig getPeerConfig() {
137 return peerConfig;
138 }
139
140
141
142
143
144 @Override
145 public Configuration getConfiguration() {
146 return conf;
147 }
148
149
150
151
152
153 @Override
154 public Map<TableName, List<String>> getTableCFs() {
155 return this.tableCFs;
156 }
157
158 @Override
159 public void abort(String why, Throwable e) {
160 LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
161 + " was aborted for the following reason(s):" + why, e);
162 }
163
164 @Override
165 public boolean isAborted() {
166
167
168 return false;
169 }
170
171 @Override
172 public void close() throws IOException {
173
174 }
175
176
177
178
179
180
181
182 public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
183 ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
184 return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
185 }
186
187
188
189
190
191
192 private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
193 throws DeserializationException {
194 ProtobufUtil.expectPBMagicPrefix(bytes);
195 int pblen = ProtobufUtil.lengthOfPBMagic();
196 ZooKeeperProtos.ReplicationState.Builder builder =
197 ZooKeeperProtos.ReplicationState.newBuilder();
198 ZooKeeperProtos.ReplicationState state;
199 try {
200 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
201 state = builder.build();
202 return state.getState();
203 } catch (IOException e) {
204 throw new DeserializationException(e);
205 }
206 }
207
208
209
210
211
212
213
214
215
216 private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
217 throws NodeExistsException, KeeperException {
218 if (ZKUtil.checkExists(zookeeper, path) == -1) {
219
220
221
222 ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
223 ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
224 return true;
225 }
226 return false;
227 }
228
229
230
231
232 public class PeerStateTracker extends ZooKeeperNodeTracker {
233
234 public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
235 Abortable abortable) {
236 super(watcher, peerStateZNode, abortable);
237 }
238
239 @Override
240 public synchronized void nodeDataChanged(String path) {
241 if (path.equals(node)) {
242 super.nodeDataChanged(path);
243 try {
244 readPeerStateZnode();
245 } catch (DeserializationException e) {
246 LOG.warn("Failed deserializing the content of " + path, e);
247 }
248 }
249 }
250 }
251
252
253
254
255 public class TableCFsTracker extends ZooKeeperNodeTracker {
256
257 public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
258 Abortable abortable) {
259 super(watcher, tableCFsZNode, abortable);
260 }
261
262 @Override
263 public synchronized void nodeCreated(String path) {
264 if (path.equals(node)) {
265 super.nodeCreated(path);
266 readTableCFsZnode();
267 }
268 }
269
270 @Override
271 public synchronized void nodeDataChanged(String path) {
272 if (path.equals(node)) {
273 super.nodeDataChanged(path);
274 }
275 }
276 }
277 }