001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import java.io.IOException;
022import java.util.concurrent.atomic.AtomicLong;
023import java.util.Random;
024
025import org.apache.commons.lang3.ArrayUtils;
026import org.apache.hadoop.hbase.HBaseTestingUtility;
027import org.apache.hadoop.hbase.HRegionInfo;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.procedure2.Procedure;
030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
031import org.apache.hadoop.hbase.procedure2.util.StringUtils;
032import org.apache.hadoop.hbase.util.AbstractHBaseTool;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
035import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
036
037/**
038 * Tool to test performance of locks and queues in procedure scheduler independently from other
039 * framework components.
040 * Inserts table and region operations in the scheduler, then polls them and exercises their locks
041 * Number of tables, regions and operations can be set using cli args.
042 */
043public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBaseTool {
044  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
045
046  // Command line options and defaults.
047  public static final int DEFAULT_NUM_TABLES = 5;
048  public static final Option NUM_TABLES_OPTION = new Option("num_table", true,
049      "Number of tables to use for table operations. Default: " + DEFAULT_NUM_TABLES);
050  public static final int DEFAULT_REGIONS_PER_TABLE = 10;
051  public static final Option REGIONS_PER_TABLE_OPTION = new Option("regions_per_table", true,
052      "Total number of regions per table. Default: " + DEFAULT_REGIONS_PER_TABLE);
053  public static final int DEFAULT_NUM_OPERATIONS = 10000000;  // 10M
054  public static final Option NUM_OPERATIONS_OPTION = new Option("num_ops", true,
055      "Total number of operations to schedule. Default: " + DEFAULT_NUM_OPERATIONS);
056  public static final int DEFAULT_NUM_THREADS = 10;
057  public static final Option NUM_THREADS_OPTION = new Option("threads", true,
058      "Number of procedure executor threads. Default: " + DEFAULT_NUM_THREADS);
059  public static final String DEFAULT_OPS_TYPE = "both";
060  public static final Option OPS_TYPE_OPTION = new Option("ops_type", true,
061      "Type of operations to run. Value can be table/region/both. In case of 'both', "
062          + "proportion of table:region ops is 1:regions_per_table. Default: "
063          + DEFAULT_OPS_TYPE);
064
065  private int numTables = DEFAULT_NUM_TABLES;
066  private int regionsPerTable = DEFAULT_REGIONS_PER_TABLE;
067  private int numOps = DEFAULT_NUM_OPERATIONS;
068  private int numThreads = DEFAULT_NUM_THREADS;
069  private String opsType = DEFAULT_OPS_TYPE;
070
071  private MasterProcedureScheduler procedureScheduler;
072  // List of table/region procedures to schedule.
073  ProcedureFactory[] ops;
074
075  // Using factory pattern to build a collection of operations which can be executed in an
076  // abstract manner by worker threads.
077  private interface ProcedureFactory {
078    Procedure newProcedure(long procId);
079  }
080
081  private class RegionProcedure extends TestMasterProcedureScheduler.TestRegionProcedure {
082    RegionProcedure(long procId, HRegionInfo hri) {
083      super(procId, hri.getTable(), TableOperationType.REGION_UNASSIGN, hri);
084    }
085
086    @Override
087    public LockState acquireLock(Void env) {
088      return procedureScheduler.waitRegions(this, getTableName(), getRegionInfo())?
089        LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
090    }
091
092    @Override
093    public void releaseLock(Void env) {
094      procedureScheduler.wakeRegions(this, getTableName(), getRegionInfo());
095    }
096  }
097
098  private class RegionProcedureFactory implements ProcedureFactory {
099    final HRegionInfo hri;
100
101    RegionProcedureFactory(HRegionInfo hri) {
102      this.hri = hri;
103    }
104
105    @Override
106    public Procedure newProcedure(long procId) {
107      return new RegionProcedure(procId, hri);
108    }
109  }
110
111  private class TableProcedure extends TestMasterProcedureScheduler.TestTableProcedure {
112
113    TableProcedure(long procId, TableName tableName) {
114      super(procId, tableName, TableOperationType.EDIT);
115    }
116
117    @Override
118    public LockState acquireLock(Void env) {
119      return procedureScheduler.waitTableExclusiveLock(this, getTableName())?
120        LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
121    }
122
123    @Override
124    public void releaseLock(Void env) {
125      procedureScheduler.wakeTableExclusiveLock(this, getTableName());
126    }
127  }
128
129  private class TableProcedureFactory implements ProcedureFactory {
130    final TableName tableName;
131
132    TableProcedureFactory(TableName tableName) {
133      this.tableName = tableName;
134    }
135
136    @Override
137    public Procedure newProcedure(long procId) {
138      return new TableProcedure(procId, tableName);
139    }
140  }
141
142  private void setupOperations() throws Exception {
143    // Create set of operations based on --ops_type command line argument.
144    final ProcedureFactory[] tableOps = new ProcedureFactory[numTables];
145    for (int i = 0; i < numTables; ++i) {
146      tableOps[i] = new TableProcedureFactory(TableName.valueOf("testTableLock-" + i));
147    }
148
149    final ProcedureFactory[] regionOps = new ProcedureFactory[numTables * regionsPerTable];
150    for (int i = 0; i < numTables; ++i) {
151      for (int j = 0; j < regionsPerTable; ++j) {
152        regionOps[i * regionsPerTable + j] = new RegionProcedureFactory(
153            new HRegionInfo(((TableProcedureFactory)tableOps[i]).tableName, Bytes.toBytes(j),
154                Bytes.toBytes(j + 1)));
155      }
156    }
157
158    if (opsType.equals("table")) {
159      System.out.println("Operations: table only");
160      ops = tableOps;
161    } else if (opsType.equals("region")) {
162      System.out.println("Operations: region only");
163      ops = regionOps;
164    } else if (opsType.equals("both")) {
165      System.out.println("Operations: both (table + region)");
166      ops = (ProcedureFactory[])ArrayUtils.addAll(tableOps, regionOps);
167    } else {
168      throw new Exception("-ops_type should be one of table/region/both.");
169    }
170  }
171
172  @Override
173  protected void addOptions() {
174    addOption(NUM_TABLES_OPTION);
175    addOption(REGIONS_PER_TABLE_OPTION);
176    addOption(NUM_OPERATIONS_OPTION);
177    addOption(NUM_THREADS_OPTION);
178    addOption(OPS_TYPE_OPTION);
179  }
180
181  @Override
182  protected void processOptions(CommandLine cmd) {
183    numTables = getOptionAsInt(cmd, NUM_TABLES_OPTION.getOpt(), DEFAULT_NUM_TABLES);
184    regionsPerTable = getOptionAsInt(cmd, REGIONS_PER_TABLE_OPTION.getOpt(),
185        DEFAULT_REGIONS_PER_TABLE);
186    numOps = getOptionAsInt(cmd, NUM_OPERATIONS_OPTION.getOpt(),
187        DEFAULT_NUM_OPERATIONS);
188    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
189    opsType = cmd.getOptionValue(OPS_TYPE_OPTION.getOpt(), DEFAULT_OPS_TYPE);
190  }
191
192  /*******************
193   * WORKERS
194   *******************/
195
196  private final AtomicLong procIds = new AtomicLong(0);
197  private final AtomicLong yield = new AtomicLong(0);
198  private final AtomicLong completed = new AtomicLong(0);
199
200  private class AddProcsWorker extends Thread {
201    @Override
202    public void run() {
203      final Random rand = new Random(System.currentTimeMillis());
204      long procId = procIds.incrementAndGet();
205      int index;
206      while (procId <= numOps) {
207        index = rand.nextInt(ops.length);
208        procedureScheduler.addBack(ops[index].newProcedure(procId));
209        procId = procIds.incrementAndGet();
210      }
211    }
212  }
213
214  private class PollAndLockWorker extends Thread {
215    @Override
216    public void run() {
217      while (completed.get() < numOps) {
218        // With lock/unlock being ~100ns, and no other workload, 1000ns wait seams reasonable.
219        TestProcedure proc = (TestProcedure)procedureScheduler.poll(1000);
220        if (proc == null) {
221          yield.incrementAndGet();
222          continue;
223        }
224
225        switch (proc.acquireLock(null)) {
226          case LOCK_ACQUIRED:
227            completed.incrementAndGet();
228            proc.releaseLock(null);
229            break;
230          case LOCK_YIELD_WAIT:
231            break;
232          case LOCK_EVENT_WAIT:
233            break;
234        }
235        if (completed.get() % 100000 == 0) {
236          System.out.println("Completed " + completed.get() + " procedures.");
237        }
238      }
239    }
240  }
241
242  /**
243   * Starts the threads and waits for them to finish.
244   * @return time taken by threads to complete, in milliseconds.
245   */
246  long runThreads(Thread[] threads) throws Exception {
247    final long startTime = System.currentTimeMillis();
248    for (Thread t : threads) {
249      t.start();
250    }
251    for (Thread t : threads) {
252      t.join();
253    }
254    return System.currentTimeMillis() - startTime;
255  }
256
257  @Override
258  protected int doWork() throws Exception {
259    procedureScheduler = new MasterProcedureScheduler(pid -> null);
260    procedureScheduler.start();
261    setupOperations();
262
263    final Thread[] threads = new Thread[numThreads];
264    for (int i = 0; i < numThreads; ++i) {
265      threads[i] = new AddProcsWorker();
266    }
267    final long addBackTime = runThreads(threads);
268    System.out.println("Added " + numOps + " procedures to scheduler.");
269
270    for (int i = 0; i < numThreads; ++i) {
271      threads[i] = new PollAndLockWorker();
272    }
273    final long pollTime = runThreads(threads);
274    procedureScheduler.stop();
275
276    final float pollTimeSec = pollTime / 1000.0f;
277    final float addBackTimeSec = addBackTime / 1000.0f;
278    System.out.println("******************************************");
279    System.out.println("Time - addBack     : " + StringUtils.humanTimeDiff(addBackTime));
280    System.out.println("Ops/sec - addBack  : " + StringUtils.humanSize(numOps / addBackTimeSec));
281    System.out.println("Time - poll        : " + StringUtils.humanTimeDiff(pollTime));
282    System.out.println("Ops/sec - poll     : " + StringUtils.humanSize(numOps / pollTimeSec));
283    System.out.println("Num Operations     : " + numOps);
284    System.out.println();
285    System.out.println("Completed          : " + completed.get());
286    System.out.println("Yield              : " + yield.get());
287    System.out.println();
288    System.out.println("Num Tables         : " + numTables);
289    System.out.println("Regions per table  : " + regionsPerTable);
290    System.out.println("Operations type    : " + opsType);
291    System.out.println("Threads            : " + numThreads);
292    System.out.println("******************************************");
293    System.out.println("Raw format for scripts");
294    System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
295            + "num_yield=%s, time_addback_ms=%s, time_poll_ms=%s]",
296        NUM_OPERATIONS_OPTION.getOpt(), numOps, OPS_TYPE_OPTION.getOpt(), opsType,
297        NUM_TABLES_OPTION.getOpt(), numTables, REGIONS_PER_TABLE_OPTION.getOpt(), regionsPerTable,
298        NUM_THREADS_OPTION.getOpt(), numThreads, yield.get(), addBackTime, pollTime));
299    return 0;
300  }
301
302  public static void main(String[] args) throws IOException {
303    MasterProcedureSchedulerPerformanceEvaluation tool =
304        new MasterProcedureSchedulerPerformanceEvaluation();
305    tool.setConf(UTIL.getConfiguration());
306    tool.run(args);
307  }
308}