From 8f62103e122793ebe3db93819417e688ce5b1e18 Mon Sep 17 00:00:00 2001 From: einhverfr Date: Wed, 12 Dec 2007 22:27:12 +0000 Subject: Basic outline of Job Queue System git-svn-id: https://ledger-smb.svn.sourceforge.net/svnroot/ledger-smb/trunk@1975 4979c152-3d1c-0410-bac9-87ea11338e46 --- LedgerSMB/DBObject/Payment.pm | 21 ++++- sql/Pg-database.sql | 70 +++++++++++++++-- sql/modules/Payment.sql | 144 ++++++++++++++++++++++++++++++----- sql/modules/Roles.sql | 4 +- sql/modules/Settings.sql | 2 +- utils/process_queue/config.pl | 18 +++++ utils/process_queue/process_queue.pl | 89 ++++++++++++++++++++++ 7 files changed, 319 insertions(+), 29 deletions(-) create mode 100644 utils/process_queue/config.pl create mode 100644 utils/process_queue/process_queue.pl diff --git a/LedgerSMB/DBObject/Payment.pm b/LedgerSMB/DBObject/Payment.pm index 417843df..abd45de3 100644 --- a/LedgerSMB/DBObject/Payment.pm +++ b/LedgerSMB/DBObject/Payment.pm @@ -385,6 +385,11 @@ sub get_payment_detail_data { sub post_bulk { my ($self) = @_; my $total_count = 0; + my ($ref) = $self->callproc( + procname => 'setting_get', + args => ['queue_payments'], + ); + my $queue_payments = $ref->{setting_get}; $self->{payment_date} = $self->{datepaid}; for my $contact_row (1 .. $self->{contact_count}){ my $contact_id = $self->{"contact_$contact_row"}; @@ -408,10 +413,20 @@ sub post_bulk { } $self->{transactions} = $invoice_array; $self->{source} = $self->{"source_$contact_id"}; - $self->exec_method(funcname => 'payment_bulk_post'); - + if ($queue_payments){ + my ($job_ref) = $self->exec_method( + funcname => 'job__create' + ) + $self->{job_id} = $job_ref->{job__create}; + $self->exec_method( + funcname => 'payment_bulk_queue_entry' + ); + } else { + $self->exec_method(funcname => 'payment_bulk_post'); + } } - $self->{dbh}->commit; + $self->{queue_payments} = $queue_payments; + return $self->{dbh}->commit; } 1; diff --git a/sql/Pg-database.sql b/sql/Pg-database.sql index 6e159ac3..4c620226 100644 --- a/sql/Pg-database.sql +++ b/sql/Pg-database.sql @@ -332,13 +332,7 @@ CREATE TABLE defaults ( setting_key text primary key, value text ); -/* - inventory_accno_id int, - income_accno_id int, - expense_accno_id int, - fxgain_accno_id int, - fxloss_accno_id int, -*/ + \COPY defaults FROM stdin WITH DELIMITER | sinumber|1 sonumber|1 @@ -358,7 +352,14 @@ customernumber|1 vendornumber|1 glnumber|1 projectnumber|1 +queue_payments|0 +poll_frequency|1 \. + +COMMENT ON TABLE defaults IS $$ +Note that poll_frequency is in seconds. poll_frequency and queue_payments +are not exposed via the admin interface as they are advanced features best +handled via DBAs. $$; -- */ CREATE TABLE acc_trans ( trans_id int NOT NULL REFERENCES transactions(id), @@ -2695,4 +2696,59 @@ CREATE INDEX location_address_three_gist__idx ON location USING gist(line_three CREATE INDEX location_city_prov_gist_idx ON location USING gist(city gist_trgm_ops); CREATE INDEX entity_name_gist_idx ON entity USING gist(name gist_trgm_ops); +CREATE TABLE pending_job ( + id serial not null unique, + batch_class int references batch_class(id), + entered_by text REFERENCES users(username) + not null default SESSION_USER, + entered_at timestamp default now(), + batch_id int references batch(id), + completed_at timestamp, + success bool, + error_condition text, + CHECK (completed_at IS NULL OR success IS NOT NULL), + CHECK (success IS NOT FALSE OR error_condition IS NOT NULL) +); +COMMENT ON table pending_job IS +$$ Purpose: This table stores pending/queued jobs to be processed async. +Additionally, this functions as a log of all such processing for purposes of +internal audits, performance tuning, and the like. $$; + +CREATE INDEX pending_job_batch_id_pending ON pending_job(batch_id) where success IS NULL; + +CREATE INDEX pending_job_entered_by ON pending_job(entered_by); + +CREATE OR REPLACE FUNCTION trigger_pending_job() RETURNS TRIGGER +AS +$$ +BEGIN + IF NEW.success IS NULL THEN + NOTIFY job_entered; + END IF; + RETURN NEW; +END; +$$ LANGUAGE PLPGSQL; + +CREATE TRIGGER notify_pending_jobs BEFORE INSERT OR UPDATE ON pending_job +FOR EACH ROW EXECUTE PROCEDURE trigger_pending_job(); + +CREATE TABLE payments_queue ( + transactions numeric[], + batch_id int, + source text, + total numeric, + ar_ap_accno text, + cash_accno text, + payment_date date, + account_class int, + job_id int references pending_job(id) + DEFAULT currval('pending_job_id_seq') +); + +CREATE INDEX payments_queue_job_id ON payments_queue(job_id); + +COMMENT ON table payments_queue IS +$$ This is a holding table and hence not a candidate for normalization. +Jobs should be deleted from this table when they complete successfully.$$; + commit; diff --git a/sql/modules/Payment.sql b/sql/modules/Payment.sql index 08755ce6..aae4917e 100644 --- a/sql/modules/Payment.sql +++ b/sql/modules/Payment.sql @@ -207,6 +207,88 @@ This then returns a set of contact information with a 2 dimensional array cnsisting of outstanding invoices. $$; +CREATE OR REPLACE FUNCTION payment_create_queue_entry() RETURNS int AS +$$ +$$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION payment_bulk_queue +(in_transactions numeric[], in_batch_id int, in_source text, in_total numeric, + in_ar_ap_accno text, in_cash_accno text, + in_payment_date date, in_account_class int) +returns int as +$$ +BEGIN + INSERT INTO payments_queue + (transactions, batch_id, source, total, ar_ap_accno, cash_accno, + payment_date, account_class) + VALUES + (in_transactions, in_batch_id, in_source, in_total, in_ar_ap_accno, + in_cash_accno, in_payment_date, in_account_class); + + RETURN array_upper(in_transactions, 1) - + array_lower(in_transactions, 1); +END; +$$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION job__process_payments(in_job_id int) +RETURNS bool AS $$ +DECLARE + queue_record RECORD + t_auth_name text, +BEGIN + -- TODO: Move the set session authorization into a utility function + SELECT created_by INTO t_auth_name FROM pending_jobs + WHERE id = in_job_id; + + EXECUTE 'SET SESSION AUTHORIZATION ' quote_ident(t_auth_name); + + FOR queue_record IN + SELECT * from payments_queue WHERE job_id = in_job_id + LOOP + PERFORM payment_bulk_post + (transactions, batch_id, source, total, ar_ap_accno, cash_accno, + payment_date, account_class); + END LOOP; + UPDATE pending_job + SET completed_at = timeofday()::timestamp, + success = true + WHERE id = in_job_id; + RETURN TRUE; +END; +$$ language plpgsql; + +CREATE OR REPLACE FUNCTION job__create(in_batch_class int, in_batch_id int) +RETURNS int AS +$$ +BEGIN + INSERT INTO pending_job (batch_class, batch_id) + VALUES (in_batch_class, in_batch_id); + + RETURN currval('pending_job_id_seq'); +END; +$$ LANGUAGE PLPGSQL; + +CREATE TYPE job__status AS ( + completed int, -- 1 for completed, 0 for no + success int, -- 1 for success, 0 for no + completed_at timestamp, + error_condition text -- error if not successful +); + +CREATE OR REPLACE FUNCTION job__status(in_job_id int) RETURNS job__status AS +$$ +DECLARE out_row job__status; +BEGIN + SELECT (completed_at IS NULL)::INT, success::int, completed_at, + error_condition + INTO out_row + FROM pending_job + WHERE id = in_job_id; + + RETURN out_row; +END; +$$ language plpgsql; + CREATE OR REPLACE FUNCTION payment_bulk_post (in_transactions numeric[], in_batch_id int, in_source text, in_total numeric, in_ar_ap_accno text, in_cash_accno text, @@ -265,7 +347,6 @@ BEGIN CASE WHEN t_voucher_id IS NULL THEN true ELSE false END, t_voucher_id, in_payment_date); - insert into test_pay(id, amount) values (in_transactions[out_count][1],in_transactions[out_count][2]); UPDATE ap set paid = paid +in_transactions[out_count][2] where id =in_transactions[out_count][1]; @@ -449,21 +530,37 @@ $$ language plpgsql; COMMENT ON FUNCTION payment_get_vc_info(in_entity_id int) IS $$ This function return vendor or customer info, its under construction $$; +CREATE TYPE payment_record AS ( + amount numeric, + meta_number text, + company_paid text, + cash_account_id int, + cash_accno text, + cash_account_description text, + ar_ap_account_id int, + ar_ap_accno text, + ar_ap_description text +); + CREATE OR REPLACE FUNCTION payment__retrieve (in_source text, in_meta_number text, in_account_class int, in_cash_accno text) -RETURNS SETOF numeric AS +RETURNS SETOF payment_record AS $$ -DECLARE out_row RECORD; +DECLARE out_row payment_record; BEGIN FOR out_row IN - SELECT amount * -1 AS amount - FROM acc_trans - WHERE source = in_source - AND trans_id IN ( - SELECT id FROM ar - WHERE in_account_class = 2 AND - entity_credit_account = - (select id + SELECT sum(case when at.amount > 0 then at.amount else 0 end) + AS amount, ec.meta_number, + c.legal_name, max(cc.id), max(cc.accno), + max(cc.description), max(ac.id), max(ac.accno), + max(ac.description) + FROM acc_trans at + JOIN entity_credit_account ec ON + (at.trans_id IN + (select id FROM ar + WHERE in_account_class = 2 + AND entity_credit_account = + (SELECT id FROM entity_credit_account WHERE meta_number = in_meta_number @@ -478,18 +575,31 @@ BEGIN WHERE meta_number = in_meta_number AND entity_class = - in_account_class) - AND chart_id = - (SELECT id FROM chart - WHERE accno = in_cash_accno) + in_account_class))) + + JOIN company c ON (ec.entity_id = c.entity_id) + LEFT JOIN chart cc ON (at.chart_id = cc.id AND + cc.link LIKE '%paid%') + JOIN chart ac ON (at.chart_id = ac.id AND + ((in_account_class = 1 AND ac.link = 'AP') OR + (in_account_class = 2 AND ac.link = 'AR'))) + WHERE source = in_source + GROUP BY ec.meta_number, c.legal_name + HAVING max(cc.accno) = in_cash_accno LOOP - return next out_row.amount; + return next out_row; END LOOP; END; $$ LANGUAGE plpgsql; + CREATE OR REPLACE FUNCTION payment__reverse (in_source text, in_date_paid date, in_credit_id int, in_cash_accno text) RETURNS INT AS $$ - +DECLARE + count int; +BEGIN + count := 0; + FOR +END; $$ LANGUAGE PLPGSQL; diff --git a/sql/modules/Roles.sql b/sql/modules/Roles.sql index 45a4213f..575a7a1a 100644 --- a/sql/modules/Roles.sql +++ b/sql/modules/Roles.sql @@ -1387,7 +1387,9 @@ GRANT SELECT ON language, project TO public; GRANT SELECT ON business, exchangerate, department, shipto, tax TO public; GRANT ALL ON recurring, recurringemail, recurringprint, status TO public; GRANT ALL ON transactions, entity_employee, customer, vendor TO public; ---TODO, lock recurring down more +GRANT ALL ON pending_job, payment_queue TO PUBLIC; +GRANT ALL ON pending_job_id_seq TO public; +--TODO, lock recurring, pending_job, payment_queue down more -- CT: The following grant is required for now, but will hopefully become less -- important when we get to 1.4 and can more sensibly lock things down. diff --git a/sql/modules/Settings.sql b/sql/modules/Settings.sql index 5b8d2b28..b9c71dc8 100644 --- a/sql/modules/Settings.sql +++ b/sql/modules/Settings.sql @@ -15,7 +15,7 @@ DECLARE out_value varchar; BEGIN SELECT value INTO out_value FROM defaults WHERE setting_key = in_key; - RETURN value; + RETURN out_value; END; $$ LANGUAGE plpgsql; diff --git a/utils/process_queue/config.pl b/utils/process_queue/config.pl new file mode 100644 index 00000000..c977a2b8 --- /dev/null +++ b/utils/process_queue/config.pl @@ -0,0 +1,18 @@ +#!/usr/bin/perl + +use vars qw($database $db_user + $db_passwd); + +# The databases containing LedgerSMB +our $database = ("ledgersmb"); + +# The user to connect with. This must be a superuser so that set session auth +# works as expected + +our $db_user = "postgres"; + +# The password for the db user: +our $db_passwd = "mypasswd"; + +1; + diff --git a/utils/process_queue/process_queue.pl b/utils/process_queue/process_queue.pl new file mode 100644 index 00000000..7c56869a --- /dev/null +++ b/utils/process_queue/process_queue.pl @@ -0,0 +1,89 @@ +#!/usr/bin/perl + +# TODO: Add POD -CT + +require "config.pl"; + +use DBI; +# TODO: Convert config.pl to namespace so we can use strict. + +my $cycle_delay; + +my $dbh = db_init(); + +# Basic db connection setup routines + + + +my $sth; + +$dbh->do("LISTEN job_entered"); +while (1) { # loop infinitely + if ( $dbh->func('pg_notifies') ) { + &on_notify; + } + sleep $cycle_delay; +} + +sub on_notify { + my $job_id = 1; + while ($job_id){ + ($job_id) = $dbh->selectrow_array( + "SELECT min(id) from pending_job + WHERE completed_at IS NULL + FOR UPDATE" + ); + if ($job_id){ + $job_id = $dbh->quote($job_id); + my ($job_class) = $dbh->selectrow_array( + "select class from batch_class where id = + (select batch_class from pending_job where id = $job_id" + ); + # Right now, we assume that every pending job has a batch id. + # Longer-run we may need to use a template handle as well. -CT + $dbh->execute('SELECT ' . + $dbh->quote_identifier("job__process_$job_class") . "($job_id)" + ); + my $errstr = $dbh->errstr; + if (!$dbh->commit){ # Note error and clean up + # Note that this does not clean up the queue holding tables. + # This is a feature, not a bug, as it allows someone to review + # the actual data and then delete if required separately -CT + $dbh->do( + "UPDATE pending_job + SET completed_at = now(), + success = false, + error_condition = " . $dbh->quote($errstr) . " + WHERE id = $job_id" + ); + $dbh->commit; + } + # The line below is necessary because the job process functions + # use set session authorization so one must reconnect to reset + # administrative permissions. -CT + $dbh = db_init(); + } + } +} + +sub db_init { + my $dsn = "dbi:Pg:dbname=$database"; + my $dbh = DBI->connect( + $dsn, $db_user, + $db_passwd, + { + AutoCommit => 0, + PrintError => 0, + RaiseError => 1, + } + ); + $dbh->{pg_enable_utf8} = 1; + ($cycle_delay) = $dbh->selectrow_array( + "SELECT value FROM defaults + WHERE setting_key = 'poll_frequency'" + ); + if (!$cycle_delay){ + die "No Polling Frequency Set Up!"; + } + return $dbh; +} -- cgit v1.2.3