#!PERL_EXE # $Id: queue 203 2010-05-18 00:38:11Z cbworden $ USGS BEGIN { $ENV{PATH} = '/usr/ucb:/bin' } use strict; use FindBin; use lib "$FindBin::Bin/../lib"; use ShakeConfig; use lib "$ShakeConfig::shake_perl_lib"; use Socket; use IO::File; use File::Copy; use File::Path; use Shake::Opt; use Shake::List; use Shake::Graphics_2D; use Shake::LoadConfig; use Shake::Event; use Shake::EqInfo; use POSIX "sys_wait_h"; ############################################################################ # Prototypes for the logging routines ############################################################################ sub logmsg; sub logver; sub logerr; sub logscr; sub mydate; #---------------------------------------------------------------------- # We may fork several times, so we don't want anything to accumulate # in the output buffer or it'll print every time a child process exits... #---------------------------------------------------------------------- STDERR->autoflush(1); STDOUT->autoflush(1); #---------------------------------------------------------------------- # Globals set by the configuration file; see sub reconfig for default # values #---------------------------------------------------------------------- my $logfile; # Where we dump all of our messages my $log; # Filehandle of the log file my $verbose; # Make more noise if > 0 #---------------------------------------------------------------------- # servers is a list of valid servers who may send events #---------------------------------------------------------------------- my @servers; my $queue_length; # Max # of queued events my $nspawnmax; # Max # of simultaneous processing shells my $minmag; # Min magnitude for processing an event #---------------------------------------------------------------------- # boxes is an array of hashes as follows: # @boxes = [ { 'MAG' => mag, # 'DELTAP' => delta_priority, # 'COORDS' => [ [ lon1, lat1 ], ..., [ lonN, latN ] ], # 'POLY' => polygon_reference }, # { ... }, ... ]; #---------------------------------------------------------------------- my @boxes; # Metro boxes #---------------------------------------------------------------------- # repeats is an array of hashes as follows: # @repeats = [ { 'MAG' => mag, # 'DELAYS' => "delay1 delay2 ... delayN" }, # { ... }, ... ]; #---------------------------------------------------------------------- my @repeats; #---------------------------------------------------------------------- # agemods is an array of hashes as follows: # @agemods = [ { 'AGE' => time, # 'DELTAP' => delta_priority }, # { ... }, ... ]; #---------------------------------------------------------------------- my @agemods; my $configerr; # Number of errors in the most recent # run of sub configure() my $eo; # Event object #---------------------------------------------------------------------- # End of configuration globals #---------------------------------------------------------------------- #---------------------------------------------------------------------- # Program global variables #---------------------------------------------------------------------- my @event_queue; # The pending event queue my @process_queue; # The current processing queue my @repeater_queue; # The current repeater queue my $config_file = "queue.conf"; # Where to find config file my $db_file = "db.conf"; # Where to find database config file my $state_file = "/$shake_home/logs/queue.state"; my $cancel_file = "/$shake_home/logs/cancelled_events"; my $port = PORT_NUM; # Port for socket to connect to #---------------------------------------------------------------------- # End of program global variables #---------------------------------------------------------------------- ####################################################################### # Stuff to handle command line options and program documentation: ####################################################################### my $desc = "Program to receive event alarms from the real time system " . "and run 'shake' with the appropriate event ID. See the " . "configuration file '$config_file' for details. " . "The queue will respond to the following signals:n" . "SIGHUP causes the queue to re-read its configuration file; " . "SIGUSR1 causes a dump of the current configuration and " . "the process and event queues - it is intended for debugging " . "and troubleshooting; " . "SIGTERM is used to kill the process gracefully"; my $flgs = [{ FLAG => 'restart', DESC => "Restarts the queue and reinitializes using the " . "state file '$state_file' from the previous run"}, { FLAG => 'config', DESC => 'Check the configuration file, then quit.'}, { FLAG => 'help', DESC => 'Print program documentation and quit.'} ]; my $options = Opt::setOptions($flgs) or logerr "Error in setOptions"; if (defined $options->{'help'}) { Opt::printDoc($desc); exit 0; } if (defined $options->{'config'}) { configure(); if ($configerr) { logscr "there were errors in the configuration"; } else { logscr "config succeeded"; } exit 0; } my $restart = defined $options->{'restart'} ? 1 : 0; logerr "Unknown argument(s): @ARGV" if (@ARGV); ####################################################################### # End of command line option stuff ####################################################################### #---------------------------------------------------------------------- # Run the program #---------------------------------------------------------------------- main(); ####################################################################### # Subroutines ####################################################################### sub main { #---------------------------------------------------------------------- # Write the queue.pid file #---------------------------------------------------------------------- if (not -e "$shake_home/logs") { mkpath("$shake_home/logs", 0, 0755) or logerr "Couldn't create log dir"; } my $pidfile = "$shake_home/logs/queue.pid"; my $fh = IO::File->new($pidfile, 'w') or logerr "unable to open $pidfile -- $!\n"; $fh->print($$,"\n"); $fh->close; #---------------------------------------------------------------------- # Set signal handlers; #---------------------------------------------------------------------- catch(); #---------------------------------------------------------------------- # Set the defaults then read the configuration file #---------------------------------------------------------------------- reconfig(); #---------------------------------------------------------------------- # If this is a restart, we initialize the queues using the state file # and begin running events #---------------------------------------------------------------------- if ($restart) { readState() } else { unlink($state_file); } #---------------------------------------------------------------------- # Set up the socket; we don't need to check the return value since # openSocket() will die if it fails #---------------------------------------------------------------------- my $server = openSocket($port); logmsg "server started on port $port"; my $paddr; # Data port of incoming connection my $cdata; # Data read from client my $name; my ($iport, $iaddr); my ($action, $eventid, $repeat); #---------------------------------------------------------------------- # Loop forever, waiting for connections #---------------------------------------------------------------------- for (;;close Client) { #---------------------------------------------------------------------- # Here we wait for a valid connection with accept() -- note that signals # will break the accept, so we need to call it again if it returns # nothing. # # With asynchronous processes that don't have reliable signal delivery # (and since we occasionally ignore signals), it is possible to get into # a deadlock situation, so we alarm() this call and check for things to # do (by calling the alarmer() handler) if it times out #---------------------------------------------------------------------- alarm 30 if (@event_queue > 0 || @process_queue > 0 || @repeater_queue > 0); next if (!defined ($paddr = accept(Client, $$server))); alarm 0; ($iport, $iaddr) = sockaddr_in($paddr); if (!defined $iport || !defined $iaddr) { logmsg "ERROR in sockaddr_in: iport=$iport iaddr=$iaddr"; next; } $name = gethostbyaddr($iaddr, AF_INET); logmsg "connection from $name [" . inet_ntoa($iaddr) . "] at port $iport" if ($verbose > 0); if (not grep $name eq $_, @servers) { logmsg "connection from $name refused: not in valid server list"; next; } #---------------------------------------------------------------------- # Read the data sent by the client #---------------------------------------------------------------------- if (!defined ($cdata = )) { logver "no data from $name"; next; } #---------------------------------------------------------------------- # Client data should be three items : "action", "eventid" and "repeat_time" # where repeat_time is the number of minutes since the last repeat #---------------------------------------------------------------------- if ($cdata !~ /^(\w+)\s+(\w+)\s+(\d+)$/) { logver "invalid data: $cdata from $name"; next; } $action = $1; $eventid = $2; $repeat = $3; logmsg "got $action: $eventid $repeat from $name"; #---------------------------------------------------------------------- # Is the alarm a cancellation? #---------------------------------------------------------------------- if ($action eq $cancel_evt) { writeCancelFile($eventid); dequeueEvent($eventid); writeState(); next; } #----------------------------------------------------------------------- # The repeat on an alarm should always be 0, even if it is an # update; the system will handle this situation fairly intelligently #----------------------------------------------------------------------- processEvent($eventid, 0); checkRepeats(); writeState(); #---------------------------------------------------------------------- # Done with this iteration, go back to accept() and wait for another # connection #---------------------------------------------------------------------- } logerr "ERROR: Outside loop -- quitting"; 0; } ############################################################################ # Open a socket on port "$port" and return a reference to the handle ############################################################################ sub openSocket { my $proto = getprotobyname('tcp'); my $port = shift; logerr "ERROR: openSocket: no port specified" if (!defined $port); logerr "ERROR: invalid port: $port" if ($port <= 0); socket(Server, PF_INET, SOCK_STREAM, $proto) or logerr "socket: $!"; setsockopt(Server, SOL_SOCKET, SO_REUSEADDR, pack("l", 1)) or logerr "setsockopt: $!"; bind(Server, sockaddr_in($port, INADDR_ANY)) or logerr "bind: $!"; listen(Server, SOMAXCONN) or logerr "listen: $!"; return \*Server; } ############################################################################ # Check that the event fits the processing criteria and add it to the # queue ############################################################################ sub processEvent { my $eventid = shift; my $repeat = shift; #---------------------------------------------------------------------- # Check for cancelled event #---------------------------------------------------------------------- my $cancelled = readCancelFile(); if (grep $eventid eq $_, @$cancelled) { logver "event $eventid has been cancelled, skipping"; return; } #---------------------------------------------------------------------- # Get event info from database #---------------------------------------------------------------------- my $eqo = $eo->eqInfo($eventid); if (!defined $eqo) { logver "couldn't get event info for event $eventid, skipping"; return; } #---------------------------------------------------------------------- # Do we want to run this event? If the magnitude is smaller than the # cutoff (modified by any metro boxes) then we normally wouldn't, but # if the event has been run before (or is running now), we probably # want to update. We check to see if the event's output directory # exists (which indicates that at least grind has run). This approach # is far from foolproof, but has the advantage of being very simple. #---------------------------------------------------------------------- if (not -d "/$shake_home/data/$eventid/output" and ($eqo->mag < magCheck($eqo->lat, $eqo->lon))) { my $mag = $eqo->mag; logver "event $eventid (M=$mag) too small, skipping"; return; } #---------------------------------------------------------------------- # Set priority, add to queue #---------------------------------------------------------------------- my $prio = priority($eqo->mag, $eqo->lat, $eqo->lon, $repeat); my $mdate = mydate; logver "event $eventid priority $prio on $mdate" if ($verbose > 0); queueEvent($eventid, $eqo->mag, $prio, $repeat); return; } ############################################################################ # Determine process priority from magnitude, location, and age (i.e. repeat) # prioBox = delta_priority of box if in Metro Box (see config file) # = 0.0 otherwise # agemods = specified in config file ############################################################################ sub priority { my ($mag, $lat, $lon, $age) = @_; my $priority = $mag; $priority += prioBox($lat, $lon); foreach my $mod ( @agemods ) { if ($age <= $mod->{AGE}) { $priority += $mod->{DELTAP}; last; } } return $priority; } ############################################################################ # Return the delta_priority of the first box the event falls within if # the location is within one of the Metro Boxes, otherwise return 0 ############################################################################ sub prioBox { my ($lat, $lon) = @_; foreach my $box ( @boxes ) { return $box->{DELTAP} if ($box->{POLY}->crossingstest([$lon, $lat])); } return 0; } ############################################################################ # Return the magnitude required for processing; $minmag is returned # unless the event falls within a metro box, in which case the # metro magnitude of the first box the event falls within is returned ############################################################################ sub magCheck { my ($lat, $lon) = @_; foreach my $box ( @boxes ) { return $box->{MAG} if ($box->{POLY}->crossingstest([$lon, $lat])); } return $minmag; } ############################################################################ # Add an event to the queue (and run as many events as possible) # We block signals here so that the integrity of the queues is # maintained. ############################################################################ sub queueEvent { my $event = getNewEvent(@_); my $item; my $n; #---------------------------------------------------------------------- # Block signals during this section of code #---------------------------------------------------------------------- block(); #---------------------------------------------------------------------- # If this is a new event (i.e. REPEAT == 0) and the event is already # being repeated (i.e. we are getting a new alarm, probably and update, # on an existing event), we want to remove the repeater so it can be # reinstalled later... #---------------------------------------------------------------------- removeItem(\@repeater_queue, 'EID', $event->{EID}) if ($event->{REPEAT} == 0); #---------------------------------------------------------------------- # If the event is already in the event queue remove the old one and discard # it so it can be replaced with the new info #---------------------------------------------------------------------- $item = removeItem(\@event_queue, 'EID', $event->{EID}); logver "event $event->{EID} already exists in queue, replacing" if (defined $item and $verbose > 1); #---------------------------------------------------------------------- # If the event is already in the processing queue, just discard the # new one and mark the repeat as '0' if it is a new event # (this will cause the repeater to be restarted with the initial # repeat program, as if this were a new event) (note that we killed # the repeater above, if it existed) #---------------------------------------------------------------------- if (defined ($item = findItem(\@process_queue, 'EID', $event->{EID}))) { logver "event $event->{EID} is already being processed" if ($verbose > 1); $item->{REPEAT} = 0 if ($event->{REPEAT} == 0); undef $event; } #---------------------------------------------------------------------- # Insert the event into the queue if it wasn't undef'ed above #---------------------------------------------------------------------- insertItem(\@event_queue, $event, 'PRIORITY') if (defined $event); #---------------------------------------------------------------------- # Prune the queue -- we have to account for the possibility of a # reconfig, so we may need to delete more than one event #---------------------------------------------------------------------- my @dequeued = truncateList(\@event_queue, $queue_length); if ($verbose > 0) { foreach my $deq ( @dequeued ) { logver "event $deq->{EID} (M=$deq->{MAG}) dropped from queue"; } } #---------------------------------------------------------------------- # Try to run some events #---------------------------------------------------------------------- runEvents(); dump_queues() if ($verbose > 1); #---------------------------------------------------------------------- # Start handling signals again #---------------------------------------------------------------------- catch(); return 0; } ############################################################################ # Remove an event from the queues (and run as many events as possible) # We block signals here so that the integrity of the queues is # maintained. ############################################################################ sub dequeueEvent { my $evid = shift; my $item; #---------------------------------------------------------------------- # Block signals during this section of code #---------------------------------------------------------------------- block(); #---------------------------------------------------------------------- # Remove the repeater if it is there #---------------------------------------------------------------------- removeItem(\@repeater_queue, 'EID', $evid); #---------------------------------------------------------------------- # If the event is in the event queue, pluck and discard it #---------------------------------------------------------------------- $item = removeItem(\@event_queue, 'EID', $evid); logver "event $evid removed from event queue" if ($verbose > 1 and defined $item); #---------------------------------------------------------------------- # If the event is in the processing queue, mark it so the reaper can # run the cancel program and avoid starting a repeatrer #---------------------------------------------------------------------- if (defined ($item = findItem(\@process_queue, 'EID', $evid))) { $item->{CANCEL} = 1; } else { cancelEvent($evid); } #---------------------------------------------------------------------- # Start handling signals again #---------------------------------------------------------------------- catch(); return 0; } ############################################################################ # Run as many events as we can # This function should only be called when signals are BLOCKED by the # calling routine ############################################################################ sub runEvents { while (@process_queue < $nspawnmax && @event_queue > 0) { #---------------------------------------------------------------------- # Get the highest-priority event (the first on the list) #---------------------------------------------------------------------- my $event = shift @event_queue; if (spawn($event->{EID}, $event->{MAG}, $event->{REPEAT}) != 0) { logmsg "ERROR: can't spawn event $event->{EID}"; } } return 0; } ############################################################################ # Fork and exec a new processing shell # Signals should be BLOCKED here (by the calling routine) ############################################################################ sub spawn { my ($eventid, $mag, $repeat) = @_; # # Fork # FORK: { if (my $pid = fork) { #---------------------------------------------------------------------- # Parent notes the pid, eventid, magnitude, and repeat status of the # child #---------------------------------------------------------------------- insertItem(\@process_queue, getNewProcess($pid, $eventid, $mag, $repeat)); } elsif (defined $pid) { #---------------------------------------------------------------------- # Child exec's a processing shell #---------------------------------------------------------------------- -e $shake_code or logerr "ERROR: can't find shake"; exec "$shake_code -event $eventid -once_only" or logerr "ERROR: couldn't exec shake: $!"; } elsif ($! =~ /No more process/) { #---------------------------------------------------------------------- # EAGAIN, supposedly recoverable fork error #---------------------------------------------------------------------- sleep 5; redo FORK; } else { #---------------------------------------------------------------------- # unknown fork error #---------------------------------------------------------------------- chomp $!; logmsg "ERROR: fork error: $!"; return 1; } } return 0; } ############################################################################ # Fork and exec an event cancellation program ############################################################################ sub cancelEvent { my $eventid = shift; # # Fork # FORK: { if (my $pid = fork) { #---------------------------------------------------------------------- # Parent #---------------------------------------------------------------------- return 0; } elsif (defined $pid) { #---------------------------------------------------------------------- # Child exec's a cancellation shell #---------------------------------------------------------------------- -e $cancel_code or logerr "ERROR: can't find $cancel_code"; exec "$cancel_code -event $eventid" or logerr "ERROR: couldn't exec $cancel_code: $!"; } elsif ($! =~ /No more process/) { #---------------------------------------------------------------------- # EAGAIN, supposedly recoverable fork error #---------------------------------------------------------------------- sleep 5; redo FORK; } else { #---------------------------------------------------------------------- # unknown fork error #---------------------------------------------------------------------- chomp $!; logmsg "ERROR: fork error: $!"; return 1; } } return 0; } ############################################################################ # Reap dead children, start a repeater, and run as many events as possible ############################################################################ sub reaper { my $item; #---------------------------------------------------------------------- # Block signals during this section of code #---------------------------------------------------------------------- block(); #---------------------------------------------------------------------- # Loop while there are dead children #---------------------------------------------------------------------- while ((my $pid = waitpid -1, WNOHANG) > 0) { my $cstat = $?; #---------------------------------------------------------------------- # Is it a shake run? If it is, it will be on this list: #---------------------------------------------------------------------- $item = removeItem(\@process_queue, 'PID', $pid); #---------------------------------------------------------------------- # It is possible that the dead child was a cancel, in which case the # child would not be in the queue #---------------------------------------------------------------------- next if (!defined $item); logmsg "Reaped child for event $item->{EID}" if ($verbose > 1); if ($cstat != 0) { logmsg "ERROR: child $pid returned error status $cstat"; } #---------------------------------------------------------------------- # Run the cancel program if this event was cancelled while it was # running, otherwise start a repeater if this was a shake run and it # was the first run (i.e. repeat == 0) #---------------------------------------------------------------------- if (defined $item->{CANCEL}) { cancelEvent($item->{EID}); } else { repeater($item->{EID}, $item->{MAG}) if ($item->{REPEAT} == 0); } } #---------------------------------------------------------------------- # There should be space to run an event now (unless the configuration # has changed) #---------------------------------------------------------------------- runEvents(); #---------------------------------------------------------------------- # Start handling signals again #---------------------------------------------------------------------- catch(); writeState(); return 0; } ############################################################################ # Run through the list of repeaters and see if any events need to be # rerun ############################################################################ sub checkRepeats { my $ctime = time; foreach my $rep ( @repeater_queue ) { if ($rep->{PTIME} + getMinutes($rep->{REPS}->[0]) * 60 <= $ctime) { my $reptime = shift @{$rep->{REPS}}; removeItem(\@repeater_queue, 'EID', $rep->{EID}) if (@{$rep->{REPS}} <= 0); processEvent($rep->{EID}, getMinutes($reptime)); } } return; } ############################################################################ # Start a repeater -- the re-run schedule is defined in the config file ############################################################################ sub repeater { my ($eventid, $mag) = @_; my @args; #---------------------------------------------------------------------- # Determine the timing and number of re-runs based on the size of # the event; do not start a repeater if there are no "repeat" lines # in the config file or if the event magnitude is less than the # lowest magnitude specified on any repeat line #---------------------------------------------------------------------- return 0 if (@repeats == 0); foreach my $rep ( @repeats ) { if ($mag >= $rep->{MAG}) { @args = split " ", $rep->{DELAYS}; last; } } return 0 if (@args <= 0); insertItem(\@repeater_queue, getNewRepeater($eventid, $mag, time, @args), 'MAG'); return 0; } ############################################################################ # Get a new link for the event queue ############################################################################ sub getNewEvent { my $event = {}; $event->{EID} = shift; $event->{MAG} = shift; $event->{PRIORITY} = shift; $event->{REPEAT} = shift; return $event; } ############################################################################ # Get a new link for the process queue ############################################################################ sub getNewProcess { my $process = {}; $process->{PID} = shift; $process->{EID} = shift; $process->{MAG} = shift; $process->{REPEAT} = shift; return $process; } ############################################################################ # Get a new link for the repeater queue ############################################################################ sub getNewRepeater { my $repeater = {}; $repeater->{EID} = shift; $repeater->{MAG} = shift; $repeater->{PTIME} = shift; $repeater->{REPS} = []; push @{$repeater->{REPS}}, @_; return $repeater; } ############################################################################ # sub getMinutes( ) returns the number of minutes represented # by a string of the form '< |m|h|d>' where ' ' and 'm' mean # minutes, 'h' hours, and 'd' days; -1 is returned on error ############################################################################ sub getMinutes { my $t = shift; my $mult = 1; my ($val, $unit) = $t =~ /^\s*(\d+)(\S?)\s*$/; return -1 if not defined $val; if (defined $unit) { if ($unit eq '' or $unit eq 'm') { $mult = 1; } elsif ($unit eq 'h') { $mult = 60; } elsif ($unit eq 'd') { $mult = 24 * 60; } else { return -1; } } return $mult * $val; } ############################################################################ # Set up the signal handlers: # SIGCHLD means a child process has died (processing shell or repeater) # SIGHUP is a signal to reconfigure the server # SIGUSR1 causes a dump of the current configuration and the process # and event queues; it is intended for debugging and # troubleshooting # SIGALRM is used to break out of the accept() in the main # loop and run the reaper; it is a failsafe to avoid # deadlocks caused by unreliable signals # SIGTERM is used to kill the process ############################################################################ sub catch { $SIG{CHLD} = \&reaper; $SIG{HUP} = \&reconfig; $SIG{USR1} = \&dump_state; $SIG{ALRM} = \&alarmer; $SIG{TERM} = \&quit_queue; return; } ############################################################################ # Block signals during critical sections of code ############################################################################ sub block { $SIG{CHLD} = 'DEFAULT'; $SIG{HUP} = 'IGNORE'; $SIG{USR1} = 'IGNORE'; $SIG{ALRM} = 'IGNORE'; return; } ############################################################################ # On SIGUSR1, dump the system state ############################################################################ sub dump_state { printConfig(); dump_queues(); return; } ############################################################################ # dump the processing and event queues ############################################################################ sub dump_queues { logver "event queue:"; printList(\@event_queue); logver "processing queue:"; printList(\@process_queue); logver "repeater queue:"; printList(\@repeater_queue); return; } ############################################################################ # sub printList( ): print out each of the items on the list ############################################################################ sub printList { my $arr = shift; my $cnt = 1; my $str; foreach my $item ( @$arr ) { $str = "Item $cnt: "; foreach my $key ( sort keys %$item ) { if (ref $item->{$key} eq 'ARRAY') { $str .= "$key => " . join(" ", @{$item->{$key}}) . "\t"; } else { $str .= "$key => $item->{$key}\t"; } } $cnt++; logver $str; } return; } ############################################################################ # alarmer - Called for SIGALRM; basically a failsafe to avoid deadlock # situations caused by unreliable signal delivery ############################################################################ sub alarmer { reaper(); checkRepeats(); writeState(); return; } ############################################################################ # quit_queue - Called for SIGTERM; causes the process to exit; ############################################################################ sub quit_queue { writeState(); logmsg "server shutting down on signal SIGTERM"; my $pidfile = "$shake_home/logs/queue.pid"; unlink $pidfile if -e $pidfile; exit 0; } ############################################################################ # Read the queue state file and initialize the lists accordingly; events # are processed as they are read and repeats are checked when the list # is complete ############################################################################ sub readState { my $fh = new IO::File("$state_file"); if (not defined $fh) { logmsg "Couldn't open state file '$state_file' for reading"; return; } while (<$fh>) { next if $_ =~ /EVENT_QUEUE/; last if $_ =~ /REPEAT_QUEUE/; chomp; processEvent($_, 0); } while (<$fh>) { chomp; my ($eid, $mag, $ptime, @reps) = split " "; insertItem(\@repeater_queue, getNewRepeater($eid, $mag, $ptime, @reps), 'MAG'); } $fh->close; checkRepeats(); return; } ############################################################################ # Write the queues to the state file ############################################################################ sub writeState { my $file = "${state_file}.tmp"; my $fh = new IO::File("> $file"); if (not defined $fh) { logmsg "Couldn't open temporary state file '$file'"; return; } block(); print $fh "EVENT_QUEUE\n"; #----------------------------------------------------------------------- # We don't know if the events in the process queue finished, so we treat # them as if they are in the event queue #----------------------------------------------------------------------- foreach my $proc ( @process_queue, @event_queue ) { print $fh "$proc->{EID}\n"; } print $fh "REPEAT_QUEUE\n"; foreach my $rep ( @repeater_queue ) { next if defined findItem(\@process_queue, 'EID', $rep->{EID}) or defined findItem(\@event_queue, 'EID', $rep->{EID}); print $fh "$rep->{EID} $rep->{MAG} $rep->{PTIME} ", join(" ", @{$rep->{REPS}}), "\n"; } $fh->close; copy("$file", "$state_file"); unlink($file); catch(); return; } ############################################################################ # Write a new event to the cancel file ############################################################################ sub writeCancelFile { my $eventid = shift; my $cancelled = readCancelFile(); my $evt; my $fh = new IO::File("> ${cancel_file}.tmp"); if (not defined $fh) { logmsg "Couldn't create temporary cancel file '${cancel_file}.tmp'"; return; } foreach $evt ( @$cancelled ) { print $fh "$evt\n"; if ($evt == $eventid) { undef $eventid; } } print $fh "$eventid\n" if defined $eventid; $fh->close; copy("${cancel_file}.tmp", "$cancel_file"); unlink("${cancel_file}.tmp"); return; } ############################################################################ # Read the cancel file and return a ref to an array of cancelled events ############################################################################ sub readCancelFile { my $ref = []; my $fh = new IO::File("$cancel_file"); if (not defined $fh) { return $ref; } while (<$fh>) { chomp; push @$ref, $_; } $fh->close; return $ref; } ############################################################################ # Read the "queue.conf" file and configure the server ############################################################################ sub configure { my $cfg = new LoadConfig($config_file, $config_dirs, \&logmsg) or logerr "can't open config file"; $configerr = $cfg->parse( { 'logfile' => \&logfile, 'verbose' => \&verbose, 'server' => \&server, 'queue_length' => \&queue_length, 'nspawnmax' => \&nspawnmax, 'minmag' => \&minmag, 'box' => \&box, 'repeat' => \&repeat, 'agemod' => \&agemod } ); } ############################################################################ # On SIGHUP, reconfigure the server from the config file ############################################################################ sub reconfig { #---------------------------------------------------------------------- # Reset config globals to default values (and clean out the arrays) #---------------------------------------------------------------------- $logfile = 'STDOUT'; $verbose = 0; @servers = ('localhost'); $queue_length = 10; $nspawnmax = 1; $minmag = 4.0; @boxes = (); @repeats = (); @agemods = (); $configerr = 0; $eo = undef; #---------------------------------------------------------------------- # If the 'logfile' entry doesn't appear in the config file, we write # to STDOUT, this is nice for debugging #---------------------------------------------------------------------- $log = \*STDOUT; #---------------------------------------------------------------------- # Reread the config file... #---------------------------------------------------------------------- configure(); #---------------------------------------------------------------------- # Re-establish the Event object (which connects to the database) #---------------------------------------------------------------------- if (!defined ($eo = new Event($db_file, $config_dirs))) { logmsg "CONFIG ERROR: couldn't establish Event object"; $configerr++; } if ($configerr) { logmsg "there were errors in the configuration"; } else { logmsg "config succeeded"; } printConfig() if ($verbose > 0); return; } ############################################################################ # Print the current configuration info ############################################################################ sub printConfig { logver "Logfile: $logfile"; logver "Servers:"; foreach my $server ( @servers ) { logver "\t$server"; } logver "Queue length: $queue_length"; logver "Max spawned processes: $nspawnmax"; logver "Min magnitude: $minmag"; logver "Metro Boxes: ", scalar @boxes; my $cnt = 1; foreach my $box ( @boxes ) { logver "\tBox $cnt:"; logver "\t Magnitude: $box->{MAG}"; logver "\t Delta Priority: $box->{DELTAP}"; logver "\t Coords:"; my $cref = $box->{COORDS}; foreach my $pref ( @$cref ) { logver "\t\t $pref->[0], $pref->[1]"; } $cnt++; } logver "Databases: ", scalar @{$eo->{DBC}->{DBS}}; foreach my $db ( @{$eo->{DBC}->{DBS}} ) { logver "\t database\t => $db->{DB}"; logver "\t\t server => $db->{SERVER}"; logver "\t\t login => $db->{LOGIN}"; #---------------------------------------------------------------------- # Probably shouldn't print the password in the log file... # # logver "\t\t password => $db->{PASSWD}"; #---------------------------------------------------------------------- logver "\t\t password => ********"; if ($db->{SERVER} eq 'Oracle') { logver "\t\t oracle home => $db->{ORACLE_HOME}"; logver "\t\t TWO_TASK => $db->{TWO_TASK}"; } } logver "Repeats: ", scalar @repeats; foreach my $rep ( @repeats ) { logver "\t Mag >= $rep->{MAG} \t : repeats $rep->{DELAYS}"; } logver "Agemods: ", scalar @agemods; foreach my $mod ( @agemods ) { logver "\t Age <= $mod->{AGE} \t : delta priority = $mod->{DELTAP}"; } logver "There", $configerr == 1 ? "was" : "were", "$configerr", $configerr == 1 ? "error" : "errors", "in the configuration"; return; } ############################################################################ # The following routines handle specific lines in the config file: # sub logfile # sub verbose # sub server # sub queue_length # sub nspawnmax # sub minmag # sub box # sub repeat # sub agemod # # Most of these routines could use better error checking ############################################################################ sub logfile { $logfile = shift; #---------------------------------------------------------------------- # Replace the string with the install dir #---------------------------------------------------------------------- $logfile =~ s//$shake_home/; #---------------------------------------------------------------------- # logfile is guaranteed to be something, but can we open it? #---------------------------------------------------------------------- if (!open(LOGOUT, ">> $logfile")) { logscr "can't open $logfile; trying /tmp/queue.log"; $logfile = '/tmp/queue.log'; if (!open(LOGOUT, ">> $logfile")) { logscr "can't open $logfile; will message to STDOUT"; $logfile = 'STDOUT'; $log = \*STDOUT; return "Couldn't open a logfile"; } LOGOUT->autoflush(1); logscr "ok: logging to $logfile"; $log = \*LOGOUT; return "Logging to $logfile"; } LOGOUT->autoflush(1); $log = \*LOGOUT; return undef; } sub verbose { $verbose = shift; return undef; } sub server { my @list = split ' ', shift; foreach my $item ( @list ) { $item =~ s/\s\s*//g; push @servers, $item; } return undef; } sub queue_length { my $arg = shift; return "ERROR: queue_length $arg < 0" if $arg < 0; $queue_length = $arg; return undef; } sub nspawnmax { my $arg = shift; return "ERROR: nspawnmax $arg <= 0" if $arg <= 0; $nspawnmax = $arg; return undef; } sub minmag { $minmag = shift; return undef; } sub box { #---------------------------------------------------------------------- # @boxes = [ { 'MAG' => mag, # 'DELTAP' => delta_priority, # 'COORDS' => [ [ lat1, lon1 ], ..., [ latN, lonN ] ], # 'POLY' => polygon_reference }, # { ... }, ... ]; #---------------------------------------------------------------------- my ($nc, $poly, $lat, $lon); my @args = split ' ', shift; my $box = {}; my $coords = []; if (my $bad = isflt(@args)) { return "ERROR: non-numeric argument found: '$bad'\n" . "-----: make sure coordinate list is properly terminated"; } $box->{MAG} = shift @args; $box->{DELTAP} = shift @args; $box->{COORDS} = $coords; return "ERROR: coordinates must be in pairs" if (($nc = @args) % 2 != 0); $nc /= 2; return "ERROR: too few coordinates: $nc < 3" if $nc < 3; while (@args) { $lat = shift @args; $lon = shift @args; push @$coords, [ $lon, $lat ]; } $box->{POLY} = Polygon->new(@$coords); push @boxes, $box; return undef; } sub repeat { #---------------------------------------------------------------------- # @repeats = [ { 'MAG' => mag, # 'DELAYS' => "delay1 delay2 ... delayN" }, # { ... }, ... ]; #---------------------------------------------------------------------- my $rep = {}; ($rep->{MAG}, $rep->{DELAYS}) = split " ", shift, 2; return "ERROR: incomplete definition; specify magnitude and delays" if !defined $rep->{DELAYS}; my @replist = split " ", $rep->{DELAYS}; foreach my $val ( @replist ) { return "ERROR: invalid repeat: $val, must be of the form < |m|h|d>" if getMinutes($val) <= 0; } push @repeats, $rep; #---------------------------------------------------------------------- # Do an inverse sort so that the larger magnitudes come first #---------------------------------------------------------------------- if (@repeats > 1) { @repeats = sort { $b->{MAG} <=> $a->{MAG} } @repeats; } return undef; } sub agemod { #---------------------------------------------------------------------- # @agemods = [ { 'AGE' => time, # 'DELTAP' => delta_priority }, # { ... }, ... ]; #---------------------------------------------------------------------- my $mod = {}; ($mod->{AGE}, $mod->{DELTAP}) = split ' ', shift, 2; return "ERROR: incomplete definition; specify age and delta priority" if !defined $mod->{DELTAP}; push @agemods, $mod; #---------------------------------------------------------------------- # Do a normal sort so that the earlier times come first #---------------------------------------------------------------------- if (@agemods > 1) { @agemods = sort { $a->{AGE} <=> $b->{AGE} } @agemods; } return undef; } ############################################################################ # Logs a message to the logfile with time/date stamp ############################################################################ sub logmsg { print $log "$$: @_ on ", mydate, "\n"; return; } ############################################################################ # Logs a message to the logfile without time/date stamp ############################################################################ sub logver { print $log "$$: @_\n"; return; } ############################################################################ # Logs a message with time/date stamp to the logfile, then quits ############################################################################ sub logerr { logmsg shift; exit 1; } ############################################################################ # Logs a message with to the screen ############################################################################ sub logscr { print STDOUT "$0 $$: @_ on ", mydate, "\n"; return; } sub mydate { my ($sec, $min, $hr, $day, $mon, $yr) = localtime(); return sprintf('%02d/%02d/%4d %02d:%02d:%02d', $mon + 1, $day, $yr + 1900, $hr, $min, $sec); } sub isflt { foreach my $val ( @_ ) { return $val if ($val !~ /^-*\d*\.*\d*$/); } return undef; }