Elgg  Version 6.1
DatabaseQueue.php
Go to the documentation of this file.
1 <?php
2 
3 namespace Elgg\Queue;
4 
10 
/**
 * FIFO queue that uses the database for persistence.
 *
 * Items are stored serialized in the table named by TABLE_NAME, scoped by the
 * queue name, and handed out in ascending 'id' order.
 */
class DatabaseQueue implements \Elgg\Queue\Queue {

	use TimeUsing;

	/**
	 * @var string Name of the database table that stores the queued items
	 */
	public const TABLE_NAME = 'queue';

	/**
	 * @var string Identifier written to the 'worker' column when this instance claims a row
	 */
	protected $workerId;

	/**
	 * Create a queue
	 *
	 * @param string         $name Name of the queue; every query of this instance is restricted to rows with this name
	 * @param \Elgg\Database $db   Database used to run the insert/select/update/delete queries
	 */
	public function __construct(protected string $name, protected \Elgg\Database $db) {
		// derive the worker identifier from the current microtime and the process id
		$seed = microtime() . getmypid();

		$this->workerId = md5($seed);
	}

	/**
	 * Add an item to the queue
	 *
	 * The item is stored in its serialize() form together with the queue name
	 * and the current timestamp.
	 *
	 * @param mixed $item Item to add to the queue
	 *
	 * @return bool false only when the database insert reports false
	 */
	public function enqueue($item) {
		$query = Insert::intoTable(self::TABLE_NAME);

		$name_param = $query->param($this->name, ELGG_VALUE_STRING);
		$data_param = $query->param(serialize($item), ELGG_VALUE_STRING);
		$time_param = $query->param($this->getCurrentTime()->getTimestamp(), ELGG_VALUE_TIMESTAMP);

		$query->values([
			'name' => $name_param,
			'data' => $data_param,
			'timestamp' => $time_param,
		]);

		$result = $this->db->insertData($query);

		return $result !== false;
	}

	/**
	 * Remove an item from the queue
	 *
	 * Works in three steps: find the lowest-id row of this queue that has no
	 * worker, claim it by writing this instance's worker id, then delete it.
	 *
	 * @return mixed the unserialized item, or null when no unclaimed row was found
	 *               or the row could not be claimed
	 */
	public function dequeue() {
		// step 1: look up the oldest (lowest id) row that no worker has claimed yet
		$candidate_query = Select::fromTable(self::TABLE_NAME);
		$candidate_query->select('*');
		$candidate_query->where($candidate_query->compare('name', '=', $this->name, ELGG_VALUE_STRING));
		$candidate_query->andWhere($candidate_query->expr()->isNull('worker'));
		$candidate_query->orderBy('id', 'ASC');
		$candidate_query->setMaxResults(1);

		$candidate = $this->db->getDataRow($candidate_query);
		if (empty($candidate)) {
			return null;
		}

		// step 2: claim the row; the 'worker IS NULL' condition makes the update
		// match nothing if the row was claimed after the select above
		$claim_query = Update::table(self::TABLE_NAME);
		$claim_query->set('worker', $claim_query->param($this->workerId, ELGG_VALUE_STRING));
		$claim_query->where($claim_query->compare('name', '=', $this->name, ELGG_VALUE_STRING));
		$claim_query->andWhere($claim_query->compare('id', '=', $candidate->id, ELGG_VALUE_ID));
		$claim_query->andWhere($claim_query->expr()->isNull('worker'));

		// NOTE(review): the second argument presumably requests the number of
		// affected rows, since the result is compared against 1 - confirm
		$claimed = $this->db->updateData($claim_query, true);
		if ($claimed !== 1) {
			return null;
		}

		// step 3: the row is ours, remove it from the table
		$remove_query = Delete::fromTable(self::TABLE_NAME);
		$remove_query->where($remove_query->compare('id', '=', $candidate->id, ELGG_VALUE_ID));

		$this->db->deleteData($remove_query);

		return unserialize($candidate->data);
	}

	/**
	 * Clear all items from the queue
	 *
	 * Deletes every row that carries this queue's name, whether or not a
	 * worker is set on it.
	 *
	 * @return void
	 */
	public function clear() {
		$query = Delete::fromTable(self::TABLE_NAME);

		$name_matches = $query->compare('name', '=', $this->name, ELGG_VALUE_STRING);
		$query->where($name_matches);

		$this->db->deleteData($query);
	}

	/**
	 * Get the size of the queue
	 *
	 * @return int number of rows stored under this queue's name
	 */
	public function size() {
		$query = Select::fromTable(self::TABLE_NAME);
		$query->select('COUNT(*) AS total');
		$query->where($query->compare('name', '=', $this->name, ELGG_VALUE_STRING));

		$row = $this->db->getDataRow($query);

		return (int) $row->total;
	}
}
if(!$items) $item
Definition: delete.php:13
static table(string $table)
Returns a QueryBuilder for updating data in a given table.
Definition: Update.php:17
if(!$user||!$user->canDelete()) $name
Definition: delete.php:22
The Elgg database.
Definition: Database.php:26
clear()
Clear all items from the queue. Returns: void.
$delete
const ELGG_VALUE_ID
Definition: constants.php:114
trait TimeUsing
Adds methods for setting the current time (for testing)
Definition: TimeUsing.php:10
Queue interface.
Definition: Queue.php:11
static intoTable(string $table)
Returns a QueryBuilder for inserting data in a given table.
Definition: Insert.php:17
dequeue()
Remove an item from the queue. Returns: mixed.
getCurrentTime($modifier= '')
Get the (cloned) time.
Definition: TimeUsing.php:25
FIFO queue that uses the database for persistence.
__construct(protected string $name, protected\Elgg\Database $db)
Create a queue.
foreach($recommendedExtensions as $extension) if(empty(ini_get('session.gc_probability'))||empty(ini_get('session.gc_divisor'))) $db
static fromTable(string $table)
Returns a QueryBuilder for deleting data from a given table.
Definition: Delete.php:17
size()
Get the size of the queue. Returns: int.
const ELGG_VALUE_TIMESTAMP
Definition: constants.php:115
enqueue($item)
Add an item to the queue. Parameter $item: item to add to queue. Returns: bool.
const ELGG_VALUE_STRING
Definition: constants.php:112
static fromTable(string $table, string $alias=null)
Returns a QueryBuilder for selecting data from a given table.
Definition: Select.php:18