/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * This replication process currently waits for the edits to be applied before the method returns.
 * This means that the replication of edits is synchronized (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  /**
   * This shared {@link Connection} is used for handling bulk load hfiles replication.
   */
  private volatile Connection sharedConnection;
  /**
   * This shared {@link AsyncConnection} is used for handling wal replication.
   */
  private volatile AsyncConnection sharedAsyncConnection;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnectionLock = new Object();
  private final Object sharedAsyncConnectionLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  /**
   * Create a sink for replication
   * @param conf conf object
   * @throws IOException thrown when HDFS goes bad or a bad file name is given
   */
  public ReplicationSink(Configuration conf) throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException(
        "Configured source fs configuration provider class " + className + " throws error.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays: lessen the
   * timeout and numTries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }
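
  // A tuning sketch (illustrative, not prescriptive): the two knobs read in decorateConf() can
  // be overridden in hbase-site.xml, e.g.
  //   <property><name>replication.sink.client.retries.number</name><value>4</value></property>
  //   <property><name>replication.sink.client.ops.timeout</name><value>10000</value></property>
  // 4 retries and a 10000 ms operation timeout are the defaults applied above; deployments may
  // choose other values.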

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against the raw protobuf types, saving on a conversion from pb to pojo.
   * @param entries                    WAL entries to replicate
   * @param cells                      scanner over the Cells that belong to the entries
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
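
      // Illustrative example of the grouping: mutations for table t1 that carry the cluster-id
      // list [c1] and mutations that carry [c1, c2] land in separate inner lists, so each
      // (table, cluster ids) combination is flushed as its own batch further down.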

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name Vs list of pair of family and list of
              // hfile paths from its namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }
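
  // A sketch of the map shape built below (illustrative names): bulkLoadHFileMap maps
  // "ns1:t1" -> [(cf1, ["ns1/t1/<encoded-region>/cf1/hfile1", ...])], i.e. hfile paths are
  // grouped per column family under the fully qualified table name (see getHFilePath).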
  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }
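
  // For example (illustrative values): table "ns1:t1", encoded region "abc123def", family "cf"
  // and store file "hfile1" yield the relative path "ns1/t1/abc123def/cf/hfile1".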

  /**
   * @param previousCell the previously processed cell, or null if this is the first cell
   * @param cell         the cell currently being processed
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }
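
  // Note on the type check above: a Put and a Delete against the same row cannot share one
  // Mutation object, so a change in the cell type byte starts a new Mutation even when the row
  // is unchanged.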

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map of key to (a list of) values. TODO: Make a general
   * utility method.
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Clean up the shared {@link Connection} and {@link AsyncConnection}. It is called when the
   * regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConnection != null) {
        synchronized (sharedConnectionLock) {
          if (this.sharedConnection != null) {
            this.sharedConnection.close();
            this.sharedConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedConnection", e);
    }

    try {
      if (this.sharedAsyncConnection != null) {
        synchronized (sharedAsyncConnectionLock) {
          if (this.sharedAsyncConnection != null) {
            this.sharedAsyncConnection.close();
            this.sharedAsyncConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedAsyncConnection", e);
    }
  }

  /**
   * Apply the given rows to the given table, submitting them in batches no larger than the
   * threshold.
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getAsyncConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }
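
    // Illustrative example of the partitioning above: with batchRowSizeThreshold = 1000, a
    // 2500-row list is submitted as three batchAll calls of 1000, 1000 and 500 rows;
    // Lists.partition keeps the rows in their original order.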
    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        if (e.getCause() instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
        throw e;
      }
    }
  }

  /**
   * Return the shared {@link Connection} which is used for handling bulk load hfiles replication.
   */
  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedConnection;
    if (connection == null) {
      synchronized (sharedConnectionLock) {
        connection = sharedConnection;
        if (connection == null) {
          connection = ConnectionFactory.createConnection(conf);
          sharedConnection = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Return the shared {@link AsyncConnection} which is used for handling wal replication.
   */
  private AsyncConnection getAsyncConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncConnection asyncConnection = sharedAsyncConnection;
    if (asyncConnection == null) {
      synchronized (sharedAsyncConnectionLock) {
        asyncConnection = sharedAsyncConnection;
        if (asyncConnection == null) {
          // Get the AsyncConnection immediately.
          asyncConnection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
          sharedAsyncConnection = asyncConnection;
        }
      }
    }
    return asyncConnection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age in ms of the last edit that
   *         was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics.
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}