/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

/**
 * Just retain a small set of the methods for the old zookeeper based replication queue storage, for
 * migrating.
 * <p>
 * The iterators returned by the {@code listAll*} methods are destructive: while advancing they
 * delete the znodes that were handed out by the previous call, and once exhausted they delete the
 * corresponding root znode as well.
 */
@InterfaceAudience.Private
public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase {

  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
    "zookeeper.znode.replication.hfile.refs";
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
    "zookeeper.znode.replication.regions";
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";

  /**
   * The name of the znode that contains all replication queues
   */
  private final String queuesZNode;

  /**
   * The name of the znode that contains queues of hfile references to be replicated
   */
  private final String hfileRefsZNode;

  /**
   * The name of the znode under which the last pushed sequence ids are stored, see
   * {@link #listAllLastPushedSeqIds()}
   */
  private final String regionsZNode;

  /**
   * Resolves the three root znodes (queues, hfile refs, regions) as children of the
   * {@code replicationZNode} provided by the super class. The child names are read from
   * {@code conf}, falling back to {@code "rs"}, {@value #ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT}
   * and {@value #ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT} respectively.
   * @param zookeeper the watcher used for all zookeeper operations
   * @param conf      the configuration the znode names are read from
   */
  public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) {
    super(zookeeper, conf);
    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
      .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
  }

  /**
   * A minimal forward-only iterator. There is no {@code hasNext()}; the implementations in this
   * class return {@code null} from {@link #next()} once there is nothing more to return.
   */
  public interface MigrationIterator<T> {

    /**
     * Returns the next element, or {@code null} when the iteration is finished.
     */
    T next() throws Exception;
  }

  /**
   * Shared iterator which is always exhausted. It keeps no state and only ever returns
   * {@code null}, so one instance can be used for every element type.
   */
  private static final MigrationIterator<?> EMPTY_ITER = new MigrationIterator<Object>() {

    @Override
    public Object next() {
      return null;
    }
  };

  /**
   * Typed view of {@link #EMPTY_ITER}.
   */
  @SuppressWarnings("unchecked")
  private static <T> MigrationIterator<T> emptyIterator() {
    // Safe: the instance never produces anything but null, which is a valid value for any T.
    return (MigrationIterator<T>) EMPTY_ITER;
  }

  /**
   * The content of one old style replication queue: its id and, for every WAL file znode found
   * under the queue, the offset which was stored in that znode.
   */
  public static final class ZkReplicationQueueData {

    private final ReplicationQueueId queueId;

    private final Map<String, Long> walOffsets;

    public ZkReplicationQueueData(ReplicationQueueId queueId, Map<String, Long> walOffsets) {
      this.queueId = queueId;
      this.walOffsets = walOffsets;
    }

    public ReplicationQueueId getQueueId() {
      return queueId;
    }

    /**
     * Returns the map passed to the constructor as is (no copy is made), keyed by WAL name.
     */
    public Map<String, Long> getWalOffsets() {
      return walOffsets;
    }
  }

  /** Returns the znode of the given region server under the queues znode. */
  private String getRsNode(ServerName serverName) {
    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
  }

  /** Returns the znode of the given queue under the znode of the given region server. */
  private String getQueueNode(ServerName serverName, String queueId) {
    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
  }

  /** Returns the znode of the given WAL file under the given queue znode. */
  private String getFileNode(String queueNode, String fileName) {
    return ZNodePaths.joinZNode(queueNode, fileName);
  }

  /** Returns the znode of the given WAL file of the given queue of the given region server. */
  private String getFileNode(ServerName serverName, String queueId, String fileName) {
    return getFileNode(getQueueNode(serverName, queueId), fileName);
  }

  /**
   * Iterates over all region servers which have a znode under the queues znode, returning for each
   * of them the server name together with the data of all its queues.
   * <p>
   * If the queues znode has no children it is deleted right away and an already exhausted iterator
   * is returned. Otherwise every call to {@code next()} first deletes the region server znode
   * returned by the previous call, and the call which finds no more region servers deletes the
   * queues znode itself and returns {@code null}.
   * @return the iterator described above, never {@code null}
   * @throws KeeperException if listing or deleting the queues znode fails
   */
  public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues()
    throws KeeperException {
    List<String> replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
    if (replicators == null || replicators.isEmpty()) {
      ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
      return emptyIterator();
    }
    Iterator<String> iter = replicators.iterator();
    return new MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>>() {

      private ServerName previousServerName;

      @Override
      public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception {
        // The node handed out by the previous call is only removed now, i.e. once the caller asks
        // for the next one -- presumably so nothing is dropped before the caller is done with it.
        if (previousServerName != null) {
          ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName));
        }
        if (!iter.hasNext()) {
          ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
          return null;
        }
        String replicator = iter.next();
        ServerName serverName = ServerName.parseServerName(replicator);
        previousServerName = serverName;
        List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
        if (queueIdList == null || queueIdList.isEmpty()) {
          return Pair.newPair(serverName, Collections.emptyList());
        }
        List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size());
        for (String queueIdStr : queueIdList) {
          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr);
          ReplicationQueueId queueId;
          if (queueInfo.getDeadRegionServers().isEmpty()) {
            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId());
          } else {
            // only the first dead region server of the old queue id is carried over
            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(),
              queueInfo.getDeadRegionServers().get(0));
          }
          List<String> wals =
            ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr));
          ZkReplicationQueueData queueData;
          if (wals == null || wals.isEmpty()) {
            queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap());
          } else {
            Map<String, Long> walOffsets = new HashMap<>();
            for (String wal : wals) {
              byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal));
              if (data == null || data.length == 0) {
                // no position stored for this WAL, record offset 0
                walOffsets.put(wal, 0L);
              } else {
                walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data));
              }
            }
            queueData = new ZkReplicationQueueData(queueId, walOffsets);
          }
          queueDataList.add(queueData);
        }
        return Pair.newPair(serverName, queueDataList);
      }
    };
  }

  /**
   * The last pushed sequence id which was stored for an (encoded region name, peer id) pair.
   */
  public static final class ZkLastPushedSeqId {

    private final String encodedRegionName;

    private final String peerId;

    private final long lastPushedSeqId;

    ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) {
      this.encodedRegionName = encodedRegionName;
      this.peerId = peerId;
      this.lastPushedSeqId = lastPushedSeqId;
    }

    public String getEncodedRegionName() {
      return encodedRegionName;
    }

    public String getPeerId() {
      return peerId;
    }

    public long getLastPushedSeqId() {
      return lastPushedSeqId;
    }

  }

  /**
   * Iterates over the last pushed sequence ids stored under the regions znode.
   * <p>
   * The znodes are read as a three level tree: {@code regionsZNode/level1/level2/leaf}. The name
   * of a leaf is split on {@code '-'}; {@code level1 + level2 + } the first part is used as the
   * encoded region name and the second part as the peer id. The data of the leaf is parsed as the
   * last pushed sequence id.
   * <p>
   * Every call to {@code next()} returns the entries of one level 2 znode as an unmodifiable list,
   * which is empty if that znode has no children. Level 2 and level 1 znodes which have been
   * processed are deleted while advancing, and the call which finds nothing more to read deletes
   * the regions znode itself and returns {@code null}. If the regions znode has no children when
   * this method is called, it is deleted right away and an already exhausted iterator is returned.
   * @return the iterator described above, never {@code null}
   * @throws KeeperException if listing or deleting the regions znode fails
   */
  public MigrationIterator<List<ZkLastPushedSeqId>> listAllLastPushedSeqIds()
    throws KeeperException {
    List<String> level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
    if (level1Prefixs == null || level1Prefixs.isEmpty()) {
      ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
      return emptyIterator();
    }
    Iterator<String> level1Iter = level1Prefixs.iterator();
    return new MigrationIterator<List<ZkLastPushedSeqId>>() {

      private String level1Prefix;

      private Iterator<String> level2Iter;

      private String level2Prefix;

      @Override
      public List<ZkLastPushedSeqId> next() throws Exception {
        for (;;) {
          if (level2Iter == null || !level2Iter.hasNext()) {
            // nothing (left) on level 2, move on to the next level 1 znode
            if (!level1Iter.hasNext()) {
              ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
              return null;
            }
            if (level1Prefix != null) {
              // this will also delete the previous level2Prefix which is under this level1Prefix
              ZKUtil.deleteNodeRecursively(zookeeper,
                ZNodePaths.joinZNode(regionsZNode, level1Prefix));
            }
            level1Prefix = level1Iter.next();
            List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper,
              ZNodePaths.joinZNode(regionsZNode, level1Prefix));
            if (level2Prefixes != null) {
              level2Iter = level2Prefixes.iterator();
              // reset level2Prefix as we have switched level1Prefix, otherwise the below delete
              // level2Prefix section will delete the znode with this level2Prefix under the new
              // level1Prefix
              level2Prefix = null;
            }
            // when level2Prefixes is null the (exhausted) old level2Iter is kept, so the next
            // round of the loop advances level 1 again
          } else {
            if (level2Prefix != null) {
              ZKUtil.deleteNodeRecursively(zookeeper,
                ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
            }
            level2Prefix = level2Iter.next();
            List<String> encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper,
              ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
            if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) {
              // an empty list rather than null, as null means the iteration is finished
              return Collections.emptyList();
            }
            List<ZkLastPushedSeqId> lastPushedSeqIds = new ArrayList<>();
            for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) {
              byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode,
                level1Prefix, level2Prefix, encodedRegionNameAndPeerId));
              long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data);
              Iterator<String> iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator();
              // the two prefix levels are the leading part of the encoded region name
              String encodedRegionName = level1Prefix + level2Prefix + iter.next();
              String peerId = iter.next();
              lastPushedSeqIds
                .add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId));
            }
            return Collections.unmodifiableList(lastPushedSeqIds);
          }
        }
      }
    };
  }

  /** Returns the znode of the given peer under the hfile refs znode. */
  private String getHFileRefsPeerNode(String peerId) {
    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
  }

  /**
   * Iterates over all peers which have a znode under the hfile refs znode. The elements are
   * {@code Pair<PeerId, List<HFileRefs>>}, where the list holds the names of the children of the
   * peer znode and is empty if there are none.
   * <p>
   * If the hfile refs znode has no children it is deleted right away and an already exhausted
   * iterator is returned. Otherwise every call to {@code next()} first deletes the peer znode
   * returned by the previous call, and the call which finds no more peers deletes the hfile refs
   * znode itself and returns {@code null}.
   * @return the iterator described above, never {@code null}
   * @throws KeeperException if listing or deleting the hfile refs znode fails
   */
  public MigrationIterator<Pair<String, List<String>>> listAllHFileRefs() throws KeeperException {
    List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
    if (peerIds == null || peerIds.isEmpty()) {
      ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
      return emptyIterator();
    }
    Iterator<String> iter = peerIds.iterator();
    return new MigrationIterator<Pair<String, List<String>>>() {

      private String previousPeerId;

      @Override
      public Pair<String, List<String>> next() throws KeeperException {
        if (previousPeerId != null) {
          ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId));
        }
        if (!iter.hasNext()) {
          ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
          return null;
        }
        String peerId = iter.next();
        List<String> refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId));
        previousPeerId = peerId;
        return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList());
      }
    };
  }

  /**
   * Returns whether at least one of the queues, regions or hfile refs znodes is still there, i.e.
   * whether {@code ZKUtil.checkExists} returns something other than {@code -1} for any of them.
   */
  public boolean hasData() throws KeeperException {
    return ZKUtil.checkExists(zookeeper, queuesZNode) != -1
      || ZKUtil.checkExists(zookeeper, regionsZNode) != -1
      || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1;
  }

  /**
   * Recursively deletes the queues, regions and hfile refs znodes, in this order.
   */
  public void deleteAllData() throws KeeperException {
    ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
    ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
    ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  String getQueuesZNode() {
    return queuesZNode;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  String getHfileRefsZNode() {
    return hfileRefsZNode;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  String getRegionsZNode() {
    return regionsZNode;
  }
}