Search This Blog

Tuesday, September 13, 2016

jBPM Asynchronous Executor with Example

In version 6, jBPM introduces new component called jbpm executor which provides quite advanced features for asynchronous execution. It delivers generic environment for background execution of commands. Commands are nothing more than business logic encapsulated within simple interface. It does not have any process runtime related information, that means no need to complete work items, or anything of that sort. It purely focuses on the business logic to be executed. It receives data via CommandContext and returns results of the execution with ExecutionResults.
Before looking into details on jBPM support for asynchronous execution let's look at what are the common requirements for such execution:
  • allows asynchronous execution of given piece of business logic
  • allows to retry in case of resources are temporarily unavailable e.g. external system interaction
  • allows to handle errors in case all retries have been attempted
  • provides cancelation option
  • provides history log of execution
When confronting these requirements with the "simple async handler" (exeucted as separate thread) you can directly notice that all of these would need to be implemented all over again by different systems. Due to that a common, generic component has been provided out of the box to simplify and empower usage.
jBPM executor operates on commands, which are essential piece of code that is going to be executed as background job.
/**

 * Executor's Command are dedicated to contain purely business logic that should be executed. 

 * It should not have any reference to underlying process engine and should not be concerned

 * with any process runtime related logic such us completing work item, sending signals, etc.

 * <br/>

 * Information that are taken from process will be delivered as part of data instance of 

 * <code>CommandContext</code>. Depending on the execution context that data can vary but 

 * in most of the cases following will be given:

 * <ul>

 *  <li></li>

 *  <li>businessKey - usually unique identifier of the caller</li>

 *  <li>callbacks - FQCN of the <code>CommandCollback</code> that shall be used on command completion</li>

 * </ul>

 * When executed as part of the process (work item handler) additional data can be expected:

 * <ul>

 *  <li>workItem - the actual work item that is being executed with all it's parameters</li>

 *  <li>processInstanceId - id of the process instance that triggered this work</li>

 *  <li>deploymentId - if given process instance is part of an active deployment</li>

 * </ul>

 * Important note about implementations is that it shall always be possible to be initialized with default constructor

 * as executor service is an async component so it will initialize the command on demand using reflection.

 * In case there is a heavy logic on initialization it should be placed in another service implementation that 

 * can be looked up from within command.

 */

public interface Command {

    

    /**

     * Executed this command's logic.

     * @param ctx - contextual data given by the executor service

     * @return returns any results in case of successful execution

     * @throws Exception in case execution failed and shall be retried if possible

     */

    public ExecutionResults execute(CommandContext ctx) throws Exception;

}

Looking at the interface above, there is no specific integration with the jBPM runtime engine, it's decoupled from it to put main focus on the actual logic that shall be executed as part of that command rather to worry about integration with process engine. This design promotes reuse of already existing logic by simply wrapping it with Command implementation.
Input data is transferred from process engine to command via CommandContext. It acts purely as data transfer object and puts single requirement on the data it holds - all objects must be serializable.

/**

 * Data holder for any contextual data that shall be given to the command upon execution.

 * Important note that every object that is added to the data container must be serializable 

 * meaning it must implement <code>java.io.Seriazliable</code>

 *

 */

public class CommandContext implements Serializable {


    private static final long serialVersionUID = -1440017934399413860L;

    private Map<String, Object> data;


    public CommandContext() {

        data  = new HashMap<String, Object>();

    }


    public CommandContext(Map<String, Object> data) {

        this.data = data;

    }


    public void setData(Map<String, Object> data) {

        this.data = data;

    }


    public Map<String, Object> getData() {

        return data;

    }


    public Object getData(String key) {

        return data.get(key);

    }


    public void setData(String key, Object value) {

        data.put(key, value);

    }


    public Set<String> keySet() {

        return data.keySet();

    }


    @Override

    public String toString() {

        return "CommandContext{" + "data=" + data + '}';

    }

}

