/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
/**
 * The implementation of {@link AsyncBufferedMutator}. Simply wraps an {@link AsyncTable}.
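 * <p>
 * Typical usage goes through the {@link AsyncBufferedMutator} interface obtained from an
 * {@code AsyncConnection}. A minimal sketch, for illustration only, assuming the standard
 * {@code ConnectionFactory} entry point and an existing table {@code "test"} with family
 * {@code "cf"}:
 *
 * <pre>{@code
 * try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get();
 *   AsyncBufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("test"))) {
 *   // mutate() buffers the Put and returns a future that completes once it has been written out
 *   CompletableFuture<Void> done = mutator.mutate(new Put(Bytes.toBytes("row"))
 *     .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value")));
 *   // force the buffered mutations out now instead of waiting for the size/count/time thresholds
 *   mutator.flush();
 *   done.join();
 * }
 * }</pre>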
 */
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);

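  // Initial capacity of the buffered mutation/future lists; fresh lists of this size replace the
  // drained ones after every flush.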
  private static final int INITIAL_CAPACITY = 100;

  protected static class Batch {
    final ArrayList<Mutation> toSend;
    final ArrayList<CompletableFuture<Void>> toComplete;

    Batch(ArrayList<Mutation> toSend, ArrayList<CompletableFuture<Void>> toComplete) {
      this.toSend = toSend;
      this.toComplete = toComplete;
    }
  }

  private final HashedWheelTimer periodicalFlushTimer;

  private final AsyncTable<?> table;

  private final long writeBufferSize;

  private final long periodicFlushTimeoutNs;

  private final int maxKeyValueSize;

  private final int maxMutations;

  private ArrayList<Mutation> mutations = new ArrayList<>(INITIAL_CAPACITY);

  private ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(INITIAL_CAPACITY);

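  // Total heap size of the currently buffered mutations, guarded by lock.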
  private long bufferedSize;

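  // Volatile so mutate() can fail fast after close without taking the lock; flushOrClose
  // re-checks it under the lock before draining.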
  private volatile boolean closed;

  // Accessed by tests
  Timeout periodicFlushTask;

  // Accessed by tests
  final ReentrantLock lock = new ReentrantLock();

  AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
    this.periodicalFlushTimer = periodicalFlushTimer;
    this.table = table;
    this.writeBufferSize = writeBufferSize;
    this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
    this.maxKeyValueSize = maxKeyValueSize;
    this.maxMutations = maxMutations;
  }

  @Override
  public TableName getName() {
    return table.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return table.getConfiguration();
  }

  /**
   * Atomically drains the current buffered mutations and futures under {@link #lock} and prepares
   * this mutator to accept a new batch.
   * <p>
   * The {@link #lock} must be acquired before calling this method. Cancels any pending
   * {@link #periodicFlushTask} to avoid a redundant flush for the data we are about to send. Swaps
   * the shared {@link #mutations} and {@link #futures} lists into a returned {@link Batch},
   * replaces them with fresh lists, and resets {@link #bufferedSize} to zero.
   * <p>
   * If there is nothing buffered, returns {@code null} so callers can skip sending work.
   * <p>
   * Protected so it can be overridden in tests.
   * @return a {@link Batch} containing drained mutations and futures, or {@code null} if empty
   */
  protected Batch drainBatch() {
    ArrayList<Mutation> toSend;
    ArrayList<CompletableFuture<Void>> toComplete;
    // Cancel the flush task if it is pending.
    if (periodicFlushTask != null) {
      periodicFlushTask.cancel();
      periodicFlushTask = null;
    }
    toSend = this.mutations;
    if (toSend.isEmpty()) {
      return null;
    }
    toComplete = this.futures;
    assert toSend.size() == toComplete.size();
    this.mutations = new ArrayList<>(INITIAL_CAPACITY);
    this.futures = new ArrayList<>(INITIAL_CAPACITY);
    bufferedSize = 0L;
    return new Batch(toSend, toComplete);
  }

  /**
   * Sends a previously drained {@link Batch} and wires the user-visible completion futures to the
   * underlying results returned by {@link AsyncTable#batch(List)}.
   * <p>
   * Preserves the one-to-one, in-order mapping between mutations and their corresponding futures.
   * @param batch the drained batch to send; may be {@code null}
   */
  private void sendBatch(Batch batch) {
    if (batch == null) {
      return;
    }
    Iterator<CompletableFuture<Void>> toCompleteIter = batch.toComplete.iterator();
    for (CompletableFuture<?> future : table.batch(batch.toSend)) {
      CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
      addListener(future, (r, e) -> {
        if (e != null) {
          toCompleteFuture.completeExceptionally(e);
        } else {
          toCompleteFuture.complete(null);
        }
      });
    }
  }

  @Override
  public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
    List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size());
    for (int i = 0, n = mutations.size(); i < n; i++) {
      futures.add(new CompletableFuture<>());
    }
    if (closed) {
      IOException ioe = new IOException("Already closed");
      futures.forEach(f -> f.completeExceptionally(ioe));
      return futures;
    }
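    // Compute the total heap size and validate Puts up front, before taking the lock.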
    long heapSize = 0;
    for (Mutation mutation : mutations) {
      heapSize += mutation.heapSize();
      if (mutation instanceof Put) {
        validatePut((Put) mutation, maxKeyValueSize);
      }
    }
    Batch batch = null;
    lock.lock();
    try {
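      // Schedule a periodic flush only when the buffer goes from empty to non-empty; drainBatch
      // cancels the task again once the buffered data is handed off for sending.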
      if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
        periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
          Batch flushBatch = null;
          lock.lock();
          try {
            // Confirm that this task is still the active one. If a drainBatch call has already run
            // before us, it will have set periodicFlushTask to null (and a new task may have been
            // scheduled since), so we compare references and only proceed if this timeout is still
            // the current periodicFlushTask.
            if (timeout == periodicFlushTask) {
              periodicFlushTask = null;
              flushBatch = drainBatch(); // Drains under lock
            }
          } finally {
            lock.unlock();
          }
          if (flushBatch != null) {
            sendBatch(flushBatch); // Sends outside of lock
          }
        }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
      }
      this.mutations.addAll(mutations);
      this.futures.addAll(futures);
      bufferedSize += heapSize;
      if (bufferedSize >= writeBufferSize) {
        LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
        // drain now and send after releasing the lock
        batch = drainBatch();
      } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
        LOG.trace("Flushing because max mutations {} reached", maxMutations);
        batch = drainBatch();
      }
    } finally {
      lock.unlock();
    }
    // Send outside of lock
    if (batch != null) {
      sendBatch(batch);
    }
    return futures;
  }

  // The only difference between flush and close is that close sets closed to true before sending
  // out the batch, to prevent any further flush or close calls.
  private void flushOrClose(boolean close) {
    Batch batch = null;
    if (!closed) {
      lock.lock();
      try {
        if (!closed) {
          // Drains under lock
          batch = drainBatch();
          if (close) {
            closed = true;
          }
        }
      } finally {
        lock.unlock();
      }
    }
    // Send the batch
    if (batch != null) {
      // Sends outside of lock
      sendBatch(batch);
    }
  }

  @Override
  public void flush() {
    flushOrClose(false);
  }

  @Override
  public void close() {
    flushOrClose(true);
  }

  @Override
  public long getWriteBufferSize() {
    return writeBufferSize;
  }

  @Override
  public long getPeriodicalFlushTimeout(TimeUnit unit) {
    return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
  }

  @Override
  public int getMaxMutations() {
    return maxMutations;
  }

  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return table.getRequestAttributes();
  }
}