View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import java.io.IOException;
22  import java.util.Arrays;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ConcurrentMap;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
40  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.Threads;
43  import org.apache.hadoop.metrics.util.MBeanUtil;
44  import org.apache.thrift.TException;
45  
46  /**
47   * This class will coalesce increments from a thrift server if
48   * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this
49   * config to true will cause the thrift server to queue increments into an
50   * instance of this class. The thread pool associated with this class will drain
51   * the coalesced increments as the thread is able. This can cause data loss if the
52   * thrift server dies or is shut down before everything in the queue is drained.
53   *
54   */
55  public class IncrementCoalescer implements IncrementCoalescerMBean {
56  
57    /**
58     * Used to identify a cell that will be incremented.
59     *
60     */
61    static class FullyQualifiedRow {
62      private byte[] table;
63      private byte[] rowKey;
64      private byte[] family;
65      private byte[] qualifier;
66  
67      public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
68        super();
69        this.table = table;
70        this.rowKey = rowKey;
71        this.family = fam;
72        this.qualifier = qual;
73      }
74  
75      public byte[] getTable() {
76        return table;
77      }
78  
79      public void setTable(byte[] table) {
80        this.table = table;
81      }
82  
83      public byte[] getRowKey() {
84        return rowKey;
85      }
86  
87      public void setRowKey(byte[] rowKey) {
88        this.rowKey = rowKey;
89      }
90  
91      public byte[] getFamily() {
92        return family;
93      }
94  
95      public void setFamily(byte[] fam) {
96        this.family = fam;
97      }
98  
99      public byte[] getQualifier() {
100       return qualifier;
101     }
102 
103     public void setQualifier(byte[] qual) {
104       this.qualifier = qual;
105     }
106 
107     @Override
108     public int hashCode() {
109       final int prime = 31;
110       int result = 1;
111       result = prime * result + Arrays.hashCode(family);
112       result = prime * result + Arrays.hashCode(qualifier);
113       result = prime * result + Arrays.hashCode(rowKey);
114       result = prime * result + Arrays.hashCode(table);
115       return result;
116     }
117 
118     @Override
119     public boolean equals(Object obj) {
120       if (this == obj) return true;
121       if (obj == null) return false;
122       if (getClass() != obj.getClass()) return false;
123       FullyQualifiedRow other = (FullyQualifiedRow) obj;
124       if (!Arrays.equals(family, other.family)) return false;
125       if (!Arrays.equals(qualifier, other.qualifier)) return false;
126       if (!Arrays.equals(rowKey, other.rowKey)) return false;
127       if (!Arrays.equals(table, other.table)) return false;
128       return true;
129     }
130 
131   }
132 
133   static class DaemonThreadFactory implements ThreadFactory {
134     static final AtomicInteger poolNumber = new AtomicInteger(1);
135     final ThreadGroup group;
136     final AtomicInteger threadNumber = new AtomicInteger(1);
137     final String namePrefix;
138 
139     DaemonThreadFactory() {
140       SecurityManager s = System.getSecurityManager();
141       group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
142       namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-";
143     }
144 
145     public Thread newThread(Runnable r) {
146       Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
147       if (!t.isDaemon()) t.setDaemon(true);
148       if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
149       return t;
150     }
151   }
152 
153   private final AtomicLong failedIncrements = new AtomicLong();
154   private final AtomicLong successfulCoalescings = new AtomicLong();
155   private final AtomicLong totalIncrements = new AtomicLong();
156   private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
157       new ConcurrentHashMap<FullyQualifiedRow, Long>(100000, 0.75f, 1500);
158   private final ThreadPoolExecutor pool;
159   private final HBaseHandler handler;
160 
161   private int maxQueueSize = 500000;
162   private static final int CORE_POOL_SIZE = 1;
163 
164   protected final Log LOG = LogFactory.getLog(this.getClass().getName());
165 
166   @SuppressWarnings("deprecation")
167   public IncrementCoalescer(HBaseHandler hand) {
168     this.handler = hand;
169     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
170     pool =
171         new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
172             Threads.newDaemonThreadFactory("IncrementCoalescer"));
173 
174     MBeanUtil.registerMBean("thrift", "Thrift", this);
175   }
176 
177   public boolean queueIncrement(TIncrement inc) throws TException {
178     if (!canQueue()) {
179       failedIncrements.incrementAndGet();
180       return false;
181     }
182     return internalQueueTincrement(inc);
183   }
184 
185   public boolean queueIncrements(List<TIncrement> incs) throws TException {
186     if (!canQueue()) {
187       failedIncrements.incrementAndGet();
188       return false;
189     }
190 
191     for (TIncrement tinc : incs) {
192       internalQueueTincrement(tinc);
193     }
194     return true;
195 
196   }
197 
198   private boolean internalQueueTincrement(TIncrement inc) throws TException {
199     byte[][] famAndQf = KeyValue.parseColumn(inc.getColumn());
200     if (famAndQf.length != 2) return false;
201 
202     return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
203       inc.getAmmount());
204   }
205 
206   private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
207       byte[] qual, long ammount) throws TException {
208     int countersMapSize = countersMap.size();
209 
210 
211     //Make sure that the number of threads is scaled.
212     dynamicallySetCoreSize(countersMapSize);
213 
214     totalIncrements.incrementAndGet();
215 
216     FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
217 
218     long currentAmount = ammount;
219     // Spin until able to insert the value back without collisions
220     while (true) {
221       Long value = countersMap.remove(key);
222       if (value == null) {
223         // There was nothing there, create a new value
224         value = Long.valueOf(currentAmount);
225       } else {
226         value += currentAmount;
227         successfulCoalescings.incrementAndGet();
228       }
229       // Try to put the value, only if there was none
230       Long oldValue = countersMap.putIfAbsent(key, value);
231       if (oldValue == null) {
232         // We were able to put it in, we're done
233         break;
234       }
235       // Someone else was able to put a value in, so let's remember our
236       // current value (plus what we picked up) and retry to add it in
237       currentAmount = value;
238     }
239 
240     // We limit the size of the queue simply because all we need is a
241     // notification that something needs to be incremented. No need
242     // for millions of callables that mean the same thing.
243     if (pool.getQueue().size() <= 1000) {
244       // queue it up
245       Callable<Integer> callable = createIncCallable();
246       pool.submit(callable);
247     }
248 
249     return true;
250   }
251 
252   public boolean canQueue() {
253     return countersMap.size() < maxQueueSize;
254   }
255 
256   private Callable<Integer> createIncCallable() {
257     return new Callable<Integer>() {
258       @Override
259       public Integer call() throws Exception {
260         int failures = 0;
261         Set<FullyQualifiedRow> keys = countersMap.keySet();
262         for (FullyQualifiedRow row : keys) {
263           Long counter = countersMap.remove(row);
264           if (counter == null) {
265             continue;
266           }
267           Table table = null;
268           try {
269             table = handler.getTable(row.getTable());
270             if (failures > 2) {
271               throw new IOException("Auto-Fail rest of ICVs");
272             }
273             table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(),
274               counter);
275           } catch (IOException e) {
276             // log failure of increment
277             failures++;
278             LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
279                 + Bytes.toStringBinary(row.getRowKey()) + ", "
280                 + Bytes.toStringBinary(row.getFamily()) + ", "
281                 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
282           } finally{
283             if(table != null){
284               table.close();
285             }
286           }
287         }
288         return failures;
289       }
290     };
291   }
292 
293   /**
294    * This method samples the incoming requests and, if selected, will check if
295    * the corePoolSize should be changed.
296    * @param countersMapSize
297    */
298   private void dynamicallySetCoreSize(int countersMapSize) {
299     // Here we are using countersMapSize as a random number, meaning this
300     // could be a Random object
301     if (countersMapSize % 10 != 0) {
302       return;
303     }
304     double currentRatio = (double) countersMapSize / (double) maxQueueSize;
305     int newValue = 1;
306     if (currentRatio < 0.1) {
307       // it's 1
308     } else if (currentRatio < 0.3) {
309       newValue = 2;
310     } else if (currentRatio < 0.5) {
311       newValue = 4;
312     } else if (currentRatio < 0.7) {
313       newValue = 8;
314     } else if (currentRatio < 0.9) {
315       newValue = 14;
316     } else {
317       newValue = 22;
318     }
319     if (pool.getCorePoolSize() != newValue) {
320       pool.setCorePoolSize(newValue);
321     }
322   }
323 
324   // MBean get/set methods
325   public int getQueueSize() {
326     return pool.getQueue().size();
327   }
328   public int getMaxQueueSize() {
329     return this.maxQueueSize;
330   }
331   public void setMaxQueueSize(int newSize) {
332     this.maxQueueSize = newSize;
333   }
334 
335   public long getPoolCompletedTaskCount() {
336     return pool.getCompletedTaskCount();
337   }
338   public long getPoolTaskCount() {
339     return pool.getTaskCount();
340   }
341   public int getPoolLargestPoolSize() {
342     return pool.getLargestPoolSize();
343   }
344   public int getCorePoolSize() {
345     return pool.getCorePoolSize();
346   }
347   public void setCorePoolSize(int newCoreSize) {
348     pool.setCorePoolSize(newCoreSize);
349   }
350   public int getMaxPoolSize() {
351     return pool.getMaximumPoolSize();
352   }
353   public void setMaxPoolSize(int newMaxSize) {
354     pool.setMaximumPoolSize(newMaxSize);
355   }
356   public long getFailedIncrements() {
357     return failedIncrements.get();
358   }
359 
360   public long getSuccessfulCoalescings() {
361     return successfulCoalescings.get();
362   }
363 
364   public long getTotalIncrements() {
365     return totalIncrements.get();
366   }
367 
368   public long getCountersMapSize() {
369     return countersMap.size();
370   }
371 
372 }