日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

locust压测工具:启动概述

發(fā)布時間:2023/12/20 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 locust压测工具:启动概述 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

locust壓測工具啟動概述

本文環(huán)境python3.5.2 locust版本0.9.0

locust概述

locust是一個簡單易用、分布式的用戶壓測工具,并確定系統(tǒng)可以處理多少并發(fā)用戶。在測試的時候,讓你定義的每一個測試用例都執(zhí)行,并且可以從web界面來監(jiān)控所有執(zhí)行的用例的執(zhí)行過程。這將有助于您在讓真正的用戶進入之前對您的代碼進行測試和識別瓶頸。locust基于事件驅動,因此可以在一臺機器上支持數千個并發(fā)用戶,與許多其他基于事件的應用程序相比,它不使用回調。相反,它通過gevent,讓每個測試用例都高效獨立運行。locust提供了純凈的Python的編程環(huán)境,支持分布式與可擴展可支持數十萬用戶的并發(fā),提供了人性化的web界面進行監(jiān)控和操作讓使用更簡單,能夠測試很多系統(tǒng)。locust是一款基于好多組件來實現運行的壓測工具,包括依賴于gevent,requests和flask等許多成熟的第三方組件。本文就簡單的分析一下locust的簡單的啟動的執(zhí)行流程。

locust的啟動準備

首先引用官方文檔的示例代碼:

from locust import HttpLocust, TaskSet, taskclass UserBehavior(TaskSet):def on_start(self):""" on_start is called when a Locust start before any task is scheduled """self.login()def on_stop(self):""" on_stop is called when the TaskSet is stopping """self.logout()def login(self):self.client.post("/login", {"username":"ellen_key", "password":"education"})def logout(self):self.client.post("/logout", {"username":"ellen_key", "password":"education"})@task(2)def index(self):self.client.get("/")@task(1)def profile(self):self.client.get("/profile")class WebsiteUser(HttpLocust):task_set = UserBehaviormin_wait = 5000max_wait = 9000

然后將示例代碼保存為test_locust.py文件,配置好待測試的服務器的ip,在終端命令行中輸入:

locust -f ./test_locust.py --host=http://127.0.0.1:8000 --no-web

此時就啟動了locust來執(zhí)行,壓測的功能,此時終端上就會輸出當前的執(zhí)行的信息。

locust的啟動流程與概述

從執(zhí)行的locust過程可知,在終端中調用了locust來執(zhí)行,從locust的setup.py文件中可知,啟動的入口函數locust.main:main函數;

entry_points={'console_scripts': ['locust = locust.main:main',] },

此時查看main相關代碼:

