/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * The replication process is currently synchronous: the edits must be applied before the call can
 * return. This means that the replication of edits is serialized (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {
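  // High-level flow (summary, not normative): the region server's replication sink service hands
  // incoming WAL entries and their CellScanner to replicateEntries(), which groups rows per table
  // and cluster ids and applies them through batch().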

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile AsyncClusterConnection sharedConn;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;
  private final boolean replicationSinkTrackerEnabled;

  private final RegionServerCoprocessorHost rsServerHost;

  /**
   * Create a sink for replication.
   * @param conf         conf object
   * @param rsServerHost region server coprocessor host; may be null, in which case no replication
   *                     sink coprocessor hooks are invoked
   * @throws IOException if the WAL entry sink filter cannot be initialized
   */
  public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
    throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    this.rsServerHost = rsServerHost;
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
      REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      Class<? extends SourceFSConfigurationProvider> c =
        Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
      this.provider = c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException(
        "Configured source fs configuration provider class " + className + " throws error.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays by lessening
   * the timeout and number of retries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
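    // If a dedicated replication codec is configured, use it as the sink's RPC codec as well.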
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type saving on a conversion from pb to pojo.
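   * <p>
   * A minimal caller sketch; the cluster id and directory paths below are hypothetical, for
   * illustration only:
   * </p>
   *
   * <pre>
   * sink.replicateEntries(entries, cellScanner, "100", "/hbase/data", "/hbase/archive/data");
   * </pre>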
   * @param entries                    WAL entries to be replicated.
   * @param cells                      cell scanner for iteration.
   * @param replicationClusterId       Id which will uniquely identify the source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException if the data could not be replicated
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id. We only want to flush once per table
      // and cluster id per invocation of this method.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
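      // Tracks the mutations handed to the preReplicationSinkBatchMutate hook so the matching
      // postReplicationSinkBatchMutate hook can be invoked for each of them after the batches run.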
      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
        new Pair<>(new ArrayList<>(), new ArrayList<>());
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to the list of (family, hfile paths from its namespace) pairs
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
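            // Replication marker cells carry region server, WAL name, and offset metadata; they
            // are routed to the sink tracker table rather than to the user table.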
            Mutation put = processReplicationMarkerEntry(cell);
            if (put == null) {
              continue;
            }
            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
            List<UUID> clusterIds = new ArrayList<>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            put.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, put);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
                HConstants.EMPTY_BYTE_ARRAY);
              if (rsServerHost != null) {
                rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
                mutationsToWalEntriesPairs.getFirst().add(mutation);
                mutationsToWalEntriesPairs.getSecond().add(entry);
              }
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
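            // Append the cell to the current mutation; consecutive cells for the same row and type
            // accumulate into a single Put or Delete.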
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

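      // Invoke the post-batch hook for each mutation that went through the pre-batch hook above.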
      if (rsServerHost != null) {
        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
        for (int i = 0; i < mutations.size(); i++) {
          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
        }
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Started replicating bulk loaded data from cluster ids: {}.", entry.getKey());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating bulk loaded data from cluster ids: {}.",
                entry.getKey());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }

  /*
   * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not.
   * If false, then ignore this cell. If true, de-serialize the value into a
   * ReplicationMarkerDescriptor and create a Put mutation with the region server name, WAL name,
   * offset and timestamp from that descriptor.
   */
  private Put processReplicationMarkerEntry(Cell cell) throws IOException {
    // If source is emitting replication marker rows but sink is not accepting them,
    // ignore the edits.
    if (!replicationSinkTrackerEnabled) {
      return null;
    }
    WALProtos.ReplicationMarkerDescriptor descriptor =
      WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength()));
    Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getRegionServerName()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getWalName()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(cell.getTimestamp()));
    put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(),
      Bytes.toBytes(descriptor.getOffset()));
    return put;
  }

  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
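    // Relative layout: <namespace>/<table>/<encodedRegionName>/<family>/<storeFile>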
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /** Returns true if we have crossed over onto a new row or type */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map of key to (a list of) values. TODO: Make a general
   * utility method.
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Close the shared cluster connection, if one was created. It is called when the regionserver is
   * stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConn != null) {
        synchronized (sharedConnLock) {
          if (this.sharedConn != null) {
            this.sharedConn.close();
            this.sharedConn = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given rows to the given table, splitting them into batches no larger than the given
   * row-size threshold.
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
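      // Submit each chunk as an asynchronous batch; all futures are awaited below.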
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }
    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        if (e.getCause() instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
        throw e;
      }
    }
  }

  private AsyncClusterConnection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncClusterConnection connection = sharedConn;
    if (connection == null) {
      synchronized (sharedConnLock) {
        connection = sharedConn;
        if (connection == null) {
          connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
            UserProvider.instantiate(conf).getCurrent());
          sharedConn = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the date of the last edit that was
   *         applied
   */
  public String getStats() {
    long total = this.totalReplicatedEdits.get();
    return total == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + total;
  }

  /**
   * Get the replication sink metrics.
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}