001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.Modifier;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.util.NonceKey;
028import org.apache.hadoop.hbase.util.RetryCounter;
029import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
030import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
034import org.apache.hbase.thirdparty.com.google.protobuf.Any;
035import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
037import org.apache.hbase.thirdparty.com.google.protobuf.Message;
038import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
039import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
043
044/**
045 * Helper to convert to/from ProcedureProtos
046 */
047@InterfaceAudience.Private
048public final class ProcedureUtil {
049
050  private ProcedureUtil() {
051  }
052
053  // ==========================================================================
054  // Reflection helpers to create/validate a Procedure object
055  // ==========================================================================
056  private static Procedure<?> newProcedure(String className) throws BadProcedureException {
057    try {
058      Class<?> clazz = Class.forName(className);
059      if (!Modifier.isPublic(clazz.getModifiers())) {
060        throw new Exception("the " + clazz + " class is not public");
061      }
062
063      @SuppressWarnings("rawtypes")
064      Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
065      assert ctor != null : "no constructor found";
066      if (!Modifier.isPublic(ctor.getModifiers())) {
067        throw new Exception("the " + clazz + " constructor is not public");
068      }
069      return ctor.newInstance();
070    } catch (Exception e) {
071      throw new BadProcedureException(
072        "The procedure class " + className + " must be accessible and have an empty constructor",
073        e);
074    }
075  }
076
077  static void validateClass(Procedure<?> proc) throws BadProcedureException {
078    try {
079      Class<?> clazz = proc.getClass();
080      if (!Modifier.isPublic(clazz.getModifiers())) {
081        throw new Exception("the " + clazz + " class is not public");
082      }
083
084      Constructor<?> ctor = clazz.getConstructor();
085      assert ctor != null;
086      if (!Modifier.isPublic(ctor.getModifiers())) {
087        throw new Exception("the " + clazz + " constructor is not public");
088      }
089    } catch (Exception e) {
090      throw new BadProcedureException("The procedure class " + proc.getClass().getName()
091        + " must be accessible and have an empty constructor", e);
092    }
093  }
094
095  // ==========================================================================
096  // convert to and from Procedure object
097  // ==========================================================================
098
099  /**
100   * A serializer for our Procedures. Instead of the previous serializer, it uses the stateMessage
101   * list to store the internal state of the Procedures.
102   */
103  private static class StateSerializer implements ProcedureStateSerializer {
104    private final ProcedureProtos.Procedure.Builder builder;
105    private int deserializeIndex;
106
107    public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
108      this.builder = builder;
109    }
110
111    @Override
112    public void serialize(Message message) throws IOException {
113      Any packedMessage = Any.pack(message);
114      builder.addStateMessage(packedMessage);
115    }
116
117    @Override
118    public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
119      if (deserializeIndex >= builder.getStateMessageCount()) {
120        throw new IOException("Invalid state message index: " + deserializeIndex);
121      }
122
123      try {
124        Any packedMessage = builder.getStateMessage(deserializeIndex++);
125        return packedMessage.unpack(clazz);
126      } catch (InvalidProtocolBufferException e) {
127        throw e.unwrapIOException();
128      }
129    }
130  }
131
132  /**
133   * A serializer (deserializer) for those Procedures which were serialized before this patch. It
134   * deserializes the old, binary stateData field.
135   */
136  private static class CompatStateSerializer implements ProcedureStateSerializer {
137    private InputStream inputStream;
138
139    public CompatStateSerializer(InputStream inputStream) {
140      this.inputStream = inputStream;
141    }
142
143    @Override
144    public void serialize(Message message) throws IOException {
145      throw new UnsupportedOperationException();
146    }
147
148    @SuppressWarnings("unchecked")
149    @Override
150    public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
151      Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType();
152      try {
153        return parser.parseDelimitedFrom(inputStream);
154      } catch (InvalidProtocolBufferException e) {
155        throw e.unwrapIOException();
156      }
157    }
158  }
159
160  /**
161   * Helper to convert the procedure to protobuf.
162   * <p/>
163   * Used by ProcedureStore implementations.
164   */
165  public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
166    throws IOException {
167    Preconditions.checkArgument(proc != null);
168    validateClass(proc);
169
170    final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
171      .setClassName(proc.getClass().getName()).setProcId(proc.getProcId()).setState(proc.getState())
172      .setSubmittedTime(proc.getSubmittedTime()).setLastUpdate(proc.getLastUpdate());
173
174    if (proc.hasParent()) {
175      builder.setParentId(proc.getParentProcId());
176    }
177
178    if (proc.hasTimeout()) {
179      builder.setTimeout(proc.getTimeout());
180    }
181
182    if (proc.hasOwner()) {
183      builder.setOwner(proc.getOwner());
184    }
185
186    final int[] stackIds = proc.getStackIndexes();
187    if (stackIds != null) {
188      for (int i = 0; i < stackIds.length; ++i) {
189        builder.addStackId(stackIds[i]);
190      }
191    }
192    builder.setExecuted(proc.wasExecuted());
193
194    if (proc.hasException()) {
195      RemoteProcedureException exception = proc.getException();
196      builder.setException(
197        RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
198    }
199
200    final byte[] result = proc.getResult();
201    if (result != null) {
202      builder.setResult(UnsafeByteOperations.unsafeWrap(result));
203    }
204
205    ProcedureStateSerializer serializer = new StateSerializer(builder);
206    proc.serializeStateData(serializer);
207
208    if (proc.getNonceKey() != null) {
209      builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
210      builder.setNonce(proc.getNonceKey().getNonce());
211    }
212
213    if (proc.hasLock()) {
214      builder.setLocked(true);
215    }
216
217    if (proc.isBypass()) {
218      builder.setBypass(true);
219    }
220    return builder.build();
221  }
222
223  /**
224   * Helper to convert the protobuf procedure.
225   * <p/>
226   * Used by ProcedureStore implementations.
227   * <p/>
228   * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
229   * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
230   * it by storing the data only on insert().
231   */
232  public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
233    throws IOException {
234    // Procedure from class name
235    Procedure<?> proc = newProcedure(proto.getClassName());
236
237    // set fields
238    proc.setProcId(proto.getProcId());
239    proc.setState(proto.getState());
240    proc.setSubmittedTime(proto.getSubmittedTime());
241    proc.setLastUpdate(proto.getLastUpdate());
242
243    if (proto.hasParentId()) {
244      proc.setParentProcId(proto.getParentId());
245    }
246
247    if (proto.hasOwner()) {
248      proc.setOwner(proto.getOwner());
249    }
250
251    if (proto.hasTimeout()) {
252      proc.setTimeout(proto.getTimeout());
253    }
254
255    if (proto.getStackIdCount() > 0) {
256      proc.setStackIndexes(proto.getStackIdList());
257    }
258    if (proto.getExecuted()) {
259      proc.setExecuted();
260    }
261
262    if (proto.hasException()) {
263      assert proc.getState() == ProcedureProtos.ProcedureState.FAILED
264        || proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK
265        : "The procedure must be failed (waiting to rollback) or rolledback";
266      proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
267    }
268
269    if (proto.hasResult()) {
270      proc.setResult(proto.getResult().toByteArray());
271    }
272
273    if (proto.getNonce() != HConstants.NO_NONCE) {
274      proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
275    }
276
277    if (proto.getLocked()) {
278      proc.lockedWhenLoading();
279    }
280
281    if (proto.getBypass()) {
282      proc.bypass(null);
283    }
284
285    ProcedureStateSerializer serializer = null;
286
287    if (proto.getStateMessageCount() > 0) {
288      serializer = new StateSerializer(proto.toBuilder());
289    } else if (proto.hasStateData()) {
290      InputStream inputStream = proto.getStateData().newInput();
291      serializer = new CompatStateSerializer(inputStream);
292    }
293
294    if (serializer != null) {
295      proc.deserializeStateData(serializer);
296    }
297
298    return proc;
299  }
300
301  // ==========================================================================
302  // convert from LockedResource object
303  // ==========================================================================
304
305  public static LockServiceProtos.LockedResourceType
306    convertToProtoResourceType(LockedResourceType resourceType) {
307    return LockServiceProtos.LockedResourceType.valueOf(resourceType.name());
308  }
309
310  public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
311    return LockServiceProtos.LockType.valueOf(lockType.name());
312  }
313
314  public static LockServiceProtos.LockedResource
315    convertToProtoLockedResource(LockedResource lockedResource) throws IOException {
316    LockServiceProtos.LockedResource.Builder builder =
317      LockServiceProtos.LockedResource.newBuilder();
318
319    builder.setResourceType(convertToProtoResourceType(lockedResource.getResourceType()))
320      .setResourceName(lockedResource.getResourceName())
321      .setLockType(convertToProtoLockType(lockedResource.getLockType()));
322
323    Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure();
324
325    if (exclusiveLockOwnerProcedure != null) {
326      ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto =
327        convertToProtoProcedure(exclusiveLockOwnerProcedure);
328      builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto);
329    }
330
331    builder.setSharedLockCount(lockedResource.getSharedLockCount());
332
333    for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) {
334      ProcedureProtos.Procedure waitingProcedureProto = convertToProtoProcedure(waitingProcedure);
335      builder.addWaitingProcedures(waitingProcedureProto);
336    }
337
338    return builder.build();
339  }
340
341  public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
342    "hbase.procedure.retry.sleep.interval.ms";
343
344  // default to 1 second
345  public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;
346
347  public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
348    "hbase.procedure.retry.max.sleep.time.ms";
349
350  // default to 10 minutes
351  public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
352    TimeUnit.MINUTES.toMillis(10);
353
354  /**
355   * Get a retry counter for getting the backoff time. We will use the
356   * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
357   * is 10 minutes by default.
358   * <p/>
359   * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
360   * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not
361   * timeout.
362   */
363  public static RetryCounter createRetryCounter(Configuration conf) {
364    long sleepIntervalMs =
365      conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
366    long maxSleepTimeMs =
367      conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);
368    RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs)
369      .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
370    return new RetryCounter(retryConfig);
371  }
372
373  public static boolean isFinished(ProcedureProtos.Procedure proc) {
374    if (!proc.hasParentId()) {
375      switch (proc.getState()) {
376        case ROLLEDBACK:
377        case SUCCESS:
378          return true;
379        default:
380          break;
381      }
382    }
383    return false;
384  }
385}