feat(taskprocessing): add cleanup flag to tasks to decide if they should be cleaned up automatically

Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
pull/54272/head
Julien Veyssier 2025-08-06 12:21:23 +07:00
parent 5660a73a3d
commit 8c52b6c0fe
No known key found for this signature in database
GPG Key ID: 4141FEE162030638
12 changed files with 190 additions and 50 deletions

@ -391,7 +391,7 @@ class TaskProcessingApiController extends OCSController {
* @return StreamResponse<Http::STATUS_OK, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_NOT_FOUND, array{message: string}, array{}>
*/
private function getFileContentsInternal(Task $task, int $fileId): StreamResponse|DataResponse {
$ids = $this->extractFileIdsFromTask($task);
$ids = $this->taskProcessingManager->extractFileIdsFromTask($task);
if (!in_array($fileId, $ids)) {
return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND);
}
@ -428,45 +428,6 @@ class TaskProcessingApiController extends OCSController {
return $response;
}
/**
 * Collect the IDs of every file referenced by a task's input and output.
 *
 * Only slots whose (scalar) shape type is File, Image, Audio or Video are
 * considered. Slots that are declared in the task type but not present in
 * the task data (typically optional ones) are skipped.
 *
 * @param Task $task
 * @return list<int>
 * @throws NotFoundException If the task's type is not among the available task types
 */
private function extractFileIdsFromTask(Task $task): array {
	$taskTypes = $this->taskProcessingManager->getAvailableTaskTypes();
	if (!isset($taskTypes[$task->getTaskTypeId()])) {
		throw new NotFoundException('Could not find task type');
	}
	$taskType = $taskTypes[$task->getTaskTypeId()];
	$fileShapeTypes = [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video];

	// Pairs of [slot descriptors, slot values]; the output is only inspected once it exists
	$slotGroups = [
		[$taskType['inputShape'] + $taskType['optionalInputShape'], $task->getInput()],
	];
	$output = $task->getOutput();
	if ($output !== null) {
		$slotGroups[] = [$taskType['outputShape'] + $taskType['optionalOutputShape'], $output];
	}

	$ids = [];
	foreach ($slotGroups as [$shape, $values]) {
		foreach ($shape as $key => $descriptor) {
			if (!in_array(EShapeType::getScalarType($descriptor->getShapeType()), $fileShapeTypes, true)) {
				continue;
			}
			// A declared slot is not necessarily filled in: reading it blindly would
			// raise an undefined-key warning and add null to the ID list
			if (!isset($values[$key])) {
				continue;
			}
			/** @var int|list<int> $slot */
			$slot = $values[$key];
			if (is_array($slot)) {
				$ids = array_merge($slot, $ids);
			} else {
				$ids[] = $slot;
			}
		}
	}
	return $ids;
}
/**
* Sets the task progress
*

@ -0,0 +1,49 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\Core\Migrations;
use Closure;
use OCP\DB\ISchemaWrapper;
use OCP\DB\Types;
use OCP\Migration\Attributes\AddColumn;
use OCP\Migration\Attributes\ColumnType;
use OCP\Migration\IOutput;
use OCP\Migration\SimpleMigrationStep;
/**
 * Adds the `cleanup` column to the `taskprocessing_tasks` table.
 */
#[AddColumn(table: 'taskprocessing_tasks', name: 'cleanup', type: ColumnType::SMALLINT)]
class Version32000Date20250806110519 extends SimpleMigrationStep {
	/**
	 * Add a non-null, unsigned smallint `cleanup` column defaulting to 1.
	 *
	 * @param IOutput $output
	 * @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
	 * @param array $options
	 * @return null|ISchemaWrapper The modified schema, or null when nothing had to be changed
	 */
	public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
		/** @var ISchemaWrapper $schema */
		$schema = $schemaClosure();

		// Nothing to do when the table does not exist
		if (!$schema->hasTable('taskprocessing_tasks')) {
			return null;
		}

		// Nothing to do when the column is already there
		$tasksTable = $schema->getTable('taskprocessing_tasks');
		if ($tasksTable->hasColumn('cleanup')) {
			return null;
		}

		$columnOptions = [
			'notnull' => true,
			'default' => 1,
			'unsigned' => true,
		];
		$tasksTable->addColumn('cleanup', Types::SMALLINT, $columnOptions);

		return $schema;
	}
}

