package MogileFS::Worker::Delete;
# deletes files
use strict;
use base 'MogileFS::Worker';
use MogileFS::Util qw(error);
# we select 1000 but only do a random 100 of them, to allow
# for stateless parallelism
use constant LIMIT => 1000;
use constant PER_BATCH => 100;
# TODO: use LWP and persistent connections to do deletes. less local ports used.
# Constructor.  Allocates the fields-based object for $class, then lets the
# base class (MogileFS::Worker, via SUPER) initialise it with $psock.
# Returns the new worker object.
sub new {
    my $class = shift;
    my $psock = shift;

    my $self = fields::new($class);
    $self->SUPER::new($psock);

    return $self;
}
# Watchdog timeout for this worker: always 60.
# NOTE(review): presumably seconds, consumed by the MogileFS::Worker
# machinery -- confirm against the base class.
sub watchdog_timeout {
    return 60;
}
# Main loop of the delete worker; never returns.
#
# After waiting for one pass of the monitor, it repeats forever:
#   * ping the parent and validate the database handle,
#   * then, for up to 5 seconds, keep running process_tempfiles,
#     reenqueue_delayed_deletes and process_deletes back to back.
#
# When neither process_tempfiles nor process_deletes reports work done
# (both returned false), the worker sleeps before starting the next pass.
# The sleep grows by one second per consecutive idle pass up to 5 seconds
# and is reset to zero as soon as either of them does some work.
sub work {
    my ($self) = @_;

    my $max_nap = 5;    # longest sleep when there is nothing to do
    my $nap     = 0;    # current sleep; grows while we stay idle

    # wait for one pass of the monitor
    $self->wait_for_monitor;

  PASS:
    while (1) {
        $self->parent_ping;
        $self->validate_dbh;

        # work for at most 5 seconds before reporting in to the parent again
        my $deadline = time() + 5;

        until (time() >= $deadline) {
            # each processor returns 0 for "done for now", 1 for "more to do"
            my $did_temp = $self->process_tempfiles;
            $self->reenqueue_delayed_deletes;
            my $did_del  = $self->process_deletes;

            if ($did_temp || $did_del) {
                # somebody did some work: no backoff, go straight around
                $nap = 0;
                next;
            }

            # nothing to do: back off a little more and start a new pass
            $nap++ if $nap < $max_nap;
            sleep $nap;
            next PASS;
        }
    }
}
# Moves abandoned uploads out of the tempfile table and into the regular
# delete pipeline.
#
# Asks the store for tempfile rows older than $ENV{T_TEMPFILE_TOO_OLD}
# (default 3600; NOTE(review): presumably seconds -- confirm against
# Store::old_tempfiles).  For each row it records a file_on entry for every
# devid listed in the row, queues the fid for deletion, and finally removes
# the rows from tempfile.
#
# Returns 0 when there was nothing to purge, 1 when a batch was processed.
# Throws (through the store's condthrow) when removing the tempfile rows
# fails, so that a broken database does not make the worker spin on the
# same rows without ever sleeping.
sub process_tempfiles {
    my $self = shift;

    # also clean the tempfile table
    #mysql> select * from tempfile where createtime < unix_timestamp() - 86400 limit 50;
    #+--------+------------+---------+------+---------+--------+
    #| fid    | createtime | classid | dmid | dkey    | devids |
    #+--------+------------+---------+------+---------+--------+
    #|   3253 | 1149451058 |       1 |    1 | file574 | 1,2    |
    #|   4559 | 1149451156 |       1 |    1 | file83  | 1,2    |
    #|  11024 | 1149451697 |       1 |    1 | file836 | 2,1    |
    #|  19885 | 1149454542 |       1 |    1 | file531 | 1,2    |
    #
    # BUT NOTE:
    # the fid might exist on one of the devices in the devids column: we
    # assigned those devices, the client wrote some data to one of them, and
    # then it died or for whatever reason never called create_close.  So we
    # must not simply delete from tempfile; the possibly-written copies have
    # to be hunted down first.  We do that by adding file_on rows for every
    # listed devid and letting the regular delete process discover via 404
    # which ones are not really there.
    #
    # so we:
    #   select the old fids and their devids from tempfile,
    #   add file_on rows for each of those devids,
    #   add the fid to the delete queue,
    #   delete the rows from tempfile.

    # dig up some temporary files to purge
    my $sto       = Mgd::get_store();
    my $too_old   = int($ENV{T_TEMPFILE_TOO_OLD} || 3600);
    my $tempfiles = $sto->old_tempfiles($too_old);
    return 0 unless $tempfiles && @$tempfiles;

    # insert the right rows into file_on and file_to_delete and remove the
    # now expunged (or soon to be) rows from tempfile
    my (@devfids, @fidids);
    foreach my $row (@$tempfiles) {
        my ($fidid, $devids) = @$row;
        push @fidids, $fidid;

        # sanity check the string column: only a comma-separated list of
        # integers counts.  \z (not $) so a trailing newline is rejected, and
        # a NULL column is simply treated as "no devices".
        next unless defined $devids && $devids =~ /\A\d+(?:,\d+)*\z/;

        push @devfids,
            map { MogileFS::DevFID->new($_, $fidid) } split /,/, $devids;
    }

    $sto->mass_insert_file_on(@devfids);
    $sto->enqueue_fids_to_delete(@fidids);

    # bind the fids instead of splicing them into the SQL text, and check
    # the result the same way the other deletes in this worker do.
    my $placeholders = join(',', ('?') x @fidids);
    $sto->dbh->do("DELETE FROM tempfile WHERE fid IN ($placeholders)",
                  undef, @fidids);
    $sto->condthrow("Failure to delete from tempfile");

    return 1;
}
# Works through one batch of the delete queue.
#
# Selects up to LIMIT (fid, devid) pairs from file_to_delete left-joined
# with file_on, shuffles them, and handles at most PER_BATCH of them.  For
# each pair it either drops the bookkeeping row, reschedules the fid for
# later, or sends an HTTP DELETE to the storage node holding the copy.
#
# Returns 0 when the queue query produced no rows, 1 otherwise ("as far as
# we know there is more work to do").
#
# Throws when one of the bookkeeping DELETE statements fails.
sub process_deletes {
    my $self = shift;

    my $sto = Mgd::get_store();
    my $dbh = $sto->dbh;

    my $delmap = $dbh->selectall_arrayref("SELECT fd.fid, fo.devid ".
                                          "FROM file_to_delete fd ".
                                          "LEFT JOIN file_on fo ON fd.fid=fo.fid ".
                                          "LIMIT " . LIMIT);
    return 0 unless $delmap && @$delmap;

    my $done = 0;
    foreach my $dm (List::Util::shuffle(@$delmap)) {
        last if ++$done > PER_BATCH;

        $self->still_alive;
        $self->read_from_parent;

        my ($fid, $devid) = @$dm;
        error("deleting fid $fid, on devid ".($devid || 'NULL')."...") if $Mgd::DEBUG >= 2;

        # The helpers below close over $fid/$devid of this iteration.  The
        # reason strings passed to the first two are labels for the reader
        # of the call site only; they are not stored or logged.

        # no copies left (or fid handed off): take the fid out of the queue
        my $done_with_fid = sub {
            $dbh->do("DELETE FROM file_to_delete WHERE fid=?", undef, $fid);
            $sto->condthrow("Failure to delete from file_to_delete for fid=$fid");
        };

        # this copy is gone (or its device is): forget the fid/devid mapping
        my $done_with_devid = sub {
            $dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                     undef, $fid, $devid);
            $sto->condthrow("Failure to delete from file_on for $fid/$devid");
            die "Failed to delete from file_on: " . $dbh->errstr if $dbh->err;
        };

        # park the fid in file_to_delete_later for $secs seconds and take it
        # out of the active queue
        my $reschedule_fid = sub {
            my ($secs, $reason) = (int(shift), shift);
            $sto->insert_ignore("INTO file_to_delete_later (fid, delafter) ".
                                "VALUES (?,".$sto->unix_timestamp."+$secs)", undef,
                                $fid);
            error("delete of fid $fid rescheduled: $reason") if $Mgd::DEBUG >= 2;
            $done_with_fid->("rescheduled");
        };

        # Cases:
        #   devid is null:  doesn't exist anywhere anymore, we're done with this fid.
        #   devid is observed down/readonly: delay for 10 minutes
        #   devid is marked readonly: delay for 2 hours
        #   devid is marked dead or doesn't exist: consider it deleted on this devid.

        # CASE: devid is null, which means we're done deleting all instances.
        unless (defined $devid) {
            $done_with_fid->("no_more_locations");
            next;
        }

        # CASE: devid is marked dead or doesn't exist: consider it deleted on this devid.
        # (Note: we're tolerant of '0' as a devid, due to old buggy version which
        # would sometimes put that in there)
        my $dev = $devid ? MogileFS::Device->of_devid($devid) : undef;
        unless ($dev && $dev->exists) {
            $done_with_devid->("devid_doesnt_exist");
            next;
        }
        if ($dev->dstate->is_perm_dead) {
            $done_with_devid->("devid_marked_dead");
            next;
        }

        # CASE: devid is observed down/readonly: delay for 10 minutes
        unless ($dev->observed_writeable) {
            $reschedule_fid->(60 * 10, "not_observed_writeable");
            next;
        }

        # CASE: devid is marked readonly/down/etc: delay for 2 hours
        unless ($dev->can_delete_from) {
            $reschedule_fid->(60 * 60 * 2, "devid_marked_not_alive");
            next;
        }

        my $dfid = MogileFS::DevFID->new($dev, $fid);
        my $path = $dfid->url;

        # dormando: "There are cases where url can return undefined,
        # Mogile appears to try to replicate to bogus devices
        # sometimes?"
        unless ($path) {
            error("in deleter, url(devid=$devid, fid=$fid) returned nothing");
            next;
        }

        my $urlparts = MogileFS::Util::url_parts($path);

        # hit up the server and delete it
        # TODO: (optimization) use MogileFS->get_observed_state and don't try to delete things known to be down/etc
        # NOTE(review): Timeout here appears to bound only the connect; the
        # read of the status line below has no timeout of its own -- confirm
        # that the worker watchdog is the intended safety net.
        my $sock = IO::Socket::INET->new(PeerAddr => $urlparts->[0],
                                         PeerPort => $urlparts->[1],
                                         Timeout => 2);
        unless ($sock) {
            # timeout or something, mark this device as down for now and move on
            $self->broadcast_host_unreachable($dev->hostid);
            $reschedule_fid->(60 * 60 * 2, "no_sock_to_hostid");
            next;
        }

        # send delete request
        error("Sending delete for $path") if $Mgd::DEBUG >= 2;

        $sock->write("DELETE $urlparts->[2] HTTP/1.0\r\n\r\n");
        my $response = <$sock>;

        # Pull the status code out once, in list context, so nothing later
        # depends on $1.  $response is undef when the peer closed the
        # connection without answering.
        my ($httpcode) = defined $response
            ? $response =~ m!^HTTP/\d+\.\d+\s+(\d+)!
            : ();

        unless (defined $httpcode) {
            # No usable status line.  Back off instead of leaving the fid in
            # the active queue, where it would be retried straight away and
            # keep the worker from ever going idle.
            my $shown = defined $response ? $response : '(no response)';
            error("Error: unknown response line deleting $path: $shown");
            $reschedule_fid->(60 * 10, "bad_http_response");
            next;
        }

        if (($httpcode >= 200 && $httpcode <= 299) || $httpcode == 404) {
            # effectively means all went well
            $done_with_devid->("deleted");
            next;
        }

        # remote file system error?  try again in half an hour
        error("Error: unlink failure: $path: HTTP code $httpcode");
        $reschedule_fid->(60 * 30, "http_code_$httpcode");
    }

    # as far as we know, we have more work to do
    return 1;
}
# Moves fids whose delay has run out from file_to_delete_later back into
# the active delete queue.
#
# Asks the store for the fids that are due again; returns immediately when
# there are none.  Otherwise the fids are enqueued for deletion, their rows
# are removed from file_to_delete_later, and the store's condthrow is
# called to surface a database error from that removal.
sub reenqueue_delayed_deletes {
    my ($self) = @_;

    my $store = Mgd::get_store();
    my $dbh   = $store->dbh;

    my @ready = $store->fids_to_delete_again;
    return unless @ready;

    $store->enqueue_fids_to_delete(@ready);

    my $fid_list = join(",", @ready);
    $dbh->do("DELETE FROM file_to_delete_later WHERE fid IN (" . $fid_list . ")");

    return $store->condthrow("reenqueue file_to_delete_later delete");
}
1;
# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:
# syntax highlighted by Code2HTML, v. 0.9.1