/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
/**
 * HBase table based replication queue storage.
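 * <p>
 * All state is kept in a single hbase table with three column families, as the code below shows:
 * {@code queue} stores replication offsets (row key is the string form of a
 * {@link ReplicationQueueId}, qualifier is the wal group, value is the serialized
 * {@link ReplicationGroupOffset}); {@code sid} stores last pushed sequence ids (row key is the
 * peer id, qualifier is the encoded region name, value is the sequence id as a long); and
 * {@code hfileref} stores hfile references for bulk loaded files (row key is the peer id,
 * qualifier is the hfile name, value is empty).
 * <p>
 * A typical interaction, sketched with the construction of ids and offsets elided:
 *
 * <pre>
 * ReplicationQueueStorage storage = new TableReplicationQueueStorage(conn, tableName);
 * storage.setOffset(queueId, walGroup, offset, lastSeqIds);
 * Map&lt;String, ReplicationGroupOffset&gt; offsets = storage.getOffsets(queueId);
 * </pre>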
 */
@InterfaceAudience.Private
public class TableReplicationQueueStorage implements ReplicationQueueStorage {

  public static final byte[] QUEUE_FAMILY = Bytes.toBytes("queue");

  public static final byte[] LAST_SEQUENCE_ID_FAMILY = Bytes.toBytes("sid");

  public static final byte[] HFILE_REF_FAMILY = Bytes.toBytes("hfileref");

  private final Connection conn;

  private final TableName tableName;

  public TableReplicationQueueStorage(Connection conn, TableName tableName) {
    this.conn = conn;
    this.tableName = tableName;
  }

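  /**
   * Build the conditional puts for updating the last pushed sequence ids of the given peer. We
   * read the current value for each region first, and only add a put (guarded by a CAS condition
   * on the previous value, or on the cell being absent) when the new sequence id is greater, so
   * concurrent updaters cannot overwrite each other. The caller is responsible for executing the
   * collected conditions and mutations atomically.
   */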
  private void addLastSeqIdsPut(MultiRowMutationProtos.MutateRowsRequest.Builder builder,
    String peerId, Map<String, Long> lastSeqIds, AsyncTable<?> table) throws IOException {
    // get the previous sequence ids first
    byte[] row = Bytes.toBytes(peerId);
    Get get = new Get(row);
    lastSeqIds.keySet().forEach(encodedRegionName -> get.addColumn(LAST_SEQUENCE_ID_FAMILY,
      Bytes.toBytes(encodedRegionName)));
    Result result = FutureUtils.get(table.get(get));
    Put put = new Put(row);
    for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) {
      String encodedRegionName = entry.getKey();
      long lastSeqId = entry.getValue();
      byte[] encodedRegionNameAsBytes = Bytes.toBytes(encodedRegionName);
      byte[] previousLastSeqIdAsBytes =
        result.getValue(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes);
      if (previousLastSeqIdAsBytes != null) {
        long previousLastSeqId = Bytes.toLong(previousLastSeqIdAsBytes);
        if (lastSeqId > previousLastSeqId) {
          // update the last seq id when it is greater, and use CAS to make sure we do not
          // overwrite others' values.
          put.addColumn(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes,
            Bytes.toBytes(lastSeqId));
          builder.addCondition(ProtobufUtil.toCondition(row, LAST_SEQUENCE_ID_FAMILY,
            encodedRegionNameAsBytes, CompareOperator.EQUAL, previousLastSeqIdAsBytes, null));
        }
      } else {
        // also update the last seq id when there is no value yet, and use CAS to make sure we do
        // not overwrite others' values.
        put.addColumn(LAST_SEQUENCE_ID_FAMILY, encodedRegionNameAsBytes, Bytes.toBytes(lastSeqId));
        builder.addCondition(ProtobufUtil.toCondition(row, LAST_SEQUENCE_ID_FAMILY,
          encodedRegionNameAsBytes, CompareOperator.EQUAL, null, null));
      }
    }
    if (!put.isEmpty()) {
      builder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put));
    }
  }

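  // The offset put and the last sequence id updates must be committed together, so when
  // lastSeqIds is non-empty we go through the MultiRowMutationService coprocessor and retry
  // until the CAS conditions built in addLastSeqIdsPut pass; with no last sequence ids to
  // update, a plain put is enough.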
  @Override
  public void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset,
    Map<String, Long> lastSeqIds) throws ReplicationException {
    Put put = new Put(Bytes.toBytes(queueId.toString())).addColumn(QUEUE_FAMILY,
      Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
    AsyncTable<?> asyncTable = conn.toAsyncConnection().getTable(tableName);
    try {
      if (lastSeqIds.isEmpty()) {
        FutureUtils.get(asyncTable.put(put));
      } else {
        for (;;) {
          MultiRowMutationProtos.MutateRowsRequest.Builder builder =
            MultiRowMutationProtos.MutateRowsRequest.newBuilder();
          addLastSeqIdsPut(builder, queueId.getPeerId(), lastSeqIds, asyncTable);
          if (builder.getMutationRequestCount() > 0) {
            // use MultiRowMutationService to atomically update offset and last sequence ids
            MultiRowMutationProtos.MutateRowsRequest request =
              builder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put)).build();
            MultiRowMutationProtos.MutateRowsResponse response =
              FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface,
                MultiRowMutationProtos.MutateRowsResponse> coprocessorService(
                  MultiRowMutationProtos.MultiRowMutationService::newStub,
                  (stub, controller, done) -> stub.mutateRows(controller, request, done),
                  put.getRow()));
            if (response.getProcessed()) {
              break;
            }
          } else {
            // we do not need to update the last seq ids, fall back to a single put
            FutureUtils.get(asyncTable.put(put));
            break;
          }
        }
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to setOffset, queueId=" + queueId + ", walGroup="
        + walGroup + ", offset=" + offset + ", lastSeqIds=" + lastSeqIds, e);
    }
  }

  private ImmutableMap<String, ReplicationGroupOffset> parseOffsets(Result result) {
    ImmutableMap.Builder<String, ReplicationGroupOffset> builder =
      ImmutableMap.builderWithExpectedSize(result.size());
    NavigableMap<byte[], byte[]> map = result.getFamilyMap(QUEUE_FAMILY);
    if (map != null) {
      map.forEach((k, v) -> {
        String walGroup = Bytes.toString(k);
        ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v));
        builder.put(walGroup, offset);
      });
    }
    return builder.build();
  }

  private Map<String, ReplicationGroupOffset> getOffsets0(Table table, ReplicationQueueId queueId)
    throws IOException {
    Result result = table.get(new Get(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY));
    return parseOffsets(result);
  }

  @Override
  public Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
    throws ReplicationException {
    try (Table table = conn.getTable(tableName)) {
      return getOffsets0(table, queueId);
    } catch (IOException e) {
      throw new ReplicationException("failed to getOffsets, queueId=" + queueId, e);
    }
  }

  private void listAllQueueIds(Table table, Scan scan, List<ReplicationQueueId> queueIds)
    throws IOException {
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
        queueIds.add(queueId);
      }
    }
  }

  private void listAllQueueIds(Table table, String peerId, ServerName serverName,
    List<ReplicationQueueId> queueIds) throws IOException {
    listAllQueueIds(table,
      new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(serverName, peerId))
        .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter()),
      queueIds);
  }

  @Override
  public List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException {
    Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId))
      .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter());
    List<ReplicationQueueId> queueIds = new ArrayList<>();
    try (Table table = conn.getTable(tableName)) {
      listAllQueueIds(table, scan, queueIds);
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllQueueIds, peerId=" + peerId, e);
    }
    return queueIds;
  }

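  // Queue rows are keyed by peer id first, so there is no single prefix scan that selects all
  // queues of one server. Instead we walk the distinct peer ids (a one-row scan locates the next
  // peer id) and run a (serverName, peerId) prefix scan for each of them.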
  @Override
  public List<ReplicationQueueId> listAllQueueIds(ServerName serverName)
    throws ReplicationException {
    List<ReplicationQueueId> queueIds = new ArrayList<>();
    try (Table table = conn.getTable(tableName)) {
      KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter();
      String previousPeerId = null;
      for (;;) {
        // first, get the next peerId
        Scan peerScan =
          new Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(keyOnlyFilter);
        if (previousPeerId != null) {
          peerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(previousPeerId));
        }
        String peerId;
        try (ResultScanner scanner = table.getScanner(peerScan)) {
          Result result = scanner.next();
          if (result == null) {
            // no more peers, break
            break;
          }
          peerId = ReplicationQueueId.getPeerId(Bytes.toString(result.getRow()));
        }
        listAllQueueIds(table, peerId, serverName, queueIds);
        previousPeerId = peerId;
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllQueueIds, serverName=" + serverName, e);
    }
    return queueIds;
  }

  @Override
  public List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
    throws ReplicationException {
    List<ReplicationQueueId> queueIds = new ArrayList<>();
    try (Table table = conn.getTable(tableName)) {
      listAllQueueIds(table, peerId, serverName, queueIds);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to listAllQueueIds, peerId=" + peerId + ", serverName=" + serverName, e);
    }
    return queueIds;
  }

  @Override
  public List<ReplicationQueueData> listAllQueues() throws ReplicationException {
    List<ReplicationQueueData> queues = new ArrayList<>();
    Scan scan = new Scan().addFamily(QUEUE_FAMILY).setReadType(ReadType.STREAM);
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
        ReplicationQueueData queueData = new ReplicationQueueData(queueId, parseOffsets(result));
        queues.add(queueData);
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllQueues", e);
    }
    return queues;
  }

  @Override
  public List<ServerName> listAllReplicators() throws ReplicationException {
    Set<ServerName> replicators = new HashSet<>();
    Scan scan = new Scan().addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter())
      .setReadType(ReadType.STREAM);
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
        replicators.add(queueId.getServerName());
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to listAllReplicators", e);
    }
    return new ArrayList<>(replicators);
  }

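  // Claiming moves a whole queue row to the target server in one atomic step: a delete of the
  // old row and a put of the new row are sent through the MultiRowMutationService coprocessor,
  // guarded by a CAS condition on one of the old offset cells. If another server claims the
  // queue first, the condition fails and we re-read; the now-missing row then ends the loop.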
  @Override
  public Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
    ServerName targetServerName) throws ReplicationException {
    ReplicationQueueId newQueueId = queueId.claim(targetServerName);
    byte[] coprocessorRow = ReplicationQueueId.getScanPrefix(queueId.getPeerId());
    AsyncTable<?> asyncTable = conn.toAsyncConnection().getTable(tableName);
    try (Table table = conn.getTable(tableName)) {
      for (;;) {
        Map<String, ReplicationGroupOffset> offsets = getOffsets0(table, queueId);
        if (offsets.isEmpty()) {
          return Collections.emptyMap();
        }
        Map.Entry<String, ReplicationGroupOffset> entry = offsets.entrySet().iterator().next();
        ClientProtos.Condition condition = ProtobufUtil.toCondition(
          Bytes.toBytes(queueId.toString()), QUEUE_FAMILY, Bytes.toBytes(entry.getKey()),
          CompareOperator.EQUAL, Bytes.toBytes(entry.getValue().toString()), null);
        Delete delete = new Delete(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY);
        Put put = new Put(Bytes.toBytes(newQueueId.toString()));
        offsets.forEach((walGroup, offset) -> put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup),
          Bytes.toBytes(offset.toString())));
        MultiRowMutationProtos.MutateRowsRequest request =
          MultiRowMutationProtos.MutateRowsRequest.newBuilder().addCondition(condition)
            .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, delete))
            .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put)).build();
        MultiRowMutationProtos.MutateRowsResponse resp =
          FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface,
            MultiRowMutationProtos.MutateRowsResponse> coprocessorService(
              MultiRowMutationProtos.MultiRowMutationService::newStub,
              (stub, controller, done) -> stub.mutateRows(controller, request, done),
              coprocessorRow));
        if (resp.getProcessed()) {
          return offsets;
        }
        // if the mutation was not processed, the queue has usually already been claimed by
        // another server; for safety, try claiming again. Usually the next get operation above
        // will return an empty map and we will quit the loop.
      }
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to claimQueue, queueId=" + queueId + ", targetServerName=" + targetServerName, e);
    }
  }

  @Override
  public void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
    try (Table table = conn.getTable(tableName)) {
      table.delete(new Delete(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY));
    } catch (IOException e) {
      throw new ReplicationException("failed to removeQueue, queueId=" + queueId, e);
    }
  }

  @Override
  public void removeAllQueues(String peerId) throws ReplicationException {
    Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId))
      .addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter());
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        table.delete(new Delete(result.getRow()));
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to removeAllQueues, peerId=" + peerId, e);
    }
  }

  @Override
  public long getLastSequenceId(String encodedRegionName, String peerId)
    throws ReplicationException {
    byte[] qual = Bytes.toBytes(encodedRegionName);
    try (Table table = conn.getTable(tableName)) {
      Result result =
        table.get(new Get(Bytes.toBytes(peerId)).addColumn(LAST_SEQUENCE_ID_FAMILY, qual));
      byte[] lastSeqId = result.getValue(LAST_SEQUENCE_ID_FAMILY, qual);
      return lastSeqId != null ? Bytes.toLong(lastSeqId) : HConstants.NO_SEQNUM;
    } catch (IOException e) {
      throw new ReplicationException("failed to getLastSequenceId, encodedRegionName="
        + encodedRegionName + ", peerId=" + peerId, e);
    }
  }

  @Override
  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
    throws ReplicationException {
    // No need for CAS and retry here, because setLastSequenceIds() is only called for disabled
    // peers, so no conflicts can happen.
    Put put = new Put(Bytes.toBytes(peerId));
    lastSeqIds.forEach((encodedRegionName, lastSeqId) -> put.addColumn(LAST_SEQUENCE_ID_FAMILY,
      Bytes.toBytes(encodedRegionName), Bytes.toBytes(lastSeqId)));
    try (Table table = conn.getTable(tableName)) {
      table.put(put);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to setLastSequenceIds, peerId=" + peerId + ", lastSeqIds=" + lastSeqIds, e);
    }
  }

  @Override
  public void removeLastSequenceIds(String peerId) throws ReplicationException {
    Delete delete = new Delete(Bytes.toBytes(peerId)).addFamily(LAST_SEQUENCE_ID_FAMILY);
    try (Table table = conn.getTable(tableName)) {
      table.delete(delete);
    } catch (IOException e) {
      throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId, e);
    }
  }

  @Override
  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    throws ReplicationException {
    Delete delete = new Delete(Bytes.toBytes(peerId));
    encodedRegionNames.forEach(n -> delete.addColumns(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(n)));
    try (Table table = conn.getTable(tableName)) {
      table.delete(delete);
    } catch (IOException e) {
      throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId
        + ", encodedRegionNames=" + encodedRegionNames, e);
    }
  }

  @Override
  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
    try (Table table = conn.getTable(tableName)) {
      table.delete(new Delete(Bytes.toBytes(peerId)).addFamily(HFILE_REF_FAMILY));
    } catch (IOException e) {
      throw new ReplicationException("failed to removePeerFromHFileRefs, peerId=" + peerId, e);
    }
  }

  @Override
  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    Put put = new Put(Bytes.toBytes(peerId));
    pairs.forEach(p -> put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(p.getSecond().getName()),
      HConstants.EMPTY_BYTE_ARRAY));
    try (Table table = conn.getTable(tableName)) {
      table.put(put);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to addHFileRefs, peerId=" + peerId + ", pairs=" + pairs, e);
    }
  }

  @Override
  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
    Delete delete = new Delete(Bytes.toBytes(peerId));
    files.forEach(f -> delete.addColumns(HFILE_REF_FAMILY, Bytes.toBytes(f)));
    try (Table table = conn.getTable(tableName)) {
      table.delete(delete);
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to removeHFileRefs, peerId=" + peerId + ", files=" + files, e);
    }
  }

  @Override
  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
    List<String> peerIds = new ArrayList<>();
    Scan scan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(ReadType.STREAM)
      .setFilter(new KeyOnlyFilter());
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        peerIds.add(Bytes.toString(result.getRow()));
      }
    } catch (IOException e) {
      throw new ReplicationException("failed to getAllPeersFromHFileRefsQueue", e);
    }
    return peerIds;
  }

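  // Collect the qualifiers (hfile names) of all cells returned by the given scan. Callers pass
  // scans with setAllowPartialResults(true), so a peer with many hfile references is streamed
  // back in bounded chunks instead of one huge Result.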
  private <T extends Collection<String>> T scanHFiles(Scan scan, Supplier<T> creator)
    throws IOException {
    T files = creator.get();
    try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
          Cell cell = cellScanner.current();
          files.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
        }
      }
    }
    return files;
  }

  @Override
  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
    // use a scan to avoid fetching a too-large row in one call, which may cause very high memory
    // usage
    Scan scan = new Scan().addFamily(HFILE_REF_FAMILY)
      .setStartStopRowForPrefixScan(Bytes.toBytes(peerId)).setAllowPartialResults(true);
    try {
      return scanHFiles(scan, ArrayList::new);
    } catch (IOException e) {
      throw new ReplicationException("failed to getReplicableHFiles, peerId=" + peerId, e);
    }
  }

  @Override
  public Set<String> getAllHFileRefs() throws ReplicationException {
    Scan scan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(ReadType.STREAM)
      .setAllowPartialResults(true);
    try {
      return scanHFiles(scan, HashSet::new);
    } catch (IOException e) {
      throw new ReplicationException("failed to getAllHFileRefs", e);
    }
  }

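  // The existence of the queue table is used as the indicator of whether this storage already
  // holds data.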
  @Override
  public boolean hasData() throws ReplicationException {
    try {
      return conn.getAdmin().tableExists(tableName);
    } catch (IOException e) {
      throw new ReplicationException("failed to get replication queue table", e);
    }
  }

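  // The batchUpdate* methods below write entries with plain puts and no CAS, which is only safe
  // for bulk loading, e.g. when migrating queue data from another storage implementation (note
  // the ZkLastPushedSeqId type below).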
  @Override
  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
    throws ReplicationException {
    List<Put> puts = new ArrayList<>();
    for (ReplicationQueueData data : datas) {
      if (data.getOffsets().isEmpty()) {
        continue;
      }
      Put put = new Put(Bytes.toBytes(data.getId().toString()));
      data.getOffsets().forEach((walGroup, offset) -> {
        put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
      });
      puts.add(put);
    }
    try (Table table = conn.getTable(tableName)) {
      table.put(puts);
    } catch (IOException e) {
      throw new ReplicationException("failed to batch update queues", e);
    }
  }

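  // Group the last pushed sequence ids by peer id, so that each peer row is written with a
  // single put.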
  @Override
  public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
    throws ReplicationException {
    Map<String, Put> peerId2Put = new HashMap<>();
    for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
      peerId2Put
        .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
        .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
          Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
    }
    try (Table table = conn.getTable(tableName)) {
      table
        .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
    } catch (IOException e) {
      throw new ReplicationException("failed to batch update last pushed sequence ids", e);
    }
  }

  @Override
  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
    throws ReplicationException {
    if (hfileRefs.isEmpty()) {
      return;
    }
    Put put = new Put(Bytes.toBytes(peerId));
    for (String ref : hfileRefs) {
      put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
    }
    try (Table table = conn.getTable(tableName)) {
      table.put(put);
    } catch (IOException e) {
      throw new ReplicationException("failed to batch update hfile references", e);
    }
  }

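  // Scan all rows that have last sequence id or hfile reference cells, and issue a
  // timestamp-bounded family delete for each of them, so only cells with timestamps up to the
  // given ts are removed.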
  @Override
  public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
    try (Table table = conn.getTable(tableName);
      ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY)
        .addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) {
      for (;;) {
        Result r = scanner.next();
        if (r == null) {
          break;
        }
        Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts)
          .addFamily(HFILE_REF_FAMILY, ts);
        table.delete(delete);
      }
    } catch (IOException e) {
      throw new ReplicationException(
        "failed to remove last sequence ids and hfile references before timestamp " + ts, e);
    }
  }
}