001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN; 021import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT; 022import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY; 023import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY; 024import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; 025import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; 026import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; 027import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; 028 029import java.io.ByteArrayInputStream; 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.Collection; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.List; 036import java.util.Map; 037import java.util.Map.Entry; 038import java.util.TreeMap; 039import java.util.UUID; 040import java.util.concurrent.Future; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.stream.Collectors; 043import org.apache.commons.lang3.StringUtils; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.Cell; 047import org.apache.hadoop.hbase.CellScanner; 048import org.apache.hadoop.hbase.CellUtil; 049import org.apache.hadoop.hbase.HBaseConfiguration; 050import org.apache.hadoop.hbase.HConstants; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.TableNotFoundException; 053import org.apache.hadoop.hbase.client.AsyncClusterConnection; 054import org.apache.hadoop.hbase.client.AsyncTable; 055import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 056import org.apache.hadoop.hbase.client.Delete; 057import org.apache.hadoop.hbase.client.Mutation; 058import org.apache.hadoop.hbase.client.Put; 059import org.apache.hadoop.hbase.client.RetriesExhaustedException; 060import org.apache.hadoop.hbase.client.Row; 061import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; 062import org.apache.hadoop.hbase.replication.ReplicationUtils; 063import org.apache.hadoop.hbase.security.UserProvider; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.FutureUtils; 066import org.apache.hadoop.hbase.util.Pair; 067import org.apache.hadoop.hbase.wal.WALEdit; 068import org.apache.yetus.audience.InterfaceAudience; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 073 074import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 079 080/** 081 * <p> 082 * This class is responsible for replicating the edits coming from another cluster. 083 * </p> 084 * <p> 085 * This replication process is currently waiting for the edits to be applied before the method can 086 * return. This means that the replication of edits is synchronized (after reading from WALs in 087 * ReplicationSource) and that a single region server cannot receive edits from two sources at the 088 * same time 089 * </p> 090 * <p> 091 * This class uses the native HBase client in order to replicate entries. 092 * </p> 093 * TODO make this class more like ReplicationSource wrt log handling 094 */ 095@InterfaceAudience.Private 096public class ReplicationSink { 097 098 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class); 099 private final Configuration conf; 100 // Volatile because of note in here -- look for double-checked locking: 101 // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html 102 private volatile AsyncClusterConnection sharedConn; 103 private final MetricsSink metrics; 104 private final AtomicLong totalReplicatedEdits = new AtomicLong(); 105 private final Object sharedConnLock = new Object(); 106 // Number of hfiles that we successfully replicated 107 private long hfilesReplicated = 0; 108 private SourceFSConfigurationProvider provider; 109 private WALEntrySinkFilter walEntrySinkFilter; 110 111 /** 112 * Row size threshold for multi requests above which a warning is logged 113 */ 114 private final int rowSizeWarnThreshold; 115 private boolean replicationSinkTrackerEnabled; 116 117 private final RegionServerCoprocessorHost rsServerHost; 118 119 /** 120 * Create a sink for replication 121 * @param conf conf object 122 * @throws IOException thrown when HDFS goes bad or bad file name 123 */ 124 public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost) 125 throws IOException { 126 this.conf = HBaseConfiguration.create(conf); 127 this.rsServerHost = rsServerHost; 128 rowSizeWarnThreshold = 129 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 130 replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, 131 REPLICATION_SINK_TRACKER_ENABLED_DEFAULT); 132 decorateConf(); 133 this.metrics = new MetricsSink(); 134 this.walEntrySinkFilter = setupWALEntrySinkFilter(); 135 String className = conf.get("hbase.replication.source.fs.conf.provider", 136 DefaultSourceFSConfigurationProvider.class.getCanonicalName()); 137 try { 138 Class<? extends SourceFSConfigurationProvider> c = 139 Class.forName(className).asSubclass(SourceFSConfigurationProvider.class); 140 this.provider = c.getDeclaredConstructor().newInstance(); 141 } catch (Exception e) { 142 throw new IllegalArgumentException( 143 "Configured source fs configuration provider class " + className + " throws error.", e); 144 } 145 } 146 147 private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException { 148 Class<?> walEntryFilterClass = 149 this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null); 150 WALEntrySinkFilter filter = null; 151 try { 152 filter = walEntryFilterClass == null 153 ? null 154 : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance(); 155 } catch (Exception e) { 156 LOG.warn("Failed to instantiate " + walEntryFilterClass); 157 } 158 if (filter != null) { 159 filter.init(getConnection()); 160 } 161 return filter; 162 } 163 164 /** 165 * decorate the Configuration object to make replication more receptive to delays: lessen the 166 * timeout and numTries. 167 */ 168 private void decorateConf() { 169 this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 170 this.conf.getInt("replication.sink.client.retries.number", 4)); 171 this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 172 this.conf.getInt("replication.sink.client.ops.timeout", 10000)); 173 String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY); 174 if (StringUtils.isNotEmpty(replicationCodec)) { 175 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec); 176 } 177 // use server ZK cluster for replication, so we unset the client ZK related properties if any 178 if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { 179 this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); 180 } 181 } 182 183 /** 184 * Replicate this array of entries directly into the local cluster using the native client. Only 185 * operates against raw protobuf type saving on a conversion from pb to pojo. 186 * @param entries WAL entries to be replicated. 187 * @param cells cell scanner for iteration. 188 * @param replicationClusterId Id which will uniquely identify source cluster FS client 189 * configurations in the replication configuration directory 190 * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace 191 * directory 192 * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory 193 * @throws IOException If failed to replicate the data 194 */ 195 public void replicateEntries(List<WALEntry> entries, final CellScanner cells, 196 String replicationClusterId, String sourceBaseNamespaceDirPath, 197 String sourceHFileArchiveDirPath) throws IOException { 198 if (entries.isEmpty()) { 199 return; 200 } 201 // Very simple optimization where we batch sequences of rows going 202 // to the same table. 203 try { 204 long totalReplicated = 0; 205 // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per 206 // invocation of this method per table and cluster id. 207 Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>(); 208 209 Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null; 210 Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs = 211 new Pair<>(new ArrayList<>(), new ArrayList<>()); 212 for (WALEntry entry : entries) { 213 TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); 214 if (this.walEntrySinkFilter != null) { 215 if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) { 216 // Skip Cells in CellScanner associated with this entry. 217 int count = entry.getAssociatedCellCount(); 218 for (int i = 0; i < count; i++) { 219 // Throw index out of bounds if our cell count is off 220 if (!cells.advance()) { 221 this.metrics.incrementFailedBatches(); 222 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); 223 } 224 } 225 continue; 226 } 227 } 228 Cell previousCell = null; 229 Mutation mutation = null; 230 int count = entry.getAssociatedCellCount(); 231 for (int i = 0; i < count; i++) { 232 // Throw index out of bounds if our cell count is off 233 if (!cells.advance()) { 234 this.metrics.incrementFailedBatches(); 235 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); 236 } 237 Cell cell = cells.current(); 238 // Handle bulk load hfiles replication 239 if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { 240 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); 241 if (bld.getReplicate()) { 242 if (bulkLoadsPerClusters == null) { 243 bulkLoadsPerClusters = new HashMap<>(); 244 } 245 // Map of table name Vs list of pair of family and list of 246 // hfile paths from its namespace 247 Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = 248 bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); 249 buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); 250 } 251 } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) { 252 Mutation put = processReplicationMarkerEntry(cell); 253 if (put == null) { 254 continue; 255 } 256 table = REPLICATION_SINK_TRACKER_TABLE_NAME; 257 List<UUID> clusterIds = new ArrayList<>(); 258 for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { 259 clusterIds.add(toUUID(clusterId)); 260 } 261 put.setClusterIds(clusterIds); 262 addToHashMultiMap(rowMap, table, clusterIds, put); 263 } else { 264 // Handle wal replication 265 if (isNewRowOrType(previousCell, cell)) { 266 // Create new mutation 267 mutation = CellUtil.isDelete(cell) 268 ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) 269 : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 270 List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); 271 for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { 272 clusterIds.add(toUUID(clusterId)); 273 } 274 mutation.setClusterIds(clusterIds); 275 mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, 276 HConstants.EMPTY_BYTE_ARRAY); 277 if (rsServerHost != null) { 278 rsServerHost.preReplicationSinkBatchMutate(entry, mutation); 279 mutationsToWalEntriesPairs.getFirst().add(mutation); 280 mutationsToWalEntriesPairs.getSecond().add(entry); 281 } 282 addToHashMultiMap(rowMap, table, clusterIds, mutation); 283 } 284 if (CellUtil.isDelete(cell)) { 285 ((Delete) mutation).add(cell); 286 } else { 287 ((Put) mutation).add(cell); 288 } 289 previousCell = cell; 290 } 291 } 292 totalReplicated++; 293 } 294 295 // TODO Replicating mutations and bulk loaded data can be made parallel 296 if (!rowMap.isEmpty()) { 297 LOG.debug("Started replicating mutations."); 298 for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) { 299 batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold); 300 } 301 LOG.debug("Finished replicating mutations."); 302 } 303 304 if (rsServerHost != null) { 305 List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst(); 306 List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond(); 307 for (int i = 0; i < mutations.size(); i++) { 308 rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i)); 309 } 310 } 311 312 if (bulkLoadsPerClusters != null) { 313 for (Entry<List<String>, 314 Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) { 315 Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue(); 316 if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) { 317 LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString()); 318 Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId); 319 try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf, 320 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf, 321 getConnection(), entry.getKey())) { 322 hFileReplicator.replicate(); 323 LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString()); 324 } 325 } 326 } 327 } 328 329 int size = entries.size(); 330 this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime()); 331 this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated); 332 this.totalReplicatedEdits.addAndGet(totalReplicated); 333 } catch (IOException ex) { 334 LOG.error("Unable to accept edit because:", ex); 335 this.metrics.incrementFailedBatches(); 336 throw ex; 337 } 338 } 339 340 /* 341 * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not. 342 * If false, then ignore this cell. If set to true, de-serialize value into 343 * ReplicationTrackerDescriptor. Create a Put mutation with regionserver name, walname, offset and 344 * timestamp from ReplicationMarkerDescriptor. 345 */ 346 private Put processReplicationMarkerEntry(Cell cell) throws IOException { 347 // If source is emitting replication marker rows but sink is not accepting them, 348 // ignore the edits. 349 if (!replicationSinkTrackerEnabled) { 350 return null; 351 } 352 WALProtos.ReplicationMarkerDescriptor descriptor = 353 WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(), 354 cell.getValueOffset(), cell.getValueLength())); 355 Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 356 put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(), 357 (Bytes.toBytes(descriptor.getRegionServerName()))); 358 put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(), 359 Bytes.toBytes(descriptor.getWalName())); 360 put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(), 361 Bytes.toBytes(cell.getTimestamp())); 362 put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(), 363 Bytes.toBytes(descriptor.getOffset())); 364 return put; 365 } 366 367 private void buildBulkLoadHFileMap( 368 final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, 369 BulkLoadDescriptor bld) throws IOException { 370 List<StoreDescriptor> storesList = bld.getStoresList(); 371 int storesSize = storesList.size(); 372 for (int j = 0; j < storesSize; j++) { 373 StoreDescriptor storeDescriptor = storesList.get(j); 374 List<String> storeFileList = storeDescriptor.getStoreFileList(); 375 int storeFilesSize = storeFileList.size(); 376 hfilesReplicated += storeFilesSize; 377 for (int k = 0; k < storeFilesSize; k++) { 378 byte[] family = storeDescriptor.getFamilyName().toByteArray(); 379 380 // Build hfile relative path from its namespace 381 String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family); 382 String tableName = table.getNameWithNamespaceInclAsString(); 383 List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName); 384 if (familyHFilePathsList != null) { 385 boolean foundFamily = false; 386 for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) { 387 if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) { 388 // Found family already present, just add the path to the existing list 389 familyHFilePathsPair.getSecond().add(pathToHfileFromNS); 390 foundFamily = true; 391 break; 392 } 393 } 394 if (!foundFamily) { 395 // Family not found, add this family and its hfile paths pair to the list 396 addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList); 397 } 398 } else { 399 // Add this table entry into the map 400 addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName); 401 } 402 } 403 } 404 } 405 406 private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS, 407 List<Pair<byte[], List<String>>> familyHFilePathsList) { 408 List<String> hfilePaths = new ArrayList<>(1); 409 hfilePaths.add(pathToHfileFromNS); 410 familyHFilePathsList.add(new Pair<>(family, hfilePaths)); 411 } 412 413 private void addNewTableEntryInMap( 414 final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family, 415 String pathToHfileFromNS, String tableName) { 416 List<String> hfilePaths = new ArrayList<>(1); 417 hfilePaths.add(pathToHfileFromNS); 418 Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths); 419 List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>(); 420 newFamilyHFilePathsList.add(newFamilyHFilePathsPair); 421 bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList); 422 } 423 424 private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile, 425 byte[] family) { 426 return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR) 427 .append(table.getQualifierAsString()).append(Path.SEPARATOR) 428 .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR) 429 .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString(); 430 } 431 432 /** Returns True if we have crossed over onto a new row or type */ 433 private boolean isNewRowOrType(final Cell previousCell, final Cell cell) { 434 return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() 435 || !CellUtil.matchingRows(previousCell, cell); 436 } 437 438 private java.util.UUID toUUID(final HBaseProtos.UUID uuid) { 439 return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()); 440 } 441 442 /** 443 * Simple helper to a map from key to (a list of) values TODO: Make a general utility method 444 * @return the list of values corresponding to key1 and key2 445 */ 446 private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, 447 V value) { 448 Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>()); 449 List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>()); 450 values.add(value); 451 return values; 452 } 453 454 /** 455 * stop the thread pool executor. It is called when the regionserver is stopped. 456 */ 457 public void stopReplicationSinkServices() { 458 try { 459 if (this.sharedConn != null) { 460 synchronized (sharedConnLock) { 461 if (this.sharedConn != null) { 462 this.sharedConn.close(); 463 this.sharedConn = null; 464 } 465 } 466 } 467 } catch (IOException e) { 468 LOG.warn("IOException while closing the connection", e); // ignoring as we are closing. 469 } 470 } 471 472 /** 473 * Do the changes and handle the pool 474 * @param tableName table to insert into 475 * @param allRows list of actions 476 * @param batchRowSizeThreshold rowSize threshold for batch mutation 477 */ 478 private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold) 479 throws IOException { 480 if (allRows.isEmpty()) { 481 return; 482 } 483 AsyncTable<?> table = getConnection().getTable(tableName); 484 List<Future<?>> futures = new ArrayList<>(); 485 for (List<Row> rows : allRows) { 486 List<List<Row>> batchRows; 487 if (rows.size() > batchRowSizeThreshold) { 488 batchRows = Lists.partition(rows, batchRowSizeThreshold); 489 } else { 490 batchRows = Collections.singletonList(rows); 491 } 492 futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList())); 493 } 494 for (Future<?> future : futures) { 495 try { 496 FutureUtils.get(future); 497 } catch (RetriesExhaustedException e) { 498 if (e.getCause() instanceof TableNotFoundException) { 499 throw new TableNotFoundException("'" + tableName + "'"); 500 } 501 throw e; 502 } 503 } 504 } 505 506 private AsyncClusterConnection getConnection() throws IOException { 507 // See https://en.wikipedia.org/wiki/Double-checked_locking 508 AsyncClusterConnection connection = sharedConn; 509 if (connection == null) { 510 synchronized (sharedConnLock) { 511 connection = sharedConn; 512 if (connection == null) { 513 connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, 514 UserProvider.instantiate(conf).getCurrent()); 515 sharedConn = connection; 516 } 517 } 518 } 519 return connection; 520 } 521 522 /** 523 * Get a string representation of this sink's metrics 524 * @return string with the total replicated edits count and the date of the last edit that was 525 * applied 526 */ 527 public String getStats() { 528 long total = this.totalReplicatedEdits.get(); 529 return total == 0 530 ? "" 531 : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() 532 + ", total replicated edits: " + total; 533 } 534 535 /** 536 * Get replication Sink Metrics 537 */ 538 public MetricsSink getSinkMetrics() { 539 return this.metrics; 540 } 541}