def main():parser, options, arguments = parse_options() # 解析傳入的參數# setup loggingsetup_logging(options.loglevel, options.logfile) # 建立日志文件logger = logging.getLogger(__name__) # 獲取日志的Loggerif options.show_version: # 檢查是否是顯示Locust的版本號print("Locust %s" % (version,))sys.exit(0) # 打印版本后退出locustfile = find_locustfile(options.locustfile) # 尋找傳入的測試文件if not locustfile: # 如果沒有找到傳入的測試文件則報錯logger.error("Could not find any locustfile! Ensure file ends in '.py' and see --help for available options.")sys.exit(1)if locustfile == "locust.py": # 如果傳入的測試文件名就是locust.py文件則報錯logger.error("The locustfile must not be named `locust.py`. Please rename the file and try again.")sys.exit(1)docstring, locusts = load_locustfile(locustfile) # 加載傳入的實例文件并查找相關的Locust類if options.list_commands:console_logger.info("Available Locusts:")for name in locusts:console_logger.info(" " + name)sys.exit(0)if not locusts: # 如果文件中沒有Locust類被找到則退出logger.error("No Locust class found!")sys.exit(1)# make sure specified Locust existsif arguments: # 檢查是否存在Locust類missing = set(arguments) - set(locusts.keys()) # 減去已經找到的Locust類if missing: # 如果有缺失的則報錯退出logger.error("Unknown Locust(s): %s\n" % (", ".join(missing)))sys.exit(1)else:names = set(arguments) & set(locusts.keys()) # 取并集locust_classes = [locusts[n] for n in names] else:# list() call is needed to consume the dict_view object in Python 3locust_classes = list(locusts.values()) # 獲取locust類列表if options.show_task_ratio: # 是否顯示任務信息console_logger.info("\n Task ratio per locust class")console_logger.info( "-" * 80)print_task_ratio(locust_classes)console_logger.info("\n Total task ratio")console_logger.info("-" * 80)print_task_ratio(locust_classes, total=True)sys.exit(0) if options.show_task_ratio_json: # 是否是以json的方式導出執(zhí)行信息from json import dumpstask_data = {"per_class": get_task_ratio_dict(locust_classes), "total": get_task_ratio_dict(locust_classes, total=True)}console_logger.info(dumps(task_data))sys.exit(0)if options.run_time: # 是否設置了運行結束時間if not options.no_web: # 檢查是否啟動了web界面展示logger.error("The --run-time argument can only be used together with --no-web")sys.exit(1)try:options.run_time = parse_timespan(options.run_time) # 解析成時間戳except ValueError:logger.error("Valid --run-time formats are: 20, 20s, 3m, 2h, 1h20m, 3h30m10s, etc.")sys.exit(1)def spawn_run_time_limit_greenlet(): # 定義退出函數logger.info("Run time limit set to %s seconds" % options.run_time)def timelimit_stop():logger.info("Time limit reached. Stopping Locust.")runners.locust_runner.quit()gevent.spawn_later(options.run_time, timelimit_stop) # 指定時間后調用退出方法if not options.no_web and not options.slave: # 檢查是否是開啟了web與從模式# spawn web greenletlogger.info("Starting web monitor at %s:%s" % (options.web_host or "*", options.port))main_greenlet = gevent.spawn(web.start, locust_classes, options) # 啟動web并開始if not options.master and not options.slave: # 如果既不是主也不是從模式runners.locust_runner = LocalLocustRunner(locust_classes, options) # 默認locust_runner為LocalLocustRunner類# spawn client spawning/hatching greenletif options.no_web: # 如果沒有web展示模式runners.locust_runner.start_hatching(wait=True) # 則開始執(zhí)行main_greenlet = runners.locust_runner.greenlet # 設置main_greenletif options.run_time: # 是否設置了運行結束時間spawn_run_time_limit_greenlet() # 如果設置了運行結束時間則在指定時間后退出elif options.master: # 是否是主進程模式runners.locust_runner = MasterLocustRunner(locust_classes, options) # 設置啟動類為MasterLocustRunnerif options.no_web: # 如果沒有web展示while len(runners.locust_runner.clients.ready)<options.expect_slaves:logging.info("Waiting for slaves to be ready, %s of %s connected",len(runners.locust_runner.clients.ready), options.expect_slaves)time.sleep(1)runners.locust_runner.start_hatching(options.num_clients, options.hatch_rate) # 啟動運行main_greenlet = runners.locust_runner.greenlet # 設置主greenletif options.run_time: # 是否設置了運行時間限制spawn_run_time_limit_greenlet() # 注冊指定時間后退出函數elif options.slave: # 檢查啟動是否是從模式if options.run_time: # 從模式不能設置執(zhí)行結束時間logger.error("--run-time should be specified on the master node, and not on slave nodes")sys.exit(1)try:runners.locust_runner = SlaveLocustRunner(locust_classes, options) # 設置從locust類main_greenlet = runners.locust_runner.greenletexcept socket.error as e:logger.error("Failed to connect to the Locust master: %s", e)sys.exit(-1)if not options.only_summary and (options.print_stats or (options.no_web and not options.slave)):# spawn stats printing greenletgevent.spawn(stats_printer) # 輸出相關調試信息if options.csvfilebase:gevent.spawn(stats_writer, options.csvfilebase) # 是否輸出csv文件def shutdown(code=0):"""Shut down locust by firing quitting event, printing/writing stats and exiting"""logger.info("Shutting down (exit code %s), bye." % code)logger.info("Cleaning up runner...")if runners.locust_runner is not None: # 接收到退出信號量的時候runners.locust_runner.quit() # 調用locust_runner的quit方法logger.info("Running teardowns...")events.quitting.fire(reverse=True) # 調用退出的注冊的所有方法print_stats(runners.locust_runner.request_stats) # 打印相關信息print_percentile_stats(runners.locust_runner.request_stats)if options.csvfilebase:write_stat_csvs(options.csvfilebase)print_error_report()sys.exit(code) # 退出# install SIGTERM handlerdef sig_term_handler():logger.info("Got SIGTERM signal")shutdown(0)gevent.signal(signal.SIGTERM, sig_term_handler) # 注冊信號try:logger.info("Starting Locust %s" % version) main_greenlet.join() # 開始啟動code = 0if len(runners.locust_runner.errors): # 如果出錯code = 1shutdown(code=code) # 調用shutdown退出except KeyboardInterrupt as e:shutdown(0) # 退出

