/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * The replication process waits for the edits to be applied before the call returns, so the
 * replication of edits is synchronous (after the edits have been read from the WALs by
 * ReplicationSource), and a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
 *
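 * <p>
 * Rough usage sketch (illustrative only; in practice the region server's replication sink
 * service owns the single {@code ReplicationSink} instance and feeds it batches received over
 * RPC, so the variable names below are placeholders):
 * </p>
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf, stopper);
 * // one call per batch of WAL entries shipped by a source cluster
 * sink.replicateEntries(entries, cellScanner, replicationClusterId,
 *   sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
 * LOG.info(sink.getStats());
 * // on region server shutdown
 * sink.stopReplicationSinkServices();
 * </pre>
 *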
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Create a sink for replication.
   * @param conf the configuration to use (it is copied and decorated for sink-side use)
   * @param stopper the Stoppable of the owning server
   * @throws IOException if the sink cannot be initialized, e.g. because the connection to the
   *           local cluster cannot be set up
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className =
        conf.get("hbase.replication.source.fs.conf.provider",
          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " throws error.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
        this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null ? null :
          (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and numTries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against the raw protobuf types, saving on a conversion from pb to pojo.
   * @param entries WAL entries to be replicated
   * @param cells cell scanner holding the cells referenced by the entries
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) return;
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to a list of (family, list of hfile paths) pairs, with the
              // paths relative to the table's namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                  bulkLoadsPerClusters.get(bld.getClusterIdsList());
              if (bulkLoadHFileMap == null) {
                bulkLoadHFileMap = new HashMap<>();
                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
              }
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation =
                  CellUtil.isDelete(cell) ?
                      new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) :
                      new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values());
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>, Map<String, List<Pair<byte[],
            List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

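  /**
   * Collect, per column family, the relative paths of the hfiles referenced by the given bulk
   * load descriptor into {@code bulkLoadHFileMap}, keyed by the table's fully qualified name.
   * Also increments the {@code hfilesReplicated} counter that feeds the sink metrics.
   */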
  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

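  /**
   * Build the path of a bulk loaded hfile relative to the source cluster's base namespace (or
   * hfile archive) directory, i.e.
   * {@code <namespace>/<table>/<encoded region name>/<family>/<store file>}.
   */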
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the cell from the previous iteration, or null on the first iteration
   * @param cell the current cell
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper that adds a value to a two-level map of key to (a list of) values.
   * TODO: Make a general utility method
   * @param map the map to add to
   * @param key1 first-level key
   * @param key2 second-level key
   * @param value value to append to the list stored under key1 and key2
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connection to the local cluster.
   * It is called when the region server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations against the given table in the local cluster, issuing one
   * {@link Table#batch} call per list of rows.
   * @param tableName table to insert into
   * @param allRows list of actions, one list per originating cluster-id grouping
   * @throws IOException if the batch cannot be applied, e.g. because the table does not exist
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows, null);
      }
    } catch (RetriesExhaustedWithDetailsException rewde) {
      for (Throwable ex : rewde.getCauses()) {
        if (ex instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
      }
      throw rewde;
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

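  /**
   * Lazily create, cache and return the shared {@link Connection} this sink uses to write to the
   * local cluster.
   */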
  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the date
   *   of the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
        "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
        ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}