Elgg  Version 5.1
DatabaseQueue.php
Go to the documentation of this file.
1 <?php
2 
3 namespace Elgg\Queue;
4 
10 
/**
 * FIFO queue that uses the database for persistence.
 *
 * Every item is stored as a serialized row in the table named by
 * self::TABLE_NAME, and all operations are restricted to rows whose
 * 'name' column matches this queue's name.
 */
class DatabaseQueue implements \Elgg\Queue\Queue {

	use TimeUsing;

	/**
	 * @var string name of the database table holding the queued rows
	 */
	const TABLE_NAME = 'queue';

	/**
	 * @var string name of this queue
	 */
	protected $name;

	/**
	 * @var \Elgg\Database database used for every queue operation
	 */
	protected $db;

	/**
	 * @var string identifier written to the 'worker' column when a row is locked
	 */
	protected $workerId;

	/**
	 * Create a queue.
	 *
	 * @param string         $name name of the queue
	 * @param \Elgg\Database $db   database the queue rows live in
	 */
	public function __construct(string $name, \Elgg\Database $db) {
		$this->db = $db;
		$this->name = $name;

		// worker id is derived from the current microtime and the process id
		$this->workerId = md5(microtime() . getmypid());
	}

	/**
	 * Add an item to the queue.
	 *
	 * @param mixed $item item to add to the queue (stored via serialize())
	 *
	 * @return bool false only when the database insert reports false
	 */
	public function enqueue($item) {
		$query = Insert::intoTable(self::TABLE_NAME);

		// parameters are created in the same order as the columns are listed
		$queue_name = $query->param($this->name, ELGG_VALUE_STRING);
		$payload = $query->param(serialize($item), ELGG_VALUE_STRING);
		$created = $query->param($this->getCurrentTime()->getTimestamp(), ELGG_VALUE_TIMESTAMP);

		$query->values([
			'name' => $queue_name,
			'data' => $payload,
			'timestamp' => $created,
		]);

		$result = $this->db->insertData($query);

		return $result !== false;
	}

	/**
	 * Remove an item from the queue.
	 *
	 * Picks the row with the lowest id that has no worker assigned, claims it
	 * by writing this instance's worker id, deletes it and returns its payload.
	 *
	 * @return mixed the unserialized item, or null when no unclaimed row was
	 *               found or the row could not be claimed
	 */
	public function dequeue() {
		// find the oldest row of this queue that no worker has claimed yet
		$candidate = Select::fromTable(self::TABLE_NAME);
		$candidate->select('*');
		$candidate->where($candidate->compare('name', '=', $this->name, ELGG_VALUE_STRING));
		$candidate->andWhere($candidate->expr()->isNull('worker'));
		$candidate->orderBy('id', 'ASC');
		$candidate->setMaxResults(1);

		$row = $this->db->getDataRow($candidate);
		if (empty($row)) {
			return null;
		}

		// claim the row; the "worker IS NULL" condition means the update
		// changes nothing if the row was claimed in the meantime
		$lock = Update::table(self::TABLE_NAME);
		$lock->set('worker', $lock->param($this->workerId, ELGG_VALUE_STRING));
		$lock->where($lock->compare('name', '=', $this->name, ELGG_VALUE_STRING));
		$lock->andWhere($lock->compare('id', '=', $row->id, ELGG_VALUE_ID));
		$lock->andWhere($lock->expr()->isNull('worker'));

		$claimed = $this->db->updateData($lock, true);
		if ($claimed !== 1) {
			return null;
		}

		// the claimed row is ours: drop it from the table before handing it out
		$removal = Delete::fromTable(self::TABLE_NAME);
		$removal->where($removal->compare('id', '=', $row->id, ELGG_VALUE_ID));
		$this->db->deleteData($removal);

		// NOTE(review): unserialize() assumes 'data' was written by enqueue() - confirm nothing else writes to this table
		return unserialize($row->data);
	}

	/**
	 * Clear all items from the queue.
	 *
	 * @return void
	 */
	public function clear() {
		$removal = Delete::fromTable(self::TABLE_NAME);
		$in_this_queue = $removal->compare('name', '=', $this->name, ELGG_VALUE_STRING);
		$removal->where($in_this_queue);

		$this->db->deleteData($removal);
	}

	/**
	 * Get the size of the queue.
	 *
	 * Counts every row stored under this queue's name.
	 *
	 * @return int
	 */
	public function size() {
		$counter = Select::fromTable(self::TABLE_NAME);
		$counter->select('COUNT(*) AS total');
		$counter->where($counter->compare('name', '=', $this->name, ELGG_VALUE_STRING));

		$row = $this->db->getDataRow($counter);

		return (int) $row->total;
	}
}
if(!$items) $item
Definition: delete.php:13
static table($table, $alias=null)
{}
Definition: Update.php:13
The Elgg database.
Definition: Database.php:25
clear()
Clear all items from the queue. Returns: void.
__construct(string $name,\Elgg\Database $db)
Create a queue.
$delete
const ELGG_VALUE_ID
Definition: constants.php:114
trait TimeUsing
Adds methods for setting the current time (for testing)
Definition: TimeUsing.php:10
Queue interface.
Definition: Queue.php:11
dequeue()
Remove an item from the queue. Returns: mixed.
getCurrentTime($modifier= '')
Get the (cloned) time.
Definition: TimeUsing.php:25
FIFO queue that uses the database for persistence.
static intoTable($table)
{}
Definition: Insert.php:13
size()
Get the size of the queue. Returns: int.
const ELGG_VALUE_TIMESTAMP
Definition: constants.php:115
static fromTable($table, $alias=null)
{}
Definition: Select.php:13
enqueue($item)
Add an item to the queue. Parameter $item: item to add to the queue. Returns: bool.
const ELGG_VALUE_STRING
Definition: constants.php:112
$site name
Definition: settings.php:15
static fromTable($table, $alias=null)
{}
Definition: Delete.php:13