simba/lib/Simba/CA.pm

1003 lines
35 KiB
Perl

#!/usr/bin/perl
=head1 NAME

Simba::CA

=head1 DESCRIPTION

Collecting Agent of the Simba backup system.

This class represents one instance of a running collecting agent.
The only user-callable methods are the constructor new and the instance
method run, which collects all the files from various disk agents.

The Simba::CA object is a hashref with the following keys:

=over

=item basedir

=item unknown_uid

=item unknown_gid

=item fh_log

=item log_level

=item dbh

=item targets

A list of entries (hashes) from table filesets.

=item ssh_id_file

=item target

=item last_backup

=item last_backup_id

=item timestamp

=item this_backup

=item session_id

=item file_pid

=item file_cfd

=item file_dfd

=item start_time

Timestamp when the backup of the current target started. Used to test when to abort due to timeout.

=back

=cut
package Simba::CA;
use strict;
use warnings;
use Encode;
use IPC::Open2;
use POSIX qw(strftime);
use Simba::Util qw(quote unquote typestr);
use Readonly;
use Digest::SHA;
use List::Util qw(min);
use IO::Handle;
use File::stat;
use Scalar::Util qw(tainted);
use DBI;
use Time::HiRes qw(gettimeofday);
Readonly my $BUFSIZE => 128 * 1024;
# Constructor.
#
# $opt is a hashref; the keys read here are:
#   fh_log   - handle used for log output (defaults to STDERR)
#   parallel - stored as-is; run() uses it as the maximum number of
#              concurrently running backup children
#   dbi_file - file whose first line contains the whitespace-separated
#              arguments passed to DBI->connect
#   filesets - arrayref of fileset ids; when given, exactly these filesets
#              become the targets, otherwise all filesets flagged "active"
#
# Connects to the database, adjusts the partitions of table instances,
# loads the targets and records the ssh identity file if one is found.
# Dies if the credentials file cannot be opened or is empty.
sub new {
    my ($class, $opt) = @_;

    my $self = {};
    bless $self, $class;

    $self->{basedir} = '/backup';
    $self->{unknown_uid} = 65533;
    $self->{unknown_gid} = 65533;
    $self->{fh_log} = exists($opt->{fh_log}) ? $opt->{fh_log} : \*STDERR;
    $self->{log_level} = 99;
    $self->{record_file_id} = 0;
    $self->{record_time} = 0;
    $self->{parallel} = $opt->{parallel};

    if ($opt->{dbi_file}) {
        my $fn = $opt->{dbi_file};
        # 3-argument open with a lexical handle: the file name can never be
        # interpreted as an open mode and no global handle is clobbered.
        open(my $cred_fh, '<', $fn) or die "cannot open $fn: $!";
        my $line = <$cred_fh>;
        close($cred_fh);
        die "no database credentials found in $fn" unless defined $line;
        # split ' ' skips leading whitespace, so an indented line does not
        # produce an empty first field (which would become the DSN).
        my @cred = split(' ', $line);
        $self->{dbi} = \@cred;
    }
    $self->{dbh} = DBI->connect(@{ $self->{dbi} },
                                { AutoCommit => 0,
                                  PrintError => 1,
                                  RaiseError => 1
                                }
                               );
    $self->{instances_part_size} = 10_000_000;
    $self->adjust_partitions;

    # Order by ascending sort order with nulls last:
    # MySQL considers NULL to be smaller than any number, so negate the numbers
    # (making the largest the smallest) and then sort descending.
    # Idea from https://www.designcise.com/web/tutorial/how-to-order-null-values-first-or-last-in-mysql
    $self->{targets} = $self->{dbh}->selectall_arrayref("select * from filesets order by -sortorder desc", { Slice => {} });
    if ($opt->{filesets}) {
        # Explicitly requested filesets are used even if they are not active.
        $self->{targets} =
            [
                grep {
                    my $id = $_->{id};
                    grep { $id == $_ } @{ $opt->{filesets} }
                } @{ $self->{targets} }
            ];
    } else {
        $self->{targets} =
            [
                grep {
                    $_->{active};
                } @{ $self->{targets} }
            ];
    }

    # Detaint HOME. The pattern is anchored so that a HOME containing an
    # unexpected character is rejected instead of being silently truncated
    # to a different directory.
    if (defined $ENV{HOME} && $ENV{HOME} =~ m{\A([-/\w.]+)\z}) {
        my $id_file = "$1/.ssh/id_rsa";
        if (-f $id_file) {
            if (my $st = stat($id_file)) {
                # only use a key which belongs to the effective user
                if ($st->uid == $>) {
                    $self->{ssh_id_file} = $id_file;
                }
            }
        }
    }
    return $self;
}
# Back up all targets and export the session data afterwards.
#
# If $self->{parallel} is set, each target is handled by a forked child and
# at most that many children run at the same time; otherwise the targets
# are processed one after the other in this process.
sub run {
    my ($self) = @_;

    if ($self->{parallel}) {
        my $connect = sub {
            return DBI->connect(@{ $self->{dbi} },
                                { AutoCommit => 0,
                                  PrintError => 1,
                                  RaiseError => 1
                                }
                               );
        };
        my %running = ();
        # Wait for one child and remove it from %running.
        my $reap_child = sub {
            my $pid = wait();
            my $status = $?;
            if ($pid == -1) {
                # There is nothing left to wait for. Without this check the
                # loops below would spin forever because %running never
                # becomes empty.
                $self->log(2, "wait failed ($!) with " . (scalar keys %running) . " children still registered - forgetting them");
                %running = ();
                return;
            }
            delete $running{$pid};
            $self->log(3, "child with pid $pid terminated, " . (scalar keys %running) . " remaining");
            $self->log(2, "child with pid $pid terminated with status $status") if $status != 0;
        };

        # Every child opens its own connection, so the parent closes its
        # connection before forking.
        $self->{dbh}->disconnect();
        for my $target (@{$self->{targets}}) {
            $self->log(3, "found target host " . $target->{host} . " dir " . $target->{dir});
            while (scalar keys %running >= $self->{parallel}) {
                $self->log(3, "reached parallel limit - waiting");
                $reap_child->();
            }
            my $pid = fork();
            if (!defined($pid)) {
                die "fork failed: $!";
            }
            if ($pid == 0) {
                # child: back up exactly one target, then exit
                $self->{dbh} = $connect->();
                $self->backup2disk($target);
                $self->{dbh}->disconnect();
                exit(0);
            } else {
                $running{$pid} = 1;
                $self->log(3, "child with pid $pid started, " . (scalar keys %running) . " running");
            }
            sleep(10);
        }
        while (scalar keys %running) {
            $reap_child->();
        }
        # reconnect for the export below
        $self->{dbh} = $connect->();
    } else {
        for my $target (@{$self->{targets}}) {
            $self->backup2disk($target);
        }
    }
    $self->export();
}
# Back up a single target (an entry of table filesets).
#
# Reserves the fileset, opens a new session, asks the disk agent on the
# target host (via ssh) for the list of files and stores every listed
# object below $self->{this_backup}. Objects which were stored successfully
# are recorded in the database. Finally the session is closed, the fileset
# is released and some statistics are logged.
#
# Returns early without doing anything if the fileset cannot be reserved.
# Dies if a directory or symlink cannot be created.
sub backup2disk {
    my ($self, $target) = @_;

    unless ($self->reserve_fileset($target)) {
        return;
    }
    $self->log(3, "starting backup for target host " . $target->{host} . " dir " . $target->{dir});
    $self->{target} = $target;
    $self->{start_time} = time();

    # get previous generation
    $self->get_last_session();

    my $timestamp = $self->{timestamp} || strftime('%Y-%m-%dT%H.%M.%S', localtime);
    $self->{this_backup} = $self->{basedir} . "/$timestamp/" . $target->{host} . '/' . $target->{dir};
    $self->new_session();

    my ($list_pid, $list_cfd, $list_dfd); # connection to get list of files
    $list_pid = open2($list_dfd, $list_cfd,
                      "/usr/bin/ssh",
                      "-l", "simba_da",
                      $self->{ssh_id_file} ? ("-i", $self->{ssh_id_file}) : (),
                      $target->{host}, "da");
    $list_cfd->printflush("list $target->{dir}\n"); # XXX - encode!
    close($list_cfd);

    my $count = 0;
    my $timed_out = 0;
    my $last_report = $self->{start_time};
    while (my $line = <$list_dfd>) {
        my $now = time();
        if ($target->{timeout}) {
            my $elapsed = $now - $self->{start_time};
            $self->log(10, "checking timeout " . $elapsed . " > " . $target->{timeout});
            if ($elapsed > $target->{timeout}) {
                $self->log(3, "Timeout exceeded. $elapsed > $target->{timeout}");
                $timed_out = 1;
                last;
            }
        }
        $count++;
        chomp $line;
        $self->log(10, "file: $line");
        # split into fields
        my $f = $self->parse($line);
        if ($now - $last_report >= 10) {
            $self->log(9, "file $count: $f->{name}");
            $last_report = $now;
        }

        my $success = 1;
        if ($f->{t} eq 'f') {
            $success = $self->store_file($f);
        } elsif ($f->{t} eq 'd') {
            my $d = "$self->{this_backup}/$f->{name}";
            $d =~ s,//+,/,g;
            mkdir_p($d) or die "cannot mkdir $d: $!"; # XXX
            $self->setmeta($f);
        } elsif ($f->{t} eq 'l') {
            my $l = "$self->{this_backup}/$f->{name}";
            unless (symlink($f->{lt}, $l)) {
                die "cannot symlink $l -> $f->{lt}: $!"; # XXX
            }
            # $self->setmeta($f); ignore for symlinks. would need to use
            # lchown, lchmod, etc.
        } else {
            # create local copy (or insert into DB only?)
            $self->log(5, "ignored $line\n");
        }
        # insert into DB.
        $self->db_record_version($target, $f) if ($success);
    }

    # Clean up the list connection: close our end and reap the ssh process
    # started by open2, which would otherwise stay around as a zombie until
    # this process exits. After a timeout the process may still be busy, so
    # ask it to terminate before waiting for it.
    close($list_dfd);
    kill('TERM', $list_pid) if $timed_out;
    waitpid($list_pid, 0);
    $self->log(3, "list process $list_pid terminated with status $?");

    $self->flush_insert_instances();
    $self->close_session();
    $self->release_fileset($target);
    $self->log(3, "finished backup for target host " . $target->{host} . " dir " . $target->{dir} . ": $count files");
    $self->{counts}{objects} += $count;
    $self->log(3, "statistics:");
    for my $name (sort keys %{ $self->{counts} }) {
        $self->log(3, " $name: $self->{counts}{$name}");
    }
    for my $name (sort keys %{ $self->{times} }) {
        $self->log(3, " $name: $self->{times}{$name} s");
    }
}
# Try to reserve the fileset $target for this process by storing our pid
# in filesets.pid.
#
# If the fileset is held by another pid, retry once per minute for up to an
# hour. If the holding process does not exist any more (kill 0 fails with
# ESRCH), its reservation is removed so that the next attempt can succeed.
#
# Returns 1 if the fileset was reserved, 0 if we gave up.
sub reserve_fileset {
    my ($self, $target) = @_;
    my $dbh = $self->{dbh};

    my $start = time();
    my $pid;
    while (time() - $start < 3600) {
        my $rows = $dbh->do(q{update filesets set pid=? where id = ? and pid is null}, {}, $$, $target->{id});
        if ($rows == 1) {
            # The connection is opened with AutoCommit off, so the claim
            # has to be committed to become visible to other processes.
            $dbh->commit();
            return 1;
        }
        $pid = $dbh->selectrow_array(q{select pid from filesets where id = ?}, {}, $target->{id});
        if (defined $pid) {
            $self->log(3, "fileset $target->{id} appears to be in use by pid $pid");
            if (!kill(0, $pid) && $!{ESRCH}) {
                $self->log(3, "pid $pid doesn't exist, trying to release fileset $target->{id}");
                $dbh->do(q{update filesets set pid=null where id = ? and pid=?}, {}, $target->{id}, $pid);
            }
        } else {
            # Either the fileset was released between the update and the
            # select, or the row does not exist. Never pass an undefined
            # pid to kill.
            $self->log(3, "fileset $target->{id} could not be reserved, but no pid is registered");
        }
        # End the transaction before sleeping: this publishes a release
        # done above and lets the next iteration start with a fresh view.
        $dbh->commit();
        sleep 60;
    }
    my $holder = defined $pid ? "pid $pid" : "an unknown process";
    $self->log(2, "fileset $target->{id} appears to be still in use by $holder after grace period - giving up");
    return 0;
}
# Give up the reservation of fileset $target, but only if it is still
# registered to this process, and commit the change.
sub release_fileset {
    my ($self, $target) = @_;
    my $fileset_id = $target->{id};
    my $dbh = $self->{dbh};

    $self->log(3, "releasing fileset $fileset_id");
    my @bind = ($fileset_id, $$);
    $dbh->do(q{update filesets set pid=null where id=? and pid=?}, {}, @bind);
    $dbh->commit();
}
# Parse one line of disk agent output into a hashref.
#
# The line consists of fields separated by one or more spaces. The first
# field is stored as "name"; every further field has the form key=value
# and is stored under its key. The values of o, g, acl and lt are passed
# through unquote(); name and m are detainted.
sub parse {
    my ($self, $s) = @_;

    my @s = split(/ +/, $s);
    my $f = {};
    $f->{name} = shift @s;
    # an empty line has no name at all
    $f->{name} = $1 if (defined $f->{name} && $f->{name} =~ /(.*)/); # detaint XXX
    for (@s) {
        my ($k, $v) = split(/=/, $_, 2);
        $f->{$k} = $v;
        # special processing for permissions etc, here?
    }
    $f->{o} = unquote($f->{o});
    $f->{g} = unquote($f->{g});
    $f->{acl} = unquote($f->{acl});
    # Not every record necessarily has an mtime, and timestamps before
    # 1970 are negative, so accept an optional sign when detainting.
    $f->{m} = $1 if defined $f->{m} && $f->{m} =~ /^(-?\d+)$/;
    $f->{lt} = unquote($1) if defined $f->{lt} && $f->{lt} =~ /(.*)/;
    return $f;
}
# Check whether the file described by $f exists unchanged in the previous
# backup: mtime, size, owner, group and permission bits must all match.
#
# Returns 1 if they match and 0 if they do not. Returns nothing if there is
# no previous backup or the file cannot be lstat'ed there.
sub present {
    my ($self, $f) = @_;

    my $previous = $self->{last_backup};
    return unless $previous;

    my $st = lstat("$previous/$f->{name}");
    return unless $st;

    my $unchanged =
           $st->mtime == $f->{m}
        && $st->size == $f->{s}
        && $st->uid == $self->name2uid($f->{o})
        && $st->gid == $self->name2gid($f->{g})
        && ($st->mode & 07777) == $self->acl2mode($f);
    return $unchanged ? 1 : 0;
}
# Create directory $dir and all missing parent directories (like mkdir -p).
#
# $perm is the mode passed to mkdir for every directory which is created;
# it defaults to 0777.
#
# Returns a true value if the directory exists afterwards and a false value
# otherwise; in the latter case $! holds the error of the failed mkdir.
sub mkdir_p {
    my ($dir, $perm) = @_;
    $perm = 0777 unless(defined($perm));

    return 1 if -d $dir;
    return 1 if mkdir($dir, $perm);

    # Remember why mkdir failed: the file tests below may overwrite $!.
    my $errno = $! + 0;
    my ($exists, $missing) = ($!{EEXIST}, $!{ENOENT});

    # Somebody else created the directory between our test and our mkdir.
    return 1 if $exists && -d $dir;

    if ($missing) {
        my $parentdir = $dir;
        $parentdir =~ s|(.*)/.+|$1|;
        # Only recurse if there really is a parent to create. Otherwise
        # (e.g. for an empty path) we would call ourselves with the same
        # argument forever.
        if ($parentdir ne '' && $parentdir ne $dir) {
            mkdir_p($parentdir, $perm);
            return 1 if -d $dir;
            return 1 if mkdir($dir, $perm);
            $errno = $! + 0;
            return 1 if $!{EEXIST} && -d $dir;
        }
    }
    $! = $errno;
    return;
}
# Accessor for the base directory of all backups.
# Sets it if a defined $dir is given; always returns the current value.
sub basedir {
    my ($self, $dir) = @_;
    if (defined $dir) {
        $self->{basedir} = $dir;
    }
    return $self->{basedir};
}
# Accessor for the list of targets.
# Replaces the list if a defined $targets is given; always returns the
# current value.
sub targets {
    my ($self, $targets) = @_;
    if (defined $targets) {
        $self->{targets} = $targets;
    }
    return $self->{targets};
}
# Append $target to the list of targets (creating the list if necessary)
# and return the list.
sub add_target {
    my ($self, $target) = @_;
    $self->{targets} = [] unless defined $self->{targets};
    my $list = $self->{targets};
    push @$list, $target;
    return $list;
}
# Maps a three-character permission string to its numeric value:
# the strings are listed in ascending order, "---" being 0 and "rwx" being 7.
my %permstrbits;
@permstrbits{ '---', '--x', '-w-', '-wx', 'r--', 'r-x', 'rw-', 'rwx' } = (0 .. 7);
# Apply the metadata of $f (owner, group, permission bits, mtime) to the
# corresponding object below $self->{this_backup}.
#
# The access time is set to the current time. Failures do not abort the
# backup, but they are logged so that wrong metadata does not go unnoticed.
sub setmeta {
    my ($self, $f) = @_;
    my $fn = "$self->{this_backup}/$f->{name}";
    $self->log(3, "$fn is tainted!") if tainted($fn);
    my $mode = $self->acl2mode($f);
    $self->log(3, "$mode is tainted!") if tainted($mode);
    # chown, chmod and utime return the number of files changed,
    # i.e. 0 if the operation failed.
    chown($self->name2uid($f->{o}), $self->name2gid($f->{g}), $fn)
        or $self->log(5, "cannot chown $fn: $!");
    chmod($mode, $fn)
        or $self->log(5, "cannot chmod $fn: $!");
    utime(time, $f->{m}, $fn)
        or $self->log(5, "cannot set times of $fn: $!");
}
# Computes the file mode from the ACL of $f (a comma-separated list of
# entries) and the setuid, setgid and sticky flags, and returns it.
# Only the entries u::xxx, g::xxx and o:xxx contribute to the mode; all
# other entries are logged and ignored. They should eventually be returned
# as a second value.
sub acl2mode {
    my ($self, $f) = @_;
    my $mode = 0;
    my @aces = $f->{acl} ? split(',', $f->{acl}) : ();
    for my $ace (@aces) {
        if ($ace =~ /^u::(...)$/) {
            $mode |= $permstrbits{$1} << 6;
            next;
        }
        if ($ace =~ /^g::(...)$/) {
            $mode |= $permstrbits{$1} << 3;
            next;
        }
        if ($ace =~ /^o:(...)$/) {
            $mode |= $permstrbits{$1} << 0;
            next;
        }
        $self->log(5, "warning: unknown ACE $ace ignored");
    }
    $mode |= 04000 if $f->{setuid};
    $mode |= 02000 if $f->{setgid};
    $mode |= 01000 if $f->{sticky};
    return $mode;
}
# Cache of user name => uid, shared by all instances.
my %ucache;

