/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * The replication process currently waits for the edits to be applied before returning. This
 * means that the replication of edits is synchronous (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at
 * the same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
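 * <p>
 * A minimal usage sketch; the variables {@code entries}, {@code cells} and the three string
 * arguments are illustrative and would normally come from a replication RPC:
 * </p>
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf);
 * sink.replicateEntries(entries, cells, replicationClusterId,
 *   sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
 * // When the region server stops:
 * sink.stopReplicationSinkServices();
 * </pre>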
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of the double-checked locking idiom used in getConnection(); see
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  /**
   * Create a sink for replication.
   * @param conf conf object
   * @throws IOException thrown when HDFS goes bad or a bad file name is given
   */
  public ReplicationSink(Configuration conf) throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
        this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null ? null :
          (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and numTries.
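   * The keys read here are {@code replication.sink.client.retries.number} (default 4) and
   * {@code replication.sink.client.ops.timeout} (default 10000 ms). If a replication codec is
   * configured, it is also installed as the RPC codec, and any client ZooKeeper quorum setting
   * is unset so that the server ZK cluster is used for replication.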
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client.
   * It operates against the raw protobuf types, saving a conversion from pb to pojo.
   * @param entries WAL entries to be replicated
   * @param cells cell scanner over the cells associated with the entries
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table name => list of Rows, grouped by cluster id. We only want to flush
      // once per table and cluster id per invocation of this method.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip the Cells in the CellScanner associated with this entry.
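            // The scanner cursor is shared across all the entries in this batch, so even
            // though this entry's cells are being discarded, the cursor must still be
            // advanced past them to stay aligned with the remaining entries.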
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of (family, list of hfile paths relative to the
              // namespace) pairs
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(),
                  k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> entry :
            bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
              entry.getKey());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating bulk loaded data from cluster ids: {}.",
                entry.getKey());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  /**
   * Builds the map of table name to (family, hfile paths) pairs for the hfiles named in the
   * given bulk load descriptor.
   */
  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      BulkLoadDescriptor bld) throws IOException {
    for (StoreDescriptor storeDescriptor : bld.getStoresList()) {
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      hfilesReplicated += storeFileList.size();
      byte[] family = storeDescriptor.getFamilyName().toByteArray();
      for (String storeFile : storeFileList) {
        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFile, family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the cell processed in the previous iteration, possibly null
   * @param cell the cell currently being processed
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
        || !CellUtil.matchingRows(previousCell, cell);
  }
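  /**
   * Converts a protobuf UUID message into its {@link java.util.UUID} equivalent.
   */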
  private UUID toUUID(final HBaseProtos.UUID uuid) {
    return new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to the list stored in a two-level map.
   * TODO: Make a general utility method
   * @param map two-level map to add to
   * @param key1 outer key
   * @param key2 inner key
   * @param value value to append to the list stored under the two keys
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Close the shared connection to the local cluster. It is called when the regionserver is
   * stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations against a single table, splitting each list into batches no
   * larger than the given row-count threshold.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
      throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    try (Table table = getConnection().getTable(tableName)) {
      for (List<Row> rows : allRows) {
        List<List<Row>> batchRows;
        if (rows.size() > batchRowSizeThreshold) {
          batchRows = Lists.partition(rows, batchRowSizeThreshold);
        } else {
          batchRows = Collections.singletonList(rows);
        }
        for (List<Row> rowList : batchRows) {
          table.batch(rowList, null);
        }
      }
    } catch (RetriesExhaustedWithDetailsException rewde) {
      for (Throwable ex : rewde.getCauses()) {
        if (ex instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
      }
      throw rewde;
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    }
  }

  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the date of the last edit that
   *   was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}