001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.LongAdder;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.thrift.generated.TIncrement;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.metrics2.util.MBeans;
037import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * This class will coalesce increments from a thift server if
044 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this
045 * config to true will cause the thrift server to queue increments into an
046 * instance of this class. The thread pool associated with this class will drain
047 * the coalesced increments as the thread is able. This can cause data loss if the
048 * thrift server dies or is shut down before everything in the queue is drained.
049 *
050 */
051@InterfaceAudience.Private
052public class IncrementCoalescer implements IncrementCoalescerMBean {
053  /**
054   * Used to identify a cell that will be incremented.
055   *
056   */
057  static class FullyQualifiedRow {
058    private byte[] table;
059    private byte[] rowKey;
060    private byte[] family;
061    private byte[] qualifier;
062
063    public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
064      super();
065      this.table = table;
066      this.rowKey = rowKey;
067      this.family = fam;
068      this.qualifier = qual;
069    }
070
071    public byte[] getTable() {
072      return table;
073    }
074
075    public void setTable(byte[] table) {
076      this.table = table;
077    }
078
079    public byte[] getRowKey() {
080      return rowKey;
081    }
082
083    public byte[] getFamily() {
084      return family;
085    }
086
087    public void setFamily(byte[] fam) {
088      this.family = fam;
089    }
090
091    public byte[] getQualifier() {
092      return qualifier;
093    }
094
095    public void setQualifier(byte[] qual) {
096      this.qualifier = qual;
097    }
098
099    @Override
100    public int hashCode() {
101      final int prime = 31;
102      int result = 1;
103      result = prime * result + Arrays.hashCode(family);
104      result = prime * result + Arrays.hashCode(qualifier);
105      result = prime * result + Arrays.hashCode(rowKey);
106      result = prime * result + Arrays.hashCode(table);
107      return result;
108    }
109
110    @Override
111    public boolean equals(Object obj) {
112      if (this == obj) {
113        return true;
114      }
115      if (obj == null) {
116        return false;
117      }
118      if (getClass() != obj.getClass()) {
119        return false;
120      }
121
122      FullyQualifiedRow other = (FullyQualifiedRow) obj;
123
124      if (!Arrays.equals(family, other.family)) {
125        return false;
126      }
127      if (!Arrays.equals(qualifier, other.qualifier)) {
128        return false;
129      }
130      if (!Arrays.equals(rowKey, other.rowKey)) {
131        return false;
132      }
133
134      return Arrays.equals(table, other.table);
135    }
136  }
137
138  private final LongAdder failedIncrements = new LongAdder();
139  private final LongAdder successfulCoalescings = new LongAdder();
140  private final LongAdder totalIncrements = new LongAdder();
141  private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
142      new ConcurrentHashMap<>(100000, 0.75f, 1500);
143  private final ThreadPoolExecutor pool;
144  private final ThriftHBaseServiceHandler handler;
145
146  private int maxQueueSize = 500000;
147  private static final int CORE_POOL_SIZE = 1;
148
149  private static final Logger LOG = LoggerFactory.getLogger(IncrementCoalescer.class);
150
151  public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
152    this.handler = hand;
153    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
154    pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
155      new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").setDaemon(true)
156        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
157    MBeans.register("thrift", "Thrift", this);
158  }
159
160  public boolean queueIncrement(TIncrement inc) {
161    if (!canQueue()) {
162      failedIncrements.increment();
163      return false;
164    }
165    return internalQueueTincrement(inc);
166  }
167
168  public boolean queueIncrements(List<TIncrement> incs) {
169    if (!canQueue()) {
170      failedIncrements.increment();
171      return false;
172    }
173
174    for (TIncrement tinc : incs) {
175      internalQueueTincrement(tinc);
176    }
177
178    return true;
179  }
180
181  private boolean internalQueueTincrement(TIncrement inc) {
182    byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
183
184    if (famAndQf.length != 2) {
185      return false;
186    }
187
188    return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
189      inc.getAmmount());
190  }
191
192  @SuppressWarnings("FutureReturnValueIgnored")
193  private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
194      byte[] qual, long ammount) {
195    int countersMapSize = countersMap.size();
196
197    //Make sure that the number of threads is scaled.
198    dynamicallySetCoreSize(countersMapSize);
199
200    totalIncrements.increment();
201
202    FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
203
204    long currentAmount = ammount;
205    // Spin until able to insert the value back without collisions
206    while (true) {
207      Long value = countersMap.remove(key);
208      if (value == null) {
209        // There was nothing there, create a new value
210        value = currentAmount;
211      } else {
212        value += currentAmount;
213        successfulCoalescings.increment();
214      }
215      // Try to put the value, only if there was none
216      Long oldValue = countersMap.putIfAbsent(key, value);
217      if (oldValue == null) {
218        // We were able to put it in, we're done
219        break;
220      }
221      // Someone else was able to put a value in, so let's remember our
222      // current value (plus what we picked up) and retry to add it in
223      currentAmount = value;
224    }
225
226    // We limit the size of the queue simply because all we need is a
227    // notification that something needs to be incremented. No need
228    // for millions of callables that mean the same thing.
229    if (pool.getQueue().size() <= 1000) {
230      // queue it up
231      Callable<Integer> callable = createIncCallable();
232      pool.submit(callable);
233    }
234
235    return true;
236  }
237
238  public boolean canQueue() {
239    return countersMap.size() < maxQueueSize;
240  }
241
242  private Callable<Integer> createIncCallable() {
243    return () -> {
244      int failures = 0;
245      Set<FullyQualifiedRow> keys = countersMap.keySet();
246      for (FullyQualifiedRow row : keys) {
247        Long counter = countersMap.remove(row);
248        if (counter == null) {
249          continue;
250        }
251        Table table = null;
252        try {
253          table = handler.getTable(row.getTable());
254          if (failures > 2) {
255            throw new IOException("Auto-Fail rest of ICVs");
256          }
257          table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(),
258            counter);
259        } catch (IOException e) {
260          // log failure of increment
261          failures++;
262          LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
263              + Bytes.toStringBinary(row.getRowKey()) + ", "
264              + Bytes.toStringBinary(row.getFamily()) + ", "
265              + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
266        } finally{
267          if(table != null){
268            table.close();
269          }
270        }
271      }
272      return failures;
273    };
274  }
275
276  /**
277   * This method samples the incoming requests and, if selected, will check if
278   * the corePoolSize should be changed.
279   * @param countersMapSize the size of the counters map
280   */
281  private void dynamicallySetCoreSize(int countersMapSize) {
282    // Here we are using countersMapSize as a random number, meaning this
283    // could be a Random object
284    if (countersMapSize % 10 != 0) {
285      return;
286    }
287    double currentRatio = (double) countersMapSize / (double) maxQueueSize;
288    int newValue;
289
290    if (currentRatio < 0.1) {
291      newValue = 1;
292    } else if (currentRatio < 0.3) {
293      newValue = 2;
294    } else if (currentRatio < 0.5) {
295      newValue = 4;
296    } else if (currentRatio < 0.7) {
297      newValue = 8;
298    } else if (currentRatio < 0.9) {
299      newValue = 14;
300    } else {
301      newValue = 22;
302    }
303
304    if (pool.getCorePoolSize() != newValue) {
305      pool.setCorePoolSize(newValue);
306    }
307  }
308
309  // MBean get/set methods
310  @Override
311  public int getQueueSize() {
312    return pool.getQueue().size();
313  }
314
315  @Override
316  public int getMaxQueueSize() {
317    return this.maxQueueSize;
318  }
319
320  @Override
321  public void setMaxQueueSize(int newSize) {
322    this.maxQueueSize = newSize;
323  }
324
325  @Override
326  public long getPoolCompletedTaskCount() {
327    return pool.getCompletedTaskCount();
328  }
329
330  @Override
331  public long getPoolTaskCount() {
332    return pool.getTaskCount();
333  }
334
335  @Override
336  public int getPoolLargestPoolSize() {
337    return pool.getLargestPoolSize();
338  }
339
340  @Override
341  public int getCorePoolSize() {
342    return pool.getCorePoolSize();
343  }
344
345  @Override
346  public void setCorePoolSize(int newCoreSize) {
347    pool.setCorePoolSize(newCoreSize);
348  }
349
350  @Override
351  public int getMaxPoolSize() {
352    return pool.getMaximumPoolSize();
353  }
354
355  @Override
356  public void setMaxPoolSize(int newMaxSize) {
357    pool.setMaximumPoolSize(newMaxSize);
358  }
359
360  @Override
361  public long getFailedIncrements() {
362    return failedIncrements.sum();
363  }
364
365  @Override
366  public long getSuccessfulCoalescings() {
367    return successfulCoalescings.sum();
368  }
369
370  @Override
371  public long getTotalIncrements() {
372    return totalIncrements.sum();
373  }
374
375  @Override
376  public long getCountersMapSize() {
377    return countersMap.size();
378  }
379}