summaryrefslogtreecommitdiff
path: root/utils/process_queue/process_queue.pl
blob: 5bcc24cadd7e7b4c1a4e26bd9e681622a61b603f (plain)
  1. #!/usr/bin/perl
  2. # TODO: Add POD -CT
  3. require "config.pl";
  4. use DBI;
  5. # TODO: Convert config.pl to namespace so we can use strict.
  6. my $cycle_delay;
  7. my $dbh = db_init();
  8. # Basic db connection setup routines
  9. my $sth;
  10. while (1) { # loop infinitely
  11. if ( $dbh->func('pg_notifies') ) {
  12. on_notify();
  13. }
  14. sleep $cycle_delay;
  15. }
  16. sub on_notify {
  17. my $job_id = 1;
  18. while ($job_id){
  19. ($job_id) = $dbh->selectrow_array(
  20. "SELECT id from pending_job
  21. WHERE completed_at IS NULL
  22. ORDER BY id LIMIT 1
  23. FOR UPDATE"
  24. );
  25. if ($job_id){
  26. $job_id = $dbh->quote($job_id);
  27. my ($job_class) = $dbh->selectrow_array(
  28. "select class from batch_class where id =
  29. (select batch_class from pending_job where id = $job_id)"
  30. );
  31. # Right now, we assume that every pending job has a batch id.
  32. # Longer-run we may need to use a template handle as well. -CT
  33. $dbh->do('SELECT ' .
  34. $dbh->quote_identifier("job__process_$job_class") . "($job_id)"
  35. );
  36. my $errstr = $dbh->errstr;
  37. if (!$dbh->commit){ # Note error and clean up
  38. # Note that this does not clean up the queue holding tables.
  39. # This is a feature, not a bug, as it allows someone to review
  40. # the actual data and then delete if required separately -CT
  41. $dbh->do(
  42. "UPDATE pending_job
  43. SET completed_at = now(),
  44. success = false,
  45. error_condition = " . $dbh->quote($errstr) . "
  46. WHERE id = $job_id"
  47. );
  48. $dbh->commit;
  49. }
  50. # The line below is necessary because the job process functions
  51. # use set session authorization so one must reconnect to reset
  52. # administrative permissions. -CT
  53. $dbh->disconnect;
  54. $dbh = db_init();
  55. }
  56. }
  57. }
  58. sub db_init {
  59. my $dsn = "dbi:Pg:dbname=$database";
  60. my $dbh = DBI->connect(
  61. $dsn, $db_user,
  62. $db_passwd,
  63. {
  64. AutoCommit => 0,
  65. PrintError => 0,
  66. RaiseError => 1,
  67. }
  68. );
  69. $dbh->{pg_enable_utf8} = 1;
  70. ($cycle_delay) = $dbh->selectrow_array(
  71. "SELECT value FROM defaults
  72. WHERE setting_key = 'poll_frequency'"
  73. );
  74. if (!$cycle_delay){
  75. die "No Polling Frequency Set Up!";
  76. }
  77. $dbh->do("LISTEN job_entered");
  78. $dbh->commit;
  79. return $dbh;
  80. }