"Request a database snapshot",
'Download' => "Download snapshot file",
'ProcessFragment' => "Process fragments",
'CategoryDepth' => 'Updating category depth',
'Finish' => 'Finalize the import',
];
public $current_task;
public $current_task_class;
public $current_task_num;
public $current_task_desc = '';
public $current_task_retry = 0;
public $fragment_size;
public $import_id;
public $nonce;
public $is_finished = false;
public function __construct($environment, $dbl, $message_handler, $api_client, $scheduler, $importing_parts) {
$this->environment = $environment;
$this->dbl = $dbl;
$this->message_handler = $message_handler;
$this->api_client = $api_client;
$this->scheduler = $scheduler;
$this->importing_parts = $importing_parts;
}
public function generate_import_id() {
return \Arlo\Utilities::get_random_int();
}
public function set_import_id($import_id) {
$this->import_id = $import_id;
}
public function set_current_import_id($import_id) {
update_option('arlo_import_id', $import_id);
$this->current_import_id = $import_id;
}
public function get_current_import_id() {
//need to access the db directly, get_option('arlo_import_id'); can return a cached (old) value
$table_name = $this->dbl->prefix . "options";
$sql = "SELECT option_value
FROM $table_name
WHERE option_name = 'arlo_import_id'";
$this->current_import_id = $this->dbl->get_var($sql);
return $this->current_import_id;
}
public function set_last_import_date() {
$now = \Arlo\Utilities::get_now_utc();
$timestamp = $now->format("Y-m-d H:i:s");
update_option('arlo_last_import', $timestamp);
$this->last_import_date = $timestamp;
}
public function set_tax_exempt_events($import_id) {
$settings = get_option('arlo_settings');
if (!empty($settings['taxexempt_tag'])) {
$sql = $this->dbl->prepare("
UPDATE
{$this->dbl->prefix}arlo_events AS e,
{$this->dbl->prefix}arlo_events_tags AS et,
{$this->dbl->prefix}arlo_tags AS t
SET
e_is_taxexempt = 1
WHERE
t.tag = %s
AND
t.id = et.tag_id
AND
et.e_id = e.e_id
AND
et.import_id = %d
AND
t.import_id = %d
AND
e.import_id = %d
", [trim($settings['taxexempt_tag']), $import_id, $import_id, $import_id]);
} else {
$sql = $this->dbl->prepare("
UPDATE
{$this->dbl->prefix}arlo_events AS e
SET
e_is_taxexempt = 0
WHERE
e.import_id = %d
", [$import_id]);
}
$query = $this->dbl->query($sql);
if ($query === false) {
throw new \Exception('SQL error at set_tax_exempt_events: ' . $this->dbl->last_error);
}
}
public function get_last_import_date() {
if(!is_null($this->last_import_date)) {
return $this->last_import_date;
}
$this->last_import_date = get_option('arlo_last_import');
return $this->last_import_date;
}
public function set_state($state) {
$this->state = $state;
$task_keys = array_keys($this->import_tasks);
if (!empty($state)) {
if (!empty($state->current_subtask)) {
$this->set_current_task($state->current_subtask);
$this->current_task_iteration = (isset($state->iteration) && is_numeric($state->iteration) ? ((isset($state->subtask_state) && $state->subtask_state->iteration_finished == 1) || !isset($state->subtask_state) ? $state->iteration + 1 : $state->iteration ) : 0);
} else if (!empty($state->finished_subtask)) {
//figure out the next task;
$k = array_search($state->finished_subtask, $task_keys);
if ($k !== false && isset($task_keys[++$k])) {
$this->set_current_task($task_keys[$k]);
$state->current_subtask_retry = 0;
} else {
$this->is_finished = true;
}
} else {
$this->set_current_task($task_keys[0]);
}
} else {
$this->set_current_task($task_keys[0]);
}
$this->current_task_retry = (isset($state->current_subtask_retry) ? $state->current_subtask_retry + 1 : $this->current_task_retry + 1);
$this->scheduler->update_task_data($this->task_id, $this->get_state());
}
public function get_state() {
$state = [
'finished_subtask' => null,
'current_subtask' => null,
'current_subtask_retry' => $this->current_task_retry,
'iteration' => null,
'subtask_state' => null,
];
if (!is_null($this->current_task_class)) {
$state['subtask_state'] = $this->current_task_class->get_state();
if ($this->current_task_class->is_finished) {
$state['finished_subtask'] = $this->current_task;
} else {
$state['current_subtask'] = $this->current_task;
$state['iteration'] = $this->current_task_class->iteration;
}
} else {
$state['current_subtask'] = $this->current_task;
}
return $state;
}
public function should_importer_run($force = false) {
if(!$force) {
Logger::log('Synchronization Started', $this->import_id);
Logger::log('Synchronization identified as automatic synchronization.', $this->import_id);
if(!empty($last)) {
Logger::log('Previous successful synchronization found.', $this->import_id);
if(strtotime('-1 hour') > strtotime($this->get_last_import_date())) {
Logger::log('Synchronization more than an hour old. Synchronization required.', $this->import_id);
}
else {
Logger::log('Synchronization less than an hour old (' . date("Y-m-d H:i:s", strtotime($this->get_last_import_date())) . '). Synchronization stopped.', $this->import_id);
return false;
}
}
}
return true;
}
public function check_viable_execution_environment() {
return $this->environment->check_viable_execution_environment();
}
public function set_current_task($task_step) {
$this->current_task = $task_step;
$this->current_task_num = array_search($this->current_task, array_keys($this->import_tasks));
$this->current_task_desc = $this->import_tasks[$this->current_task];
}
private function get_data_json() {
$item = $this->importing_parts->get_import_part("image", null, $this->import_id);
if (empty($item)) {
throw new \Exception("Import Error: the import \"image\" part cannot be found");
}
if (empty($item->import_text)) {
throw new \Exception("Import Error: the content of the import part is empty");
}
$this->data_json = json_decode($item->import_text);
if (is_null($this->data_json)) {
throw new \Exception("JSON Error: " . json_last_error_msg());
}
}
public function run($force = false, $task_id = 0) {
$this->task_id = intval($task_id);
$retval = true;
$this->set_import_id(\Arlo\Utilities::get_random_int());
if ($this->task_id > 0) {
$task = $this->scheduler->get_task_data($this->task_id);
if (count($task)) {
$task = $task[0];
};
if (empty($task->task_data_text) && $this->should_importer_run($force)) {
Logger::log('Synchronization Started', $this->import_id);
$this->scheduler->update_task_data($this->task_id, ['import_id' => $this->import_id]);
} else {
$task->task_data_text = json_decode($task->task_data_text);
if (empty($task->task_data_text->import_id)) {
return false;
} else {
$this->set_import_id($task->task_data_text->import_id);
}
}
}
//if an import is already running, exit
if ($this->acquire_import_lock()) {
set_error_handler ( function($num, $str, $file, $line, $context = null) {
error_log($str . ' in ' . $file . ' on line ' . $line);
//pretty nasty, but need to know if our plugin throws the error or something else (like a cache plugin)
//arlo- is in case $file would not include the path; just 'arlo' would catch all errors for hosted servers like vanguard.wpdemo.arlo.co where domain is used in the plugin file path
if (strpos($file, 'arlo-') !== false || strpos($file, 'arlowp')) {
// specific error for file permission
if (strpos($str, 'fopen(') === 0) {
if (strpos($str, 'ermission denied') > 0) {
Logger::log("Missing write permission" . (strpos($str, "/import/") > 0 ? " on 'import' directory" : ""), $this->import_id);
}
}
throw new \Exception($str);
}
}, E_ALL & ~E_USER_NOTICE & ~E_NOTICE & ~E_DEPRECATED);
try {
$this->set_state($task->task_data_text);
if (!$this->is_finished) {
if (!$this->is_finished && isset($this->import_tasks[$this->current_task])) {
$this->run_import_task($this->current_task);
}
//means that wasn't any error/warning during the task
$this->current_task_retry--;
$this->scheduler->update_task_data($this->task_id, $this->get_state());
$this->scheduler->update_task($this->task_id, 1);
}
if ($this->is_finished) {
//finish task
$this->scheduler->update_task($this->task_id, 4, "Import finished");
$this->scheduler->clear_cron();
$this->importing_parts->delete_all_import_parts();
} else if ($this->current_task_num > 0) {
$this->kick_off_scheduler();
}
} catch(\Exception $e) {
if ($this->should_retry($this->get_state()) && !($e instanceof \Arlo\SchedulerException)) {
//pause the task
$this->scheduler->update_task($this->task_id, 1);
$this->kick_off_scheduler();
} else {
Logger::log($e->getMessage(), $this->import_id);
Logger::log('Synchronization failed, please check the Log ', $this->import_id);
//cancel the task
$this->scheduler->update_task($this->task_id, 3);
$retval = false;
}
}
restore_error_handler();
} else {
Logger::log('Synchronization LOCK found, please wait 5 minutes and try again', $this->import_id);
$retval = false;
}
$this->clear_import_lock();
$this->scheduler->unlock_process('import');
return $retval;
}
private function update_task_data_for_retry($state) {
$data = ['current_subtask_retry' => $state['current_subtask_retry']];
if (!is_null($state['subtask_state'])) {
$data['subtask_state']['current_subtask_retry'] = $state['subtask_state']['current_subtask_retry'];
}
$this->scheduler->update_task_data($this->task_id, $data);
}
private function should_retry($state) {
if ((is_null($state['subtask_state']) && $state['current_subtask_retry'] >= self::MAX_RETRY_ATTEMPT) || (!is_null($state['subtask_state']) && $state['subtask_state']['current_subtask_retry'] >= self::MAX_RETRY_ATTEMPT)) {
$subtask_desc = $this->get_subtask_state_desc();
Logger::log("Maximum retry attempt reached for '" . $this->current_task_desc . $subtask_desc . "'", $this->import_id);
return false;
}
return true;
}
public function clear_import_lock() {
$table_name = $this->dbl->prefix . "arlo_import_lock";
$query = $this->dbl->query('DELETE FROM ' . $table_name);
}
public function get_import_lock_entries_number() {
$table_name = $this->dbl->prefix ."arlo_import_lock";
$sql = '
SELECT
lock_acquired
FROM
' . $table_name . '
WHERE
lock_expired > NOW()
';
$this->dbl->get_results($sql);
return $this->dbl->num_rows;
}
private function cleanup_import_lock() {
$table_name = $this->dbl->prefix ."arlo_import_lock";
$this->dbl->query(
'DELETE FROM
' . $table_name . '
WHERE
lock_expired < NOW()
'
);
}
private function add_import_lock() {
$table_lock = $this->dbl->prefix . "arlo_import_lock";
$table_log = $this->dbl->prefix . "arlo_log";
$query = $this->dbl->query(
'INSERT INTO ' . $table_lock . ' (import_id, lock_acquired, lock_expired)
SELECT ' . $this->import_id . ', NOW(), ADDTIME(NOW(), "00:05:00.00") FROM ' . $table_log . ' WHERE (SELECT count(1) FROM ' . $table_lock . ') = 0 LIMIT 1');
return $query !== false && $query == 1;
}
public function acquire_import_lock() {
$lock_entries_num = $this->get_import_lock_entries_number();
if ($lock_entries_num == 0) {
$this->cleanup_import_lock();
if ($this->add_import_lock($this->import_id)) {
return true;
}
} else if ($lock_entries_num == 1) {
return $this->check_import_lock($this->import_id);
}
return false;
}
public function check_import_lock() {
$table_name = "{$this->dbl->prefix}arlo_import_lock";
$sql = '
SELECT
lock_acquired
FROM
' . $table_name . '
WHERE
import_id = ' . $this->import_id . '
AND
lock_expired > NOW()';
$this->dbl->get_results($sql);
if ($this->dbl->num_rows == 1) {
return true;
}
return false;
}
private function run_import_task($import_task) {
$this->data_json = null;
if ($this->current_task_num == 2) {
$this->get_data_json();
}
$this->environment->start_time = time(); // Set start time of current process.
$class_name = "Arlo\Importer\\" . $import_task;
$this->current_task_class = new $class_name($this, $this->dbl, $this->message_handler, (!empty($this->data_json->$import_task) ? $this->data_json->$import_task : null), $this->current_task_iteration, $this->api_client, $this->scheduler, $this->importing_parts);
$this->current_task_class->task_id = $this->task_id;
//we need to do some special setup for different tasks
switch($this->current_task) {
case 'ImportRequest':
$this->current_task_class->fragment_size = $this->fragment_size;
break;
case 'Download':
$import = $this->get_import_entry($this->import_id, null, 1);
if (!is_null($import)) {
if (!empty($import->callback_json)) {
$callback_json = json_decode($import->callback_json);
if (json_last_error() != JSON_ERROR_NONE) {
error_log("JSON Decode error: " . json_last_error_msg());
Logger::log_error("JSON Decode error: " . json_last_error_msg(), $this->import_id);
}
if (!empty($callback_json->SnapshotUri)) {
$this->current_task_class->uri = $callback_json->SnapshotUri;
$this->current_task_class->import_part = "image";
$this->current_task_class->import_iteration = null;
$this->current_task_class->response_json = json_decode($import->response_json);
} elseif (!empty($callback_json->Error)) {
Logger::log_error($callback_json->Error->Code . ': ' . $callback_json->Error->Message, $this->import_id);
}
} else {
Logger::log_error('The import callback did not happen', $this->import_id);
}
} else {
Logger::log_error('Couldn\'t retrive the import from database', $this->import_id);
}
break;
case 'ProcessFragment':
$this->current_task_class->set_state($this->state->subtask_state);
if (!empty($this->data_json->FullImageFragments->Elements[$this->current_task_iteration])) {
$this->current_task_desc .= ' ' . ($this->current_task_iteration+1) . '/' . count($this->data_json->FullImageFragments->Elements);
$this->current_task_class->uri = $this->data_json->FullImageFragments->Elements[$this->current_task_iteration]->Uri;
} else {
$this->current_task_class->is_finished = true;
}
break;
}
if (!$this->current_task_class->is_finished && !$this->current_task_class->iteration_finished) {
$subtask_desc = $this->get_subtask_state_desc();
$this->scheduler->update_task($this->task_id, 2, "Import is running: task " . ($this->current_task_num + 1) . "/" . count($this->import_tasks) . ": " . $this->current_task_desc . $subtask_desc);
Logger::log('Import subtask started: ' . ($this->current_task_num + 1) . "/" . count($this->import_tasks) . ": " . $this->current_task_desc . $subtask_desc, $this->import_id);
$this->current_task_class->run();
Logger::log('Import subtask ended: ' . ($this->current_task_num + 1) . "/" . count($this->import_tasks) . ": " . $this->current_task_desc . $subtask_desc, $this->import_id);
}
}
private function get_subtask_state_desc() {
$subtask_desc = '';
if (!is_null($this->current_task_class)) {
$subtask_state = $this->current_task_class->get_state();
if (!is_null($subtask_state)) {
$subtask_desc = ': ' . ($subtask_state['current_subtask_num'] + 1) . '/' . count($this->current_task_class->import_tasks) . ' ' . $subtask_state['current_subtask_desc'];
}
}
return $subtask_desc;
}
private function validate_import_entry($nonce = null, $request_id = null) {
$import = $this->get_import_entry(null, $request_id, 1);
if (!is_null($import)) {
if ($nonce == $import->nonce) {
return $import;
}
}
return false;
}
private function kick_off_scheduler() {
$this->scheduler->unlock_process('import');
$this->scheduler->kick_off_scheduler();
}
public function callback() {
try {
$callback_json = json_decode(utf8_encode(file_get_contents("php://input")));
if (!is_null($callback_json) && !empty($callback_json->Nonce) &&
($import = $this->validate_import_entry($callback_json->Nonce, $callback_json->RequestID)) !== false && !empty($callback_json->__jwe__)) {
$this->set_import_id($import->import_id);
$this->update_import_entry(['callback_json' => json_encode($callback_json) ]);
$response_json = json_decode($import->response_json);
//JWE decode
$decoded = preg_replace('/[\x00-\x1F\x7F]/', '', utf8_decode(Crypto::jwe_decrypt($callback_json->__jwe__, $response_json->Callback->EncryptedResponse->key->k)));
$decoded_json = json_decode($decoded);
if (!empty($decoded_json->SnapshotUri)) {
$this->update_import_entry(['callback_json' => $decoded]);
$this->kick_off_scheduler();
} else {
if (!empty($decoded_json->Error)) {
throw new \Exception($decoded_json->Error->Code . ': ' . $decoded_json->Error->Message);
} else {
throw new \Exception('Error in the response for the snapshot request');
}
}
} else {
throw new \Exception('no Nonce or the requested import is not valid');
}
} catch(\Exception $e) {
Logger::log($e->getMessagE(), (!empty($import->import_id)) ? $import->import_id : null);
Logger::log('Synchronization failed, please check the Log ', $this->import_id);
if (!empty($import->import_id)) {
$task = $this->scheduler->get_tasks([1,2], null, null, 1, $import->import_id);
if (!empty($task[0]->task_id)) {
$this->scheduler->update_task($task[0]->task_id, 3);
}
}
}
}
public function get_import_entry($import_id = null, $request_id = null, $limit = null) {
$utc_date = gmdate("Y-m-d H:i:s");
$import_id = (!empty($import_id) && is_numeric($import_id) ? $import_id : null);
$limit = (!empty($limit) && is_numeric($limit) ? $limit : null);
$request_id = (!empty($request_id) ? $request_id : null);
if (is_null($request_id) && is_null($import_id))
return null;
$table_name = $this->dbl->prefix . "arlo_import";
$sql = '
SELECT
import_id,
request_id,
nonce,
callback_json,
response_json,
created,
modified,
expired
FROM
' . $table_name . '
WHERE
1
' . (!is_null($import_id) ? ' AND import_id = ' . $import_id : '' ) . '
' . (!is_null($request_id) ? ' AND request_id = "' . esc_sql($request_id) . '"' : '' ) . '
AND
expired >= "' . $utc_date . '"
' . (!is_null($limit) ? ' LIMIT ' . $limit : '' ) . '
';
if (is_null($import = $this->dbl->get_results($sql))) {
Logger::log_error('Couldn\'t find valid import');
} else if ($limit == 1) {
$import = $import[0];
}
return $import;
}
public function update_import_entry($data = array()) {
$utc_date = gmdate("Y-m-d H:i:s");
$table_name = $this->dbl->prefix . "arlo_import";
$available_fields_for_update = [
'request_id',
'callback_json',
'response_json'
];
$update_fields = [];
foreach ($available_fields_for_update as $field) {
if (!empty($data[$field])) {
$update_fields[] = $field . '="' . $this->dbl->_real_escape($data[$field]) . '"';
}
}
$sql = '
UPDATE
' . $table_name . '
SET
' . (count($update_fields) ? implode(', ', $update_fields) . ', ' : '' ) . '
modified = "' . $utc_date . '"
WHERE
import_id = ' . $this->import_id . '
';
if ($this->dbl->query($sql) === false) {
throw new \Exception('SQL error: ' . $this->dbl->last_error . ' ' .$this->dbl->last_query);
}
}
public function set_import_entry($nonce = null) {
$utc_date = gmdate("Y-m-d H:i:s");
$utc_plusonehour = gmdate("Y-m-d H:i:s", time() + (60 * 60));
$table_name = $this->dbl->prefix . "arlo_import";
$sql = '
INSERT INTO
' . $table_name . '
(import_id, nonce, created, expired)
VALUES
(%s, %s, %s, %s)
';
$query = $this->dbl->query($this->dbl->prepare($sql, $this->import_id, $nonce, $utc_date, $utc_plusonehour));
if ($query) {
return $this->dbl->insert_id;
} else {
return false;
}
}
}