流量回放工具之goreplay output-凯发k8国际娱乐官网入口

zuozewei 发表于 2021/11/06 12:00:16 2021/11/06
【摘要】 goreplay 对数据流的抽象出了两个概念,即用 输入(input )和 输出(output )来表示数据来源与去向,统称为 plugin,用介于输入和输出模块之间的中间件实现拓展机制。

前言

goreplay 对数据流的抽象出了两个概念,即用 输入(input )输出(output ) 来表示数据来源与去向,统称为 plugin,用介于输入和输出模块之间的中间件实现拓展机制。

output_http.go:主要是http输出的插件,实现 http 协议, 实现 io.writer 接口,最后根据配置注册到 plugin.outputs 队列里。

参数说明

-output-http value  //转发进入的请求到一个http地址上
        forwards incoming requests to given http address.
                # redirect all incoming requests to staging.com address 
                gor --input-raw :80 --output-http http://staging.com
  -output-http-elasticsearch string  //把请求和响应状态发送到 elasticsearch
        send request and response stats to elasticsearch:
                gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'
  -output-http-queue-len int //http输出队列大小
        number of requests that can be queued for output, if all workers are busy. default = 1000 (default 1000)
  -output-http-redirects int  // 设置多少次重定向被允许,默认忽略
        enable how often redirects should be followed.
  -output-http-response-buffer value  //最大接收响应大小(缓冲区)
        http response buffer size, all data after this size will be discarded.
  -output-http-skip-verify
        don't verify hostname on tls secure connection.
  -output-http-stats  //每5秒钟输出一次输出队列的状态
        report http output queue stats to console every n milliseconds. see output-http-stats-ms
  -output-http-stats-ms int
        report http output queue stats to console every n milliseconds. default: 5000 (default 5000)
  -output-http-timeout duration  //指定 http 的 request/response 超时时间,默认是 5 秒 
        specify http request/response timeout. by default 5s. example: --output-http-timeout 30s (default 5s)
  -output-http-track-response
        if turned on, http output responses will be set to all outputs like stdout, file and etc.
  -output-http-worker-timeout duration
        duration to rollback idle workers. (default 2s)
  -output-http-workers int  //gor默认是动态的扩展工作者数量,你也可以指定固定数量的工作者
        gor uses dynamic worker scaling. enter a number to set a maximum number of workers. default = 0 = unlimited.
  -output-http-workers-min int
        gor uses dynamic worker scaling. enter a number to set a minimum number of workers. default = 1.

默认情况下,gor 创建一个动态工作池:
它从 10 开始,并在 http 输出队列长度大于 10 时创建更多的 http 输出工作者。创建的工人数量(n)等于该工作时间的队列长度检查并发现其长度大于10.每次将消息写入http输出队列时都检查队列长度。在产生n名工人的请求得到满足之前,不会再有工人产卵。如果动态工作人员当时不能处理消息,它将睡眠 100 毫秒。如果动态工作人员无法处理消息2秒钟,则会死亡。可以使用 --output-http-workers=20 选项指定固定数量的工人

http 输出工作数量

newhttpoutput 默认情况:

// newhttpoutput constructor for httpoutput
// initialize workers
func newhttpoutput(address string, config *httpoutputconfig) pluginreadwriter {
	o := new(httpoutput)
	var err error
	config.url, err = url.parse(address)
	if err != nil {
		log.fatal(fmt.sprintf("[output-http] parse http output url error[%q]", err))
	}
	if config.url.scheme == "" {
		config.url.scheme = "http"
	}
	config.rawurl = config.url.string()
	if config.timeout < time.millisecond*100 {
		config.timeout = time.second
	}
	if config.buffersize <= 0 {
		config.buffersize = 100 * 1024 // 100kb
	}
	if config.workersmin <= 0 {
		config.workersmin = 1
	}
	if config.workersmin > 1000 {
		config.workersmin = 1000
	}
	if config.workersmax <= 0 {
		config.workersmax = math.maxint32 // idealy so large
	}
	if config.workersmax < config.workersmin {
		config.workersmax = config.workersmin
	}
	if config.queuelen <= 0 {
		config.queuelen = 1000
	}
	if config.redirectlimit < 0 {
		config.redirectlimit = 0
	}
	if config.workertimeout <= 0 {
		config.workertimeout = time.second * 2
	}
	o.config = config
	o.stop = make(chan bool)
	//是否收集统计信息,统计输出间隔是多少
	if o.config.stats {
		o.queuestats = newgorstat("output_http", o.config.statsms)
	}
	o.queue = make(chan *message, o.config.queuelen)
	if o.config.trackresponses {
		o.responses = make(chan *response, o.config.queuelen)
	}
	// it should not be buffered to avoid races
	o.stopworker = make(chan struct{})
	if o.config.elasticsearch != "" {
		o.elasticsearch = new(esplugin)
		o.elasticsearch.init(o.config.elasticsearch)
	}
	o.client = newhttpclient(o.config)
	o.activeworkers  = int32(o.config.workersmin)
	for i := 0; i < o.config.workersmin; i {
		go o.startworker()
	}
	go o.workermaster()
	return o
}

配置后启动 httpclient:

o.client = newhttpclient(o.config)
	o.activeworkers  = int32(o.config.workersmin)
	for i := 0; i < o.config.workersmin; i {
		go o.startworker()
	}

启动多个发送进程:

func (o *httpoutput) startworker() {
	for {
		select {
		case <-o.stopworker:
			return
		case msg := <-o.queue:
			o.sendrequest(o.client, msg)
		}
	}
}

执行发送:

func (o *httpoutput) sendrequest(client *httpclient, msg *message) {
	if !isrequestpayload(msg.meta) {
		return
	}
	uuid := payloadid(msg.meta)
	start := time.now()
	resp, err := client.send(msg.data)
	stop := time.now()
	if err != nil {
		debug(1, fmt.sprintf("[http-output] error when sending: %q", err))
		return
	}
	if resp == nil {
		return
	}
	if o.config.trackresponses {
		o.responses <- &response{resp, uuid, start.unixnano(), stop.unixnano() - start.unixnano()}
	}
	if o.elasticsearch != nil {
		o.elasticsearch.responseanalyze(msg.data, resp, start, stop)
	}
}

发送细节,各种配置生效点:

// send sends an http request using client create by newhttpclient
func (c *httpclient) send(data []byte) ([]byte, error) {
	var req *http.request
	var resp *http.response
	var err error
	req, err = http.readrequest(bufio.newreader(bytes.newreader(data)))
	if err != nil {
		return nil, err
	}
	// we don't send connect or options request
	if req.method == http.methodconnect {
		return nil, nil
	}
	if !c.config.originalhost {
		req.host = c.config.url.host
	}
	// fix #862
	if c.config.url.path == "" && c.config.url.rawquery == "" {
		req.url.scheme = c.config.url.scheme
		req.url.host = c.config.url.host
	} else {
		req.url = c.config.url
	}
	// force connection to not be closed, which can affect the global client
	req.close = false
	// it's an error if this is not equal to empty string
	req.requesturi = ""
	resp, err = c.client.do(req)
	if err != nil {
		return nil, err
	}
	if c.config.trackresponses {
		return httputil.dumpresponse(resp, true)
	}
	_ = resp.body.close()
	return nil, nil

http 输出队列

队列用在哪儿呢?

代码逻辑调用图

【凯发k8国际娱乐官网入口的版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。