001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.validateMutation; 021import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.locks.ReentrantLock; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 038import org.apache.hbase.thirdparty.io.netty.util.Timeout; 039 040/** 041 * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}. 042 */ 043@InterfaceAudience.Private 044class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { 045 046 private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class); 047 048 private static final int INITIAL_CAPACITY = 100; 049 050 protected static class Batch { 051 final ArrayList<Mutation> toSend; 052 final ArrayList<CompletableFuture<Void>> toComplete; 053 054 Batch(ArrayList<Mutation> toSend, ArrayList<CompletableFuture<Void>> toComplete) { 055 this.toSend = toSend; 056 this.toComplete = toComplete; 057 } 058 } 059 060 private final HashedWheelTimer periodicalFlushTimer; 061 062 private final AsyncTable<?> table; 063 064 private final long writeBufferSize; 065 066 private final long periodicFlushTimeoutNs; 067 068 private final int maxKeyValueSize; 069 070 private final int maxMutations; 071 072 private ArrayList<Mutation> mutations = new ArrayList<>(INITIAL_CAPACITY); 073 074 private ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(INITIAL_CAPACITY); 075 076 private long bufferedSize; 077 078 private volatile boolean closed; 079 080 // Accessed by tests 081 Timeout periodicFlushTask; 082 083 // Accessed by tests 084 final ReentrantLock lock = new ReentrantLock(); 085 086 AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table, 087 long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) { 088 this.periodicalFlushTimer = periodicalFlushTimer; 089 this.table = table; 090 this.writeBufferSize = writeBufferSize; 091 this.periodicFlushTimeoutNs = periodicFlushTimeoutNs; 092 this.maxKeyValueSize = maxKeyValueSize; 093 this.maxMutations = maxMutations; 094 } 095 096 @Override 097 public TableName getName() { 098 return table.getName(); 099 } 100 101 @Override 102 public Configuration getConfiguration() { 103 return table.getConfiguration(); 104 } 105 106 /** 107 * Atomically drains the current buffered mutations and futures under {@link #lock} and prepares 108 * this mutator to accept a new batch. 109 * <p> 110 * The {@link #lock} must be acquired before calling this method. Cancels any pending 111 * {@link #periodicFlushTask} to avoid a redundant flush for the data we are about to send. Swaps 112 * the shared {@link #mutations} and {@link #futures} lists into a returned {@link Batch}, 113 * replaces them with fresh lists, and resets {@link #bufferedSize} to zero. 114 * <p> 115 * If there is nothing buffered, returns {@code null} so callers can skip sending work. 116 * <p> 117 * Protected for being overridden in tests. 118 * @return a {@link Batch} containing drained mutations and futures, or {@code null} if empty 119 */ 120 protected Batch drainBatch() { 121 ArrayList<Mutation> toSend; 122 ArrayList<CompletableFuture<Void>> toComplete; 123 // Cancel the flush task if it is pending. 124 if (periodicFlushTask != null) { 125 periodicFlushTask.cancel(); 126 periodicFlushTask = null; 127 } 128 toSend = this.mutations; 129 if (toSend.isEmpty()) { 130 return null; 131 } 132 toComplete = this.futures; 133 assert toSend.size() == toComplete.size(); 134 this.mutations = new ArrayList<>(INITIAL_CAPACITY); 135 this.futures = new ArrayList<>(INITIAL_CAPACITY); 136 bufferedSize = 0L; 137 return new Batch(toSend, toComplete); 138 } 139 140 /** 141 * Sends a previously drained {@link Batch} and wires the user-visible completion futures to the 142 * underlying results returned by {@link AsyncTable#batch(List)}. 143 * <p> 144 * Preserves the one-to-one, in-order mapping between mutations and their corresponding futures. 145 * @param batch the drained batch to send; may be {@code null} 146 */ 147 private void sendBatch(Batch batch) { 148 if (batch == null) { 149 return; 150 } 151 Iterator<CompletableFuture<Void>> toCompleteIter = batch.toComplete.iterator(); 152 for (CompletableFuture<?> future : table.batch(batch.toSend)) { 153 CompletableFuture<Void> toCompleteFuture = toCompleteIter.next(); 154 addListener(future, (r, e) -> { 155 if (e != null) { 156 toCompleteFuture.completeExceptionally(e); 157 } else { 158 toCompleteFuture.complete(null); 159 } 160 }); 161 } 162 } 163 164 @Override 165 public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) { 166 List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size()); 167 for (int i = 0, n = mutations.size(); i < n; i++) { 168 futures.add(new CompletableFuture<>()); 169 } 170 if (closed) { 171 IOException ioe = new IOException("Already closed"); 172 futures.forEach(f -> f.completeExceptionally(ioe)); 173 return futures; 174 } 175 long heapSize = 0; 176 for (Mutation mutation : mutations) { 177 heapSize += mutation.heapSize(); 178 validateMutation(mutation, maxKeyValueSize); 179 } 180 Batch batch = null; 181 lock.lock(); 182 try { 183 if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) { 184 periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> { 185 Batch flushBatch = null; 186 lock.lock(); 187 try { 188 // confirm that we are still valid, if there is already a drainBatch call before us, 189 // then we should not execute anymore. And in drainBatch we will set periodicFlush 190 // to null, and since we may schedule a new one, so here we check whether the references 191 // are equal. 192 if (timeout == periodicFlushTask) { 193 periodicFlushTask = null; 194 flushBatch = drainBatch(); // Drains under lock 195 } 196 } finally { 197 lock.unlock(); 198 } 199 if (flushBatch != null) { 200 sendBatch(flushBatch); // Sends outside of lock 201 } 202 }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS); 203 } 204 this.mutations.addAll(mutations); 205 this.futures.addAll(futures); 206 bufferedSize += heapSize; 207 if (bufferedSize >= writeBufferSize) { 208 LOG.trace("Flushing because write buffer size {} reached", writeBufferSize); 209 // drain now and send after releasing the lock 210 batch = drainBatch(); 211 } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) { 212 LOG.trace("Flushing because max mutations {} reached", maxMutations); 213 batch = drainBatch(); 214 } 215 } finally { 216 lock.unlock(); 217 } 218 // Send outside of lock 219 if (batch != null) { 220 sendBatch(batch); 221 } 222 return futures; 223 } 224 225 // The only difference bewteen flush and close is that, we will set closed to true before sending 226 // out the batch to prevent further flush or close 227 private void flushOrClose(boolean close) { 228 Batch batch = null; 229 if (!closed) { 230 lock.lock(); 231 try { 232 if (!closed) { 233 // Drains under lock 234 batch = drainBatch(); 235 if (close) { 236 closed = true; 237 } 238 } 239 } finally { 240 lock.unlock(); 241 } 242 } 243 // Send the batch 244 if (batch != null) { 245 // Sends outside of lock 246 sendBatch(batch); 247 } 248 } 249 250 @Override 251 public void flush() { 252 flushOrClose(false); 253 } 254 255 @Override 256 public void close() { 257 flushOrClose(true); 258 } 259 260 @Override 261 public long getWriteBufferSize() { 262 return writeBufferSize; 263 } 264 265 @Override 266 public long getPeriodicalFlushTimeout(TimeUnit unit) { 267 return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS); 268 } 269 270 @Override 271 public int getMaxMutations() { 272 return maxMutations; 273 } 274 275 @Override 276 public Map<String, byte[]> getRequestAttributes() { 277 return table.getRequestAttributes(); 278 } 279}