#!/usr/bin/python2
"""Memory (DIMM) scan, test and stress helpers driven by shell commands."""
import time
import re
import os
import sys
from datetime import datetime
from .. import scanner_logger, mem_logger, stresser_logger
from ..common_fn import get_current_time, get_scan_summary_no_shift,\
    exec_cmd_and_set_object_by_key, exec_cmd_and_get_list,\
    exe_shell_cmd_and_check_output, exe_shell_cmd_and_get_process,\
    exe_shell_cmd_and_check_call
from .. import data_manager


class mem(object):
    """Scan, test and stress the memory devices reported by dmidecode."""

    # DMI structure type kept by parse_dmi(); its sections are titled
    # "Memory Device" in the dmidecode sample shown in scan().
    _DMI_TYPE_MEMORY_DEVICE = 17

    # Fields blanked to 'N/A' by scan() for an empty DIMM socket.
    _EMPTY_SLOT_FIELDS = ('Size', 'Part Number', 'Serial Number',
                          'Speed', 'Rank', 'Manufacturer')

    def __init__(self):
        self.is_scan = False      # set to True by a successful scan()
        self.scan_result = None   # None: not scanned, else bool
        self.test_result = None   # 'PASSED' / 'FAILED' once test() ran
        self.dev_config = []      # replaced by the argument of scan()

    def start(self):
        """Do nothing (placeholder)."""
        pass

    def stop(self):
        """Do nothing (placeholder)."""
        pass

    def parse_dmi(self, content):
        """
        Parse the whole dmidecode output.

        Only sections whose handle line announces DMI type 17 are kept;
        every other section is skipped.

        Returns a list of dicts, one per kept section, as built by
        _parse_handle_section().
        """
        info = []
        # The iterator is shared with _parse_handle_section(), which consumes
        # the lines of a section before this loop resumes.
        lines = iter(content.strip().splitlines())
        for line in lines:
            if not line.startswith('Handle 0x'):
                continue
            try:
                # Second comma-separated field looks like "DMI type <n>".
                typ = int(line.split(',', 2)[1].strip()[len('DMI type'):])
            except (IndexError, ValueError):
                # Malformed handle line: skip it rather than abort the parse.
                continue
            if typ == self._DMI_TYPE_MEMORY_DEVICE:
                info.append(self._parse_handle_section(lines))
        return info

    def _parse_handle_section(self, lines):
        """
        Parse a section of dmidecode output

        * 1st line contains address, type and size
        * 2nd line is title
        * line started with one tab is one option and its value
        * line started with two tabs is a member of list

        `lines` is the iterator positioned right after the 1st line.
        Parsing stops at the first line that does not start with a tab
        (that line is consumed).

        Returns a dict mapping each option to its value (str) or, when the
        option has no inline value, to a list of its members. The title is
        stored under '_title'.
        """
        data = {
            # Default avoids StopIteration when the handle line is the last.
            '_title': next(lines, '').rstrip(),
        }
        key = None
        for line in lines:
            line = line.rstrip()
            if line.startswith('\t\t'):
                # A list member is only meaningful after an option that
                # opened a list; anything else is ignored instead of raising.
                if key is not None and isinstance(data.get(key), list):
                    data[key].append(line.lstrip())
            elif line.startswith('\t'):
                key, _, value = line.lstrip().partition(':')
                key = key.strip()
                value = value.strip()
                data[key] = value if value else []
            else:
                break
        return data

    def locator2slot(self, locator):
        """
        Return the 'slot' of the first dev_config entry whose 'id' contains
        `locator`, or None when no entry matches.
        """
        for dimm in self.dev_config:
            if locator in dimm['id']:
                return dimm['slot']
        return None

    # ============================= Scan function =========================== #
    def scan(self, dev_config):
        """
        Run 'dmidecode -t memory' and rebuild data_manager.mem_obj_list.

        `dev_config` is an iterable of dicts providing 'id' and 'slot'; it is
        stored on the instance and used to map each DIMM Locator to a slot.
        Only DIMMs with a mapped slot are recorded, sorted by slot number.

        Returns the list of 'Locator' values of the recorded DIMMs (empty
        when the command failed).
        """
        data_manager.mem_obj_list = []
        dev_list = []
        self.dev_config = dev_config
        self.scan_result = False
        cmd = 'dmidecode -t memory'
        resp, result = exe_shell_cmd_and_check_output(cmd)
        # Sample of one parsed Memory Device section (key / value):
        #   Maximum Voltage            1.2 V
        #   Error Information Handle   Not Provided
        #   Type                       DDR4
        #   Array Handle               0x004C
        #   Serial Number              174BE0DF
        #   Total Width                72 bits
        #   Form Factor                DIMM
        #   Manufacturer               Micron
        #   Data Width                 64 bits
        #   Configured Clock Speed     2666 MHz
        #   Asset Tag                  CPU2_DIMM_M1_AssetTag
        #   Bank Locator               NODE 4
        #   Minimum Voltage            1.2 V
        #   _title                     Memory Device
        #   Configured Voltage         1.2 V
        #   Part Number                18ASF2G72XF12G6V21AB
        #   Set                        None
        #   Rank                       1
        #   Locator                    CPU2_DIMM_M1
        #   Type Detail                Synchronous
        #   Speed                      2666 MHz
        #   Size                       16384 MB
        if resp:
            for obj in self.parse_dmi(result):
                # convert DIMM size from MB to GB
                mb_size = re.findall(r'\d+(?= MB)', obj['Size'])
                if mb_size:
                    # Floor division keeps an integer on Python 2 and 3.
                    obj['Size'] = '{} GB'.format(int(mb_size[0]) // 1024)
                # mapping Locator to slot
                # NOTE(review): a falsy slot (e.g. 0) is treated as
                # "not mapped" - confirm slots never use such a value.
                slot = self.locator2slot(obj['Locator'])
                if slot:
                    obj['slot'] = slot
                    obj['name'] = 'dimm{}'.format(slot)
                    data_manager.mem_obj_list.append(obj)
                else:
                    obj['slot'] = None
                    obj['name'] = None
                # parse 'No Module Installed'
                if obj['Size'] == 'No Module Installed':
                    for field in self._EMPTY_SLOT_FIELDS:
                        obj[field] = 'N/A'
            data_manager.mem_obj_list = sorted(
                data_manager.mem_obj_list, key=lambda k: int(k['slot']))
            self.show_dev_info(data_manager.mem_obj_list)
            self.is_scan = True
            dev_list = [dimm['Locator'] for dimm in data_manager.mem_obj_list]
        return dev_list

    def show_dev_info(self, dev_list):
        """Log a table with one row per DIMM dict in `dev_list`."""
        title = ["DIMM", "Size", "Speed", "Rank", "Manufacturer", "PN", "SN"]
        scanner_logger.info("=======================================================")
        scanner_logger.info("Memory:")
        self.print_info(title)
        for dev in dev_list:
            self.print_info(
                [dev['name'], dev['Size'], dev['Speed'], dev['Rank'],
                 dev['Manufacturer'], dev['Part Number'],
                 dev['Serial Number']])

    def scan_summary(self):
        """
        Log the scan summary and set self.scan_result.

        scan_result becomes True only when the number of recorded DIMMs
        with a Part Number other than 'N/A' equals len(self.dev_config).
        Does nothing but log a notice when scan() has not succeeded yet.
        """
        if self.is_scan is False:
            scanner_logger.info("Hasn't scan MEMORY yet")
            return
        available_dev_cnt = len(
            [dev for dev in data_manager.mem_obj_list
             if dev['Part Number'] != 'N/A'])
        scanner_logger.info('')
        scanner_logger.info(get_scan_summary_no_shift(
            "Memory", len(self.dev_config), available_dev_cnt))
        title = ["Slot", "PN", "SN"]
        self.print_summary_info(title)
        for dev in data_manager.mem_obj_list:
            self.print_summary_info(
                [dev['name'], dev['Part Number'], dev['Serial Number']])
        self.scan_result = bool(len(self.dev_config) == available_dev_cnt)

    def print_info(self, info_list):
        """Log the first 7 items of `info_list` as one aligned table row.

        Lists shorter than 7 items are silently ignored.
        """
        if len(info_list) < 7:
            return
        scanner_logger.info(
            "{0:<7} {1:<9} {2:<9} {3:<5} {4:<19} {5:<22} {6:<9}".format(
                *info_list[:7]))

    def print_summary_info(self, info_list):
        """Log the first 3 items of `info_list` as one aligned summary row.

        Lists shorter than 3 items are silently ignored.
        """
        if len(info_list) < 3:
            return
        scanner_logger.info("{0:<7} {1:<22} {2:<9}".format(*info_list[:3]))

    # ======================= Test function ================================= #
    def get_mem_obj_by_name(self, name):
        """Return the recorded DIMM dict whose 'name' equals `name`, or None."""
        for mem_obj in data_manager.mem_obj_list:
            if mem_obj['name'] == name:
                return mem_obj
        return None

    def mem_test_precheck(self):
        """Return True only when the scan summary reported a clean result."""
        if self.scan_result is None:
            mem_logger.info("Please run system scan first")
            return False
        if self.scan_result is False:
            mem_logger.info(
                'PLEASE fix issues reported by the System Scan first!')
            return False
        return True

    def mem_clear_ECC(self):
        """
        Reset the ECC data and, when dmesg holds MCE memory error messages,
        clear the kernel ring buffer.

        Returns True on success, False when one of the commands failed.
        """
        mem_logger.info('Clear ECC:')
        cmd = 'diag -d mem -p dimm -t resetecc'
        if not exe_shell_cmd_and_check_call(cmd):
            mem_logger.error('Clear ECC Failed')
            return False
        # grep -c prints the number of matching dmesg lines.
        cmd = 'dmesg | grep "HANDLING MCE MEMORY ERROR" -c'
        mem_logger.debug(cmd)
        ps = exe_shell_cmd_and_get_process(cmd)
        stdoutdata, _ = ps.communicate()
        if int(stdoutdata) > 0:
            cmd = 'dmesg -c > /dev/null'
            if not exe_shell_cmd_and_check_call(cmd):
                mem_logger.error('Clear ECC Failed')
                return False
        return True

    def test(self, test_item, mem_range_list, pattern=0):
        """
        Run one memory test over a range and check ECC afterwards.

        `test_item` selects the test: a name containing 'Pattern' runs
        '<test_item lowercased>' with `pattern`; a name containing 'Random'
        runs the pseudo random pattern test; anything else is rejected.
        `mem_range_list` holds the two range bounds passed to the command.

        Sets self.test_result to 'PASSED' or 'FAILED' and returns True only
        when the test passed (including the ECC check).
        """
        self.test_result = 'FAILED'
        if self.mem_test_precheck() is False:
            return False
        range_start, range_end = mem_range_list[0], mem_range_list[1]
        test_option = '{} {}'.format(range_start, range_end)
        mem_logger.info(
            '============================================================\n'
            'Memory {} Test:\n'
            '============================================================\n'.
            format(test_item))
        if self.mem_clear_ECC() is False:
            return False
        if 'Pattern' in test_item:
            mem_logger.info(
                '------------------------------------------------------------\n'
                'Run {} Test for {} ~ {} with pattern {}:'.format(
                    test_item, range_start, range_end, pattern))
            test_cmd = 'diag -d mem -p dimm -t {} -b {} {}'.format(
                test_item.lower(), test_option, pattern)
        elif 'Random' in test_item:
            mem_logger.info(
                '------------------------------------------------------------\n'
                'Run {} Test for {} ~ {} with pseudo random pattern:'.format(
                    test_item, range_start, range_end))
            test_cmd = 'diag -d mem -p dimm -t prpattern -b {}'.format(
                test_option)
        else:
            return False
        start_time = datetime.now()
        mem_logger.info(
            "\nMemory Test Start: {}\n".format(get_current_time()))
        mem_logger.info('Test Cmd: {}'.format(test_cmd))
        proc = exe_shell_cmd_and_get_process(test_cmd)
        # Echo the command output character by character so progress is
        # visible live, while keeping a copy for the error check below.
        chunks = []
        while True:
            char = proc.stdout.read(1)
            if not char:
                if proc.poll() is not None:
                    break
                continue
            chunks.append(char)
            sys.stdout.write(char)
            sys.stdout.flush()
        output = ''.join(chunks)
        if 'Error' in output:
            mem_logger.info(output)
            mem_logger.error("Memory ({}-{}) Test Failed".format(
                range_start, range_end))
            return False
        stop_time = datetime.now()
        elapsed_time = stop_time - start_time
        mem_logger.info("\nMemory Test Stop: {}\n".format(get_current_time()))
        mem_logger.info('------------------------------------------------\n')
        mem_logger.info("Elapsed Time: {}\n".format(elapsed_time))
        mem_logger.info('------------------------------------------------\n')
        time.sleep(1)
        if self.check_ERR_and_show_failed_DIMM() is True:
            self.test_result = 'PASSED'
        mem_logger.info(self.test_result)
        # Report the real outcome: an ECC failure must not look like success.
        return self.test_result == 'PASSED'

    def check_ERR_and_show_failed_DIMM(self):
        """
        Query the ECC data and log the failed DIMMs, if any.

        Returns True when the reported number of ECC errors is zero,
        False otherwise (after logging the failed DIMMs and the matching
        dmesg lines).
        """
        mem_logger.info('Check ECC:')
        check_ecc_cmd = 'diag -d mem -p dimm -t geteccdata'
        mem_logger.debug(check_ecc_cmd)
        resp, output = exe_shell_cmd_and_check_output(check_ecc_cmd)
        output = output or ''
        # Accept only an all-zero hex count; a plain substring test on
        # '0x0' would also accept non-zero values such as 0x05 or 0x0A.
        if re.search(r'Number of ECC errors: 0x0+(?![0-9a-fA-F])', output):
            return True
        failed_dimm_list = re.findall(r'(?<=bad DIMM ID:)\s+\d+', output)
        row_fmt = "{0:<16} {1:<20} {2:<16}"
        scanner_logger.info(row_fmt.format(
            "Failed DIMM", "Part Number", "Serial Number"))
        for dimm in failed_dimm_list:
            dimm_id = dimm.strip()
            mem_obj = self.get_mem_obj_by_name('dimm{}'.format(dimm_id))
            if mem_obj is None:
                scanner_logger.info(row_fmt.format(dimm_id, 'N/A', 'N/A'))
            else:
                scanner_logger.info(row_fmt.format(
                    dimm_id, mem_obj['Part Number'],
                    mem_obj['Serial Number']))
        scanner_logger.info(
            "****************** ECC Detailed Information ****************")
        check_cmd = 'dmesg -T | grep "EDAC MC" | grep syndrome'
        mem_logger.debug(check_cmd)
        resp, output = exe_shell_cmd_and_check_output(check_cmd)
        if resp is False:
            scanner_logger.error("dmesg command failed")
            return False
        scanner_logger.info(output)
        scanner_logger.info(
            "************************************************************")
        # ECC errors were reported: make the failure explicit.
        return False

    # ======================= Stress function =============================== #
    def stress(self, dev_queue, stop_on_error, test_secs, test_dev_list=None):
        """
        Launch the memory stress script and log its stop-on-error log file.

        Returns False when the launch command failed, True otherwise.

        NOTE(review): dev_queue, stop_on_error, test_secs and test_dev_list
        are not used; the command always passes the literal argument 1 -
        confirm whether stop_on_error was meant to be forwarded.
        """
        if os.path.exists('/tmp/memstress_running'):
            os.remove('/tmp/memstress_running')
        test_cmd = 'memstress_mfg.sh 1&'
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print(test_cmd)
        resp, _ = exe_shell_cmd_and_check_output(test_cmd)
        if resp is False:
            return False
        check_err_files = ['/tmp/memStressStopOnError.log']
        for log_file in check_err_files:
            if not os.path.exists(log_file):
                continue
            with open(log_file, 'r') as f:
                log_text = f.read()
            stresser_logger.info('----------------- [{}] ----------------'.format(log_file))
            stresser_logger.info('{}'.format(log_text))
            stresser_logger.info('')
        return True