/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;

/**
 * HBase table based replication queue storage.
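 * <p>
 * All state lives in one table with three column families. Queue offsets are stored in the
 * {@code queue} family, one row per {@link ReplicationQueueId} with one qualifier per WAL group.
 * Last pushed sequence ids are stored in the {@code sid} family, one row per peer id with one
 * qualifier per encoded region name. HFile references are stored in the {@code hfileref} family,
 * one row per peer id with one qualifier per file name.
 * <p>
 * A minimal construction sketch; the table name below is only an example, callers should pass in
 * the actual replication queue table configured for the cluster:
 *
 * <pre>
 * ReplicationQueueStorage storage =
 *   new TableReplicationQueueStorage(conn, TableName.valueOf("hbase:replication"));
 * </pre>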
 */
@InterfaceAudience.Private
public class TableReplicationQueueStorage implements ReplicationQueueStorage {

  public static final byte[] QUEUE_FAMILY = Bytes.toBytes("queue");

  public static final byte[] LAST_SEQUENCE_ID_FAMILY = Bytes.toBytes("sid");

  public static final byte[] HFILE_REF_FAMILY = Bytes.toBytes("hfileref");

  private final Connection conn;

  private final TableName tableName;

  public TableReplicationQueueStorage(Connection conn, TableName tableName) {
    this.conn = conn;
    this.tableName = tableName;
  }

  private void addLastSeqIdsPut(MultiRowMutationProtos.MutateRowsRequest.Builder builder,
    String peerId, Map<String, Long> lastSeqIds, AsyncTable<?> table) throws IOException {
    // get the previous sequence ids first
    byte[] row = Bytes.toBytes(peerId);
    Get get = new Get(row);
    lastSeqIds.keySet().forEach(encodedRegionName -> get.addColumn(LAST_SEQUENCE_ID_FAMILY,
      Bytes.toBytes(encodedRegionName)));
    Result result = FutureUtils.get(table.get(get));
    Put put = new Put(row);
    for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) {
      String encodedRegionName = entry.getKey();
      long lastSeqId = entry.getValue();
      byte[] encodedRegionNameAsBytes = Bytes.toBytes(encodedRegionName);
      byte[] previousLastSeqIdAsBytes =
        result.getValue(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes);
      if (previousLastSeqIdAsBytes != null) {
        long previousLastSeqId = Bytes.toLong(previousLastSeqIdAsBytes);
        if (lastSeqId > previousLastSeqId) {
          // only update the last seq id when the new value is greater, and use CAS to make sure
          // we do not overwrite a value written concurrently by someone else
          put.addColumn(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes,
            Bytes.toBytes(lastSeqId));
          builder.addCondition(ProtobufUtil.toCondition(row, LAST_SEQUENCE_ID_FAMILY,
            encodedRegionNameAsBytes, CompareOperator.EQUAL, previousLastSeqIdAsBytes, null));
        }
      } else {
        // also update the last seq id when there is no value yet, again with a CAS condition so
        // we do not overwrite a value written concurrently by someone else
        put.addColumn(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes, Bytes.toBytes(lastSeqId));
        builder.addCondition(ProtobufUtil.toCondition(row, LAST_SEQUENCE_ID_FAMILY,
          encodedRegionNameAsBytes, CompareOperator.EQUAL, null, null));
      }
    }
    if (!put.isEmpty()) {
      builder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put));
    }
  }
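
  /**
   * Persists the replication offset for the given WAL group of the given queue. When there are
   * last pushed sequence ids to record as well, the offset put and the conditional sequence id
   * puts are submitted as one atomic multi-row mutation through the coprocessor endpoint, and the
   * whole operation is retried until the CAS conditions pass.
   */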
  @Override
  public void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset,
    Map<String, Long> lastSeqIds) throws ReplicationException {
    Put put = new Put(Bytes.toBytes(queueId.toString())).addColumn(QUEUE_FAMILY,
      Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
    AsyncTable<?> asyncTable = conn.toAsyncConnection().getTable(tableName);
    try {
      if (lastSeqIds.isEmpty()) {
        FutureUtils.get(asyncTable.put(put));
      } else {
        for (;;) {
          MultiRowMutationProtos.MutateRowsRequest.Builder builder =
            MultiRowMutationProtos.MutateRowsRequest.newBuilder();
          addLastSeqIdsPut(builder, queueId.getPeerId(), lastSeqIds, asyncTable);
          if (builder.getMutationRequestCount() > 0) {
            // use MultiRowMutationService to atomically update offset and last sequence ids
            MultiRowMutationProtos.MutateRowsRequest request =
              builder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put)).build();
            MultiRowMutationProtos.MutateRowsResponse response =
              FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface,
                MultiRowMutationProtos.MutateRowsResponse> coprocessorService(
                  MultiRowMutationProtos.MultiRowMutationService::newStub,
                  (stub, controller, done) -> stub.mutateRows(controller, request, done),
                  put.getRow()));
            if (response.getProcessed()) {
              break;
            }
          } else {
            // we do not need to update the last seq ids, fall back to a single put
            FutureUtils.get(asyncTable.put(put));
            break;
          }
        }
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to setOffset, queueId=" + queueId + ", walGroup="
        + walGroup + ", offset=" + offset + ", lastSeqIds=" + lastSeqIds, e);
    }
  }

  private ImmutableMap<String, ReplicationGroupOffset> parseOffsets(Result result) {
    ImmutableMap.Builder<String, ReplicationGroupOffset> builder =
      ImmutableMap.builderWithExpectedSize(result.size());
    NavigableMap<byte[], byte[]> map = result.getFamilyMap(QUEUE_FAMILY);
    if (map != null) {
      map.forEach((k, v) -> {
        String walGroup = Bytes.toString(k);
        ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v));
        builder.put(walGroup, offset);
      });
    }
    return builder.build();
  }

  private Map<String, ReplicationGroupOffset> getOffsets0(Table table, ReplicationQueueId queueId)
    throws IOException {
    Result result = table.get(new Get(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY));
    return parseOffsets(result);
  }

  @Override
  public Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
    throws ReplicationException {
    try (Table table = conn.getTable(tableName)) {
      return getOffsets0(table, queueId);
    } catch (IOException e) {
      throw new ReplicationException("failed to getOffsets, queueId=" + queueId, e);
    }
  }

  private void listAllQueueIds(Table table, Scan scan, List<ReplicationQueueId> queueIds)
    throws IOException {
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
        queueIds.add(queueId);
      }
    }
  }

  private void listAllQueueIds(Table table, String peerId, ServerName serverName,
    List<ReplicationQueueId> queueIds) throws IOException {
    listAllQueueIds(table,
      new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(serverName, peerId))
        .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter()),
      queueIds);
  }
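
  /**
   * Returns the peer id of the first queue row after all the rows of {@code previousPeerId}, or
   * {@code null} when there are no more peers. Since all queues of a peer share a common row
   * prefix, a one-row scan starting just past that prefix effectively seeks to the next peer
   * without touching the rows in between.
   */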
  private String getNextPeerId(Table table, String previousPeerId) throws IOException {
    Scan peerScan =
      new Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(new KeyOnlyFilter());
    if (previousPeerId != null) {
      peerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(previousPeerId));
    }
    try (ResultScanner scanner = table.getScanner(peerScan)) {
      Result result = scanner.next();
      if (result == null) {
        return null;
      }
      return ReplicationQueueId.getPeerId(Bytes.toString(result.getRow()));
    }
  }

  @Override
  public List<String> listAllPeerIds() throws ReplicationException {
    List<String> peerIds = new ArrayList<>();
    String previousPeerId = null;
    try (Table table = conn.getTable(tableName)) {
      for (;;) {
        String peerId = getNextPeerId(table, previousPeerId);
        if (peerId == null) {
          break;
        }
        peerIds.add(peerId);
        previousPeerId = peerId;
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllPeerIds", e);
    }
    return peerIds;
  }

  @Override
  public List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException {
    Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId))
      .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter());
    List<ReplicationQueueId> queueIds = new ArrayList<>();
    try (Table table = conn.getTable(tableName)) {
      listAllQueueIds(table, scan, queueIds);
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllQueueIds, peerId=" + peerId, e);
    }
    return queueIds;
  }

  @Override
  public List<ReplicationQueueId> listAllQueueIds(ServerName serverName)
    throws ReplicationException {
    List<ReplicationQueueId> queueIds = new ArrayList<>();
    try (Table table = conn.getTable(tableName)) {
      String previousPeerId = null;
      for (;;) {
        // first, get the next peerId
        String peerId = getNextPeerId(table, previousPeerId);
        if (peerId == null) {
          break;
        }
        listAllQueueIds(table, peerId, serverName, queueIds);
        previousPeerId = peerId;
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllQueueIds, serverName=" + serverName, e);
    }
    return queueIds;
  }

  @Override
  public List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
    throws ReplicationException {
    List<ReplicationQueueId> queueIds = new ArrayList<>();
    try (Table table = conn.getTable(tableName)) {
      listAllQueueIds(table, peerId, serverName, queueIds);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to listAllQueueIds, peerId=" + peerId + ", serverName=" + serverName, e);
    }
    return queueIds;
  }
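
  /**
   * Lists every queue together with its per-WAL-group offsets by scanning the whole {@code queue}
   * family. {@link ReadType#STREAM} is used because this is a one-shot full table scan, where a
   * streaming read is a better fit than the default pread.
   */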
  @Override
  public List<ReplicationQueueData> listAllQueues() throws ReplicationException {
    List<ReplicationQueueData> queues = new ArrayList<>();
    Scan scan = new Scan().addFamily(QUEUE_FAMILY).setReadType(ReadType.STREAM);
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
        ReplicationQueueData queueData = new ReplicationQueueData(queueId, parseOffsets(result));
        queues.add(queueData);
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllQueues", e);
    }
    return queues;
  }

  @Override
  public List<ServerName> listAllReplicators() throws ReplicationException {
    Set<ServerName> replicators = new HashSet<>();
    Scan scan = new Scan().addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter())
      .setReadType(ReadType.STREAM);
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
        replicators.add(queueId.getServerName());
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllReplicators", e);
    }
    return new ArrayList<>(replicators);
  }
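
  /**
   * Atomically moves a queue to the target server. The offsets of the old queue row are read
   * first, then one multi-row mutation deletes the old row and writes the offsets under the new
   * claimed queue id, guarded by a CAS condition on one of the old offset cells so that two
   * servers cannot both claim the same queue. On a CAS failure we re-read and retry; once the old
   * row is gone, an empty map is returned to signal that the queue was claimed by someone else.
   */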
  @Override
  public Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
    ServerName targetServerName) throws ReplicationException {
    ReplicationQueueId newQueueId = queueId.claim(targetServerName);
    byte[] coprocessorRow = ReplicationQueueId.getScanPrefix(queueId.getPeerId());
    AsyncTable<?> asyncTable = conn.toAsyncConnection().getTable(tableName);
    try (Table table = conn.getTable(tableName)) {
      for (;;) {
        Map<String, ReplicationGroupOffset> offsets = getOffsets0(table, queueId);
        if (offsets.isEmpty()) {
          return Collections.emptyMap();
        }
        Map.Entry<String, ReplicationGroupOffset> entry = offsets.entrySet().iterator().next();
        ClientProtos.Condition condition = ProtobufUtil.toCondition(
          Bytes.toBytes(queueId.toString()), QUEUE_FAMILY, Bytes.toBytes(entry.getKey()),
          CompareOperator.EQUAL, Bytes.toBytes(entry.getValue().toString()), null);
        Delete delete = new Delete(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY);
        Put put = new Put(Bytes.toBytes(newQueueId.toString()));
        offsets.forEach((walGroup, offset) -> put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup),
          Bytes.toBytes(offset.toString())));
        MultiRowMutationProtos.MutateRowsRequest request =
          MultiRowMutationProtos.MutateRowsRequest.newBuilder().addCondition(condition)
            .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, delete))
            .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put)).build();
        MultiRowMutationProtos.MutateRowsResponse resp =
          FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface,
            MultiRowMutationProtos.MutateRowsResponse> coprocessorService(
              MultiRowMutationProtos.MultiRowMutationService::newStub,
              (stub, controller, done) -> stub.mutateRows(controller, request, done),
              coprocessorRow));
        if (resp.getProcessed()) {
          return offsets;
        }
        // if the mutation was not processed, usually it means the queue has already been claimed
        // by others; for safety, try claiming again. Usually the next get operation above will
        // return an empty map and we will quit the loop.
      }
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to claimQueue, queueId=" + queueId + ", targetServerName=" + targetServerName, e);
    }
  }

  @Override
  public void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
    try (Table table = conn.getTable(tableName)) {
      table.delete(new Delete(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY));
    } catch (IOException e) {
      throw new ReplicationException("failed to removeQueue, queueId=" + queueId, e);
    }
  }

  @Override
  public void removeAllQueues(String peerId) throws ReplicationException {
    Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId))
      .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter());
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        table.delete(new Delete(result.getRow()));
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to removeAllQueues, peerId=" + peerId, e);
    }
  }
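
  /**
   * Returns the last pushed sequence id for the given region and peer, or
   * {@link HConstants#NO_SEQNUM} if no sequence id has been recorded yet.
   */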
  @Override
  public long getLastSequenceId(String encodedRegionName, String peerId)
    throws ReplicationException {
    byte[] qual = Bytes.toBytes(encodedRegionName);
    try (Table table = conn.getTable(tableName)) {
      Result result =
        table.get(new Get(Bytes.toBytes(peerId)).addColumn(LAST_SEQUENCE_ID_FAMILY, qual));
      byte[] lastSeqId = result.getValue(LAST_SEQUENCE_ID_FAMILY, qual);
      return lastSeqId != null ? Bytes.toLong(lastSeqId) : HConstants.NO_SEQNUM;
    } catch (IOException e) {
      throw new ReplicationException("failed to getLastSequenceId, encodedRegionName="
        + encodedRegionName + ", peerId=" + peerId, e);
    }
  }

  @Override
  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
    throws ReplicationException {
    // no need for CAS and retry here, because setLastSequenceIds() is only called for disabled
    // peers, so no conflict can happen
    Put put = new Put(Bytes.toBytes(peerId));
    lastSeqIds.forEach((encodedRegionName, lastSeqId) -> put.addColumn(LAST_SEQUENCE_ID_FAMILY,
      Bytes.toBytes(encodedRegionName), Bytes.toBytes(lastSeqId)));
    try (Table table = conn.getTable(tableName)) {
      table.put(put);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to setLastSequenceIds, peerId=" + peerId + ", lastSeqIds=" + lastSeqIds, e);
    }
  }

  @Override
  public void removeLastSequenceIds(String peerId) throws ReplicationException {
    Delete delete = new Delete(Bytes.toBytes(peerId)).addFamily(LAST_SEQUENCE_ID_FAMILY);
    try (Table table = conn.getTable(tableName)) {
      table.delete(delete);
    } catch (IOException e) {
      throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId, e);
    }
  }

  @Override
  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    throws ReplicationException {
    Delete delete = new Delete(Bytes.toBytes(peerId));
    encodedRegionNames.forEach(n -> delete.addColumns(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(n)));
    try (Table table = conn.getTable(tableName)) {
      table.delete(delete);
    } catch (IOException e) {
      throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId
        + ", encodedRegionNames=" + encodedRegionNames, e);
    }
  }

  @Override
  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
    try (Table table = conn.getTable(tableName)) {
      table.delete(new Delete(Bytes.toBytes(peerId)).addFamily(HFILE_REF_FAMILY));
    } catch (IOException e) {
      throw new ReplicationException("failed to removePeerFromHFileRefs, peerId=" + peerId, e);
    }
  }

  @Override
  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    Put put = new Put(Bytes.toBytes(peerId));
    pairs.forEach(p -> put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(p.getSecond().getName()),
      HConstants.EMPTY_BYTE_ARRAY));
    try (Table table = conn.getTable(tableName)) {
      table.put(put);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to addHFileRefs, peerId=" + peerId + ", pairs=" + pairs, e);
    }
  }

  @Override
  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
    Delete delete = new Delete(Bytes.toBytes(peerId));
    files.forEach(f -> delete.addColumns(HFILE_REF_FAMILY, Bytes.toBytes(f)));
    try (Table table = conn.getTable(tableName)) {
      table.delete(delete);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to removeHFileRefs, peerId=" + peerId + ", files=" + files, e);
    }
  }

  @Override
  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
    List<String> peerIds = new ArrayList<>();
    Scan scan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(ReadType.STREAM)
      .setFilter(new KeyOnlyFilter());
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        peerIds.add(Bytes.toString(result.getRow()));
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to getAllPeersFromHFileRefsQueue", e);
    }
    return peerIds;
  }
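
  /**
   * Collects the qualifier (the file name) of every cell returned by the given scan into the
   * collection created by {@code creator}. Callers enable partial results on the scan so a very
   * wide row of hfile references does not have to be materialized in memory in one piece.
   */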
  private <T extends Collection<String>> T scanHFiles(Scan scan, Supplier<T> creator)
    throws IOException {
    T files = creator.get();
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
          Cell cell = cellScanner.current();
          files.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
        }
      }
    }
    return files;
  }

  @Override
  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
    // use a scan with partial results instead of a single get, to avoid fetching a too-large row
    // in one shot, which could cause huge memory usage
    Scan scan = new Scan().addFamily(HFILE_REF_FAMILY)
      .setStartStopRowForPrefixScan(Bytes.toBytes(peerId)).setAllowPartialResults(true);
    try {
      return scanHFiles(scan, ArrayList::new);
    } catch (IOException e) {
      throw new ReplicationException("failed to getReplicableHFiles, peerId=" + peerId, e);
    }
  }

  @Override
  public Set<String> getAllHFileRefs() throws ReplicationException {
    Scan scan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(ReadType.STREAM)
      .setAllowPartialResults(true);
    try {
      return scanHFiles(scan, HashSet::new);
    } catch (IOException e) {
      throw new ReplicationException("failed to getAllHFileRefs", e);
    }
  }

  @Override
  public boolean hasData() throws ReplicationException {
    try {
      return conn.getAdmin().tableExists(tableName);
    } catch (IOException e) {
      throw new ReplicationException("failed to get replication queue table", e);
    }
  }
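
  // the batchUpdate* methods below are used when migrating replication queue data from the old
  // ZooKeeper based storage into this table, see ZKReplicationQueueStorageForMigration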
  @Override
  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
    throws ReplicationException {
    List<Put> puts = new ArrayList<>();
    for (ReplicationQueueData data : datas) {
      if (data.getOffsets().isEmpty()) {
        continue;
      }
      Put put = new Put(Bytes.toBytes(data.getId().toString()));
      data.getOffsets().forEach((walGroup, offset) -> {
        put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
      });
      puts.add(put);
    }
    try (Table table = conn.getTable(tableName)) {
      table.put(puts);
    } catch (IOException e) {
      throw new ReplicationException("failed to batch update queues", e);
    }
  }

  @Override
  public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
    throws ReplicationException {
    Map<String, Put> peerId2Put = new HashMap<>();
    for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
      peerId2Put
        .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
        .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
          Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
    }
    try (Table table = conn.getTable(tableName)) {
      table
        .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
    } catch (IOException e) {
      throw new ReplicationException("failed to batch update last pushed sequence ids", e);
    }
  }

  @Override
  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
    throws ReplicationException {
    if (hfileRefs.isEmpty()) {
      return;
    }
    Put put = new Put(Bytes.toBytes(peerId));
    for (String ref : hfileRefs) {
      put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
    }
    try (Table table = conn.getTable(tableName)) {
      table.put(put);
    } catch (IOException e) {
      throw new ReplicationException("failed to batch update hfile references", e);
    }
  }

  @Override
  public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
    try (Table table = conn.getTable(tableName);
      ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY)
        .addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) {
      for (;;) {
        Result r = scanner.next();
        if (r == null) {
          break;
        }
        Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts)
          .addFamily(HFILE_REF_FAMILY, ts);
        table.delete(delete);
      }
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to remove last sequence ids and hfile references before timestamp " + ts, e);
    }
  }
}