task.task-picker-class
Specify the name of the class that will be used to pick tasks off the queue to be run.
Key: task.task-picker-class
Type: String
Can be set in: global.cfg
Description
This sets the fully qualified name of the class that will be used to pick tasks off the task queue to be run.
A custom task picker class could, for example:
- Limit the number of tasks being run concurrently.
- Run tasks of some users before other users.
The class must implement:
package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;
public interface TaskPicker {
    /**
     * Allows for selecting the next task(s) to run.
     *
     * <p>This method will be called periodically and will allow for the
     * selection of the next task to run. The context provides the list of
     * tasks which can be run along with the currently running set of tasks.
     * Implementations are to pick tasks from the list of tasks which can be
     * run and submit them to {@link PickTaskContext#submitTaskToRun}, which
     * may run the task if it is possible.</p>
     *
     * <p>On each invocation of this method, implementations will likely
     * submit as many tasks as possible to be run (perhaps limited by some
     * constraint) rather than submit one task and wait for the next
     * invocation.</p>
     *
     * @param context gives access to the queued and running tasks and
     * accepts the tasks to be run.
     */
    void pickNextTasksToRun(PickTaskContext context);
}
The class will need to interact with the PickTaskContext to get the tasks from the queue and decide whether they should be run. The context will implement:
package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;
import java.util.List;
import java.io.File;
public interface PickTaskContext {
    /**
     * Tasks that are currently in the queue which could be selected to run.
     *
     * <p>If a task that is in this queue is passed to submitTaskToRun then
     * it will no longer be returned in the list returned by this method.</p>
     *
     * <p>This is not the complete list of tasks that are in the queue. Instead,
     * this is the list of tasks that can be run at the same time as the tasks
     * that are currently running. An example of tasks that cannot run at the
     * same time is an instant update and a re-index on the same
     * collection.</p>
     *
     * <p>Multiple calls to this may NOT result in the same list being
     * returned.</p>
     *
     * <p>The order is by priority, then by order in the queue.</p>
     *
     * @return the list of currently queued tasks which can be run.
     */
    public List<Task> getQueuedTasksWhichCanRun();
    /**
     * Currently running tasks.
     *
     * <p>If a task completes while the {@link TaskPicker} is running,
     * the returned list may become outdated.</p>
     *
     * <p>Multiple calls to this may NOT result in the same list being
     * returned.</p>
     *
     * @return the list of currently running tasks.
     */
    public List<Task> getRunningTasks();
    /**
     * Attempts to submit a task to be run.
     *
     * <p>It is possible that a task cannot be submitted because something else
     * is currently running the task outside of the queue system, for example
     * from the command line. This will attempt to catch those cases and return
     * false, rather than submit the task only to have it find out later that
     * it could not be run.</p>
     *
     * @param task the task to run.
     * @return true if the task could be submitted to run, false if it
     * could not (typically when another task that uses the same resources as
     * the given task is already running, e.g. submitting a RE_INDEX while an
     * INSTANT_UPDATE is already running for the same collection).
     */
    public boolean submitTaskToRun(Task task);
    /**
     * Returns the search home for the installation this is running under.
     *
     * @return search home.
     */
    public File getSearchHome();
}
The tasks will implement:
package com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared;
import java.time.ZonedDateTime;
import com.funnelback.admin.api.service.queue.task.param.TaskParams;
import com.funnelback.admin.api.service.queue.task.tasktypes.TaskTypes;
import com.funnelback.common.task.queue.Priority;
public interface Task {
    /**
     * Gets the ID of the task.
     *
     * @return the ID of the task.
     */
    public String getId();
    /**
     * Gets the date and time the task was created.
     *
     * @return the date and time the task was created.
     */
    public ZonedDateTime getCreated();
    /**
     * Gets the last modified date and time of the task.
     *
     * @return the last modified date and time of the task.
     */
    public ZonedDateTime getLastModified();
    /**
     * Gets the type of the task.
     *
     * @return the task type.
     */
    public TaskTypes getType();
    /**
     * The task parameters. These will match what was given to the API.
     *
     * @return the task parameters.
     */
    public TaskParams getParams();
    /**
     * Gets the position of the task within the list of all tasks in the queue.
     *
     * @return the position of the task in the queue.
     */
    public int getPosition();
    /**
     * Gets the username of the user that created the task.
     *
     * @return the username of the user that created the task.
     */
    public String getAddedBy();
    /**
     * Gets the priority of the task.
     *
     * @return the priority of the task.
     */
    public Priority getPriority();
    /**
     * Gets the collection this task runs on.
     *
     * @return the collection this task runs on, or null if it doesn't run on a
     * collection.
     */
    public String getCollectionTheTaskRunsOn();
}
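To illustrate how the three interfaces fit together, here is a minimal Java sketch of a picker that caps the number of concurrently running tasks and checks the result of submitTaskToRun. It is not part of the product: the nested Task, PickTaskContext and TaskPicker types are cut-down stand-ins declared only so the example compiles on its own, and CappedPicker, FakeContext and the maxConcurrent limit are hypothetical names. A real picker would import the interfaces from the package shown above instead.

```java
import java.util.ArrayList;
import java.util.List;

public class CappedPickerSketch {

    // Stand-ins for the Funnelback interfaces, reduced to the methods used here.
    interface Task {
        String getId();
    }

    interface PickTaskContext {
        List<Task> getQueuedTasksWhichCanRun();
        List<Task> getRunningTasks();
        boolean submitTaskToRun(Task task);
    }

    interface TaskPicker {
        void pickNextTasksToRun(PickTaskContext context);
    }

    /** Submits queued tasks until maxConcurrent (a hypothetical limit) are running. */
    static class CappedPicker implements TaskPicker {
        private final int maxConcurrent;

        CappedPicker(int maxConcurrent) {
            this.maxConcurrent = maxConcurrent;
        }

        @Override
        public void pickNextTasksToRun(PickTaskContext context) {
            // Take one snapshot of each list; repeated calls may return different lists.
            List<Task> queued = new ArrayList<>(context.getQueuedTasksWhichCanRun());
            int running = context.getRunningTasks().size();
            for (Task task : queued) {
                if (running >= maxConcurrent) {
                    return;
                }
                // Only count the task if the context accepted it.
                if (context.submitTaskToRun(task)) {
                    running++;
                }
            }
        }
    }

    /** In-memory context for the demo; rejects any task whose ID is in 'blocked'. */
    static class FakeContext implements PickTaskContext {
        final List<Task> queued = new ArrayList<>();
        final List<Task> running = new ArrayList<>();
        final List<String> blocked = new ArrayList<>();

        public List<Task> getQueuedTasksWhichCanRun() { return queued; }
        public List<Task> getRunningTasks() { return running; }

        public boolean submitTaskToRun(Task task) {
            if (blocked.contains(task.getId())) {
                return false;
            }
            queued.remove(task);
            running.add(task);
            return true;
        }
    }

    /** Queues tasks a, b, c, d, blocks b, and returns the IDs that ended up running. */
    static List<String> demo() {
        FakeContext context = new FakeContext();
        for (String id : new String[] {"a", "b", "c", "d"}) {
            context.queued.add(() -> id);
        }
        context.blocked.add("b");
        new CappedPicker(2).pickNextTasksToRun(context);
        List<String> ids = new ArrayList<>();
        for (Task task : context.running) {
            ids.add(task.getId());
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, c]
    }
}
```

In the demo, task b is rejected by the fake context, so the picker moves on and fills the second slot with task c. Keeping a local count is a design choice of this sketch; calling getRunningTasks() again on each iteration is an alternative.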
Default Value
By default the task picker is set to a class which will run all tasks immediately.
task.task-picker-class=com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.RunImmediatelyTaskPicker
Examples
This example implements a custom Groovy class which limits the number of running tasks based on the number of available CPUs.
The Groovy script is placed in:
$SEARCH_HOME/lib/java/groovy/com/foo/OneTaskPerCPU.groovy
The Groovy code is:
package com.foo;
import com.funnelback.admin.api.service.queue.task.tasktypes.run.pick.shared.*;
import com.funnelback.common.task.queue.Priority;
public class OneTaskPerCPU implements TaskPicker {
    private static final org.apache.logging.log4j.Logger log =
        org.apache.logging.log4j.LogManager.getLogger(OneTaskPerCPU.class);
    public void pickNextTasksToRun(PickTaskContext context) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        // Tasks come in order of priority, then in the order they are in the queue.
        // All RUN_ASAP tasks will come first.
        for (Task task : context.getQueuedTasksWhichCanRun()) {
            // Wrap everything in a try/catch so that we can move on to the
            // next task if something goes wrong.
            try {
                if (task.getPriority() != Priority.RUN_ASAP
                    && context.getRunningTasks().size() >= availableProcessors) {
                    // There are no more RUN_ASAP tasks and we are already
                    // running as many tasks as CPUs.
                    return;
                }
                context.submitTaskToRun(task);
            } catch (Exception e) {
                try {
                    log.trace("An error occurred when working on "
                        + "task '{}' of type '{}' with params '{}'.",
                        task.getId(),
                        task.getType(),
                        task.getParams(), // The parameters of every task have a
                                          // human-readable toString method, so
                                          // this will tell us everything we
                                          // need to know.
                        e);
                } catch (Exception e2) {
                    e2.addSuppressed(e);
                    log.info("Error logging error about a task", e2);
                }
            }
        }
    }
}
The global.cfg file will then need to contain:
task.task-picker-class=com.foo.OneTaskPerCPU
The change will take effect immediately; no restart is required.
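The other use case mentioned in the description, running the tasks of some users before those of other users, could be approached along the following lines. This is a sketch under stated assumptions rather than a supplied implementation: the nested Task, PickTaskContext and TaskPicker types are cut-down stand-ins so the example compiles on its own, and PreferredUsersPicker, DemoTask and the set of preferred usernames are hypothetical. It is written in plain Java; a real picker would import the real interfaces and be installed in the same way as the Groovy class above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PreferredUsersSketch {

    // Stand-ins for the Funnelback interfaces, reduced to the methods used here.
    interface Task {
        String getId();
        String getAddedBy();
    }

    interface PickTaskContext {
        List<Task> getQueuedTasksWhichCanRun();
        boolean submitTaskToRun(Task task);
    }

    interface TaskPicker {
        void pickNextTasksToRun(PickTaskContext context);
    }

    /** Submits tasks added by preferred users (a hypothetical set) first. */
    static class PreferredUsersPicker implements TaskPicker {
        // Copied into a HashSet, which tolerates a null username in contains().
        private final Set<String> preferredUsers;

        PreferredUsersPicker(Set<String> preferredUsers) {
            this.preferredUsers = new HashSet<>(preferredUsers);
        }

        @Override
        public void pickNextTasksToRun(PickTaskContext context) {
            List<Task> ordered = new ArrayList<>(context.getQueuedTasksWhichCanRun());
            // false sorts before true, so preferred users come first. The sort
            // is stable, so the existing order is kept within each group.
            ordered.sort(Comparator.comparing(
                (Task t) -> !preferredUsers.contains(t.getAddedBy())));
            for (Task task : ordered) {
                context.submitTaskToRun(task);
            }
        }
    }

    /** Simple immutable task used only by the demo. */
    static class DemoTask implements Task {
        private final String id;
        private final String addedBy;

        DemoTask(String id, String addedBy) {
            this.id = id;
            this.addedBy = addedBy;
        }

        public String getId() { return id; }
        public String getAddedBy() { return addedBy; }
    }

    /** Runs the picker against an in-memory context and returns the submission order. */
    static List<String> demo() {
        List<Task> queued = List.of(
            new DemoTask("1", "bob"), new DemoTask("2", "alice"),
            new DemoTask("3", "carol"), new DemoTask("4", "alice"));
        List<String> submitted = new ArrayList<>();
        PickTaskContext context = new PickTaskContext() {
            public List<Task> getQueuedTasksWhichCanRun() { return queued; }
            public boolean submitTaskToRun(Task task) {
                return submitted.add(task.getId());
            }
        };
        new PreferredUsersPicker(Set.of("alice")).pickNextTasksToRun(context);
        return submitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 4, 1, 3]
    }
}
```

Note that this ordering puts a preferred user's tasks ahead of everything else, including RUN_ASAP tasks from other users. A picker that should still honour priority first would need to compare on getPriority() before the username.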