blob: 4255ff65cf5702248406846066decc16ee73f1bf [file] [log] [blame]
#
# Copyright (C) 2004-2020 Savoir-faire Linux Inc.
#
# Author: Eloi Bail <eloi.bail@savoirfairelinux.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import sys
import os
import time
# The module is named ``configparser`` on Python 3 and ``ConfigParser`` on
# Python 2; bind whichever is available to the name ``configparser``.
try:
    import configparser
except ImportError:
    import ConfigParser as configparser
except Exception as e:
    # Any other failure raised while importing is fatal: report it and stop.
    # sys.exit is used instead of the bare exit() helper, which is only
    # installed by the site module and is meant for interactive sessions.
    print(str(e))
    sys.exit(1)
from threading import Thread
from random import shuffle
from errors import *
# Maps each test name accepted by DRingTester.start() to the name of the
# DRingTester method that implements it.
# The keys are looked up at runtime, so their spelling (including
# 'SimultenousDHTCall') must stay exactly as it is.
ALL_TEST_NAME = {
    'TestConfig': 'testConfig',
    'LoopCallDht': 'testLoopCallDht',
    'VideoBitrateDHT': 'testLoopCallDhtWithIncBitrate',
    'SimultenousDHTCall': 'testSimultaneousLoopCallDht',
    'DhtCallHold': 'testLoopCallDhtWithHold',
}
class DRingTester():
    """Run call-oriented tests through a controller object.

    Every test receives a ``ctrl`` object and relies only on the methods it
    calls on it (``Call``, ``HangUp``, ``switchInput``, ``getAllCodecs``,
    ``getCodecDetails``, ``setActiveCodecList``, ``setVideoCodecBitrate``,
    ``Hold``, ``UnHold``, ``getAllCalls``, ``getAllAccounts``,
    ``setAccountRegistered``).

    The attributes below hold the test configuration. ``start`` fills them
    from ``test_config.ini`` and from the controller's account lists.
    """

    # init to default values
    dhtAccountId = ''
    sipAccountId = ''
    dhtTestAccount = ''
    sipTestAccount = ''
    inputFile = ''
    minBitrate = 0
    maxBitrate = 0
    incBitrate = 0
    codecAudio = ''
    codecVideo = ''

    # Ids handed to ctrl.setActiveCodecList() when a codec is configured by
    # name (self.codecVideo / self.codecAudio).
    _VIDEO_CODEC_IDS = {
        'H264': 1,
        'VP8': 2,
        'MPEG4': 3,
        'H263': 4,
    }
    _AUDIO_CODEC_IDS = {
        'OPUS': 5,
        'G722': 6,
        'SPEEX32': 7,
        'SPEEX16': 8,
        'SPEEX8': 9,
        'PCMA': 10,
        'PCMU': 11,
    }

    def testConfig(self, ctrl, nbIteration=None, delay=None):
        """Check that the controller reports at least one codec.

        Args:
            ctrl: controller object; only ``getAllCodecs()`` is used.
            nbIteration: ignored. Accepted because ``start`` invokes every
                test with ``(ctrl, nbIteration, delay)``.
            delay: ignored, for the same reason.

        Returns:
            1 when at least one codec is reported, 0 otherwise.
        """
        print("**[BEGIN] test config")
        if not ctrl.getAllCodecs():
            print("error no codec on the system")
            return 0

        print("**[SUCCESS] test config")
        print("**[END] test config")
        return 1

    #
    # Helpers
    #
    def getTestName(self):
        """Return the test names known to ``ALL_TEST_NAME``."""
        return ALL_TEST_NAME.keys()

    def checkIP2IPAccount(self, ctrl):
        """Print the controller's accounts; return 1 if there is any, else 0."""
        ipAccount = ctrl.getAllAccounts()
        print(ipAccount)
        if len(ipAccount) == 0:
            print("no IP2IP account")
            return 0
        return 1

    def registerAccount(self, ctrl, account):
        """Ask the controller to mark ``account`` as registered."""
        print("registering:" + account)
        ctrl.setAccountRegistered(account, True)

    def _pickCodec(self, ctrl, account, wanted, fixedIds, codecType, candidates):
        """Resolve a configured codec name to an ``(id, name)`` pair.

        ``wanted`` is either a key of ``fixedIds`` or ``'RANDOM'``, in which
        case the first entry of ``candidates`` whose details report
        ``codecType`` is taken. Returns ``(0, '')`` when nothing matches.
        """
        if wanted in fixedIds:
            return fixedIds[wanted], wanted
        if wanted == 'RANDOM':
            for aCodec in candidates:
                detail = ctrl.getCodecDetails(account, aCodec)
                if detail['CodecInfo.type'] == codecType:
                    return aCodec, detail['CodecInfo.name']
        return 0, ''

    def setActiveCodecs(self, ctrl, account):
        """Activate one video and one audio codec on ``account``.

        The codecs are chosen from ``self.codecVideo`` / ``self.codecAudio``.
        If either cannot be resolved a message is printed and the active
        codec list is left untouched.
        """
        # Shuffled so that 'RANDOM' picks a different codec from run to run.
        codecList = list(ctrl.getAllCodecs())
        shuffle(codecList)

        codecVideoId, nameCodecVideo = self._pickCodec(
            ctrl, account, self.codecVideo,
            self._VIDEO_CODEC_IDS, 'VIDEO', codecList)
        codecAudioId, nameCodecAudio = self._pickCodec(
            ctrl, account, self.codecAudio,
            self._AUDIO_CODEC_IDS, 'AUDIO', codecList)

        if codecAudioId == 0 or codecVideoId == 0:
            print ("please configure at least one A/V codec !")
            return

        print ("selecting codecs audio= "+nameCodecAudio + " video= "+nameCodecVideo)
        ctrl.setActiveCodecList(account, str(codecVideoId) + "," + str(codecAudioId))

    def holdToggle(self, ctrl, callId, delay):
        """Wait ``delay`` seconds, hold the call, wait again, then unhold it."""
        time.sleep(delay)
        print("Holding: " + callId)
        ctrl.Hold(callId)
        time.sleep(delay)
        print("UnHolding: " + callId)
        ctrl.UnHold(callId)

    #
    # tests
    #

    def testLoopCallDht(self, ctrl, nbIteration, delay):
        """Perform <nbIteration> DHT calls, each kept up for <delay> seconds.

        The video bitrate is set to ``self.minBitrate`` before every call.
        """
        print("**[BEGIN] DHT Call Test")
        currBitrate = self.minBitrate

        for count in range(nbIteration):
            print("[%s/%s]" % (count, nbIteration))

            self.setActiveCodecs(ctrl, self.dhtAccountId)
            print("setting video bitrate to "+str(currBitrate))
            ctrl.setVideoCodecBitrate(self.dhtAccountId, currBitrate)

            callId = ctrl.Call(self.dhtTestAccount)
            try:
                # switch to file input
                ctrl.switchInput(callId, 'file://' + self.inputFile)
                time.sleep(delay)
            finally:
                # Never leave the call up, even when a step above failed.
                ctrl.HangUp(callId)

        print("**[SUCCESS] DHT Call Test")
        print("**[END] DHT Call Test")

    def testLoopCallDhtWithHold(self, ctrl, nbIteration, delay):
        """Perform <nbIteration> DHT calls with hold/unhold stress on each.

        While a call is up, ``holdToggle`` is run repeatedly with a 5 second
        step; the number of repetitions is derived from <delay>.
        """
        print("**[BEGIN] DHT Call Test With Hold")

        for count in range(nbIteration):
            print("[%s/%s]" % (count, nbIteration))

            self.setActiveCodecs(ctrl, self.dhtAccountId)

            callId = ctrl.Call(self.dhtTestAccount)
            try:
                # switch to file input
                ctrl.switchInput(callId, 'file://' + self.inputFile)

                delayHold = 5
                # One toggle sleeps 2 * delayHold seconds in total.
                # NOTE(review): with true division the quotient can be
                # fractional, and the '<' test below then runs one more
                # toggle than fits in <delay> -- confirm this is intended.
                nbHold = delay / (delayHold * 2)
                countHold = 0
                while countHold < nbHold:
                    self.holdToggle(ctrl, callId, delayHold)
                    countHold = countHold + 1
            finally:
                # Never leave the call up, even when a step above failed.
                ctrl.HangUp(callId)

        print("**[SUCCESS] DHT Call Test With Hold")
        print("**[END] DHT Call Test With Hold")

    def testLoopCallDhtWithIncBitrate(self, ctrl, nbIteration, delay):
        """Perform <nbIteration> DHT calls, raising the bitrate each time.

        The bitrate starts at ``self.minBitrate``, grows by
        ``self.incBitrate`` after every call and wraps back to
        ``self.minBitrate`` once it exceeds ``self.maxBitrate``.
        """
        print("**[BEGIN] VIDEO Bitrate Test")
        currBitrate = self.minBitrate

        for count in range(nbIteration):
            print("[%s/%s]" % (count, nbIteration))

            self.setActiveCodecs(ctrl, self.dhtAccountId)
            print("setting video bitrate to "+str(currBitrate))
            ctrl.setVideoCodecBitrate(self.dhtAccountId, currBitrate)

            callId = ctrl.Call(self.dhtTestAccount)
            try:
                # switch to file input
                ctrl.switchInput(callId, 'file://' + self.inputFile)
                time.sleep(delay)
            finally:
                # Never leave the call up, even when a step above failed.
                ctrl.HangUp(callId)

            currBitrate += self.incBitrate
            if currBitrate > self.maxBitrate:
                currBitrate = self.minBitrate

        print("**[SUCCESS] VIDEO Bitrate Test")
        print("**[END] VIDEO Bitrate Test")

    def testSimultaneousLoopCallDht(self, ctrl, nbIteration, delay,
                                    nbSimultaneousCall=10, callInterval=1):
        """Perform <nbIteration> rounds of simultaneous DHT calls.

        Args:
            ctrl: controller object.
            nbIteration: number of rounds.
            delay: seconds to keep the calls up once they are all placed.
            nbSimultaneousCall: number of calls placed per round.
            callInterval: seconds to wait after placing each call.

        At the end of a round every call listed by ``ctrl.getAllCalls()`` is
        hung up, not only the ones placed by this method.
        """
        print("**[BEGIN] Simultaneous DHT call test")

        for count in range(nbIteration):
            print("[%s/%s]" % (count, nbIteration))
            self.setActiveCodecs(ctrl, self.dhtAccountId)

            try:
                # do all the calls
                for _ in range(nbSimultaneousCall):
                    ctrl.Call(self.dhtTestAccount)
                    time.sleep(callInterval)

                time.sleep(delay)
            finally:
                # hangup each call, also when placing one of them failed
                for callId in ctrl.getAllCalls():
                    ctrl.HangUp(callId)

        print("**[SUCCESS] Simultaneous DHT call test")
        print("**[END] Simultaneous DHT call test")

    # Main function
    def start(self, ctrl, testName):
        """Load the configuration and run the test called ``testName``.

        Account ids come from the first entry of ``ctrl.getAllAccounts``
        for 'RING' and 'SIP'; everything else is read from
        ``test_config.ini`` in the current working directory. The selected
        test is called as ``test(ctrl, nbIteration, delay)``.

        An unknown ``testName`` prints a message and returns without doing
        anything.

        Raises:
            IndexError: if an account list is empty (assuming
                ``getAllAccounts`` returns a sequence).
            KeyError: if a section or option is missing. A missing
                ``test_config.ini`` also ends up here, because
                ``ConfigParser.read`` silently skips unreadable files.
            ValueError: if a numeric option is not an integer.
        """
        if testName not in ALL_TEST_NAME:
            print(("wrong test name"))
            return

        # getConfig
        self.dhtAccountId = ctrl.getAllAccounts('RING')[0]
        self.sipAccountId = ctrl.getAllAccounts('SIP')[0]

        config = configparser.ConfigParser()
        config.read('test_config.ini')

        self.dhtTestAccount = str(config['dht']['testAccount'])
        self.sipTestAccount = str(config['sip']['testAccount'])

        self.inputFile = str(config['codec']['inputFile'])
        self.minBitrate = int(config['codec']['minBitrate'])
        self.maxBitrate = int(config['codec']['maxBitrate'])
        self.incBitrate = int(config['codec']['incBitrate'])
        self.codecAudio = str(config['codec']['codecAudio'])
        self.codecVideo = str(config['codec']['codecVideo'])

        testNbIteration = config['iteration']['nbIteration']
        delay = config['iteration']['delay']

        test = getattr(self, ALL_TEST_NAME[testName])
        test(ctrl, int(testNbIteration), int(delay))