# # Copyright (c) 2011 NetApp, Inc., All Rights Reserved # Any use, modification, or distribution is prohibited # without prior written consent from NetApp, Inc. # ## @summary Multiple Aggregate Task Module ## @author bruce.blinn@netapp.com ## @status shared ## @pod here package NACL::MTask::AggregateMulti; use strict; use warnings; use base qw(NACL::STask::Aggregate); use Data::Dumper; use List::Util qw(min max); use NATE::BaseException qw(:try); use NATE::Exceptions::Argument; use Params::Validate qw(validate SCALAR SCALARREF ARRAYREF BOOLEAN); use Tharn qw( subtest logresult); use NATE::Log qw(log_global); my $Log = log_global(); my $may_enter = $Log->may_enter(); my $may_exit = $Log->may_exit(); =head1 NAME NACL::MTask::AggregateMulti =head1 DESCRIPTION C<NACL::MTask::AggregateMulti> adds methods to C<NACL::STask::Aggregate> to make it easier to create or purge multiple aggregates in a single method call. This module will be used in tests and utility scripts needed to test very large configurations, such as those being tested in Panamax. =head1 METHODS =head2 create_multi my @aggr_names = NACL::MTask::AggregateMulti->create_multi( command_interface => $command_interface, aggregate => $aggregate, diskcount => $num_disks, nacltask_num_aggrs => $num_aggregates, nacltask_num_subtests => $num_subtests, nacltask_start_index => $start_index, nacltask_subtest_delay => $num_seconds, nacltask_subtests_per_second => $num_subtests, %other_options ); (Class method) Create multiple aggregates in a single method call. The standard aggregate attributes can be passed to this method, in which case, they will be passed through to the create() method when each aggregate is created. However, the job_component attribute is used by this method, and therefore, cannot be used by the caller. 
If this method is called in array context, it will return an array containing the names of the aggregates that were created (or started to be created, if there was an error); it cannot return the aggregate objects because they are created in a separate process. If this method is called in a scalar context, it will return the number of aggregates created. =over =item Options =over =item C<< command_interface => $command_interface >> (Required) As in C<NACL::STask::Aggregate>. A component object that represents the host to which to send commands. =item C<< aggregate => $aggregate >> (Required) As in C<NACL::STask::Aggregate>, except if the number of aggregates to create is more than one, it will be appended with an underscore and a 5-digit number to form the name of each aggregate. For example, if the aggregate parameter is "my_aggr", the name of the first aggregate will be "my_aggr_00000" (see also, the nacltask_start_index option). =item C<< diskcount => $num_disks >> (Optional) As in C<NACL::STask::Aggregate>, except if the number of disks is three, the RAID type will be set to "raid4" because "raid_dp" is not allowed with three disks. This behavior is used only if the "nacltask_min_size" and "raidtype" parameters are not explicitly passed by the caller. =item C<< nacltask_num_aggrs => $num_aggregates >> (Optional) This is the number of aggregates to create. The default is to create one aggregate. =item C<< nacltask_num_subtests => $num_subtests >> (Optional) This is the number of parallel processes, or subtests, that will be used to create the aggregates. The number of subtests that can be used is restricted by the maximum number of RSH or SSH connections that can be made. The default value for this option is 50 subtests in C-mode and 15 subtests in 7-mode. =item C<< nacltask_start_index => $start_index >> (Optional) This is the starting value for the aggregate number that will be appended to the name of each aggregate. The default value is 0. 
=item C<< nacltask_subtest_delay => $num_seconds >> (Optional) By default, filers are configured to allow a maximum of 10 new SSH connections per second. A separate subtest is used to create each aggregate, and each subtest starts another SSH connection. This parameter is the number of seconds to wait between every "nacltask_subtests_per_second" subtests that are started. The default is two seconds. Note: A value of one second does not always work because even though the subtests are not started at more than 10 per second, it is possible for more than 10 requests to arrive at the filer in less than one second. Setting this value to two seconds seems to clear up that intermittent failure. This failure shows up in the /mroot/etc/log/mlog/messages.log file as the following error from the xinetd daemon: "Deactivating service ssh due to excessive incoming connections." =item C<< nacltask_subtests_per_second => $num_subtests >> (Optional) This parameter is the number of subtests to start before waiting "nacltask_subtest_delay" seconds. The default is 10 subtests. See also the nacltask_subtest_delay parameter. =item C<< nacltask_if_error => $continue_or_die >> (Optional) This parameter specifies whether to fail or to continue when an aggregate create fails. Sometimes during stress testing we are not concerned about the exact number of aggregates created; we only need multiple aggregates to run our tests. During the course of creation, a few aggregate creates might fail (for example, because of different disk types), and in that case we do not want to fail the script. Pass the value "continue" to continue without raising an error; in this case the return value will be the array of aggregates successfully created. The default value is "die". 
=back =back =head2 purge_multi my $num_purged = NACL::MTask::AggregateMulti->purge_multi( command_interface => $command_interface, aggregate => $aggr_names, nacltask_num_subtests => $num_subtests, nacltask_if_error => $action, nacltask_subtest_delay => $num_seconds, nacltask_subtests_per_second => $num_subtests, %other_options ); (Class method) This method will purge multiple aggregates in a single method call. The standard aggregate attributes can be passed to this method, in which case, they will be passed through to the purge() method when each aggregate is purged. It returns the number of aggregates that were purged. =over =item Options =over =item C<< command_interface => $command_interface >> (Required) As in C<NACL::STask::Aggregate>. A component object that represents the host to which to send commands. =item C<< aggregate => $aggr_names >> (Required) This is either an aggregate name or a reference to an array of aggregate names, which are the names of the aggregates to purge. The aggregate name(s) may contain wildcard characters to allow the name to match more than one aggregate. The wildcard characters are those defined by the filer's command line interpreter. For example, if you pass: aggregate => ['eng_aggrs*', 'mkt_aggrs*'] This method will purge all the aggregates with names that begin with the string "eng_aggrs" or "mkt_aggrs". =item C<< nacltask_num_subtests => $num_subtests >> (Optional) This is the number of parallel processes, or subtests, that will be used to purge the aggregates. The number of subtests that can be used is restricted by the maximum number of RSH or SSH connections that can be made. The default value for this option is 50 subtests in C-mode and 15 subtests in 7-mode. =item C<< nacltask_if_error => $action >> (Optional) This parameter specifies the action to take if there is an error trying to remove one of the aggregates. The possible values are "die" or "continue". These strings are not case sensitive. The default is "die". 
=item C<< nacltask_subtest_delay => $num_seconds >> (Optional) By default, filers are configured to allow a maximum of 10 new SSH connections per second. A separate subtest is used to purge each aggregate, and each subtest starts another SSH connection. This parameter is the number of seconds to wait between every "nacltask_subtests_per_second" subtests that are started. The default is two seconds. Note: A value of one second does not always work because even though the subtests are not started at more than 10 per second, it is possible for more than 10 requests to arrive at the filer in less than one second. Setting this value to two seconds seems to clear up that intermittent failure. This failure shows up in the /mroot/etc/log/mlog/messages.log file as the following error from the xinetd daemon: "Deactivating service ssh due to excessive incoming connections." =item C<< nacltask_subtests_per_second => $num_subtests >> (Optional) This parameter is the number of subtests to start before waiting "nacltask_subtest_delay" seconds. The default is 10 subtests. See also the nacltask_subtest_delay parameter. =item C<< nacltask_wait => 0|1 >> (Optional) If 0 (the default), do not wait for aggregates to come online. If 1, wait for aggregates to come online. 
=back

