001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Objects;
025import java.util.Random;
026import java.util.Set;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
030import org.apache.hadoop.hbase.client.Get;
031import org.apache.hadoop.hbase.client.Increment;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.ResultScanner;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@InterfaceAudience.Private
047public final class ThrottleQuotaTestUtil {
048
049  private final static Logger LOG = LoggerFactory.getLogger(ThrottleQuotaTestUtil.class);
050  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
051  private final static int REFRESH_TIME = 30 * 60000;
052  static {
053    envEdge.setValue(EnvironmentEdgeManager.currentTime());
054    // only active the envEdge for quotas package
055    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
056      ThrottleQuotaTestUtil.class.getPackage().getName());
057  }
058
059  private ThrottleQuotaTestUtil() {
060    // Hide utility class constructor
061    LOG.debug("Call constructor of ThrottleQuotaTestUtil");
062  }
063
064  static int doPuts(int maxOps, byte[] family, byte[] qualifier, final Table... tables) {
065    return doPuts(maxOps, -1, family, qualifier, tables);
066  }
067
068  static int doPuts(int maxOps, int valueSize, byte[] family, byte[] qualifier,
069    final Table... tables) {
070    int count = 0;
071    try {
072      while (count < maxOps) {
073        Put put = new Put(Bytes.toBytes("row-" + count));
074        byte[] value;
075        if (valueSize < 0) {
076          value = Bytes.toBytes("data-" + count);
077        } else {
078          value = generateValue(valueSize);
079        }
080        put.addColumn(family, qualifier, value);
081        for (final Table table : tables) {
082          table.put(put);
083        }
084        count += tables.length;
085      }
086    } catch (IOException e) {
087      LOG.error("put failed after nRetries=" + count, e);
088    }
089    return count;
090  }
091
092  private static byte[] generateValue(int valueSize) {
093    byte[] bytes = new byte[valueSize];
094    for (int i = 0; i < valueSize; i++) {
095      bytes[i] = 'a';
096    }
097    return bytes;
098  }
099
100  static long doGets(int maxOps, final Table... tables) {
101    int count = 0;
102    try {
103      while (count < maxOps) {
104        Get get = new Get(Bytes.toBytes("row-" + count));
105        for (final Table table : tables) {
106          table.get(get);
107        }
108        count += tables.length;
109      }
110    } catch (IOException e) {
111      LOG.error("get failed after nRetries=" + count, e);
112    }
113    return count;
114  }
115
116  static long doGets(int maxOps, byte[] family, byte[] qualifier, final Table... tables) {
117    int count = 0;
118    try {
119      while (count < maxOps) {
120        Get get = new Get(Bytes.toBytes("row-" + count));
121        get.addColumn(family, qualifier);
122        for (final Table table : tables) {
123          table.get(get);
124        }
125        count += tables.length;
126      }
127    } catch (IOException e) {
128      LOG.error("get failed after nRetries=" + count, e);
129    }
130    return count;
131  }
132
133  static long doIncrements(int maxOps, byte[] family, byte[] qualifier, final Table... tables) {
134    int count = 0;
135    try {
136      while (count < maxOps) {
137        Increment inc = new Increment(Bytes.toBytes("row-" + count));
138        inc.addColumn(family, qualifier, 1L);
139        for (final Table table : tables) {
140          table.increment(inc);
141        }
142        count += tables.length;
143      }
144    } catch (IOException e) {
145      LOG.error("increment failed after nRetries=" + count, e);
146    }
147    return count;
148  }
149
150  static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier,
151    final Table... tables) {
152    int opCount = 0;
153    Random random = new Random();
154    try {
155      while (opCount < maxOps) {
156        List<Get> gets = new ArrayList<>(batchSize);
157        while (gets.size() < batchSize) {
158          Get get = new Get(Bytes.toBytes("row-" + random.nextInt(rowCount)));
159          get.addColumn(family, qualifier);
160          gets.add(get);
161        }
162        for (final Table table : tables) {
163          table.get(gets);
164        }
165        opCount += tables.length;
166      }
167    } catch (IOException e) {
168      LOG.error("multiget failed after nRetries=" + opCount, e);
169    }
170    return opCount;
171  }
172
173  static long doScans(int desiredRows, Table table, int caching) {
174    int count = 0;
175    try {
176      Scan scan = new Scan();
177      scan.setCaching(caching);
178      scan.setCacheBlocks(false);
179      ResultScanner scanner = table.getScanner(scan);
180      while (count < desiredRows) {
181        scanner.next();
182        count += 1;
183      }
184    } catch (IOException e) {
185      LOG.error("scan failed after nRetries=" + count, e);
186    }
187    return count;
188  }
189
190  static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
191    TableName... tables) throws Exception {
192    triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables);
193  }
194
195  static void triggerTableCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
196    TableName... tables) throws Exception {
197    triggerCacheRefresh(testUtil, bypass, false, true, false, false, false, tables);
198  }
199
200  static void triggerNamespaceCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
201    TableName... tables) throws Exception {
202    triggerCacheRefresh(testUtil, bypass, false, false, true, false, false, tables);
203  }
204
205  static void triggerRegionServerCacheRefresh(HBaseTestingUtil testUtil, boolean bypass)
206    throws Exception {
207    triggerCacheRefresh(testUtil, bypass, false, false, false, true, false);
208  }
209
210  static void triggerExceedThrottleQuotaCacheRefresh(HBaseTestingUtil testUtil,
211    boolean exceedEnabled) throws Exception {
212    triggerCacheRefresh(testUtil, exceedEnabled, false, false, false, false, true);
213  }
214
215  private static void triggerCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,
216    boolean userLimiter, boolean tableLimiter, boolean nsLimiter, boolean rsLimiter,
217    boolean exceedThrottleQuota, final TableName... tables) throws Exception {
218    envEdge.incValue(2 * REFRESH_TIME);
219    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
220      RegionServerRpcQuotaManager quotaManager =
221        rst.getRegionServer().getRegionServerRpcQuotaManager();
222      QuotaCache quotaCache = quotaManager.getQuotaCache();
223      quotaCache.forceSynchronousCacheRefresh();
224      Thread.sleep(250);
225      testUtil.waitFor(60000, 250, new ExplainingPredicate<Exception>() {
226
227        @Override
228        public boolean evaluate() throws Exception {
229          boolean isUpdated = true;
230          for (TableName table : tables) {
231            if (userLimiter) {
232              boolean isUserBypass =
233                quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass();
234              if (isUserBypass != bypass) {
235                LOG.info(
236                  "User limiter for user={}, table={} not refreshed, bypass expected {}, actual {}",
237                  User.getCurrent(), table, bypass, isUserBypass);
238                envEdge.incValue(100);
239                isUpdated = false;
240                break;
241              }
242            }
243            if (tableLimiter) {
244              boolean isTableBypass = quotaCache.getTableLimiter(table).isBypass();
245              if (isTableBypass != bypass) {
246                LOG.info("Table limiter for table={} not refreshed, bypass expected {}, actual {}",
247                  table, bypass, isTableBypass);
248                envEdge.incValue(100);
249                isUpdated = false;
250                break;
251              }
252            }
253            if (nsLimiter) {
254              boolean isNsBypass =
255                quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass();
256              if (isNsBypass != bypass) {
257                LOG.info(
258                  "Namespace limiter for namespace={} not refreshed, bypass expected {}, actual {}",
259                  table.getNamespaceAsString(), bypass, isNsBypass);
260                envEdge.incValue(100);
261                isUpdated = false;
262                break;
263              }
264            }
265          }
266          if (rsLimiter) {
267            boolean rsIsBypass = quotaCache
268              .getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass();
269            if (rsIsBypass != bypass) {
270              LOG.info("RegionServer limiter not refreshed, bypass expected {}, actual {}", bypass,
271                rsIsBypass);
272              envEdge.incValue(100);
273              isUpdated = false;
274            }
275          }
276          if (exceedThrottleQuota) {
277            if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) {
278              LOG.info("ExceedThrottleQuotaEnabled not refreshed, bypass expected {}, actual {}",
279                bypass, quotaCache.isExceedThrottleQuotaEnabled());
280              envEdge.incValue(100);
281              isUpdated = false;
282            }
283          }
284          if (isUpdated) {
285            return true;
286          }
287          quotaCache.triggerCacheRefresh();
288          return false;
289        }
290
291        @Override
292        public String explainFailure() throws Exception {
293          return "Quota cache is still not refreshed";
294        }
295      });
296
297      LOG.debug("QuotaCache");
298      LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache()));
299      LOG.debug(Objects.toString(quotaCache.getTableQuotaCache()));
300      LOG.debug(Objects.toString(quotaCache.getUserQuotaCache()));
301      LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache()));
302    }
303  }
304
305  static Set<QuotaCache> getQuotaCaches(HBaseTestingUtil testUtil) {
306    Set<QuotaCache> quotaCaches = new HashSet<>();
307    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
308      RegionServerRpcQuotaManager quotaManager =
309        rst.getRegionServer().getRegionServerRpcQuotaManager();
310      quotaCaches.add(quotaManager.getQuotaCache());
311    }
312    return quotaCaches;
313  }
314
315  static void waitMinuteQuota() {
316    envEdge.incValue(70000);
317  }
318
319  static void clearQuotaCache(HBaseTestingUtil testUtil) {
320    for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
321      RegionServerRpcQuotaManager quotaManager =
322        rst.getRegionServer().getRegionServerRpcQuotaManager();
323      QuotaCache quotaCache = quotaManager.getQuotaCache();
324      quotaCache.getNamespaceQuotaCache().clear();
325      quotaCache.getTableQuotaCache().clear();
326      quotaCache.getUserQuotaCache().clear();
327      quotaCache.getRegionServerQuotaCache().clear();
328    }
329  }
330}