001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadLocalRandom;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.commons.lang3.ArrayUtils;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.client.RegionInfoBuilder;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
030import org.apache.hadoop.hbase.procedure2.util.StringUtils;
031import org.apache.hadoop.hbase.util.AbstractHBaseTool;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034
035import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
036import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
037
/**
 * Tool to test performance of locks and queues in procedure scheduler independently from other
 * framework components. Inserts table and region operations in the scheduler, then polls them and
 * exercises their locks. Number of tables, regions and operations can be set using cli args.
 */
043public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBaseTool {
044  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
045
046  // Command line options and defaults.
047  public static final int DEFAULT_NUM_TABLES = 5;
048  public static final Option NUM_TABLES_OPTION = new Option("num_table", true,
049    "Number of tables to use for table operations. Default: " + DEFAULT_NUM_TABLES);
050  public static final int DEFAULT_REGIONS_PER_TABLE = 10;
051  public static final Option REGIONS_PER_TABLE_OPTION = new Option("regions_per_table", true,
052    "Total number of regions per table. Default: " + DEFAULT_REGIONS_PER_TABLE);
053  public static final int DEFAULT_NUM_OPERATIONS = 10000000; // 10M
054  public static final Option NUM_OPERATIONS_OPTION = new Option("num_ops", true,
055    "Total number of operations to schedule. Default: " + DEFAULT_NUM_OPERATIONS);
056  public static final int DEFAULT_NUM_THREADS = 10;
057  public static final Option NUM_THREADS_OPTION = new Option("threads", true,
058    "Number of procedure executor threads. Default: " + DEFAULT_NUM_THREADS);
059  public static final String DEFAULT_OPS_TYPE = "both";
060  public static final Option OPS_TYPE_OPTION = new Option("ops_type", true,
061    "Type of operations to run. Value can be table/region/both. In case of 'both', "
062      + "proportion of table:region ops is 1:regions_per_table. Default: " + DEFAULT_OPS_TYPE);
063
064  private int numTables = DEFAULT_NUM_TABLES;
065  private int regionsPerTable = DEFAULT_REGIONS_PER_TABLE;
066  private int numOps = DEFAULT_NUM_OPERATIONS;
067  private int numThreads = DEFAULT_NUM_THREADS;
068  private String opsType = DEFAULT_OPS_TYPE;
069
070  private MasterProcedureScheduler procedureScheduler;
071  // List of table/region procedures to schedule.
072  ProcedureFactory[] ops;
073
074  // Using factory pattern to build a collection of operations which can be executed in an
075  // abstract manner by worker threads.
076  private interface ProcedureFactory {
077    Procedure newProcedure(long procId);
078  }
079
080  private class RegionProcedure extends TestMasterProcedureScheduler.TestRegionProcedure {
081    RegionProcedure(long procId, RegionInfo hri) {
082      super(procId, hri.getTable(), TableOperationType.REGION_UNASSIGN, hri);
083    }
084
085    @Override
086    public LockState acquireLock(Void env) {
087      return procedureScheduler.waitRegions(this, getTableName(), getRegionInfo())
088        ? LockState.LOCK_EVENT_WAIT
089        : LockState.LOCK_ACQUIRED;
090    }
091
092    @Override
093    public void releaseLock(Void env) {
094      procedureScheduler.wakeRegions(this, getTableName(), getRegionInfo());
095    }
096  }
097
098  private class RegionProcedureFactory implements ProcedureFactory {
099    final RegionInfo hri;
100
101    RegionProcedureFactory(RegionInfo hri) {
102      this.hri = hri;
103    }
104
105    @Override
106    public Procedure newProcedure(long procId) {
107      return new RegionProcedure(procId, hri);
108    }
109  }
110
111  private class TableProcedure extends TestMasterProcedureScheduler.TestTableProcedure {
112
113    TableProcedure(long procId, TableName tableName) {
114      super(procId, tableName, TableOperationType.EDIT);
115    }
116
117    @Override
118    public LockState acquireLock(Void env) {
119      return procedureScheduler.waitTableExclusiveLock(this, getTableName())
120        ? LockState.LOCK_EVENT_WAIT
121        : LockState.LOCK_ACQUIRED;
122    }
123
124    @Override
125    public void releaseLock(Void env) {
126      procedureScheduler.wakeTableExclusiveLock(this, getTableName());
127    }
128  }
129
130  private class TableProcedureFactory implements ProcedureFactory {
131    final TableName tableName;
132
133    TableProcedureFactory(TableName tableName) {
134      this.tableName = tableName;
135    }
136
137    @Override
138    public Procedure newProcedure(long procId) {
139      return new TableProcedure(procId, tableName);
140    }
141  }
142
143  private void setupOperations() throws Exception {
144    // Create set of operations based on --ops_type command line argument.
145    final ProcedureFactory[] tableOps = new ProcedureFactory[numTables];
146    for (int i = 0; i < numTables; ++i) {
147      tableOps[i] = new TableProcedureFactory(TableName.valueOf("testTableLock-" + i));
148    }
149
150    final ProcedureFactory[] regionOps = new ProcedureFactory[numTables * regionsPerTable];
151    for (int i = 0; i < numTables; ++i) {
152      for (int j = 0; j < regionsPerTable; ++j) {
153        regionOps[i * regionsPerTable + j] = new RegionProcedureFactory(
154          RegionInfoBuilder.newBuilder(((TableProcedureFactory) tableOps[i]).tableName)
155            .setStartKey(Bytes.toBytes(j)).setEndKey(Bytes.toBytes(j + 1)).build());
156      }
157    }
158
159    if (opsType.equals("table")) {
160      System.out.println("Operations: table only");
161      ops = tableOps;
162    } else if (opsType.equals("region")) {
163      System.out.println("Operations: region only");
164      ops = regionOps;
165    } else if (opsType.equals("both")) {
166      System.out.println("Operations: both (table + region)");
167      ops = (ProcedureFactory[]) ArrayUtils.addAll(tableOps, regionOps);
168    } else {
169      throw new Exception("-ops_type should be one of table/region/both.");
170    }
171  }
172
173  @Override
174  protected void addOptions() {
175    addOption(NUM_TABLES_OPTION);
176    addOption(REGIONS_PER_TABLE_OPTION);
177    addOption(NUM_OPERATIONS_OPTION);
178    addOption(NUM_THREADS_OPTION);
179    addOption(OPS_TYPE_OPTION);
180  }
181
182  @Override
183  protected void processOptions(CommandLine cmd) {
184    numTables = getOptionAsInt(cmd, NUM_TABLES_OPTION.getOpt(), DEFAULT_NUM_TABLES);
185    regionsPerTable =
186      getOptionAsInt(cmd, REGIONS_PER_TABLE_OPTION.getOpt(), DEFAULT_REGIONS_PER_TABLE);
187    numOps = getOptionAsInt(cmd, NUM_OPERATIONS_OPTION.getOpt(), DEFAULT_NUM_OPERATIONS);
188    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
189    opsType = cmd.getOptionValue(OPS_TYPE_OPTION.getOpt(), DEFAULT_OPS_TYPE);
190  }
191
192  /*******************
193   * WORKERS
194   *******************/
195
196  private final AtomicLong procIds = new AtomicLong(0);
197  private final AtomicLong yield = new AtomicLong(0);
198  private final AtomicLong completed = new AtomicLong(0);
199
200  private class AddProcsWorker extends Thread {
201    @Override
202    public void run() {
203      long procId = procIds.incrementAndGet();
204      int index;
205      while (procId <= numOps) {
206        index = ThreadLocalRandom.current().nextInt(ops.length);
207        procedureScheduler.addBack(ops[index].newProcedure(procId));
208        procId = procIds.incrementAndGet();
209      }
210    }
211  }
212
213  private class PollAndLockWorker extends Thread {
214    @Override
215    public void run() {
216      while (completed.get() < numOps) {
217        // With lock/unlock being ~100ns, and no other workload, 1000ns wait seams reasonable.
218        TestProcedure proc = (TestProcedure) procedureScheduler.poll(1000);
219        if (proc == null) {
220          yield.incrementAndGet();
221          continue;
222        }
223
224        switch (proc.acquireLock(null)) {
225          case LOCK_ACQUIRED:
226            completed.incrementAndGet();
227            proc.releaseLock(null);
228            break;
229          case LOCK_YIELD_WAIT:
230            break;
231          case LOCK_EVENT_WAIT:
232            break;
233        }
234        if (completed.get() % 100000 == 0) {
235          System.out.println("Completed " + completed.get() + " procedures.");
236        }
237      }
238    }
239  }
240
241  /**
242   * Starts the threads and waits for them to finish.
243   * @return time taken by threads to complete, in milliseconds.
244   */
245  long runThreads(Thread[] threads) throws Exception {
246    final long startTime = EnvironmentEdgeManager.currentTime();
247    for (Thread t : threads) {
248      t.start();
249    }
250    for (Thread t : threads) {
251      t.join();
252    }
253    return EnvironmentEdgeManager.currentTime() - startTime;
254  }
255
256  @Override
257  protected int doWork() throws Exception {
258    procedureScheduler = new MasterProcedureScheduler(pid -> null);
259    procedureScheduler.start();
260    setupOperations();
261
262    final Thread[] threads = new Thread[numThreads];
263    for (int i = 0; i < numThreads; ++i) {
264      threads[i] = new AddProcsWorker();
265    }
266    final long addBackTime = runThreads(threads);
267    System.out.println("Added " + numOps + " procedures to scheduler.");
268
269    for (int i = 0; i < numThreads; ++i) {
270      threads[i] = new PollAndLockWorker();
271    }
272    final long pollTime = runThreads(threads);
273    procedureScheduler.stop();
274
275    final float pollTimeSec = pollTime / 1000.0f;
276    final float addBackTimeSec = addBackTime / 1000.0f;
277    System.out.println("******************************************");
278    System.out.println("Time - addBack     : " + StringUtils.humanTimeDiff(addBackTime));
279    System.out.println("Ops/sec - addBack  : " + StringUtils.humanSize(numOps / addBackTimeSec));
280    System.out.println("Time - poll        : " + StringUtils.humanTimeDiff(pollTime));
281    System.out.println("Ops/sec - poll     : " + StringUtils.humanSize(numOps / pollTimeSec));
282    System.out.println("Num Operations     : " + numOps);
283    System.out.println();
284    System.out.println("Completed          : " + completed.get());
285    System.out.println("Yield              : " + yield.get());
286    System.out.println();
287    System.out.println("Num Tables         : " + numTables);
288    System.out.println("Regions per table  : " + regionsPerTable);
289    System.out.println("Operations type    : " + opsType);
290    System.out.println("Threads            : " + numThreads);
291    System.out.println("******************************************");
292    System.out.println("Raw format for scripts");
293    System.out.println(String.format(
294      "RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
295        + "num_yield=%s, time_addback_ms=%s, time_poll_ms=%s]",
296      NUM_OPERATIONS_OPTION.getOpt(), numOps, OPS_TYPE_OPTION.getOpt(), opsType,
297      NUM_TABLES_OPTION.getOpt(), numTables, REGIONS_PER_TABLE_OPTION.getOpt(), regionsPerTable,
298      NUM_THREADS_OPTION.getOpt(), numThreads, yield.get(), addBackTime, pollTime));
299    return 0;
300  }
301
302  public static void main(String[] args) throws IOException {
303    MasterProcedureSchedulerPerformanceEvaluation tool =
304      new MasterProcedureSchedulerPerformanceEvaluation();
305    tool.setConf(UTIL.getConfiguration());
306    tool.run(args);
307  }
308}