001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.util.Pair;
030import org.apache.hadoop.hbase.zookeeper.ZKUtil;
031import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
032import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.zookeeper.KeeperException;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
037
/**
 * Retains only the small subset of methods from the old ZooKeeper based replication queue storage
 * that is still needed for migration.
 */
042@InterfaceAudience.Private
043public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase {
044
  /**
   * Configuration key for the name of the znode, created under the replication znode, that holds
   * the hfile references.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
    "zookeeper.znode.replication.hfile.refs";
  /**
   * Znode name used when {@link #ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY} is not configured.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  /**
   * Configuration key for the name of the znode, created under the replication znode, that is
   * walked when listing the last pushed sequence ids.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
    "zookeeper.znode.replication.regions";
  /**
   * Znode name used when {@link #ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY} is not configured.
   */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";

  /**
   * The name of the znode that contains all replication queues
   */
  private final String queuesZNode;

  /**
   * The name of the znode that contains queues of hfile references to be replicated
   */
  private final String hfileRefsZNode;

  /**
   * The znode whose two levels of prefix children are walked by
   * {@link #listAllLastPushedSeqIds()} to read the last pushed sequence ids.
   */
  private final String regionsZNode;
064
065  public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) {
066    super(zookeeper, conf);
067    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
068    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
069      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
070    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
071    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
072    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
073      .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
074  }
075
076  public interface MigrationIterator<T> {
077
078    T next() throws Exception;
079  }
080
081  @SuppressWarnings("rawtypes")
082  private static final MigrationIterator EMPTY_ITER = new MigrationIterator() {
083
084    @Override
085    public Object next() {
086      return null;
087    }
088  };
089
090  public static final class ZkReplicationQueueData {
091
092    private final ReplicationQueueId queueId;
093
094    private final Map<String, Long> walOffsets;
095
096    public ZkReplicationQueueData(ReplicationQueueId queueId, Map<String, Long> walOffsets) {
097      this.queueId = queueId;
098      this.walOffsets = walOffsets;
099    }
100
101    public ReplicationQueueId getQueueId() {
102      return queueId;
103    }
104
105    public Map<String, Long> getWalOffsets() {
106      return walOffsets;
107    }
108  }
109
110  private String getRsNode(ServerName serverName) {
111    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
112  }
113
114  private String getQueueNode(ServerName serverName, String queueId) {
115    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
116  }
117
118  private String getFileNode(String queueNode, String fileName) {
119    return ZNodePaths.joinZNode(queueNode, fileName);
120  }
121
122  private String getFileNode(ServerName serverName, String queueId, String fileName) {
123    return getFileNode(getQueueNode(serverName, queueId), fileName);
124  }
125
126  @SuppressWarnings("unchecked")
127  public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues()
128    throws KeeperException {
129    List<String> replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
130    if (replicators == null || replicators.isEmpty()) {
131      ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
132      return EMPTY_ITER;
133    }
134    Iterator<String> iter = replicators.iterator();
135    return new MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>>() {
136
137      private ServerName previousServerName;
138
139      @Override
140      public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception {
141        if (previousServerName != null) {
142          ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName));
143        }
144        if (!iter.hasNext()) {
145          ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
146          return null;
147        }
148        String replicator = iter.next();
149        ServerName serverName = ServerName.parseServerName(replicator);
150        previousServerName = serverName;
151        List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
152        if (queueIdList == null || queueIdList.isEmpty()) {
153          return Pair.newPair(serverName, Collections.emptyList());
154        }
155        List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size());
156        for (String queueIdStr : queueIdList) {
157          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr);
158          ReplicationQueueId queueId;
159          if (queueInfo.getDeadRegionServers().isEmpty()) {
160            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId());
161          } else {
162            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(),
163              queueInfo.getDeadRegionServers().get(0));
164          }
165          List<String> wals =
166            ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr));
167          ZkReplicationQueueData queueData;
168          if (wals == null || wals.isEmpty()) {
169            queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap());
170          } else {
171            Map<String, Long> walOffsets = new HashMap<>();
172            for (String wal : wals) {
173              byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal));
174              if (data == null || data.length == 0) {
175                walOffsets.put(wal, 0L);
176              } else {
177                walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data));
178              }
179            }
180            queueData = new ZkReplicationQueueData(queueId, walOffsets);
181          }
182          queueDataList.add(queueData);
183        }
184        return Pair.newPair(serverName, queueDataList);
185      }
186    };
187  }
188
189  public static final class ZkLastPushedSeqId {
190
191    private final String encodedRegionName;
192
193    private final String peerId;
194
195    private final long lastPushedSeqId;
196
197    ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) {
198      this.encodedRegionName = encodedRegionName;
199      this.peerId = peerId;
200      this.lastPushedSeqId = lastPushedSeqId;
201    }
202
203    public String getEncodedRegionName() {
204      return encodedRegionName;
205    }
206
207    public String getPeerId() {
208      return peerId;
209    }
210
211    public long getLastPushedSeqId() {
212      return lastPushedSeqId;
213    }
214
215  }
216
217  @SuppressWarnings("unchecked")
218  public MigrationIterator<List<ZkLastPushedSeqId>> listAllLastPushedSeqIds()
219    throws KeeperException {
220    List<String> level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
221    if (level1Prefixs == null || level1Prefixs.isEmpty()) {
222      ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
223      return EMPTY_ITER;
224    }
225    Iterator<String> level1Iter = level1Prefixs.iterator();
226    return new MigrationIterator<List<ZkLastPushedSeqId>>() {
227
228      private String level1Prefix;
229
230      private Iterator<String> level2Iter;
231
232      private String level2Prefix;
233
234      @Override
235      public List<ZkLastPushedSeqId> next() throws Exception {
236        for (;;) {
237          if (level2Iter == null || !level2Iter.hasNext()) {
238            if (!level1Iter.hasNext()) {
239              ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
240              return null;
241            }
242            if (level1Prefix != null) {
243              // this will also delete the previous level2Prefix which is under this level1Prefix
244              ZKUtil.deleteNodeRecursively(zookeeper,
245                ZNodePaths.joinZNode(regionsZNode, level1Prefix));
246            }
247            level1Prefix = level1Iter.next();
248            List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper,
249              ZNodePaths.joinZNode(regionsZNode, level1Prefix));
250            if (level2Prefixes != null) {
251              level2Iter = level2Prefixes.iterator();
252              // reset level2Prefix as we have switched level1Prefix, otherwise the below delete
253              // level2Prefix section will delete the znode with this level2Prefix under the new
254              // level1Prefix
255              level2Prefix = null;
256            }
257          } else {
258            if (level2Prefix != null) {
259              ZKUtil.deleteNodeRecursively(zookeeper,
260                ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
261            }
262            level2Prefix = level2Iter.next();
263            List<String> encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper,
264              ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
265            if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) {
266              return Collections.emptyList();
267            }
268            List<ZkLastPushedSeqId> lastPushedSeqIds = new ArrayList<>();
269            for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) {
270              byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode,
271                level1Prefix, level2Prefix, encodedRegionNameAndPeerId));
272              long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data);
273              Iterator<String> iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator();
274              String encodedRegionName = level1Prefix + level2Prefix + iter.next();
275              String peerId = iter.next();
276              lastPushedSeqIds
277                .add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId));
278            }
279            return Collections.unmodifiableList(lastPushedSeqIds);
280          }
281        }
282      }
283    };
284  }
285
286  private String getHFileRefsPeerNode(String peerId) {
287    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
288  }
289
290  /**
291   * Pair&lt;PeerId, List&lt;HFileRefs&gt;&gt;
292   */
293  @SuppressWarnings("unchecked")
294  public MigrationIterator<Pair<String, List<String>>> listAllHFileRefs() throws KeeperException {
295    List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
296    if (peerIds == null || peerIds.isEmpty()) {
297      ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
298      return EMPTY_ITER;
299    }
300    Iterator<String> iter = peerIds.iterator();
301    return new MigrationIterator<Pair<String, List<String>>>() {
302
303      private String previousPeerId;
304
305      @Override
306      public Pair<String, List<String>> next() throws KeeperException {
307        if (previousPeerId != null) {
308          ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId));
309        }
310        if (!iter.hasNext()) {
311          ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
312          return null;
313        }
314        String peerId = iter.next();
315        List<String> refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId));
316        previousPeerId = peerId;
317        return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList());
318      }
319    };
320  }
321
322  public boolean hasData() throws KeeperException {
323    return ZKUtil.checkExists(zookeeper, queuesZNode) != -1
324      || ZKUtil.checkExists(zookeeper, regionsZNode) != -1
325      || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1;
326  }
327
328  public void deleteAllData() throws KeeperException {
329    ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
330    ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
331    ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
332  }
333
334  @RestrictedApi(explanation = "Should only be called in tests", link = "",
335      allowedOnPath = ".*/src/test/.*")
336  String getQueuesZNode() {
337    return queuesZNode;
338  }
339
340  @RestrictedApi(explanation = "Should only be called in tests", link = "",
341      allowedOnPath = ".*/src/test/.*")
342  String getHfileRefsZNode() {
343    return hfileRefsZNode;
344  }
345
346  @RestrictedApi(explanation = "Should only be called in tests", link = "",
347      allowedOnPath = ".*/src/test/.*")
348  String getRegionsZNode() {
349    return regionsZNode;
350  }
351}