001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import java.io.IOException;
022import java.util.Random;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.commons.lang3.ArrayUtils;
025import org.apache.hadoop.hbase.HBaseTestingUtil;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.RegionInfo;
028import org.apache.hadoop.hbase.client.RegionInfoBuilder;
029import org.apache.hadoop.hbase.procedure2.Procedure;
030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
031import org.apache.hadoop.hbase.procedure2.util.StringUtils;
032import org.apache.hadoop.hbase.util.AbstractHBaseTool;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
036import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
037
038/**
039 * Tool to test performance of locks and queues in procedure scheduler independently from other
040 * framework components.
041 * Inserts table and region operations in the scheduler, then polls them and exercises their locks
042 * Number of tables, regions and operations can be set using cli args.
043 */
044public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBaseTool {
045  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
046
047  // Command line options and defaults.
048  public static final int DEFAULT_NUM_TABLES = 5;
049  public static final Option NUM_TABLES_OPTION = new Option("num_table", true,
050      "Number of tables to use for table operations. Default: " + DEFAULT_NUM_TABLES);
051  public static final int DEFAULT_REGIONS_PER_TABLE = 10;
052  public static final Option REGIONS_PER_TABLE_OPTION = new Option("regions_per_table", true,
053      "Total number of regions per table. Default: " + DEFAULT_REGIONS_PER_TABLE);
054  public static final int DEFAULT_NUM_OPERATIONS = 10000000;  // 10M
055  public static final Option NUM_OPERATIONS_OPTION = new Option("num_ops", true,
056      "Total number of operations to schedule. Default: " + DEFAULT_NUM_OPERATIONS);
057  public static final int DEFAULT_NUM_THREADS = 10;
058  public static final Option NUM_THREADS_OPTION = new Option("threads", true,
059      "Number of procedure executor threads. Default: " + DEFAULT_NUM_THREADS);
060  public static final String DEFAULT_OPS_TYPE = "both";
061  public static final Option OPS_TYPE_OPTION = new Option("ops_type", true,
062      "Type of operations to run. Value can be table/region/both. In case of 'both', "
063          + "proportion of table:region ops is 1:regions_per_table. Default: "
064          + DEFAULT_OPS_TYPE);
065
066  private int numTables = DEFAULT_NUM_TABLES;
067  private int regionsPerTable = DEFAULT_REGIONS_PER_TABLE;
068  private int numOps = DEFAULT_NUM_OPERATIONS;
069  private int numThreads = DEFAULT_NUM_THREADS;
070  private String opsType = DEFAULT_OPS_TYPE;
071
072  private MasterProcedureScheduler procedureScheduler;
073  // List of table/region procedures to schedule.
074  ProcedureFactory[] ops;
075
076  // Using factory pattern to build a collection of operations which can be executed in an
077  // abstract manner by worker threads.
078  private interface ProcedureFactory {
079    Procedure newProcedure(long procId);
080  }
081
082  private class RegionProcedure extends TestMasterProcedureScheduler.TestRegionProcedure {
083    RegionProcedure(long procId, RegionInfo hri) {
084      super(procId, hri.getTable(), TableOperationType.REGION_UNASSIGN, hri);
085    }
086
087    @Override
088    public LockState acquireLock(Void env) {
089      return procedureScheduler.waitRegions(this, getTableName(), getRegionInfo())?
090        LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
091    }
092
093    @Override
094    public void releaseLock(Void env) {
095      procedureScheduler.wakeRegions(this, getTableName(), getRegionInfo());
096    }
097  }
098
099  private class RegionProcedureFactory implements ProcedureFactory {
100    final RegionInfo hri;
101
102    RegionProcedureFactory(RegionInfo hri) {
103      this.hri = hri;
104    }
105
106    @Override
107    public Procedure newProcedure(long procId) {
108      return new RegionProcedure(procId, hri);
109    }
110  }
111
112  private class TableProcedure extends TestMasterProcedureScheduler.TestTableProcedure {
113
114    TableProcedure(long procId, TableName tableName) {
115      super(procId, tableName, TableOperationType.EDIT);
116    }
117
118    @Override
119    public LockState acquireLock(Void env) {
120      return procedureScheduler.waitTableExclusiveLock(this, getTableName())?
121        LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
122    }
123
124    @Override
125    public void releaseLock(Void env) {
126      procedureScheduler.wakeTableExclusiveLock(this, getTableName());
127    }
128  }
129
130  private class TableProcedureFactory implements ProcedureFactory {
131    final TableName tableName;
132
133    TableProcedureFactory(TableName tableName) {
134      this.tableName = tableName;
135    }
136
137    @Override
138    public Procedure newProcedure(long procId) {
139      return new TableProcedure(procId, tableName);
140    }
141  }
142
143  private void setupOperations() throws Exception {
144    // Create set of operations based on --ops_type command line argument.
145    final ProcedureFactory[] tableOps = new ProcedureFactory[numTables];
146    for (int i = 0; i < numTables; ++i) {
147      tableOps[i] = new TableProcedureFactory(TableName.valueOf("testTableLock-" + i));
148    }
149
150    final ProcedureFactory[] regionOps = new ProcedureFactory[numTables * regionsPerTable];
151    for (int i = 0; i < numTables; ++i) {
152      for (int j = 0; j < regionsPerTable; ++j) {
153        regionOps[i * regionsPerTable + j] = new RegionProcedureFactory(
154          RegionInfoBuilder.newBuilder(((TableProcedureFactory) tableOps[i]).tableName)
155            .setStartKey(Bytes.toBytes(j)).setEndKey(Bytes.toBytes(j + 1)).build());
156      }
157    }
158
159    if (opsType.equals("table")) {
160      System.out.println("Operations: table only");
161      ops = tableOps;
162    } else if (opsType.equals("region")) {
163      System.out.println("Operations: region only");
164      ops = regionOps;
165    } else if (opsType.equals("both")) {
166      System.out.println("Operations: both (table + region)");
167      ops = (ProcedureFactory[])ArrayUtils.addAll(tableOps, regionOps);
168    } else {
169      throw new Exception("-ops_type should be one of table/region/both.");
170    }
171  }
172
173  @Override
174  protected void addOptions() {
175    addOption(NUM_TABLES_OPTION);
176    addOption(REGIONS_PER_TABLE_OPTION);
177    addOption(NUM_OPERATIONS_OPTION);
178    addOption(NUM_THREADS_OPTION);
179    addOption(OPS_TYPE_OPTION);
180  }
181
182  @Override
183  protected void processOptions(CommandLine cmd) {
184    numTables = getOptionAsInt(cmd, NUM_TABLES_OPTION.getOpt(), DEFAULT_NUM_TABLES);
185    regionsPerTable = getOptionAsInt(cmd, REGIONS_PER_TABLE_OPTION.getOpt(),
186        DEFAULT_REGIONS_PER_TABLE);
187    numOps = getOptionAsInt(cmd, NUM_OPERATIONS_OPTION.getOpt(),
188        DEFAULT_NUM_OPERATIONS);
189    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
190    opsType = cmd.getOptionValue(OPS_TYPE_OPTION.getOpt(), DEFAULT_OPS_TYPE);
191  }
192
193  /*******************
194   * WORKERS
195   *******************/
196
197  private final AtomicLong procIds = new AtomicLong(0);
198  private final AtomicLong yield = new AtomicLong(0);
199  private final AtomicLong completed = new AtomicLong(0);
200
201  private class AddProcsWorker extends Thread {
202    @Override
203    public void run() {
204      final Random rand = new Random(EnvironmentEdgeManager.currentTime());
205      long procId = procIds.incrementAndGet();
206      int index;
207      while (procId <= numOps) {
208        index = rand.nextInt(ops.length);
209        procedureScheduler.addBack(ops[index].newProcedure(procId));
210        procId = procIds.incrementAndGet();
211      }
212    }
213  }
214
215  private class PollAndLockWorker extends Thread {
216    @Override
217    public void run() {
218      while (completed.get() < numOps) {
219        // With lock/unlock being ~100ns, and no other workload, 1000ns wait seams reasonable.
220        TestProcedure proc = (TestProcedure)procedureScheduler.poll(1000);
221        if (proc == null) {
222          yield.incrementAndGet();
223          continue;
224        }
225
226        switch (proc.acquireLock(null)) {
227          case LOCK_ACQUIRED:
228            completed.incrementAndGet();
229            proc.releaseLock(null);
230            break;
231          case LOCK_YIELD_WAIT:
232            break;
233          case LOCK_EVENT_WAIT:
234            break;
235        }
236        if (completed.get() % 100000 == 0) {
237          System.out.println("Completed " + completed.get() + " procedures.");
238        }
239      }
240    }
241  }
242
243  /**
244   * Starts the threads and waits for them to finish.
245   * @return time taken by threads to complete, in milliseconds.
246   */
247  long runThreads(Thread[] threads) throws Exception {
248    final long startTime = EnvironmentEdgeManager.currentTime();
249    for (Thread t : threads) {
250      t.start();
251    }
252    for (Thread t : threads) {
253      t.join();
254    }
255    return EnvironmentEdgeManager.currentTime() - startTime;
256  }
257
258  @Override
259  protected int doWork() throws Exception {
260    procedureScheduler = new MasterProcedureScheduler(pid -> null);
261    procedureScheduler.start();
262    setupOperations();
263
264    final Thread[] threads = new Thread[numThreads];
265    for (int i = 0; i < numThreads; ++i) {
266      threads[i] = new AddProcsWorker();
267    }
268    final long addBackTime = runThreads(threads);
269    System.out.println("Added " + numOps + " procedures to scheduler.");
270
271    for (int i = 0; i < numThreads; ++i) {
272      threads[i] = new PollAndLockWorker();
273    }
274    final long pollTime = runThreads(threads);
275    procedureScheduler.stop();
276
277    final float pollTimeSec = pollTime / 1000.0f;
278    final float addBackTimeSec = addBackTime / 1000.0f;
279    System.out.println("******************************************");
280    System.out.println("Time - addBack     : " + StringUtils.humanTimeDiff(addBackTime));
281    System.out.println("Ops/sec - addBack  : " + StringUtils.humanSize(numOps / addBackTimeSec));
282    System.out.println("Time - poll        : " + StringUtils.humanTimeDiff(pollTime));
283    System.out.println("Ops/sec - poll     : " + StringUtils.humanSize(numOps / pollTimeSec));
284    System.out.println("Num Operations     : " + numOps);
285    System.out.println();
286    System.out.println("Completed          : " + completed.get());
287    System.out.println("Yield              : " + yield.get());
288    System.out.println();
289    System.out.println("Num Tables         : " + numTables);
290    System.out.println("Regions per table  : " + regionsPerTable);
291    System.out.println("Operations type    : " + opsType);
292    System.out.println("Threads            : " + numThreads);
293    System.out.println("******************************************");
294    System.out.println("Raw format for scripts");
295    System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
296            + "num_yield=%s, time_addback_ms=%s, time_poll_ms=%s]",
297        NUM_OPERATIONS_OPTION.getOpt(), numOps, OPS_TYPE_OPTION.getOpt(), opsType,
298        NUM_TABLES_OPTION.getOpt(), numTables, REGIONS_PER_TABLE_OPTION.getOpt(), regionsPerTable,
299        NUM_THREADS_OPTION.getOpt(), numThreads, yield.get(), addBackTime, pollTime));
300    return 0;
301  }
302
303  public static void main(String[] args) throws IOException {
304    MasterProcedureSchedulerPerformanceEvaluation tool =
305        new MasterProcedureSchedulerPerformanceEvaluation();
306    tool.setConf(UTIL.getConfiguration());
307    tool.run(args);
308  }
309}