=back

=cut

# Return the default number of parallel subtests for the mode reported by
# the command interface: 50 when mode() is 'CMode', 15 otherwise.
sub _multi_default_num_subtests {
    my ($ci) = @_;
    return $ci->mode() eq 'CMode' ? 50 : 15;
}

# Sleep for $subtest_delay seconds when $index is a multiple of
# $subtests_per_second.  By default, filers are configured to allow a
# maximum of 10 ssh connections per second; pausing between groups of
# subtests prevents intermittent connection failures on fast filers.
sub _multi_throttle {
    my ($index, $subtests_per_second, $subtest_delay) = @_;

    # The % operator works on integers and dies with "Illegal modulus zero"
    # when its right operand truncates to 0, so a non-positive value simply
    # disables throttling instead of aborting the run.
    return if int($subtests_per_second) <= 0;

    if (($index % $subtests_per_second) == 0) {
        sleep $subtest_delay;
    }
    return;
}

# Re-throw an exception caught while waiting for subtests.  A
# NATE::Result::Fatal is converted to a NATE::BaseException carrying its
# messages; anything else is re-thrown unchanged.
sub _multi_rethrow {
    my ($e) = @_;
    if ($e->isa("NATE::Result::Fatal")) {
        NATE::BaseException->throw("@{$e->{messages}}");
    }
    $e->throw();
}

