TS隨機測試Chunked工具

Server端:從文件讀取body_size,構造任意隨機分片的chunked響應 javascript

test端:把隨機body_size寫入文件,循環進行curl請求,獲取隨機Server的隨機chunked html

socketServer.py java

from socket import *
from time import sleep
import random
import gzip
import datetime
import re
import os
import commands
from cStringIO import StringIO

HOST_G = gethostbyname(gethostname())
PORT_G = 80
CRLF = "\r\n"

class setDate():
    def timenow(self):
        GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
        return datetime.datetime.utcnow().strftime(GMT_FORMAT)
    def compress_gzip(self,uri):
        buf=StringIO()
        f=gzip.GzipFile(mode="wb", fileobj=buf)
        try:
                f.write(uri)
                gzip_str = buf.getvalue()
        finally:
                f.close()
        gzip_str = "\037\213" +"\10"+"\00"*7 +gzip_str[10:]
        return gzip_str


    def get_chunked(self,uri,type=1):


        dlen = commands.getoutput("cat ttb.txt")
        clen = int(dlen)          
        slc=[]
        b=[]
        cklist=[]
        if type ==1:
            gbody = ((self.compress_gzip(uri))*((clen+len(uri))/(len(uri))))[:clen] 
        else:
            gbody = (uri*((clen+len(uri))/len(uri)))[:clen]        
        #print "body-size =",gbody_len
        for i in range (gbody_len):
            # slice 0~10k
            slc.append(min(random.randint(1,gbody_len-sum(slc)),10240))
            b.append(gbody[len("".join(b)):len("".join(b))+slc[i]])
            if sum(slc) == gbody_len:
                 break
        slc.append(0)
        b.append(CRLF)
        [cklist.append(hex(slc[i])[2:]+CRLF+b[i]+CRLF) for i in range(len(slc))]
 
        print "#################"
        print "silce in ",slc
        print "#################"
        
        return cklist


class ServerMock():
    
    def __init__(self):
        self.dt = setDate() 
        self.header="""HTTP/1.1 200 OK
Server: Tengine
Date: %s
Content-Type: application/x-javascript
Last-Modified: Mon, 29 Jul 2013 03:56:46 GMT
Connection: keep-alive
Vary: Accept-Encoding
Cache-Control: public,max-age=3600
Content-Encoding: gzip
Transfer-Encoding: chunked\r\n\r\n"""%self.dt.timenow()
 
    def _CreateNewSocket(self,hst, prt):
        sock = socket(AF_INET, SOCK_STREAM)
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind((hst, prt))
        sock.listen(10)
        return sock
    
    def server_forever(self):
        sock =self._CreateNewSocket(HOST_G, PORT_G)
        while True:
            connection,address = sock.accept()
            try:
                buf = connection.recv(2048)
                uri =  (buf.split()[1])[1:]
                if not uri:
                    uri = ""
                    #uri = "index.html"


                connection.send(self.header)
                if re.findall("Accept-Encoding: gzip",buf,re.I):     
                    ss= self.dt.get_chunked(uri,type=1)
                else:
                    ss= self.dt.get_chunked(uri,type=0)
                for k in range(len(ss)):
                    #print "###################################slice %d,each sice length = %s,########"%(k,ss[k].split(CRLF)[0])
                    connection.sendall(ss[k])
                    #sleep(3)
                    if k == (len(ss)-1):
                         print "###################################slice %d,each sice length = %s,########"%(k,ss[k].split(CRLF)[0])              
            except timeout:
                print 'time out'
            finally:
                connection.close()
                print "response achive...."


if __name__ == "__main__":
     server = ServerMock()
     server.server_forever() 


test.py python

import time
import os
import sys
import random
import re
import commands
import gzip
from socketServer import ServerMock,HOST_G
from cStringIO import StringIO


# tsIP in cluster
tsIP = ["10.125.202.112","10.125.204.84","10.125.204.86"]
tsPort=8080
targetURL = "http://%s/"%HOST_G

#compress Gzip
def compress_gzip (puredata):
        buf=StringIO()
        f=gzip.GzipFile(mode="wb", fileobj=buf)
        try:
                f.write(puredata)
                gzip_str = buf.getvalue()
        finally:
                f.close()
        gzip_str = "\037\213" +"\10"+"\00"*7 +gzip_str[10:]
        return gzip_str

def assert_case(output,exhead,exbody):
    head = output.split("\r\n\r\n")[0]
    body = "\r\n\r\n".join(output.split("\r\n\r\n")[1:])
    #print body
    print head.split("\r\n")[-1:]

    if not re.findall(exhead,head,re.I):
         print "head Err"
         print "exhead =",exhead
         print "achead =",head
         sys.exit(1)
    elif body!=exbody:
         print "body Err"
         #print "exbody =",repr(exbody)
         #print "acbody =",repr(body)
         print "exbody =",len(exbody)
         print "acbody =",len(body)
         sys.exit(1)
    else:
         print "pass"
         
def write_file(cont):
    file = "ttb.txt"
    out = open(file,'w')
    try:
        out.write(cont)
    finally:
        out.close()

def exec_case(k):
    
    uri = "".join(random.sample(list("123456789abceefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTURWXYZ"), 5))
    
    clen = random.randint(1,10485760)
    write_file(str(clen))
    print "It is case%d,cont-length=%s"%(k,clen)
   
    while(True):
        cmd = "curl -i -s -x  %s:%d %s%s -X PURGE" %(tsIP[k%2], tsPort,targetURL,uri)
        print cmd
        output =commands.getoutput(cmd)
        if "404"==output.split()[1]:
            break;

    rqH = ["","Accept-Encoding: gzip","","Accept-Encoding: gzip"]
    rtH = [".*cMsSfW.*",".*cMsSfW.*",".*cHs f .*|.*cRs f .*",".*cHs f .*|.*cRs f .*"]
    for i in range(4): 
        print "[curl in %d times begin,time %s]"%(i,time.clock())
        if i in(0,2):
            exbody= (uri*((clen+len(uri))/len(uri)))[:clen]         
        else:
            exbody=((compress_gzip(uri))*((clen+len(uri))/(len(uri))))[:clen] 
        cmd = "curl -i -s -m 120 -x  %s:%d %s%s -H '%s'" %(tsIP[k%2], tsPort,targetURL,uri,rqH[i])
        print cmd
        status,output = commands.getstatusoutput(cmd)
        time.sleep(1)
        assert_case(output,rtH[i],exbody)
        print "[curl in %d times achive,time %s]"%(i,time.clock())
    print "It is case%d achievd"%k

if __name__ == "__main__":
    server = ServerMock()
    import thread
    thread.start_new_thread(server.server_forever,())
    print "suite case begin..."
    
    for k in range(1,301):
        exec_case(k)
    time.sleep(3)
    print "[suite case achived]........"
相關文章
相關標籤/搜索