# Translate a user name into a uid. A name consisting only of digits is
# taken to be the uid itself; a name unknown to getpwnam is mapped to
# $self->{unknown_uid}. Results are cached.
sub name2uid {
    my ($self, $uname) = @_;
    $uname = $1 if $uname =~ /(.*)/; # detaint

    my $cached = $ucache{$uname};
    return $cached if defined $cached;

    my $uid;
    if ($uname =~ /^\d+$/) {
        $uid = $uname;
    } else {
        $uid = getpwnam($uname);
        $uid = $self->{unknown_uid} unless defined($uid);
    }
    $ucache{$uname} = $uid;
    return $uid;
}
# Cache of group name => gid, shared by all instances.
my %gcache;

# Translate a group name into a gid. A name consisting only of digits is
# taken to be the gid itself; a name unknown to getgrnam is mapped to
# $self->{unknown_gid}. Results are cached.
sub name2gid {
    my ($self, $gname) = @_;
    $gname = $1 if $gname =~ /(.*)/; # detaint

    my $cached = $gcache{$gname};
    return $cached if defined $cached;

    my $gid;
    if ($gname =~ /^\d+$/) {
        $gid = $gname;
    } else {
        $gid = getgrnam($gname);
        $gid = $self->{unknown_gid} unless defined($gid);
    }
    $gcache{$gname} = $gid;
    return $gid;
}
# Write $msg to the log if $level does not exceed the configured log level.
# Each line is prefixed with a timestamp, the pid and the level.
# Dies if the log cannot be written.
#
# currently used log levels:
# 0 - fatal error, backup failed
# 3 - global progress messages, like start and end of a backup, statistics.
# 5 - errors which prevent a file from being backed up
# 10 - progress messages for single files.
sub log {
    my ($self, $level, $msg) = @_;
    if ($level <= $self->{log_level}) {
        my $stamp = strftime("%Y-%m-%dT%H:%M:%S%z", localtime);
        my $fh = $self->{fh_log};
        $fh->print($stamp, " $$ [$level]: $msg\n")
            or die "write to log failed: $!";
    }
}
# Accessor for the log level.
# Sets it if a defined $log_level is given; always returns the current value.
sub log_level {
    my ($self, $log_level) = @_;
    if (defined $log_level) {
        $self->{log_level} = $log_level;
    }
    return $self->{log_level};
}
# Record in the database that object $f of fileset $target is part of the
# current session.
#
# Makes sure there is a row in table files for the path, finds or creates
# the matching row in versions2 and queues a row for table instances. The
# queue is written by flush_insert_instances once it holds more than 10
# entries. The time spent in the individual steps is accumulated in
# $self->{times}.
#
# As a side effect, $f->{checksum} is filled in for regular files without
# checksum and $f->{m} is clamped to the range of a signed 32 bit integer.
sub db_record_version {
    my ($self, $target, $f) = @_;
    my $dbh = $self->{dbh};

    my $t0 = gettimeofday();

    my $db_f = $dbh->selectall_arrayref("select * from files where fileset=? and path=?",
                                        { Slice => {} },
                                        $target->{id}, $f->{name});

    my $t1 = gettimeofday();
    $self->{times}{db_record_version_select_files} += $t1 - $t0;

    unless (@$db_f) {
        $dbh->do("insert into files(fileset, path) values(?, ?)",
                 {},
                 $target->{id}, $f->{name});
        $db_f = $dbh->selectall_arrayref("select * from files where fileset=? and path=?",
                                         { Slice => {} },
                                         $target->{id}, $f->{name});
    }

    my $t2 = gettimeofday();
    $self->{times}{db_record_version_insert_files} += $t2 - $t1;

    if ($f->{t} eq 'f' && !defined($f->{checksum})) {
        # this must be a link to the previous version
        $f->{checksum} = $self->get_checksum($db_f->[0]{id});
    }

    my $t2a = gettimeofday();
    $self->{times}{db_record_version_versions2_get_checksum} += $t2a - $t2;

    # We use a 32 bit int field in the database, so we have to clamp the mtime to that range.
    # Need to fix this sometime before 2038 :-)
    # This has to happen before the lookup: the stored value is the clamped
    # one, so searching for the unclamped value would never find it and a
    # new row would be inserted every time.
    if ($f->{m} < -2147483648) {
        $f->{m} = -2147483648;
    } elsif ($f->{m} > 2147483647) {
        $f->{m} = 2147483647;
    }

    my $unix_bits = join(',', map {$f->{$_} ? ($_) : ()} qw(setuid setgid sticky));

    my $query =
        "select id from versions2
         where
          file_type=? and file_size=? and file_mtime=? and
          file_owner=? and file_group=? and file_acl=? and
          file_unix_bits=?
        ";
    my @args = (
        $f->{t}, $f->{s}, $f->{m},
        $f->{o}, $f->{g}, $f->{acl},
        $unix_bits,
    );
    # Only the column which is meaningful for the file type is compared.
    if ($f->{t} eq 'f') {
        $query .= " and checksum=?";
        push @args, $f->{checksum};
    } elsif ($f->{t} eq 'l') {
        $query .= " and file_linktarget=?";
        push @args, $f->{lt};
    } elsif ($f->{t} eq 'b' || $f->{t} eq 'c') {
        $query .= " and file_rdev=?";
        push @args, $f->{rdev};
    }

    my $version_id = $dbh->selectrow_array($query, {}, @args);

    my $t2b = gettimeofday();
    $self->{times}{db_record_version_versions2_get_version_id} += $t2b - $t2a;

    unless ($version_id) {
        $self->log(10, "insert into versions2(..., file_mtime=$f->{m}, ...)");
        # XXX why is $f->{checksum} undef here for ./bin/dash?
        $dbh->do("insert into versions2(
                    file_type, file_size, file_mtime,
                    file_owner, file_group, file_acl,
                    file_unix_bits,
                    file_rdev,
                    checksum, file_linktarget
                  )
                  values(
                    ?, ?, ?,
                    ?, ?, ?,
                    ?,
                    ?,
                    ?, ?
                  )",
                 {},
                 $f->{t}, $f->{s}, $f->{m},
                 $f->{o}, $f->{g}, $f->{acl},
                 $unix_bits,
                 $f->{rdev},
                 $f->{checksum}, $f->{lt},
                );
        $version_id = $dbh->{mysql_insertid};
        $self->log(10, "insert into versions2 -> $version_id");
    }

    my $t3 = gettimeofday();
    $self->{times}{db_record_version_versions2} += $t3 - $t2;
    $self->{times}{db_record_version_versions2_insert} += $t3 - $t2b;

    # The order of the elements must match the column list used in
    # flush_insert_instances: file, file_id, date, online, session, version.
    push @{ $self->{caches}{insert_instances} },
        [
            $db_f->[0]{id},
            $self->{record_file_id} ? $f->{id} : undef,
            $self->{record_time} ? time() : undef,
            1,
            $self->{session_id}, $version_id
        ];
    if (@{ $self->{caches}{insert_instances} } > 10) {
        $self->flush_insert_instances();
    }

    my $t4 = gettimeofday();
    $self->{times}{db_record_version_insert_instances} += $t4 - $t3;
    $self->{times}{db_record_version} += $t4 - $t0;
}
# Return the checksum which file id $file had in the previous session
# ($self->{last_backup_id}).
#
# On the first call for a session the checksums of all files of that
# session are loaded into a cache; later calls are answered from the cache.
sub get_checksum {
    my ($self, $file) = @_;
    my $session = $self->{last_backup_id};

    unless ($self->{caches}{file_checksums}{$session}) {
        my %checksum_of;
        $self->{caches}{file_checksums}{$session} = \%checksum_of;
        my $sql = "select file, checksum from versions2 v, instances i\n"
                . "where session=? and v.id = i.version";
        my $sth = $self->{dbh}->prepare($sql);
        $sth->execute($session);
        while (my ($file_id, $checksum) = $sth->fetchrow_array) {
            $checksum_of{$file_id} = $checksum;
        }
    }
    return $self->{caches}{file_checksums}{$session}{$file};
}
# Write all queued rows for table instances with a single multi-row insert,
# commit the transaction and empty the queue. The commit happens even if
# the queue was empty.
sub flush_insert_instances {
    my ($self) = @_;
    my $dbh = $self->{dbh};

    my $pending = $self->{caches}{insert_instances};
    if ($pending && @$pending) {
        my @tuples;
        for my $row (@$pending) {
            my @quoted = map { $dbh->quote($_) } @$row;
            push @tuples, "(" . join(",", @quoted) . ")";
        }
        my $cmd = "insert into instances(file, file_id, date, online, session, version)"
                . " values "
                . join(", ", @tuples);
        $dbh->do($cmd);
    }
    $dbh->commit();
    $self->{caches}{insert_instances} = [];
}
# Create a row in table sessions for the backup which is about to start
# (start date now, prefix $self->{this_backup}) and remember its id in
# $self->{session_id}.
sub new_session {
    my ($self) = @_;
    my $dbh = $self->{dbh};
    my @bind = (time(), $self->{this_backup});
    $dbh->do("insert into sessions(start_date, prefix) values(?, ?)", {}, @bind);
    $self->{session_id} = $dbh->{'mysql_insertid'};
}
# Finish the current session: store the end date, commit, close the
# connection used to fetch files and forget the current target.
sub close_session {
    my ($self) = @_;
    my $dbh = $self->{dbh};
    my $now = time();
    $dbh->do("update sessions set end_date=? where id=?", {}, $now, $self->{session_id});
    $dbh->commit();
    $self->close_file_connection;
    delete $self->{target};
}
# Close the connection used to fetch file contents, if there is one:
# close both handles, wait for the process to terminate and forget the
# handles and the pid.
sub close_file_connection {
    my ($self) = @_;
    if ($self->{file_pid}) {
        my $pid = $self->{file_pid};
        for my $handle_key (qw(file_cfd file_dfd)) {
            close($self->{$handle_key});
        }
        $self->log(3, "waiting for $pid");
        waitpid $pid, 0;
        $self->log(3, "$pid terminated with status $?");
        delete $self->{file_cfd};
        delete $self->{file_dfd};
        delete $self->{file_pid};
    }
}
# Find the most recently finished session for the current target and store
# its prefix in $self->{last_backup} and its id in $self->{last_backup_id}.
# Both are undef if there is no finished session yet.
sub get_last_session {
    my ($self) = @_;

    my $sessions
        = $self->{dbh}->selectall_arrayref(
            "select * from sessions where end_date is not null and prefix like ? order by end_date desc",
            { Slice => {} },
            $self->{basedir} . '/%/' . $self->{target}->{host} . '/' . $self->{target}->{dir}
        );
    # On the first backup of a fileset there is no previous session. Don't
    # create an empty row by accident and don't interpolate undefined
    # values into the log message.
    my $last = @$sessions ? $sessions->[0] : {};
    $self->{last_backup} = $last->{prefix};
    $self->{last_backup_id} = $last->{id};
    if (defined $self->{last_backup}) {
        $self->log(3, "last backup: $self->{last_backup} ($self->{last_backup_id})");
    } else {
        $self->log(3, "last backup: none");
    }
}
=head2 linkdup

