001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.TimeUnit;
030import java.util.stream.Collectors;
031import java.util.stream.Stream;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
039import org.apache.hbase.thirdparty.io.netty.util.Timeout;
040
041/**
042 * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
043 */
044@InterfaceAudience.Private
045class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
046
047  private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);
048
049  private final HashedWheelTimer periodicalFlushTimer;
050
051  private final AsyncTable<?> table;
052
053  private final long writeBufferSize;
054
055  private final long periodicFlushTimeoutNs;
056
057  private final int maxKeyValueSize;
058
059  private final int maxMutations;
060
061  private List<Mutation> mutations = new ArrayList<>();
062
063  private List<CompletableFuture<Void>> futures = new ArrayList<>();
064
065  private long bufferedSize;
066
067  private boolean closed;
068
069  Timeout periodicFlushTask;
070
071  AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
072    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
073    this.periodicalFlushTimer = periodicalFlushTimer;
074    this.table = table;
075    this.writeBufferSize = writeBufferSize;
076    this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
077    this.maxKeyValueSize = maxKeyValueSize;
078    this.maxMutations = maxMutations;
079  }
080
081  @Override
082  public TableName getName() {
083    return table.getName();
084  }
085
086  @Override
087  public Configuration getConfiguration() {
088    return table.getConfiguration();
089  }
090
091  // will be overridden in test
092  protected void internalFlush() {
093    if (periodicFlushTask != null) {
094      periodicFlushTask.cancel();
095      periodicFlushTask = null;
096    }
097    List<Mutation> toSend = this.mutations;
098    if (toSend.isEmpty()) {
099      return;
100    }
101    List<CompletableFuture<Void>> toComplete = this.futures;
102    assert toSend.size() == toComplete.size();
103    this.mutations = new ArrayList<>();
104    this.futures = new ArrayList<>();
105    bufferedSize = 0L;
106    Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
107    for (CompletableFuture<?> future : table.batch(toSend)) {
108      CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
109      addListener(future, (r, e) -> {
110        if (e != null) {
111          toCompleteFuture.completeExceptionally(e);
112        } else {
113          toCompleteFuture.complete(null);
114        }
115      });
116    }
117  }
118
119  @Override
120  public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
121    List<CompletableFuture<Void>> futures =
122      Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
123        .collect(Collectors.toList());
124    long heapSize = 0;
125    for (Mutation mutation : mutations) {
126      heapSize += mutation.heapSize();
127      if (mutation instanceof Put) {
128        validatePut((Put) mutation, maxKeyValueSize);
129      }
130    }
131    synchronized (this) {
132      if (closed) {
133        IOException ioe = new IOException("Already closed");
134        futures.forEach(f -> f.completeExceptionally(ioe));
135        return futures;
136      }
137      if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
138        periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
139          synchronized (AsyncBufferedMutatorImpl.this) {
140            // confirm that we are still valid, if there is already an internalFlush call before us,
141            // then we should not execute anymore. And in internalFlush we will set periodicFlush
142            // to null, and since we may schedule a new one, so here we check whether the references
143            // are equal.
144            if (timeout == periodicFlushTask) {
145              periodicFlushTask = null;
146              internalFlush();
147            }
148          }
149        }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
150      }
151      this.mutations.addAll(mutations);
152      this.futures.addAll(futures);
153      bufferedSize += heapSize;
154      if (bufferedSize >= writeBufferSize) {
155        LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
156        internalFlush();
157      } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
158        LOG.trace("Flushing because max mutations {} reached", maxMutations);
159        internalFlush();
160      }
161    }
162    return futures;
163  }
164
165  @Override
166  public synchronized void flush() {
167    internalFlush();
168  }
169
170  @Override
171  public synchronized void close() {
172    internalFlush();
173    closed = true;
174  }
175
176  @Override
177  public long getWriteBufferSize() {
178    return writeBufferSize;
179  }
180
181  @Override
182  public long getPeriodicalFlushTimeout(TimeUnit unit) {
183    return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
184  }
185
186  @Override
187  public int getMaxMutations() {
188    return maxMutations;
189  }
190
191  @Override
192  public Map<String, byte[]> getRequestAttributes() {
193    return table.getRequestAttributes();
194  }
195}