001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.NavigableMap; 028import java.util.Set; 029import java.util.TreeMap; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.ClientSideRegionScanner; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.Scan; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 044import org.apache.hadoop.hbase.replication.ReplicationException; 045import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 046import org.apache.hadoop.hbase.replication.ReplicationQueueData; 047import org.apache.hadoop.hbase.replication.ReplicationQueueId; 048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 050import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage; 051import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.CommonFSUtils; 054import org.apache.hadoop.hbase.util.FSUtils; 055import org.apache.hadoop.hbase.util.Pair; 056import org.apache.yetus.audience.InterfaceAudience; 057 058import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 059import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 060 061@InterfaceAudience.Private 062public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage { 063 064 private final Map<ReplicationQueueId, Map<String, ReplicationGroupOffset>> offsets = 065 new HashMap<>(); 066 067 private final Map<String, Map<String, Long>> lastSequenceIds = new HashMap<>(); 068 069 private final Map<String, Set<String>> hfileRefs = new HashMap<>(); 070 071 private void loadRegionInfo(FileSystem fs, Path regionDir, 072 NavigableMap<byte[], RegionInfo> startKey2RegionInfo) throws IOException { 073 RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); 074 // TODO: we consider that the there will not be too many regions for hbase:replication table, so 075 // here we just iterate over all the regions to find out the overlapped ones. Can be optimized 076 // later. 077 Iterator<Map.Entry<byte[], RegionInfo>> iter = startKey2RegionInfo.entrySet().iterator(); 078 while (iter.hasNext()) { 079 Map.Entry<byte[], RegionInfo> entry = iter.next(); 080 if (hri.isOverlap(entry.getValue())) { 081 if (hri.getRegionId() > entry.getValue().getRegionId()) { 082 // we are newer, remove the old hri, we can not break here as if hri is a merged region, 083 // we need to remove all its parent regions. 084 iter.remove(); 085 } else { 086 // we are older, just return, skip the below add 087 return; 088 } 089 } 090 091 } 092 startKey2RegionInfo.put(hri.getStartKey(), hri); 093 } 094 095 private void loadOffsets(Result result) { 096 NavigableMap<byte[], byte[]> map = 097 result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY); 098 if (map == null || map.isEmpty()) { 099 return; 100 } 101 Map<String, ReplicationGroupOffset> offsetMap = new HashMap<>(); 102 map.forEach((k, v) -> { 103 String walGroup = Bytes.toString(k); 104 ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v)); 105 offsetMap.put(walGroup, offset); 106 }); 107 ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow())); 108 offsets.put(queueId, offsetMap); 109 } 110 111 private void loadLastSequenceIds(Result result) { 112 NavigableMap<byte[], byte[]> map = 113 result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY); 114 if (map == null || map.isEmpty()) { 115 return; 116 } 117 Map<String, Long> lastSeqIdMap = new HashMap<>(); 118 map.forEach((k, v) -> { 119 String encodedRegionName = Bytes.toString(k); 120 long lastSeqId = Bytes.toLong(v); 121 lastSeqIdMap.put(encodedRegionName, lastSeqId); 122 }); 123 String peerId = Bytes.toString(result.getRow()); 124 lastSequenceIds.put(peerId, lastSeqIdMap); 125 } 126 127 private void loadHFileRefs(Result result) { 128 NavigableMap<byte[], byte[]> map = 129 result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY); 130 if (map == null || map.isEmpty()) { 131 return; 132 } 133 Set<String> refs = new HashSet<>(); 134 map.keySet().forEach(ref -> refs.add(Bytes.toString(ref))); 135 String peerId = Bytes.toString(result.getRow()); 136 hfileRefs.put(peerId, refs); 137 } 138 139 private void loadReplicationQueueData(Configuration conf, TableName tableName) 140 throws IOException { 141 Path rootDir = CommonFSUtils.getRootDir(conf); 142 Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName); 143 FileSystem fs = tableDir.getFileSystem(conf); 144 FileStatus[] regionDirs = 145 CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs)); 146 if (regionDirs == null) { 147 return; 148 } 149 NavigableMap<byte[], RegionInfo> startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR); 150 for (FileStatus regionDir : regionDirs) { 151 loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo); 152 } 153 TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); 154 for (RegionInfo hri : startKey2RegionInfo.values()) { 155 try (ClientSideRegionScanner scanner = 156 new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) { 157 for (;;) { 158 Result result = scanner.next(); 159 if (result == null) { 160 break; 161 } 162 loadOffsets(result); 163 loadLastSequenceIds(result); 164 loadHFileRefs(result); 165 } 166 } 167 } 168 } 169 170 public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName) 171 throws IOException { 172 loadReplicationQueueData(conf, tableName); 173 } 174 175 @Override 176 public synchronized void setOffset(ReplicationQueueId queueId, String walGroup, 177 ReplicationGroupOffset offset, Map<String, Long> lastSeqIds) throws ReplicationException { 178 Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId); 179 if (offsetMap == null) { 180 offsetMap = new HashMap<>(); 181 offsets.put(queueId, offsetMap); 182 } 183 offsetMap.put(walGroup, offset); 184 Map<String, Long> lastSeqIdsMap = lastSequenceIds.get(queueId.getPeerId()); 185 if (lastSeqIdsMap == null) { 186 lastSeqIdsMap = new HashMap<>(); 187 lastSequenceIds.put(queueId.getPeerId(), lastSeqIdsMap); 188 } 189 for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) { 190 Long oldSeqId = lastSeqIdsMap.get(entry.getKey()); 191 if (oldSeqId == null || oldSeqId < entry.getValue()) { 192 lastSeqIdsMap.put(entry.getKey(), entry.getValue()); 193 } 194 } 195 } 196 197 @Override 198 public synchronized Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId) 199 throws ReplicationException { 200 Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId); 201 if (offsetMap == null) { 202 return Collections.emptyMap(); 203 } 204 return ImmutableMap.copyOf(offsetMap); 205 } 206 207 @Override 208 public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId) 209 throws ReplicationException { 210 return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId)) 211 .collect(Collectors.toList()); 212 } 213 214 @Override 215 public synchronized List<ReplicationQueueId> listAllQueueIds(ServerName serverName) 216 throws ReplicationException { 217 return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName)) 218 .collect(Collectors.toList()); 219 } 220 221 @Override 222 public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName) 223 throws ReplicationException { 224 return offsets.keySet().stream() 225 .filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName)) 226 .collect(Collectors.toList()); 227 } 228 229 @Override 230 public synchronized List<ReplicationQueueData> listAllQueues() throws ReplicationException { 231 return offsets.entrySet().stream() 232 .map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue()))) 233 .collect(Collectors.toList()); 234 } 235 236 @Override 237 public synchronized List<ServerName> listAllReplicators() throws ReplicationException { 238 return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct() 239 .collect(Collectors.toList()); 240 } 241 242 @Override 243 public synchronized Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId, 244 ServerName targetServerName) throws ReplicationException { 245 Map<String, ReplicationGroupOffset> offsetMap = offsets.remove(queueId); 246 if (offsetMap == null) { 247 return Collections.emptyMap(); 248 } 249 offsets.put(queueId.claim(targetServerName), offsetMap); 250 return ImmutableMap.copyOf(offsetMap); 251 } 252 253 @Override 254 public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException { 255 offsets.remove(queueId); 256 } 257 258 @Override 259 public synchronized void removeAllQueues(String peerId) throws ReplicationException { 260 Iterator<ReplicationQueueId> iter = offsets.keySet().iterator(); 261 while (iter.hasNext()) { 262 if (iter.next().getPeerId().equals(peerId)) { 263 iter.remove(); 264 } 265 } 266 } 267 268 @Override 269 public synchronized long getLastSequenceId(String encodedRegionName, String peerId) 270 throws ReplicationException { 271 Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId); 272 if (lastSeqIdMap == null) { 273 return HConstants.NO_SEQNUM; 274 } 275 Long lastSeqId = lastSeqIdMap.get(encodedRegionName); 276 return lastSeqId != null ? lastSeqId.longValue() : HConstants.NO_SEQNUM; 277 } 278 279 @Override 280 public synchronized void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) 281 throws ReplicationException { 282 Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId); 283 if (lastSeqIdMap == null) { 284 lastSeqIdMap = new HashMap<>(); 285 lastSequenceIds.put(peerId, lastSeqIdMap); 286 } 287 lastSeqIdMap.putAll(lastSeqIds); 288 } 289 290 @Override 291 public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException { 292 lastSequenceIds.remove(peerId); 293 } 294 295 @Override 296 public synchronized void removeLastSequenceIds(String peerId, List<String> encodedRegionNames) 297 throws ReplicationException { 298 Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId); 299 if (lastSeqIdMap == null) { 300 return; 301 } 302 for (String encodedRegionName : encodedRegionNames) { 303 lastSeqIdMap.remove(encodedRegionName); 304 } 305 } 306 307 @Override 308 public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException { 309 hfileRefs.remove(peerId); 310 } 311 312 @Override 313 public synchronized void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) 314 throws ReplicationException { 315 Set<String> refs = hfileRefs.get(peerId); 316 if (refs == null) { 317 refs = new HashSet<>(); 318 hfileRefs.put(peerId, refs); 319 } 320 for (Pair<Path, Path> pair : pairs) { 321 refs.add(pair.getSecond().getName()); 322 } 323 } 324 325 @Override 326 public synchronized void removeHFileRefs(String peerId, List<String> files) 327 throws ReplicationException { 328 Set<String> refs = hfileRefs.get(peerId); 329 if (refs == null) { 330 return; 331 } 332 refs.removeAll(files); 333 } 334 335 @Override 336 public synchronized List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException { 337 return ImmutableList.copyOf(hfileRefs.keySet()); 338 } 339 340 @Override 341 public synchronized List<String> getReplicableHFiles(String peerId) throws ReplicationException { 342 Set<String> refs = hfileRefs.get(peerId); 343 if (refs == null) { 344 return Collections.emptyList(); 345 } 346 return ImmutableList.copyOf(refs); 347 } 348 349 @Override 350 public synchronized Set<String> getAllHFileRefs() throws ReplicationException { 351 return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); 352 } 353 354 @Override 355 public boolean hasData() throws ReplicationException { 356 return true; 357 } 358 359 @Override 360 public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas) 361 throws ReplicationException { 362 throw new UnsupportedOperationException(); 363 } 364 365 @Override 366 public void batchUpdateLastSequenceIds( 367 List<ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId> lastPushedSeqIds) 368 throws ReplicationException { 369 throw new UnsupportedOperationException(); 370 } 371 372 @Override 373 public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) 374 throws ReplicationException { 375 throw new UnsupportedOperationException(); 376 } 377 378 @Override 379 public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException { 380 throw new UnsupportedOperationException(); 381 } 382}