001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
004 * agreements. See the NOTICE file distributed with this work for additional information regarding
005 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License. You may obtain a
007 * copy of the License at
008 *
009 *  http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software distributed under the
012 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
013 * express or implied. See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.apache.hadoop.hbase.client;
017
018import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.NoSuchElementException;
028import java.util.Timer;
029import java.util.TimerTask;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
044
045/**
046 * <p>
047 * Used to communicate with a single HBase table similar to {@link Table}
048 * but meant for batched, potentially asynchronous puts. Obtain an instance from
049 * a {@link Connection} and call {@link #close()} afterwards. Provide an alternate
050 * to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)}
051 * or by setting alternate classname via the key {} in Configuration.
052 * </p>
053 *
054 * <p>
055 * While this can be used across threads, great care should be used when doing so.
056 * Errors are global to the buffered mutator and the Exceptions can be thrown on any
057 * thread that causes the flush for requests.
058 * </p>
059 *
060 * @see ConnectionFactory
061 * @see Connection
062 * @since 1.0.0
063 */
064@InterfaceAudience.Private
065@InterfaceStability.Evolving
066public class BufferedMutatorImpl implements BufferedMutator {
067
068  private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorImpl.class);
069
070  private final ExceptionListener listener;
071
072  private final TableName tableName;
073
074  private final Configuration conf;
075  private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
076  private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
077  /**
078   * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
079   * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
080   */
081  private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
082  private final long writeBufferSize;
083
084  private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
085  private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
086          new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
087  private Timer writeBufferPeriodicFlushTimer = null;
088
089  private final int maxKeyValueSize;
090  private final ExecutorService pool;
091  private final AtomicInteger rpcTimeout;
092  private final AtomicInteger operationTimeout;
093  private final boolean cleanupPoolOnClose;
094  private volatile boolean closed = false;
095  private final AsyncProcess ap;
096
097  @VisibleForTesting
098  BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
099    if (conn == null || conn.isClosed()) {
100      throw new IllegalArgumentException("Connection is null or closed.");
101    }
102    this.tableName = params.getTableName();
103    this.conf = conn.getConfiguration();
104    this.listener = params.getListener();
105    if (params.getPool() == null) {
106      this.pool = HTable.getDefaultExecutor(conf);
107      cleanupPoolOnClose = true;
108    } else {
109      this.pool = params.getPool();
110      cleanupPoolOnClose = false;
111    }
112    ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
113    this.writeBufferSize =
114            params.getWriteBufferSize() != UNSET ?
115            params.getWriteBufferSize() : tableConf.getWriteBufferSize();
116
117    // Set via the setter because it does value validation and starts/stops the TimerTask
118    long newWriteBufferPeriodicFlushTimeoutMs =
119            params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
120              ? params.getWriteBufferPeriodicFlushTimeoutMs()
121              : tableConf.getWriteBufferPeriodicFlushTimeoutMs();
122    long newWriteBufferPeriodicFlushTimerTickMs =
123            params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
124              ? params.getWriteBufferPeriodicFlushTimerTickMs()
125              : tableConf.getWriteBufferPeriodicFlushTimerTickMs();
126    this.setWriteBufferPeriodicFlush(
127            newWriteBufferPeriodicFlushTimeoutMs,
128            newWriteBufferPeriodicFlushTimerTickMs);
129
130    this.maxKeyValueSize =
131            params.getMaxKeyValueSize() != UNSET ?
132            params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
133
134    this.rpcTimeout = new AtomicInteger(
135            params.getRpcTimeout() != UNSET ?
136            params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
137
138    this.operationTimeout = new AtomicInteger(
139            params.getOperationTimeout() != UNSET ?
140            params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
141    this.ap = ap;
142  }
143  BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
144      RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
145    this(conn, params,
146      // puts need to track errors globally due to how the APIs currently work.
147      new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
148  }
149
150  private void checkClose() {
151    if (closed) {
152      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
153    }
154  }
155
156  @VisibleForTesting
157  ExecutorService getPool() {
158    return pool;
159  }
160
161  @VisibleForTesting
162  AsyncProcess getAsyncProcess() {
163    return ap;
164  }
165
166  @Override
167  public TableName getName() {
168    return tableName;
169  }
170
171  @Override
172  public Configuration getConfiguration() {
173    return conf;
174  }
175
176  @Override
177  public void mutate(Mutation m) throws InterruptedIOException,
178      RetriesExhaustedWithDetailsException {
179    mutate(Collections.singletonList(m));
180  }
181
182  @Override
183  public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
184      RetriesExhaustedWithDetailsException {
185    checkClose();
186
187    long toAddSize = 0;
188    int toAddCount = 0;
189    for (Mutation m : ms) {
190      if (m instanceof Put) {
191        ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
192      }
193      toAddSize += m.heapSize();
194      ++toAddCount;
195    }
196
197    if (currentWriteBufferSize.get() == 0) {
198      firstRecordInBufferTimestamp.set(System.currentTimeMillis());
199    }
200    currentWriteBufferSize.addAndGet(toAddSize);
201    writeAsyncBuffer.addAll(ms);
202    undealtMutationCount.addAndGet(toAddCount);
203    doFlush(false);
204  }
205
206  @VisibleForTesting
207  protected long getExecutedWriteBufferPeriodicFlushes() {
208    return executedWriteBufferPeriodicFlushes.get();
209  }
210
211  private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
212  private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
213
214  private void timerCallbackForWriteBufferPeriodicFlush() {
215    if (currentWriteBufferSize.get() == 0) {
216      return; // Nothing to flush
217    }
218    long now = System.currentTimeMillis();
219    if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
220      return; // No need to flush yet
221    }
222    // The first record in the writebuffer has been in there too long --> flush
223    try {
224      executedWriteBufferPeriodicFlushes.incrementAndGet();
225      flush();
226    } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
227      LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
228    }
229  }
230
231  @Override
232  public synchronized void close() throws IOException {
233    if (closed) {
234      return;
235    }
236    // Stop any running Periodic Flush timer.
237    disableWriteBufferPeriodicFlush();
238    try {
239      // As we can have an operation in progress even if the buffer is empty, we call
240      // doFlush at least one time.
241      doFlush(true);
242    } finally {
243      if (cleanupPoolOnClose) {
244        this.pool.shutdown();
245        try {
246          if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {
247            LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
248          }
249        } catch (InterruptedException e) {
250          LOG.warn("waitForTermination interrupted");
251          Thread.currentThread().interrupt();
252        }
253      }
254      closed = true;
255    }
256  }
257
258  private AsyncProcessTask createTask(QueueRowAccess access) {
259    return new AsyncProcessTask(AsyncProcessTask.newBuilder()
260        .setPool(pool)
261        .setTableName(tableName)
262        .setRowAccess(access)
263        .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
264        .build()) {
265      @Override
266      public int getRpcTimeout() {
267        return rpcTimeout.get();
268      }
269
270      @Override
271      public int getOperationTimeout() {
272        return operationTimeout.get();
273      }
274    };
275  }
276
277  @Override
278  public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
279    checkClose();
280    doFlush(true);
281  }
282
283  /**
284   * Send the operations in the buffer to the servers.
285   *
286   * @param flushAll - if true, sends all the writes and wait for all of them to finish before
287   *                 returning. Otherwise, flush until buffer size is smaller than threshold
288   */
289  private void doFlush(boolean flushAll) throws InterruptedIOException,
290      RetriesExhaustedWithDetailsException {
291    List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
292    while (true) {
293      if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
294        // There is the room to accept more mutations.
295        break;
296      }
297      AsyncRequestFuture asf;
298      try (QueueRowAccess access = createQueueRowAccess()) {
299        if (access.isEmpty()) {
300          // It means someone has gotten the ticker to run the flush.
301          break;
302        }
303        asf = ap.submit(createTask(access));
304      }
305      // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
306      // be released.
307      asf.waitUntilDone();
308      if (asf.hasError()) {
309        errors.add(asf.getErrors());
310      }
311    }
312
313    RetriesExhaustedWithDetailsException exception = makeException(errors);
314    if (exception == null) {
315      return;
316    } else if(listener == null) {
317      throw exception;
318    } else {
319      listener.onException(exception, this);
320    }
321  }
322
323  private static RetriesExhaustedWithDetailsException makeException(
324    List<RetriesExhaustedWithDetailsException> errors) {
325    switch (errors.size()) {
326      case 0:
327        return null;
328      case 1:
329        return errors.get(0);
330      default:
331        List<Throwable> exceptions = new ArrayList<>();
332        List<Row> actions = new ArrayList<>();
333        List<String> hostnameAndPort = new ArrayList<>();
334        errors.forEach(e -> {
335          exceptions.addAll(e.exceptions);
336          actions.addAll(e.actions);
337          hostnameAndPort.addAll(e.hostnameAndPort);
338        });
339        return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort);
340    }
341  }
342
343  /**
344   * {@inheritDoc}
345   */
346  @Override
347  public long getWriteBufferSize() {
348    return this.writeBufferSize;
349  }
350
351  @Override
352  public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
353    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs.get();
354    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
355
356    // Both parameters have minimal values.
357    writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
358    writeBufferPeriodicFlushTimerTickMs.set(
359            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
360
361    // If something changed we stop the old Timer.
362    if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
363        writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
364      if (writeBufferPeriodicFlushTimer != null) {
365        writeBufferPeriodicFlushTimer.cancel();
366        writeBufferPeriodicFlushTimer = null;
367      }
368    }
369
370    // If we have the need for a timer and there is none we start it
371    if (writeBufferPeriodicFlushTimer == null &&
372        writeBufferPeriodicFlushTimeoutMs.get() > 0) {
373      writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
374      writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
375        @Override
376        public void run() {
377          BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
378        }
379      }, writeBufferPeriodicFlushTimerTickMs.get(),
380         writeBufferPeriodicFlushTimerTickMs.get());
381    }
382  }
383
384  @Override
385  public long getWriteBufferPeriodicFlushTimeoutMs() {
386    return writeBufferPeriodicFlushTimeoutMs.get();
387  }
388
389  @Override
390  public long getWriteBufferPeriodicFlushTimerTickMs() {
391    return writeBufferPeriodicFlushTimerTickMs.get();
392  }
393
394  @Override
395  public void setRpcTimeout(int rpcTimeout) {
396    this.rpcTimeout.set(rpcTimeout);
397  }
398
399  @Override
400  public void setOperationTimeout(int operationTimeout) {
401    this.operationTimeout.set(operationTimeout);
402  }
403
404  @VisibleForTesting
405  long getCurrentWriteBufferSize() {
406    return currentWriteBufferSize.get();
407  }
408
409  /**
410   * Count the mutations which haven't been processed.
411   * @return count of undealt mutation
412   */
413  @VisibleForTesting
414  int size() {
415    return undealtMutationCount.get();
416  }
417
418  /**
419   * Count the mutations which haven't been flushed
420   * @return count of unflushed mutation
421   */
422  @VisibleForTesting
423  int getUnflushedSize() {
424    return writeAsyncBuffer.size();
425  }
426
427  @VisibleForTesting
428  QueueRowAccess createQueueRowAccess() {
429    return new QueueRowAccess();
430  }
431
432  @VisibleForTesting
433  class QueueRowAccess implements RowAccess<Row>, Closeable {
434    private int remainder = undealtMutationCount.getAndSet(0);
435    private Mutation last = null;
436
437    private void restoreLastMutation() {
438      // restore the last mutation since it isn't submitted
439      if (last != null) {
440        writeAsyncBuffer.add(last);
441        currentWriteBufferSize.addAndGet(last.heapSize());
442        last = null;
443      }
444    }
445
446    @Override
447    public void close() {
448      restoreLastMutation();
449      if (remainder > 0) {
450        undealtMutationCount.addAndGet(remainder);
451        remainder = 0;
452      }
453    }
454
455    @Override
456    public Iterator<Row> iterator() {
457      return new Iterator<Row>() {
458        private int countDown = remainder;
459        @Override
460        public boolean hasNext() {
461          return countDown > 0;
462        }
463        @Override
464        public Row next() {
465          restoreLastMutation();
466          if (!hasNext()) {
467            throw new NoSuchElementException();
468          }
469          last = writeAsyncBuffer.poll();
470          if (last == null) {
471            throw new NoSuchElementException();
472          }
473          currentWriteBufferSize.addAndGet(-last.heapSize());
474          --countDown;
475          return last;
476        }
477        @Override
478        public void remove() {
479          if (last == null) {
480            throw new IllegalStateException();
481          }
482          --remainder;
483          last = null;
484        }
485      };
486    }
487
488    @Override
489    public int size() {
490      return remainder;
491    }
492
493    @Override
494    public boolean isEmpty() {
495      return remainder <= 0;
496    }
497  }
498}