/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * <p>
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * </p><p>
 * The replication process waits for the edits to be applied before the method
 * returns, which means that the replication of edits is synchronous (after
 * reading from WALs in ReplicationSource) and that a single region server
 * cannot receive edits from two sources at the same time.
 * </p><p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of the double-checked locking idiom used in getConnection(); see
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Create a sink for replication.
   * @param conf conf object
   * @param stopper the stoppable used to signal this sink to stop
   * @throws IOException thrown when HDFS goes bad or bad file name
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className =
        conf.get("hbase.replication.source.fs.conf.provider",
          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }
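
  // Illustrative sketch only, not HBase API documentation: how a hosting
  // service might drive this sink. The entries/cells inputs and the Stoppable
  // are assumed to come from the RPC layer that received the replicate call.
  //
  //   ReplicationSink sink = new ReplicationSink(conf, stopper);
  //   sink.replicateEntries(entries, cells, replicationClusterId,
  //     sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  //   LOG.debug(sink.getStats());
  //   sink.stopReplicationSinkServices();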

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
        this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null ? null :
          (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and numTries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }
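
  // Illustrative: the sink-side client can be tuned through the two keys read
  // above (the values below are examples; the defaults are 4 retries and a
  // 10000 ms operation timeout):
  //
  //   conf.setInt("replication.sink.client.retries.number", 2);
  //   conf.setInt("replication.sink.client.ops.timeout", 5000);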

  /**
   * Replicate this array of entries directly into the local cluster using the native client.
   * Only operates against raw protobuf type saving on a conversion from pb to pojo.
   * @param entries WAL entries to be replicated
   * @param cells scanner over the Cells associated, in order, with the entries
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id. We only want to flushCommits once
      // per invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      // Map of table name to list of (family, hfile paths) pairs; paths are relative to the
      // table's namespace
      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;

      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            if (bulkLoadHFileMap == null) {
              bulkLoadHFileMap = new HashMap<>();
            }
            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                  ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                  : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values());
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
        LOG.debug("Started replicating bulk loaded data.");
        HFileReplicator hFileReplicator =
            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection());
        hFileReplicator.replicate();
        LOG.debug("Finished replicating bulk loaded data.");
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }
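
  // Illustrative example of the grouping performed above (all values assumed):
  // three consecutive Put cells for rows r1, r1, r2 of table t, tagged with
  // source cluster id c, end up as
  //
  //   rowMap = { t -> { [c] -> [ Put(r1) with 2 cells, Put(r2) with 1 cell ] } }
  //
  // so batch() below issues one multi-row batch per table and cluster-id list.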

  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      Cell cell) throws IOException {
    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);

        String tableName = table.getNameWithNamespaceInclAsString();
        if (bulkLoadHFileMap.containsKey(tableName)) {
          List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
          boolean foundFamily = false;
          for (int i = 0; i < familyHFilePathsList.size(); i++) {
            Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Family already present; just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found; add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }
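
  // Illustrative (assumed values): a bulk load of store file "sf1" into family
  // "f" of table "ns:t", from the region with encoded name "abc", adds
  //
  //   bulkLoadHFileMap = { "ns:t" -> [ (f, ["ns/t/abc/f/sf1"]) ] }
  //
  // where the relative path is produced by getHFilePath() below.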

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the last cell seen, possibly null
   * @param cell the current cell
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
        || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to append a value to a two-level map of lists, creating the inner map and
   * list on first use.
   * TODO: Make a general utility method
   * @param map outer map, keyed by key1
   * @param key1 outer key
   * @param key2 inner key
   * @param value value to append to the list stored under key1 and key2
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }
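
  // Illustrative: starting from an empty rowMap, the two calls
  //   addToHashMultiMap(rowMap, table, clusterIds, put1);
  //   addToHashMultiMap(rowMap, table, clusterIds, put2);
  // leave rowMap.get(table).get(clusterIds) == [put1, put2]; the returned
  // list is the same live list stored in the map, not a copy.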

  /**
   * Stop this sink and release its resources: close the shared connection to the local
   * cluster. Called when the region server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing
    }
  }

  /**
   * Apply the batches of rows against the given table using the shared connection.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException If failed to apply the batches of rows
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows, null);
      }
    } catch (RetriesExhaustedWithDetailsException rewde) {
      for (Throwable ex : rewde.getCauses()) {
        if (ex instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
      }
      throw rewde;
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private Connection getConnection() throws IOException {
    // Lazily create and cache a single shared connection, using double-checked locking; see
    // https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age
   *   of the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}