fix(TaskProcessingApiController): Implement getNextScheduledTasks for next_batch endpoint

in order to avoid hitting the DB with multiple 1 row requests

Signed-off-by: Marcel Klehr <mklehr@gmx.net>
pull/55735/head
Marcel Klehr 2025-10-14 09:30:42 +07:00
parent 80eb3dd0d0
commit 16da8bbc8a
4 changed files with 60 additions and 19 deletions

@ -576,6 +576,7 @@ class TaskProcessingApiController extends OCSController {
/**
* Returns the next n scheduled tasks for the specified set of taskTypes and providers
* The returned tasks are capped at ~50MiB
*
* @param list<string> $providerIds The ids of the providers
* @param list<string> $taskTypeIds The ids of the task types
@ -597,23 +598,18 @@ class TaskProcessingApiController extends OCSController {
]);
}
/** @var list<array{task:CoreTaskProcessingTask, provider:string}> $tasks */
$tasks = [];
$taskIdsToIgnore = [];
$tasks = $this->taskProcessingManager->getNextScheduledTasks($possibleTaskTypeIds, numberOfTasks: $numberOfTasks + 1);
$tasksJson = [];
// Stop when $numberOfTasks is reached or the json payload is larger than 50MiB
while (count($tasks) < $numberOfTasks && strlen(json_encode($tasks)) < 50 * 1024 * 1024) {
while (count($tasks) > 0 && count($tasksJson) < $numberOfTasks && strlen(json_encode($tasks)) < 50 * 1024 * 1024) {
// Until we find a task whose task type is set to be provided by the providers requested with this request
// Or no scheduled task is found anymore (given the taskIds to ignore)
try {
$task = $this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore);
} catch (NotFoundException) {
break;
}
$task = array_shift($tasks);
try {
$provider = $this->taskProcessingManager->getPreferredProvider($task->getTaskTypeId());
if (in_array($provider->getId(), $possibleProviderIds, true)) {
if ($this->taskProcessingManager->lockTask($task)) {
$tasks[] = ['task' => $task->jsonSerialize(), 'provider' => $provider->getId()];
$tasksJson[] = ['task' => $task->jsonSerialize(), 'provider' => $provider->getId()];
continue;
}
}
@ -621,16 +617,8 @@ class TaskProcessingApiController extends OCSController {
// There is no provider set for the task type of this task
// proceed to ignore this task
}
$taskIdsToIgnore[] = (int)$task->getId();
}
try {
$this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore);
$hasMore = true;
} catch (\Throwable) {
$hasMore = false;
}
$hasMore = count($tasks) > 0;
return new DataResponse([
'tasks' => $tasks,

@ -233,4 +233,35 @@ class TaskMapper extends QBMapper {
return 0;
}
}
/**
* @param list<string> $taskTypes
* @param list<int> $taskIdsToIgnore
* @param int $numberOfTasks
* @return list<Task>
* @throws Exception
*/
public function findNOldestScheduledByType(array $taskTypes, array $taskIdsToIgnore, int $numberOfTasks) {
$qb = $this->db->getQueryBuilder();
$qb->select(Task::$columns)
->from($this->tableName)
->where($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)))
->setMaxResults($numberOfTasks)
->orderBy('last_updated', 'ASC');
if (!empty($taskTypes)) {
$filter = [];
foreach ($taskTypes as $taskType) {
$filter[] = $qb->expr()->eq('type', $qb->createPositionalParameter($taskType));
}
$qb->andWhere($qb->expr()->orX(...$filter));
}
if (!empty($taskIdsToIgnore)) {
$qb->andWhere($qb->expr()->notIn('id', $qb->createNamedParameter($taskIdsToIgnore, IQueryBuilder::PARAM_INT_ARRAY)));
}
return $this->findEntities($qb);
}
}

@ -1192,6 +1192,18 @@ class Manager implements IManager {
}
}
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array {
try {
return array_map(fn ($taskEntity) => $taskEntity->toPublicTask(), $this->taskMapper->findNOldestScheduledByType($taskTypeIds, $taskIdsToIgnore, $numberOfTasks);
} catch (DoesNotExistException $e) {
throw new \OCP\TaskProcessing\Exception\NotFoundException('Could not find the task', 0, $e);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
} catch (\JsonException $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding the task', 0, $e);
}
}
/**
* Takes task input data and replaces fileIds with File objects
*

@ -160,6 +160,16 @@ interface IManager {
*/
public function getNextScheduledTask(array $taskTypeIds = [], array $taskIdsToIgnore = []): Task;
/**
* @param list<string> $taskTypeIds
* @param list<int> $taskIdsToIgnore
* @param int $numberOfTasks
* @return list<Task>
* @throws Exception If the query failed
* @since 33.0.0
*/
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array;
/**
* @param int $id The id of the task
* @param string|null $userId The user id that scheduled the task