Try to find a duplicate of the current file in the database and replace the
current file with a hardlink to it. This is useful if you
have multiple copies of a file stored in different locations.

The search starts from the newest session and continues into the past until
either linking succeeds or we run out of duplicates.

This is done because creating a hard link may not always be possible (duplicate is
on a different file system or has already reached the maximum link count)
and it is more likely that we can link to new copies than to old ones.

Returns the name of the file we linked to, or nothing if no link was created.

=cut

sub linkdup {
    my ($self, $f, $backup_filename) = @_;
    my $t0 = gettimeofday();
    # XXX - this seems to be slow
    # XXX - creates huge temp files. Restrict to last few sessions or at least sessions on the same device?
    # XXX - that's not quite as simple: We only have the prefix, but there are many prefixes on the same
    #       device. We can create a list of them at first call, though and then pass the list
    #       to the query. Maybe even shorten the list. ($n newest sessions only)
    # XXX - another possible optimization is to check the last few files we've written: .svn/prop-base
    #       normally contains a lot of identical files.

    unless ($self->{sessions_on_same_device}) {
        # First call: determine the newest sessions which are stored on the
        # same device as the file we want to replace.
        my $st = stat($backup_filename);
        my $my_dev = defined $st ? $st->dev : ""; # can this happen?
        my @same_device;
        my $sth = $self->{dbh}->prepare("select * from sessions order by id desc");
        $sth->execute();
        while (my $r = $sth->fetchrow_hashref()) {
            my $st = lstat $r->{prefix};
            my $dev = defined $st ? $st->dev : "";
            next unless $dev eq $my_dev;
            last if @same_device > 30;
            push @same_device, $r->{id}; # We only use the id
        }
        $sth->finish();

        unless (@same_device) {
            # Nothing we could link to; "in ()" would be a syntax error.
            # Don't cache the empty list, so that the next call tries again.
            $self->{times}{linkdup} += gettimeofday() - $t0;
            $self->{counts}{linkdup_miss}++;
            return;
        }
        $self->{sessions_on_same_device} = \@same_device;

        # Instances older than the oldest instance of these sessions don't
        # need to be considered at all.
        my $placeholders = join(", ", map("?", @same_device));
        my $min_instance_id
            = $self->{dbh}->selectrow_array(
                "select min(id) from instances where session in ($placeholders)",
                {},
                @same_device
            );
        $self->{min_instance_id} = defined $min_instance_id ? $min_instance_id : 0;
        $self->log(3, "min_instance_id set to $self->{min_instance_id}");
    }

    my $tdb0 = gettimeofday();
    my $tdb1;
    my $sth = $self->{dbh}->prepare("select * from versions2, instances, files, sessions
                                     where file_type=? and file_size=? and file_mtime=?
                                       and file_owner=? and file_group=? and file_acl=?
                                       and file_unix_bits=?
                                       and checksum=? and online=1
                                       and versions2.id=instances.version
                                       and instances.id >= ?
                                       and instances.file=files.id
                                       and instances.session=sessions.id
                                       and sessions.id in (" . join(", ", map("?", @{ $self->{sessions_on_same_device} })) . ")" .
                                    " order by instances.session desc
                                    ");
    $sth->execute(
        $f->{t}, $f->{s}, $f->{m},
        $f->{o}, $f->{g}, $f->{acl},
        join(',', map {$f->{$_} ? ($_) : ()} qw(setuid setgid sticky)),
        $f->{checksum},
        $self->{min_instance_id},
        @{ $self->{sessions_on_same_device} },
    );
    my $st = stat($backup_filename);
    my $my_dev = defined $st ? $st->dev : "";
    while (my $r = $sth->fetchrow_hashref()) {
        unless ($tdb1) {
            $tdb1 = gettimeofday();
            $self->{times}{linkdup_db} += $tdb1 - $tdb0;
        }

        # check if old file is on same device. If it isn't, skip it.
        # XXX - this should now be obsolete because we already selected only matching sessions above.
        unless ($self->{prefix_device}{$r->{prefix}}) {
            my $st = lstat $r->{prefix};
            $self->{prefix_device}{$r->{prefix}}
                = defined $st ? $st->dev : "";
        }
        next unless $self->{prefix_device}{$r->{prefix}} eq $my_dev;

        my $oldfile = "$r->{prefix}/$r->{path}";
        if (my $st = lstat($oldfile)) {
            # The database says the file is a duplicate; verify that the
            # file on disk still has the expected metadata.
            if ($st->mtime == $f->{m} &&
                $st->size == $f->{s} &&
                $st->uid == $self->name2uid($f->{o}) &&
                $st->gid == $self->name2gid($f->{g}) &&
                ($st->mode & 07777) == $self->acl2mode($f)
               ) {
                # Move the new file out of the way first, so that it can be
                # restored if the link cannot be created.
                my ($dirname, $basename) = $backup_filename =~ m{(.*)/(.*)};
                my $tmpname = "$basename.$$.simba_backup";
                if (length($tmpname) > 255) {
                    $tmpname = substr($basename, 0, 235) . ".$$.simba_backup";
                }
                $tmpname = "$dirname/$tmpname";
                rename($backup_filename, "$tmpname") or die "cannot save $backup_filename to $tmpname: $!";
                if (link($oldfile, $backup_filename)) {
                    $self->log(10, "linked (dup)");
                    unlink("$tmpname") or die "cannot unlink $tmpname: $!";
                    $sth->finish();
                    my $t1 = gettimeofday();
                    $self->{counts}{dup2}++;
                    $self->{times}{linkdup} += $t1 - $t0;
                    return $oldfile;
                } else {
                    $self->log(5, "cannot link $oldfile to $backup_filename: $!");
                    rename("$tmpname", $backup_filename) or die "cannot restore $backup_filename from $tmpname: $!";
                }
            }
        }
    }
    my $t1 = gettimeofday();
    $self->{times}{linkdup} += $t1 - $t0;
    unless ($tdb1) {
        $tdb1 = gettimeofday();
        $self->{times}{linkdup_db} += $tdb1 - $tdb0;
    }
    $self->{counts}{linkdup_miss}++;
    return;
}
=head2 store_file

store a file in the local filesystem. If the file appears to be unchanged since
the last backup, try to create a hard link. Otherwise, get the contents of the
file from the DA, and search for a file with the same contents (i.e., checksum)
and metadata, but possibly different name and try to link to that. If no link
can be created to an existing file, create a new one.

Returns 1 if the file was stored and 0 if it was not.

=cut

sub store_file {
    my ($self, $f) = @_;

    my $success = 1;

    if($self->present($f)) {
        if (link("$self->{last_backup}/$f->{name}", "$self->{this_backup}/$f->{name}")) {
            $self->log(10, "linked");
            $self->{counts}{dup1}++;
            return $success;
        } else {
            $self->log(5, "cannot link $self->{last_backup}/$f->{name} to $self->{this_backup}/$f->{name}: $!");
        }
    }

    # If size is zero, check if we have seen a matching file before. If we have, link to it.
    # Ubuntu contains a lot of zero sized files (about 8000 per installed kernel).
    # Searching for them in the database is slow, so we special-case that here.
    # We could generalize that, but I don't think that there will ever be enough identical
    # non-empty files to make that worthwhile.
    # The empty files seen so far are remembered in $self->{null_files} (see below).
    if ($f->{s} == 0 && $f->{t} eq 'f') {
        no warnings 'uninitialized'; # unix bits may not exist
        my $k = "$f->{m} $f->{o} $f->{g} $f->{acl} $f->{setuid} $f->{setgid} $f->{sticky}";
        if ($self->{null_files}{$k}) {
            my $oldfile = $self->{null_files}{$k}{name};
            my $backup_filename = "$self->{this_backup}/$f->{name}";
            if (link($oldfile, $backup_filename)) {
                $self->log(10, "linked (empty)");
                $self->{counts}{dup10}++;
                return $success;
            }
        }
    }

    # else request from da
    # The connection is opened on demand and reused for subsequent files.
    unless ($self->{file_pid}) {
        $self->{file_pid} = open2($self->{file_dfd}, $self->{file_cfd},
                                  "/usr/bin/ssh",
                                  "-l", "simba_da",
                                  $self->{ssh_id_file} ? ("-i", $self->{ssh_id_file}) : (),
                                  $self->{target}->{host}, "da");
    }
    $self->{file_cfd}->printflush("get $self->{target}->{dir}/$f->{name}\n"); # XXX - encode!
    my $header = $self->{file_dfd}->getline; # this should be the same as $_ - check?
    # getline returns undef if the DA has closed the connection;
    # treat that like any other unexpected header.
    $header = '' unless defined $header;
    if ($header =~ /^data (.*)/) {
        my $f2 = $self->parse($1);
        my $backup_filename = "$self->{this_backup}/$f->{name}";
        my $file_bfd;
        unless (open($file_bfd, '>:raw', $backup_filename)) {
            $self->log(5, "cannot open backup file $backup_filename: $!");
            # XXX - There may be some errors from which we can recover, e.g., for
            # "File name too long" we could just shorten the file name. But for
            # now we just skip the file:
            # XXX - some other errors are almost certainly fatal, e.g., ENOENT
            # probably means that our backup device has been unmounted (BTDT).
            $self->close_file_connection;
            return 0;
        }
        # The DA announces the size in the header; copy exactly that many
        # bytes and compute the checksum on the fly.
        my $size = $f2->{s};
        my $err;
        my $sha1 = Digest::SHA->new(1);

        while ($size > 0) {
            my $buffer;
            my $rc = read($self->{file_dfd}, $buffer, min($size, $BUFSIZE));
            if (!defined($rc)) {
                # I/O error
                $self->log(5, "error reading from data socket: $!");
                last;
            } elsif ($rc == 0) {
                # premature EOF.
                $self->log(5, "unexpected EOF reading from data socket");
                last;
            }
            $file_bfd->print($buffer) or die "write to backup failed: $!";
            $size -= length($buffer);
            $sha1->add($buffer);
        }
        close($file_bfd) or die "write to backup failed: $!";
        my $trailer = $self->{file_dfd}->getline; # should be empty line
        $trailer = $self->{file_dfd}->getline;
        # undef means EOF; handled by the "unexpected trailer" branch below
        $trailer = '' unless defined $trailer;
        if ($trailer =~ /^fail /) {
            $self->log(5, $trailer);
            $success = 0;
        } elsif ($trailer =~ /^chk sha1 (\w+)/) {
            my $checksum = $sha1->hexdigest;
            if ($checksum ne $1) {
                # NOTE(review): a mismatch is only logged, the file is still
                # treated as stored successfully - confirm that this is intended.
                $self->log(5, "checksum error\n");
            }
            $f->{checksum} = $checksum;
        } else {
            $self->log(5, "unexpected trailer $trailer\n");
            $self->close_file_connection;
            $success = 0;
        }
        # Without a checksum the duplicate search cannot find anything
        # (it compares the checksum column), so skip the database query.
        my $linked = defined $f->{checksum} && $self->linkdup($f, $backup_filename);
        unless ($linked) {
            $self->setmeta($f);
            $self->log(10, "stored");
        }
        if ($f->{s} == 0 && $f->{t} eq 'f') {
            # remember the empty file for the shortcut above
            no warnings 'uninitialized'; # unix bits may not exist
            my $k = "$f->{m} $f->{o} $f->{g} $f->{acl} $f->{setuid} $f->{setgid} $f->{sticky}";
            $self->{null_files}{$k}{name} = $backup_filename;
        }
    } else {
        $self->log(5, "unexpected header $header\n");
        $self->close_file_connection;
        $success = 0;
    }
    return $success;
}
# Make sure that table instances has enough partitions for the next
# $self->{instances_part_size} ids: as long as the upper limit of the last
# partition is not larger than max(id) + instances_part_size, another
# partition of that size is added.
#
# Does nothing if no numeric upper limit can be determined for the last
# partition.
sub adjust_partitions {
    my ($self) = @_;

    my $dbh = $self->{dbh};
    my $database = $dbh->selectrow_array("select database()");
    my $max_id = $dbh->selectrow_array("select max(id) from instances");
    # max(id) is NULL for an empty table
    $max_id = 0 unless defined $max_id;
    my ($max_part, $limit)
        = $dbh->selectrow_array(
            "select partition_ordinal_position, partition_description
             from information_schema.partitions
             where table_schema=? and table_name='instances'
             order by partition_ordinal_position desc
             limit 1",
            {},
            $database
        );
    # Without a numeric limit we cannot compute the next one, and trying
    # to add a partition would only fail.
    unless (defined $max_part && defined $limit && $limit =~ /^\d+$/) {
        $self->log(3, "cannot determine limit of last partition of table instances - partitions not adjusted");
        return;
    }
    while ($max_id + $self->{instances_part_size} > $limit) {
        $max_part++;
        $limit += $self->{instances_part_size};
        my $partition_name = sprintf("p%03d", $max_part);
        # DDL cannot use placeholders; both values are computed here.
        $dbh->do("alter table instances add partition (partition $partition_name values less than ($limit))");
        $self->log(3, "added partition $partition_name < $limit");
    }
}
# Export every session which has not been exported yet as a gzipped file
# of tab separated values to /var/lib/simba/export/<session id>.tsv.gz.
#
# Each file is written under a temporary name and renamed when it is
# complete, so an existing <session id>.tsv is never partial.
# Dies if a file cannot be created, written, closed or renamed.
sub export {
    my ($self) = @_;
    my $dbh = $self->{dbh};

    my $exportdir = "/var/lib/simba/export";
    unless (-d $exportdir || mkdir($exportdir)) {
        # not fatal by itself: there may be nothing to export
        $self->log(2, "cannot create $exportdir: $!");
    }

    my $sessions = $dbh->selectcol_arrayref("select id from sessions order by id");
    my $sth;
    for my $session_id (@$sessions) {
        next if (-f "$exportdir/$session_id.tsv"
                 || -f "$exportdir/$session_id.tsv.gz");
        $self->log(3, "exporting session $session_id");
        # prepared once, executed for every session
        $sth ||= $dbh->prepare(
            q{select sessions.id as "sessions.id", start_date, end_date, prefix,
                     instances.id as "instances.id", file_id, date, online,
                     versions2.id as "versions2.id", file_type, file_size, file_mtime, file_owner, file_group, file_acl, file_unix_bits, file_rdev, checksum, file_linktarget,
                     files.id as "files.id", path,
                     filesets.id as "filesets.id", host, dir, options, pid
              from sessions, instances, versions2, files, filesets
              where sessions.id=?
                and sessions.id=instances.session
                and instances.version=versions2.id
                and instances.file=files.id
                and files.fileset=filesets.id
             });
        $sth->execute($session_id);

        my $file = "$exportdir/$session_id.tsv";
        my $tmpfile = "$file.$$";
        open my $fh, '>', $tmpfile or die "cannot create $tmpfile: $!";
        print $fh join("\t", @{$sth->{NAME}}), "\n" or die "cannot write to $tmpfile: $!";
        while (my $r = $sth->fetchrow_arrayref) {
            no warnings 'uninitialized';
            print $fh join("\t", @$r), "\n" or die "cannot write to $tmpfile: $!";
        }
        # Errors of buffered writes may only show up at close. Don't
        # publish the file under its final name if it is incomplete.
        close($fh) or die "cannot write to $tmpfile: $!";
        rename $tmpfile, $file or die "cannot rename $tmpfile to $file: $!";
        if (system("/bin/gzip", $file) != 0) {
            $self->log(2, "gzip $file failed with status $?");
        }
    }
}
# Destructor: close the database connection, if there is one.
# The object may be destroyed before a connection has been established
# (e.g. when the constructor dies), so dbh may not be set.
sub DESTROY {
    my ($self) = @_;
    my $dbh = delete $self->{dbh};
    $dbh->disconnect() if $dbh;
}
# vim: tw=0 expandtab
1;