# Create several aggregates in parallel background subtests (class method).
#
# Options are validated by _common_validate_with(); the nacltask_* batching
# options and "aggregate" are consumed here and every remaining option is
# forwarded to _create_aggr() for each aggregate.
#
# Returns the list of aggregate names in list context and the number of
# names in scalar context.  When nacltask_if_error matches /continue/i,
# names that cannot be found after creation are left out of the result.
#
# Throws NATE::Exceptions::Argument if job_component is passed, and
# re-throws any error raised while waiting for a batch of subtests.
sub create_multi {
    $Log->enter() if $may_enter;
    my $pkg = shift;

    my %opts = $pkg->_common_validate_with(
        params          => \@_,
        additional_spec => {
            command_interface => { type => SCALAR, optional => 0 },
            aggregate         => { type => SCALAR, optional => 0 },
            nacltask_num_aggrs           => { type => SCALAR, default => 1 },
            nacltask_num_subtests        => { type => SCALAR, default => 0 },
            nacltask_start_index         => { type => SCALAR, default => 0 },
            nacltask_subtest_delay       => { type => SCALAR, default => 2 },
            nacltask_subtests_per_second => { type => SCALAR, default => 10 },
            nacltask_wait                => { type => SCALAR, default => 0 },
            nacltask_if_error            => { type => SCALAR, default => 'die' },
        },
        allow_extra => 1,
    );

    my $num_aggrs           = delete $opts{nacltask_num_aggrs};
    my $num_subtests        = delete $opts{nacltask_num_subtests};
    my $start_index         = delete $opts{nacltask_start_index};
    my $subtest_delay       = delete $opts{nacltask_subtest_delay};
    my $subtests_per_second = delete $opts{nacltask_subtests_per_second};
    my $aggregate           = delete $opts{aggregate};
    my $ci                  = $opts{command_interface};

    # nacltask_if_error stays in %opts because _create_aggr() consumes it.
    my $if_error = $opts{nacltask_if_error};
    my @aggr_names;

    $Log->trace("num_aggrs = $num_aggrs");
    $Log->trace("num_subtests = $num_subtests");
    $Log->trace("start_index = $start_index");
    $Log->trace("subtest_delay = $subtest_delay");
    $Log->trace("subtests_per_second = $subtests_per_second");
    $Log->trace("aggregate = $aggregate");
    $Log->trace("if_error = $if_error");

    # The following option is used by this subroutine, so make sure it is
    # not also used by the caller.
    if (defined($opts{job_component})) {
        $Log->exit() if $may_exit;
        NATE::Exceptions::Argument->throw(
            "The \"job_component\" cannot be used with this method.");
    }

    if ($num_subtests <= 0) {
        $num_subtests = _multi_default_num_subtests($ci);
    }

    # When creating a 3-disk aggregate, only raid4 can be used (not raid_dp).
    # Change the RAID type if necessary rather than letting it fail, unless the
    # caller explicitly passed nacltask_min_size or raidtype.  diskcount is
    # optional, so check that it is defined before comparing it numerically.
    if (   defined($opts{diskcount})
        && $opts{diskcount} == 3
        && !defined($opts{nacltask_min_size})
        && !defined($opts{raidtype}))
    {
        $opts{raidtype} = 'raid4';
    }

    my $aggr_id      = $start_index;
    my $last_aggr_id = $num_aggrs + $start_index - 1;
    while ($aggr_id <= $last_aggr_id) {
        #
        # Start a batch of subtests.
        #
        my $num_left   = $last_aggr_id - $aggr_id + 1;
        my $batch_size = min($num_left, $num_subtests);
        my @subtests   = ();
        foreach (1 .. $batch_size) {
            # The numeric suffix is only added when more than one aggregate
            # is requested.
            my $aggr_name = $aggregate;
            if ($num_aggrs > 1) {
                $aggr_name .= '_' . sprintf('%05d', $aggr_id);
            }

            _multi_throttle($aggr_id, $subtests_per_second, $subtest_delay);
            $aggr_id++;

            push(@aggr_names, $aggr_name);
            srand;    # Give each subtest a different seed.
            $Log->trace("Creating $aggr_name");
            my $subtest = subtest(\&_create_aggr,
                -bg,
                -runid => "cr_$aggr_name",
                '--',
                $aggr_name,
                %opts,
            );
            push(@subtests, $subtest);
        }

        #
        # Wait for the current batch of subtests to finish.
        #
        $Log->trace('Waiting for subtests to finish.');
        try {
            Subtest::wait_finish(subtests => \@subtests);
        }
        otherwise {
            my $e = shift;
            $Log->exit() if $may_exit;
            _multi_rethrow($e);
        };
    }

    #
    # If an aggregate create fails the aggregate name must not be returned.
    # The test matches the one in _create_aggr() (case-insensitive), so a
    # value such as "Continue" is handled the same way in both places.
    #
    if ($if_error =~ /continue/i) {
        my @created;
        foreach my $aggr_name (@aggr_names) {
            try {
                my $aggr = NACL::STask::Aggregate->find(
                    command_interface => $ci,
                    filter            => { aggregate => $aggr_name },
                );
                push(@created, $aggr_name);
            }
            otherwise {
                # find() failed for this name, so it is left out of the
                # returned list.
            };
        }
        @aggr_names = @created;
    }

    $Log->exit() if $may_exit;
    if (wantarray()) {
        # Aggregates are created in a separate process, so you cannot return
        # the aggregate object to the parent process.
        return @aggr_names;
    }
    return scalar(@aggr_names);
}

