View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
4    * agreements. See the NOTICE file distributed with this work for additional information regarding
5    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License. You may obtain a
7    * copy of the License at
8    *
9    *  http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software distributed under the
12   * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13   * express or implied. See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.apache.hadoop.hbase.client;
17  
18  import org.apache.commons.logging.Log;
19  import org.apache.commons.logging.LogFactory;
20  import org.apache.hadoop.conf.Configuration;
21  import org.apache.hadoop.hbase.TableName;
22  import org.apache.hadoop.hbase.classification.InterfaceAudience;
23  import org.apache.hadoop.hbase.classification.InterfaceStability;
24  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
25  
26  import java.io.IOException;
27  import java.io.InterruptedIOException;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * <p>
35   * Used to communicate with a single HBase table similar to {@link HTable}
36   * but meant for batched, potentially asynchronous puts. Obtain an instance from
37   * a {@link Connection} and call {@link #close()} afterwards.
38   * </p>
39   *
40   * @see ConnectionFactory
41   * @see Connection
42   * @since 1.0.0
43   */
44  @InterfaceAudience.Private
45  @InterfaceStability.Evolving
46  public class BufferedMutatorImpl implements BufferedMutator {
47  
48    private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
49    
50    private final ExceptionListener listener;
51  
52    protected ClusterConnection connection; // non-final so can be overridden in test
53    private final TableName tableName;
54    private volatile Configuration conf;
55    private List<Row> writeAsyncBuffer = new LinkedList<>();
56    private long writeBufferSize;
57    private final int maxKeyValueSize;
58    protected long currentWriteBufferSize = 0;
59    private boolean closed = false;
60    private final ExecutorService pool;
61    protected AsyncProcess ap; // non-final so can be overridden in test
62  
63    BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
64        RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
65      if (conn == null || conn.isClosed()) {
66        throw new IllegalArgumentException("Connection is null or closed.");
67      }
68  
69      this.tableName = params.getTableName();
70      this.connection = conn;
71      this.conf = connection.getConfiguration();
72      this.pool = params.getPool();
73      this.listener = params.getListener();
74  
75      ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
76      this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
77          params.getWriteBufferSize() : tableConf.getWriteBufferSize();
78      this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
79          params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
80  
81      // puts need to track errors globally due to how the APIs currently work.
82      ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
83    }
84  
85    @Override
86    public TableName getName() {
87      return tableName;
88    }
89  
90    @Override
91    public Configuration getConfiguration() {
92      return conf;
93    }
94  
95    @Override
96    public synchronized void mutate(Mutation m) throws InterruptedIOException,
97        RetriesExhaustedWithDetailsException {
98      doMutate(m);
99    }
100 
101   @Override
102   public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
103       RetriesExhaustedWithDetailsException {
104     for (Mutation m : ms) {
105       doMutate(m);
106     }
107   }
108 
109   /**
110    * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
111    * cluster.
112    *
113    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
114    * @throws InterruptedIOException if we were interrupted.
115    */
116   private void doMutate(Mutation m) throws InterruptedIOException,
117       RetriesExhaustedWithDetailsException {
118     if (closed) {
119       throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
120     }
121     if (!(m instanceof Put) && !(m instanceof Delete)) {
122       throw new IllegalArgumentException("Pass a Delete or a Put");
123     }
124 
125     // This behavior is highly non-intuitive... it does not protect us against
126     // 94-incompatible behavior, which is a timing issue because hasError, the below code
127     // and setter of hasError are not synchronized. Perhaps it should be removed.
128     if (ap.hasError()) {
129       writeAsyncBuffer.add(m);
130       backgroundFlushCommits(true);
131     }
132 
133     if (m instanceof Put) {
134       validatePut((Put) m);
135     }
136 
137     currentWriteBufferSize += m.heapSize();
138     writeAsyncBuffer.add(m);
139 
140     while (currentWriteBufferSize > writeBufferSize) {
141       backgroundFlushCommits(false);
142     }
143   }
144 
145   // validate for well-formedness
146   public void validatePut(final Put put) throws IllegalArgumentException {
147     HTable.validatePut(put, maxKeyValueSize);
148   }
149 
150   @Override
151   public synchronized void close() throws IOException {
152     if (this.closed) {
153       return;
154     }
155     try {
156       // As we can have an operation in progress even if the buffer is empty, we call
157       // backgroundFlushCommits at least one time.
158       backgroundFlushCommits(true);
159       this.pool.shutdown();
160       boolean terminated = false;
161       int loopCnt = 0;
162       do {
163         // wait until the pool has terminated
164         terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
165         loopCnt += 1;
166         if (loopCnt >= 10) {
167           LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
168           break;
169         }
170       } while (!terminated);
171     } catch (InterruptedException e) {
172       LOG.warn("waitForTermination interrupted");
173     } finally {
174       this.closed = true;
175     }
176   }
177 
178   @Override
179   public synchronized void flush() throws InterruptedIOException,
180       RetriesExhaustedWithDetailsException {
181     // As we can have an operation in progress even if the buffer is empty, we call
182     // backgroundFlushCommits at least one time.
183     backgroundFlushCommits(true);
184   }
185 
186   /**
187    * Send the operations in the buffer to the servers. Does not wait for the server's answer. If
188    * the is an error (max retried reach from a previous flush or bad operation), it tries to send
189    * all operations in the buffer and sends an exception.
190    *
191    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
192    *        returning.
193    */
194   private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
195       RetriesExhaustedWithDetailsException {
196     try {
197       if (!synchronous) {
198         ap.submit(tableName, writeAsyncBuffer, true, null, false);
199         if (ap.hasError()) {
200           LOG.debug(tableName + ": One or more of the operations have failed -"
201               + " waiting for all operation in progress to finish (successfully or not)");
202         }
203       }
204       if (synchronous || ap.hasError()) {
205         while (!writeAsyncBuffer.isEmpty()) {
206           ap.submit(tableName, writeAsyncBuffer, true, null, false);
207         }
208         RetriesExhaustedWithDetailsException error =
209             ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
210         if (error != null) {
211           if (listener == null) {
212             throw error;
213           } else {
214             this.listener.onException(error, this);
215           }
216         }
217       }
218     } finally {
219       currentWriteBufferSize = 0;
220       for (Row mut : writeAsyncBuffer) {
221         if (mut instanceof Mutation) {
222           currentWriteBufferSize += ((Mutation) mut).heapSize();
223         }
224       }
225     }
226   }
227 
228   /**
229    * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
230    * not be called for production uses.
231    * @deprecated Going away when we drop public support for {@link HTableInterface}.
232    */
233   @Deprecated
234   public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
235       InterruptedIOException {
236     this.writeBufferSize = writeBufferSize;
237     if (currentWriteBufferSize > writeBufferSize) {
238       flush();
239     }
240   }
241 
242   /**
243    * {@inheritDoc}
244    */
245   @Override
246   public long getWriteBufferSize() {
247     return this.writeBufferSize;
248   }
249 
250   /**
251    * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not beÓ
252    * called from production uses.
253    * @deprecated Going away when we drop public support for {@link HTableInterface}.
254 Ó   */
255   @Deprecated
256   public List<Row> getWriteBuffer() {
257     return this.writeAsyncBuffer;
258   }
259 }