/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
019package org.apache.hadoop.hbase.replication;
020
021import java.util.ArrayList;
022import java.util.List;
023import java.util.SortedSet;
024import java.util.TreeSet;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Abortable;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.exceptions.DeserializationException;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.Pair;
033import org.apache.hadoop.hbase.zookeeper.ZKUtil;
034import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
035import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
036import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.KeeperException;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * This class provides an implementation of the
044 * interface using ZooKeeper. The
045 * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
046 * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
047 * the regionserver name (a concatenation of the region server’s hostname, client port and start
048 * code). For example:
049 *
050 * /hbase/replication/rs/hostname.example.org,6020,1234
051 *
052 * Within this znode, the region server maintains a set of WAL replication queues. These queues are
053 * represented by child znodes named using there give queue id. For example:
054 *
055 * /hbase/replication/rs/hostname.example.org,6020,1234/1
056 * /hbase/replication/rs/hostname.example.org,6020,1234/2
057 *
058 * Each queue has one child znode for every WAL that still needs to be replicated. The value of
059 * these WAL child znodes is the latest position that has been replicated. This position is updated
060 * every time a WAL entry is replicated. For example:
061 *
062 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
063 */
064@InterfaceAudience.Private
065public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
066
067  /** Znode containing all replication queues for this region server. */
068  private String myQueuesZnode;
069
070  private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
071
072  public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
073    this(args.getZk(), args.getConf(), args.getAbortable());
074  }
075
076  public ReplicationQueuesZKImpl(final ZKWatcher zk, Configuration conf,
077                                 Abortable abortable) {
078    super(zk, conf, abortable);
079  }
080
081  @Override
082  public void init(String serverName) throws ReplicationException {
083    this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
084    try {
085      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
086        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
087      }
088    } catch (KeeperException e) {
089      throw new ReplicationException("Could not initialize replication queues.", e);
090    }
091    if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
092      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
093      try {
094        if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
095          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
096        }
097      } catch (KeeperException e) {
098        throw new ReplicationException("Could not initialize hfile references replication queue.",
099            e);
100      }
101    }
102  }
103  
104  @Override
105  public List<String> getListOfReplicators() throws ReplicationException {
106    try {
107      return super.getListOfReplicatorsZK();
108    } catch (KeeperException e) {
109      LOG.warn("getListOfReplicators() from ZK failed", e);
110      throw new ReplicationException("getListOfReplicators() from ZK failed", e);
111    }
112  }
113  
114  @Override
115  public void removeQueue(String queueId) {
116    try {
117      ZKUtil.deleteNodeRecursively(this.zookeeper,
118        ZNodePaths.joinZNode(this.myQueuesZnode, queueId));
119    } catch (KeeperException e) {
120      this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
121    }
122  }
123
124  @Override
125  public void addLog(String queueId, String filename) throws ReplicationException {
126    String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
127    znode = ZNodePaths.joinZNode(znode, filename);
128    try {
129      ZKUtil.createWithParents(this.zookeeper, znode);
130    } catch (KeeperException e) {
131      throw new ReplicationException(
132          "Could not add log because znode could not be created. queueId=" + queueId
133              + ", filename=" + filename);
134    }
135  }
136
137  @Override
138  public void removeLog(String queueId, String filename) {
139    try {
140      String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
141      znode = ZNodePaths.joinZNode(znode, filename);
142      ZKUtil.deleteNode(this.zookeeper, znode);
143    } catch (KeeperException e) {
144      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
145          + filename + ")", e);
146    }
147  }
148
149  @Override
150  public void setLogPosition(String queueId, String filename, long position) {
151    try {
152      String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
153      znode = ZNodePaths.joinZNode(znode, filename);
154      // Why serialize String of Long and not Long as bytes?
155      ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
156    } catch (KeeperException e) {
157      this.abortable.abort("Failed to write replication wal position (filename=" + filename
158          + ", position=" + position + ")", e);
159    }
160  }
161
162  @Override
163  public long getLogPosition(String queueId, String filename) throws ReplicationException {
164    String clusterZnode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
165    String znode = ZNodePaths.joinZNode(clusterZnode, filename);
166    byte[] bytes = null;
167    try {
168      bytes = ZKUtil.getData(this.zookeeper, znode);
169    } catch (KeeperException e) {
170      throw new ReplicationException("Internal Error: could not get position in log for queueId="
171          + queueId + ", filename=" + filename, e);
172    } catch (InterruptedException e) {
173      Thread.currentThread().interrupt();
174      return 0;
175    }
176    try {
177      return ZKUtil.parseWALPositionFrom(bytes);
178    } catch (DeserializationException de) {
179      LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
180          + " znode content, continuing.");
181    }
182    // if we can not parse the position, start at the beginning of the wal file
183    // again
184    return 0;
185  }
186
187  @Override
188  public boolean isThisOurRegionServer(String regionserver) {
189    return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
190  }
191
192  @Override
193  public List<String> getUnClaimedQueueIds(String regionserver) {
194    if (isThisOurRegionServer(regionserver)) {
195      return null;
196    }
197    String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
198    List<String> queues = null;
199    try {
200      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
201    } catch (KeeperException e) {
202      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
203    }
204    return queues;
205  }
206
207  @Override
208  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
209    LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
210    return moveQueueUsingMulti(regionserver, queueId);
211  }
212
213  @Override
214  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
215    String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
216    try {
217      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
218      if (list != null && list.isEmpty()){
219        ZKUtil.deleteNode(this.zookeeper, rsPath);
220      }
221    } catch (KeeperException e) {
222      LOG.warn("Got error while removing replicator", e);
223    }
224  }
225
226  @Override
227  public void removeAllQueues() {
228    try {
229      ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
230    } catch (KeeperException e) {
231      // if the znode is already expired, don't bother going further
232      if (e instanceof KeeperException.SessionExpiredException) {
233        return;
234      }
235      this.abortable.abort("Failed to delete replication queues for region server: "
236          + this.myQueuesZnode, e);
237    }
238  }
239
240  @Override
241  public List<String> getLogsInQueue(String queueId) {
242    String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
243    List<String> result = null;
244    try {
245      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
246    } catch (KeeperException e) {
247      this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
248    }
249    return result;
250  }
251
252  @Override
253  public List<String> getAllQueues() {
254    List<String> listOfQueues = null;
255    try {
256      listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
257    } catch (KeeperException e) {
258      this.abortable.abort("Failed to get a list of queues for region server: "
259          + this.myQueuesZnode, e);
260    }
261    return listOfQueues == null ? new ArrayList<>() : listOfQueues;
262  }
263
264  /**
265   * It "atomically" copies one peer's wals queue from another dead region server and returns them
266   * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
267   * @param znode pertaining to the region server to copy the queues from
268   * @peerId peerId pertaining to the queue need to be copied
269   */
270  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
271    try {
272      // hbase/replication/rs/deadrs
273      String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
274      List<ZKUtilOp> listOfOps = new ArrayList<>();
275      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
276
277      String newPeerId = peerId + "-" + znode;
278      String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
279      // check the logs queue for the old peer cluster
280      String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
281      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
282
283      if (!peerExists(replicationQueueInfo.getPeerId())) {
284        LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
285                " didn't exist, will move its queue to avoid the failure of multi op");
286        for (String wal : wals) {
287          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
288          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
289        }
290        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
291        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
292        return null;
293      }
294
295      SortedSet<String> logQueue = new TreeSet<>();
296      if (wals == null || wals.isEmpty()) {
297        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
298      } else {
299        // create the new cluster znode
300        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
301        listOfOps.add(op);
302        // get the offset of the logs and set it to new znodes
303        for (String wal : wals) {
304          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
305          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
306          LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
307          String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
308          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
309          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
310          logQueue.add(wal);
311        }
312        // add delete op for peer
313        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
314
315        if (LOG.isTraceEnabled())
316          LOG.trace(" The multi list size is: " + listOfOps.size());
317      }
318      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
319
320      LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
321      return new Pair<>(newPeerId, logQueue);
322    } catch (KeeperException e) {
323      // Multi call failed; it looks like some other regionserver took away the logs.
324      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
325    } catch (InterruptedException e) {
326      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
327      Thread.currentThread().interrupt();
328    }
329    return null;
330  }
331
332  @Override
333  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
334      throws ReplicationException {
335    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
336    boolean debugEnabled = LOG.isDebugEnabled();
337    if (debugEnabled) {
338      LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
339    }
340
341    int size = pairs.size();
342    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
343
344    for (int i = 0; i < size; i++) {
345      listOfOps.add(ZKUtilOp.createAndFailSilent(
346        ZNodePaths.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
347        HConstants.EMPTY_BYTE_ARRAY));
348    }
349    if (debugEnabled) {
350      LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
351          + " is " + listOfOps.size());
352    }
353    try {
354      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
355    } catch (KeeperException e) {
356      throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
357    }
358  }
359
360  @Override
361  public void removeHFileRefs(String peerId, List<String> files) {
362    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
363    boolean debugEnabled = LOG.isDebugEnabled();
364    if (debugEnabled) {
365      LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
366    }
367
368    int size = files.size();
369    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
370
371    for (int i = 0; i < size; i++) {
372      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZNodePaths.joinZNode(peerZnode, files.get(i))));
373    }
374    if (debugEnabled) {
375      LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
376          + " is " + listOfOps.size());
377    }
378    try {
379      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
380    } catch (KeeperException e) {
381      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
382    }
383  }
384
385  @Override
386  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
387    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
388    try {
389      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
390        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
391        ZKUtil.createWithParents(this.zookeeper, peerZnode);
392      }
393    } catch (KeeperException e) {
394      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
395          e);
396    }
397  }
398
399  @Override
400  public void removePeerFromHFileRefs(String peerId) {
401    final String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
402    try {
403      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
404        if (LOG.isDebugEnabled()) {
405          LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
406        }
407        return;
408      } else {
409        LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
410        ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
411      }
412    } catch (KeeperException e) {
413      LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
414        e);
415    }
416  }
417}