# Purge every aggregate matching the given name(s) in parallel background
# subtests (class method).
#
# "aggregate" is a name or a reference to an array of names.  Options not
# consumed here are forwarded to _purge_aggr() for each aggregate.
#
# Returns the number of aggregates for which a purge was started, minus
# those whose purge failed when nacltask_if_error matches /continue/i.
# Otherwise the first failure is re-thrown.
sub purge_multi {
    $Log->enter() if $may_enter;
    my $pkg = shift;

    my %opts = $pkg->_common_validate_with(
        params          => \@_,
        additional_spec => {
            command_interface => { type => SCALAR, optional => 0 },
            aggregate         => { type => ARRAYREF | SCALAR, optional => 0 },
            nacltask_num_subtests        => { type => SCALAR, default => 0 },
            nacltask_if_error            => { type => SCALAR, default => 'die' },
            nacltask_subtest_delay       => { type => SCALAR, default => 2 },
            nacltask_subtests_per_second => { type => SCALAR, default => 10 },
        },
        allow_extra => 1,
    );

    my $ci                  = delete $opts{command_interface};
    my $aggregate           = delete $opts{aggregate};
    my $num_subtests        = delete $opts{nacltask_num_subtests};
    my $if_error            = delete $opts{nacltask_if_error};
    my $subtest_delay       = delete $opts{nacltask_subtest_delay};
    my $subtests_per_second = delete $opts{nacltask_subtests_per_second};

    # Log the names themselves rather than "ARRAY(0x...)" when a list of
    # names was passed.
    my $aggregate_text
        = ref($aggregate) eq 'ARRAY' ? join(', ', @$aggregate) : $aggregate;
    $Log->trace("aggregate = $aggregate_text");
    $Log->trace("num_subtests = $num_subtests");
    $Log->trace("subtest_delay = $subtest_delay");
    $Log->trace("subtests_per_second = $subtests_per_second");
    $Log->trace("if_error = $if_error");

    if ($num_subtests <= 0) {
        $num_subtests = _multi_default_num_subtests($ci);
    }

    my @aggrs = _find_aggregates($ci, $aggregate);
    $Log->trace("Found " . scalar(@aggrs) . " aggregates.");

    my $aggr_id = 0;
    my $purged  = 0;
    while (scalar(@aggrs)) {
        #
        # Start a batch of subtests.
        #
        my $num_aggrs  = scalar(@aggrs);
        my $batch_size = min($num_aggrs, $num_subtests);
        my @subtests   = ();
        foreach (1 .. $batch_size) {
            my $aggr      = shift(@aggrs);
            my $aggr_name = $aggr->aggregate();

            _multi_throttle($aggr_id, $subtests_per_second, $subtest_delay);
            $aggr_id++;

            $Log->trace("Removing aggregate $aggr_name");
            # Counted optimistically; decremented below if the purge fails
            # and the caller asked to continue.
            $purged++;
            my $subtest = subtest(\&_purge_aggr,
                -bg,
                -runid => "rm_$aggr_name",
                '--',
                $aggr,
                %opts);
            push(@subtests, { subtest => $subtest, aggr_name => $aggr_name });
        }

        #
        # Wait for the current batch of subtests to finish.
        #
        $Log->trace('Waiting for subtests to finish.');
        foreach my $subtest_info (@subtests) {
            my $subtest   = $subtest_info->{subtest};
            my $aggr_name = $subtest_info->{aggr_name};
            try {
                $Log->trace("Waiting for aggregate $aggr_name to be removed.");
                $subtest->wait_finish();
            }
            otherwise {
                my $e = shift;
                if ($if_error =~ /continue/i) {
                    $Log->warn("Purge aggregate $aggr_name failed; continuing.");
                    $purged--;
                }
                else {
                    $Log->exit() if $may_exit;
                    _multi_rethrow($e);
                }
            };
        }
    }

    $Log->exit() if $may_exit;
    return $purged;
}

