Creating General Plugins
Plugins that implement arbitrary functionality callable from workflows.
Use Cases
- Data aggregation and analysis from log DB
- External API integration
- Custom calculations and transformations
- File operations
Complete Implementation Example
POJO
package com.example.plugin;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
/**
* POJO for custom aggregation functionality.
* No framework dependencies.
*/
public class LogAnalyzer {
private static final Logger logger = Logger.getLogger(LogAnalyzer.class.getName());
private Connection connection;
private long sessionId = -1;
// ========================================================================
// Setters (called from IIAR)
// ========================================================================
public void setConnection(Connection connection) {
this.connection = connection;
}
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
}
// ========================================================================
// Business Logic
// ========================================================================
/**
* Aggregate logs matching the specified pattern.
*
* @param pattern Search pattern
* @return Aggregation result
*/
public String analyze(String pattern) {
if (connection == null || sessionId < 0) {
logger.warning("LogAnalyzer: connection or sessionId not set");
return "";
}
try {
return doAnalyze(pattern);
} catch (Exception e) {
logger.warning("LogAnalyzer: error: " + e.getMessage());
return "Error: " + e.getMessage();
}
}
private String doAnalyze(String pattern) throws Exception {
List<String> results = new ArrayList<>();
String sql = "SELECT node_id, message, timestamp FROM logs " +
"WHERE session_id = ? AND message LIKE ? " +
"ORDER BY timestamp";
try (PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setLong(1, sessionId);
ps.setString(2, "%" + pattern + "%");
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String nodeId = rs.getString("node_id");
String message = rs.getString("message");
results.add(nodeId + ": " + message);
}
}
}
if (results.isEmpty()) {
return "No matches found for: " + pattern;
}
StringBuilder sb = new StringBuilder();
sb.append("[Log Analysis: ").append(pattern).append("]\n");
for (String result : results) {
sb.append(result).append("\n");
}
sb.append("\nTotal: ").append(results.size()).append(" entries\n");
return sb.toString();
}
/**
* Get count of error logs.
*
* @return Error count
*/
public int countErrors() {
if (connection == null || sessionId < 0) {
return -1;
}
try {
String sql = "SELECT COUNT(*) FROM logs " +
"WHERE session_id = ? AND level = 'SEVERE'";
try (PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setLong(1, sessionId);
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
return rs.getInt(1);
}
}
}
} catch (Exception e) {
logger.warning("LogAnalyzer: countErrors error: " + e.getMessage());
}
return -1;
}
}
IIAR
package com.example.plugin;
import com.scivicslab.actoriac.log.DistributedLogStore;
import com.scivicslab.pojoactor.core.Action;
import com.scivicslab.pojoactor.core.ActionResult;
import com.scivicslab.pojoactor.workflow.IIActorRef;
import com.scivicslab.pojoactor.workflow.IIActorSystem;
import java.sql.Connection;
import java.util.logging.Logger;
/**
* IIAR wrapper for LogAnalyzer.
*/
public class LogAnalyzerIIAR extends IIActorRef<LogAnalyzer> {
private static final Logger logger = Logger.getLogger(LogAnalyzerIIAR.class.getName());
public LogAnalyzerIIAR(String actorName, IIActorSystem system) {
super(actorName, new LogAnalyzer(), system);
initializeFromSystem();
}
private void initializeFromSystem() {
// Get DB connection
DistributedLogStore logStore = DistributedLogStore.getInstance();
if (logStore != null) {
Connection conn = logStore.getConnection();
if (conn != null) {
object.setConnection(conn);
logger.fine("LogAnalyzerIIAR: initialized database connection");
}
}
// Get session ID
long sessionId = getSessionIdFromNodeGroup();
if (sessionId >= 0) {
object.setSessionId(sessionId);
logger.fine("LogAnalyzerIIAR: initialized sessionId=" + sessionId);
}
}
private long getSessionIdFromNodeGroup() {
if (actorSystem == null) {
return -1;
}
IIActorRef<?> nodeGroup = ((IIActorSystem) actorSystem).getIIActor("nodeGroup");
if (nodeGroup == null) {
return -1;
}
ActionResult result = nodeGroup.callByActionName("getSessionId", "");
if (result.isSuccess()) {
try {
return Long.parseLong(result.getResult());
} catch (NumberFormatException e) {
logger.warning("LogAnalyzerIIAR: invalid sessionId");
}
}
return -1;
}
// ========================================================================
// Actions
// ========================================================================
@Action("analyze")
public ActionResult analyze(String args) {
String result = object.analyze(args != null ? args : "");
return new ActionResult(true, result);
}
@Action("countErrors")
public ActionResult countErrors(String args) {
int count = object.countErrors();
if (count < 0) {
return new ActionResult(false, "Failed to count errors");
}
return new ActionResult(true, String.valueOf(count));
}
}
Workflow Usage
name: analyze-logs
description: Analyze logs
steps:
- states: ["0", "1"]
note: Load plugin and create analyzer
actions:
- actor: loader
method: loadJar
arguments: ["plugins/log-analyzer-1.0.0.jar"]
- actor: loader
method: createChild
arguments: ["ROOT", "analyzer", "com.example.plugin.LogAnalyzerIIAR"]
- states: ["1", "2"]
note: Run some workflow that generates logs
actions:
- actor: nodeGroup
method: apply
arguments:
actor: "node-*"
method: runWorkflow
arguments: ["some-workflow.yaml"]
- states: ["2", "3"]
note: Analyze ERROR logs
actions:
- actor: analyzer
method: analyze
arguments: ["ERROR"]
- states: ["3", "end"]
note: Count errors
actions:
- actor: analyzer
method: countErrors
Plugins with Multiple Actions
You can define multiple actions in one plugin:
@Action("analyze")
public ActionResult analyze(String args) { ... }
@Action("countErrors")
public ActionResult countErrors(String args) { ... }
@Action("summarize")
public ActionResult summarize(String args) { ... }
@Action("export")
public ActionResult export(String args) { ... }
External API Integration Example
public class ApiClient {
private String apiEndpoint;
public void setApiEndpoint(String endpoint) {
this.apiEndpoint = endpoint;
}
public String fetchData(String query) {
// Call external API with HTTP client
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(apiEndpoint + "?q=" + URLEncoder.encode(query, UTF_8)))
.GET()
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
return response.body();
}
}
Set endpoint in IIAR:
public ApiClientIIAR(String actorName, IIActorSystem system) {
super(actorName, new ApiClient(), system);
parseActorName(actorName);
}
private void parseActorName(String actorName) {
// Format: "apiClient:https://api.example.com"
String[] parts = actorName.split(":", 2);
if (parts.length >= 2) {
object.setApiEndpoint(parts[1]);
}
}
Workflow:
- actor: loader
method: createChild
arguments: ["ROOT", "apiClient:https://api.example.com", "...ApiClientIIAR"]
- actor: apiClient
method: fetchData
arguments: ["search query"]
Stateful Plugins
Plugins can maintain state. Useful for sharing data between actions:
public class StatefulPlugin {
private List<String> collectedData = new ArrayList<>();
public void collect(String data) {
collectedData.add(data);
}
public String getAll() {
return String.join("\n", collectedData);
}
public void clear() {
collectedData.clear();
}
}
Collect data progressively in workflow:
- actor: collector
method: collect
arguments: ["data1"]
- actor: collector
method: collect
arguments: ["data2"]
- actor: collector
method: getAll
# Result: "data1\ndata2"