Job control object

Job control modelled on Unix: declare "job" objects, submit them, wait for them to complete. Uses DBMS_JOB to submit tasks, DBMS_ALERT for jobs to pass back status and error messages, so the calling procedure can easily track the background jobs it started.

-- PL/SQL job control modelled on Unix: submit one or more tasks, wait for them to complete.
-- Uses DBMS_JOB to submit tasks, DBMS_ALERT for jobs to pass back started/completed/failed messages.
-- The DBMS_ALERT mechanism is hidden from the submit/wait interface.
--
-- Object-based approach allows you to declare a job object then call myjob.submit() etc.
-- (See demo block for examples.)
--
-- Prerequisites: Execute permission on sys.dbms_alert.
--
-- William Robertson 2004, www.williamrobertson.net

-- Package (spec only) for constants used in type body:
create or replace package job_pkg
as
   k_error_code         constant pls_integer := -20000;
   k_delimiter          constant varchar2(3) := chr(0);  -- Delimiter for packing messages
   k_job_queue_interval constant interval day(0) to second(0) default interval '5' second;

   -- Job statuses:
   k_submitted          constant varchar2(30) := 'Submitted';
   k_not_submitted      constant varchar2(30) := 'Not submitted';
   k_waiting_confirm    constant varchar2(30) := 'Awaiting start confirmation';
   k_submit_failed      constant varchar2(30) := 'Submit failed';
   k_in_progress        constant varchar2(30) := 'In Progress';
   k_wait_timed_out     constant varchar2(30) := 'Wait timeout';
   k_failed             constant varchar2(30) := 'Abnormal Termination';
   k_completed_ok       constant varchar2(30) := 'Completed Successfully';
end job_pkg;
/

show errors

-- job_ot object type:
-- Submit one or more tasks, wait for them to complete.
-- Uses dbms_job to submit tasks, dbms_alert to pass back started/completed/failed messages.
-- The dbms_alert mechanism is hidden from the submit/wait interface.
--
-- Object-based approach allows you to declare a new job_ot object then call myjob.submit() etc.

create sequence job_seq start with 1 maxvalue 999 cycle;

create or replace type job_ot as object
   ( name            varchar2(30)
   , job_number      number             -- Number set by dbms_job.submit
   , handle          varchar2(30)       -- Generated unique id formed from name + generated sequence
   , command         varchar2(4000)     -- Command to execute
   , description     varchar2(500)      -- Text description
   , message         varchar2(1000)     -- Message returned from execution; error message if failed
   , wait_timeout    interval day to second(0)
   , created_time    timestamp
   , submitted_time  timestamp
   , started_time    timestamp
   , completed_time  timestamp
   , run_status      varchar2(50)       -- In progress, failed etc

   , member procedure submit

   , member procedure wait
     ( p_timeout interval day to second default null
     , p_alertname varchar2 default null )  -- What to wait for: defaults to self.handle, above

   , constructor function job_ot
     ( p_name             varchar2
     , p_command          varchar2
     , p_description      varchar2 default null
     , p_timeout          interval day to second default interval '1' minute )
     return self as result

   , static procedure execute
     ( p_command          varchar2   -- What to execute
     , p_receipt_alert    varchar2   -- Signal name for confirming receipt to submitting procedure
     , p_completed_alert  varchar2   -- Signal name for confirming job completion to submitting procedure
     , p_invoking_session varchar2 ) -- Session id of submitting process

   , member procedure print
)
/

show errors

