001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store; 019 020import java.io.IOException; 021import org.apache.hadoop.hbase.procedure2.Procedure; 022import org.apache.yetus.audience.InterfaceAudience; 023import org.apache.yetus.audience.InterfaceStability; 024 025/** 026 * The ProcedureStore is used by the executor to persist the state of each procedure execution. This 027 * allows to resume the execution of pending/in-progress procedures in case of machine failure or 028 * service shutdown. 029 * <p/> 030 * Notice that, the implementation must guarantee that the maxProcId when loading is the maximum one 031 * in the whole history, not only the current live procedures. This is very important as for 032 * executing remote procedures, we have some nonce checks at region server side to prevent executing 033 * non-idempotent operations more than once. If the procedure id could go back, then we may 034 * accidentally ignore some important operations such as region assign or unassign.<br/> 035 * This may lead to some garbages so we provide a {@link #cleanup()} method, the framework will call 036 * this method periodically and the store implementation could do some clean up works in this 037 * method. 038 */ 039@InterfaceAudience.Private 040@InterfaceStability.Evolving 041public interface ProcedureStore { 042 /** 043 * Store listener interface. 044 * <p/> 045 * The main process should register a listener and respond to the store events. 046 */ 047 public interface ProcedureStoreListener { 048 049 /** 050 * triggered when the store sync is completed. 051 */ 052 default void postSync() { 053 } 054 055 /** 056 * triggered when the store is not able to write out data. the main process should abort. 057 */ 058 default void abortProcess() { 059 } 060 061 /** 062 * Suggest that the upper layer should update the state of some procedures. Ignore this call 063 * will not effect correctness but performance. 064 * <p/> 065 * For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file 066 * have been deleted, or updated later in another WAL file, then we can delete the WAL file. If 067 * there are old procedures in a WAL file which are never deleted or updated, then we can not 068 * delete the WAL file and this will cause we hold lots of WAL file and slow down the master 069 * restarts. So here we introduce this method to tell the upper layer that please update the 070 * states of these procedures so that we can delete the old WAL file. 071 * @param procIds the id for the procedures 072 */ 073 default void forceUpdate(long[] procIds) { 074 } 075 } 076 077 /** 078 * An Iterator over a collection of Procedure 079 */ 080 public interface ProcedureIterator { 081 /** 082 * Reset the Iterator by seeking to the beginning of the list. 083 */ 084 void reset(); 085 086 /** 087 * Returns true if the iterator has more elements. (In other words, returns true if next() would 088 * return a Procedure rather than throwing an exception.) 089 * @return true if the iterator has more procedures 090 */ 091 boolean hasNext(); 092 093 /** 094 * Calling this method does not need to convert the protobuf message to the Procedure class, so 095 * if it returns true we can call {@link #skipNext()} to skip the procedure without 096 * deserializing. This could increase the performance. 097 * @return true if the iterator next element is a completed procedure. 098 */ 099 boolean isNextFinished(); 100 101 /** 102 * Skip the next procedure 103 * <p/> 104 * This method is used to skip the deserializing of the procedure to increase performance, as 105 * when calling next we need to convert the protobuf message to the Procedure class. 106 */ 107 void skipNext(); 108 109 /** 110 * Returns the next procedure in the iteration. 111 * @throws IOException if there was an error fetching/deserializing the procedure 112 * @return the next procedure in the iteration. 113 */ 114 @SuppressWarnings("rawtypes") 115 Procedure next() throws IOException; 116 } 117 118 /** 119 * Interface passed to the ProcedureStore.load() method to handle the store-load events. 120 */ 121 public interface ProcedureLoader { 122 /** 123 * Called by ProcedureStore.load() to notify about the maximum proc-id in the store. 124 * @param maxProcId the highest proc-id in the store 125 */ 126 void setMaxProcId(long maxProcId); 127 128 /** 129 * Called by the ProcedureStore.load() every time a set of procedures are ready to be executed. 130 * The ProcedureIterator passed to the method, has the procedure sorted in replay-order. 131 * @param procIter iterator over the procedures ready to be added to the executor. 132 */ 133 void load(ProcedureIterator procIter) throws IOException; 134 135 /** 136 * Called by the ProcedureStore.load() in case we have procedures not-ready to be added to the 137 * executor, which probably means they are corrupted since some information/link is missing. 138 * @param procIter iterator over the procedures not ready to be added to the executor, corrupted 139 */ 140 void handleCorrupted(ProcedureIterator procIter) throws IOException; 141 } 142 143 /** 144 * Add the listener to the notification list. 145 * @param listener The AssignmentListener to register 146 */ 147 void registerListener(ProcedureStoreListener listener); 148 149 /** 150 * Remove the listener from the notification list. 151 * @param listener The AssignmentListener to unregister 152 * @return true if the listner was in the list and it was removed, otherwise false. 153 */ 154 boolean unregisterListener(ProcedureStoreListener listener); 155 156 /** 157 * Start/Open the procedure store 158 * @param numThreads number of threads to be used by the procedure store 159 */ 160 void start(int numThreads) throws IOException; 161 162 /** 163 * Stop/Close the procedure store 164 * @param abort true if the stop is an abort 165 */ 166 void stop(boolean abort); 167 168 /** Returns true if the store is running, otherwise false. */ 169 boolean isRunning(); 170 171 /** Returns the number of threads/slots passed to start() */ 172 int getNumThreads(); 173 174 /** 175 * Set the number of procedure running. This can be used, for example, by the store to know how 176 * long to wait before a sync. 177 * @return how many procedures are running (may not be same as <code>count</code>). 178 */ 179 int setRunningProcedureCount(int count); 180 181 /** 182 * Acquire the lease for the procedure store. 183 * @deprecated since 2.3.0, will be removed in 4.0.0 along with 184 * {@link org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore}. As now we 185 * will store the procedure data in a master local region, and master itself will deal 186 * with the lease recovery of the region. 187 */ 188 @Deprecated 189 void recoverLease() throws IOException; 190 191 /** 192 * Load the Procedures in the store. 193 * @param loader the ProcedureLoader that will handle the store-load events 194 */ 195 void load(ProcedureLoader loader) throws IOException; 196 197 /** 198 * When a procedure is submitted to the executor insert(proc, null) will be called. 'proc' has a 199 * 'RUNNABLE' state and the initial information required to start up. When a procedure is executed 200 * and it returns children insert(proc, subprocs) will be called. 'proc' has a 'WAITING' state and 201 * an update state. 'subprocs' are the children in 'RUNNABLE' state with the initial information. 202 * @param proc the procedure to serialize and write to the store. 203 * @param subprocs the newly created child of the proc. 204 */ 205 void insert(Procedure<?> proc, Procedure<?>[] subprocs); 206 207 /** 208 * Serialize a set of new procedures. These procedures are freshly submitted to the executor and 209 * each procedure has a 'RUNNABLE' state and the initial information required to start up. 210 * @param procs the procedures to serialize and write to the store. 211 */ 212 void insert(Procedure<?>[] procs); 213 214 /** 215 * The specified procedure was executed, and the new state should be written to the store. 216 * @param proc the procedure to serialize and write to the store. 217 */ 218 void update(Procedure<?> proc); 219 220 /** 221 * The specified procId was removed from the executor, due to completion, abort or failure. The 222 * store implementor should remove all the information about the specified procId. 223 * @param procId the ID of the procedure to remove. 224 */ 225 void delete(long procId); 226 227 /** 228 * The parent procedure completed. Update the state and mark all the child deleted. 229 * @param parentProc the parent procedure to serialize and write to the store. 230 * @param subProcIds the IDs of the sub-procedure to remove. 231 */ 232 void delete(Procedure<?> parentProc, long[] subProcIds); 233 234 /** 235 * The specified procIds were removed from the executor, due to completion, abort or failure. The 236 * store implementor should remove all the information about the specified procIds. 237 * @param procIds the IDs of the procedures to remove. 238 * @param offset the array offset from where to start to delete 239 * @param count the number of IDs to delete 240 */ 241 void delete(long[] procIds, int offset, int count); 242 243 /** 244 * Will be called by the framework to give the store a chance to do some clean up works. 245 * <p/> 246 * Notice that this is for periodical clean up work, not for the clean up after close, if you want 247 * to close the store just call the {@link #stop(boolean)} method above. 248 */ 249 default void cleanup() { 250 } 251}