Next outcome is provided to process engine via ExecutionResults, which is very similar in nature to the CommandContext and acts as data transfer object.


/**

 * Data holder for command's result data. Whatever command produces should be placed in

 * this results so they can be later on referenced by name by the requester - e.g. process instance.

 *

 */

public class ExecutionResults implements Serializable {


    private static final long serialVersionUID = -1738336024526084091L;

    private Map<String, Object> data = new HashMap<String, Object>();


    public ExecutionResults() {

    }


    public void setData(Map<String, Object> data) {

        this.data = data;

    }


    public Map<String, Object> getData() {

        return data;

    }


    public Object getData(String key) {

        return data.get(key);

    }


    public void setData(String key, Object value) {

        data.put(key, value);

    }


    public Set<String> keySet() {

        return data.keySet();

    }


    @Override

    public String toString() {

        return "ExecutionResults{" + "data=" + data + '}';

    }

    

    

}

Implementation of the  Command Interface with calendar date changing based on the input expression received from the CommandContext class and the execution result then send back via the ExecutionResult class object.

package com.jbpm.executor;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.kie.internal.executor.api.Command;
import org.kie.internal.executor.api.CommandContext;
import org.kie.internal.executor.api.ExecutionResults;
import org.kie.internal.executor.api.Reoccurring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelloCommand implements Command , Reoccurring{
private static final Logger logger = LoggerFactory.getLogger(HelloCommand.class);
private Integer frequency=0;
private String expression="";
public ExecutionResults execute(CommandContext arg0) throws Exception {
ExecutionResults execResult= new ExecutionResults();
Map<String, Object> outputData = new HashMap<String, Object>();
Map<String, Object> inputData;
String userId;
String recordId;
try {
inputData=arg0.getData();
userId=(String) inputData.get("userId");
recordId= (String)inputData.get("recordId");
//frequency= Integer.parseInt((String)inputData.get("frequency"));
expression=(String)inputData.get("expression");
logger.info("User Id recieved is : "+userId+ " for Record Id : "+recordId+ " and for recurring frequency : "+frequency);
outputData.put("executionResult", "Success");
execResult.setData(outputData);
} catch (Exception e) {
logger.error("Exception Occured in execute() of HelloCommand class :" +e);
e.printStackTrace();
}
return execResult;
}
public Date getScheduleTime() {
logger.info("Expression Recieved is : "+expression);
/*Calendar c = Calendar.getInstance();
c.setTime(new Date());
c.add(Calendar.DATE, frequency); // number of days to add
logger.info("Inside getScheduleTime() method of MyHelloCommand : "+c.getTime());*/
return getFrequency();
}
public Date getFrequency(){
logger.info("Entering getFrequency()");
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
String[] ary = null;
try {
if(expression!=null && expression!=""){
ary =expression.split(",");
for(int i=0 ;i<ary.length;i++){
String val=ary[i];
if(!val.equals("0") && !val.equals("")){
int interval= Integer.parseInt(val);
if(i==0){
calendar.add(Calendar.SECOND, interval);
}
if(i==1){
calendar.add(Calendar.MINUTE, interval);
}
if(i==2){
calendar.add(Calendar.HOUR, interval);
}
if(i==3){
calendar.add(Calendar.DATE, interval);
}
if(i==4){
calendar.add(Calendar.MONTH, interval);
}
if(i==5){
calendar.add(Calendar.YEAR, interval);
}
}
}
}
} catch (NumberFormatException e) {
e.printStackTrace();
}
catch (Exception e) {
e.printStackTrace();
}
logger.info("Scheduled Interval is :"+ calendar.getTime());
return calendar.getTime();
}
}



Executor covers all requirements listed above and provides user interface as part of jbpm console and kie workbench (kie-wb) applications.



Figure 24.1. 

Above screenshot illustrates history view of executor's job queue. As can be seen on it there are several options available:

  • view details of the job
  • cancel given job
  • create new job
For more information follow my Tutorial  online @ jbpm tutorial master 

No comments:

Post a Comment