001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.util.Pair;
030import org.apache.hadoop.hbase.zookeeper.ZKUtil;
031import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
032import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.zookeeper.KeeperException;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
039import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
040
041/**
 * Retains only the small subset of methods from the old ZooKeeper based replication queue storage
 * that are needed to migrate its data.
044 */
045@InterfaceAudience.Private
046public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase {
047
  private static final Logger LOG =
    LoggerFactory.getLogger(ZKReplicationQueueStorageForMigration.class);

  /**
   * Configuration key for the name of the hfile references znode; it is read in the constructor.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
    "zookeeper.znode.replication.hfile.refs";
  /**
   * Name used for the hfile references znode when
   * {@link #ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY} is not set in the configuration.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  /**
   * Configuration key for the name of the regions znode; it is read in the constructor.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
    "zookeeper.znode.replication.regions";
  /**
   * Name used for the regions znode when {@link #ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY} is not
   * set in the configuration.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";

  /**
   * The path of the znode that contains all replication queues. It is built in the constructor
   * by joining the replication znode with the configured queues znode name.
   */
  private final String queuesZNode;

  /**
   * The path of the znode that contains queues of hfile references to be replicated. It is built
   * in the constructor by joining the replication znode with the configured hfile refs znode
   * name.
   */
  private final String hfileRefsZNode;

  /**
   * The path of the znode that is walked by {@link #listAllLastPushedSeqIds()} to read the last
   * pushed sequence ids. It is built in the constructor by joining the replication znode with
   * the configured regions znode name.
   */
  private final String regionsZNode;
070
071  public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) {
072    super(zookeeper, conf);
073    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
074    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
075      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
076    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
077    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
078    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
079      .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
080  }
081
082  public interface MigrationIterator<T> {
083
084    T next() throws Exception;
085  }
086
087  @SuppressWarnings("rawtypes")
088  private static final MigrationIterator EMPTY_ITER = new MigrationIterator() {
089
090    @Override
091    public Object next() {
092      return null;
093    }
094  };
095
096  public static final class ZkReplicationQueueData {
097
098    private final ReplicationQueueId queueId;
099
100    private final Map<String, Long> walOffsets;
101
102    public ZkReplicationQueueData(ReplicationQueueId queueId, Map<String, Long> walOffsets) {
103      this.queueId = queueId;
104      this.walOffsets = walOffsets;
105    }
106
107    public ReplicationQueueId getQueueId() {
108      return queueId;
109    }
110
111    public Map<String, Long> getWalOffsets() {
112      return walOffsets;
113    }
114  }
115
116  private String getRsNode(ServerName serverName) {
117    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
118  }
119
120  private String getQueueNode(ServerName serverName, String queueId) {
121    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
122  }
123
124  private String getFileNode(String queueNode, String fileName) {
125    return ZNodePaths.joinZNode(queueNode, fileName);
126  }
127
128  private String getFileNode(ServerName serverName, String queueId, String fileName) {
129    return getFileNode(getQueueNode(serverName, queueId), fileName);
130  }
131
  /**
   * Peer id of the queues which {@link #listAllQueues()} treats specially: queues whose peer id
   * equals this value are not returned for migration and are not deleted while iterating.
   */
  static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