從該函數可知,首先會解析傳入參數,然后根據傳入的參數依次去查找待執(zhí)行的示例文件,通過加裝的示例文件查找相關的Locust類,然后根據傳入的是否展示web界面等信息執(zhí)行執(zhí)行運行啟動,在本例的示例代碼中,由于不使用web來控制,所以就直接運行,此時我們繼續(xù)分析查找locust文件的方法,find_locustfile函數;

def find_locustfile(locustfile):"""Attempt to locate a locustfile, either explicitly or by searching parent dirs."""# Obtain env valuenames = [locustfile] # 傳入的路徑名稱# Create .py version if necessaryif not names[0].endswith('.py'): # 檢查是否已.py結束的文件names += [names[0] + '.py'] # 沒有已.py結束則添加后綴# Does the name contain path elements?if os.path.dirname(names[0]): # 是否是文件夾# If so, expand home-directory markers and test for existencefor name in names: # 查找路徑名expanded = os.path.expanduser(name)if os.path.exists(expanded): # 如果存在if name.endswith('.py') or _is_package(expanded): # 是否已.py結尾或者是否可導入 return os.path.abspath(expanded) # 返回文件的絕對路徑else:# Otherwise, start in cwd and work downwards towards filesystem root path = os.path.abspath('.') # 從當前文件夾開始查找while True:for name in names: # 遍歷joined = os.path.join(path, name) # 添加到文件名稱中if os.path.exists(joined): # 檢查是否存在該路徑if name.endswith('.py') or _is_package(joined): # 是否已.py結尾或者是可導入return os.path.abspath(joined) # 返回文件絕對路勁parent_path = os.path.dirname(path) # 獲取自目錄if parent_path == path: # 依次遍歷檢查# we've reached the root path which has been checked this iterationbreakpath = parent_path# Implicit 'return None' if nothing was found

該函數就是一直去尋找定義的示例文件并返回文件的絕對路徑,當找到了絕對路徑之后就會去導入該模塊,此時我們繼續(xù)分析load_locustfile函數的執(zhí)行過程:

