Merge pull request #55735 from nextcloud/feat/taskprocessing-api-next-batch

feat(TaskProcessingApiController): Add new next_batch endpoint
pull/55803/head
Marcel Klehr 2025-10-16 09:55:49 +07:00 committed by GitHub
commit 7320322d52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 795 additions and 26 deletions

@ -532,29 +532,7 @@ class TaskProcessingApiController extends OCSController {
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next', root: '/taskprocessing')]
public function getNextScheduledTask(array $providerIds, array $taskTypeIds): DataResponse {
try {
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
try {
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
return null;
}
}, $taskTypeIds));
$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);
// restrict $providerIds to providers that are configured as preferred for the passed task types
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));
// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
try {
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
// no provider found for task type
return false;
}
return in_array($providerForTaskType, $possibleProviderIds, true);
}));
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);
if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
throw new NotFoundException();
@ -596,6 +574,61 @@ class TaskProcessingApiController extends OCSController {
}
}
/**
* Returns the next n scheduled tasks for the specified set of taskTypes and providers
* The returned tasks are capped at ~50MiB
*
* @param list<string> $providerIds The ids of the providers
* @param list<string> $taskTypeIds The ids of the task types
* @param int $numberOfTasks The number of tasks to return
* @return DataResponse<Http::STATUS_OK, array{tasks: list<array{task: CoreTaskProcessingTask, provider: string}>, has_more: bool}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR, array{message: string}, array{}>
*
* 200: Tasks returned
*/
#[ExAppRequired]
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next_batch', root: '/taskprocessing')]
public function getNextScheduledTaskBatch(array $providerIds, array $taskTypeIds, int $numberOfTasks = 1): DataResponse {
try {
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);
if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
return new DataResponse([
'tasks' => [],
'has_more' => false,
]);
}
$tasks = $this->taskProcessingManager->getNextScheduledTasks($possibleTaskTypeIds, numberOfTasks: $numberOfTasks + 1);
$tasksJson = [];
// Stop when $numberOfTasks is reached or the json payload is larger than 50MiB
while (count($tasks) > 0 && count($tasksJson) < $numberOfTasks && strlen(json_encode($tasks)) < 50 * 1024 * 1024) {
// Until we find a task whose task type is set to be provided by the providers requested with this request
// Or no scheduled task is found anymore (given the taskIds to ignore)
$task = array_shift($tasks);
try {
$provider = $this->taskProcessingManager->getPreferredProvider($task->getTaskTypeId());
if (in_array($provider->getId(), $possibleProviderIds, true)) {
if ($this->taskProcessingManager->lockTask($task)) {
$tasksJson[] = ['task' => $task->jsonSerialize(), 'provider' => $provider->getId()];
continue;
}
}
} catch (Exception) {
// There is no provider set for the task type of this task
// proceed to ignore this task
}
}
$hasMore = count($tasks) > 0;
return new DataResponse([
'tasks' => $tasksJson,
'has_more' => $hasMore,
]);
} catch (Exception) {
return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR);
}
}
/**
* @param resource $data
* @return int
@ -611,4 +644,36 @@ class TaskProcessingApiController extends OCSController {
$file = $folder->newFile(time() . '-' . rand(1, 100000), $data);
return $file->getId();
}
/**
* @param array $taskTypeIds
* @param array $providerIds
* @return array
*/
private function intersectTaskTypesAndProviders(array $taskTypeIds, array $providerIds): array {
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
try {
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
return null;
}
}, $taskTypeIds));
$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);
// restrict $providerIds to providers that are configured as preferred for the passed task types
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));
// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
try {
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
// no provider found for task type
return false;
}
return in_array($providerForTaskType, $possibleProviderIds, true);
}));
return [$possibleProviderIds, $possibleTaskTypeIds];
}
}

