/* $Id$ */ #include #include //node* clusterNodes; threadHandler::threadHandler(presetHandler pHandle, CLIHandler* cHandle, outputHandler* oHandle, vector nodes) { presetHandle = pHandle; cliHandle = cHandle; commonOutputHandler = oHandle; hostFlag = true; switchFlag = true; clusterNodes = new node[(int)nodes.size()]; } void processData(boost::barrier* serThreadBar, node * nodeData) { #ifndef WIN32 sigset_t mask; int rc; //sigfillset(&mask); // Mask all allowed signals sigemptyset (&mask); sigaddset (&mask, SIGINT); sigaddset (&mask, SIGABRT); rc = pthread_sigmask(SIG_BLOCK, &mask, NULL); #endif (*nodeData).processCommands(serThreadBar); } void processStatData(node * nodeData) { #ifndef WIN32 sigset_t mask; int rc; //sigfillset(&mask); // Mask all allowed signals sigemptyset (&mask); sigaddset (&mask, SIGINT); sigaddset (&mask, SIGABRT); rc = pthread_sigmask(SIG_BLOCK, &mask, NULL); #endif (*nodeData).processParallelStatsCommands(); } void processTimerData(int* maxTime) { boost::xtime currTime; //boost::xtime_get(&currTime, boost::TIME_UTC); boost::xtime_get(&currTime, boost::TIME_UTC_); currTime.sec += ((*maxTime)*60); boost::thread::sleep(currTime); raise(SIGABRT); } void processSwitchData(boost::barrier *switchThreadBar, targetSwitch *tswitch) { #ifndef WIN32 sigset_t mask; int rc; //sigfillset(&mask); // Mask all allowed signals sigemptyset (&mask); sigaddset (&mask, SIGINT); sigaddset (&mask, SIGABRT); rc = pthread_sigmask(SIG_BLOCK, &mask, NULL); #endif (*tswitch).processCommands(switchThreadBar); } // This piece of code is duplicated in node.cpp, parallelNode.cpp also...Need to check and remove this duplicacy bool threadHandler :: checkConnectivity(const string& ip, CLIHandler* cliHandle, string& testCmd) { testCmd = ""; vector filerInfo = cliHandle->getFilerCreds(ip); //string user = "root"; string user = cliHandle->getFilerLoginUser(); string passwd = ""; if(cliHandle->getSSHKeyProvidedFlag() == false) { if(cliHandle->getLegacyMode() == true) { #ifdef WIN32 if(cliHandle->getSSHFlag() == true) { if(filerInfo.size() != 2) { return false; } user = filerInfo[0]; passwd = filerInfo[1]; } #endif } else { if(filerInfo.size() != 2) { return false; } user = filerInfo[0]; passwd = filerInfo[1]; } } #ifdef WIN32 // If --ssh-privatekey option is not present if(cliHandle->getSSHKeyProvidedFlag() == false) { //if(cliHandle->getSSHFlag() == false) { testCmd = "rsh.exe " + ip; testCmd = testCmd + " -l " + user + " "; //} /* else { testCmd = "plink.exe -ssh -x -a -batch -pw "; testCmd = testCmd + passwd + " -l " + user; testCmd = testCmd + " " + ip; testCmd = testCmd + " "; } */ } else { if(cliHandle->getOPenSSHFlag() == true) { string strPrivKeyPath = cliHandle->getSSHPrivateKeyPath(); string strSSHKnownHostsFilePath = cliHandle->getSSHKnownHostsFilePath(); string temp = "known_hosts"; if(strSSHKnownHostsFilePath.empty()) { strSSHKnownHostsFilePath = strPrivKeyPath; strSSHKnownHostsFilePath.replace(strSSHKnownHostsFilePath.find_last_of("\\") + 1, temp.length(), temp); } strSSHKnownHostsFilePath.insert(0,"UserKnownHostsFile="); testCmd = "ssh.exe -l "; testCmd = testCmd + user + " -i "; testCmd = testCmd + "\"" + strPrivKeyPath + "\"" + " -o " + "\"" + strSSHKnownHostsFilePath + "\"" + " "; testCmd = testCmd + ip + " "; } else { //if(cliHandle->getSSHFlag() == false) { testCmd = "rsh.exe " + ip; testCmd = testCmd + " -l " + user + " "; //} /* else { string privKeyPath = cliHandle->getSSHPrivateKeyPath(); testCmd = "plink.exe -ssh -x -a "; testCmd = testCmd + ip + " -l " + user + " -i "; testCmd = testCmd + "\"" + privKeyPath + "\"" +" "; } */ } } #else if(cliHandle->getSSHKeyProvidedFlag() == false) { // If --ssh-privatekey is not provided run with RSH testCmd = "rsh " + ip; testCmd = testCmd + " -l " + user + " "; } else { if( cliHandle->getSSHFlag() == true) { string privKeyPath = cliHandle->getSSHPrivateKeyPath(); testCmd = "ssh -l "; testCmd = testCmd + user + " -i "; testCmd = testCmd + privKeyPath; testCmd.append(" "); testCmd.append(ip); testCmd.append(" "); } else { testCmd = "rsh " + ip; testCmd = testCmd + " -l " + user + " "; } } #endif string cmd = testCmd + "\"version\" >perfstat_connectivity_check 2>perfstat_connectivity_error"; #ifdef WIN32 _flushall(); #endif int result = std::system(cmd.c_str()); string out = commonOutputHandler->readFromFile("perfstat_connectivity_check"); bool bRes = true; if(!out.empty()) { if (out.find("NetApp") == string::npos && out.find("ONTAP") == string::npos){ bRes = false; } else { bRes = true; } } else { bRes = false; } remove("perfstat_connectivity_check"); remove("perfstat_connectivity_error"); return bRes; //return ((result == 0) ? true : false); } void threadHandler::startThreadHandler(vector nodes, vector names, vector spasswd, int maxTime, map legacyFLags) { int numOfNodes = (int)nodes.size(); boost::barrier serThreadBar(numOfNodes); boost::thread_group nodeThreadGrp; string strRSH = "RSH"; if(cliHandle->getSSHFlag() == true) { strRSH = "SSH"; } else { #ifdef WIN32 if(cliHandle->getOPenSSHFlag() == true) { strRSH = "OPENSSH"; } #endif } /* * Set the clusterFlag to true for all the clusters. As we process the * first node in the cluster we set the flag false. */ if (!(cliHandle->getFilerMode()).compare("10-mode")) { for(int i = 0; i < cliHandle->getClusterIP().size(); i++) { clusterFlag[cliHandle->getClusterIP()[i]] = true; } } // NODE Thread for(int nodeIndex = 0; nodeIndex < numOfNodes; ++nodeIndex) { // Added this piece of code to check whether the controller could carry on the RSH/SSH if(legacyFLags[nodes[nodeIndex]] == true) { commonOutputHandler->writeDataToLogFile("STATUS", nodes[nodeIndex], "Checking the connectivity : " + strRSH); string testResult = ""; bool bcheckConnectivity = checkConnectivity(nodes[nodeIndex], cliHandle, testResult); if(bcheckConnectivity == false) { testResult = testResult + "\"version\""; string errInfo = "Problem occured while connecting through " + strRSH; commonOutputHandler->writeDataToLogFile("STATUS", nodes[nodeIndex], errInfo); if(strRSH.compare("RSH") == 0) { commonOutputHandler->writeDataToLogFile("STATUS", nodes[nodeIndex], "please ensure that the /etc/hosts.equiv file in the node contains the localhost ip address and username."); #ifndef WIN32 commonOutputHandler->writeDataToLogFile("STATUS", nodes[nodeIndex], "Then try the following command \" rsh -l \"version\" \" to ensure you have rsh connectivity."); #else commonOutputHandler->writeDataToLogFile("STATUS", nodes[nodeIndex], "Then try the following command \" rsh.exe -l \"version\" \" to ensure you have rsh connectivity."); #endif } else if(strRSH.compare("SSH") == 0) { errInfo = "Then try the following command "; #ifdef WIN32 errInfo = errInfo + "ssh.exe -o BatchMode=yes -2 -ax -i -o -l \"version\""; #else errInfo = errInfo + " ssh -l -i \"version\" "; #endif errInfo = errInfo + " to ensure you have ssh connectivity."; commonOutputHandler->writeDataToLogFile("STATUS", nodes[nodeIndex], errInfo); } else if(strRSH.compare("OPENSSH") == 0) { commonOutputHandler->writeDataToLogFile("STATUS", nodes[nodeIndex], "Then try the following command \" ssh.exe -o BatchMode=yes -2 -ax -i -o -l \"version\" \" to ensure you have openssh connectivity."); } else { // FOR FUTURE REFERENCE } //testResult = "Problem while checking the connectivity. Please, try " + testResult; //commonOutputHandler->writeDataToLogFile("INFO", nodes[nodeIndex], testResult); continue; } } bool bLegacyFlag = legacyFLags[nodes[nodeIndex]]; // 10-mode system if(!(cliHandle->getFilerMode()).compare("10-mode")) { clusterNodes[nodeIndex] = node(presetHandle, cliHandle, commonOutputHandler, nodes[nodeIndex], names[nodeIndex], spasswd[nodeIndex], hostFlag, switchFlag, clusterFlag[cliHandle->findNodeCluster(nodes[nodeIndex])], bLegacyFlag); nodeThreadGrp.create_thread(boost::bind(processData, &serThreadBar, &clusterNodes[nodeIndex])); //We create parallel stat thread only for 10-mode nodeThreadGrp.create_thread(boost::bind(processStatData, &clusterNodes[nodeIndex])); //This is to make sure that we should be running the cluster commands once for all the Clusters clusterFlag[cliHandle->findNodeCluster(nodes[nodeIndex])] = false; } // 7-mode else { clusterNodes[nodeIndex] = node(presetHandle, cliHandle, commonOutputHandler, nodes[nodeIndex], names[nodeIndex], spasswd[nodeIndex], hostFlag, switchFlag, true, bLegacyFlag); nodeThreadGrp.create_thread(boost::bind(processData, &serThreadBar, &clusterNodes[nodeIndex])); } hostFlag = false; switchFlag = false; } if(maxTime != -1) { nodeThreadGrp.create_thread(boost::bind(processTimerData, &maxTime)); } int numOfSwitches = (cliHandle->getTargetSwitches()).size(); if(cliHandle->getSwitchFlag() == true && numOfSwitches > 0) { boost::barrier switchThreadBar(numOfSwitches); boost::thread_group switchThreadGrp; vector tmpvecSwitches = cliHandle->getTargetSwitches(); switches = new targetSwitch[numOfSwitches]; for(int switchIndex = 0; switchIndex < numOfSwitches; ++switchIndex) { switches[switchIndex] = targetSwitch(presetHandle, cliHandle, commonOutputHandler, tmpvecSwitches[switchIndex]); switchThreadGrp.create_thread(boost::bind(processSwitchData, &switchThreadBar, &switches[switchIndex])); } switchThreadGrp.join_all(); } nodeThreadGrp.join_all(); } void threadHandler::startTimerThread(int maxTime) { boost::thread timerThread(boost::bind(processTimerData, &maxTime)); timerThread.join(); } node* threadHandler :: getNodes() const { return clusterNodes; }