def is_locust(tup):"""Takes (name, object) tuple, returns True if it's a public Locust subclass."""name, item = tup return bool(inspect.isclass(item) # 是否是類and issubclass(item, Locust) # 是否是Locust的子類and hasattr(item, "task_set") # 是否有task_set屬性and getattr(item, "task_set") # 獲取task_set屬性and not name.startswith('_') # 不以'_'名稱開頭)def load_locustfile(path):"""Import given locustfile path and return (docstring, callables).Specifically, the locustfile's ``__doc__`` attribute (a string) and adictionary of ``{'name': callable}`` containing all callables which passthe "is a Locust" test."""def __import_locustfile__(filename, path):"""Loads the locust file as a module, similar to performing `import`"""try:# Python 3 compatiblesource = importlib.machinery.SourceFileLoader(os.path.splitext(locustfile)[0], path) # 導入moudle獲取導入的屬性imported = source.load_module()except AttributeError:# Python 2.7 compatibleimport impimported = imp.load_source(os.path.splitext(locustfile)[0], path)return imported# Get directory and locustfile namedirectory, locustfile = os.path.split(path) # 分離路徑# If the directory isn't in the PYTHONPATH, add it so our import will workadded_to_path = Falseindex = Noneif directory not in sys.path: # 檢查文件夾是否在系統(tǒng)路徑中sys.path.insert(0, directory) # 不在系統(tǒng)路徑中則插入added_to_path = True# If the directory IS in the PYTHONPATH, move it to the front temporarily,# otherwise other locustfiles -- like Locusts's own -- may scoop the intended# one.else:i = sys.path.index(directory) # 獲取文件夾的索引if i != 0:# Store index for later restorationindex = i# Add to front, then remove from original positionsys.path.insert(0, directory) # 如果不為0 則插入第一個路徑del sys.path[i + 1]# Perform the importimported = __import_locustfile__(locustfile, path) # 導入該module# Remove directory from path if we added it ourselves (just to be neat)if added_to_path:del sys.path[0]# Put back in original index if we moved itif index is not None:sys.path.insert(index + 1, directory)del sys.path[0]# Return our two-tuplelocusts = dict(filter(is_locust, vars(imported).items())) # 獲取導入module的所有的環(huán)境的值并依次傳入is_locust函數進行檢查return imported.__doc__, locusts # 返回導入的module的注釋,并返回找到的Locust類

此時就找出了在示例文件test_locust.py中的WebsiteUser,因為該類繼承自HttpLocust,此時在main的執(zhí)行流程中就會繼續(xù)執(zhí)行到

if not options.master and not options.slave:runners.locust_runner = LocalLocustRunner(locust_classes, options)# spawn client spawning/hatching greenletif options.no_web:runners.locust_runner.start_hatching(wait=True)main_greenlet = runners.locust_runner.greenletif options.run_time:spawn_run_time_limit_greenlet()

此時,locust_runner就是LocalLocustRunner,并且調用了該實例的start_hatching(wait=True)函數;

class LocalLocustRunner(LocustRunner):def __init__(self, locust_classes, options):super(LocalLocustRunner, self).__init__(locust_classes, options)# register listener thats logs the exception for the local runnerdef on_locust_error(locust_instance, exception, tb):formatted_tb = "".join(traceback.format_tb(tb))self.log_exception("local", str(exception), formatted_tb)events.locust_error += on_locust_errordef start_hatching(self, locust_count=None, hatch_rate=None, wait=False):self.hatching_greenlet = gevent.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate, wait=wait)) # 調用了父類LocustRunner的start_hatching方法 self.greenlet = self.hatching_greenlet # 設置了greenletclass LocustRunner(object):def __init__(self, locust_classes, options):self.options = options # 解析的傳入參數self.locust_classes = locust_classes # 找到的locustself.hatch_rate = options.hatch_rate # 執(zhí)行速率self.num_clients = options.num_clients # 啟動的客戶端數量self.host = options.host # 訪問的hostself.locusts = Group()self.greenlet = self.locustsself.state = STATE_INITself.hatching_greenlet = Noneself.exceptions = {}self.stats = global_stats# register listener that resets stats when hatching is completedef on_hatch_complete(user_count):self.state = STATE_RUNNINGif self.options.reset_stats:logger.info("Resetting stats\n")self.stats.reset_all()events.hatch_complete += on_hatch_completedef weight_locusts(self, amount, stop_timeout = None):"""Distributes the amount of locusts for each WebLocust-class according to it's weightreturns a list "bucket" with the weighted locusts"""bucket = []weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set)) # 計算總的權重for locust in self.locust_classes: # 遍歷locust列表if not locust.task_set: # 如果locust沒有task_set屬性則跳過warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)continueif self.host is not None: # 檢查是否有hostlocust.host = self.hostif stop_timeout is not None: # 檢查是否有體制時間locust.stop_timeout = stop_timeout# create locusts depending on weightpercent = locust.weight / float(weight_sum) # 計算權重num_locusts = int(round(amount * percent)) # 獲取需要啟動的客戶端數量bucket.extend([locust for x in xrange(0, num_locusts)]) # 生成對應數量的locusts數量return bucketdef spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):if spawn_count is None: # 獲取生成客戶端的數量spawn_count = self.num_clientsbucket = self.weight_locusts(spawn_count, stop_timeout) # 傳入數量并計算權重之后哦返回待執(zhí)行的locust實例列表spawn_count = len(bucket)if self.state == STATE_INIT or self.state == STATE_STOPPED: # 如果是初始化狀態(tài)或停止狀態(tài)self.state = STATE_HATCHING # 修改為hatching狀態(tài)self.num_clients = spawn_count # 設置數量else:self.num_clients += spawn_count # 否則添加數量logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])def hatch():sleep_time = 1.0 / self.hatch_rate # 通過速率獲取睡眠的時間while True:if not bucket: # 如果為0則執(zhí)行完成logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))events.hatch_complete.fire(user_count=self.num_clients)returnlocust = bucket.pop(random.randint(0, len(bucket)-1)) # 隨機獲取一個locust實例occurence_count[locust.__name__] += 1 # 記錄該locust實例執(zhí)行的次數def start_locust(_):try:locust().run(runner=self) # 調用locust實例的run方法except GreenletExit:passnew_locust = self.locusts.spawn(start_locust, locust) # 執(zhí)行start_locust函數if len(self.locusts) % 10 == 0:logger.debug("%i locusts hatched" % len(self.locusts))gevent.sleep(sleep_time) # 休息執(zhí)行時間hatch() # 調用hatch函數運行if wait:self.locusts.join()logger.info("All locusts dead\n")def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):if self.state != STATE_RUNNING and self.state != STATE_HATCHING: # 不是運行狀態(tài)并且不是hatching狀態(tài)self.stats.clear_all()self.stats.start_time = time()self.exceptions = {}events.locust_start_hatching.fire()# Dynamically changing the locust countif self.state != STATE_INIT and self.state != STATE_STOPPED: # 是否初始化狀態(tài)并且不是停止狀態(tài)self.state = STATE_HATCHINGif self.num_clients > locust_count:# Kill some locustskill_count = self.num_clients - locust_countself.kill_locusts(kill_count)elif self.num_clients < locust_count:# Spawn some locustsif hatch_rate:self.hatch_rate = hatch_ratespawn_count = locust_count - self.num_clientsself.spawn_locusts(spawn_count=spawn_count)else:events.hatch_complete.fire(user_count=self.num_clients)else:if hatch_rate: # 是否設置了速率self.hatch_rate = hatch_rateif locust_count is not None: # 如果傳入了數量self.spawn_locusts(locust_count, wait=wait) # 傳入參數else: self.spawn_locusts(wait=wait) # 啟動

從該類的分析可知,最終調用的都是Locust的run方法,

class Locust(object):"""Represents a "user" which is to be hatched and attack the system that is to be load tested.The behaviour of this user is defined by the task_set attribute, which should point to a :py:class:`TaskSet <locust.core.TaskSet>` class.This class should usually be subclassed by a class that defines some kind of client. For example when load testing an HTTP system, you probably want to use the :py:class:`HttpLocust <locust.core.HttpLocust>` class."""host = None"""Base hostname to swarm. i.e: http://127.0.0.1:1234"""min_wait = 1000"""Minimum waiting time between the execution of locust tasks"""max_wait = 1000"""Maximum waiting time between the execution of locust tasks"""wait_function = lambda self: random.randint(self.min_wait,self.max_wait) """Function used to calculate waiting time between the execution of locust tasks in milliseconds"""task_set = None"""TaskSet class that defines the execution behaviour of this locust"""stop_timeout = None"""Number of seconds after which the Locust will die. If None it won't timeout."""weight = 10"""Probability of locust being chosen. The higher the weight, the greater is the chance of it being chosen."""client = NoClientWarningRaiser()_catch_exceptions = True_setup_has_run = False # Internal state to see if we have already run_teardown_is_set = False # Internal state to see if we have already run_lock = gevent.lock.Semaphore() # Lock to make sure setup is only run oncedef __init__(self):super(Locust, self).__init__()self._lock.acquire()if hasattr(self, "setup") and self._setup_has_run is False:self._set_setup_flag()self.setup()if hasattr(self, "teardown") and self._teardown_is_set is False:self._set_teardown_flag()events.quitting += self.teardownself._lock.release()@classmethoddef _set_setup_flag(cls):cls._setup_has_run = True@classmethoddef _set_teardown_flag(cls):cls._teardown_is_set = Truedef run(self, runner=None):task_set_instance = self.task_set(self) # 實例化task_set實例try:task_set_instance.run() # 調用task_set實例的run方法except StopLocust:passexcept (RescheduleTask, RescheduleTaskImmediately) as e:six.reraise(LocustError, LocustError("A task inside a Locust class' main TaskSet (`%s.task_set` of type `%s`) seems to have called interrupt() or raised an InterruptTaskSet exception. The interrupt() function is used to hand over execution to a parent TaskSet, and should never be called in the main TaskSet which a Locust class' task_set attribute points to." % (type(self).__name__, self.task_set.__name__)), sys.exc_info()[2])except GreenletExit as e:if runner:runner.state = STATE_CLEANUP# Run the task_set on_stop method, if it has oneif hasattr(task_set_instance, "on_stop"):task_set_instance.on_stop()raise # Maybe something relies on this except being raised?

由該類可知,最終調用了tesk_set實例的run方法,該類TaskSet的代碼如下:

class TaskSetMeta(type):"""Meta class for the main Locust class. It's used to allow Locust classes to specify task execution ratio using an {task:int} dict, or a [(task0,int), ..., (taskN,int)] list."""def __new__(mcs, classname, bases, classDict):new_tasks = []for base in bases: # 檢查父類if hasattr(base, "tasks") and base.tasks: # 如果父類有tasksnew_tasks += base.tasks # 添加父類的tasksif "tasks" in classDict and classDict["tasks"] is not None: # 檢查tasks是否在屬性列表中tasks = classDict["tasks"] # 獲取傳入的tasks數據if isinstance(tasks, dict):tasks = six.iteritems(tasks) # 迭代tasksfor task in tasks: # 遍歷tasksif isinstance(task, tuple): # 如果是元組task, count = task for i in xrange(0, count):new_tasks.append(task) # 添加到tasks列表中else: new_tasks.append(task) # 直接添加到tasks列表中for item in six.itervalues(classDict):if hasattr(item, "locust_task_weight"): # 檢查是否有l(wèi)ocust_task_weight屬性for i in xrange(0, item.locust_task_weight): # 添加對應數量的任務到列表中new_tasks.append(item)classDict["tasks"] = new_tasks # 重新設置tasks的列表return type.__new__(mcs, classname, bases, classDict) # 生成該類@six.add_metaclass(TaskSetMeta) class TaskSet(object):tasks = []def __init__(self, parent):self._task_queue = []self._time_start = time()if isinstance(parent, TaskSet):self.locust = parent.locustelif isinstance(parent, Locust):self.locust = parentelse:raise LocustError("TaskSet should be called with Locust instance or TaskSet instance as first argument")self.parent = parent# if this class doesn't have a min_wait, max_wait or wait_function defined, copy it from Locustif not self.min_wait:self.min_wait = self.locust.min_waitif not self.max_wait:self.max_wait = self.locust.max_waitif not self.wait_function:self.wait_function = self.locust.wait_functionself._lock.acquire()if hasattr(self, "setup") and self._setup_has_run is False:self._set_setup_flag()self.setup()if hasattr(self, "teardown") and self._teardown_is_set is False:self._set_teardown_flag()events.quitting += self.teardownself._lock.release()@classmethoddef _set_setup_flag(cls):cls._setup_has_run = True@classmethoddef _set_teardown_flag(cls):cls._teardown_is_set = Truedef run(self, *args, **kwargs):self.args = argsself.kwargs = kwargstry:if hasattr(self, "on_start"): # 檢查是否有on_start屬性,有則調用該函數self.on_start()except InterruptTaskSet as e:if e.reschedule:six.reraise(RescheduleTaskImmediately, RescheduleTaskImmediately(e.reschedule), sys.exc_info()[2])else:six.reraise(RescheduleTask, RescheduleTask(e.reschedule), sys.exc_info()[2])while (True):try:if self.locust.stop_timeout is not None and time() - self._time_start > self.locust.stop_timeout: # 檢查執(zhí)行的時間是否已到,如果到了則返回returnif not self._task_queue: # 檢查任務列表是否有任務self.schedule_task(self.get_next_task()) # 有待執(zhí)行的任務則調用該任務try: self.execute_next_task() # 執(zhí)行下一個待調度的任務except RescheduleTaskImmediately:passexcept RescheduleTask:self.wait() # 等待else:self.wait()except InterruptTaskSet as e:if e.reschedule:six.reraise(RescheduleTaskImmediately, RescheduleTaskImmediately(e.reschedule), sys.exc_info()[2])else:six.reraise(RescheduleTask, RescheduleTask(e.reschedule), sys.exc_info()[2])except StopLocust:raiseexcept GreenletExit:raiseexcept Exception as e:events.locust_error.fire(locust_instance=self, exception=e, tb=sys.exc_info()[2])if self.locust._catch_exceptions:sys.stderr.write("\n" + traceback.format_exc())self.wait()else:raisedef execute_next_task(self):task = self._task_queue.pop(0) # 獲取任務隊列的第一個任務self.execute_task(task["callable"], *task["args"], **task["kwargs"]) # 執(zhí)行該任務def execute_task(self, task, *args, **kwargs):# check if the function is a method bound to the current locust, and if so, don't pass self as first argumentif hasattr(task, "__self__") and task.__self__ == self: # 檢查是否有__self__屬性并且__self__是否為自己# task is a bound method on selftask(*args, **kwargs) # 是則直接調用task執(zhí)行elif hasattr(task, "tasks") and issubclass(task, TaskSet):# task is another (nested) TaskSet classtask(self).run(*args, **kwargs) # 檢查是否有tasks屬性并且是否是TaskSet的子類如果是則繼續(xù)調用run方法else:# task is a functiontask(self, *args, **kwargs) # 如果task是個function則直接調用def schedule_task(self, task_callable, args=None, kwargs=None, first=False):"""Add a task to the Locust's task execution queue.*Arguments*:* task_callable: Locust task to schedule* args: Arguments that will be passed to the task callable* kwargs: Dict of keyword arguments that will be passed to the task callable.* first: Optional keyword argument. If True, the task will be put first in the queue."""task = {"callable":task_callable, "args":args or [], "kwargs":kwargs or {}}if first:self._task_queue.insert(0, task) # 如果是第一個任務則添加到_task_queue中else:self._task_queue.append(task) # 否則就添加到_task_queue任務列表中def get_next_task(self):return random.choice(self.tasks) # 隨機抽取一個任務def get_wait_secs(self):millis = self.wait_function() # return millis / 1000.0def wait(self):self._sleep(self.get_wait_secs())def _sleep(self, seconds):gevent.sleep(seconds)

其中使用到了元類編程的相關知識,此時就將實例代碼中的如下兩個任務執(zhí)行了;

@task(2) def index(self):self.client.get("/")@task(1) def profile(self):self.client.get("/profile")

task的函數如下,

def task(weight=1):"""Used as a convenience decorator to be able to declare tasks for a TaskSet inline in the class. Example::class ForumPage(TaskSet):@task(100)def read_thread(self):pass@task(7)def create_thread(self):pass"""def decorator_func(func):func.locust_task_weight = weightreturn func"""Check if task was used without parentheses (not called), like this::@taskdef my_task()pass"""if callable(weight):func = weightweight = 1return decorator_func(func)else:return decorator_func

給每個定義的函數都添加了locust_task_weight屬性,讓這些方法都在TaskSet初始化的時候都被添加到tasks屬性中了,然后在task被調用的時候執(zhí)行,至此,locust的基本的task的調用流程基本完成。

總結

locust壓測工具支持多種的應用場景,是用Python實現的一個壓測框架,本文首先只是簡單的分析了一下該框架的基本的執(zhí)行流程,就是通過執(zhí)行定義的task來實現測試用例的執(zhí)行,本文還有些許細節(jié)沒有詳細說明,只是簡單的概述了啟動執(zhí)行的大致流程,啟動執(zhí)行的更為詳細的流程大家有興趣可自行查閱。鑒于本人才疏學淺,如有疏漏請批評指正。

總結

以上是生活随笔為你收集整理的locust压测工具:启动概述的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。