@ -1364,6 +1364,223 @@
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/next_batch": {
"get": {
"operationId": "task_processing_api-get-next-scheduled-task-batch",
"summary": "Returns the next n scheduled tasks for the specified set of taskTypes and providers The returned tasks are capped at ~50MiB",
"description": "This endpoint requires admin access",
"tags": [
"task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"parameters": [
{
"name": "providerIds[]",
"in": "query",
"description": "The ids of the providers",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "taskTypeIds[]",
"in": "query",
"description": "The ids of the task types",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "numberOfTasks",
"in": "query",
"description": "The number of tasks to return",
"schema": {
"type": "integer",
"format": "int64",
"default": 1
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Tasks returned",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"tasks",
"has_more"
],
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"required": [
"task",
"provider"
],
"properties": {
"task": {
"$ref": "#/components/schemas/TaskProcessingTask"
},
"provider": {
"type": "string"
}
}
}
},
"has_more": {
"type": "boolean"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
}
},
"tags": [

@ -11473,6 +11473,223 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/next_batch": {
"get": {
"operationId": "task_processing_api-get-next-scheduled-task-batch",
"summary": "Returns the next n scheduled tasks for the specified set of taskTypes and providers The returned tasks are capped at ~50MiB",
"description": "This endpoint requires admin access",
"tags": [
"task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"parameters": [
{
"name": "providerIds[]",
"in": "query",
"description": "The ids of the providers",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "taskTypeIds[]",
"in": "query",
"description": "The ids of the task types",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "numberOfTasks",
"in": "query",
"description": "The number of tasks to return",
"schema": {
"type": "integer",
"format": "int64",
"default": 1
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Tasks returned",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"tasks",
"has_more"
],
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"required": [
"task",
"provider"
],
"properties": {
"task": {
"$ref": "#/components/schemas/TaskProcessingTask"
},
"provider": {
"type": "string"
}
}
}
},
"has_more": {
"type": "boolean"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/twofactor/state": {
"get": {
"operationId": "two_factor_api-state",

@ -233,4 +233,35 @@ class TaskMapper extends QBMapper {
return 0;
}
}
/**
* @param list<string> $taskTypes
* @param list<int> $taskIdsToIgnore
* @param int $numberOfTasks
* @return list<Task>
* @throws Exception
*/
public function findNOldestScheduledByType(array $taskTypes, array $taskIdsToIgnore, int $numberOfTasks) {
$qb = $this->db->getQueryBuilder();
$qb->select(Task::$columns)
->from($this->tableName)
->where($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)))
->setMaxResults($numberOfTasks)
->orderBy('last_updated', 'ASC');
if (!empty($taskTypes)) {
$filter = [];
foreach ($taskTypes as $taskType) {
$filter[] = $qb->expr()->eq('type', $qb->createPositionalParameter($taskType));
}
$qb->andWhere($qb->expr()->orX(...$filter));
}
if (!empty($taskIdsToIgnore)) {
$qb->andWhere($qb->expr()->notIn('id', $qb->createNamedParameter($taskIdsToIgnore, IQueryBuilder::PARAM_INT_ARRAY)));
}
return $this->findEntities($qb);
}
}

@ -1184,11 +1184,23 @@ class Manager implements IManager {
$taskEntity = $this->taskMapper->findOldestScheduledByType($taskTypeIds, $taskIdsToIgnore);
return $taskEntity->toPublicTask();
} catch (DoesNotExistException $e) {
throw new \OCP\TaskProcessing\Exception\NotFoundException('Could not find the task', 0, $e);
throw new \OCP\TaskProcessing\Exception\NotFoundException('Could not find the task', previous: $e);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', previous: $e);
} catch (\JsonException $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding the task', 0, $e);
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding the task', previous: $e);
}
}
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array {
try {
return array_map(fn ($taskEntity) => $taskEntity->toPublicTask(), $this->taskMapper->findNOldestScheduledByType($taskTypeIds, $taskIdsToIgnore, $numberOfTasks));
} catch (DoesNotExistException $e) {
throw new \OCP\TaskProcessing\Exception\NotFoundException('Could not find the task', previous: $e);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', previous: $e);
} catch (\JsonException $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after finding the task', previous: $e);
}
}

@ -160,6 +160,16 @@ interface IManager {
*/
public function getNextScheduledTask(array $taskTypeIds = [], array $taskIdsToIgnore = []): Task;
/**
* @param list<string> $taskTypeIds
* @param list<int> $taskIdsToIgnore
* @param int $numberOfTasks
* @return list<Task>
* @throws Exception If the query failed
* @since 33.0.0
*/
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array;
/**
* @param int $id The id of the task
* @param string|null $userId The user id that scheduled the task

@ -14989,6 +14989,223 @@
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/next_batch": {
"get": {
"operationId": "core-task_processing_api-get-next-scheduled-task-batch",
"summary": "Returns the next n scheduled tasks for the specified set of taskTypes and providers The returned tasks are capped at ~50MiB",
"description": "This endpoint requires admin access",
"tags": [
"core/task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"parameters": [
{
"name": "providerIds[]",
"in": "query",
"description": "The ids of the providers",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "taskTypeIds[]",
"in": "query",
"description": "The ids of the task types",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "numberOfTasks",
"in": "query",
"description": "The number of tasks to return",
"schema": {
"type": "integer",
"format": "int64",
"default": 1
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Tasks returned",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"tasks",
"has_more"
],
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"required": [
"task",
"provider"
],
"properties": {
"task": {
"$ref": "#/components/schemas/CoreTaskProcessingTask"
},
"provider": {
"type": "string"
}
}
}
},
"has_more": {
"type": "boolean"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
},
"/ocs/v2.php/twofactor/state": {
"get": {
"operationId": "core-two_factor_api-state",