--- notifications.cron.inc 2009/10/05 17:51:13 1.6.2.6.2.20.2.8 +++ notifications.cron.inc 2010/03/12 19:50:34 1.6.2.6.2.20.2.9 @@ -125,8 +125,7 @@ // Calculate time limit. We get the smaller of all these times in seconds // There's an issue with poormanscron not setting the cron semaphore so it will default to current time $timelimit = array(); - $cronstart = variable_get('cron_semaphore', time()); - + $cronstart = variable_get('cron_semaphore', time()); // Max execution time may be zero meaning no limit, then no limits based on this if ($maxtime = ini_get('max_execution_time')) { $timelimit[] = $cronstart + $maxtime - NOTIFICATIONS_TIME_MARGIN; @@ -143,33 +142,39 @@ $limit['time'] = min($timelimit); } break; - - break; case 'init': $current[$name] = 0; $limit[$name] = $value; break; - case 'count': - $value = $value ? $value : 1; - isset($current[$name]) ? ($current[$name] += $value) : $current[$name] = $value; - break; + case 'option': if (isset($value)) { $options[$name] = $value; } return isset($options[$name]) ? $options[$name] : FALSE; + break; + case 'limit': + // Return limit value for counter + return isset($limit[$name]) ? $limit[$name] : 0; + case 'current': + // Return current value for counter + return isset($current[$name]) ? $current[$name] : 0; + case 'count': + $value = $value ? $value : 1; + isset($current[$name]) ? ($current[$name] += $value) : $current[$name] = $value; + break; + case 'check': + // Check all limits till we find a false one + $current['time'] = time(); + foreach ($limit as $name => $value) { + if ($value && !empty($current[$name]) && $current[$name] >= $value) { + watchdog('notifications', 'Reached processing limit on queue processing: %name = %value', array('%name' => $name, '%value' => $value)); + return FALSE; + } + } + return TRUE; } - - $current['time'] = time(); - - // Check all limits till we find a false one - foreach ($limit as $name => $value) { - if ($value && !empty($current[$name]) && $current[$name] >= $value) { - watchdog('notifications', 'Reached processing limit on queue processing: %name = %value', array('%name' => $name, '%value' => $value)); - return FALSE; - } - } - return TRUE; + } /** @@ -208,6 +213,7 @@ // Group rows by user, send_method, send_interval before composing and sending // This loop has to run a final time after all rows have been fetched while (($queue = db_fetch_object($result)) || $processed) { + notifications_process('count', 'row'); if (!$account || !$queue || ($queue->module != $module) || ($queue->uid != $account->uid) || ($queue->destination != $destination) || $queue->send_method != $send_method || $queue->send_interval != $send_interval) { // New user or sending method or destination, send if not the first row and reset if ($account && $events && $subscriptions) { @@ -290,7 +296,16 @@ $count = 0; // This is the time from which stored rows will be sent $timelimit = time() - $send_interval; - + // Check remaining rows to process to adjust query limits for both users and rows + $step_users = NOTIFICATIONS_STEP_USERS; + $step_rows = NOTIFICATIONS_STEP_ROWS; + if ($row_limit = notifications_process('limit', 'row')) { + $remaining_rows = $row_limit - notifications_process('current', 'row'); + if ($remaining_rows > 0) { + $step_users = min($remaining_rows, $step_users); + $step_rows = min($remaining_rows, $step_rows); + } + } // Get users to process messages for, with this time interval and ordered by squid // Order by last sent for this send interval // Note: If we get the users with more messages pending first this may save some time @@ -300,30 +315,30 @@ $sql .= " AND (su.uid IS NULL OR su.sent < %d) "; // Note: the group by su.sent seems to be needed by pgsql $sql .= " GROUP BY q.uid, q.destination, q.module, q.send_method, su.sent ORDER BY su.sent"; - $result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, NOTIFICATIONS_STEP_USERS); + $result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, $step_users); // We create a bach for each user, destination method and handle it to notifications_process_rows() while (($queue = db_fetch_object($result)) && notifications_process('check')) { notifications_log('Queue processing', array('user' => $queue->uid , 'rows' => $queue->count_rows, 'send method' => $queue->send_method)); $module = $queue->module; $events = $subscriptions = $processed = array(); - // Process all rows for this user. With some hard limit to prevent process lock ups. // In case we have too many rows, we go updating step by step - if ($queue->count_rows > NOTIFICATIONS_STEP_ROWS) { - $limit = NOTIFICATIONS_STEP_ROWS; + if ($queue->count_rows > $step_rows) { + $limit = $step_rows; $update = TRUE; } else { $limit = $queue->count_rows; $update = FALSE; } + $batch = array( 'uid' => $queue->uid, 'destination' => $queue->destination, 'module' => $queue->module, 'send_method' => $queue->send_method, 'send_interval' => $send_interval, 'cron' => 1, 'max_sqid' => $max_sqid, ); - // These rows may be processed by a different module + // These rows may be processed by a different module. Defaults to notifications_process_rows() $processed = notifications_callback($queue->module, 'process_rows', $batch, $limit, $update); $count += $processed;