001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
021
022import java.io.Closeable;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Iterator;
028import java.util.List;
029import java.util.NoSuchElementException;
030import java.util.Timer;
031import java.util.TimerTask;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.yetus.audience.InterfaceStability;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * <p>
048 * Used to communicate with a single HBase table similar to {@link Table} but meant for batched,
049 * potentially asynchronous puts. Obtain an instance from a {@link Connection} and call
050 * {@link #close()} afterwards. Provide an alternate to this implementation by setting
051 * {@link BufferedMutatorParams#implementationClassName(String)} or by setting alternate classname
 * via the key {@link BufferedMutator#CLASSNAME_KEY} in Configuration.
053 * </p>
054 * <p>
055 * While this can be used across threads, great care should be used when doing so. Errors are global
056 * to the buffered mutator and the Exceptions can be thrown on any thread that causes the flush for
057 * requests.
058 * </p>
059 * @see ConnectionFactory
060 * @see Connection
061 * @since 1.0.0
062 */
063@InterfaceAudience.Private
064@InterfaceStability.Evolving
065public class BufferedMutatorImpl implements BufferedMutator {
066
067  private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorImpl.class);
068
069  private final ExceptionListener listener;
070
071  private final TableName tableName;
072
073  private final Configuration conf;
074  private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
075  private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
076  /**
077   * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. The
078   * {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
079   */
080  private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
081  private final long writeBufferSize;
082
083  private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
084  private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
085    new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
086  private Timer writeBufferPeriodicFlushTimer = null;
087
088  private final int maxKeyValueSize;
089  private final ExecutorService pool;
090  private final AtomicInteger rpcTimeout;
091  private final AtomicInteger operationTimeout;
092  private final boolean cleanupPoolOnClose;
093  private volatile boolean closed = false;
094  private final AsyncProcess ap;
095
096  BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
097    if (conn == null || conn.isClosed()) {
098      throw new IllegalArgumentException("Connection is null or closed.");
099    }
100    this.tableName = params.getTableName();
101    this.conf = conn.getConfiguration();
102    this.listener = params.getListener();
103    if (params.getPool() == null) {
104      this.pool = HTable.getDefaultExecutor(conf);
105      cleanupPoolOnClose = true;
106    } else {
107      this.pool = params.getPool();
108      cleanupPoolOnClose = false;
109    }
110    ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
111    this.writeBufferSize = params.getWriteBufferSize() != UNSET
112      ? params.getWriteBufferSize()
113      : tableConf.getWriteBufferSize();
114
115    // Set via the setter because it does value validation and starts/stops the TimerTask
116    long newWriteBufferPeriodicFlushTimeoutMs =
117      params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
118        ? params.getWriteBufferPeriodicFlushTimeoutMs()
119        : tableConf.getWriteBufferPeriodicFlushTimeoutMs();
120    long newWriteBufferPeriodicFlushTimerTickMs =
121      params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
122        ? params.getWriteBufferPeriodicFlushTimerTickMs()
123        : tableConf.getWriteBufferPeriodicFlushTimerTickMs();
124    this.setWriteBufferPeriodicFlush(newWriteBufferPeriodicFlushTimeoutMs,
125      newWriteBufferPeriodicFlushTimerTickMs);
126
127    this.maxKeyValueSize = params.getMaxKeyValueSize() != UNSET
128      ? params.getMaxKeyValueSize()
129      : tableConf.getMaxKeyValueSize();
130
131    this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != UNSET
132      ? params.getRpcTimeout()
133      : conn.getConnectionConfiguration().getWriteRpcTimeout());
134
135    this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != UNSET
136      ? params.getOperationTimeout()
137      : conn.getConnectionConfiguration().getOperationTimeout());
138    this.ap = ap;
139  }
140
141  BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
142    RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
143    this(conn, params,
144      // puts need to track errors globally due to how the APIs currently work.
145      new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
146  }
147
148  private void checkClose() {
149    if (closed) {
150      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
151    }
152  }
153
154  ExecutorService getPool() {
155    return pool;
156  }
157
158  AsyncProcess getAsyncProcess() {
159    return ap;
160  }
161
162  @Override
163  public TableName getName() {
164    return tableName;
165  }
166
167  @Override
168  public Configuration getConfiguration() {
169    return conf;
170  }
171
172  @Override
173  public void mutate(Mutation m)
174    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
175    mutate(Collections.singletonList(m));
176  }
177
178  @Override
179  public void mutate(List<? extends Mutation> ms)
180    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
181    checkClose();
182
183    long toAddSize = 0;
184    int toAddCount = 0;
185    for (Mutation m : ms) {
186      if (m instanceof Put) {
187        ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
188      }
189      toAddSize += m.heapSize();
190      ++toAddCount;
191    }
192
193    if (currentWriteBufferSize.get() == 0) {
194      firstRecordInBufferTimestamp.set(EnvironmentEdgeManager.currentTime());
195    }
196    currentWriteBufferSize.addAndGet(toAddSize);
197    writeAsyncBuffer.addAll(ms);
198    undealtMutationCount.addAndGet(toAddCount);
199    doFlush(false);
200  }
201
202  protected long getExecutedWriteBufferPeriodicFlushes() {
203    return executedWriteBufferPeriodicFlushes.get();
204  }
205
206  private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
207  private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
208
209  private void timerCallbackForWriteBufferPeriodicFlush() {
210    if (currentWriteBufferSize.get() == 0) {
211      return; // Nothing to flush
212    }
213    long now = EnvironmentEdgeManager.currentTime();
214    if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
215      return; // No need to flush yet
216    }
217    // The first record in the writebuffer has been in there too long --> flush
218    try {
219      executedWriteBufferPeriodicFlushes.incrementAndGet();
220      flush();
221    } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
222      LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
223    }
224  }
225
226  @Override
227  public synchronized void close() throws IOException {
228    if (closed) {
229      return;
230    }
231    // Stop any running Periodic Flush timer.
232    disableWriteBufferPeriodicFlush();
233    try {
234      // As we can have an operation in progress even if the buffer is empty, we call
235      // doFlush at least one time.
236      doFlush(true);
237    } finally {
238      if (cleanupPoolOnClose) {
239        this.pool.shutdown();
240        try {
241          if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {
242            LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
243          }
244        } catch (InterruptedException e) {
245          LOG.warn("waitForTermination interrupted");
246          Thread.currentThread().interrupt();
247        }
248      }
249      closed = true;
250    }
251  }
252
253  private AsyncProcessTask createTask(QueueRowAccess access) {
254    return new AsyncProcessTask(AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
255      .setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE).build()) {
256      @Override
257      public int getRpcTimeout() {
258        return rpcTimeout.get();
259      }
260
261      @Override
262      public int getOperationTimeout() {
263        return operationTimeout.get();
264      }
265    };
266  }
267
268  @Override
269  public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
270    checkClose();
271    doFlush(true);
272  }
273
274  /**
275   * Send the operations in the buffer to the servers.
276   * @param flushAll - if true, sends all the writes and wait for all of them to finish before
277   *                 returning. Otherwise, flush until buffer size is smaller than threshold
278   */
279  private void doFlush(boolean flushAll)
280    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
281    List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
282    while (true) {
283      if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
284        // There is the room to accept more mutations.
285        break;
286      }
287      AsyncRequestFuture asf;
288      try (QueueRowAccess access = createQueueRowAccess()) {
289        if (access.isEmpty()) {
290          // It means someone has gotten the ticker to run the flush.
291          break;
292        }
293        asf = ap.submit(createTask(access));
294      }
295      // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
296      // be released.
297      asf.waitUntilDone();
298      if (asf.hasError()) {
299        errors.add(asf.getErrors());
300      }
301    }
302
303    RetriesExhaustedWithDetailsException exception = makeException(errors);
304    if (exception == null) {
305      return;
306    } else if (listener == null) {
307      throw exception;
308    } else {
309      listener.onException(exception, this);
310    }
311  }
312
313  private static RetriesExhaustedWithDetailsException
314    makeException(List<RetriesExhaustedWithDetailsException> errors) {
315    switch (errors.size()) {
316      case 0:
317        return null;
318      case 1:
319        return errors.get(0);
320      default:
321        List<Throwable> exceptions = new ArrayList<>();
322        List<Row> actions = new ArrayList<>();
323        List<String> hostnameAndPort = new ArrayList<>();
324        errors.forEach(e -> {
325          exceptions.addAll(e.exceptions);
326          actions.addAll(e.actions);
327          hostnameAndPort.addAll(e.hostnameAndPort);
328        });
329        return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort);
330    }
331  }
332
333  /**
334   * {@inheritDoc}
335   */
336  @Override
337  public long getWriteBufferSize() {
338    return this.writeBufferSize;
339  }
340
341  @Override
342  public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
343    long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get();
344    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
345
346    // Both parameters have minimal values.
347    writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
348    writeBufferPeriodicFlushTimerTickMs
349      .set(Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
350
351    // If something changed we stop the old Timer.
352    if (
353      writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs
354        || writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs
355    ) {
356      if (writeBufferPeriodicFlushTimer != null) {
357        writeBufferPeriodicFlushTimer.cancel();
358        writeBufferPeriodicFlushTimer = null;
359      }
360    }
361
362    // If we have the need for a timer and there is none we start it
363    if (writeBufferPeriodicFlushTimer == null && writeBufferPeriodicFlushTimeoutMs.get() > 0) {
364      writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
365      writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
366        @Override
367        public void run() {
368          BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
369        }
370      }, writeBufferPeriodicFlushTimerTickMs.get(), writeBufferPeriodicFlushTimerTickMs.get());
371    }
372  }
373
374  @Override
375  public long getWriteBufferPeriodicFlushTimeoutMs() {
376    return writeBufferPeriodicFlushTimeoutMs.get();
377  }
378
379  @Override
380  public long getWriteBufferPeriodicFlushTimerTickMs() {
381    return writeBufferPeriodicFlushTimerTickMs.get();
382  }
383
384  @Override
385  public void setRpcTimeout(int rpcTimeout) {
386    this.rpcTimeout.set(rpcTimeout);
387  }
388
389  @Override
390  public void setOperationTimeout(int operationTimeout) {
391    this.operationTimeout.set(operationTimeout);
392  }
393
394  long getCurrentWriteBufferSize() {
395    return currentWriteBufferSize.get();
396  }
397
398  /**
399   * Count the mutations which haven't been processed.
400   * @return count of undealt mutation
401   */
402  int size() {
403    return undealtMutationCount.get();
404  }
405
406  /**
407   * Count the mutations which haven't been flushed
408   * @return count of unflushed mutation
409   */
410  int getUnflushedSize() {
411    return writeAsyncBuffer.size();
412  }
413
414  QueueRowAccess createQueueRowAccess() {
415    return new QueueRowAccess();
416  }
417
418  class QueueRowAccess implements RowAccess<Row>, Closeable {
419    private int remainder = undealtMutationCount.getAndSet(0);
420    private Mutation last = null;
421
422    private void restoreLastMutation() {
423      // restore the last mutation since it isn't submitted
424      if (last != null) {
425        writeAsyncBuffer.add(last);
426        currentWriteBufferSize.addAndGet(last.heapSize());
427        last = null;
428      }
429    }
430
431    @Override
432    public void close() {
433      restoreLastMutation();
434      if (remainder > 0) {
435        undealtMutationCount.addAndGet(remainder);
436        remainder = 0;
437      }
438    }
439
440    @Override
441    public Iterator<Row> iterator() {
442      return new Iterator<Row>() {
443        private int countDown = remainder;
444
445        @Override
446        public boolean hasNext() {
447          return countDown > 0;
448        }
449
450        @Override
451        public Row next() {
452          restoreLastMutation();
453          if (!hasNext()) {
454            throw new NoSuchElementException();
455          }
456          last = writeAsyncBuffer.poll();
457          if (last == null) {
458            throw new NoSuchElementException();
459          }
460          currentWriteBufferSize.addAndGet(-last.heapSize());
461          --countDown;
462          return last;
463        }
464
465        @Override
466        public void remove() {
467          if (last == null) {
468            throw new IllegalStateException();
469          }
470          --remainder;
471          last = null;
472        }
473      };
474    }
475
476    @Override
477    public int size() {
478      return remainder;
479    }
480
481    @Override
482    public boolean isEmpty() {
483      return remainder <= 0;
484    }
485  }
486}