1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.apache.hadoop.hbase.client;
17  
18  import org.apache.commons.logging.Log;
19  import org.apache.commons.logging.LogFactory;
20  import org.apache.hadoop.conf.Configuration;
21  import org.apache.hadoop.hbase.TableName;
22  import org.apache.hadoop.hbase.classification.InterfaceAudience;
23  import org.apache.hadoop.hbase.classification.InterfaceStability;
24  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
25  
26  import java.io.IOException;
27  import java.io.InterruptedIOException;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.TimeUnit;
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  @InterfaceAudience.Private
45  @InterfaceStability.Evolving
46  public class BufferedMutatorImpl implements BufferedMutator {
47  
48    private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
49    
50    private final ExceptionListener listener;
51  
52    protected ClusterConnection connection; 
53    private final TableName tableName;
54    private volatile Configuration conf;
55    private List<Row> writeAsyncBuffer = new LinkedList<>();
56    private long writeBufferSize;
57    private final int maxKeyValueSize;
58    protected long currentWriteBufferSize = 0;
59    private boolean closed = false;
60    private final ExecutorService pool;
61    protected AsyncProcess ap; 
62  
63    BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
64        RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
65      if (conn == null || conn.isClosed()) {
66        throw new IllegalArgumentException("Connection is null or closed.");
67      }
68  
69      this.tableName = params.getTableName();
70      this.connection = conn;
71      this.conf = connection.getConfiguration();
72      this.pool = params.getPool();
73      this.listener = params.getListener();
74  
75      ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
76      this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
77          params.getWriteBufferSize() : tableConf.getWriteBufferSize();
78      this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
79          params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
80  
81      
82      ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
83    }
84  
85    @Override
86    public TableName getName() {
87      return tableName;
88    }
89  
90    @Override
91    public Configuration getConfiguration() {
92      return conf;
93    }
94  
95    @Override
96    public synchronized void mutate(Mutation m) throws InterruptedIOException,
97        RetriesExhaustedWithDetailsException {
98      doMutate(m);
99    }
100 
101   @Override
102   public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
103       RetriesExhaustedWithDetailsException {
104     for (Mutation m : ms) {
105       doMutate(m);
106     }
107   }
108 
109   
110 
111 
112 
113 
114 
115 
116   private void doMutate(Mutation m) throws InterruptedIOException,
117       RetriesExhaustedWithDetailsException {
118     if (closed) {
119       throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
120     }
121     if (!(m instanceof Put) && !(m instanceof Delete)) {
122       throw new IllegalArgumentException("Pass a Delete or a Put");
123     }
124 
125     
126     
127     
128     if (ap.hasError()) {
129       writeAsyncBuffer.add(m);
130       backgroundFlushCommits(true);
131     }
132 
133     if (m instanceof Put) {
134       validatePut((Put) m);
135     }
136 
137     currentWriteBufferSize += m.heapSize();
138     writeAsyncBuffer.add(m);
139 
140     while (currentWriteBufferSize > writeBufferSize) {
141       backgroundFlushCommits(false);
142     }
143   }
144 
145   
146   public void validatePut(final Put put) throws IllegalArgumentException {
147     HTable.validatePut(put, maxKeyValueSize);
148   }
149 
150   @Override
151   public synchronized void close() throws IOException {
152     if (this.closed) {
153       return;
154     }
155     try {
156       
157       
158       backgroundFlushCommits(true);
159       this.pool.shutdown();
160       boolean terminated = false;
161       int loopCnt = 0;
162       do {
163         
164         terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
165         loopCnt += 1;
166         if (loopCnt >= 10) {
167           LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
168           break;
169         }
170       } while (!terminated);
171     } catch (InterruptedException e) {
172       LOG.warn("waitForTermination interrupted");
173     } finally {
174       this.closed = true;
175     }
176   }
177 
178   @Override
179   public synchronized void flush() throws InterruptedIOException,
180       RetriesExhaustedWithDetailsException {
181     
182     
183     backgroundFlushCommits(true);
184   }
185 
186   
187 
188 
189 
190 
191 
192 
193 
194   private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
195       RetriesExhaustedWithDetailsException {
196     try {
197       if (!synchronous) {
198         ap.submit(tableName, writeAsyncBuffer, true, null, false);
199         if (ap.hasError()) {
200           LOG.debug(tableName + ": One or more of the operations have failed -"
201               + " waiting for all operation in progress to finish (successfully or not)");
202         }
203       }
204       if (synchronous || ap.hasError()) {
205         while (!writeAsyncBuffer.isEmpty()) {
206           ap.submit(tableName, writeAsyncBuffer, true, null, false);
207         }
208         RetriesExhaustedWithDetailsException error =
209             ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
210         if (error != null) {
211           if (listener == null) {
212             throw error;
213           } else {
214             this.listener.onException(error, this);
215           }
216         }
217       }
218     } finally {
219       currentWriteBufferSize = 0;
220       for (Row mut : writeAsyncBuffer) {
221         if (mut instanceof Mutation) {
222           currentWriteBufferSize += ((Mutation) mut).heapSize();
223         }
224       }
225     }
226   }
227 
228   
229 
230 
231 
232 
233   @Deprecated
234   public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
235       InterruptedIOException {
236     this.writeBufferSize = writeBufferSize;
237     if (currentWriteBufferSize > writeBufferSize) {
238       flush();
239     }
240   }
241 
242   
243 
244 
245   @Override
246   public long getWriteBufferSize() {
247     return this.writeBufferSize;
248   }
249 
250   
251 
252 
253 
254 
255   @Deprecated
256   public List<Row> getWriteBuffer() {
257     return this.writeAsyncBuffer;
258   }
259 }