summaryrefslogtreecommitdiff
path: root/utils/process_queue/process_queue.pl
blob: 28401eb657051a86be28a68593526c7bbf0aeda2 (plain)
  1. #!/usr/bin/perl
  2. # TODO: Add POD -CT
  3. require "config.pl";
  4. use DBI;
  5. # TODO: Convert config.pl to namespace so we can use strict.
  6. my $cycle_delay;
  7. my $dbh = db_init();
  8. # Basic db connection setup routines
  9. my $sth;
  10. while (1) { # loop infinitely
  11. if ( $dbh->func('pg_notifies') ) {
  12. on_notify();
  13. }
  14. sleep $cycle_delay;
  15. }
  16. sub on_notify {
  17. my $job_id = 1;
  18. while ($job_id){
  19. ($job_id) = $dbh->selectrow_array(
  20. "SELECT min(id) from pending_job
  21. WHERE completed_at IS NULL
  22. FOR UPDATE"
  23. );
  24. if ($job_id){
  25. $job_id = $dbh->quote($job_id);
  26. my ($job_class) = $dbh->selectrow_array(
  27. "select class from batch_class where id =
  28. (select batch_class from pending_job where id = $job_id"
  29. );
  30. # Right now, we assume that every pending job has a batch id.
  31. # Longer-run we may need to use a template handle as well. -CT
  32. $dbh->execute('SELECT ' .
  33. $dbh->quote_identifier("job__process_$job_class") . "($job_id)"
  34. );
  35. my $errstr = $dbh->errstr;
  36. if (!$dbh->commit){ # Note error and clean up
  37. # Note that this does not clean up the queue holding tables.
  38. # This is a feature, not a bug, as it allows someone to review
  39. # the actual data and then delete if required separately -CT
  40. $dbh->do(
  41. "UPDATE pending_job
  42. SET completed_at = now(),
  43. success = false,
  44. error_condition = " . $dbh->quote($errstr) . "
  45. WHERE id = $job_id"
  46. );
  47. $dbh->commit;
  48. }
  49. # The line below is necessary because the job process functions
  50. # use set session authorization so one must reconnect to reset
  51. # administrative permissions. -CT
  52. $dbh = db_init();
  53. }
  54. }
  55. }
  56. sub db_init {
  57. my $dsn = "dbi:Pg:dbname=$database";
  58. my $dbh = DBI->connect(
  59. $dsn, $db_user,
  60. $db_passwd,
  61. {
  62. AutoCommit => 0,
  63. PrintError => 0,
  64. RaiseError => 1,
  65. }
  66. );
  67. $dbh->{pg_enable_utf8} = 1;
  68. ($cycle_delay) = $dbh->selectrow_array(
  69. "SELECT value FROM defaults
  70. WHERE setting_key = 'poll_frequency'"
  71. );
  72. if (!$cycle_delay){
  73. die "No Polling Frequency Set Up!";
  74. }
  75. $dbh->do("LISTEN job_entered");
  76. $dbh->commit;
  77. return $dbh;
  78. }