/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p><p>
 * The replication process currently waits for the edits to be applied before the method
 * returns. This means that replication of edits is synchronous (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at
 * the same time.
 * </p><p>
 * This class uses the native HBase client in order to replicate entries.
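 * </p><p>
 * Illustrative sketch of how a caller (for example, the region server's replication sink
 * service) might drive this class; the variable names below are hypothetical:
 * <pre>
 *   ReplicationSink sink = new ReplicationSink(conf, stopper);
 *   // apply a batch of WAL entries shipped from a source cluster
 *   sink.replicateEntries(entries, cellScanner, replicationClusterId,
 *     sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
 *   LOG.info(sink.getStats());
 *   // on region server shutdown, release the shared connection
 *   sink.stopReplicationSinkServices();
 * </pre>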
 * </p>
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Create a sink for replication
   *
   * @param conf conf object
   * @param stopper the Stoppable used to check whether this sink should stop
   * @throws IOException thrown when HDFS goes bad or on a bad file name
   */
  public ReplicationSink(Configuration conf, Stoppable stopper) throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
        DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
        this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null ? null :
          (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and numTries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this list of entries directly into the local cluster using the native client.
   * Only operates against raw protobuf types, saving on a conversion from pb to pojo.
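   * @param entries the WAL entries, read from a source cluster's WALs, to apply locally
   * @param cells a CellScanner over the Cells that accompany, in order, the given entries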
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) return;
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id; we only want to flushCommits once
      // per invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of pairs of family and list of hfile paths from its
              // namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                  bulkLoadsPerClusters.get(bld.getClusterIdsList());
              if (bulkLoadHFileMap == null) {
                bulkLoadHFileMap = new HashMap<>();
                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
              }
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell) ?
                  new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) :
                  new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values());
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>, Map<String, List<Pair<byte[],
            List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
                entry.getKey().toString());
            }
            HFileReplicator hFileReplicator =
                new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                    sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                    getConnection(), entry.getKey());
            hFileReplicator.replicate();
            if (LOG.isDebugEnabled()) {
              LOG.debug("Finished replicating bulk loaded data from cluster ids: {}.",
                entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    // Builds a relative path of the form: namespace/table/encoded-region-name/family/storefile
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the previously processed cell, or null if this is the first cell
   * @param cell the cell currently being processed
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map of key to (a list of) values.
   * TODO: Make a general utility method
   * @param map the two-level map to add to
   * @param key1 the outer key
   * @param key2 the inner key
   * @param value the value to append to the list stored under key1 and key2
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connection to the local cluster if
   * one was created. It is called when the region server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given rows against the given table in the local cluster, one batch call per list
   * of rows.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException if a batch fails or the operation is interrupted
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows, null);
      }
    } catch (RetriesExhaustedWithDetailsException rewde) {
      for (Throwable ex : rewde.getCauses()) {
        if (ex instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
      }
      throw rewde;
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age in ms of the last edit
   *         that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
        "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
        ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}