001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
004 * agreements. See the NOTICE file distributed with this work for additional information regarding
005 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License. You may obtain a
007 * copy of the License at
008 *
009 *  http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software distributed under the
012 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
013 * express or implied. See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.apache.hadoop.hbase.client;
017
018import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.NoSuchElementException;
028import java.util.Timer;
029import java.util.TimerTask;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * <p>
045 * Used to communicate with a single HBase table similar to {@link Table}
046 * but meant for batched, potentially asynchronous puts. Obtain an instance from
047 * a {@link Connection} and call {@link #close()} afterwards. Provide an alternate
048 * to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)}
049 * or by setting alternate classname via the key {} in Configuration.
050 * </p>
051 *
052 * <p>
053 * While this can be used across threads, great care should be used when doing so.
054 * Errors are global to the buffered mutator and the Exceptions can be thrown on any
055 * thread that causes the flush for requests.
056 * </p>
057 *
058 * @see ConnectionFactory
059 * @see Connection
060 * @since 1.0.0
061 */
062@InterfaceAudience.Private
063@InterfaceStability.Evolving
064public class BufferedMutatorImpl implements BufferedMutator {
065
066  private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorImpl.class);
067
068  private final ExceptionListener listener;
069
070  private final TableName tableName;
071
072  private final Configuration conf;
073  private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
074  private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
075  /**
076   * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
077   * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
078   */
079  private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
080  private final long writeBufferSize;
081
082  private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
083  private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
084          new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
085  private Timer writeBufferPeriodicFlushTimer = null;
086
087  private final int maxKeyValueSize;
088  private final ExecutorService pool;
089  private final AtomicInteger rpcTimeout;
090  private final AtomicInteger operationTimeout;
091  private final boolean cleanupPoolOnClose;
092  private volatile boolean closed = false;
093  private final AsyncProcess ap;
094
095  BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
096    if (conn == null || conn.isClosed()) {
097      throw new IllegalArgumentException("Connection is null or closed.");
098    }
099    this.tableName = params.getTableName();
100    this.conf = conn.getConfiguration();
101    this.listener = params.getListener();
102    if (params.getPool() == null) {
103      this.pool = HTable.getDefaultExecutor(conf);
104      cleanupPoolOnClose = true;
105    } else {
106      this.pool = params.getPool();
107      cleanupPoolOnClose = false;
108    }
109    ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
110    this.writeBufferSize =
111            params.getWriteBufferSize() != UNSET ?
112            params.getWriteBufferSize() : tableConf.getWriteBufferSize();
113
114    // Set via the setter because it does value validation and starts/stops the TimerTask
115    long newWriteBufferPeriodicFlushTimeoutMs =
116            params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
117              ? params.getWriteBufferPeriodicFlushTimeoutMs()
118              : tableConf.getWriteBufferPeriodicFlushTimeoutMs();
119    long newWriteBufferPeriodicFlushTimerTickMs =
120            params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
121              ? params.getWriteBufferPeriodicFlushTimerTickMs()
122              : tableConf.getWriteBufferPeriodicFlushTimerTickMs();
123    this.setWriteBufferPeriodicFlush(
124            newWriteBufferPeriodicFlushTimeoutMs,
125            newWriteBufferPeriodicFlushTimerTickMs);
126
127    this.maxKeyValueSize =
128            params.getMaxKeyValueSize() != UNSET ?
129            params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
130
131    this.rpcTimeout = new AtomicInteger(
132            params.getRpcTimeout() != UNSET ?
133            params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
134
135    this.operationTimeout = new AtomicInteger(
136            params.getOperationTimeout() != UNSET ?
137            params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
138    this.ap = ap;
139  }
140  BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
141      RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
142    this(conn, params,
143      // puts need to track errors globally due to how the APIs currently work.
144      new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
145  }
146
147  private void checkClose() {
148    if (closed) {
149      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
150    }
151  }
152
153  ExecutorService getPool() {
154    return pool;
155  }
156
157  AsyncProcess getAsyncProcess() {
158    return ap;
159  }
160
161  @Override
162  public TableName getName() {
163    return tableName;
164  }
165
166  @Override
167  public Configuration getConfiguration() {
168    return conf;
169  }
170
171  @Override
172  public void mutate(Mutation m) throws InterruptedIOException,
173      RetriesExhaustedWithDetailsException {
174    mutate(Collections.singletonList(m));
175  }
176
177  @Override
178  public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
179      RetriesExhaustedWithDetailsException {
180    checkClose();
181
182    long toAddSize = 0;
183    int toAddCount = 0;
184    for (Mutation m : ms) {
185      if (m instanceof Put) {
186        ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
187      }
188      toAddSize += m.heapSize();
189      ++toAddCount;
190    }
191
192    if (currentWriteBufferSize.get() == 0) {
193      firstRecordInBufferTimestamp.set(System.currentTimeMillis());
194    }
195    currentWriteBufferSize.addAndGet(toAddSize);
196    writeAsyncBuffer.addAll(ms);
197    undealtMutationCount.addAndGet(toAddCount);
198    doFlush(false);
199  }
200
201  protected long getExecutedWriteBufferPeriodicFlushes() {
202    return executedWriteBufferPeriodicFlushes.get();
203  }
204
205  private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
206  private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
207
208  private void timerCallbackForWriteBufferPeriodicFlush() {
209    if (currentWriteBufferSize.get() == 0) {
210      return; // Nothing to flush
211    }
212    long now = System.currentTimeMillis();
213    if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
214      return; // No need to flush yet
215    }
216    // The first record in the writebuffer has been in there too long --> flush
217    try {
218      executedWriteBufferPeriodicFlushes.incrementAndGet();
219      flush();
220    } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
221      LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
222    }
223  }
224
225  @Override
226  public synchronized void close() throws IOException {
227    if (closed) {
228      return;
229    }
230    // Stop any running Periodic Flush timer.
231    disableWriteBufferPeriodicFlush();
232    try {
233      // As we can have an operation in progress even if the buffer is empty, we call
234      // doFlush at least one time.
235      doFlush(true);
236    } finally {
237      if (cleanupPoolOnClose) {
238        this.pool.shutdown();
239        try {
240          if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {
241            LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
242          }
243        } catch (InterruptedException e) {
244          LOG.warn("waitForTermination interrupted");
245          Thread.currentThread().interrupt();
246        }
247      }
248      closed = true;
249    }
250  }
251
252  private AsyncProcessTask createTask(QueueRowAccess access) {
253    return new AsyncProcessTask(AsyncProcessTask.newBuilder()
254        .setPool(pool)
255        .setTableName(tableName)
256        .setRowAccess(access)
257        .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
258        .build()) {
259      @Override
260      public int getRpcTimeout() {
261        return rpcTimeout.get();
262      }
263
264      @Override
265      public int getOperationTimeout() {
266        return operationTimeout.get();
267      }
268    };
269  }
270
271  @Override
272  public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
273    checkClose();
274    doFlush(true);
275  }
276
277  /**
278   * Send the operations in the buffer to the servers.
279   *
280   * @param flushAll - if true, sends all the writes and wait for all of them to finish before
281   *                 returning. Otherwise, flush until buffer size is smaller than threshold
282   */
283  private void doFlush(boolean flushAll) throws InterruptedIOException,
284      RetriesExhaustedWithDetailsException {
285    List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
286    while (true) {
287      if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
288        // There is the room to accept more mutations.
289        break;
290      }
291      AsyncRequestFuture asf;
292      try (QueueRowAccess access = createQueueRowAccess()) {
293        if (access.isEmpty()) {
294          // It means someone has gotten the ticker to run the flush.
295          break;
296        }
297        asf = ap.submit(createTask(access));
298      }
299      // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
300      // be released.
301      asf.waitUntilDone();
302      if (asf.hasError()) {
303        errors.add(asf.getErrors());
304      }
305    }
306
307    RetriesExhaustedWithDetailsException exception = makeException(errors);
308    if (exception == null) {
309      return;
310    } else if(listener == null) {
311      throw exception;
312    } else {
313      listener.onException(exception, this);
314    }
315  }
316
317  private static RetriesExhaustedWithDetailsException makeException(
318    List<RetriesExhaustedWithDetailsException> errors) {
319    switch (errors.size()) {
320      case 0:
321        return null;
322      case 1:
323        return errors.get(0);
324      default:
325        List<Throwable> exceptions = new ArrayList<>();
326        List<Row> actions = new ArrayList<>();
327        List<String> hostnameAndPort = new ArrayList<>();
328        errors.forEach(e -> {
329          exceptions.addAll(e.exceptions);
330          actions.addAll(e.actions);
331          hostnameAndPort.addAll(e.hostnameAndPort);
332        });
333        return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort);
334    }
335  }
336
337  /**
338   * {@inheritDoc}
339   */
340  @Override
341  public long getWriteBufferSize() {
342    return this.writeBufferSize;
343  }
344
345  @Override
346  public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
347    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs.get();
348    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
349
350    // Both parameters have minimal values.
351    writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
352    writeBufferPeriodicFlushTimerTickMs.set(
353            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
354
355    // If something changed we stop the old Timer.
356    if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
357        writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
358      if (writeBufferPeriodicFlushTimer != null) {
359        writeBufferPeriodicFlushTimer.cancel();
360        writeBufferPeriodicFlushTimer = null;
361      }
362    }
363
364    // If we have the need for a timer and there is none we start it
365    if (writeBufferPeriodicFlushTimer == null &&
366        writeBufferPeriodicFlushTimeoutMs.get() > 0) {
367      writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
368      writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
369        @Override
370        public void run() {
371          BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
372        }
373      }, writeBufferPeriodicFlushTimerTickMs.get(),
374         writeBufferPeriodicFlushTimerTickMs.get());
375    }
376  }
377
378  @Override
379  public long getWriteBufferPeriodicFlushTimeoutMs() {
380    return writeBufferPeriodicFlushTimeoutMs.get();
381  }
382
383  @Override
384  public long getWriteBufferPeriodicFlushTimerTickMs() {
385    return writeBufferPeriodicFlushTimerTickMs.get();
386  }
387
388  @Override
389  public void setRpcTimeout(int rpcTimeout) {
390    this.rpcTimeout.set(rpcTimeout);
391  }
392
393  @Override
394  public void setOperationTimeout(int operationTimeout) {
395    this.operationTimeout.set(operationTimeout);
396  }
397
398  long getCurrentWriteBufferSize() {
399    return currentWriteBufferSize.get();
400  }
401
402  /**
403   * Count the mutations which haven't been processed.
404   * @return count of undealt mutation
405   */
406  int size() {
407    return undealtMutationCount.get();
408  }
409
410  /**
411   * Count the mutations which haven't been flushed
412   * @return count of unflushed mutation
413   */
414  int getUnflushedSize() {
415    return writeAsyncBuffer.size();
416  }
417
418  QueueRowAccess createQueueRowAccess() {
419    return new QueueRowAccess();
420  }
421
422  class QueueRowAccess implements RowAccess<Row>, Closeable {
423    private int remainder = undealtMutationCount.getAndSet(0);
424    private Mutation last = null;
425
426    private void restoreLastMutation() {
427      // restore the last mutation since it isn't submitted
428      if (last != null) {
429        writeAsyncBuffer.add(last);
430        currentWriteBufferSize.addAndGet(last.heapSize());
431        last = null;
432      }
433    }
434
435    @Override
436    public void close() {
437      restoreLastMutation();
438      if (remainder > 0) {
439        undealtMutationCount.addAndGet(remainder);
440        remainder = 0;
441      }
442    }
443
444    @Override
445    public Iterator<Row> iterator() {
446      return new Iterator<Row>() {
447        private int countDown = remainder;
448        @Override
449        public boolean hasNext() {
450          return countDown > 0;
451        }
452        @Override
453        public Row next() {
454          restoreLastMutation();
455          if (!hasNext()) {
456            throw new NoSuchElementException();
457          }
458          last = writeAsyncBuffer.poll();
459          if (last == null) {
460            throw new NoSuchElementException();
461          }
462          currentWriteBufferSize.addAndGet(-last.heapSize());
463          --countDown;
464          return last;
465        }
466        @Override
467        public void remove() {
468          if (last == null) {
469            throw new IllegalStateException();
470          }
471          --remainder;
472          last = null;
473        }
474      };
475    }
476
477    @Override
478    public int size() {
479      return remainder;
480    }
481
482    @Override
483    public boolean isEmpty() {
484      return remainder <= 0;
485    }
486  }
487}