001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadLocalRandom;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.commons.lang3.ArrayUtils;
024import org.apache.hadoop.hbase.HBaseTestingUtility;
025import org.apache.hadoop.hbase.HRegionInfo;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.procedure2.Procedure;
028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
029import org.apache.hadoop.hbase.procedure2.util.StringUtils;
030import org.apache.hadoop.hbase.util.AbstractHBaseTool;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033
034import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
035import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
036
/**
 * Tool to test performance of locks and queues in procedure scheduler independently from other
 * framework components. Inserts table and region operations in the scheduler, then polls them and
 * exercises their locks. The number of tables, regions and operations can be set using CLI args.
 */
042public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBaseTool {
043  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
044
045  // Command line options and defaults.
046  public static final int DEFAULT_NUM_TABLES = 5;
047  public static final Option NUM_TABLES_OPTION = new Option("num_table", true,
048    "Number of tables to use for table operations. Default: " + DEFAULT_NUM_TABLES);
049  public static final int DEFAULT_REGIONS_PER_TABLE = 10;
050  public static final Option REGIONS_PER_TABLE_OPTION = new Option("regions_per_table", true,
051    "Total number of regions per table. Default: " + DEFAULT_REGIONS_PER_TABLE);
052  public static final int DEFAULT_NUM_OPERATIONS = 10000000; // 10M
053  public static final Option NUM_OPERATIONS_OPTION = new Option("num_ops", true,
054    "Total number of operations to schedule. Default: " + DEFAULT_NUM_OPERATIONS);
055  public static final int DEFAULT_NUM_THREADS = 10;
056  public static final Option NUM_THREADS_OPTION = new Option("threads", true,
057    "Number of procedure executor threads. Default: " + DEFAULT_NUM_THREADS);
058  public static final String DEFAULT_OPS_TYPE = "both";
059  public static final Option OPS_TYPE_OPTION = new Option("ops_type", true,
060    "Type of operations to run. Value can be table/region/both. In case of 'both', "
061      + "proportion of table:region ops is 1:regions_per_table. Default: " + DEFAULT_OPS_TYPE);
062
063  private int numTables = DEFAULT_NUM_TABLES;
064  private int regionsPerTable = DEFAULT_REGIONS_PER_TABLE;
065  private int numOps = DEFAULT_NUM_OPERATIONS;
066  private int numThreads = DEFAULT_NUM_THREADS;
067  private String opsType = DEFAULT_OPS_TYPE;
068
069  private MasterProcedureScheduler procedureScheduler;
070  // List of table/region procedures to schedule.
071  ProcedureFactory[] ops;
072
073  // Using factory pattern to build a collection of operations which can be executed in an
074  // abstract manner by worker threads.
075  private interface ProcedureFactory {
076    Procedure newProcedure(long procId);
077  }
078
079  private class RegionProcedure extends TestMasterProcedureScheduler.TestRegionProcedure {
080    RegionProcedure(long procId, HRegionInfo hri) {
081      super(procId, hri.getTable(), TableOperationType.REGION_UNASSIGN, hri);
082    }
083
084    @Override
085    public LockState acquireLock(Void env) {
086      return procedureScheduler.waitRegions(this, getTableName(), getRegionInfo())
087        ? LockState.LOCK_EVENT_WAIT
088        : LockState.LOCK_ACQUIRED;
089    }
090
091    @Override
092    public void releaseLock(Void env) {
093      procedureScheduler.wakeRegions(this, getTableName(), getRegionInfo());
094    }
095  }
096
097  private class RegionProcedureFactory implements ProcedureFactory {
098    final HRegionInfo hri;
099
100    RegionProcedureFactory(HRegionInfo hri) {
101      this.hri = hri;
102    }
103
104    @Override
105    public Procedure newProcedure(long procId) {
106      return new RegionProcedure(procId, hri);
107    }
108  }
109
110  private class TableProcedure extends TestMasterProcedureScheduler.TestTableProcedure {
111
112    TableProcedure(long procId, TableName tableName) {
113      super(procId, tableName, TableOperationType.EDIT);
114    }
115
116    @Override
117    public LockState acquireLock(Void env) {
118      return procedureScheduler.waitTableExclusiveLock(this, getTableName())
119        ? LockState.LOCK_EVENT_WAIT
120        : LockState.LOCK_ACQUIRED;
121    }
122
123    @Override
124    public void releaseLock(Void env) {
125      procedureScheduler.wakeTableExclusiveLock(this, getTableName());
126    }
127  }
128
129  private class TableProcedureFactory implements ProcedureFactory {
130    final TableName tableName;
131
132    TableProcedureFactory(TableName tableName) {
133      this.tableName = tableName;
134    }
135
136    @Override
137    public Procedure newProcedure(long procId) {
138      return new TableProcedure(procId, tableName);
139    }
140  }
141
142  private void setupOperations() throws Exception {
143    // Create set of operations based on --ops_type command line argument.
144    final ProcedureFactory[] tableOps = new ProcedureFactory[numTables];
145    for (int i = 0; i < numTables; ++i) {
146      tableOps[i] = new TableProcedureFactory(TableName.valueOf("testTableLock-" + i));
147    }
148
149    final ProcedureFactory[] regionOps = new ProcedureFactory[numTables * regionsPerTable];
150    for (int i = 0; i < numTables; ++i) {
151      for (int j = 0; j < regionsPerTable; ++j) {
152        regionOps[i * regionsPerTable + j] = new RegionProcedureFactory(new HRegionInfo(
153          ((TableProcedureFactory) tableOps[i]).tableName, Bytes.toBytes(j), Bytes.toBytes(j + 1)));
154      }
155    }
156
157    if (opsType.equals("table")) {
158      System.out.println("Operations: table only");
159      ops = tableOps;
160    } else if (opsType.equals("region")) {
161      System.out.println("Operations: region only");
162      ops = regionOps;
163    } else if (opsType.equals("both")) {
164      System.out.println("Operations: both (table + region)");
165      ops = (ProcedureFactory[]) ArrayUtils.addAll(tableOps, regionOps);
166    } else {
167      throw new Exception("-ops_type should be one of table/region/both.");
168    }
169  }
170
171  @Override
172  protected void addOptions() {
173    addOption(NUM_TABLES_OPTION);
174    addOption(REGIONS_PER_TABLE_OPTION);
175    addOption(NUM_OPERATIONS_OPTION);
176    addOption(NUM_THREADS_OPTION);
177    addOption(OPS_TYPE_OPTION);
178  }
179
180  @Override
181  protected void processOptions(CommandLine cmd) {
182    numTables = getOptionAsInt(cmd, NUM_TABLES_OPTION.getOpt(), DEFAULT_NUM_TABLES);
183    regionsPerTable =
184      getOptionAsInt(cmd, REGIONS_PER_TABLE_OPTION.getOpt(), DEFAULT_REGIONS_PER_TABLE);
185    numOps = getOptionAsInt(cmd, NUM_OPERATIONS_OPTION.getOpt(), DEFAULT_NUM_OPERATIONS);
186    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
187    opsType = cmd.getOptionValue(OPS_TYPE_OPTION.getOpt(), DEFAULT_OPS_TYPE);
188  }
189
190  /*******************
191   * WORKERS
192   *******************/
193
194  private final AtomicLong procIds = new AtomicLong(0);
195  private final AtomicLong yield = new AtomicLong(0);
196  private final AtomicLong completed = new AtomicLong(0);
197
198  private class AddProcsWorker extends Thread {
199    @Override
200    public void run() {
201      long procId = procIds.incrementAndGet();
202      int index;
203      while (procId <= numOps) {
204        index = ThreadLocalRandom.current().nextInt(ops.length);
205        procedureScheduler.addBack(ops[index].newProcedure(procId));
206        procId = procIds.incrementAndGet();
207      }
208    }
209  }
210
211  private class PollAndLockWorker extends Thread {
212    @Override
213    public void run() {
214      while (completed.get() < numOps) {
215        // With lock/unlock being ~100ns, and no other workload, 1000ns wait seams reasonable.
216        TestProcedure proc = (TestProcedure) procedureScheduler.poll(1000);
217        if (proc == null) {
218          yield.incrementAndGet();
219          continue;
220        }
221
222        switch (proc.acquireLock(null)) {
223          case LOCK_ACQUIRED:
224            completed.incrementAndGet();
225            proc.releaseLock(null);
226            break;
227          case LOCK_YIELD_WAIT:
228            break;
229          case LOCK_EVENT_WAIT:
230            break;
231        }
232        if (completed.get() % 100000 == 0) {
233          System.out.println("Completed " + completed.get() + " procedures.");
234        }
235      }
236    }
237  }
238
239  /**
240   * Starts the threads and waits for them to finish.
241   * @return time taken by threads to complete, in milliseconds.
242   */
243  long runThreads(Thread[] threads) throws Exception {
244    final long startTime = EnvironmentEdgeManager.currentTime();
245    for (Thread t : threads) {
246      t.start();
247    }
248    for (Thread t : threads) {
249      t.join();
250    }
251    return EnvironmentEdgeManager.currentTime() - startTime;
252  }
253
254  @Override
255  protected int doWork() throws Exception {
256    procedureScheduler = new MasterProcedureScheduler(pid -> null);
257    procedureScheduler.start();
258    setupOperations();
259
260    final Thread[] threads = new Thread[numThreads];
261    for (int i = 0; i < numThreads; ++i) {
262      threads[i] = new AddProcsWorker();
263    }
264    final long addBackTime = runThreads(threads);
265    System.out.println("Added " + numOps + " procedures to scheduler.");
266
267    for (int i = 0; i < numThreads; ++i) {
268      threads[i] = new PollAndLockWorker();
269    }
270    final long pollTime = runThreads(threads);
271    procedureScheduler.stop();
272
273    final float pollTimeSec = pollTime / 1000.0f;
274    final float addBackTimeSec = addBackTime / 1000.0f;
275    System.out.println("******************************************");
276    System.out.println("Time - addBack     : " + StringUtils.humanTimeDiff(addBackTime));
277    System.out.println("Ops/sec - addBack  : " + StringUtils.humanSize(numOps / addBackTimeSec));
278    System.out.println("Time - poll        : " + StringUtils.humanTimeDiff(pollTime));
279    System.out.println("Ops/sec - poll     : " + StringUtils.humanSize(numOps / pollTimeSec));
280    System.out.println("Num Operations     : " + numOps);
281    System.out.println();
282    System.out.println("Completed          : " + completed.get());
283    System.out.println("Yield              : " + yield.get());
284    System.out.println();
285    System.out.println("Num Tables         : " + numTables);
286    System.out.println("Regions per table  : " + regionsPerTable);
287    System.out.println("Operations type    : " + opsType);
288    System.out.println("Threads            : " + numThreads);
289    System.out.println("******************************************");
290    System.out.println("Raw format for scripts");
291    System.out.println(String.format(
292      "RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
293        + "num_yield=%s, time_addback_ms=%s, time_poll_ms=%s]",
294      NUM_OPERATIONS_OPTION.getOpt(), numOps, OPS_TYPE_OPTION.getOpt(), opsType,
295      NUM_TABLES_OPTION.getOpt(), numTables, REGIONS_PER_TABLE_OPTION.getOpt(), regionsPerTable,
296      NUM_THREADS_OPTION.getOpt(), numThreads, yield.get(), addBackTime, pollTime));
297    return 0;
298  }
299
300  public static void main(String[] args) throws IOException {
301    MasterProcedureSchedulerPerformanceEvaluation tool =
302      new MasterProcedureSchedulerPerformanceEvaluation();
303    tool.setConf(UTIL.getConfiguration());
304    tool.run(args);
305  }
306}