| # $Id: mod_sipp.py 4188 2012-06-29 09:01:17Z nanang $ |
| |
| ## Automatic test module for SIPp. |
| ## |
| ## This module will need a test driver for each SIPp scenario: |
| ## - For simple scenario, i.e: make/receive call (including auth), this |
| ## test module can auto-generate a default test driver, i.e: make call |
| ## or apply auto answer. Just name the SIPp scenario using "uas" or |
| ## "uac" prefix accordingly. |
| ## - Custom test driver can be defined in a python script file containing |
| ## a list of the PJSUA instances and another list for PJSUA expects/ |
| ## commands. The custom test driver file must use the same filename as |
| ## the SIPp XML scenario. See samples of SIPp scenario + its driver |
| ## in tests/pjsua/scripts-sipp/ folder for detail. |
| ## |
| ## Here are defined macros that can be used in the custom driver: |
| ## - $SIPP_PORT : SIPp binding port |
| ## - $SIPP_URI : SIPp SIP URI |
| ## - $PJSUA_PORT[N] : binding port of PJSUA instance #N |
| ## - $PJSUA_URI[N] : SIP URI of PJSUA instance #N |
| |
| import ctypes |
| import time |
| import imp |
| import sys |
| import os |
| import re |
| import subprocess |
| from inc_cfg import * |
| import inc_const |
| |
| # flags that test is running in Unix |
| G_INUNIX = False |
| if sys.platform.lower().find("win32")!=-1 or sys.platform.lower().find("microsoft")!=-1: |
| G_INUNIX = False |
| else: |
| G_INUNIX = True |
| |
| # /dev/null handle, for redirecting output when SIPP is not in background mode |
| FDEVNULL = None |
| |
| # SIPp executable path and param |
| #SIPP_PATH = '"C:\\Program Files (x86)\\Sipp_3.2\\sipp.exe"' |
| SIPP_PATH = 'sipp' |
| SIPP_PORT = 6000 |
| SIPP_PARAM = "-m 1 -i 127.0.0.1 -p " + str(SIPP_PORT) |
| SIPP_TIMEOUT = 60 |
| # On BG mode, SIPp doesn't require special terminal |
| # On non-BG mode, on win, it needs env var: "TERMINFO=c:\cygwin\usr\share\terminfo" |
| # TODO: on unix with BG mode, waitpid() always fails, need to be fixed |
| SIPP_BG_MODE = False |
| #SIPP_BG_MODE = not G_INUNIX |
| |
| # Will be updated based on the test driver file (a .py file whose the same name as SIPp XML file) |
| PJSUA_INST_PARAM = [] |
| PJSUA_EXPECTS = [] |
| |
| # Default PJSUA param if test driver is not available: |
| # - no-tcp as SIPp is on UDP only |
| # - id, username, and realm: to allow PJSUA sending re-INVITE with auth after receiving 401/407 response |
| PJSUA_DEF_PARAM = "--null-audio --max-calls=1 --no-tcp --id=sip:a@localhost --username=a --realm=*" |
| |
| # Get SIPp scenario (XML file) |
| SIPP_SCEN_XML = "" |
| if ARGS[1].endswith('.xml'): |
| SIPP_SCEN_XML = ARGS[1] |
| else: |
| exit(-99) |
| |
| |
| # Functions for resolving macros in the test driver |
| def resolve_pjsua_port(mo): |
| return str(PJSUA_INST_PARAM[int(mo.group(1))].sip_port) |
| |
| def resolve_pjsua_uri(mo): |
| return PJSUA_INST_PARAM[int(mo.group(1))].uri[1:-1] |
| |
| def resolve_driver_macros(st): |
| st = re.sub("\$SIPP_PORT", str(SIPP_PORT), st) |
| st = re.sub("\$SIPP_URI", "sip:sipp@127.0.0.1:"+str(SIPP_PORT), st) |
| st = re.sub("\$PJSUA_PORT\[(\d+)\]", resolve_pjsua_port, st) |
| st = re.sub("\$PJSUA_URI\[(\d+)\]", resolve_pjsua_uri, st) |
| return st |
| |
| |
| # Init test driver |
| if os.access(SIPP_SCEN_XML[:-4]+".py", os.R_OK): |
| # Load test driver file (the corresponding .py file), if any |
| cfg_file = imp.load_source("cfg_file", SIPP_SCEN_XML[:-4]+".py") |
| for ua_idx, ua_param in enumerate(cfg_file.PJSUA): |
| ua_param = resolve_driver_macros(ua_param) |
| PJSUA_INST_PARAM.append(InstanceParam("pjsua"+str(ua_idx), ua_param)) |
| PJSUA_EXPECTS = cfg_file.PJSUA_EXPECTS |
| else: |
| # Generate default test driver |
| if os.path.basename(SIPP_SCEN_XML)[0:3] == "uas": |
| # auto make call when SIPp is as UAS |
| ua_param = PJSUA_DEF_PARAM + " sip:127.0.0.1:" + str(SIPP_PORT) |
| else: |
| # auto answer when SIPp is as UAC |
| ua_param = PJSUA_DEF_PARAM + " --auto-answer=200" |
| PJSUA_INST_PARAM.append(InstanceParam("pjsua", ua_param)) |
| |
| |
| # Start SIPp process, returning PID |
| def start_sipp(): |
| global SIPP_BG_MODE |
| sipp_proc = None |
| |
| sipp_param = SIPP_PARAM + " -sf " + SIPP_SCEN_XML |
| if SIPP_BG_MODE: |
| sipp_param = sipp_param + " -bg" |
| if SIPP_TIMEOUT: |
| sipp_param = sipp_param + " -timeout "+str(SIPP_TIMEOUT)+"s -timeout_error" + " -deadcall_wait "+str(SIPP_TIMEOUT)+"s" |
| |
| # add target param |
| sipp_param = sipp_param + " 127.0.0.1:" + str(PJSUA_INST_PARAM[0].sip_port) |
| |
| # run SIPp |
| fullcmd = os.path.normpath(SIPP_PATH) + " " + sipp_param |
| print "Running SIPP: " + fullcmd |
| if SIPP_BG_MODE: |
| sipp_proc = subprocess.Popen(fullcmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=G_INUNIX, universal_newlines=False) |
| else: |
| # redirect output to NULL |
| global FDEVNULL |
| #FDEVNULL = open(os.devnull, 'w') |
| FDEVNULL = open("logs/sipp_output.tmp", 'w') |
| sipp_proc = subprocess.Popen(fullcmd, shell=G_INUNIX, stdout=FDEVNULL, stderr=FDEVNULL) |
| |
| if not SIPP_BG_MODE: |
| if sipp_proc == None or sipp_proc.poll(): |
| return None |
| return sipp_proc |
| |
| else: |
| # get SIPp child process PID |
| pid = 0 |
| r = re.compile("PID=\[(\d+)\]", re.I) |
| |
| while True: |
| line = sipp_proc.stdout.readline() |
| pid_r = r.search(line) |
| if pid_r: |
| pid = int(pid_r.group(1)) |
| break |
| if not sipp_proc.poll(): |
| break |
| |
| if pid != 0: |
| # Win specific: get process handle from PID, as on win32, os.waitpid() takes process handle instead of pid |
| if (sys.platform == "win32"): |
| SYNCHRONIZE = 0x00100000 |
| PROCESS_QUERY_INFORMATION = 0x0400 |
| hnd = ctypes.windll.kernel32.OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, False, pid) |
| pid = hnd |
| |
| return pid |
| |
| |
| # Wait SIPp process to exit, returning SIPp exit code |
| def wait_sipp(sipp): |
| if not SIPP_BG_MODE: |
| global FDEVNULL |
| sipp.wait() |
| FDEVNULL.close() |
| return sipp.returncode |
| |
| else: |
| print "Waiting SIPp (PID=" + str(sipp) + ") to exit.." |
| wait_cnt = 0 |
| while True: |
| try: |
| wait_cnt = wait_cnt + 1 |
| [pid_, ret_code] = os.waitpid(sipp, 0) |
| if sipp == pid_: |
| #print "SIPP returned ", ret_code |
| ret_code = ret_code >> 8 |
| |
| # Win specific: Close process handle |
| if (sys.platform == "win32"): |
| ctypes.windll.kernel32.CloseHandle(sipp) |
| |
| return ret_code |
| except os.error: |
| if wait_cnt <= 5: |
| print "Retry ("+str(wait_cnt)+") waiting SIPp.." |
| else: |
| return -99 |
| |
| |
| # Execute PJSUA flow |
| def exec_pjsua_expects(t, sipp): |
| # Get all PJSUA instances |
| ua = [] |
| for ua_idx in range(len(PJSUA_INST_PARAM)): |
| ua.append(t.process[ua_idx]) |
| |
| ua_err_st = "" |
| while len(PJSUA_EXPECTS): |
| expect = PJSUA_EXPECTS.pop(0) |
| ua_idx = expect[0] |
| expect_st = expect[1] |
| send_cmd = resolve_driver_macros(expect[2]) |
| # Handle exception in pjsua flow, to avoid zombie SIPp process |
| try: |
| if expect_st != "": |
| ua[ua_idx].expect(expect_st, raise_on_error = True) |
| if send_cmd != "": |
| ua[ua_idx].send(send_cmd) |
| except TestError, e: |
| ua_err_st = e.desc |
| break; |
| except: |
| ua_err_st = "Unknown error" |
| break; |
| |
| # Need to poll here for handling these cases: |
| # - If there is no PJSUA EXPECT scenario, we must keep polling the stdout, |
| # otherwise PJSUA process may stuck (due to stdout pipe buffer full?). |
| # - last PJSUA_EXPECT contains a pjsua command that needs time to |
| # finish, for example "v" (re-INVITE), the SIPp XML scenario may expect |
| # that re-INVITE transaction to be completed and without stdout poll |
| # PJSUA process may stuck. |
| # Ideally the poll should be done contiunously until SIPp process is |
| # terminated. |
| for ua_idx in range(len(ua)): |
| ua[ua_idx].expect(inc_const.STDOUT_REFRESH, raise_on_error = False) |
| |
| return ua_err_st |
| |
| |
| def sipp_err_to_str(err_code): |
| if err_code == 0: |
| return "All calls were successful" |
| elif err_code == 1: |
| return "At least one call failed" |
| elif err_code == 97: |
| return "exit on internal command. Calls may have been processed" |
| elif err_code == 99: |
| return "Normal exit without calls processed" |
| elif err_code == -1: |
| return "Fatal error (timeout)" |
| elif err_code == -2: |
| return "Fatal error binding a socket" |
| else: |
| return "Unknown error" |
| |
| |
| # Test body function |
| def TEST_FUNC(t): |
| |
| sipp_ret_code = 0 |
| ua_err_st = "" |
| |
| sipp = start_sipp() |
| if not sipp: |
| raise TestError("Failed starting SIPp") |
| |
| ua_err_st = exec_pjsua_expects(t, sipp) |
| |
| sipp_ret_code = wait_sipp(sipp) |
| |
| if ua_err_st != "": |
| raise TestError(ua_err_st) |
| |
| if sipp_ret_code: |
| rc = ctypes.c_byte(sipp_ret_code).value |
| raise TestError("SIPp returned error " + str(rc) + ": " + sipp_err_to_str(rc)) |
| |
| |
| # Here where it all comes together |
| test = TestParam(SIPP_SCEN_XML[:-4], |
| PJSUA_INST_PARAM, |
| TEST_FUNC) |