Commit 753a367d authored by Simon Gadient's avatar Simon Gadient

[IMP] Cleanup stalled imports

refs KIME-4583
parent c05951cd
...@@ -51,7 +51,7 @@ class SpreadsheetImportCommandController extends CommandController { ...@@ -51,7 +51,7 @@ class SpreadsheetImportCommandController extends CommandController {
public function importCommand() { public function importCommand() {
$currentImportingCount = $this->spreadsheetImportRepository->countByImportingStatus(SpreadsheetImport::IMPORTING_STATUS_IN_PROGRESS); $currentImportingCount = $this->spreadsheetImportRepository->countByImportingStatus(SpreadsheetImport::IMPORTING_STATUS_IN_PROGRESS);
if ($currentImportingCount > 0) { if ($currentImportingCount > 0) {
$this->outputFormatted('Previous spreadsheet import is still in progress.'); $this->outputLine('Previous spreadsheet import is still in progress.');
$this->quit(); $this->quit();
} }
/** @var SpreadsheetImport $spreadsheetImport */ /** @var SpreadsheetImport $spreadsheetImport */
...@@ -67,35 +67,65 @@ class SpreadsheetImportCommandController extends CommandController { ...@@ -67,35 +67,65 @@ class SpreadsheetImportCommandController extends CommandController {
try { try {
$this->spreadsheetImportService->import(); $this->spreadsheetImportService->import();
$spreadsheetImport->setImportingStatus(SpreadsheetImport::IMPORTING_STATUS_COMPLETED); $spreadsheetImport->setImportingStatus(SpreadsheetImport::IMPORTING_STATUS_COMPLETED);
$this->outputFormatted('Spreadsheet has been imported. %d inserted, %d updated, %d deleted, %d skipped', $this->outputLine('Spreadsheet has been imported. %d inserted, %d updated, %d deleted, %d skipped',
array($spreadsheetImport->getTotalInserted(), $spreadsheetImport->getTotalUpdated(), $spreadsheetImport->getTotalDeleted(), $spreadsheetImport->getTotalSkipped())); array($spreadsheetImport->getTotalInserted(), $spreadsheetImport->getTotalUpdated(), $spreadsheetImport->getTotalDeleted(), $spreadsheetImport->getTotalSkipped()));
} catch (Exception $e) { } catch (\Exception $e) {
$spreadsheetImport->setImportingStatus(SpreadsheetImport::IMPORTING_STATUS_FAILED); $spreadsheetImport->setImportingStatus(SpreadsheetImport::IMPORTING_STATUS_FAILED);
$this->outputFormatted('Spreadsheet import failed.'); $this->outputLine('Spreadsheet import failed.');
} }
try {
$this->spreadsheetImportRepository->update($spreadsheetImport); $this->spreadsheetImportRepository->update($spreadsheetImport);
} catch (\Exception $e) {
$this->outputLine('Spreadsheet import status update error. It remains in progress until cleanup.');
}
} else { } else {
$this->outputFormatted('No spreadsheet import in queue.'); $this->outputFormatted('No spreadsheet import in queue.');
} }
} }
/** /**
* Cleanup previous spreadsheet imports. Threashold defined in settings. * Cleanup past and stalled imports.
*
* @param int $keepPastImportsThreasholdDays Overwrites the setting value
* @param int $maxExecutionThreasholdMinutes Overwrites the setting value
*/ */
public function cleanupCommand() { public function cleanupCommand($keepPastImportsThreasholdDays = -1, $maxExecutionThreasholdMinutes = -1) {
$cleanupImportsThreasholdDays = intval($this->settings['cleanupImportsThreasholdDays']); $keepPastImportsThreasholdDays = ($keepPastImportsThreasholdDays >= 0) ? $keepPastImportsThreasholdDays : intval($this->settings['keepPastImportsThreasholdDays']);
$cleanupFromDate = new \DateTime(); $maxExecutionThreasholdMinutes = ($maxExecutionThreasholdMinutes >= 0) ? $maxExecutionThreasholdMinutes : intval($this->settings['maxExecutionThreasholdMinutes']);
$cleanupFromDate->sub(new \DateInterval('P' . $cleanupImportsThreasholdDays . 'D')); $this->cleanupPastImports($keepPastImportsThreasholdDays);
$oldSpreadsheetImports = $this->spreadsheetImportRepository->findPreviousImportsBySpecificDate($cleanupFromDate); $this->cleanupStalledImports($maxExecutionThreasholdMinutes);
if ($oldSpreadsheetImports->count() > 0) {
/** @var SpreadsheetImport $oldSpreadsheetImport */
foreach ($oldSpreadsheetImports as $oldSpreadsheetImport) {
$this->spreadsheetImportRepository->remove($oldSpreadsheetImport);
} }
$this->outputLine('%d spreadsheet imports were removed.', array($oldSpreadsheetImports->count()));
} else { /**
$this->outputLine('There is no spreadsheet import in queue to remove.'); * Delete past imports
*
* @param int $keepPastImportsThreasholdDays
*/
private function cleanupPastImports($keepPastImportsThreasholdDays) {
$cleanupFromDateTime = new \DateTime();
$cleanupFromDateTime->sub(new \DateInterval('P' . $keepPastImportsThreasholdDays . 'D'));
$spreadsheetImports = $this->spreadsheetImportRepository->findBySpecificDateTimeAndImportingStatus($cleanupFromDateTime);
/** @var SpreadsheetImport $spreadsheetImport */
foreach ($spreadsheetImports as $spreadsheetImport) {
$this->spreadsheetImportRepository->remove($spreadsheetImport);
} }
$this->outputLine('%d spreadsheet imports removed.', array($spreadsheetImports->count()));
} }
/**
* Set stalled imports to failed
*
* @param int $maxExecutionThreasholdMinutes
*/
private function cleanupStalledImports($maxExecutionThreasholdMinutes) {
$cleanupFromDateTime = new \DateTime();
$cleanupFromDateTime->sub(new \DateInterval('PT' . $maxExecutionThreasholdMinutes . 'M'));
$spreadsheetImports = $this->spreadsheetImportRepository->findBySpecificDateTimeAndImportingStatus($cleanupFromDateTime, SpreadsheetImport::IMPORTING_STATUS_IN_PROGRESS);
/** @var SpreadsheetImport $spreadsheetImport */
foreach ($spreadsheetImports as $spreadsheetImport) {
$spreadsheetImport->setImportingStatus(SpreadsheetImport::IMPORTING_STATUS_FAILED);
$this->spreadsheetImportRepository->update($spreadsheetImport);
}
$this->outputLine('%d spreadsheet imports set to failed.', array($spreadsheetImports->count()));
}
} }
...@@ -38,10 +38,19 @@ class SpreadsheetImport { ...@@ -38,10 +38,19 @@ class SpreadsheetImport {
protected $file; protected $file;
/** /**
* DateTime when import is scheduled to progress
*
* @var \DateTime * @var \DateTime
*/ */
protected $scheduleDate; protected $scheduleDate;
/**
* Actual DateTime when import is set to in progress
*
* @var \DateTime
*/
protected $progressDate;
/** /**
* @var string * @var string
* @ORM\Column(type="text") * @ORM\Column(type="text")
...@@ -100,6 +109,7 @@ class SpreadsheetImport { ...@@ -100,6 +109,7 @@ class SpreadsheetImport {
*/ */
public function __construct() { public function __construct() {
$this->scheduleDate = new \DateTime(); $this->scheduleDate = new \DateTime();
$this->progressDate = new \DateTime();
} }
/** /**
...@@ -142,6 +152,7 @@ class SpreadsheetImport { ...@@ -142,6 +152,7 @@ class SpreadsheetImport {
*/ */
public function setScheduleDate($scheduleDate) { public function setScheduleDate($scheduleDate) {
$this->scheduleDate = $scheduleDate; $this->scheduleDate = $scheduleDate;
$this->progressDate = $scheduleDate;
} }
/** /**
...@@ -238,6 +249,9 @@ class SpreadsheetImport { ...@@ -238,6 +249,9 @@ class SpreadsheetImport {
*/ */
public function setImportingStatus($importingStatus) { public function setImportingStatus($importingStatus) {
$this->importingStatus = $importingStatus; $this->importingStatus = $importingStatus;
if ($importingStatus === self::IMPORTING_STATUS_IN_PROGRESS) {
$this->progressDate = new \DateTime();
}
} }
/** /**
......
...@@ -42,12 +42,16 @@ class SpreadsheetImportRepository extends Repository { ...@@ -42,12 +42,16 @@ class SpreadsheetImportRepository extends Repository {
/** /**
* @param \DateTime $dateTime * @param \DateTime $dateTime
* @param int $importingStatus
* *
* @return \TYPO3\Flow\Persistence\QueryResultInterface * @return \TYPO3\Flow\Persistence\QueryResultInterface
*/ */
public function findPreviousImportsBySpecificDate(\DateTime $dateTime) { public function findBySpecificDateTimeAndImportingStatus(\DateTime $dateTime, $importingStatus = -1) {
$query = $this->createQuery(); $query = $this->createQuery();
$constraint = $query->lessThanOrEqual('scheduleDate', $dateTime); $constraint = $query->lessThanOrEqual('progressDate', $dateTime);
if ($importingStatus >= 0) {
$constraint = $query->logicalAnd($constraint, $query->equals('importingStatus', $importingStatus));
}
return $query->matching($constraint)->execute(); return $query->matching($constraint)->execute();
} }
......
WE: WE:
SpreadsheetImport: SpreadsheetImport:
cleanupImportsThreasholdDays: 365 keepPastImportsThreasholdDays: 365
maxExecutionThreasholdMinutes: 720
persistRecordsChunkSize: 100 persistRecordsChunkSize: 100
WE: WE:
SpreadsheetImport: SpreadsheetImport:
cleanupImportsThreasholdDays: 365 keepPastImportsThreasholdDays: 365
maxExecutionThreasholdMinutes: 720
persistRecordsChunkSize: 100 persistRecordsChunkSize: 100
default: default:
domain: WE\Sample\Domain\Model\User domain: WE\Sample\Domain\Model\User
......
<?php
namespace TYPO3\Flow\Persistence\Doctrine\Migrations;
use Doctrine\DBAL\Migrations\AbstractMigration;
use Doctrine\DBAL\Schema\Schema;
/**
* Add progressdate property
*/
class Version20161103191621 extends AbstractMigration
{
/**
* @return string
*/
public function getDescription() {
return '';
}
/**
* @param Schema $schema
* @return void
*/
public function up(Schema $schema)
{
$this->abortIf($this->connection->getDatabasePlatform()->getName() != 'mysql', 'Migration can only be executed safely on "mysql".');
$this->addSql('ALTER TABLE we_spreadsheetimport_domain_model_spreadsheetimport ADD progressdate DATETIME NOT NULL');
}
/**
* @param Schema $schema
* @return void
*/
public function down(Schema $schema)
{
$this->abortIf($this->connection->getDatabasePlatform()->getName() != 'mysql', 'Migration can only be executed safely on "mysql".');
$this->addSql('ALTER TABLE we_spreadsheetimport_domain_model_spreadsheetimport DROP progressdate');
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment