# Schedule::Load::Chooser.pm -- distributed lock handler
# $Id: Chooser.pm 111 2007-05-25 14:40:56Z wsnyder $
######################################################################
#
# Copyright 2000-2006 by Wilson Snyder. This program is free software;
# you can redistribute it and/or modify it under the terms of either the GNU
# General Public License or the Perl Artistic License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
######################################################################
package Schedule::Load::Chooser;
require 5.004;
require Exporter;
@ISA = qw(Exporter);
use POSIX;
use Socket;
use IO::Socket;
use IO::Select;
use Tie::RefHash;
use Net::hostent;
use Sys::Hostname;
use Time::HiRes qw (gettimeofday);
BEGIN { eval 'use Data::Dumper; $Data::Dumper::Indent=1;';} #Ok if doesn't exist: debugging only
#use Devel::Leak; our $Leak;
use Schedule::Load qw (:_utils);
use Schedule::Load::Schedule;
use Schedule::Load::Hosts;
use IPC::PidStat;
use strict;
use vars qw($VERSION $Debug %Clients $Hosts $Client_Num $Select
$Exister
$Time $Time_Usec
$Server_Self %ChooInfo);
use vars qw(%Holds); # $Holds{hold_key}[listofholds] = HOLD {hostname=>, scheduled=>1,}
use Carp;
######################################################################
#### Configuration Section
# Other configurable settings.
$Debug = $Schedule::Load::Debug;
$VERSION = '3.051';
use constant RECONNECT_TIMEOUT => 180; # If reconnect 5 times in 3m then somthing is wrong
use constant RECONNECT_NUMBER => 5;
######################################################################
#### Globals
%Clients = ();
tie %Clients, 'Tie::RefHash';
stash_time();
%ChooInfo = (# Information to pass to "rschedule info"
slchoosed_hostname => hostname(),
slchoosed_connect_time => time(),
slchoosed_status => "Started",
slchoosed_version => $VERSION,
);
######################################################################
#### Creator
sub start {
# Establish the server
@_ >= 1 or croak 'usage: Schedule::Load::Chooser->start ({options})';
my $proto = shift;
my $class = ref($proto) || $proto;
my $self = {
%Schedule::Load::_Default_Params,
#Documented
dynamic_cache_timeout=>10, # Secs to hold cache for, if not set differently by reporter
ping_dead_timeout=>90, # Secs lack of ping indicates dead (greater than reporter's alive_time)
subchooser_restart_num=>12, # For first 12 times,
subchooser_first_time=>20, # Sec between first 12 chooser_restart_if_reporters
subchooser_repeat_time=>(5*60), # Sec between other chooser_restart_if_reporters
@_,};
bless $self, $class;
$Server_Self = $self; # Only should be one... Need some options
# Open the socket
_timelog("Server up, listening on $self->{port}\n") if $Debug;
my $server = IO::Socket::INET->new( Proto => 'tcp',
LocalPort => $self->{port},
Listen => SOMAXCONN,
Reuse => 1)
or die "$0: Error, socket: $!";
# Update status
$ChooInfo{slchoosed_connect_time} = time();
$ChooInfo{slchoosed_status} = "Connected";
$Select = IO::Select->new($server);
$Hosts = Schedule::Load::Schedule->new(_fetched=>-1,); #Mark as always fetched
$Exister = new IPC::PidStat();
$Select->add($Exister->fh);
$self->_probe_reset_check();
while (1) {
# Anything to read?
foreach my $fh ($Select->can_read(3)) { #3 secs maximum
stash_time(); # Cache the time
if ($fh == $server) {
# Accept a new connection
_timelog("Accept\n") if $Debug;
my $clientfh = $server->accept();
next if !$clientfh;
$Select->add($clientfh);
my $flags = fcntl($clientfh, F_GETFL, 0) or die "%Error: Can't get flags";
fcntl($clientfh, F_SETFL, $flags | O_NONBLOCK) or die "%Error: Can't nonblock";
# new Client
my $client = {socket=>$clientfh,
delayed=>0,
ping_time => $Time,
last_reqdyn_time => undef,
};
$Clients{$clientfh} = $client;
}
elsif ($fh == $Exister->fh) {
_exist_traffic();
}
else {
# Input traffic on other client
_client_service($Clients{$fh});
}
}
# Action or timer expired, only do this if time passed
if (time() != $Time) {
stash_time(); # Cache the time
_hold_timecheck();
_client_ping_timecheck();
$self->_probe_reset_check();
}
}
}
sub stash_time {
# Cache the time, store in a variable to avoid a OS call inside a loop
($Time, $Time_Usec) = gettimeofday();
}
######################################################################
#### Host probing
sub _probe_init {
my $self = shift;
# Create list of all hosts "below" this one on the list of all slchoosed servers
my @hostlist = ($self->{dhost});
@hostlist = @{$self->{dhost}} if (ref($self->{dhost}) eq "ARRAY");
my $host_this = gethost(hostname());
my $hit = 0;
my @subhosts;
foreach my $host (@hostlist) {
my $hostx = $host;
if (my $h = gethost($host)) {
_timelog("_probe_init (host $host => ",$host_this->name,")\n") if $Debug;
if (lc($h->name) eq lc($host_this->name)) {
$hit = 1;
} elsif ($hit) {
push @subhosts, $host;
}
}
}
_timelog("_probe_init subhosts= (@subhosts)\n") if $Debug;
$self->{_subhosts} = \@subhosts;
}
sub _probe_reset_check {
my $self = shift;
# Send a _probe_reset every so often
if (($self->{_probe_reset_next_time}||0) < $Time) {
my $delay = ((($self->{_probe_reset_times}||0) < $self->{subchooser_restart_num})
? $self->{subchooser_first_time} : $self->{subchooser_repeat_time});
$self->{_probe_reset_times}++;
$self->{_probe_reset_next_time} = $Time + $delay;
$self->_probe_reset();
}
}
sub _probe_reset {
my $self = shift;
# Tell all subserviant hosts that a new master is on the scene.
# Start at top and work down, want to ignore ourself and everyone
# before ourself.
$self->_probe_init();
foreach my $host (@{$self->{_subhosts}}) {
_timelog("_probe_reset $host $self->{port} trying...\n") if $Debug;
my $fhreset = Schedule::Load::Socket->new (
PeerAddr => $host,
PeerPort => $self->{port},
Timeout => $self->{timeout},
);
if ($fhreset) {
_timelog("_probe_reset $host restarting\n") if $Debug;
print $fhreset _pfreeze("chooser_restart_if_reporters", {}, $Debug);
$fhreset->close();
_timelog("_probe_reset $host DONE\n") if $Debug;
}
}
}
######################################################################
######################################################################
#### Client servicing
sub _client_close {
# Close this client
my $client = shift || return;
my $fh = $client->{socket};
_timelog("Closing client $fh\n") if $Debug;
if ($client->{host}) {
my $host = $client->{host};
my $hostname = $host->hostname || ""; # Will be deleted, so get before delete
_timelog(" Closing host ",$host->hostname,"\n") if $Debug;
delete $host->{const}; # Delete before user_done, so user doesn't see them
delete $host->{stored};
delete $host->{dynamic};
_user_done_finish ($host);
delete $host->{client};
delete $Hosts->{hosts}{$hostname};
}
$Select->remove($fh);
eval {
$fh->close();
};
delete $client->{host}; # Prevent circular ref leak
delete $Clients{$fh};
if (0) { #Leak checking
$fh = undef;
$client = undef;
#Devel::Leak::CheckSV($Leak) if $Leak;
#Devel::Leak::NoteSV($Leak);
}
}
sub _client_close_all {
# For debugging; close all clients
my @clients = (values %Clients);
foreach (@clients) { _client_close ($_); }
}
sub _client_done {
# Done with this client
my $client = shift || return;
_client_send($client, "DONE\n");
}
sub _client_service {
# Loop getting commands from a specific client
my $client = shift || return;
my $fh = $client->{socket};
my $data = '';
my $rv;
while (1) {
$! = undef;
$rv = $fh->sysread($data, POSIX::BUFSIZ);
last if $! != POSIX::EINTR && $! != POSIX::EAGAIN;
}
if (!defined $rv || (length $data == 0))
{
# End of the file
_client_close ($client);
return;
}
$client->{inbuffer} .= $data;
$client->{ping_time} = $Time;
while ($client->{inbuffer} =~ s/^([^\n]*)\n//) {
next if $client->{_broken};
my $line = $1;
#_timelog("CHOOSER GOT: $line\n") if $Debug;
_timelog("$client->{host}{hostname} ") if ($Debug && $client->{host});
my ($cmd, $params) = _pthaw($line, $Debug);
if ($cmd eq "report_ping") {
# NOP, timestamp recorded above
} elsif ($cmd eq "report_const") {
# Older reporters don't have the _update flag, so support them too
_host_start ($client, $params) if !$params->{_update};
_host_dynamic ($client, "const", $params) if !$client->{_broken};
_host_const_chooseinfo($client->{host});
} elsif ($cmd eq "report_stored") {
_host_dynamic ($client, "stored", $params);
} elsif ($cmd eq "report_dynamic") {
_host_dynamic ($client, "dynamic", $params);
$client->{host}{_dyn_update} = $Time;
$client->{host}{_reqdyn_pending} = 0;
if ($client->{last_reqdyn_time}) {
my $sec = $Time - $client->{last_reqdyn_time}[0];
my $usec = $Time_Usec - $client->{last_reqdyn_time}[1];
$client->{host}{const}{slreportd_delay} = $sec + $usec * 1.0e-6;
}
_user_done_finish ($client->{host});
}
# User commands
elsif ($cmd eq "get_const_load_proc"
|| $cmd eq "get_const_load_proc_chooinfo"
) {
_user_get ($client, "report_get_dynamic\n", $cmd);
} elsif ($cmd eq "schedule") {
_user_schedule ($client, $params);
} elsif ($cmd =~ /^report_fwd_/) { # All can just be forwarded
_user_to_reporter ($client, [$params->{host}], $line."\n");
_client_done ($client);
} elsif ($cmd eq "hold_release") {
_hold_delete ($Holds{$params->{hold_key}});
_client_done ($client);
}
# User reset
elsif ($cmd eq "report_restart") {
_user_to_reporter ($client, '-all', "report_restart\n");
_client_done ($client);
} elsif ($cmd eq "chooser_restart") {
# Overall fork loop will deal with it.
warn "-Info: chooser_restart\n" if $Debug;
exit(0);
} elsif ($cmd eq "chooser_restart_if_reporters") {
# Used by master chooser to restart subservient chooser
foreach my $host ($Hosts->hosts_unsorted, (values %Clients)) {
next if $host eq $client; # Skip the requestor itself
# Overall fork loop will deal with it.
warn "-Info: chooser_restart_if_reporters\n" if $Debug;
exit(0);
} # else no hosts
_client_done ($client);
} elsif ($cmd eq "chooser_close_all") {
_client_close_all ($client);
} else {
print "REQ UNKNOWN '$line\n" if $Debug;
}
}
}
sub _client_send {
my $client = shift || return;
my $out = join "", @_;
# Send any arguments to the client
# Returns 0 if failed, else 1
$SIG{PIPE} = 'IGNORE';
my $fh = $client->{socket};
my $ok = Schedule::Load::Socket::send_and_check($fh, $out);
if (!$ok) {
_client_close ($client);
return 0;
}
return 1;
}
sub _client_ping_timecheck {
# See if any clients haven't pinged
foreach my $client (values %Clients) {
#print "Ping Check $client->{ping_time} Now $Time Dead $Server_Self->{ping_dead_timeout}\n" if $Debug;
if ($client->{host} && ($client->{ping_time} < ($Time - $Server_Self->{ping_dead_timeout}))) {
_timelog("Client hasn't pinged lately, disconnecting\n") if $Debug;
_client_close ($client);
}
}
}
######################################################################
######################################################################
######################################################################
######################################################################
#### Services for slreportd calls
sub _host_start {
my $client = shift || return;
my $params = shift;
# const command: establish a new host, load constants
my $hostname = $params->{hostname};
# Only sent at first establishment, so we blow away old info
_timelog("Connecting $hostname\n") if $Debug;
my $host = { client => $client,
hostname => $hostname,
waiters => {},
const => $params,
};
bless $host, "Schedule::Load::Hosts::Host";
_host_const_chooseinfo($host);
# Remove any earlier connection
my $oldhost = $Hosts->{hosts}{$hostname};
if (defined $oldhost->{client}) {
_timelog("$hostname was connected before, reconnected\n") if $Debug;
if ($host->{const}{slreportd_connect_time}
< ($oldhost->{const}{slreportd_connect_time} + RECONNECT_TIMEOUT)) {
$host->{const}{slreportd_status} = "Reconnected";
$host->{const}{slreportd_reconnects} = ($oldhost->{const}{slreportd_reconnects}||0)+1;
if ($host->{const}{slreportd_reconnects} > RECONNECT_NUMBER) {
# We have two reporters fighting. Tell what's up and ignore all data.
my $cmt = ("%Error: Conflicting slreportd deamons on ".$oldhost->slreportd_hostname
." and ".$host->slreportd_hostname);
$oldhost->{const}{slreportd_status} = $cmt;
$oldhost->{stored}{reserved} = $cmt;
$host->{client}{_broken} = 1;
$client->{_broken} = 1;
return;
}
_client_close($oldhost->{client});
}
}
tie %{$host->{waiters}}, 'Tie::RefHash';
$Hosts->{hosts}{$hostname} = $host;
$client->{host} = $host;
#_timelog("const: ", Data::Dumper::Dumper($host)) if $Debug;
}
sub _host_const_chooseinfo {
my $host = shift;
$host->{const}{slreportd_connect_time} ||= time();
$host->{const}{slreportd_status} ||= "Connected";
$host->{const}{slreportd_delay} ||= undef;
}
sub _host_dynamic {
my $client = shift || return;
my $field = shift;
my $params = shift;
# load/proc command:
$client->{host}{$field} = $params;
}
######################################################################
######################################################################
######################################################################
######################################################################
#### Services for user calls
sub _user_to_reporter {
my $userclient = shift;
my $hostnames = shift; # array ref, or '-all'
my $cmd = shift;
if ($hostnames eq '-all') {
my @hostnames = ();
foreach my $host ($Hosts->hosts_unsorted) {
push @hostnames, $host->hostname;
}
$hostnames = \@hostnames;
}
foreach my $hostname (@{$hostnames}) {
my $host = $Hosts->{hosts}{$hostname};
next if !$host;
$host->{_dyn_update} = 0; # Kill cache, will need refresh
$host->{_reqdyn_pending} = 0; # Kill cache, will need refresh
_timelog("_user_to_reporter ->$hostname $cmd") if $Debug;
_client_send ($host->{client}, $cmd);
}
}
sub _user_get {
my $userclient = shift;
my $cmd = shift;
my $flags = shift;
my $cmd_start_time = [$Time, $Time_Usec];
_user_done_action ($userclient,
\&_user_send_done_cb, [$userclient, $flags, $cmd_start_time]);
_user_all_hosts_cmd ($userclient, $cmd);
_user_done_check($userclient);
}
sub _user_send_done_cb {
my $userclient = shift;
my $flags = shift;
my $cmd_start_time = shift;
_user_send ($userclient, $flags, $cmd_start_time);
_client_done ($userclient);
}
sub _user_all_hosts_cmd {
my $userclient = shift;
my $cmd = shift;
foreach my $host ($Hosts->hosts_unsorted) {
_timelog("GET ->", $host->hostname, " $cmd") if $Debug;
my $dynto = ($host->get_undef('dynamic_cache_timeout') || $Server_Self->{dynamic_cache_timeout});
if ($host->{_dyn_update} > ($Time - $dynto)) {
_timelog(" skipping: _cached ",$host->hostname,"\n") if $Debug;
} else {
# Cache is out of date.
if ($host->{_reqdyn_pending} # Otherwise only issue one request, it'll satisfy everyone.
&& $host->{_reqdyn_pending} > ($Time - $Server_Self->{ping_dead_timeout})) { # Recent
# Already issued response, but haven't gotten it, wait for existing request
_timelog(" skipping: _reqdyn_pending ",$host->hostname,"\n") if $Debug;
_user_done_mark ($host, $userclient);
} else {
if ($cmd =~ /report_get_dynamic/) {
$host->{client}{last_reqdyn_time} = [$Time, $Time_Usec]; # For DELAY column
}
if (_client_send ($host->{client}, $cmd)) {
# Mark that we need activity from each of these before being done
_user_done_mark ($host, $userclient);
$host->{_reqdyn_pending} = $Time;
} # Else host down; ignore it
}
}
}
}
sub _user_send {
my $client = shift;
my $types = shift;
my $cmd_start_time = shift;
# Send requested types of information back to the user
_timelog("_user_send $client $types\n") if $Debug;
_holds_adjust();
_user_send_type ($client, "const") if ($types =~ /const/);
_user_send_type ($client, "stored") if ($types =~ /load/);
_user_send_type ($client, "dynamic") if ($types =~ /load/ || $types =~ /proc/);
_make_chooinfo (($Time - $cmd_start_time->[0]) + ($Time_Usec - $cmd_start_time->[1]) * 1.0e-6);
_client_send ($client, _pfreeze ("chooinfo", \%ChooInfo, 0)) if ($types =~ /chooinfo/);
}
sub _user_send_type {
my $client = shift;
my $type = shift;
# Send specific data type to user
my @frozen;
foreach my $host ($Hosts->hosts_sorted) {
if (defined $host->{$type}) {
#_timelog("Host $host name $host->{hostname}\n") if $Debug;
my %params = (table => $host->{$type},
type => $type,
hostname => $host->{hostname},
);
# Rather then sending lots of little packets, join them all up to send
# in one large packet.
push @frozen, _pfreeze ("host", \%params, 0&&$Debug);
}
}
if (0==_client_send ($client, join('',@frozen))) {
# Send failed
}
}
######################################################################
sub _user_done_action {
my $userclient = shift;
my $callback = shift;
my $argsref = shift;
$userclient->{wait_count} = 0;
$userclient->{wait_action} = $callback;
$userclient->{wait_action_argsref} = $argsref;
}
sub _user_done_mark {
my $host = shift;
my $userclient = shift;
# Mark this user as needing new info from host before returning status
$host->{waiters}{$userclient} = 1;
$userclient->{wait_count} ++;
}
sub _user_done_finish {
my $host = shift;
# Host finished, dec count see if done with everything client needed
foreach my $userclient (keys %{$host->{waiters}}) {
_timelog("Dewait $host $userclient\n") if $Debug;
delete $host->{waiters}{$userclient};
$userclient->{wait_count} --;
_user_done_check($userclient);
}
}
sub _user_done_check {
my $userclient = shift;
if ($userclient->{wait_count} == 0) {
_timelog("Dewait *DONE*\n") if $Debug;
&{$userclient->{wait_action}} (@{$userclient->{wait_action_argsref}});
$userclient->{wait_action} = undef; # Done, prevent leaks
$userclient->{wait_action_argsref} = undef; # Done, prevent leaks
}
}
######################################################################
######################################################################
######################################################################
######################################################################
#### Scheduling
sub _user_schedule_sendback {
my $userclient = shift;
my $schparams = shift;
# Schedule and return results to the user
my $schresult = _schedule ($schparams);
_client_send ($userclient, _pfreeze ("schrtn", $schresult, $Debug));
_client_done ($userclient);
}
sub _user_schedule {
my $userclient = shift;
my $schparams = shift;
_user_done_action ($userclient,
\&_user_schedule_sendback, [$userclient, $schparams]);
_user_all_hosts_cmd ($userclient, "report_get_dynamic\n");
_user_done_check($userclient);
}
sub _schedule_one_resource {
my $schparams = shift;
my $resreq = shift; # ResourceReq reference
my $resscratch = shift; # Passed to user's callback; not safe for internals; they may modify it!
#Factors:
# hosts_match: reserved, match_cb, classes
# -> Things that absolutely must be correct to schedule here
# rating: rating_cb: load_limit, cpus, clock, adj_load, tot_pctcpu, rating_adder, rating_mult
# -> How to prioritize, if 0 it's overbooked
# loads_avail: holds, fixed_load
# -> How many more jobs host can take before we should turn off new jobs
# Note we need to subtract resources which aren't scheduled yet, but have higher
# priorities then this request. This allows for a pool request of 10 machines
# to eventually start without being starved by little 1 machine requests that keep
# getting issued.
my $bestref = undef;
my $bestrating = undef;
my $favorref = undef;
my $favorhost = 0; $favorhost = $Hosts->get_host($resreq->{favor_host}) || 0 if ($resreq->{favor_host});
my $freecpus = 0;
my $totcpus = 0;
# hosts_match can be slow, plus it constructs a list. It's faster to loop here.
foreach my $host ($Hosts->hosts_sorted) {
# host_match takes: classes, match_cb, allow_reserved
# we can remove $resscratch from here when code migrates to use rating_cb instead
next if !$host->host_match_chooser($resreq,$resscratch);
# Process the host
$totcpus += $host->cpus;
my $rating = $host->rating_chooser ($resreq->{rating_cb},$resscratch);
_timelog("\tTest host ", $host->hostname," rate ",$rating,", cpus ",$host->cpus,", free ",$host->free_cpus,"\n") if $Debug;
#_timelog("\t adj_load ",$host->adj_load,", load_limit ",$host->load_limit,"\n") if $Debug;
#_timelog(Data::Dumper->Dump([$host], ['host']),"\n") if $Debug;
if ($rating > 0) {
my $machfreecpus = $host->free_cpus;
$freecpus += $machfreecpus;
if (!$schparams->{allow_none} || $machfreecpus) {
# Else, w/allow_none even if this host has cpu time
# left and a better rating, it might not have free job slots
if ($host == $favorhost && $machfreecpus) {
# Found the favored host has resources, force it to win
$favorref = $host;
$bestref = undef; # For next if statement to catch
}
if (!defined $bestref
|| (($rating < $bestrating) && !$favorref)) {
$bestref = $host;
$bestrating = $rating;
}
}
}
}
my $jobs = $freecpus;
if ($resreq->{max_jobs}<=0) { # Fraction that's percent of clump if negative
$jobs = _min($jobs, int($totcpus * (-$resreq->{max_jobs})) - ($resreq->{jobs_running}||0));
} else {
$jobs = _min($jobs, $resreq->{max_jobs} - ($resreq->{jobs_running}||0));
}
my $keep_idle = $resreq->{keep_idle_cpus} || 0;
if (($resreq->{keep_idle_cpus}||0) < 0) { # Fraction that's percent of clump if negative
$keep_idle = _max($keep_idle, int($totcpus * (-$resreq->{keep_idle_cpus})));
}
if ($schparams->{allow_none} && ($jobs<1 || $freecpus < $keep_idle)) {
$bestref = undef;
}
$jobs = _max($jobs, 1);
_timelog(" _Schedule_one Best ".($bestref?1:'none')
." Jobs $jobs Totcpu $totcpus Free $freecpus Running ".($resreq->{jobs_running}||0)
." Max $resreq->{max_jobs} KI $keep_idle\n") if $Debug;
return ($bestref,$jobs);
}
sub _schedule {
# Choose the best host and total resources available for scheduling
my $schparams = shift; #allow_none=>$, hold=>ref, requests=>[ref,ref...]
_timelog("_schedule $schparams->{hold}{hold_key}\n") if $Debug;
# Clear holds for this request, the user may have scheduled (and failed) earlier.
my $schhold = $schparams->{hold};
if (my $oldhold = $Holds{$schhold->hold_key}) {
# Keep a old req_time, as a new identical request doesn't deserve to move to the end of the queue
# This also prevents livelock problems where a scheduled hold was issued to the oldest request,
# then that request returns and is no longer the oldest.
$schhold->{req_time} = _min($schhold->{req_time}, $oldhold->{req_time});
_hold_delete($oldhold);
}
_holds_clear_unallocated();
_holds_adjust();
_hold_add_schreq($schparams);
$schparams->{hold} = undef; # Now have schparams under hold, don't need circular reference
# Loop through all requests and issue hold keys to those we can
_timelog("_schedule_loop $schhold->{hold_key}\n") if $Debug;
my $resdone = 1;
my @reshostnames = ();
my $resjobs;
foreach my $hold (sort {$a->compare_pri_time($b)} (values %Holds)) {
my $schreq = $hold->{schreq};
next if !$schreq;
# Careful, we generally want $schreq rather then $schparams in this loop...
_timelog(" SCHREQ for $hold->{hold_key}\n") if $Debug;
my %resscratch = ( partial_hosts=>[] ); # Passed to the user's callback
foreach my $resref (@{$schreq->{resources}}) {
_timelog(" Ressch for $hold->{hold_key}\n") if $Debug;
my ($bestref,$jobs) = _schedule_one_resource($schreq,$resref,\%resscratch);
my $okref = $bestref;
if ($bestref) {
# Found at least one CPU slot for this job
_timelog(" Resdn $jobs on ",$bestref->hostname," for $hold->{hold_key}\n") if $Debug;
# We may have only gotten a single free CPU out of many wanted.
# If this requires much tweaking, we'll make it a callback insted.
if (my $limit = $bestref->get_undef('load_limit')) {
my $wantload = _hold_load_host_adjusted($hold,$bestref);
if (($limit - $bestref->adj_load) < $wantload) {
_timelog(" **Not all CPUs ready on ",$bestref->hostname
," (($limit-",$bestref->adj_load,")<$wantload),"
," for $hold->{hold_key}\n") if $Debug;
$okref = undef;
}
}
# Hold this resource so next schedule loop doesn't hit it
_hold_add_host($hold, $bestref);
$bestref = undef; # Don't use bestref below, use okref
}
if ($okref) {
# Found all the needed loads to complete this resource request
push @{$resscratch{partial_hosts}}, $okref; # For the user's rating_cb
}
if ($hold == $schhold) { # We're scheduling the one the user asked for
$resjobs = $jobs;
if ($okref) {
push @reshostnames, $okref->hostname;
} else {
# None found, we didn't schedule all resources it wanted
# Note there may be other resources it wants, so continue the loop...
$resdone = 0;
}
}
}
}
# If we scheduled ok, move the hold to a assigment, so next schedule doesn't kill it
if ($resdone) {
$schhold->{allocated} = 1; # _holds_clear_unallocated checks this
$schhold->{schreq} = undef; # So we don't schedule it again
# We don't need to do another hold_new, since we've changed the reference each host points to.
}
_timelog("DONE_HOLDS: ",Data::Dumper::Dumper (\%Holds)) if $Debug;
# Return the list of hosts we scheduled
if ($resdone) {
return {jobs => $resjobs,
best => \@reshostnames,
hold => $schhold,
};
} else {
return {jobs => $resjobs,
best => undef,
hold => undef,
};
}
}
######################################################################
######################################################################
#### Holds
sub _hold_add_schreq {
my $schreq = shift;
# Add this request to the ordered list of requests
$Holds{$schreq->{hold}->hold_key} = $schreq->{hold};
my $hold = $Holds{$schreq->{hold}->hold_key};
$hold->{schreq} = $schreq;
$hold->{schreq}{hold} = undef; # Now have schreq under hold, don't need circular reference
}
sub _hold_add_host {
my $hold = shift;
my $host = shift;
# Hostnames must be a list, not a hash as we can have multiple holds
# w/same request applying to the same host.
$hold->{hostnames} ||= [];
push @{$hold->{hostnames}}, $host->hostname;
# Not: _holds_adjust, save unnecessary looping and just add the load directly.
$host->{dynamic}{adj_load} += _hold_load_host_adjusted($hold,$host);
}
sub _hold_load_host_adjusted {
my $hold = shift;
my $host = shift;
return (($hold->{hold_load}>=0) ? $hold->{hold_load} : $host->cpus);
}
sub _hold_delete {
my $hold = shift;
# Remove a load hold under speced key
return if !defined $hold;
_timelog("_hold_delete($hold->{hold_key})\n") if $Debug;
delete $Holds{$hold->{hold_key}}{schreq}; # So don't loose memory from circular reference
delete $Holds{$hold->{hold_key}};
}
sub _holds_clear_unallocated {
# We're going to start a schedule run, delete any holds not on resources
# that have been truely allocated
foreach my $hold (values %Holds) {
if (!$hold->{allocated}) {
$hold->{hostnames} = []; # Although we delete, there may still be other references to it...
}
}
}
sub _hold_timecheck {
# Called once every 3 seconds.
# See if any holds have expired; if so delete them
#_timelog("hold_timecheck $Time\n") if $Debug;
foreach my $hold (values %Holds) {
$hold->{expires} ||= ($Time + ($hold->{hold_time}||10));
if ($Time > $hold->{expires}) {
#_timelog("HOST DONE MARK $host $hostname $key EXP $hold->{expires}\n") if $Debug;
# Same cleanup below in _exist_traffic
_hold_delete ($hold);
} else {
$Exister->pid_request(host=>$hold->{req_hostname}, pid=>$hold->{req_pid});
}
}
}
sub _holds_adjust {
# Adjust loading on all machines to make up for scheduler holds
_timelog("HOLDS: ",Data::Dumper::Dumper (\%Holds)) if $Debug;
# Reset adjusted loads
foreach my $host ($Hosts->hosts_unsorted) {
$host->{dynamic}{adj_load} = $host->{dynamic}{report_load};
$host->{dynamic}{holds} = [];
}
# adj_load is the report_load plus any hold_keys allocated on a specific host
# plus any hold_keys waiting to finish their scheduling run
foreach my $hold (values %Holds) {
foreach my $hostname (@{$hold->{hostnames}}) {
my $host = $Hosts->get_host($hostname);
if (!$host) {
# This can happen when we do a hold on a host and that host's reporter
# then goes down. It's harmless, as all will be better when it comes back.
warn "No host $hostname" if $Debug;
} else {
$host->{dynamic}{adj_load} += _hold_load_host_adjusted($hold,$host);
push @{$host->{dynamic}{holds}}, $hold;
}
}
}
}
######################################################################
sub _exist_traffic {
# Handle UDP responses from our $Exister->pid_request calls.
#_timelog("UDP PidStat in...\n") if $Debug;
my ($pid,$exists,$onhost) = $Exister->recv_stat();
return if !defined $pid;
return if !defined $exists || $exists; # We only care about known-missing processes
_timelog(" UDP PidStat PID $onhost:$pid no longer with us. RIP.\n") if $Debug;
# We don't maintain a table sorted by pid, as these messages
# are rare, and there can be many holds per pid.
foreach my $hold (values %Holds) {
if ($hold->{req_pid}==$pid && $hold->{req_hostname} eq $onhost) {
# Same cleanup above when timer expires
_hold_delete ($hold);
}
}
}
######################################################################
######################################################################
#### Information to pass up to "rschedule status" for debugging
sub _make_chooinfo {
my $delta_time = shift;
# Load information we want to pass up to rschedule for debugging chooser
# details from a client application
$ChooInfo{last_command_delay} = $delta_time;
$ChooInfo{schreqs} = {};
foreach my $hold (values %Holds) {
next if !$hold->{schreq};
$ChooInfo{schreqs}{$hold->hold_key} = $hold;
}
}
######################################################################
######################################################################
#### Little stuff
sub _timelog {
my $msg = join('',@_);
my ($time, $time_usec) = gettimeofday();
my ($sec,$min,$hour,$mday,$mon) = localtime($time);
printf +("[%02d/%02d %02d:%02d:%02d.%06d] %s",
$mon+1, $mday, $hour, $min, $sec, $time_usec, $msg);
}
######################################################################
######################################################################
#### Package return
1;
######################################################################
__END__
=pod
=head1 NAME
Schedule::Load::Chooser - Distributed load choosing daemon
=head1 SYNOPSIS
use Schedule::Load::Chooser;
Schedule::Load::Chooser->start(port=>1234,);
=head1 DESCRIPTION
L<Schedule::Load::Chooser> on startup creates a daemon that clients can
connect to using the Schedule::Load package.
=over 4
=item start ([parameter=>value ...]);
Starts the chooser daemon. Does not return.
=back
=head1 PARAMETERS
=over 4
=item port
The port number of slchoosed. Defaults to 'slchoosed' looked up via
/etc/services, else 1752.
=item ping_dead_timeout
Seconds after which if a client doesn't respond to a ping, it is considered
dead.
=back
=head1 DISTRIBUTION
The latest version is available from CPAN and from L<http://www.veripool.com/>.
Copyright 1998-2006 by Wilson Snyder. This package is free software; you
can redistribute it and/or modify it under the terms of either the GNU
Lesser General Public License or the Perl Artistic License.
=head1 AUTHORS
Wilson Snyder <wsnyder@wsnyder.org>
=head1 SEE ALSO
L<Schedule::Load>, L<slchoosed>
=cut
syntax highlighted by Code2HTML, v. 0.9.1