/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;

/**
 * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
 * <p>
 * Mutations are accumulated in an in-memory buffer guarded by {@link #lock}. The buffer is drained
 * when the buffered heap size reaches the write buffer size, when the number of buffered mutations
 * reaches {@code maxMutations} (if positive), when the periodic flush timeout fires (if positive),
 * or on an explicit {@link #flush()} / {@link #close()}. Draining always happens while holding
 * {@link #lock}; the drained batch is handed to {@link AsyncTable#batch(List)} only after the lock
 * has been released.
 */
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);

  /** Initial capacity of the mutation and future buffers. */
  private static final int INITIAL_CAPACITY = 100;

  /**
   * A drained snapshot of the buffer: the mutations to send and, at the same indexes, the futures
   * handed out to callers for those mutations.
   */
  protected static class Batch {
    final ArrayList<Mutation> toSend;
    final ArrayList<CompletableFuture<Void>> toComplete;

    Batch(ArrayList<Mutation> toSend, ArrayList<CompletableFuture<Void>> toComplete) {
      this.toSend = toSend;
      this.toComplete = toComplete;
    }
  }

  private final HashedWheelTimer periodicalFlushTimer;

  private final AsyncTable<?> table;

  private final long writeBufferSize;

  private final long periodicFlushTimeoutNs;

  private final int maxKeyValueSize;

  private final int maxMutations;

  // Buffer state below (mutations, futures, bufferedSize, periodicFlushTask) is only read or
  // written while holding 'lock'.
  private ArrayList<Mutation> mutations = new ArrayList<>(INITIAL_CAPACITY);

  private ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(INITIAL_CAPACITY);

  private long bufferedSize;

  // Written under 'lock'; volatile so that the unlocked fast-path checks can observe it.
  private volatile boolean closed;

  // Accessed by tests
  Timeout periodicFlushTask;

  // Accessed by tests
  final ReentrantLock lock = new ReentrantLock();

  AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
    this.periodicalFlushTimer = periodicalFlushTimer;
    this.table = table;
    this.writeBufferSize = writeBufferSize;
    this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
    this.maxKeyValueSize = maxKeyValueSize;
    this.maxMutations = maxMutations;
  }

  @Override
  public TableName getName() {
    return table.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return table.getConfiguration();
  }

  /**
   * Atomically drains the current buffered mutations and futures under {@link #lock} and prepares
   * this mutator to accept a new batch.
   * <p>
   * The {@link #lock} must be acquired before calling this method. Cancels any pending
   * {@link #periodicFlushTask} to avoid a redundant flush for the data we are about to send. Swaps
   * the shared {@link #mutations} and {@link #futures} lists into a returned {@link Batch},
   * replaces them with fresh lists, and resets {@link #bufferedSize} to zero.
   * <p>
   * If there is nothing buffered, returns {@code null} so callers can skip sending work.
   * <p>
   * Protected for being overridden in tests.
   * @return a {@link Batch} containing drained mutations and futures, or {@code null} if empty
   */
  protected Batch drainBatch() {
    // Cancel the flush task if it is pending.
    if (periodicFlushTask != null) {
      periodicFlushTask.cancel();
      periodicFlushTask = null;
    }
    ArrayList<Mutation> toSend = this.mutations;
    if (toSend.isEmpty()) {
      return null;
    }
    ArrayList<CompletableFuture<Void>> toComplete = this.futures;
    assert toSend.size() == toComplete.size();
    this.mutations = new ArrayList<>(INITIAL_CAPACITY);
    this.futures = new ArrayList<>(INITIAL_CAPACITY);
    bufferedSize = 0L;
    return new Batch(toSend, toComplete);
  }

  /**
   * Sends a previously drained {@link Batch} and wires the user-visible completion futures to the
   * underlying results returned by {@link AsyncTable#batch(List)}.
   * <p>
   * Preserves the one-to-one, in-order mapping between mutations and their corresponding futures.
   * @param batch the drained batch to send; may be {@code null}
   */
  private void sendBatch(Batch batch) {
    if (batch == null) {
      return;
    }
    Iterator<CompletableFuture<Void>> toCompleteIter = batch.toComplete.iterator();
    for (CompletableFuture<?> future : table.batch(batch.toSend)) {
      CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
      addListener(future, (r, e) -> {
        if (e != null) {
          toCompleteFuture.completeExceptionally(e);
        } else {
          toCompleteFuture.complete(null);
        }
      });
    }
  }

  /**
   * Completes every given future exceptionally with a single {@code IOException("Already closed")}.
   * @param futures the futures to fail
   * @return the same list, for convenient use in a {@code return} statement
   */
  private static List<CompletableFuture<Void>>
    failAlreadyClosed(List<CompletableFuture<Void>> futures) {
    IOException ioe = new IOException("Already closed");
    futures.forEach(f -> f.completeExceptionally(ioe));
    return futures;
  }

  /**
   * Body of the periodic flush timer task: drains the buffer under {@link #lock} if this timeout is
   * still the current {@link #periodicFlushTask}, then sends the drained batch outside of the lock.
   * @param timeout the timeout handle the timer passes to the task
   */
  private void flushOnTimeout(Timeout timeout) {
    Batch flushBatch = null;
    lock.lock();
    try {
      // confirm that we are still valid, if there is already a drainBatch call before us,
      // then we should not execute anymore. And in drainBatch we will set periodicFlush
      // to null, and since we may schedule a new one, so here we check whether the references
      // are equal.
      if (timeout == periodicFlushTask) {
        periodicFlushTask = null;
        flushBatch = drainBatch(); // Drains under lock
      }
    } finally {
      lock.unlock();
    }
    if (flushBatch != null) {
      sendBatch(flushBatch); // Sends outside of lock
    }
  }

  /**
   * Buffers the given mutations and returns one future per mutation, in the same order.
   * <p>
   * Each {@link Put} is passed through {@code validatePut} with the configured max key-value size
   * before anything is buffered; whatever that call throws propagates to the caller. If this
   * mutator has been closed, nothing is buffered and every returned future is completed
   * exceptionally with an {@link IOException}.
   * <p>
   * When the buffer was empty and a positive periodic flush timeout is configured, a flush task is
   * scheduled. When the buffered heap size reaches the write buffer size, or the number of buffered
   * mutations reaches {@code maxMutations} (if positive), the buffer is drained under the lock and
   * sent after the lock is released.
   * @param mutations the mutations to buffer
   * @return futures that complete when the corresponding mutation has been sent and acknowledged
   */
  @Override
  public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
    List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size());
    for (int i = 0, n = mutations.size(); i < n; i++) {
      futures.add(new CompletableFuture<>());
    }
    // Fast path: avoid validation work and lock traffic when we are obviously closed.
    if (closed) {
      return failAlreadyClosed(futures);
    }
    long heapSize = 0;
    for (Mutation mutation : mutations) {
      heapSize += mutation.heapSize();
      if (mutation instanceof Put) {
        validatePut((Put) mutation, maxKeyValueSize);
      }
    }
    Batch batch = null;
    boolean rejected = false;
    lock.lock();
    try {
      if (closed) {
        // close() may have drained the buffer and set 'closed' after the unlocked check above.
        // Anything buffered now would never be flushed (flush/close are no-ops once closed), so
        // the returned futures would never complete. Reject instead.
        rejected = true;
      } else {
        if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
          periodicFlushTask = periodicalFlushTimer.newTimeout(this::flushOnTimeout,
            periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
        }
        this.mutations.addAll(mutations);
        this.futures.addAll(futures);
        bufferedSize += heapSize;
        if (bufferedSize >= writeBufferSize) {
          LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
          // drain now and send after releasing the lock
          batch = drainBatch();
        } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
          LOG.trace("Flushing because max mutations {} reached", maxMutations);
          batch = drainBatch();
        }
      }
    } finally {
      lock.unlock();
    }
    if (rejected) {
      // Fail the futures outside of the lock
      return failAlreadyClosed(futures);
    }
    // Send outside of lock
    if (batch != null) {
      sendBatch(batch);
    }
    return futures;
  }

  // The only difference between flush and close is that, we will set closed to true before sending
  // out the batch to prevent further flush or close
  private void flushOrClose(boolean close) {
    Batch batch = null;
    if (!closed) {
      lock.lock();
      try {
        // Re-check under the lock; another thread may have closed us in the meantime.
        if (!closed) {
          // Drains under lock
          batch = drainBatch();
          if (close) {
            closed = true;
          }
        }
      } finally {
        lock.unlock();
      }
    }
    // Send the batch
    if (batch != null) {
      // Sends outside of lock
      sendBatch(batch);
    }
  }

  /** Drains whatever is currently buffered and sends it; a no-op once closed. */
  @Override
  public void flush() {
    flushOrClose(false);
  }

  /** Drains and sends whatever is currently buffered, then marks this mutator closed. */
  @Override
  public void close() {
    flushOrClose(true);
  }

  @Override
  public long getWriteBufferSize() {
    return writeBufferSize;
  }

  @Override
  public long getPeriodicalFlushTimeout(TimeUnit unit) {
    return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
  }

  @Override
  public int getMaxMutations() {
    return maxMutations;
  }

  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return table.getRequestAttributes();
  }
}