View Javadoc

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing permissions and
 * limitations under the License.
 */
16  package org.apache.hadoop.hbase.client;
17  
18  import com.google.common.annotations.VisibleForTesting;
19  import org.apache.commons.logging.Log;
20  import org.apache.commons.logging.LogFactory;
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.hbase.TableName;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.classification.InterfaceStability;
25  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.Arrays;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  /**
38   * <p>
39   * Used to communicate with a single HBase table similar to {@link HTable}
40   * but meant for batched, potentially asynchronous puts. Obtain an instance from
41   * a {@link Connection} and call {@link #close()} afterwards.
42   * </p>
43   *
44   * <p>
45   * While this can be used accross threads, great care should be used when doing so.
46   * Errors are global to the buffered mutator and the Exceptions can be thrown on any
47   * thread that causes the flush for requests.
48   * </p>
49   *
50   * @see ConnectionFactory
51   * @see Connection
52   * @since 1.0.0
53   */
54  @InterfaceAudience.Private
55  @InterfaceStability.Evolving
56  public class BufferedMutatorImpl implements BufferedMutator {
57  
58    private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
59    
60    private final ExceptionListener listener;
61  
62    protected ClusterConnection connection; // non-final so can be overridden in test
63    private final TableName tableName;
64    private volatile Configuration conf;
65    @VisibleForTesting
66    final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
67    @VisibleForTesting
68    AtomicLong currentWriteBufferSize = new AtomicLong(0);
69  
70    private long writeBufferSize;
71    private final int maxKeyValueSize;
72    private boolean closed = false;
73    private final ExecutorService pool;
74  
75    @VisibleForTesting
76    protected AsyncProcess ap; // non-final so can be overridden in test
77  
78    BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
79        RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
80      if (conn == null || conn.isClosed()) {
81        throw new IllegalArgumentException("Connection is null or closed.");
82      }
83  
84      this.tableName = params.getTableName();
85      this.connection = conn;
86      this.conf = connection.getConfiguration();
87      this.pool = params.getPool();
88      this.listener = params.getListener();
89  
90      ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
91      this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
92          params.getWriteBufferSize() : tableConf.getWriteBufferSize();
93      this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
94          params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
95  
96      // puts need to track errors globally due to how the APIs currently work.
97      ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
98    }
99  
100   @Override
101   public TableName getName() {
102     return tableName;
103   }
104 
105   @Override
106   public Configuration getConfiguration() {
107     return conf;
108   }
109 
110   @Override
111   public void mutate(Mutation m) throws InterruptedIOException,
112       RetriesExhaustedWithDetailsException {
113     mutate(Arrays.asList(m));
114   }
115 
116   @Override
117   public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
118       RetriesExhaustedWithDetailsException {
119 
120     if (closed) {
121       throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
122     }
123 
124     long toAddSize = 0;
125     for (Mutation m : ms) {
126       if (m instanceof Put) {
127         validatePut((Put) m);
128       }
129       toAddSize += m.heapSize();
130     }
131 
132     // This behavior is highly non-intuitive... it does not protect us against
133     // 94-incompatible behavior, which is a timing issue because hasError, the below code
134     // and setter of hasError are not synchronized. Perhaps it should be removed.
135     if (ap.hasError()) {
136       currentWriteBufferSize.addAndGet(toAddSize);
137       writeAsyncBuffer.addAll(ms);
138       backgroundFlushCommits(true);
139     } else {
140       currentWriteBufferSize.addAndGet(toAddSize);
141       writeAsyncBuffer.addAll(ms);
142     }
143 
144     // Now try and queue what needs to be queued.
145     while (currentWriteBufferSize.get() > writeBufferSize) {
146       backgroundFlushCommits(false);
147     }
148   }
149 
150   // validate for well-formedness
151   public void validatePut(final Put put) throws IllegalArgumentException {
152     HTable.validatePut(put, maxKeyValueSize);
153   }
154 
155   @Override
156   public synchronized void close() throws IOException {
157     try {
158       if (this.closed) {
159         return;
160       }
161       // As we can have an operation in progress even if the buffer is empty, we call
162       // backgroundFlushCommits at least one time.
163       backgroundFlushCommits(true);
164       this.pool.shutdown();
165       boolean terminated;
166       int loopCnt = 0;
167       do {
168         // wait until the pool has terminated
169         terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
170         loopCnt += 1;
171         if (loopCnt >= 10) {
172           LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
173           break;
174         }
175       } while (!terminated);
176 
177     } catch (InterruptedException e) {
178       LOG.warn("waitForTermination interrupted");
179 
180     } finally {
181       this.closed = true;
182     }
183   }
184 
185   @Override
186   public synchronized void flush() throws InterruptedIOException,
187       RetriesExhaustedWithDetailsException {
188     // As we can have an operation in progress even if the buffer is empty, we call
189     // backgroundFlushCommits at least one time.
190     backgroundFlushCommits(true);
191   }
192 
193   /**
194    * Send the operations in the buffer to the servers. Does not wait for the server's answer. If
195    * the is an error (max retried reach from a previous flush or bad operation), it tries to send
196    * all operations in the buffer and sends an exception.
197    *
198    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
199    *        returning.
200    */
201   private void backgroundFlushCommits(boolean synchronous) throws
202       InterruptedIOException,
203       RetriesExhaustedWithDetailsException {
204 
205     LinkedList<Mutation> buffer = new LinkedList<>();
206     // Keep track of the size so that this thread doesn't spin forever
207     long dequeuedSize = 0;
208 
209     try {
210       // Grab all of the available mutations.
211       Mutation m;
212 
213       // If there's no buffer size drain everything. If there is a buffersize drain up to twice
214       // that amount. This should keep the loop from continually spinning if there are threads
215       // that keep adding more data to the buffer.
216       while (
217           (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous)
218               && (m = writeAsyncBuffer.poll()) != null) {
219         buffer.add(m);
220         long size = m.heapSize();
221         dequeuedSize += size;
222         currentWriteBufferSize.addAndGet(-size);
223       }
224 
225       if (!synchronous && dequeuedSize == 0) {
226         return;
227       }
228 
229       if (!synchronous) {
230         ap.submit(tableName, buffer, true, null, false);
231         if (ap.hasError()) {
232           LOG.debug(tableName + ": One or more of the operations have failed -"
233               + " waiting for all operation in progress to finish (successfully or not)");
234         }
235       }
236       if (synchronous || ap.hasError()) {
237         while (!buffer.isEmpty()) {
238           ap.submit(tableName, buffer, true, null, false);
239         }
240         RetriesExhaustedWithDetailsException error =
241             ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
242         if (error != null) {
243           if (listener == null) {
244             throw error;
245           } else {
246             this.listener.onException(error, this);
247           }
248         }
249       }
250     } finally {
251       for (Mutation mut : buffer) {
252         long size = mut.heapSize();
253         currentWriteBufferSize.addAndGet(size);
254         dequeuedSize -= size;
255         writeAsyncBuffer.add(mut);
256       }
257     }
258   }
259 
260   /**
261    * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
262    * not be called for production uses.
263    * @deprecated Going away when we drop public support for {@link HTableInterface}.
264    */
265   @Deprecated
266   public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
267       InterruptedIOException {
268     this.writeBufferSize = writeBufferSize;
269     if (currentWriteBufferSize.get() > writeBufferSize) {
270       flush();
271     }
272   }
273 
274   /**
275    * {@inheritDoc}
276    */
277   @Override
278   public long getWriteBufferSize() {
279     return this.writeBufferSize;
280   }
281 
282   /**
283    * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not beÓ
284    * called from production uses.
285    * @deprecated Going away when we drop public support for {@link HTableInterface}.
286 Ó   */
287   @Deprecated
288   public List<Row> getWriteBuffer() {
289     return Arrays.asList(writeAsyncBuffer.toArray(new Row[0]));
290   }
291 }