001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Iterator;
026import java.util.List;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.TimeUnit;
029import java.util.stream.Collectors;
030import java.util.stream.Stream;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.yetus.audience.InterfaceAudience;
034
035import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
036import org.apache.hbase.thirdparty.io.netty.util.Timeout;
037
038/**
039 * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
040 */
041@InterfaceAudience.Private
042class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
043
044  private final HashedWheelTimer periodicalFlushTimer;
045
046  private final AsyncTable<?> table;
047
048  private final long writeBufferSize;
049
050  private final long periodicFlushTimeoutNs;
051
052  private final int maxKeyValueSize;
053
054  private List<Mutation> mutations = new ArrayList<>();
055
056  private List<CompletableFuture<Void>> futures = new ArrayList<>();
057
058  private long bufferedSize;
059
060  private boolean closed;
061
062  Timeout periodicFlushTask;
063
064  AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
065    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
066    this.periodicalFlushTimer = periodicalFlushTimer;
067    this.table = table;
068    this.writeBufferSize = writeBufferSize;
069    this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
070    this.maxKeyValueSize = maxKeyValueSize;
071  }
072
073  @Override
074  public TableName getName() {
075    return table.getName();
076  }
077
078  @Override
079  public Configuration getConfiguration() {
080    return table.getConfiguration();
081  }
082
083  // will be overridden in test
084  protected void internalFlush() {
085    if (periodicFlushTask != null) {
086      periodicFlushTask.cancel();
087      periodicFlushTask = null;
088    }
089    List<Mutation> toSend = this.mutations;
090    if (toSend.isEmpty()) {
091      return;
092    }
093    List<CompletableFuture<Void>> toComplete = this.futures;
094    assert toSend.size() == toComplete.size();
095    this.mutations = new ArrayList<>();
096    this.futures = new ArrayList<>();
097    bufferedSize = 0L;
098    Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
099    for (CompletableFuture<?> future : table.batch(toSend)) {
100      CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
101      addListener(future, (r, e) -> {
102        if (e != null) {
103          toCompleteFuture.completeExceptionally(e);
104        } else {
105          toCompleteFuture.complete(null);
106        }
107      });
108    }
109  }
110
111  @Override
112  public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
113    List<CompletableFuture<Void>> futures =
114      Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
115        .collect(Collectors.toList());
116    long heapSize = 0;
117    for (Mutation mutation : mutations) {
118      heapSize += mutation.heapSize();
119      if (mutation instanceof Put) {
120        validatePut((Put) mutation, maxKeyValueSize);
121      }
122    }
123    synchronized (this) {
124      if (closed) {
125        IOException ioe = new IOException("Already closed");
126        futures.forEach(f -> f.completeExceptionally(ioe));
127        return futures;
128      }
129      if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
130        periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
131          synchronized (AsyncBufferedMutatorImpl.this) {
132            // confirm that we are still valid, if there is already an internalFlush call before us,
133            // then we should not execute any more. And in internalFlush we will set periodicFlush
134            // to null, and since we may schedule a new one, so here we check whether the references
135            // are equal.
136            if (timeout == periodicFlushTask) {
137              periodicFlushTask = null;
138              internalFlush();
139            }
140          }
141        }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
142      }
143      this.mutations.addAll(mutations);
144      this.futures.addAll(futures);
145      bufferedSize += heapSize;
146      if (bufferedSize >= writeBufferSize) {
147        internalFlush();
148      }
149    }
150    return futures;
151  }
152
153  @Override
154  public synchronized void flush() {
155    internalFlush();
156  }
157
158  @Override
159  public synchronized void close() {
160    internalFlush();
161    closed = true;
162  }
163
164  @Override
165  public long getWriteBufferSize() {
166    return writeBufferSize;
167  }
168
169  @Override
170  public long getPeriodicalFlushTimeout(TimeUnit unit) {
171    return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
172  }
173}