@ -200,6 +200,7 @@ namespace OC\Core;
* scheduledAt: ?int,
* startedAt: ?int,
* endedAt: ?int,
* cleanup: bool,
* }
*
* @psalm-type CoreProfileAction = array{

@ -1512,6 +1512,7 @@ return array(
'OC\\Core\\Migrations\\Version31000Date20250213102442' => $baseDir . '/core/Migrations/Version31000Date20250213102442.php',
'OC\\Core\\Migrations\\Version32000Date20250620081925' => $baseDir . '/core/Migrations/Version32000Date20250620081925.php',
'OC\\Core\\Migrations\\Version32000Date20250731062008' => $baseDir . '/core/Migrations/Version32000Date20250731062008.php',
'OC\\Core\\Migrations\\Version32000Date20250806110519' => $baseDir . '/core/Migrations/Version32000Date20250806110519.php',
'OC\\Core\\Notification\\CoreNotifier' => $baseDir . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => $baseDir . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\LoginFlowV2Service' => $baseDir . '/core/Service/LoginFlowV2Service.php',

@ -1553,6 +1553,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
'OC\\Core\\Migrations\\Version31000Date20250213102442' => __DIR__ . '/../../..' . '/core/Migrations/Version31000Date20250213102442.php',
'OC\\Core\\Migrations\\Version32000Date20250620081925' => __DIR__ . '/../../..' . '/core/Migrations/Version32000Date20250620081925.php',
'OC\\Core\\Migrations\\Version32000Date20250731062008' => __DIR__ . '/../../..' . '/core/Migrations/Version32000Date20250731062008.php',
'OC\\Core\\Migrations\\Version32000Date20250806110519' => __DIR__ . '/../../..' . '/core/Migrations/Version32000Date20250806110519.php',
'OC\\Core\\Notification\\CoreNotifier' => __DIR__ . '/../../..' . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => __DIR__ . '/../../..' . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\LoginFlowV2Service' => __DIR__ . '/../../..' . '/core/Service/LoginFlowV2Service.php',

@ -45,6 +45,8 @@ use OCP\TaskProcessing\Task as OCPTask;
* @method int getStartedAt()
* @method setEndedAt(int $endedAt)
* @method int getEndedAt()
* @method setCleanup(int $cleanup)
* @method int getCleanup()
*/
class Task extends Entity {
protected $lastUpdated;
@ -63,16 +65,17 @@ class Task extends Entity {
protected $scheduledAt;
protected $startedAt;
protected $endedAt;
protected $cleanup;
/**
* @var string[]
*/
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at'];
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'cleanup'];
/**
* @var string[]
*/
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt'];
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'cleanup'];
public function __construct() {
@ -94,6 +97,7 @@ class Task extends Entity {
$this->addType('scheduledAt', 'integer');
$this->addType('startedAt', 'integer');
$this->addType('endedAt', 'integer');
$this->addType('cleanup', 'integer');
}
public function toRow(): array {
@ -122,6 +126,7 @@ class Task extends Entity {
'scheduledAt' => $task->getScheduledAt(),
'startedAt' => $task->getStartedAt(),
'endedAt' => $task->getEndedAt(),
'cleanup' => $task->getCleanup(),
]);
return $taskEntity;
}
@ -144,6 +149,7 @@ class Task extends Entity {
$task->setScheduledAt($this->getScheduledAt());
$task->setStartedAt($this->getStartedAt());
$task->setEndedAt($this->getEndedAt());
$task->setCleanup($this->getCleanup() !== 0);
return $task;
}
}

@ -183,16 +183,38 @@ class TaskMapper extends QBMapper {
/**
* @param int $timeout
* @param bool $force If true, ignore the cleanup flag
* @return int the number of deleted tasks
* @throws Exception
*/
public function deleteOlderThan(int $timeout): int {
public function deleteOlderThan(int $timeout, bool $force = false): int {
	$query = $this->db->getQueryBuilder();
	$query->delete($this->tableName);

	// Rows whose last update is older than this timestamp are removed
	$cutoff = $this->timeFactory->getDateTime()->getTimestamp() - $timeout;
	$query->where(
		$query->expr()->lt('last_updated', $query->createPositionalParameter($cutoff))
	);

	// Unless forced, only touch rows that are flagged for cleanup
	if ($force === false) {
		$flaggedForCleanup = $query->expr()->eq(
			'cleanup',
			$query->createPositionalParameter(1, IQueryBuilder::PARAM_INT)
		);
		$query->andWhere($flaggedForCleanup);
	}

	return $query->executeStatement();
}
/**
 * Yield the tasks whose `last_updated` timestamp is older than the given timeout.
 *
 * These are the same rows deleteOlderThan() removes when called with the same
 * arguments, which lets callers process the tasks before deleting them.
 *
 * @param int $timeout Age in seconds, subtracted from the current timestamp
 * @param bool $force If true, ignore the cleanup flag
 * @return \Generator<Task>
 * @throws Exception
 */
public function getTasksToCleanup(int $timeout, bool $force = false): \Generator {
	$qb = $this->db->getQueryBuilder();
	// Select the entity columns FROM the table: passing the table name to
	// select() would treat it as a column and leave the query without a FROM clause
	$qb->select(Task::$columns)
		->from($this->tableName)
		->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout)));
	if (!$force) {
		$qb->andWhere($qb->expr()->eq('cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT)));
	}
	foreach ($this->yieldEntities($qb) as $entity) {
		yield $entity;
	}
}
public function update(Entity $entity): Entity {
$entity->setLastUpdated($this->timeFactory->now()->getTimestamp());
return parent::update($entity);

@ -1448,6 +1448,45 @@ class Manager implements IManager {
}
}
/**
 * Collect the IDs of every file referenced by a task's input and output.
 *
 * Only slots whose (scalar) shape type is File, Image, Audio or Video are
 * considered. Slots that are declared in the task type but not present in
 * the task data (typically optional ones) are skipped.
 *
 * @param Task $task
 * @return list<int>
 * @throws NotFoundException If the task's type is not among the available task types
 */
public function extractFileIdsFromTask(Task $task): array {
	$taskTypes = $this->getAvailableTaskTypes();
	if (!isset($taskTypes[$task->getTaskTypeId()])) {
		throw new NotFoundException('Could not find task type');
	}
	$taskType = $taskTypes[$task->getTaskTypeId()];
	$fileShapeTypes = [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video];

	// Pairs of [slot descriptors, slot values]; the output is only inspected once it exists
	$slotGroups = [
		[$taskType['inputShape'] + $taskType['optionalInputShape'], $task->getInput()],
	];
	$output = $task->getOutput();
	if ($output !== null) {
		$slotGroups[] = [$taskType['outputShape'] + $taskType['optionalOutputShape'], $output];
	}

	$ids = [];
	foreach ($slotGroups as [$shape, $values]) {
		foreach ($shape as $key => $descriptor) {
			if (!in_array(EShapeType::getScalarType($descriptor->getShapeType()), $fileShapeTypes, true)) {
				continue;
			}
			// A declared slot is not necessarily filled in: reading it blindly would
			// raise an undefined-key warning and add null to the ID list
			if (!isset($values[$key])) {
				continue;
			}
			/** @var int|list<int> $slot */
			$slot = $values[$key];
			if (is_array($slot)) {
				$ids = array_merge($slot, $ids);
			} else {
				$ids[] = $slot;
			}
		}
	}
	return $ids;
}
/**
* Make a request to the task's webhookUri if necessary
*

@ -9,10 +9,15 @@ namespace OC\TaskProcessing;
use OC\TaskProcessing\Db\TaskMapper;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\TimedJob;
use OCP\DB\Exception;
use OCP\Files\AppData\IAppDataFactory;
use OCP\Files\File;
use OCP\Files\InvalidPathException;
use OCP\Files\IRootFolder;
use OCP\Files\NotFoundException;
use OCP\Files\NotPermittedException;
use OCP\Files\SimpleFS\ISimpleFolder;
use OCP\TaskProcessing\IManager;
use Psr\Log\LoggerInterface;
class RemoveOldTasksBackgroundJob extends TimedJob {
@ -22,6 +27,8 @@ class RemoveOldTasksBackgroundJob extends TimedJob {
public function __construct(
ITimeFactory $timeFactory,
private TaskMapper $taskMapper,
private IManager $taskProcessingManager,
private IRootFolder $rootFolder,
private LoggerInterface $logger,
IAppDataFactory $appDataFactory,
) {
@ -37,6 +44,11 @@ class RemoveOldTasksBackgroundJob extends TimedJob {
* @inheritDoc
*/
protected function run($argument): void {
try {
$this->cleanupTaskProcessingTaskFiles();
} catch (\Exception $e) {
$this->logger->warning('Failed to delete stale task processing tasks files', ['exception' => $e]);
}
try {
$this->taskMapper->deleteOlderThan(self::MAX_TASK_AGE_SECONDS);
} catch (\OCP\DB\Exception $e) {
@ -52,11 +64,6 @@ class RemoveOldTasksBackgroundJob extends TimedJob {
} catch (NotFoundException $e) {
// noop
}
try {
$this->clearFilesOlderThan($this->appData->getFolder('TaskProcessing'), self::MAX_TASK_AGE_SECONDS);
} catch (NotFoundException $e) {
// noop
}
}
/**
@ -76,4 +83,29 @@ class RemoveOldTasksBackgroundJob extends TimedJob {
}
}
/**
 * Delete the files of tasks that are due for cleanup.
 *
 * Only files located under the app data `TaskProcessing` folder are removed.
 * Failures are handled per task and per file: they are logged and the cleanup
 * carries on with the next item, so a single broken task cannot prevent the
 * files of all the other tasks from being removed.
 *
 * @return void
 * @throws InvalidPathException
 * @throws NotFoundException
 * @throws \JsonException
 * @throws Exception
 */
private function cleanupTaskProcessingTaskFiles(): void {
	foreach ($this->taskMapper->getTasksToCleanup(self::MAX_TASK_AGE_SECONDS) as $task) {
		try {
			$fileIds = $this->taskProcessingManager->extractFileIdsFromTask($task->toPublicTask());
		} catch (\OCP\TaskProcessing\Exception\NotFoundException $e) {
			// The task type is not available (anymore): skip this task only
			$this->logger->warning('Failed to determine the files of a stale task processing task', [
				'taskId' => $task->getId(),
				'exception' => $e,
			]);
			continue;
		}

		foreach ($fileIds as $fileId) {
			// only look for output files stored in appData/TaskProcessing/
			$file = $this->rootFolder->getFirstNodeByIdInPath($fileId, '/' . $this->rootFolder->getAppDataDirectoryName() . '/TaskProcessing/');
			if (!($file instanceof File)) {
				continue;
			}
			try {
				$file->delete();
			} catch (NotPermittedException|InvalidPathException|NotFoundException $e) {
				$this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
			}
		}
	}
}
}

@ -234,4 +234,14 @@ interface IManager {
* @since 30.0.0
*/
public function setTaskStatus(Task $task, int $status): void;
/**
 * Extract all input and output file IDs from a task
 *
 * The returned list is flat: a slot holding a list of file IDs contributes
 * each of its IDs.
 *
 * @param Task $task The task whose input and output are inspected
 * @return list<int> The IDs of the files referenced by the task
 * @throws NotFoundException If the task's type cannot be found
 * @since 32.0.0
 */
public function extractFileIdsFromTask(Task $task): array;
}

@ -66,6 +66,7 @@ final class Task implements \JsonSerializable {
protected ?int $scheduledAt = null;
protected ?int $startedAt = null;
protected ?int $endedAt = null;
protected bool $cleanup = true;
/**
* @param string $taskTypeId
@ -253,7 +254,23 @@ final class Task implements \JsonSerializable {
}
/**
 * Whether this task is flagged for automatic cleanup.
 *
 * @return bool The cleanup flag; true unless changed with setCleanup()
 * @since 32.0.0
 */
final public function getCleanup(): bool {
return $this->cleanup;
}
/**
 * Set whether this task should be cleaned up automatically.
 *
 * @param bool $cleanup The new value of the cleanup flag
 * @since 32.0.0
 */
final public function setCleanup(bool $cleanup): void {
$this->cleanup = $cleanup;
}
/**
* @psalm-return array{id: int, lastUpdated: int, type: string, status: 'STATUS_CANCELLED'|'STATUS_FAILED'|'STATUS_SUCCESSFUL'|'STATUS_RUNNING'|'STATUS_SCHEDULED'|'STATUS_UNKNOWN', userId: ?string, appId: string, input: array<string, list<numeric|string>|numeric|string>, output: ?array<string, list<numeric|string>|numeric|string>, customId: ?string, completionExpectedAt: ?int, progress: ?float, scheduledAt: ?int, startedAt: ?int, endedAt: ?int, cleanup: bool}
* @since 30.0.0
*/
final public function jsonSerialize(): array {
@ -272,6 +289,7 @@ final class Task implements \JsonSerializable {
'scheduledAt' => $this->getScheduledAt(),
'startedAt' => $this->getStartedAt(),
'endedAt' => $this->getEndedAt(),
'cleanup' => $this->getCleanup(),
];
}

@ -9,7 +9,7 @@
// between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level
// when updating major/minor version number.
$OC_Version = [32, 0, 0, 2];
$OC_Version = [32, 0, 0, 3];
// The human-readable string
$OC_VersionString = '32.0.0 dev';