133
134  @SuppressWarnings("unchecked")
135  public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues()
136    throws KeeperException {
137    List<String> replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
138    if (replicators == null || replicators.isEmpty()) {
139      ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
140      return EMPTY_ITER;
141    }
142    Iterator<String> iter = replicators.iterator();
143    return new MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>>() {
144
145      private ServerName previousServerName;
146
147      private boolean hasRegionReplicaReplicationQueue;
148
149      private void cleanupQueuesWithoutRegionReplicaReplication(ServerName serverName)
150        throws Exception {
151        String rsZNode = getRsNode(serverName);
152        List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, rsZNode);
153        if (CollectionUtils.isEmpty(queueIdList)) {
154          return;
155        }
156        for (String queueId : queueIdList) {
157          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
158          if (!queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) {
159            ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
160          }
161        }
162      }
163
164      @Override
165      public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception {
166        if (previousServerName != null) {
167          if (hasRegionReplicaReplicationQueue) {
168            // if there are region_replica_replication queues, we can not delete it, just delete
169            // other queues, see HBASE-29169.
170            cleanupQueuesWithoutRegionReplicaReplication(previousServerName);
171          } else {
172            ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName));
173          }
174        }
175        if (!iter.hasNext()) {
176          // If there are region_replica_replication queues then we can not delete the data right
177          // now, otherwise we may crash the old region servers, see HBASE-29169.
178          // The migration procedure has a special step to cleanup everything
179          return null;
180        }
181        hasRegionReplicaReplicationQueue = false;
182        String replicator = iter.next();
183        ServerName serverName = ServerName.parseServerName(replicator);
184        previousServerName = serverName;
185        List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
186        if (CollectionUtils.isEmpty(queueIdList)) {
187          return Pair.newPair(serverName, Collections.emptyList());
188        }
189        List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size());
190        for (String queueIdStr : queueIdList) {
191          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr);
192          if (queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) {
193            // we do not need to migrate the data for this queue, skip
194            LOG.debug("Found region replica replication queue {}, skip", queueInfo);
195            hasRegionReplicaReplicationQueue = true;
196            continue;
197          }
198          ReplicationQueueId queueId;
199          if (queueInfo.getDeadRegionServers().isEmpty()) {
200            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId());
201          } else {
202            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(),
203              queueInfo.getDeadRegionServers().get(0));
204          }
205          List<String> wals =
206            ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr));
207          ZkReplicationQueueData queueData;
208          if (wals == null || wals.isEmpty()) {
209            queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap());
210          } else {
211            Map<String, Long> walOffsets = new HashMap<>();
212            for (String wal : wals) {
213              byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal));
214              if (data == null || data.length == 0) {
215                walOffsets.put(wal, 0L);
216              } else {
217                walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data));
218              }
219            }
220            queueData = new ZkReplicationQueueData(queueId, walOffsets);
221          }
222          queueDataList.add(queueData);
223        }
224        return Pair.newPair(serverName, queueDataList);
225      }
226    };
227  }
228
229  public static final class ZkLastPushedSeqId {
230
231    private final String encodedRegionName;
232
233    private final String peerId;
234
235    private final long lastPushedSeqId;
236
237    ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) {
238      this.encodedRegionName = encodedRegionName;
239      this.peerId = peerId;
240      this.lastPushedSeqId = lastPushedSeqId;
241    }
242
243    public String getEncodedRegionName() {
244      return encodedRegionName;
245    }
246
247    public String getPeerId() {
248      return peerId;
249    }
250
251    public long getLastPushedSeqId() {
252      return lastPushedSeqId;
253    }
254
255  }
256
257  @SuppressWarnings("unchecked")
258  public MigrationIterator<List<ZkLastPushedSeqId>> listAllLastPushedSeqIds()
259    throws KeeperException {
260    List<String> level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
261    if (level1Prefixs == null || level1Prefixs.isEmpty()) {
262      ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
263      return EMPTY_ITER;
264    }
265    Iterator<String> level1Iter = level1Prefixs.iterator();
266    return new MigrationIterator<List<ZkLastPushedSeqId>>() {
267
268      private String level1Prefix;
269
270      private Iterator<String> level2Iter;
271
272      private String level2Prefix;
273
274      @Override
275      public List<ZkLastPushedSeqId> next() throws Exception {
276        for (;;) {
277          if (level2Iter == null || !level2Iter.hasNext()) {
278            if (!level1Iter.hasNext()) {
279              ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
280              return null;
281            }
282            if (level1Prefix != null) {
283              // this will also delete the previous level2Prefix which is under this level1Prefix
284              ZKUtil.deleteNodeRecursively(zookeeper,
285                ZNodePaths.joinZNode(regionsZNode, level1Prefix));
286            }
287            level1Prefix = level1Iter.next();
288            List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper,
289              ZNodePaths.joinZNode(regionsZNode, level1Prefix));
290            if (level2Prefixes != null) {
291              level2Iter = level2Prefixes.iterator();
292              // reset level2Prefix as we have switched level1Prefix, otherwise the below delete
293              // level2Prefix section will delete the znode with this level2Prefix under the new
294              // level1Prefix
295              level2Prefix = null;
296            }
297          } else {
298            if (level2Prefix != null) {
299              ZKUtil.deleteNodeRecursively(zookeeper,
300                ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
301            }
302            level2Prefix = level2Iter.next();
303            List<String> encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper,
304              ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
305            if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) {
306              return Collections.emptyList();
307            }
308            List<ZkLastPushedSeqId> lastPushedSeqIds = new ArrayList<>();
309            for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) {
310              byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode,
311                level1Prefix, level2Prefix, encodedRegionNameAndPeerId));
312              long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data);
313              Iterator<String> iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator();
314              String encodedRegionName = level1Prefix + level2Prefix + iter.next();
315              String peerId = iter.next();
316              lastPushedSeqIds
317                .add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId));
318            }
319            return Collections.unmodifiableList(lastPushedSeqIds);
320          }
321        }
322      }
323    };
324  }
325
326  private String getHFileRefsPeerNode(String peerId) {
327    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
328  }
329
330  /**
331   * Pair&lt;PeerId, List&lt;HFileRefs&gt;&gt;
332   */
333  @SuppressWarnings("unchecked")
334  public MigrationIterator<Pair<String, List<String>>> listAllHFileRefs() throws KeeperException {
335    List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
336    if (peerIds == null || peerIds.isEmpty()) {
337      ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
338      return EMPTY_ITER;
339    }
340    Iterator<String> iter = peerIds.iterator();
341    return new MigrationIterator<Pair<String, List<String>>>() {
342
343      private String previousPeerId;
344
345      @Override
346      public Pair<String, List<String>> next() throws KeeperException {
347        if (previousPeerId != null) {
348          ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId));
349        }
350        if (!iter.hasNext()) {
351          ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
352          return null;
353        }
354        String peerId = iter.next();
355        List<String> refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId));
356        previousPeerId = peerId;
357        return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList());
358      }
359    };
360  }
361
362  public boolean hasData() throws KeeperException {
363    return ZKUtil.checkExists(zookeeper, queuesZNode) != -1
364      || ZKUtil.checkExists(zookeeper, regionsZNode) != -1
365      || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1;
366  }
367
368  public void deleteAllData() throws KeeperException {
369    ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
370    ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
371    ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
372  }
373
374  @RestrictedApi(explanation = "Should only be called in tests", link = "",
375      allowedOnPath = ".*/src/test/.*")
376  String getQueuesZNode() {
377    return queuesZNode;
378  }
379
380  @RestrictedApi(explanation = "Should only be called in tests", link = "",
381      allowedOnPath = ".*/src/test/.*")
382  String getHfileRefsZNode() {
383    return hfileRefsZNode;
384  }
385
386  @RestrictedApi(explanation = "Should only be called in tests", link = "",
387      allowedOnPath = ".*/src/test/.*")
388  String getRegionsZNode() {
389    return regionsZNode;
390  }
391}