/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.regionreplication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;

/**
 * The class for replicating WAL edits to secondary replicas, one instance per region.
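 * <p/>
 * Edits are queued via {@link #add(WALKeyImpl, WALEdit, ServerCall)}, batched, and replicated
 * asynchronously to every secondary replica; a replica which we fail to replicate to is skipped
 * until a flush of all stores with a large enough sequence id lets it catch up again.
 * <p/>
 * A minimal sketch of tuning the sink through configuration; the keys are the constants defined
 * below, and the values shown are simply the defaults, not recommendations:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 3);
 * conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 1000);
 * conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 5000);
 * conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024L * 1024);
 * conf.setInt(RegionReplicationSink.BATCH_COUNT_CAPACITY, 100);
 * </pre>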
 */
@InterfaceAudience.Private
public class RegionReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";

  public static final int RETRIES_NUMBER_DEFAULT = 3;

  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";

  public static final long RPC_TIMEOUT_MS_DEFAULT = 1000;

  public static final String OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.operation.timeout.ms";

  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000;

  // The two options below are for replicating meta edits. A meta edit usually triggers a
  // refreshStoreFiles call at the remote side, so it is likely to take more time. A meta edit is
  // also more important for fixing inconsistent state, so it is worth waiting longer for it.
  public static final String META_EDIT_RPC_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";

  public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000;

  public static final String META_EDIT_OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";

  public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000;

  public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";

  public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1024L * 1024;

  public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";

  public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;

  private static final class SinkEntry {

    final WALKeyImpl key;

    final WALEdit edit;

    final ServerCall<?> rpcCall;

    final long size;

    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
      this.key = key;
      this.edit = edit;
      this.rpcCall = rpcCall;
      this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
      if (rpcCall != null) {
        // Increase the reference count to prevent the rpc framework from freeing the memory
        // before we have actually sent the cells out.
        rpcCall.retainByWAL();
      }
    }

    /**
     * Should be called regardless of the result of the replicating operation. Unless you still
     * want to reuse this entry, you must call this method to release the possible off heap
     * memory.
     */
    void replicated() {
      if (rpcCall != null) {
        rpcCall.releaseByWAL();
      }
    }
  }

  private final RegionInfo primary;

  private final TableDescriptor tableDesc;
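
  // The mutable state below (entries, failedReplicas, lastFlushedSequenceId, sending, stopping,
  // stopped) is guarded by synchronizing on the entries queue; pendingSize is volatile so that
  // pendingSize() can read it without taking the lock.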

  // cache the value of TableDescriptor.getRegionReplication here, to avoid calling it every time
  private final int regionReplication;

  private final RegionReplicationBufferManager manager;

  private final RegionReplicationFlushRequester flushRequester;

  private final AsyncClusterConnection conn;

  // Used to track the replicas which we have failed to replicate edits to. When we get a
  // flush-all request, we will try to clear this set; the key point here is that the flush
  // sequence number must be greater than the sequence id of the failed edits, otherwise we
  // should not remove the replicas from this set.
  private final IntHashSet failedReplicas;

  private final Queue<SinkEntry> entries = new ArrayDeque<>();

  private final int retries;

  private final long rpcTimeoutNs;

  private final long operationTimeoutNs;

  private final long metaEditRpcTimeoutNs;

  private final long metaEditOperationTimeoutNs;

  private final long batchSizeCapacity;

  private final long batchCountCapacity;

  private volatile long pendingSize;

  private long lastFlushedSequenceId;

  private boolean sending;

  private boolean stopping;

  private boolean stopped;

  public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
    RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
      primary);
    this.regionReplication = td.getRegionReplication();
    Preconditions.checkArgument(this.regionReplication > 1,
      "region replication should be greater than 1 but got %s", this.regionReplication);
    this.primary = primary;
    this.tableDesc = td;
    this.manager = manager;
    this.flushRequester = new RegionReplicationFlushRequester(conf, flushRequester);
    this.conn = conn;
    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
    this.rpcTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
    this.operationTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
    this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
    this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
      conf.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
    this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
    this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
    this.failedReplicas = new IntHashSet(regionReplication - 1);
  }

  // Called once the replicate calls for all target replicas of a batch have finished.
  void onComplete(List<SinkEntry> sent, Map<Integer, MutableObject<Throwable>> replica2Error) {
    long maxSequenceId = Long.MIN_VALUE;
    long toReleaseSize = 0;
    for (SinkEntry entry : sent) {
      maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
      entry.replicated();
      toReleaseSize += entry.size;
    }
    manager.decrease(toReleaseSize);
    synchronized (entries) {
      pendingSize -= toReleaseSize;
      boolean addFailedReplicas = false;
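      // A replica is only marked as failed if the batch we just sent contained an edit newer
      // than the last flush-all: if every sent entry has a sequence id at or below
      // lastFlushedSequenceId, the data is already persisted on the secondaries via the flushed
      // store files, so a send failure loses nothing and replication can simply continue.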
      for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
        Integer replicaId = entry.getKey();
        Throwable error = entry.getValue().getValue();
        if (error != null) {
          if (maxSequenceId > lastFlushedSequenceId) {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is greater than the last flush SN {},"
                + " we will stop replicating for a while and trigger a flush",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
            failedReplicas.add(replicaId);
            addFailedReplicas = true;
          } else {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is less than or equal to the last flush SN {},"
                + " we will not stop replicating",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
          }
        }
      }

      if (addFailedReplicas) {
        flushRequester.requestFlush(maxSequenceId);
      }
      sending = false;
      if (stopping) {
        stopped = true;
        entries.notifyAll();
        return;
      }
      if (!entries.isEmpty()) {
        send();
      }
    }
  }

  // Batch up the pending entries, bounded by batchCountCapacity and batchSizeCapacity, and
  // replicate them to every replica that has not been marked as failed.
  private void send() {
    List<SinkEntry> toSend = new ArrayList<>();
    long totalSize = 0L;
    boolean hasMetaEdit = false;
    for (SinkEntry entry;;) {
      entry = entries.poll();
      if (entry == null) {
        break;
      }
      toSend.add(entry);
      totalSize += entry.size;
      hasMetaEdit |= entry.edit.isMetaEdit();
      if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
        break;
      }
    }
    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
    if (toSendReplicaCount <= 0) {
      return;
    }
    long rpcTimeoutNsToUse;
    long operationTimeoutNsToUse;
    if (!hasMetaEdit) {
      rpcTimeoutNsToUse = rpcTimeoutNs;
      operationTimeoutNsToUse = operationTimeoutNs;
    } else {
      rpcTimeoutNsToUse = metaEditRpcTimeoutNs;
      operationTimeoutNsToUse = metaEditOperationTimeoutNs;
    }
    sending = true;
    List<WAL.Entry> walEntries =
      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
      if (failedReplicas.contains(replicaId)) {
        continue;
      }
      MutableObject<Throwable> error = new MutableObject<>();
      replica2Error.put(replicaId, error);
      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
      FutureUtils.addListener(
        conn.replicate(replica, walEntries, retries, rpcTimeoutNsToUse, operationTimeoutNsToUse),
        (r, e) -> {
          error.setValue(e);
          if (remaining.decrementAndGet() == 0) {
            onComplete(toSend, replica2Error);
          }
        });
    }
  }

  private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
    if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) {
      // this means the memstore is empty, so all data before this sequence id has been flushed
      // out; it is equivalent to a flush of all stores, so return true
      return true;
    }
    if (flushDesc.getAction() != FlushAction.START_FLUSH) {
      return false;
    }
    Set<byte[]> storesFlushed =
      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
      return false;
    }
    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
  }
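
  /**
   * Returns the {@link FlushDescriptor} if the given meta cell indicates a flush of all stores,
   * i.e., either a START_FLUSH covering every column family, or a CANNOT_FLUSH marker, which
   * implies an empty memstore and is treated the same way; otherwise returns
   * {@link Optional#empty()}.
   */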
  Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
    if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
      return Optional.empty();
    }
    FlushDescriptor flushDesc;
    try {
      flushDesc = WALEdit.getFlushDescriptor(metaCell);
    } catch (IOException e) {
      LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
      return Optional.empty();
    }
    if (flushDesc != null && isStartFlushAllStores(flushDesc)) {
      return Optional.of(flushDesc);
    } else {
      return Optional.empty();
    }
  }

  private long clearAllEntries() {
    long toClearSize = 0;
    for (SinkEntry entry : entries) {
      toClearSize += entry.size;
      entry.replicated();
    }
    entries.clear();
    pendingSize -= toClearSize;
    manager.decrease(toClearSize);
    return toClearSize;
  }

  /**
   * Add this edit to the replication queue.
   * <p/>
   * The {@code rpcCall} is used to retain the cells if the edit was built within an rpc call and
   * the rpc call has a cell scanner, which is off heap.
   */
  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
      // when region memstore replication is not enabled, only replicate meta edits
      return;
    }
    synchronized (entries) {
      if (stopping) {
        return;
      }
      if (edit.isMetaEdit()) {
        // Check whether all stores have been flushed, which means we can drop all the previous
        // edits and also recover from the previous failure of some replicas.
        for (Cell metaCell : edit.getCells()) {
          getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
            long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
            lastFlushedSequenceId = flushSequenceNumber;
            long clearedCount = entries.size();
            long clearedSize = clearAllEntries();
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                "Got a flush all request with sequence id {}, clear {} pending"
                  + " entries with size {}, clear failed replicas {}",
                flushSequenceNumber, clearedCount,
                StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
                failedReplicas);
            }
            failedReplicas.clear();
            flushRequester.recordFlush(flushSequenceNumber);
          });
        }
      }
      if (failedReplicas.size() == regionReplication - 1) {
        // this means we have marked all the replicas as failed, so just give up here
        return;
      }
      SinkEntry entry = new SinkEntry(key, edit, rpcCall);
      entries.add(entry);
      pendingSize += entry.size;
      if (manager.increase(entry.size)) {
        if (!sending) {
          send();
        }
      } else {
        // We have exceeded the max pending size, so drop all the edits and mark all replicas as
        // failed; they will recover on the next flush of all stores.
        clearAllEntries();
        for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
          failedReplicas.add(replicaId);
        }
        flushRequester.requestFlush(entry.key.getSequenceId());
      }
    }
  }

  long pendingSize() {
    return pendingSize;
  }

  /**
   * Stop the replication sink.
   * <p/>
   * Usually this should only be called when you want to close a region.
   */
  public void stop() {
    synchronized (entries) {
      stopping = true;
      clearAllEntries();
      if (!sending) {
        stopped = true;
        entries.notifyAll();
      }
    }
  }

  /**
   * Make sure that we have finished all the replicating requests.
   * <p/>
   * After returning, we can make sure there will be no new replicating requests to secondary
   * replicas.
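   * <p/>
   * A typical close sequence, sketched on the assumption that the caller is the region close
   * path:
   *
   * <pre>
   * sink.stop();
   * sink.waitUntilStopped();
   * </pre>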
   * <p/>
   * This is used to keep the replicating order the same as the WAL edit order when writing.
   */
  public void waitUntilStopped() throws InterruptedException {
    synchronized (entries) {
      while (!stopped) {
        entries.wait();
      }
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*/src/test/.*")
  IntHashSet getFailedReplicas() {
    synchronized (entries) {
      return this.failedReplicas;
    }
  }
}