# ----------------------------------------------------------------
# * Copyright (c) 2018-2023
# * Broadcom Corporation
# * All Rights Reserved.
# *---------------------------------------------------------------
# Redistribution and use in source and binary forms, with or without modification, are permitted
# provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice, this list of conditions
# and the following disclaimer. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution. Neither the name of the Broadcom nor the names of
# contributors may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Author Robert J.
McMahon, Broadcom LTD # Date April 2016 - December 2023 import re import subprocess import logging import asyncio, sys import time, datetime import locale import signal import weakref import os import getpass import math import scipy import scipy.spatial import numpy as np import tkinter import ctypes import ipaddress import collections import csv from datetime import datetime as datetime, timezone from scipy import stats from scipy.cluster import hierarchy from scipy.cluster.hierarchy import linkage import matplotlib.pyplot as plt from collections import defaultdict logger = logging.getLogger(__name__) class iperf_flow(object): port = 61000 iperf = '/usr/bin/iperf' instances = weakref.WeakSet() _loop = None flow_scope = ("flowstats") tasks = [] flowid2name = defaultdict(str) @classmethod def get_instances(cls): return list(iperf_flow.instances) @classmethod @property def loop(cls): if not cls._loop : try : cls._loop = asyncio.get_running_loop() except : if os.name == 'nt': # On Windows, the ProactorEventLoop is necessary to listen on pipes cls._loop = asyncio.ProactorEventLoop() else: cls._loop = asyncio.new_event_loop() return cls._loop @classmethod def close_loop(cls): if iperf_flow.loop.is_running(): iperf_flow.loop.run_until_complete(loop.shutdown_asyncgens()) iperf_flow.loop.close() @classmethod def sleep(cls, time=0, text=None, stoptext=None) : if text : logging.info('Sleep {} ({})'.format(time, text)) iperf_flow.loop.run_until_complete(asyncio.sleep(time)) if stoptext : logging.info('Sleep done ({})'.format(stoptext)) @classmethod def run(cls, time=None, amount=None, flows='all', sample_delay=None, io_timer=None, preclean=True, parallel=None, epoch_sync=False) : if flows == 'all' : flows = iperf_flow.get_instances() if not flows: logging.warn('flow run method called with no flows instantiated') return if preclean: hosts = [flow.server for flow in flows] hosts.extend([flow.client for flow in flows]) hosts=list(set(hosts)) tasks = 
[asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('preclean timeout') raise logging.info('flow run invoked') tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('flow server start timeout') raise iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done") if epoch_sync : dt = (datetime.now()).timestamp() tsec = str(dt).split('.') epoch_sync_time = int(tsec[0]) + 2 else : epoch_sync_time = None tasks = [asyncio.ensure_future(flow.tx.start(time=time, amount=amount, parallel=parallel, epoch_sync_time=epoch_sync_time), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('flow client start timeout') raise if sample_delay : iperf_flow.sleep(time=0.3, text="ramp up", stoptext="ramp up done") if io_timer : tasks = [asyncio.ensure_future(flow.is_traffic(), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('flow traffic check timeout') raise if time : iperf_flow.sleep(time=time + 4, text="Running traffic start", stoptext="Stopping flows") # Signal the remote iperf client sessions to stop them tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3)) except asyncio.TimeoutError: logging.error('flow tx stop timeout') raise elif amount: tasks = [asyncio.ensure_future(flow.transmit_completed(), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except 
asyncio.TimeoutError: logging.error('flow tx completed timed out') raise logging.info('flow transmit completed') # Now signal the remote iperf server sessions to stop them tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3)) except asyncio.TimeoutError: logging.error('flow tx stop timeout') raise # iperf_flow.loop.close() logging.info('flow run finished') @classmethod def commence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) : if flows == 'all' : flows = iperf_flow.get_instances() if not flows: logging.warn('flow run method called with no flows instantiated') return if preclean: hosts = [flow.server for flow in flows] hosts.extend([flow.client for flow in flows]) hosts=list(set(hosts)) tasks = [asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('preclean timeout') raise logging.info('flow start invoked') tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('flow server start timeout') raise iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done") tasks = [asyncio.ensure_future(flow.tx.start(time=time), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('flow client start timeout') raise @classmethod def plot(cls, flows='all', title='None', directory='None') : if flows == 'all' : flows = iperf_flow.get_instances() tasks = [] for flow in flows : for this_name in flow.histogram_names : path = directory + '/' + this_name os.makedirs(path, exist_ok=True) i = 0 # group by name 
histograms = [h for h in flow.histograms if h.name == this_name] for histogram in histograms : if histogram.ks_index is not None : histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(i) else : histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(histogram.ks_index) logging.info('scheduling task {}'.format(histogram.output_dir)) tasks.append(asyncio.ensure_future(histogram.async_plot(directory=histogram.output_dir, title=title), loop=iperf_flow.loop)) i += 1 try : logging.info('runnings tasks') iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=600)) except asyncio.TimeoutError: logging.error('plot timed out') raise @classmethod def cease(cls, flows='all') : if flows == 'all' : flows = iperf_flow.get_instances() # Signal the remote iperf client sessions to stop them tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('flow tx stop timeout') # Now signal the remote iperf server sessions to stop them tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows] try : iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) except asyncio.TimeoutError: logging.error('flow rx stop timeout') @classmethod async def cleanup(cls, host=None, sshcmd='/usr/bin/ssh', user='root') : if host: logging.info('ssh {}@{} pkill iperf'.format(user, host)) childprocess = await asyncio.create_subprocess_exec(sshcmd, '{}@{}'.format(user, host), 'pkill', 'iperf', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, _ = await childprocess.communicate() if stdout: logging.info('cleanup: host({}) stdout={} '.format(host, stdout)) @classmethod def tos_to_txt(cls, tos) : switcher = { int(0x0) : "BE", int(0x02) : "BK", int(0xC0) : "VO", int(0x80) : "VI", } return switcher.get(int(tos), None) @classmethod def txt_to_tos(cls, txt) : 
switcher = { "BE" : "0x0", "BESTEFFORT" : "0x0", "0x0" : "0x0", "BK" : "0x20", "BACKGROUND" : "0x20", "0x20" : "0x20", "VO" : "0xC0", "VOICE" : "0xC0", "0xC0" : "0xC0", "VI" : "0x80", "VIDEO" : "0x80", "0x80" : "0x80", } return switcher.get(txt.upper(), None) def __init__(self, name='iperf', server=None, client=None, user=None, proto='TCP', dstip='127.0.0.1', interval=1, format='b', offered_load=None, tos='BE', window='4M', src=None, srcip=None, srcport=None, dstport=None, debug=False, length=None, ipg=0.0, amount=None, trip_times=True, prefetch=None, latency=False, bb=False, working_load=False, bb_period=None, bb_hold=None, txstart_delay_sec=None, burst_size=None, burst_period=None, fullduplex=False, cca=None, tcp_tx_delay=None): iperf_flow.instances.add(self) self.name = name self.latency = latency if not dstport : iperf_flow.port += 1 self.dstport = iperf_flow.port else: self.dstport = dstport self.dstip = dstip self.srcip = srcip self.srcport = srcport try : self.server = server.ipaddr except AttributeError: self.server = server try : self.client = client.ipaddr except AttributeError: self.client = client self.client_device = client.device self.server_device = server.device if not user : self.user = getpass.getuser() else : self.user = user self.proto = proto self.tcp_tx_delay = tcp_tx_delay self.tos = tos if length : self.length = length if amount : self.amount = amount if trip_times : self.trip_times = trip_times if burst_period : self.burst_period = burst_period if burst_size : self.burst_size = burst_size if txstart_delay_sec: self.txstart_delay_sec = txstart_delay_sec if cca: self.cca = cca self.interval = round(interval,3) self.format = format self.offered_load = offered_load if self.offered_load : if len(self.offered_load.split(':')) == 2 : self.isoch = True self.name += '-isoch' else : self.isoch = False self.prefetch = prefetch self.ipg = ipg self.debug = debug self.TRAFFIC_EVENT_TIMEOUT = round(self.interval * 4, 3) self.bb = bb self.working_load = 
working_load self.bb_period = bb_period self.bb_hold = bb_hold self.fullduplex = fullduplex # use python composition for the server and client # i.e. a flow has a server and a client self.rx = iperf_server(name='{}->RX({})'.format(name, str(self.server)), loop=iperf_flow.loop, host=self.server, flow=self, debug=self.debug) self.tx = iperf_client(name='{}->TX({})'.format(name, str(self.client)), loop=iperf_flow.loop, host=self.client, flow=self, debug=self.debug) self.rx.window=window self.tx.window=window self.ks_critical_p = 0.01 self.stats_reset() #def __del__(self) : # iperf_flow.instances.remove(self) def destroy(self) : iperf_flow.instances.remove(self) def __getattr__(self, attr) : if attr in self.flowstats : return self.flowstats[attr] def stats_reset(self) : # Initialize the flow stats dictionary self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'starttime' : None, 'flowid' : None, 'endtime' : None} self.flowstats['txdatetime']=[] self.flowstats['txbytes']=[] self.flowstats['txthroughput']=[] self.flowstats['writes']=[] self.flowstats['errwrites']=[] self.flowstats['retry']=[] self.flowstats['cwnd']=[] self.flowstats['rtt']=[] self.flowstats['rxdatetime']=[] self.flowstats['rxbytes']=[] self.flowstats['rxthroughput']=[] self.flowstats['reads']=[] self.flowstats['histograms']=[] self.flowstats['histogram_names'] = set() self.flowstats['connect_time']=[] self.flowstats['trip_time']=[] self.flowstats['jitter']=[] self.flowstats['rxlostpkts']=[] self.flowstats['rxtotpkts']=[] self.flowstats['meanlat']=[] self.flowstats['minlat']=[] self.flowstats['maxlat']=[] self.flowstats['stdevlat']=[] self.flowstats['rxpps']=[] self.flowstats['inP']=[] self.flowstats['inPvar']=[] self.flowstats['rxpkts']=[] self.flowstats['netPower']=[] async def start(self): self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'flowid' : None} await self.rx.start() await self.tx.start() async def 
is_traffic(self) : if self.interval < 0.005 : logging.warn('{} {}'.format(self.name, 'traffic check invoked without interval sampling')) else : self.rx.traffic_event.clear() self.tx.traffic_event.clear() logging.info('{} {}'.format(self.name, 'traffic check invoked')) await self.rx.traffic_event.wait() await self.tx.traffic_event.wait() async def transmit_completed(self) : logging.info('{} {}'.format(self.name, 'waiting for transmit to complete')) await self.tx.txcompleted.wait() async def stop(self): self.tx.stop() self.rx.stop() def stats(self): logging.info('stats') def compute_ks_table(self, plot=True, directory='.', title=None) : if len(self.histogram_names) < 1 : tmp = "***Failed. Expected 1 histogram_names, but instead got {0}".format(len(self.histogram_names)) logging.info(tmp) print(tmp) #raise for this_name in self.histogram_names : # group by name histograms = [h for h in self.histograms if h.name == this_name] for index, h in enumerate(histograms) : h.ks_index = index tmp = "{} KS Table has {} entries".format(self.name, len(histograms)) logging.info(tmp) print(tmp) self.condensed_distance_matrix = ([]) tasks = [] for rowindex, h1 in enumerate(histograms) : resultstr = rowindex * 'x' maxp = None minp = None for h2 in histograms[rowindex:] : d,p = stats.ks_2samp(h1.samples, h2.samples) if h1 is not h2 : self.condensed_distance_matrix = np.append(self.condensed_distance_matrix,d) logging.debug('D,p={},{} cp={}'.format(str(d),str(p), str(self.ks_critical_p))) if not minp or p < minp : minp = p if not maxp or (p != 1 and p > maxp) : maxp = p if p > self.ks_critical_p : resultstr += '1' else : resultstr += '0' if plot : tasks.append(asyncio.ensure_future(flow_histogram.plot_two_sample_ks(h1=h1, h2=h2, flowname=self.name, title=title, directory=directory), loop=iperf_flow.loop)) print('KS: {0}({1:3d}):{2} minp={3} ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p))) logging.info('KS: {0}({1:3d}):{2} minp={3} 
ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p))) if tasks : try : logging.debug('running KS table plotting coroutines for {} row {}'.format(this_name,str(rowindex))) iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=300)) except asyncio.TimeoutError: logging.error('plot timed out') raise logging.info('{} {}(condensed distance matrix)\n{}'.format(self.name, this_name,self.condensed_distance_matrix)) self.linkage_matrix=linkage(self.condensed_distance_matrix, 'ward') try : plt.figure(figsize=(18,10)) dn = hierarchy.dendrogram(self.linkage_matrix) plt.title("{} {}".format(self.name, this_name)) plt.savefig('{}/dn_{}_{}.png'.format(directory,self.name,this_name)) logging.info('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix))) print('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix))) print('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix)) logging.info('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix)) flattened=scipy.cluster.hierarchy.fcluster(self.linkage_matrix, 0.75*self.condensed_distance_matrix.max(), criterion='distance') print('{} {} Clusters:{}'.format(self.name, this_name, flattened)) logging.info('{} {} Clusters:{}'.format(self.name, this_name, flattened)) except: pass def dump_stats(self, directory='.') : logging.info("\n********************** dump_stats for flow {} **********************".format(self.name)) #logging.info('This flow Name={} id={} items_cnt={}'.format(iperf_flow.flowid2name[self.flowstats['flowid']], str(self.flowstats['flowid']), len(self.flowstats))) #logging.info('All flows Name and id: {}'.format(str(iperf_flow.flowid2name))) #logging.info('This flow Name={} flowstats={}'.format(self.name, str(self.flowstats))) csvfilename = os.path.join(directory, '{}.csv'.format(self.name)) if not 
os.path.exists(directory): logging.debug('Making results directory {}'.format(directory)) os.makedirs(directory) logging.info("Writing stats to '{}'".format(csvfilename)) for stat_name in [stat for stat in self.flowstats.keys() if stat != 'histograms'] : logging.info("{}={}".format(stat_name, str(self.flowstats[stat_name]))) with open(csvfilename, 'w', newline='') as fd : keynames = self.flowstats.keys() writer = csv.writer(fd) writer.writerow(keynames) writer.writerow([self.flowstats[keyname] for keyname in keynames]) writer.writerow([h.samples for h in self.flowstats['histograms']]) class iperf_server(object): class IperfServerProtocol(asyncio.SubprocessProtocol): def __init__(self, server, flow): self.__dict__['flow'] = flow self._exited = False self._closed_stdout = False self._closed_stderr = False self._mypid = None self._server = server self._stdoutbuffer = "" self._stderrbuffer = "" def __setattr__(self, attr, value): if attr in iperf_flow.flow_scope: self.flow.__setattr__(self.flow, attr, value) else: self.__dict__[attr] = value # methods and attributes not here are handled by the flow object, # aka, the flow object delegates to this object per composition def __getattr__(self, attr): if attr in iperf_flow.flow_scope: return getattr(self.flow, attr) @property def finished(self): return self._exited and self._closed_stdout and self._closed_stderr def signal_exit(self): if not self.finished: return self._server.closed.set() self._server.opened.clear() def connection_made(self, trans): self._server.closed.clear() self._mypid = trans.get_pid() logging.debug('server connection made pid=({})'.format(self._mypid)) def pipe_data_received(self, fd, data): if self.debug : logging.debug('{} {}'.format(fd, data)) data = data.decode("utf-8") if fd == 1: self._stdoutbuffer += data while "\n" in self._stdoutbuffer: line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1) self._server.adapter.info('{} (stdout,{})'.format(line, self._server.remotepid)) if not 
self._server.opened.is_set() : m = self._server.regex_open_pid.match(line) if m : self._server.remotepid = m.group('pid') self._server.opened.set() logging.debug('{} pipe reading (stdout,{})'.format(self._server.name, self._server.remotepid)) else : if self._server.proto == 'TCP' : m = self._server.regex_traffic.match(line) if m : timestamp = datetime.now() if not self._server.traffic_event.is_set() : self._server.traffic_event.set() bytes = float(m.group('bytes')) if self.flowstats['current_txbytes'] : flowrate = round((bytes / self.flowstats['current_txbytes']), 2) # *consume* the current *txbytes* where the client pipe will repopulate on its next sample # do this by setting the value to None self.flowstats['current_txbytes'] = None # logging.debug('{} flow ratio={:.2f}'.format(self._server.name, flowrate)) self.flowstats['flowrate'] = flowrate else : # *produce* the current *rxbytes* so the client pipe can know this event occurred # indicate this by setting the value to value self.flowstats['current_rxbytes'] = bytes self.flowstats['rxdatetime'].append(timestamp) self.flowstats['rxbytes'].append(m.group('bytes')) self.flowstats['rxthroughput'].append(m.group('throughput')) self.flowstats['reads'].append(m.group('reads')) else : m = self._server.regex_trip_time.match(line) if m : self.flowstats['trip_time'].append(float(m.group('trip_time')) * 1000) else : m = self._server.regex_traffic_udp.match(line) if m : timestamp = datetime.now() if not self._server.traffic_event.is_set() : self._server.traffic_event.set() self.flowstats['rxbytes'].append(m.group('bytes')) self.flowstats['rxthroughput'].append(m.group('throughput')) self.flowstats['jitter'].append(m.group('jitter')) self.flowstats['rxlostpkts'].append(m.group('lost_pkts')) self.flowstats['rxtotpkts'].append(m.group('tot_pkts')) self.flowstats['meanlat'].append(m.group('lat_mean')) self.flowstats['minlat'].append(m.group('lat_min')) self.flowstats['maxlat'].append(m.group('lat_max')) 
self.flowstats['stdevlat'].append(m.group('lat_stdev')) self.flowstats['rxpps'].append(m.group('pps')) self.flowstats['inP'].append(m.group('inP')) self.flowstats['inPvar'].append(m.group('inPvar')) self.flowstats['rxpkts'].append(m.group('pkts')) self.flowstats['netPower'].append(m.group('netPower')) m = self._server.regex_final_histogram_traffic.match(line) if m : timestamp = datetime.now(timezone.utc).astimezone() self.flowstats['endtime']= timestamp self.flowstats['histogram_names'].add(m.group('pdfname')) this_histogram = flow_histogram(name=m.group('pdfname'),values=m.group('pdf'), population=m.group('population'), binwidth=m.group('binwidth'), starttime=self.flowstats['starttime'], endtime=timestamp, outliers=m.group('outliers'), uci=m.group('uci'), uci_val=m.group('uci_val'), lci=m.group('lci'), lci_val=m.group('lci_val')) self.flowstats['histograms'].append(this_histogram) logging.info('pdf {} found with bin width={} us'.format(m.group('pdfname'), m.group('binwidth'))) elif fd == 2: self._stderrbuffer += data while "\n" in self._stderrbuffer: line, self._stderrbuffer = self._stderrbuffer.split("\n", 1) logging.info('{} {} (stderr)'.format(self._server.name, line)) m = self._server.regex_rx_bind_failed.match(line) if m : logging.error('RX Bind Failed. 
Check LAN / WLAN between server and client.') iperf_flow.loop.stop() raise def pipe_connection_lost(self, fd, exc): if fd == 1: self._closed_stdout = True logging.debug('stdout pipe to {} closed (exception={})'.format(self._server.name, exc)) elif fd == 2: self._closed_stderr = True logging.debug('stderr pipe to {} closed (exception={})'.format(self._server.name, exc)) if self._closed_stdout and self._closed_stderr : self.remotepid = None; self.signal_exit() def process_exited(self): logging.debug('subprocess with pid={} closed'.format(self._mypid)) self._exited = True self._mypid = None self.signal_exit() class CustomAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): return '[%s] %s' % (self.extra['connid'], msg), kwargs def __init__(self, name='Server', loop=None, host='localhost', flow=None, debug=False): self.__dict__['flow'] = flow self.name = name self.iperf = '/usr/local/bin/iperf' self.ssh = '/usr/bin/ssh' self.host = host self.flow = flow self.debug = debug self.opened = asyncio.Event() self.closed = asyncio.Event() self.closed.set() self.traffic_event = asyncio.Event() self._transport = None self._protocol = None self.time = time conn_id = '{}'.format(self.name) self.adapter = self.CustomAdapter(logger, {'connid': conn_id}) # ex. 
[ 4] 0.00-0.50 sec 657090 Bytes 10513440 bits/sec 449 449:0:0:0:0:0:0:0 self.regex_traffic = re.compile(r'\[\s+\d+] (?P.*) sec\s+(?P[0-9]+) Bytes\s+(?P[0-9]+) bits/sec\s+(?P[0-9]+)') self.regex_traffic_udp = re.compile(r'\[\s+\d+] (?P.*) sec\s+(?P[0-9]+) Bytes\s+(?P[0-9]+) bits/sec\s+(?P[0-9.]+)\sms\s(?P[0-9]+)/(?P[0-9]+).+(?P[0-9.]+)/(?P[0-9.]+)/(?P[0-9.]+)/(?P[0-9.]+)\sms\s(?P[0-9]+)\spps\s+(?P[0-9\.]+)\/(?P[0-9]+)\((?P[0-9]+)\)\spkts\s(?P[0-9]+)') self.regex_final_histogram_traffic = re.compile(r'\[\s*\d+\] (?P.*) sec\s+(?P[A-Za-z0-9\-]+)\(f\)-PDF: bin\(w=(?P[0-9]+)us\):cnt\((?P[0-9]+)\)=(?P.+)\s+\((?P[0-9\.]+)/(?P[0-9\.]+)/(?P[0-9\.]+)%=(?P[0-9]+)/(?P[0-9]+)/(?P[0-9]+),Outliers=(?P[0-9]+),obl/obu=[0-9]+/[0-9]+\)') # 0.0000-0.5259 trip-time (3WHS done->fin+finack) = 0.5597 sec self.regex_trip_time = re.compile(r'.+trip\-time\s+\(3WHS\sdone\->fin\+finack\)\s=\s(?P\d+\.\d+)\ssec') self.regex_rx_bind_failed = re.compile(r'listener bind failed: Cannot assign requested address') def __getattr__(self, attr): return getattr(self.flow, attr) async def start(self, time=time): if not self.closed.is_set() : return # ex. 
Server listening on TCP port 61003 with pid 2565 self.regex_open_pid = re.compile(r'^Server listening on {} port {} with pid (?P\d+)'.format(self.proto, str(self.dstport))) self.opened.clear() self.remotepid = None if time : iperftime = time + 30 self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-t ' + str(iperftime), '-f{}'.format(self.format), '-w' , self.window, '--realtime'] else : self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-f{}'.format(self.format), '-w' , self.window, '--realtime'] if self.interval >= 0.005 : self.sshcmd.extend(['-i ', str(self.interval)]) if self.server_device and self.srcip : self.sshcmd.extend(['-B ', '{}%{}'.format(self.dstip, self.server_device)]) if self.proto == 'UDP' : self.sshcmd.extend(['-u']) if self.latency : self.sshcmd.extend(['--histograms=100u,100000,50,95']) self.sshcmd.extend(['--jitter-histograms']) logging.info('{}'.format(str(self.sshcmd))) self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfServerProtocol(self, self.flow), *self.sshcmd) await self.opened.wait() async def signal_stop(self): if self.remotepid and not self.finished : childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid)) stdout, _ = await childprocess.communicate() if stdout: logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) if not self.closed.is_set() : await self.closed.wait() logging.info('await kill completed remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) class 
iperf_client(object): # Asyncio protocol for subprocess transport class IperfClientProtocol(asyncio.SubprocessProtocol): def __init__(self, client, flow): self.__dict__['flow'] = flow self._exited = False self._closed_stdout = False self._closed_stderr = False self._mypid = None self._client = client self._stdoutbuffer = "" self._stderrbuffer = "" def __setattr__(self, attr, value): if attr in iperf_flow.flow_scope: self.flow.__setattr__(self.flow, attr, value) else: self.__dict__[attr] = value def __getattr__(self, attr): if attr in iperf_flow.flow_scope: return getattr(self.flow, attr) @property def finished(self): return self._exited and self._closed_stdout and self._closed_stderr def signal_exit(self): if not self.finished: return self._client.closed.set() self._client.opened.clear() self._client.txcompleted.set() def connection_made(self, trans): self._client.closed.clear() self._mypid = trans.get_pid() logging.debug('client connection made pid=({})'.format(self._mypid)) def pipe_data_received(self, fd, data): if self.debug : logging.debug('{} {}'.format(fd, data)) data = data.decode("utf-8") if fd == 1: self._stdoutbuffer += data while "\n" in self._stdoutbuffer: line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1) self._client.adapter.info('{} (stdout,{})'.format(line, self._client.remotepid)) if not self._client.opened.is_set() : m = self._client.regex_open_pid.match(line) if m : self._client.opened.set() self._client.remotepid = m.group('pid') self.flowstats['starttime'] = datetime.now(timezone.utc).astimezone() logging.debug('{} pipe reading at {} (stdout,{})'.format(self._client.name, self.flowstats['starttime'].isoformat(), self._client.remotepid)) else : if self.flowstats['flowid'] is None : m = self._client.regex_flowid.match(line) if m : # [ 1] local 192.168.1.15%enp1s0 port 7001 connected with 192.168.1.232 port 7001 (trip-times) (sock=3) on 2021-10-11 14:39:45 (PDT) # self.regex_flowid = 
re.compile(r'local\s(?P[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P[0-9]+)\sconnected with\s(?P[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P[0-9]+)') # # temp = htonl(config->src_ip); # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); # temp = htonl(config->dst_ip); # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); # temp = (hton16(config->dst_port) << 16) | hton16(config->src_port); # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); # temp = config->proto; # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); # return "%08x" % netip # NOTE: the network or big endian byte order srcipaddr = ipaddress.ip_address(m.group('srcip')) srcip32 = ctypes.c_uint32(int.from_bytes(srcipaddr.packed, byteorder='little', signed=False)) dstipaddr = ipaddress.ip_address(m.group('dstip')) dstip32 = ctypes.c_uint32(int.from_bytes(dstipaddr.packed, byteorder='little', signed=False)) dstportbytestr = int(m.group('dstport')).to_bytes(2, byteorder='big', signed=False) dstport16 = ctypes.c_uint16(int.from_bytes(dstportbytestr, byteorder='little', signed=False)) srcportbytestr = int(m.group('srcport')).to_bytes(2, byteorder='big', signed=False) srcport16 = ctypes.c_uint16(int.from_bytes(srcportbytestr, byteorder='little', signed=False)) ports32 = ctypes.c_uint32((dstport16.value << 16) | srcport16.value) if self._client.proto == 'UDP': proto32 = ctypes.c_uint32(0x11) else : proto32 = ctypes.c_uint32(0x06) quintuplehash = srcip32.value ^ dstip32.value ^ ports32.value ^ proto32.value self.flowstats['flowid'] = '0x{:08x}'.format(quintuplehash) if self._client.flow.name : flowkey = self._client.flow.name else : flowkey = '0x{:08x}'.format(quintuplehash) iperf_flow.flowid2name[self.flowstats['flowid']] = flowkey logging.info('Flow quintuple hash of {} uses name {}'.format(self.flowstats['flowid'], flowkey)) if 
self._client.proto == 'TCP': m = self._client.regex_traffic.match(line) if m : timestamp = datetime.now() if not self._client.traffic_event.is_set() : self._client.traffic_event.set() bytes = float(m.group('bytes')) if self.flowstats['current_rxbytes'] : flowrate = round((self.flowstats['current_rxbytes'] / bytes), 2) # *consume* the current *rxbytes* where the server pipe will repopulate on its next sample # do this by setting the value to None self.flowstats['current_rxbytes'] = None # logging.debug('{} flow ratio={:.2f}'.format(self._client.name, flowrate)) self.flowstats['flowrate'] = flowrate else : # *produce* the current txbytes so the server pipe can know this event occurred # indicate this by setting the value to value self.flowstats['current_txbytes'] = bytes self.flowstats['txdatetime'].append(timestamp) self.flowstats['txbytes'].append(m.group('bytes')) self.flowstats['txthroughput'].append(m.group('throughput')) self.flowstats['writes'].append(m.group('writes')) self.flowstats['errwrites'].append(m.group('errwrites')) self.flowstats['retry'].append(m.group('retry')) self.flowstats['cwnd'].append(m.group('cwnd')) self.flowstats['rtt'].append(m.group('rtt')) else : m = self._client.regex_connect_time.match(line) if m : self.flowstats['connect_time'].append(float(m.group('connect_time'))) else : pass elif fd == 2: self._stderrbuffer += data while "\n" in self._stderrbuffer: line, self._stderrbuffer = self._stderrbuffer.split("\n", 1) logging.info('{} {} (stderr)'.format(self._client.name, line)) m = self._client.regex_tx_bind_failed.match(line) if m : logging.error('TX Bind Failed. 
def pipe_connection_lost(self, fd, exc):
    """Record that a subprocess pipe closed, then re-evaluate exit state.

    Args:
        fd: Descriptor number of the lost pipe; 1 is treated as stdout and
            2 as stderr. Any other value only triggers ``signal_exit()``.
        exc: Exception reported with the closure; it is only logged.
    """
    pipe_flags = {1: ('stdout', '_closed_stdout'), 2: ('stderr', '_closed_stderr')}
    if fd in pipe_flags:
        label, flag = pipe_flags[fd]
        logging.debug('{} pipe to {} closed (exception={})'.format(label, self._client.name, exc))
        setattr(self, flag, True)
    self.signal_exit()


def process_exited(self):
    """Mark the subprocess as exited, forget its pid and re-evaluate exit state."""
    logging.debug('subprocess with pid={} closed'.format(self._mypid))
    self._mypid = None
    self._exited = True
    self.signal_exit()


class CustomAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes each message with ``[<connid>]``."""

    def process(self, msg, kwargs):
        """Return ``('[connid] msg', kwargs)`` using ``self.extra['connid']``."""
        prefixed = '[%s] %s' % (self.extra['connid'], msg)
        return prefixed, kwargs


def __init__(self, name='Client', loop=None, host='localhost', flow = None, debug=False):
    """Initialise client state, its asyncio events and its output parsers.

    Args:
        name: Label used for the logger adapter's connection id.
        loop: Accepted for interface compatibility; not used here.
        host: Host the iperf client is launched on.
        flow: Owning flow object; unknown attributes are delegated to it
            through ``__getattr__``.
        debug: Stored as ``self.debug``.
    """
    # Must be the first assignment: __getattr__ forwards to self.flow, so
    # 'flow' has to be in the instance dict before any other lookup can miss.
    self.__dict__['flow'] = flow
    self.opened = asyncio.Event()
    self.closed = asyncio.Event()
    self.txcompleted = asyncio.Event()
    # A new client starts out closed with no transmit completed
    self.closed.set()
    self.txcompleted.clear()
    self.traffic_event = asyncio.Event()
    self.name = name
    self.iperf = '/usr/local/bin/iperf'
    self.ssh = '/usr/bin/ssh'
    self.host = host
    self.debug = debug
    self.flow = flow
    self._transport = None
    self._protocol = None
    conn_id = '{}'.format(self.name)
    self.adapter = self.CustomAdapter(logger, {'connid': conn_id})
    # The named groups below had lost their names ("(?P\d+)"), which makes
    # re.compile() raise re.error. The names are the ones read back with
    # m.group(...) by the output handler; 'timestamp' is not read there and
    # is only a descriptive label.
    # traffic ex: [ 3] 0.00-0.50 sec 655620 Bytes 10489920 bits/sec 14/211 446 446K/0 us
    self.regex_traffic = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>\d+) Bytes\s+(?P<throughput>\d+) bits/sec\s+(?P<writes>\d+)/(?P<errwrites>\d+)\s+(?P<retry>\d+)\s+(?P<cwnd>\d+)K/(?P<rtt>\d+) us')
    self.regex_connect_time = re.compile(r'\[\s+\d+]\slocal.*\(ct=(?P<connect_time>\d+\.\d+) ms\)')
    # local 192.168.1.4 port 56949 connected with 192.168.1.1 port 61001
    self.regex_flowid = re.compile(r'\[\s+\d+]\slocal\s(?P<srcip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P<srcport>[0-9]+)\sconnected with\s(?P<dstip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P<dstport>[0-9]+)')
    self.regex_tx_bind_failed = re.compile(r'bind failed: Cannot assign requested address')


def __getattr__(self, attr):
    """Delegate attributes not found on the client to the owning flow.

    Raises:
        AttributeError: If ``flow`` itself is not set yet. Without this
            guard the ``self.flow`` lookup below would re-enter this method
            and recurse until RecursionError.
    """
    if attr == 'flow':
        raise AttributeError(attr)
    return getattr(self.flow, attr)
async def start(self, time=None, amount=None, parallel=None, epoch_sync_time=None):
    """Build the ssh/iperf client command line, launch it and wait for open.

    Does nothing when the client is not currently closed. The command is
    stored in ``self.sshcmd`` before the subprocess is started, so it can be
    inspected even when the launch fails.

    Args:
        time: Value passed to iperf ``-t``; takes precedence over *amount*.
        amount: Value passed to iperf ``-n`` when *time* is not given.
        parallel: Value passed to iperf ``-P``.
        epoch_sync_time: Value passed to ``--txstart-time``; when absent,
            ``self.txstart_delay_sec`` (if set) is added to the current
            epoch time instead.
    """
    if not self.closed.is_set():
        return
    self.opened.clear()
    self.txcompleted.clear()
    self.remotepid = None
    self.flowstats['flowid'] = None
    # Client connecting to 192.168.100.33, TCP port 61009 with pid 1903
    # NOTE(review): the named group had lost its name ("(?P\d+)"), which makes
    # re.compile() raise. 'pid' is assumed -- confirm it matches the
    # m.group(...) lookup done by the stdout handler.
    self.regex_open_pid = re.compile(r'Client connecting to .*, {} port {} with pid (?P<pid>\d+)'.format(self.proto, str(self.dstport)))
    if self.client_device:
        client_dst = self.dstip + '%' + self.client_device
    else:
        client_dst = self.dstip
    # The embedded blanks in some options ('-p ', '-t ', ...) are kept as-is
    self.sshcmd = [self.ssh, self.user + '@' + self.host, self.iperf, '-c', client_dst, '-p ' + str(self.dstport), '-e', '-f{}'.format(self.format), '-w', self.window, '--realtime']
    cmd = self.sshcmd
    if self.tcp_tx_delay:
        cmd.extend(['--tcp-tx-delay', self.tcp_tx_delay])
    if self.tos:
        cmd.extend(['-S ', self.tos])
    if self.length:
        cmd.extend(['-l ', str(self.length)])
    if time:
        cmd.extend(['-t ', str(time)])
    elif amount:
        # exec arguments must be strings, so convert like the other numeric options
        cmd.extend(['-n ', str(amount)])
    if parallel:
        cmd.extend(['-P', str(parallel)])
    if self.trip_times:
        cmd.extend(['--trip-times'])
    if self.prefetch:
        cmd.extend(['--tcp-write-prefetch', self.prefetch])
    cmd.extend(['--histograms=1m,100000,5,95'])
    if self.srcip:
        if self.srcport:
            cmd.extend(['-B ', '{}:{}'.format(self.srcip, self.srcport)])
        else:
            cmd.extend(['-B {}'.format(self.srcip)])
    if self.cca:
        cmd.extend(['-Z ', self.cca])
    if self.interval >= 0.005:
        cmd.extend(['-i ', str(self.interval)])

    if self.proto == 'UDP':
        cmd.extend(['-u '])
        if self.isoch:
            cmd.extend(['--isochronous=' + self.offered_load, ' --ipg ', str(self.ipg)])
        elif self.offered_load:
            cmd.extend(['-b', self.offered_load])
    elif self.proto == 'TCP':
        # First matching traffic profile wins: rate limit, bursts, bounceback
        if self.offered_load:
            cmd.extend(['-b', self.offered_load])
        elif self.burst_size and self.burst_period:
            cmd.extend(['--burst-size', str(self.burst_size)])
            cmd.extend(['--burst-period', str(self.burst_period)])
        elif self.bb:
            cmd.extend(['--bounceback'])
            cmd.extend(['--bounceback-hold', str(self.bb_hold)])
            cmd.extend(['--bounceback-period', str(self.bb_period)])

    if not self.bb and self.fullduplex:
        cmd.extend(['--full-duplex', str(" ")])
    # The TCP bounceback profile above may already have added this switch
    if self.flow.bb and '--bounceback' not in cmd:
        cmd.extend(['--bounceback'])
    if self.flow.working_load:
        cmd.extend(['--working-load'])

    if epoch_sync_time:
        cmd.extend(['--txstart-time', str(epoch_sync_time)])
    elif self.txstart_delay_sec:
        # use incoming txstart_delay_sec and convert it to epoch_time_sec to use with '--txstart-time' iperf parameter
        now = datetime.now()
        logging.info('{}'.format(str(now)))
        epoch_time_sec = now.timestamp()
        logging.info('Current epoch_time_sec = {}'.format(str(epoch_time_sec)))
        new_txstart_time = epoch_time_sec + self.txstart_delay_sec
        logging.info('new_txstart_time = {}'.format(str(new_txstart_time)))
        cmd.extend(['--txstart-time', str(new_txstart_time)])

    logging.info('{}'.format(str(self.sshcmd)))
    try:
        self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfClientProtocol(self, self.flow), *self.sshcmd)
        await self.opened.wait()
    except Exception:
        # Best effort: report the failure (with traceback) and carry on.
        # Cancellation is not an Exception and is deliberately not swallowed.
        logging.exception('flow client start error per: {}'.format(str(self.sshcmd)))


async def signal_stop(self):
    """Send HUP to the remote iperf client and wait until the client is closed.

    Runs ``ssh <user>@<host> kill -HUP <remotepid>``. Nothing is done when no
    remote pid is known or the flow is already finished.
    """
    if not self.remotepid or self.finished:
        return
    target = '{}@{}'.format(self.user, self.host)
    childprocess = await asyncio.create_subprocess_exec(self.ssh, target, 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid))
    stdout, _ = await childprocess.communicate()
    if stdout:
        logging.info('{}({}) {}'.format(self.user, self.host, stdout))
    if not self.closed.is_set():
        await self.closed.wait()
async def signal_pause(self):
    """Send STOP to the remote iperf client process.

    Runs ``ssh <user>@<host> kill -STOP <remotepid>`` and logs any output.
    Nothing is done when no remote pid is known.

    Unlike ``signal_stop`` this does not wait for the ``closed`` event: a
    stopped process does not exit, so waiting for closure here would block
    the caller until something else terminates the client.
    """
    if not self.remotepid:
        return
    target = '{}@{}'.format(self.user, self.host)
    childprocess = await asyncio.create_subprocess_exec(self.ssh, target, 'kill', '-STOP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    logging.debug('({}) sending signal STOP to {} (pid={})'.format(self.user, self.host, self.remotepid))
    stdout, _ = await childprocess.communicate()
    if stdout:
        logging.info('{}({}) {}'.format(self.user, self.host, stdout))


async def signal_resume(self):
    """Send CONT to the remote iperf client process.

    Runs ``ssh <user>@<host> kill -CONT <remotepid>`` and logs any output.
    Nothing is done when no remote pid is known. Returns as soon as the
    signal has been delivered; it does not wait for the client to close.
    """
    if not self.remotepid:
        return
    target = '{}@{}'.format(self.user, self.host)
    childprocess = await asyncio.create_subprocess_exec(self.ssh, target, 'kill', '-CONT', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    logging.debug('({}) sending signal CONT to {} (pid={})'.format(self.user, self.host, self.remotepid))
    stdout, _ = await childprocess.communicate()
    if stdout:
        logging.info('{}({}) {}'.format(self.user, self.host, stdout))
class flow_histogram(object):
    """Histogram of samples with gnuplot data and control-file writers.

    The raw histogram is a comma-separated string of ``bin:count`` pairs.
    The constructor expands it into ``samples`` (one entry per counted
    event). ``write()`` emits the gnuplot data file; ``async_plot()`` and
    ``plot_two_sample_ks()`` emit a ``.gpc`` control file and run gnuplot.
    Bin indices are scaled by ``binwidth / 1000`` and plotted on an axis
    labelled "time (ms)", so binwidth is presumably in microseconds --
    confirm against the producer of the histogram.
    """

    # gnuplot executable used to render the generated control files
    gnuplot = '/usr/bin/gnuplot'

    # (exclusive upper bound, xtics directive, x format or None) rows for the
    # two-sample plot; the first row whose bound exceeds both maxima wins.
    _KS_XAXIS = (
        (2, 'auto', None),
        (5, 'auto', None),
        (10, 'add 1', None),
        (20, 'add 1', '%.0f'),
        (40, 'add 5', '%.0f'),
        (50, 'add 5', '%.0f'),
        (75, 'add 5', '%.0f'),
        (100, 'add 10', '%.0f'),
    )

    # (exclusive upper bound, xtics directive) rows for the single plot
    _PLOT_XAXIS = (
        (5, 'auto'),
        (10, 'add 1'),
        (20, 'add 1'),
        (40, 'add 5'),
        (50, 'add 5'),
        (75, 'add 5'),
    )

    @staticmethod
    def _write_terminal(fid, basefilename, outputtype):
        """Write the ``set output`` / ``set terminal`` pair for *outputtype*.

        'canvas' and 'svg' produce html output; anything else produces png.
        Exactly one pair is written, so a canvas request is no longer
        overridden by the png settings.
        """
        if outputtype == 'canvas':
            fid.write('set output \"{}.{}\"\n'.format(basefilename, 'html'))
            fid.write('set terminal canvas standalone mousing size 1024,768\n')
        elif outputtype == 'svg':
            fid.write('set output \"{}_svg.{}\"\n'.format(basefilename, 'html'))
            fid.write('set terminal svg size 1024,768 dynamic mouse\n')
        else:
            fid.write('set output \"{}.{}\"\n'.format(basefilename, 'png'))
            fid.write('set terminal png size 1024,768\n')

    @staticmethod
    def _write_y_axes(fid, starttime, endtime):
        """Write the y/y2 axis, grid and x-label settings shared by all plots."""
        fid.write('set format y \"%.1f"\n')
        fid.write('set yrange [0:1.01]\n')
        fid.write('set y2range [0:*]\n')
        fid.write('set ytics add 0.1\n')
        fid.write('set y2tics nomirror\n')
        fid.write('set grid\n')
        fid.write('set xlabel \"time (ms)\\n{} - {}\"\n'.format(starttime, endtime))

    @classmethod
    async def _run_gnuplot(cls, gpcfilename):
        """Run gnuplot on *gpcfilename* and log the outcome.

        stderr is merged into stdout, so failure is detected from the exit
        status rather than from a (always empty) stderr stream.
        """
        childprocess = await asyncio.create_subprocess_exec(cls.gnuplot, gpcfilename, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output, _ = await childprocess.communicate()
        if childprocess.returncode:
            detail = output.decode(errors='replace').strip() if output else ''
            logging.error('Exec {} {} failed with exit status {}: {}'.format(cls.gnuplot, gpcfilename, childprocess.returncode, detail))
        else:
            logging.debug('Exec {} {}'.format(cls.gnuplot, gpcfilename))

    @classmethod
    async def plot_two_sample_ks(cls, h1=None, h2=None, outputtype='png', directory='.', flowname=None, title=None):
        """Plot two histograms on one chart for a two-sample KS comparison.

        Each histogram that has no data file yet is written below
        ``directory/<flowname><name>/<name>_<ks_index>``. A gnuplot control
        file named ``<h1.basefilename>_<h1.ks_index>_<h2.ks_index>.gpc`` is
        then generated and executed.

        Args:
            h1, h2: The two ``flow_histogram`` objects to compare.
            outputtype: 'canvas', 'svg' or anything else for png.
            directory: Root directory for data files that still need writing.
            flowname: Flow label used in the title and the output directory.
            title: Extra text placed on the second line of the plot title.
        """
        lci_val = int(h2.lci_val) * h2.binwidth
        uci_val = int(h2.uci_val) * h2.binwidth
        mytitle = '{} {} two sample KS({},{}) ({} samples) {}/{}%={}/{} us outliers={}\\n{}'.format(flowname, h1.name, h1.ks_index, h2.ks_index, h2.population, h2.lci, h2.uci, lci_val, uci_val, h2.outliers, title)
        for hist in (h1, h2):
            if hist.basefilename is None:
                hist.output_dir = directory + '/' + flowname + hist.name + '/' + hist.name + '_' + str(hist.ks_index)
                await hist.write(directory=hist.output_dir)
        if h1.basefilename is None or h2.basefilename is None:
            return

        basefilename = '{}_{}_{}'.format(h1.basefilename, h1.ks_index, h2.ks_index)
        gpcfilename = basefilename + '.gpc'
        # write out the gnuplot control file
        with open(gpcfilename, 'w') as fid:
            cls._write_terminal(fid, basefilename, outputtype)
            fid.write('set key bottom\n')
            fid.write('set title \"{}\" noenhanced\n'.format(mytitle))
            if float(uci_val) < 400:
                fid.write('set format x \"%.2f"\n')
            else:
                fid.write('set format x \"%.1f"\n')
            cls._write_y_axes(fid, h1.starttime, h2.endtime)

            default_minx = -0.5
            if float(uci_val) < 0.4:
                xmax, xtics, xformat = 0.4, 'auto', None
            else:
                # both histograms must fit below the bound
                peak = max(h1.max, h2.max)
                xmax, xtics, xformat = '*', 'auto', '%.0f'
                for bound, tics, fmt in cls._KS_XAXIS:
                    if peak < bound:
                        xmax, xtics, xformat = bound, tics, fmt
                        break
            fid.write('set xrange [{}:{}]\n'.format(default_minx, xmax))
            fid.write('set xtics {}\n'.format(xtics))
            if xformat is not None:
                fid.write('set format x \"{}\"\n'.format(xformat))
            fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{1}\" using 1:2 index 0 axes x1y2 with impulses linetype 2 notitle, \"{1}\" using 1:3 index 0 axes x1y1 with lines linetype 1 linewidth 2 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(h1.datafilename, h2.datafilename))
        await cls._run_gnuplot(gpcfilename)

    def __init__(self, binwidth=None, name=None, values=None, population=None, starttime=None, endtime=None, title=None, outliers=None, lci = None, uci = None, lci_val = None, uci_val = None) :
        """Parse the raw histogram and expand it into a sample array.

        Args:
            binwidth: Width of one bin (converted with ``int``).
            name: Histogram name; also the default output file name.
            values: Comma-separated ``bin:count`` pairs.
            population: Total number of samples (converted with ``int``).
            starttime, endtime: Labels printed under the x axis.
            title: Default plot title.
            outliers: Value reported in the two-sample plot title.
            lci, uci: Labels reported in the two-sample plot title.
            lci_val, uci_val: Bin indices multiplied by ``binwidth`` for the
                two-sample plot title.

        Raises:
            IndexError: If the bin counts add up to more than *population*.
        """
        self.raw = values
        self._entropy = None
        self._ks_1samp_dist = None
        self.bins = self.raw.split(',')
        self.name = name
        self.ks_index = None
        self.population = int(population)
        self.samples = np.zeros(int(self.population))
        self.binwidth = int(binwidth)
        self.createtime = datetime.now(timezone.utc).astimezone()
        self.starttime = starttime
        self.endtime = endtime
        self.title = title
        self.outliers = outliers
        self.uci = uci
        self.uci_val = uci_val
        self.lci = lci
        self.lci_val = lci_val
        self.basefilename = None
        # Repeat each bin index 'count' times so samples holds one entry per event
        filled = 0
        for entry in self.bins:
            bin_index, count = entry.split(':')
            stop = filled + int(count)
            if stop > self.samples.size:
                raise IndexError('histogram bin counts exceed population {}'.format(self.population))
            self.samples[filled:stop] = float(bin_index)
            filled = stop

    @property
    def entropy(self):
        """Shannon entropy in bits of the bin distribution (computed once)."""
        if self._entropy is None:
            total = float(self.population)
            result = 0
            for entry in self.bins:
                _, count = entry.split(':')
                probability = float(count) / total
                # empty bins contribute nothing; log2(0) would raise
                if probability > 0:
                    result -= probability * math.log2(probability)
            self._entropy = result
        return self._entropy

    @property
    def ks_1samp_dist(self):
        """One-sample KS statistic of ``samples`` against the standard normal CDF (cached)."""
        if self._ks_1samp_dist is None:
            self._ks_1samp_dist, _ = stats.ks_1samp(self.samples, stats.norm.cdf)
        return self._ks_1samp_dist

    @property
    def ampdu_dump(self):
        """Raw AMPDU dump previously stored through the setter."""
        return self._ampdu_rawdump

    @ampdu_dump.setter
    def ampdu_dump(self, value):
        self._ampdu_rawdump = value

    async def __exec_gnuplot(self):
        """Run gnuplot on this histogram's control file."""
        logging.info('Plotting {} {}'.format(self.name, self.gpcfilename))
        await self._run_gnuplot(self.gpcfilename)

    async def write(self, directory='.', filename=None):
        """Write the gnuplot data file ``<directory>/<filename>.data``.

        Each emitted row is ``<bin * binwidth / 1000> <count> <cumulative
        fraction>``. A bin is emitted when its count is above 1 or while the
        cumulative fraction of emitted bins is still below 0.99. Sets
        ``self.max`` (value of the last emitted row), ``self.basefilename``
        and ``self.datafilename``.

        Args:
            directory: Output directory; created when missing.
            filename: Base file name; defaults to the histogram name.
        """
        if filename is None:
            filename = self.name
        if not os.path.exists(directory):
            logging.debug('Making results directory {}'.format(directory))
        # exist_ok avoids failing when the directory appears in between
        os.makedirs(directory, exist_ok=True)
        logging.debug('Writing {} results to directory {}'.format(filename, directory))
        basefilename = os.path.join(directory, filename)
        datafilename = os.path.join(directory, filename + '.data')
        self.max = None
        total = float(self.population)
        scale = float(self.binwidth) / 1000.0
        with open(datafilename, 'w') as fid:
            cummulative = 0.0
            for entry in self.bins:
                bin_index, count = entry.split(':')
                if (float(count) > 1.0) or ((cummulative / total) < 0.99):
                    cummulative += float(count)
                    perc = cummulative / total
                    # max is the last value
                    self.max = float(bin_index) * float(self.binwidth) / 1000.0
                    fid.write('{} {} {}\n'.format(self.max, int(count), perc))
        self.basefilename = basefilename
        self.datafilename = datafilename

    async def async_plot(self, title=None, directory='.', outputtype='png', filename=None):
        """Generate the gnuplot control file for this histogram and render it.

        The data file is written first when it does not exist yet. For png
        output a 64x32 thumbnail (``<base>_thumb.png``) is rendered as well.

        Args:
            title: Plot title; falls back to ``self.title`` when empty.
            directory: Output directory used when the data file is written.
            outputtype: 'canvas', 'svg' or anything else for png.
            filename: Base file name used when the data file is written.
        """
        if self.basefilename is None:
            await self.write(directory=directory, filename=filename)
        if self.basefilename is None:
            return

        self.gpcfilename = self.basefilename + '.gpc'
        plot_cmd = 'plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(self.datafilename)
        # write out the gnuplot control file
        with open(self.gpcfilename, 'w') as fid:
            self._write_terminal(fid, self.basefilename, outputtype)
            if not title and self.title:
                title = self.title
            fid.write('set key bottom\n')
            if self.ks_index is not None:
                fid.write('set title \"{}({}) {}({}) E={}\" noenhanced\n'.format(self.name, str(self.ks_index), title, int(self.population), self.entropy))
            else:
                fid.write('set title \"{}{}({}) E={}\" noenhanced\n'.format(self.name, title, int(self.population), self.entropy))
            fid.write('set format x \"%.0f"\n')
            self._write_y_axes(fid, self.starttime, self.endtime)

            xmax, xtics = 100, 'add 10'
            for bound, tics in self._PLOT_XAXIS:
                if self.max < bound:
                    xmax, xtics = bound, tics
                    break
            fid.write('set xrange [0:{}]\n'.format(xmax))
            fid.write('set xtics {}\n'.format(xtics))
            fid.write(plot_cmd)

            if outputtype == 'png':
                # Create a thumbnail too
                fid.write('unset output; unset xtics; unset ytics; unset key; unset xlabel; unset ylabel; unset border; unset grid; unset yzeroaxis; unset xzeroaxis; unset title; set lmargin 0; set rmargin 0; set tmargin 0; set bmargin 0\n')
                fid.write('set output \"{}_thumb.{}\"\n'.format(self.basefilename, 'png'))
                fid.write('set terminal png transparent size 64,32 crop\n')
                fid.write(plot_cmd)
        await self.__exec_gnuplot()