create or replace type body job_ot as
   constructor function job_ot
      ( p_name          varchar2
      , p_command       varchar2
      , p_description   varchar2 default null
      , p_timeout       interval day to second default interval '1' minute )
      return self as result
   is
      v_seq  pls_integer;
      v_handle varchar2(3000);  -- Bigger than required to ensure assignment does not fail
   begin
      select job_seq.nextval
      into   v_seq
      from   dual;

      v_handle := substr(nvl(upper(p_name),'alert') || '_' || v_seq,1,30);

      -- "_receipt" must not be longer than 35 characters:
      if length(v_handle) > 27 then
         v_handle := substr(upper(p_name),1,14) || '_' || v_seq;
      end if;

      self.name := p_name;
      self.handle := v_handle;
      self.command := p_command;
      self.description := p_description;
      self.wait_timeout := nvl(p_timeout,interval '1' minute);
      self.created_time := systimestamp;
      self.run_status := job_pkg.k_not_submitted;
      return;
   end;


   member procedure submit
   is
      pragma autonomous_transaction;

      k_session_id constant varchar2(30) := dbms_session.unique_session_id;
      k_wait_timeout interval day(0) to second(0) := job_pkg.k_job_queue_interval * 2;

      cursor c_jobs(cp_command varchar2 := self.command)
      is
         select job
         from   user_jobs
         where  what = cp_command;

      v_command varchar2(5000);

      v_job_number   pls_integer;
      v_message      varchar2(1000);
      v_job_status   pls_integer;

      v_receipt_name varchar2(30);
   begin
      if self.command is null then
         raise_application_error
         ( job_pkg.k_error_code
         , 'Command must be specified' );
      elsif self.handle is null then
         raise_application_error
         ( job_pkg.k_error_code
         , 'Job handle must be specified' );
      end if;

      v_receipt_name := self.handle || '_receipt';

      v_command :=
         'job_ot.execute(''begin '
         || replace(rtrim(self.command,'; '),'''','''''')
         || '; end;'', '''
         || v_receipt_name || ''', '''
         || self.handle
         || ''','''
         || k_session_id || ''');';

      -- In case same command already in job queue, remove any jobs for same command:
      for r_job in c_jobs(self.command)
      loop
         dbms_job.remove(r_job.job);
      end loop;

      -- Register interest in both receipt and completion alerts.
      -- submit waits for receipt; calling procedure waits for completion.
      dbms_alert.register(v_receipt_name);
      dbms_alert.register(self.handle);
      commit;

      submitted_time := systimestamp;

      begin
         dbms_job.submit
         ( self.job_number  -- 'out' parameter: job number returned by dbms_job.submit
         , v_command        -- in format 'job_ot.execute('begin cmd; end;', receipt_alert, completed_alert);'
         , sysdate          -- First run
         , null );          -- Interval (none = no repeat)

         commit;

         run_status := job_pkg.k_submitted;
      exception
         when others then
            raise_application_error
            ( job_pkg.k_error_code
            , 'dbms_job.submit failed'
            , true );
      end;


      -- Wait for background job just submitted to confirm that it is ready to begin:
      -- Job should send initial confirmation before doing anything (picked up here) followed by
      -- completion signal on success/failure at end (picked up by calling procedure).

      -- Note that when a member procedure fails with an exception, changes to the object state
      -- are lost, e.g. run_status will revert to 'Not submitted' - we cannot both raise an exception
      -- and change an attribute value.
      begin
         -- wait() will raise exceptions on failure
         self.wait(k_wait_timeout, v_receipt_name);

         dbms_alert.remove(v_receipt_name);

         if self.run_status = job_pkg.k_wait_timed_out then
            raise_application_error
            ( job_pkg.k_error_code
            , 'Submitted task failed to start (no confirmation received after ' ||
              ltrim(to_char(k_wait_timeout),'+') || ')'
            , true );
         end if;
      end;

      started_time := systimestamp;
      run_status := job_pkg.k_in_progress;

      commit;
   exception
      when others then
         run_status := job_pkg.k_submit_failed;

         raise_application_error
         ( job_pkg.k_error_code
         , 'Failed to submit job ' || v_command
         , true );
   end submit;


   member procedure wait
      ( p_timeout interval day to second default null
      , p_alertname varchar2 default null )  -- What to wait for: defaults to self.handle, above
   is
      k_alertname  constant varchar2(50) := nvl(p_alertname,self.handle);
      k_is_confirmation constant boolean := k_alertname = self.handle;
      k_timeout    constant interval day to second := nvl(p_timeout,self.wait_timeout);

      k_maxwait constant pls_integer :=
         nvl
         ( extract(day from k_timeout) * 86400
         + extract(hour from k_timeout) * 3600
         + extract(minute from k_timeout) * 60
         + extract(second from k_timeout)
         , dbms_alert.maxwait );

      v_delimiter_position pls_integer;
      v_message varchar2(32767);
      v_dbms_alert_status integer;
   begin
      if k_is_confirmation then
         run_status := job_pkg.k_waiting_confirm;
      end if;

      dbms_alert.waitone
      ( k_alertname
      , v_message
      , v_dbms_alert_status
      , k_maxwait );

      dbms_alert.remove(k_alertname);

      v_delimiter_position := instr(v_message, job_pkg.k_delimiter);

      -- The returned message may be in the form "status[delimiter]message":
      -- if so, extract into run_status and message:
      if v_delimiter_position > 0 then
         run_status := substr(v_message,1,v_delimiter_position -1);
         message :=    substr(v_message,v_delimiter_position + length(job_pkg.k_delimiter));
      else
         message := v_message;
      end if;

      if v_dbms_alert_status = 1 then
         -- Check status returned by dbms_alert: 1 = "timed out while waiting"
         self.run_status := job_pkg.k_wait_timed_out;

         self.message :=
         'Timeout after waiting ' || k_maxwait
         || ' seconds for job ' || self.name || ': "'
         || substr(self.command,1,80)
         || case when length(self.command) > 80 then '...' else null end
         || '"';

         -- Not good idea to have wait() fail, as then we lose any state changes that
         -- wait() made. Instead, the calling application should check thisjob.status.
      else
         if k_is_confirmation then
            completed_time := systimestamp;
            message := nvl(message,job_pkg.k_completed_ok);
         end if;

      end if;
   end wait;


   static procedure execute
      ( p_command          varchar2   -- What to execute
      , p_receipt_alert    varchar2   -- Signal name for confirming receipt to submitting procedure
      , p_completed_alert  varchar2   -- Signal name for confirming job completion to submitting procedure
      , p_invoking_session varchar2 ) -- Session id of submitting process
   is
      pragma autonomous_transaction;

      v_status varchar2(30) := job_pkg.k_in_progress;
      v_message varchar2(1000) := job_pkg.k_in_progress;
   begin
      dbms_application_info.set_module('job_ot.execute', p_command);

      -- Send initial message to confirm instruction received:
      -- (echo back the value of 'p_receipt_alert')
      dbms_alert.signal(p_receipt_alert,v_status);
      commit;

      begin
         execute immediate(p_command);

         v_message :=
         job_pkg.k_completed_ok ||
         job_pkg.k_delimiter ||
         job_pkg.k_completed_ok;
      exception
         when others then
            v_message :=
            job_pkg.k_failed ||
            job_pkg.k_delimiter ||
            sqlerrm;
      end;

      -- Send specified alert to inform waiting process that task has completed:
      dbms_alert.signal(p_completed_alert,v_message);
      commit;
   end execute;


   member procedure print
   is
      procedure print_item
         ( p_name varchar2
         , p_value varchar2 )
      is
      begin
         dbms_output.put_line(rpad(p_name || ':', 15) || p_value);
      end print_item;
   begin
      print_item('Job name', name);
      print_item('Command', command);
      print_item('Description', description);
      print_item('Message', message);
      print_item('Created', created_time);
      print_item('Submitted', submitted_time);
      print_item('Started', started_time);
      print_item('Completed', completed_time);
      print_item('Run status', run_status);
      dbms_output.put_line(chr(9));
   end print;
end;
/

show errors

set serverout on size 100000

prompt
prompt Demo/test (allow 20 seconds for submit/wait/output cycle to complete):
prompt

set echo on

declare
   -- Create some job objects:

   v_ok_job job_ot := new job_ot('should_work', 'null;', 'Background job test: successful job');

   v_invalid_job job_ot := new job_ot('should_fail', 'nosuchcommand', 'Background job test: handle failure');

   v_slow_job job_ot :=
   new job_ot
   ( 'should_timeout'
   , 'dbms_lock.sleep(20)'   -- User must have execute permission on sys.dbms_lock
   , 'Background job demo 2: trapping failure'
   , interval '5' second );  -- Job will fail to complete within time allowed
begin
   dbms_output.put_line('Defined the following jobs:' || chr(10));

   v_ok_job.print();         -- Display basic info about the job
   v_invalid_job.print();
   v_slow_job.print();

   dbms_output.new_line;
   dbms_output.put_line('Calling submit() method of each job in turn...' || chr(10));

   v_ok_job.submit();        -- Submit in background
   v_invalid_job.submit();
   v_slow_job.submit();

   dbms_output.put_line('Jobs have now been submitted in background.');
   dbms_output.put_line('We can now get on with something else until we are ready to wait for them to complete.');

   dbms_output.put_line(chr(10)||'...'||chr(10));

   dbms_output.put_line('Calling wait() method of each job in turn...' || chr(10));
   v_ok_job.wait();
   v_invalid_job.wait();
   v_slow_job.wait();

   dbms_output.put_line('Job details after all jobs have returned:' || chr(10));
   v_ok_job.print();
   v_invalid_job.print();
   v_slow_job.print();
end;
/


set echo off