summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoreinhverfr <einhverfr@4979c152-3d1c-0410-bac9-87ea11338e46>2007-12-12 22:27:12 +0000
committereinhverfr <einhverfr@4979c152-3d1c-0410-bac9-87ea11338e46>2007-12-12 22:27:12 +0000
commit8f62103e122793ebe3db93819417e688ce5b1e18 (patch)
tree8816d01f47923ba2f0dc1b873cdf4ce0448e5631
parentc5c2bcb35af4b06be72d53364fb587123a1df930 (diff)
Basic outline of Job Queue System
git-svn-id: https://ledger-smb.svn.sourceforge.net/svnroot/ledger-smb/trunk@1975 4979c152-3d1c-0410-bac9-87ea11338e46
-rw-r--r--LedgerSMB/DBObject/Payment.pm21
-rw-r--r--sql/Pg-database.sql70
-rw-r--r--sql/modules/Payment.sql144
-rw-r--r--sql/modules/Roles.sql4
-rw-r--r--sql/modules/Settings.sql2
-rw-r--r--utils/process_queue/config.pl18
-rw-r--r--utils/process_queue/process_queue.pl89
7 files changed, 319 insertions, 29 deletions
diff --git a/LedgerSMB/DBObject/Payment.pm b/LedgerSMB/DBObject/Payment.pm
index 417843df..abd45de3 100644
--- a/LedgerSMB/DBObject/Payment.pm
+++ b/LedgerSMB/DBObject/Payment.pm
@@ -385,6 +385,11 @@ sub get_payment_detail_data {
sub post_bulk {
my ($self) = @_;
my $total_count = 0;
+ my ($ref) = $self->callproc(
+ procname => 'setting_get',
+ args => ['queue_payments'],
+ );
+ my $queue_payments = $ref->{setting_get};
$self->{payment_date} = $self->{datepaid};
for my $contact_row (1 .. $self->{contact_count}){
my $contact_id = $self->{"contact_$contact_row"};
@@ -408,10 +413,20 @@ sub post_bulk {
}
$self->{transactions} = $invoice_array;
$self->{source} = $self->{"source_$contact_id"};
- $self->exec_method(funcname => 'payment_bulk_post');
-
+ if ($queue_payments){
+ my ($job_ref) = $self->exec_method(
+ funcname => 'job__create'
+ )
+ $self->{job_id} = $job_ref->{job__create};
+ $self->exec_method(
+ funcname => 'payment_bulk_queue_entry'
+ );
+ } else {
+ $self->exec_method(funcname => 'payment_bulk_post');
+ }
}
- $self->{dbh}->commit;
+ $self->{queue_payments} = $queue_payments;
+ return $self->{dbh}->commit;
}
1;
diff --git a/sql/Pg-database.sql b/sql/Pg-database.sql
index 6e159ac3..4c620226 100644
--- a/sql/Pg-database.sql
+++ b/sql/Pg-database.sql
@@ -332,13 +332,7 @@ CREATE TABLE defaults (
setting_key text primary key,
value text
);
-/*
- inventory_accno_id int,
- income_accno_id int,
- expense_accno_id int,
- fxgain_accno_id int,
- fxloss_accno_id int,
-*/
+
\COPY defaults FROM stdin WITH DELIMITER |
sinumber|1
sonumber|1
@@ -358,7 +352,14 @@ customernumber|1
vendornumber|1
glnumber|1
projectnumber|1
+queue_payments|0
+poll_frequency|1
\.
+
+COMMENT ON TABLE defaults IS $$
+Note that poll_frequency is in seconds. poll_frequency and queue_payments
+are not exposed via the admin interface as they are advanced features best
+handled via DBAs. $$;
-- */
CREATE TABLE acc_trans (
trans_id int NOT NULL REFERENCES transactions(id),
@@ -2695,4 +2696,59 @@ CREATE INDEX location_address_three_gist__idx ON location USING gist(line_three
CREATE INDEX location_city_prov_gist_idx ON location USING gist(city gist_trgm_ops);
CREATE INDEX entity_name_gist_idx ON entity USING gist(name gist_trgm_ops);
+CREATE TABLE pending_job (
+ id serial not null unique,
+ batch_class int references batch_class(id),
+ entered_by text REFERENCES users(username)
+ not null default SESSION_USER,
+ entered_at timestamp default now(),
+ batch_id int references batch(id),
+ completed_at timestamp,
+ success bool,
+ error_condition text,
+ CHECK (completed_at IS NULL OR success IS NOT NULL),
+ CHECK (success IS NOT FALSE OR error_condition IS NOT NULL)
+);
+COMMENT ON table pending_job IS
+$$ Purpose: This table stores pending/queued jobs to be processed async.
+Additionally, this functions as a log of all such processing for purposes of
+internal audits, performance tuning, and the like. $$;
+
+CREATE INDEX pending_job_batch_id_pending ON pending_job(batch_id) where success IS NULL;
+
+CREATE INDEX pending_job_entered_by ON pending_job(entered_by);
+
+CREATE OR REPLACE FUNCTION trigger_pending_job() RETURNS TRIGGER
+AS
+$$
+BEGIN
+ IF NEW.success IS NULL THEN
+ NOTIFY job_entered;
+ END IF;
+ RETURN NEW;
+END;
+$$ LANGUAGE PLPGSQL;
+
+CREATE TRIGGER notify_pending_jobs BEFORE INSERT OR UPDATE ON pending_job
+FOR EACH ROW EXECUTE PROCEDURE trigger_pending_job();
+
+CREATE TABLE payments_queue (
+ transactions numeric[],
+ batch_id int,
+ source text,
+ total numeric,
+ ar_ap_accno text,
+ cash_accno text,
+ payment_date date,
+ account_class int,
+ job_id int references pending_job(id)
+ DEFAULT currval('pending_job_id_seq')
+);
+
+CREATE INDEX payments_queue_job_id ON payments_queue(job_id);
+
+COMMENT ON table payments_queue IS
+$$ This is a holding table and hence not a candidate for normalization.
+Jobs should be deleted from this table when they complete successfully.$$;
+
commit;
diff --git a/sql/modules/Payment.sql b/sql/modules/Payment.sql
index 08755ce6..aae4917e 100644
--- a/sql/modules/Payment.sql
+++ b/sql/modules/Payment.sql
@@ -207,6 +207,88 @@ This then returns a set of contact information with a 2 dimensional array
cnsisting of outstanding invoices.
$$;
+CREATE OR REPLACE FUNCTION payment_create_queue_entry() RETURNS int AS
+$$
+$$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION payment_bulk_queue
+(in_transactions numeric[], in_batch_id int, in_source text, in_total numeric,
+ in_ar_ap_accno text, in_cash_accno text,
+ in_payment_date date, in_account_class int)
+returns int as
+$$
+BEGIN
+ INSERT INTO payments_queue
+ (transactions, batch_id, source, total, ar_ap_accno, cash_accno,
+ payment_date, account_class)
+ VALUES
+ (in_transactions, in_batch_id, in_source, in_total, in_ar_ap_accno,
+ in_cash_accno, in_payment_date, in_account_class);
+
+ RETURN array_upper(in_transactions, 1) -
+ array_lower(in_transactions, 1);
+END;
+$$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION job__process_payments(in_job_id int)
+RETURNS bool AS $$
+DECLARE
+ queue_record RECORD
+ t_auth_name text,
+BEGIN
+ -- TODO: Move the set session authorization into a utility function
+ SELECT created_by INTO t_auth_name FROM pending_jobs
+ WHERE id = in_job_id;
+
+ EXECUTE 'SET SESSION AUTHORIZATION ' quote_ident(t_auth_name);
+
+ FOR queue_record IN
+ SELECT * from payments_queue WHERE job_id = in_job_id
+ LOOP
+ PERFORM payment_bulk_post
+ (transactions, batch_id, source, total, ar_ap_accno, cash_accno,
+ payment_date, account_class);
+ END LOOP;
+ UPDATE pending_job
+ SET completed_at = timeofday()::timestamp,
+ success = true
+ WHERE id = in_job_id;
+ RETURN TRUE;
+END;
+$$ language plpgsql;
+
+CREATE OR REPLACE FUNCTION job__create(in_batch_class int, in_batch_id int)
+RETURNS int AS
+$$
+BEGIN
+ INSERT INTO pending_job (batch_class, batch_id)
+ VALUES (in_batch_class, in_batch_id);
+
+ RETURN currval('pending_job_id_seq');
+END;
+$$ LANGUAGE PLPGSQL;
+
+CREATE TYPE job__status AS (
+ completed int, -- 1 for completed, 0 for no
+ success int, -- 1 for success, 0 for no
+ completed_at timestamp,
+ error_condition text -- error if not successful
+);
+
+CREATE OR REPLACE FUNCTION job__status(in_job_id int) RETURNS job__status AS
+$$
+DECLARE out_row job__status;
+BEGIN
+ SELECT (completed_at IS NULL)::INT, success::int, completed_at,
+ error_condition
+ INTO out_row
+ FROM pending_job
+ WHERE id = in_job_id;
+
+ RETURN out_row;
+END;
+$$ language plpgsql;
+
CREATE OR REPLACE FUNCTION payment_bulk_post
(in_transactions numeric[], in_batch_id int, in_source text, in_total numeric,
in_ar_ap_accno text, in_cash_accno text,
@@ -265,7 +347,6 @@ BEGIN
CASE WHEN t_voucher_id IS NULL THEN true
ELSE false END,
t_voucher_id, in_payment_date);
- insert into test_pay(id, amount) values (in_transactions[out_count][1],in_transactions[out_count][2]);
UPDATE ap
set paid = paid +in_transactions[out_count][2]
where id =in_transactions[out_count][1];
@@ -449,21 +530,37 @@ $$ language plpgsql;
COMMENT ON FUNCTION payment_get_vc_info(in_entity_id int) IS
$$ This function return vendor or customer info, its under construction $$;
+CREATE TYPE payment_record AS (
+ amount numeric,
+ meta_number text,
+ company_paid text,
+ cash_account_id int,
+ cash_accno text,
+ cash_account_description text,
+ ar_ap_account_id int,
+ ar_ap_accno text,
+ ar_ap_description text
+);
+
CREATE OR REPLACE FUNCTION payment__retrieve
(in_source text, in_meta_number text, in_account_class int, in_cash_accno text)
-RETURNS SETOF numeric AS
+RETURNS SETOF payment_record AS
$$
-DECLARE out_row RECORD;
+DECLARE out_row payment_record;
BEGIN
FOR out_row IN
- SELECT amount * -1 AS amount
- FROM acc_trans
- WHERE source = in_source
- AND trans_id IN (
- SELECT id FROM ar
- WHERE in_account_class = 2 AND
- entity_credit_account =
- (select id
+ SELECT sum(case when at.amount > 0 then at.amount else 0 end)
+ AS amount, ec.meta_number,
+ c.legal_name, max(cc.id), max(cc.accno),
+ max(cc.description), max(ac.id), max(ac.accno),
+ max(ac.description)
+ FROM acc_trans at
+ JOIN entity_credit_account ec ON
+ (at.trans_id IN
+ (select id FROM ar
+ WHERE in_account_class = 2
+ AND entity_credit_account =
+ (SELECT id
FROM entity_credit_account
WHERE meta_number
= in_meta_number
@@ -478,18 +575,31 @@ BEGIN
WHERE meta_number
= in_meta_number
AND entity_class =
- in_account_class)
- AND chart_id =
- (SELECT id FROM chart
- WHERE accno = in_cash_accno)
+ in_account_class)))
+
+ JOIN company c ON (ec.entity_id = c.entity_id)
+ LEFT JOIN chart cc ON (at.chart_id = cc.id AND
+ cc.link LIKE '%paid%')
+ JOIN chart ac ON (at.chart_id = ac.id AND
+ ((in_account_class = 1 AND ac.link = 'AP') OR
+ (in_account_class = 2 AND ac.link = 'AR')))
+ WHERE source = in_source
+ GROUP BY ec.meta_number, c.legal_name
+ HAVING max(cc.accno) = in_cash_accno
LOOP
- return next out_row.amount;
+ return next out_row;
END LOOP;
END;
$$ LANGUAGE plpgsql;
+
CREATE OR REPLACE FUNCTION payment__reverse
(in_source text, in_date_paid date, in_credit_id int, in_cash_accno text)
RETURNS INT
AS $$
-
+DECLARE
+ count int;
+BEGIN
+ count := 0;
+ FOR
+END;
$$ LANGUAGE PLPGSQL;
diff --git a/sql/modules/Roles.sql b/sql/modules/Roles.sql
index 45a4213f..575a7a1a 100644
--- a/sql/modules/Roles.sql
+++ b/sql/modules/Roles.sql
@@ -1387,7 +1387,9 @@ GRANT SELECT ON language, project TO public;
GRANT SELECT ON business, exchangerate, department, shipto, tax TO public;
GRANT ALL ON recurring, recurringemail, recurringprint, status TO public;
GRANT ALL ON transactions, entity_employee, customer, vendor TO public;
---TODO, lock recurring down more
+GRANT ALL ON pending_job, payment_queue TO PUBLIC;
+GRANT ALL ON pending_job_id_seq TO public;
+--TODO, lock recurring, pending_job, payment_queue down more
-- CT: The following grant is required for now, but will hopefully become less
-- important when we get to 1.4 and can more sensibly lock things down.
diff --git a/sql/modules/Settings.sql b/sql/modules/Settings.sql
index 5b8d2b28..b9c71dc8 100644
--- a/sql/modules/Settings.sql
+++ b/sql/modules/Settings.sql
@@ -15,7 +15,7 @@ DECLARE
out_value varchar;
BEGIN
SELECT value INTO out_value FROM defaults WHERE setting_key = in_key;
- RETURN value;
+ RETURN out_value;
END;
$$ LANGUAGE plpgsql;
diff --git a/utils/process_queue/config.pl b/utils/process_queue/config.pl
new file mode 100644
index 00000000..c977a2b8
--- /dev/null
+++ b/utils/process_queue/config.pl
@@ -0,0 +1,18 @@
+#!/usr/bin/perl
+
+use vars qw($database $db_user
+ $db_passwd);
+
+# The databases containing LedgerSMB
+our $database = ("ledgersmb");
+
+# The user to connect with. This must be a superuser so that set session auth
+# works as expected
+
+our $db_user = "postgres";
+
+# The password for the db user:
+our $db_passwd = "mypasswd";
+
+1;
+
diff --git a/utils/process_queue/process_queue.pl b/utils/process_queue/process_queue.pl
new file mode 100644
index 00000000..7c56869a
--- /dev/null
+++ b/utils/process_queue/process_queue.pl
@@ -0,0 +1,89 @@
+#!/usr/bin/perl
+
+# TODO: Add POD -CT
+
+require "config.pl";
+
+use DBI;
+# TODO: Convert config.pl to namespace so we can use strict.
+
+my $cycle_delay;
+
+my $dbh = db_init();
+
+# Basic db connection setup routines
+
+
+
+my $sth;
+
+$dbh->do("LISTEN job_entered");
+while (1) { # loop infinitely
+ if ( $dbh->func('pg_notifies') ) {
+ &on_notify;
+ }
+ sleep $cycle_delay;
+}
+
+sub on_notify {
+ my $job_id = 1;
+ while ($job_id){
+ ($job_id) = $dbh->selectrow_array(
+ "SELECT min(id) from pending_job
+ WHERE completed_at IS NULL
+ FOR UPDATE"
+ );
+ if ($job_id){
+ $job_id = $dbh->quote($job_id);
+ my ($job_class) = $dbh->selectrow_array(
+ "select class from batch_class where id =
+ (select batch_class from pending_job where id = $job_id"
+ );
+ # Right now, we assume that every pending job has a batch id.
+ # Longer-run we may need to use a template handle as well. -CT
+ $dbh->execute('SELECT ' .
+ $dbh->quote_identifier("job__process_$job_class") . "($job_id)"
+ );
+ my $errstr = $dbh->errstr;
+ if (!$dbh->commit){ # Note error and clean up
+ # Note that this does not clean up the queue holding tables.
+ # This is a feature, not a bug, as it allows someone to review
+ # the actual data and then delete if required separately -CT
+ $dbh->do(
+ "UPDATE pending_job
+ SET completed_at = now(),
+ success = false,
+ error_condition = " . $dbh->quote($errstr) . "
+ WHERE id = $job_id"
+ );
+ $dbh->commit;
+ }
+ # The line below is necessary because the job process functions
+ # use set session authorization so one must reconnect to reset
+ # administrative permissions. -CT
+ $dbh = db_init();
+ }
+ }
+}
+
+sub db_init {
+ my $dsn = "dbi:Pg:dbname=$database";
+ my $dbh = DBI->connect(
+ $dsn, $db_user,
+ $db_passwd,
+ {
+ AutoCommit => 0,
+ PrintError => 0,
+ RaiseError => 1,
+ }
+ );
+ $dbh->{pg_enable_utf8} = 1;
+ ($cycle_delay) = $dbh->selectrow_array(
+ "SELECT value FROM defaults
+ WHERE setting_key = 'poll_frequency'"
+ );
+ if (!$cycle_delay){
+ die "No Polling Frequency Set Up!";
+ }
+ return $dbh;
+}