001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.NavigableMap; 029import java.util.Set; 030import java.util.function.Supplier; 031import java.util.stream.Collectors; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellScanner; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.CompareOperator; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.AsyncTable; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.ResultScanner; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.client.Scan.ReadType; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.filter.KeyOnlyFilter; 051import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.FutureUtils; 054import org.apache.hadoop.hbase.util.Pair; 055import org.apache.yetus.audience.InterfaceAudience; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 058 059import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos; 063 064/** 065 * HBase table based replication queue storage. 066 */ 067@InterfaceAudience.Private 068public class TableReplicationQueueStorage implements ReplicationQueueStorage { 069 070 public static final byte[] QUEUE_FAMILY = Bytes.toBytes("queue"); 071 072 public static final byte[] LAST_SEQUENCE_ID_FAMILY = Bytes.toBytes("sid"); 073 074 public static final byte[] HFILE_REF_FAMILY = Bytes.toBytes("hfileref"); 075 076 private final Connection conn; 077 078 private final TableName tableName; 079 080 public TableReplicationQueueStorage(Connection conn, TableName tableName) { 081 this.conn = conn; 082 this.tableName = tableName; 083 } 084 085 private void addLastSeqIdsPut(MultiRowMutationProtos.MutateRowsRequest.Builder builder, 086 String peerId, Map<String, Long> lastSeqIds, AsyncTable<?> table) throws IOException { 087 // get the previous sequence ids first 088 byte[] row = Bytes.toBytes(peerId); 089 Get get = new Get(row); 090 lastSeqIds.keySet().forEach(encodedRegionName -> get.addColumn(LAST_SEQUENCE_ID_FAMILY, 091 Bytes.toBytes(encodedRegionName))); 092 Result result = FutureUtils.get(table.get(get)); 093 Put put = new Put(row); 094 for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) { 095 String encodedRegionName = entry.getKey(); 096 long lastSeqId = entry.getValue(); 097 byte[] encodedRegionNameAsBytes = Bytes.toBytes(encodedRegionName); 098 byte[] previousLastSeqIdAsBytes = 099 result.getValue(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes); 100 if (previousLastSeqIdAsBytes != null) { 101 long previousLastSeqId = Bytes.toLong(previousLastSeqIdAsBytes); 102 if (lastSeqId > previousLastSeqId) { 103 // update last seq id when it is greater, and use CAS to make sure we do not overwrite 104 // other's value. 105 put.addColumn(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes, 106 Bytes.toBytes(lastSeqId)); 107 builder.addCondition(ProtobufUtil.toCondition(row, LAST_SEQUENCE_ID_FAMILY, 108 encodedRegionNameAsBytes, CompareOperator.EQUAL, previousLastSeqIdAsBytes, null)); 109 } 110 } else { 111 // also update last seq id when there is no value yet, and use CAS to make sure we do not 112 // overwrite 113 // other's value. 114 put.addColumn(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes, Bytes.toBytes(lastSeqId)); 115 builder.addCondition(ProtobufUtil.toCondition(row, LAST_SEQUENCE_ID_FAMILY, 116 encodedRegionNameAsBytes, CompareOperator.EQUAL, null, null)); 117 } 118 } 119 if (!put.isEmpty()) { 120 builder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put)); 121 } 122 } 123 124 @Override 125 public void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset, 126 Map<String, Long> lastSeqIds) throws ReplicationException { 127 Put put = new Put(Bytes.toBytes(queueId.toString())).addColumn(QUEUE_FAMILY, 128 Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString())); 129 AsyncTable<?> asyncTable = conn.toAsyncConnection().getTable(tableName); 130 try { 131 if (lastSeqIds.isEmpty()) { 132 FutureUtils.get(asyncTable.put(put)); 133 } else { 134 for (;;) { 135 MultiRowMutationProtos.MutateRowsRequest.Builder builder = 136 MultiRowMutationProtos.MutateRowsRequest.newBuilder(); 137 addLastSeqIdsPut(builder, queueId.getPeerId(), lastSeqIds, asyncTable); 138 if (builder.getMutationRequestCount() > 0) { 139 // use MultiRowMutationService to atomically update offset and last sequence ids 140 MultiRowMutationProtos.MutateRowsRequest request = 141 builder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put)).build(); 142 MultiRowMutationProtos.MutateRowsResponse responose = 143 FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface, 144 MultiRowMutationProtos.MutateRowsResponse> coprocessorService( 145 MultiRowMutationProtos.MultiRowMutationService::newStub, 146 (stub, controller, done) -> stub.mutateRows(controller, request, done), 147 put.getRow())); 148 if (responose.getProcessed()) { 149 break; 150 } 151 } else { 152 // we do not need to update last seq id, fallback to single put 153 FutureUtils.get(asyncTable.put(put)); 154 break; 155 } 156 } 157 } 158 } catch (IOException e) { 159 throw new ReplicationException("failed to setOffset, queueId=" + queueId + ", walGroup=" 160 + walGroup + ", offset=" + offset + ", lastSeqIds=" + lastSeqIds, e); 161 } 162 } 163 164 private ImmutableMap<String, ReplicationGroupOffset> parseOffsets(Result result) { 165 ImmutableMap.Builder<String, ReplicationGroupOffset> builder = 166 ImmutableMap.builderWithExpectedSize(result.size()); 167 NavigableMap<byte[], byte[]> map = result.getFamilyMap(QUEUE_FAMILY); 168 if (map != null) { 169 map.forEach((k, v) -> { 170 String walGroup = Bytes.toString(k); 171 ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v)); 172 builder.put(walGroup, offset); 173 }); 174 } 175 return builder.build(); 176 } 177 178 private Map<String, ReplicationGroupOffset> getOffsets0(Table table, ReplicationQueueId queueId) 179 throws IOException { 180 Result result = table.get(new Get(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY)); 181 return parseOffsets(result); 182 } 183 184 @Override 185 public Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId) 186 throws ReplicationException { 187 try (Table table = conn.getTable(tableName)) { 188 return getOffsets0(table, queueId); 189 } catch (IOException e) { 190 throw new ReplicationException("failed to getOffsets, queueId=" + queueId, e); 191 } 192 } 193 194 private void listAllQueueIds(Table table, Scan scan, List<ReplicationQueueId> queueIds) 195 throws IOException { 196 try (ResultScanner scanner = table.getScanner(scan)) { 197 for (;;) { 198 Result result = scanner.next(); 199 if (result == null) { 200 break; 201 } 202 ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow())); 203 queueIds.add(queueId); 204 } 205 } 206 } 207 208 private void listAllQueueIds(Table table, String peerId, ServerName serverName, 209 List<ReplicationQueueId> queueIds) throws IOException { 210 listAllQueueIds(table, 211 new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(serverName, peerId)) 212 .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter()), 213 queueIds); 214 } 215 216 @Override 217 public List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException { 218 Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId)) 219 .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter()); 220 List<ReplicationQueueId> queueIds = new ArrayList<>(); 221 try (Table table = conn.getTable(tableName)) { 222 listAllQueueIds(table, scan, queueIds); 223 } catch (IOException e) { 224 throw new ReplicationException("failed to listAllQueueIds, peerId=" + peerId, e); 225 } 226 return queueIds; 227 } 228 229 @Override 230 public List<ReplicationQueueId> listAllQueueIds(ServerName serverName) 231 throws ReplicationException { 232 List<ReplicationQueueId> queueIds = new ArrayList<>(); 233 try (Table table = conn.getTable(tableName)) { 234 KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(); 235 String previousPeerId = null; 236 for (;;) { 237 // first, get the next peerId 238 Scan peerScan = 239 new Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(keyOnlyFilter); 240 if (previousPeerId != null) { 241 peerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(previousPeerId)); 242 } 243 String peerId; 244 try (ResultScanner scanner = table.getScanner(peerScan)) { 245 Result result = scanner.next(); 246 if (result == null) { 247 // no more peers, break 248 break; 249 } 250 peerId = ReplicationQueueId.getPeerId(Bytes.toString(result.getRow())); 251 } 252 listAllQueueIds(table, peerId, serverName, queueIds); 253 previousPeerId = peerId; 254 } 255 } catch (IOException e) { 256 throw new ReplicationException("failed to listAllQueueIds, serverName=" + serverName, e); 257 } 258 return queueIds; 259 } 260 261 @Override 262 public List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName) 263 throws ReplicationException { 264 List<ReplicationQueueId> queueIds = new ArrayList<>(); 265 try (Table table = conn.getTable(tableName)) { 266 listAllQueueIds(table, peerId, serverName, queueIds); 267 } catch (IOException e) { 268 throw new ReplicationException( 269 "failed to listAllQueueIds, peerId=" + peerId + ", serverName=" + serverName, e); 270 } 271 return queueIds; 272 } 273 274 @Override 275 public List<ReplicationQueueData> listAllQueues() throws ReplicationException { 276 List<ReplicationQueueData> queues = new ArrayList<>(); 277 Scan scan = new Scan().addFamily(QUEUE_FAMILY).setReadType(ReadType.STREAM); 278 try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) { 279 for (;;) { 280 Result result = scanner.next(); 281 if (result == null) { 282 break; 283 } 284 ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow())); 285 ReplicationQueueData queueData = new ReplicationQueueData(queueId, parseOffsets(result)); 286 queues.add(queueData); 287 } 288 } catch (IOException e) { 289 throw new ReplicationException("failed to listAllQueues", e); 290 } 291 return queues; 292 } 293 294 @Override 295 public List<ServerName> listAllReplicators() throws ReplicationException { 296 Set<ServerName> replicators = new HashSet<>(); 297 Scan scan = new Scan().addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter()) 298 .setReadType(ReadType.STREAM); 299 try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) { 300 for (;;) { 301 Result result = scanner.next(); 302 if (result == null) { 303 break; 304 } 305 ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow())); 306 replicators.add(queueId.getServerName()); 307 } 308 } catch (IOException e) { 309 throw new ReplicationException("failed to listAllReplicators", e); 310 } 311 return new ArrayList<>(replicators); 312 } 313 314 @Override 315 public Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId, 316 ServerName targetServerName) throws ReplicationException { 317 ReplicationQueueId newQueueId = queueId.claim(targetServerName); 318 byte[] coprocessorRow = ReplicationQueueId.getScanPrefix(queueId.getPeerId()); 319 AsyncTable<?> asyncTable = conn.toAsyncConnection().getTable(tableName); 320 try (Table table = conn.getTable(tableName)) { 321 for (;;) { 322 Map<String, ReplicationGroupOffset> offsets = getOffsets0(table, queueId); 323 if (offsets.isEmpty()) { 324 return Collections.emptyMap(); 325 } 326 Map.Entry<String, ReplicationGroupOffset> entry = offsets.entrySet().iterator().next(); 327 ClientProtos.Condition condition = ProtobufUtil.toCondition( 328 Bytes.toBytes(queueId.toString()), QUEUE_FAMILY, Bytes.toBytes(entry.getKey()), 329 CompareOperator.EQUAL, Bytes.toBytes(entry.getValue().toString()), null); 330 Delete delete = new Delete(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY); 331 Put put = new Put(Bytes.toBytes(newQueueId.toString())); 332 offsets.forEach((walGroup, offset) -> put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), 333 Bytes.toBytes(offset.toString()))); 334 MultiRowMutationProtos.MutateRowsRequest request = 335 MultiRowMutationProtos.MutateRowsRequest.newBuilder().addCondition(condition) 336 .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, delete)) 337 .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put)).build(); 338 MultiRowMutationProtos.MutateRowsResponse resp = 339 FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface, 340 MultiRowMutationProtos.MutateRowsResponse> coprocessorService( 341 MultiRowMutationProtos.MultiRowMutationService::newStub, 342 (stub, controller, done) -> stub.mutateRows(controller, request, done), 343 coprocessorRow)); 344 if (resp.getProcessed()) { 345 return offsets; 346 } 347 // if the multi is not processed, which usually the queue has already been claimed by 348 // others, for safety, let's try claiming again, usually the next get operation above will 349 // return an empty map and we will quit the loop. 350 } 351 } catch (IOException e) { 352 throw new ReplicationException( 353 "failed to claimQueue, queueId=" + queueId + ", targetServerName=" + targetServerName, e); 354 } 355 } 356 357 @Override 358 public void removeQueue(ReplicationQueueId queueId) throws ReplicationException { 359 try (Table table = conn.getTable(tableName)) { 360 table.delete(new Delete(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY)); 361 } catch (IOException e) { 362 throw new ReplicationException("failed to removeQueue, queueId=" + queueId, e); 363 } 364 } 365 366 @Override 367 public void removeAllQueues(String peerId) throws ReplicationException { 368 Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId)) 369 .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter()); 370 try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) { 371 for (;;) { 372 Result result = scanner.next(); 373 if (result == null) { 374 break; 375 } 376 table.delete(new Delete(result.getRow())); 377 } 378 } catch (IOException e) { 379 throw new ReplicationException("failed to listAllQueueIds, peerId=" + peerId, e); 380 } 381 } 382 383 @Override 384 public long getLastSequenceId(String encodedRegionName, String peerId) 385 throws ReplicationException { 386 byte[] qual = Bytes.toBytes(encodedRegionName); 387 try (Table table = conn.getTable(tableName)) { 388 Result result = 389 table.get(new Get(Bytes.toBytes(peerId)).addColumn(LAST_SEQUENCE_ID_FAMILY, qual)); 390 byte[] lastSeqId = result.getValue(LAST_SEQUENCE_ID_FAMILY, qual); 391 return lastSeqId != null ? Bytes.toLong(lastSeqId) : HConstants.NO_SEQNUM; 392 } catch (IOException e) { 393 throw new ReplicationException("failed to getLastSequenceId, encodedRegionName=" 394 + encodedRegionName + ", peerId=" + peerId, e); 395 } 396 } 397 398 @Override 399 public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) 400 throws ReplicationException { 401 // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers 402 // only, so no conflict happen. 403 Put put = new Put(Bytes.toBytes(peerId)); 404 lastSeqIds.forEach((encodedRegionName, lastSeqId) -> put.addColumn(LAST_SEQUENCE_ID_FAMILY, 405 Bytes.toBytes(encodedRegionName), Bytes.toBytes(lastSeqId))); 406 try (Table table = conn.getTable(tableName)) { 407 table.put(put); 408 } catch (IOException e) { 409 throw new ReplicationException( 410 "failed to setLastSequenceIds, peerId=" + peerId + ", lastSeqIds=" + lastSeqIds, e); 411 } 412 } 413 414 @Override 415 public void removeLastSequenceIds(String peerId) throws ReplicationException { 416 Delete delete = new Delete(Bytes.toBytes(peerId)).addFamily(LAST_SEQUENCE_ID_FAMILY); 417 try (Table table = conn.getTable(tableName)) { 418 table.delete(delete); 419 } catch (IOException e) { 420 throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId, e); 421 } 422 } 423 424 @Override 425 public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames) 426 throws ReplicationException { 427 Delete delete = new Delete(Bytes.toBytes(peerId)); 428 encodedRegionNames.forEach(n -> delete.addColumns(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(n))); 429 try (Table table = conn.getTable(tableName)) { 430 table.delete(delete); 431 } catch (IOException e) { 432 throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId 433 + ", encodedRegionNames=" + encodedRegionNames, e); 434 } 435 } 436 437 @Override 438 public void removePeerFromHFileRefs(String peerId) throws ReplicationException { 439 try (Table table = conn.getTable(tableName)) { 440 table.delete(new Delete(Bytes.toBytes(peerId)).addFamily(HFILE_REF_FAMILY)); 441 } catch (IOException e) { 442 throw new ReplicationException("failed to removePeerFromHFileRefs, peerId=" + peerId, e); 443 } 444 } 445 446 @Override 447 public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) 448 throws ReplicationException { 449 Put put = new Put(Bytes.toBytes(peerId)); 450 pairs.forEach(p -> put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(p.getSecond().getName()), 451 HConstants.EMPTY_BYTE_ARRAY)); 452 try (Table table = conn.getTable(tableName)) { 453 table.put(put); 454 } catch (IOException e) { 455 throw new ReplicationException( 456 "failed to addHFileRefs, peerId=" + peerId + ", pairs=" + pairs, e); 457 } 458 } 459 460 @Override 461 public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException { 462 Delete delete = new Delete(Bytes.toBytes(peerId)); 463 files.forEach(f -> delete.addColumns(HFILE_REF_FAMILY, Bytes.toBytes(f))); 464 try (Table table = conn.getTable(tableName)) { 465 table.delete(delete); 466 } catch (IOException e) { 467 throw new ReplicationException( 468 "failed to removeHFileRefs, peerId=" + peerId + ", files=" + files, e); 469 } 470 } 471 472 @Override 473 public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException { 474 List<String> peerIds = new ArrayList<>(); 475 Scan scan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(ReadType.STREAM) 476 .setFilter(new KeyOnlyFilter()); 477 try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) { 478 for (;;) { 479 Result result = scanner.next(); 480 if (result == null) { 481 break; 482 } 483 peerIds.add(Bytes.toString(result.getRow())); 484 } 485 } catch (IOException e) { 486 throw new ReplicationException("failed to getAllPeersFromHFileRefsQueue", e); 487 } 488 return peerIds; 489 } 490 491 private <T extends Collection<String>> T scanHFiles(Scan scan, Supplier<T> creator) 492 throws IOException { 493 T files = creator.get(); 494 try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) { 495 for (;;) { 496 Result result = scanner.next(); 497 if (result == null) { 498 break; 499 } 500 CellScanner cellScanner = result.cellScanner(); 501 while (cellScanner.advance()) { 502 Cell cell = cellScanner.current(); 503 files.add(Bytes.toString(CellUtil.cloneQualifier(cell))); 504 } 505 } 506 } 507 return files; 508 } 509 510 @Override 511 public List<String> getReplicableHFiles(String peerId) throws ReplicationException { 512 // use scan to avoid getting a too large row one time, which may cause a very huge memory usage. 513 Scan scan = new Scan().addFamily(HFILE_REF_FAMILY) 514 .setStartStopRowForPrefixScan(Bytes.toBytes(peerId)).setAllowPartialResults(true); 515 try { 516 return scanHFiles(scan, ArrayList::new); 517 } catch (IOException e) { 518 throw new ReplicationException("failed to getReplicableHFiles, peerId=" + peerId, e); 519 } 520 } 521 522 @Override 523 public Set<String> getAllHFileRefs() throws ReplicationException { 524 Scan scan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(ReadType.STREAM) 525 .setAllowPartialResults(true); 526 try { 527 return scanHFiles(scan, HashSet::new); 528 } catch (IOException e) { 529 throw new ReplicationException("failed to getAllHFileRefs", e); 530 } 531 } 532 533 @Override 534 public boolean hasData() throws ReplicationException { 535 try { 536 return conn.getAdmin().tableExists(tableName); 537 } catch (IOException e) { 538 throw new ReplicationException("failed to get replication queue table", e); 539 } 540 } 541 542 @Override 543 public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas) 544 throws ReplicationException { 545 List<Put> puts = new ArrayList<>(); 546 for (ReplicationQueueData data : datas) { 547 if (data.getOffsets().isEmpty()) { 548 continue; 549 } 550 Put put = new Put(Bytes.toBytes(data.getId().toString())); 551 data.getOffsets().forEach((walGroup, offset) -> { 552 put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString())); 553 }); 554 puts.add(put); 555 } 556 try (Table table = conn.getTable(tableName)) { 557 table.put(puts); 558 } catch (IOException e) { 559 throw new ReplicationException("failed to batch update queues", e); 560 } 561 } 562 563 @Override 564 public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds) 565 throws ReplicationException { 566 Map<String, Put> peerId2Put = new HashMap<>(); 567 for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) { 568 peerId2Put 569 .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId))) 570 .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()), 571 Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId())); 572 } 573 try (Table table = conn.getTable(tableName)) { 574 table 575 .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList())); 576 } catch (IOException e) { 577 throw new ReplicationException("failed to batch update last pushed sequence ids", e); 578 } 579 } 580 581 @Override 582 public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) 583 throws ReplicationException { 584 if (hfileRefs.isEmpty()) { 585 return; 586 } 587 Put put = new Put(Bytes.toBytes(peerId)); 588 for (String ref : hfileRefs) { 589 put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY); 590 } 591 try (Table table = conn.getTable(tableName)) { 592 table.put(put); 593 } catch (IOException e) { 594 throw new ReplicationException("failed to batch update hfile references", e); 595 } 596 } 597 598 @Override 599 public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException { 600 try (Table table = conn.getTable(tableName); 601 ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY) 602 .addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) { 603 for (;;) { 604 Result r = scanner.next(); 605 if (r == null) { 606 break; 607 } 608 Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts) 609 .addFamily(HFILE_REF_FAMILY, ts); 610 table.delete(delete); 611 } 612 } catch (IOException e) { 613 throw new ReplicationException( 614 "failed to remove last sequence ids and hfile references before timestamp " + ts, e); 615 } 616 } 617}