001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import java.util.concurrent.TimeUnit; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.util.Threads; 023import org.apache.yetus.audience.InterfaceAudience; 024 025import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 026import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 027import org.apache.hbase.thirdparty.io.netty.util.Timeout; 028 029/** 030 * A helper class for requesting flush on a given region. 031 * <p/> 032 * In general, we do not want to trigger flush too frequently for a region, so here we will add 033 * something like a rate control, i.e, the interval of the two flush request should not be too 034 * small. 035 */ 036@InterfaceAudience.Private 037class RegionReplicationFlushRequester { 038 039 /** 040 * The timer for executing delayed flush request task. 041 * <p/> 042 * It will be shared across all the instances {@link RegionReplicationFlushRequester}. Created on 043 * demand to save one extra thread as not every user uses region replication. 044 */ 045 private static volatile HashedWheelTimer TIMER; 046 047 /** 048 * The minimum interval between two flush requests 049 */ 050 public static final String MIN_INTERVAL_SECS = 051 "hbase.region.read-replica.sink.flush.min-interval.secs"; 052 053 public static final int MIN_INTERVAL_SECS_DEFAULT = 30; 054 055 private final Runnable flushRequester; 056 057 private final long minIntervalSecs; 058 059 private long lastRequestNanos; 060 061 private long pendingFlushRequestSequenceId; 062 063 private long lastFlushedSequenceId; 064 065 private Timeout pendingFlushRequest; 066 067 RegionReplicationFlushRequester(Configuration conf, Runnable flushRequester) { 068 this.flushRequester = flushRequester; 069 this.minIntervalSecs = conf.getInt(MIN_INTERVAL_SECS, MIN_INTERVAL_SECS_DEFAULT); 070 } 071 072 private static HashedWheelTimer getTimer() { 073 HashedWheelTimer timer = TIMER; 074 if (timer != null) { 075 return timer; 076 } 077 synchronized (RegionReplicationFlushRequester.class) { 078 timer = TIMER; 079 if (timer != null) { 080 return timer; 081 } 082 timer = new HashedWheelTimer( 083 new ThreadFactoryBuilder().setNameFormat("RegionReplicationFlushRequester-Timer-pool-%d") 084 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 085 500, TimeUnit.MILLISECONDS); 086 TIMER = timer; 087 } 088 return timer; 089 } 090 091 private void request() { 092 flushRequester.run(); 093 lastRequestNanos = System.nanoTime(); 094 } 095 096 private synchronized void flush(Timeout timeout) { 097 pendingFlushRequest = null; 098 if (pendingFlushRequestSequenceId >= lastFlushedSequenceId) { 099 request(); 100 } 101 } 102 103 /** 104 * Request a flush for the given region. 105 * <p/> 106 * The sequence id of the edit which we fail to replicate. A flush must happen after this sequence 107 * id to recover the failure. 108 */ 109 synchronized void requestFlush(long sequenceId) { 110 // if there is already a flush task, just reuse it. 111 if (pendingFlushRequest != null) { 112 pendingFlushRequestSequenceId = Math.max(sequenceId, pendingFlushRequestSequenceId); 113 return; 114 } 115 // check last flush time 116 long elapsedSecs = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - lastRequestNanos); 117 if (elapsedSecs >= minIntervalSecs) { 118 request(); 119 return; 120 } 121 // schedule a timer task 122 HashedWheelTimer timer = getTimer(); 123 pendingFlushRequestSequenceId = sequenceId; 124 pendingFlushRequest = 125 timer.newTimeout(this::flush, minIntervalSecs - elapsedSecs, TimeUnit.SECONDS); 126 } 127 128 /** 129 * Record that we have already finished a flush with the given {@code sequenceId}. 130 * <p/> 131 * We can cancel the pending flush request if the failed sequence id is less than the given 132 * {@code sequenceId}. 133 */ 134 synchronized void recordFlush(long sequenceId) { 135 this.lastFlushedSequenceId = sequenceId; 136 // cancel the pending flush request if it is necessary, i.e, we have already finished a flush 137 // with higher sequence id. 138 if (sequenceId > pendingFlushRequestSequenceId && pendingFlushRequest != null) { 139 pendingFlushRequest.cancel(); 140 pendingFlushRequest = null; 141 } 142 } 143}