########################################################################
# P R I V A T E   M E T H O D S
########################################################################

# Subtest body: create one aggregate named $aggr_name through
# NACL::STask::Aggregate->create(), forwarding %opts.
#
# A NATE::BaseException raised by create() is logged as a warning when
# nacltask_if_error matches /continue/i, and reported through logresult()
# as a FAIL otherwise.  Always returns 0.
sub _create_aggr {
    $Log->enter() if $may_enter;
    my ($aggr_name, %opts) = @_;

    my $wait = $opts{nacltask_wait};

    # nacltask_if_error is handled here and is not passed on to create().
    my $if_error = delete $opts{nacltask_if_error};
    my $job;

    try {
        # The explicit arguments come after %opts so that a stray key in
        # %opts can never replace the name or the job_component reference.
        NACL::STask::Aggregate->create(
            %opts,
            aggregate     => $aggr_name,
            nacltask_wait => $wait,
            job_component => \$job,
        );
    }
    catch NATE::BaseException with {
        my $e = shift;
        if ($if_error =~ /continue/i) {
            $Log->warn("Create aggregate $aggr_name failed; continuing.");
        }
        else {
            logresult(
                type => 'FAIL',
                msg  => "Creating aggregate $aggr_name failed: " . $e->text(),
            );
        }
    };

    $Log->exit() if $may_exit;
    return 0;
}

# Look up the aggregates matching $aggr_name, which is either a single name
# or a reference to an array of names, using NACL::STask::Aggregate->find().
#
# Returns the list of aggregate objects found.  A name that matches nothing
# (NACL::Exceptions::NoElementsFound) is skipped.  An aggregate matched by
# more than one name is returned only once, so that overlapping patterns do
# not cause the same aggregate to be purged twice.
sub _find_aggregates {
    my ($ci, $aggr_name) = @_;

    # The aggr_name can be a string or a reference to an array of strings.
    my @aggr_names = ref $aggr_name eq 'ARRAY' ? @$aggr_name : ($aggr_name);

    my @aggrs;
    my %seen;
    foreach my $name (@aggr_names) {
        try {
            my @found = NACL::STask::Aggregate->find(
                command_interface => $ci,
                filter            => { aggregate => $name },
            );
            push(@aggrs, grep { !$seen{ $_->aggregate() }++ } @found);
        }
        catch NACL::Exceptions::NoElementsFound with {
            # Nothing matched this name; that is not an error.
        };
    }
    return @aggrs;
}

# Subtest body: purge a single aggregate object, forwarding %opts to its
# purge() method.
sub _purge_aggr {
    $Log->enter() if $may_enter;
    my ($aggr, %opts) = @_;

    $Log->trace('Removing aggregate ' . $aggr->aggregate());
    $aggr->purge(%opts);
    $Log->exit() if $may_exit;
}

1;