如何在 Go 语言中使用 Redis 连接池

redis_go

关于连接池

一个数据库服务器只拥有有限的资源,并且如果你没有充分使用这些资源,你可以通过使用更多的连接来提高吞吐量。一旦所有的资源都在使用,那么你就不能通过增加更多的连接来提高吞吐量。事实上,吞吐量在连接负载较大时就开始下降了。通常可以通过限制与可用的资源相匹配的数据库连接的数量来提高延迟和吞吐量。

如果不使用连接池,那么,每次传输数据,我们都需要进行创建连接,收发数据,关闭连接。在并发量不高的场景,基本上不会有什么问题,一旦并发量上去了,那么,一般就会遇到下面几个常见问题:

  • 性能普遍上不去
  • CPU 大量资源被系统消耗
  • 网络一旦抖动,会有大量 TIME_WAIT 产生,不得不定期重启服务或定期重启机器
  • 服务器工作不稳定,QPS 忽高忽低

要想解决这些问题,我们就要用到连接池了。连接池的思路很简单,在初始化时,创建一定数量的连接,先把所有长连接存起来,然后,谁需要使用,从这里取走,干完活立马放回来。 如果请求数超出连接池容量,那么就排队等待、退化成短连接或者直接丢弃掉。

使用连接池遇到的坑

最近在一个项目中,需要实现一个简单的 Web Server 提供 Redis 的 HTTP interface,提供 JSON 形式的返回结果。考虑用 Go 来实现。

首先,去看一下 Redis 官方推荐的 Go Redis driver。官方 Star 的项目有两个:Radix.v2 和 Redigo。经过简单的比较后,选择了更加轻量级和实现更加优雅的 Radix.v2

Radix.v2 包是根据功能划分成一个个的 sub package,每一个 sub package 在一个独立的子目录中,结构非常清晰。我的项目中会用到的 sub package 有 redis 和 pool。

由于我想让这种被 fork 的进程最好简单点,做的事情单一一些,所以,在没有深入去看 Radix.v2 的 pool 的实现之前,我选择了自己实现一个 Redis pool。(这里,就不贴代码了。后来发现自己实现的 Redis pool 与 Radix.v2 实现的 Redis pool 的原理是一样的,都是基于 channel 实现的, 遇到的问题也是一样的。)

不过在测试过程中,发现了一个诡异的问题。在请求过程中经常会报 EOF 错误。而且是概率性出现,一会有问题,一会又好了。通过反复的测试,发现 bug 是有规律的,当程序空闲一会后,再进行连续请求,会发生3次失败,然后之后的请求都能成功,而我的连接池大小设置的是3。再进一步分析,程序空闲300秒后,再请求就会失败,发现我的 Redis server 配置了 timeout 300,至此,问题就清楚了。是连接超时 Redis server 主动断开了连接。客户端这边从一个超时的连接请求就会得到 EOF 错误。

然后我看了一下 Radix.v2 的 pool 包的源码,发现这个库本身并没有检测坏的连接,并替换为新的连接的机制。也就是说我每次从连接池里面 Get 的连接有可能是坏的连接。所以,我当时临时的解决方案是通过增加失败后自动重试来解决了。不过,这样的处理方案,连接池的作用好像就没有了。技术债能早点还的还是早点还上。

使用连接池的正确姿势

想到我们的 ngx_lua 项目里面也大量使用 redis 连接池,他们怎么没有遇到这个问题呢。只能去看看源码了。

经过抽象分离, ngx_lua 里面使用 redis 连接池部分的代码大致是这样的

server {
    location /pool {
        content_by_lua_block {
            local redis = require "resty.redis"
            local red = redis:new()

            local ok, err = red:connect("127.0.0.1", 6379)
            if not ok then
                ngx.say("failed to connect: ", err)
                return
            end

            ok, err = red:set("hello", "world")
            if not ok then
                return
            end

            red:set_keepalive(10000, 100)
        }
    }
}

发现有个 set_keepalive 的方法,查了一下官方文档,方法的原型是 syntax: ok, err = red:set_keepalive(max_idle_timeout, pool_size) 貌似 max_idle_timeout 这个参数,就是我们所缺少的东西,然后进一步跟踪源码,看看里面是怎么保证连接有效的。

function _M.set_keepalive(self, ...)
    local sock = self.sock
    if not sock then
        return nil, "not initialized"
    end

    if self.subscribed then
        return nil, "subscribed state"
    end

    return sock:setkeepalive(...)
end

至此,已经清楚了,使用了 tcp 的 keepalive 心跳机制。

于是,通过与 Radix.v2 的作者一些讨论,选择自己在 redis 这层使用心跳机制,来解决这个问题。

最后的解决方案

在创建连接池之后,起一个 goroutine,每隔一段 idleTime 发送一个 PING 到 Redis server。其中,idleTime 略小于 Redis server 的 timeout 配置。

连接池初始化部分代码如下:

p, err := pool.New("tcp", u.Host, concurrency)
errHndlr(err)
go func() {
    for {
        p.Cmd("PING")
        time.Sleep(idelTime * time.Second)
    }
}()

使用 redis 传输数据部分代码如下:

func redisDo(p *pool.Pool, cmd string, args ...interface{}) (reply *redis.Resp, err error) {
	reply = p.Cmd(cmd, args...)
	if err = reply.Err; err != nil {
		if err != io.EOF {
			Fatal.Println("redis", cmd, args, "err is", err)
		}
	}

	return
}

其中,Radix.v2 连接池内部进行了连接池内连接的获取和放回,代码如下:

// Cmd automatically gets one client from the pool, executes the given command
// (returning its result), and puts the client back in the pool
func (p *Pool) Cmd(cmd string, args ...interface{}) *redis.Resp {
	c, err := p.Get()
	if err != nil {
		return redis.NewResp(err)
	}
	defer p.Put(c)

	return c.Cmd(cmd, args...)
}

这样,我们就有了 keepalive 的机制,不会出现 timeout 的连接了,从 redis 连接池里面取出的连接都是可用的连接了。看似简单的代码,却完美的解决了连接池里面超时连接的问题。同时,就算 Redis server 重启等情况,也能保证连接自动重连。

References

Learning LuaDist

官网

wiki

如何提交Package到PyPI

PyPI官方文档

Steps

注册账号

填写配置文件~/.pypirc

[distutils]
index-servers=
    pypi
    pypitest

[pypitest]
repository = https://testpypi.python.org/pypi
username = <your user name goes here>
password = <your password goes here>

[pypi]
repository = https://pypi.python.org/pypi
username = <your user name goes here>
password = <your password goes here>

准备你的package

PyPI上的每个package需要一个setup.py文件在项目的根目录, 如果你在一个一个markdown格式的README文件, 你还需要一个setup.cfg文件. 一个LICENSE.txt文件. 假设包名为:mypackage, 目录结构如下.

├── LICENSE.txt
├── README.md
├── mypackage
│   ├── __init__.py
│   ├── bar.py
│   ├── baz.py
│   └── foo.py
├── setup.cfg
└── setup.py

setup.py

from distutils.core import setup

setup(
    name='mypackage',
    packages=['mypackage'],  # this must be the same as the name above
    version='0.1',
    description='A random test lib',
    author='Akagi201',
    author_email='[email protected]',
    url='https://github.com/Akagi201/mypackage',  # use the URL to the github repo
    download_url='https://github.com/Akagi201/mypackage/tarball/0.1',  # I'll explain this in a second
    keywords=['testing', 'logging', 'example'],  # arbitrary keywords
    classifiers=[],
)

download_url: repo源码的下载链接, 使用git tag后, github会为你host. 使用命令: git tag 0.1 -m "Adds a tag so that we can put this on PyPI.", 使用 git push --tags origin master 提交到github

setup.cfg

告诉PyPI你的README文件的位置.

[metadata]
description-file = README.md

提交package到PyPI Test

  • 注册: python setup.py register -r pypitest
  • 上传: python setup.py sdist upload -r pypitest
  • 安装: pip install -i https://testpypi.python.org/pypi <package name>

提交package到PyPI Live

  • 注册: python setup.py register -r pypi
  • 上传: python setup.py sdist upload -r pypi
  • web form提交: https://pypi.python.org/pypi?:action=submit_form

不使用setuptools, 使用更安全的twine上传

  • twine upload dist/*

Refs

专业投机原理书评

读这本书的文字就能体会到, 作者那种自信而坚定的行事风格, 很喜欢这种略带装逼的文风. 作者的经历我也很喜欢, 学历不高, 小时候就精通打牌(阅读了所有相关书籍, 并勤于练习), 16-20岁之间就收入颇丰, 主要来自扑克赌局与牌技表演. 为了练习玩牌, 他随身带着一副牌. 有一句话蛮逗的, 哈哈. “我与女朋友在看电影时, 我的左手会练习单手切牌, 而右手就放在每一个16岁男孩与女朋友看电影时所应游走的位置.” 当发现赌博不适合作为一生的职业后, 走上了证劵交易员的道路.

摘录

  • 他年少时精于玩牌, 并从中认识到“胜算”和“自律”的重要性.
  • 要了解市场, 更要了解自己.
  • 在阅读任何领域的书时, 我总是要求该书的作者或讨论的对象拥有工人的成就.
  • 知识本身绝对不是成功的保证. 除了知识, 你还需要一套执行知识的管理计划以及严格遵守计划的心理素质, 这样才可以免除情绪的干扰.
  • 如果你希望获胜, 你就必须了解规则; 另外, 你还必须愿意坐上赌桌, 这样你才有获胜的机会.
  • 赌博必须承担不利胜算的风险, 投机是掌握有利胜算的情况下才承担风险.
  • 赌博从来都不是一种高风险的行为, 输赢的关键是如何管理胜算. 我记住每一种牌型的胜算几率, 并依此决定对策. 这便是风险管理方法的要点所在.
  • 不可基于帮助朋友的立场, 免费提供任何有关市场的建议.
  • 结合技术分析, 统计方法以及经济基本面等因素, 评估任何投机头寸的风险. 唯有这三个因素相互配合时, 我才会在市场中建立重要头寸.
  • 作为交易者, 我的目标始终是: 在经济独立的情况下保有自由, 换句话说, 我的目标是: 经年累月地稳定赚钱.
  • 我的哲学基于三个原则, 按重要性排列如下: 保障资本, 一致性的获利能力以及追求卓越的回报. 这三者是我的基本原则, 因为他们是所有市场决策的最高指导原则.

一些原则

  • 鳄鱼原则: 万一鳄鱼咬住你的脚, 务必记住: 你唯一的机会便是牺牲一只脚.
  • 道氏理论: 长期趋势最为重要, 也最容易被辨认, 归类与了解. 中期趋势对投资者来说较为次要, 但却是投机者的主要考虑因素. 短期趋势最难预测, 唯有交易者才会随时考虑他.
  • 交易准则: 1. 根据计划进行交易, 并严格遵守计划. 2. 顺势交易, “趋势是你的朋友”. 3. 在许可的范围内, 尽可能采用止损单. 4. 一旦心存怀疑, 立即出场. 5. 务必要有耐心, 不可过度扩张交易. 6. 迅速认赔, 让获利头寸持续发展. 7. 不可让获利头寸演变为亏损(或者, 尽可能持有必然). 8. 在弱势中买进, 在强势中卖出. 应该以买进的意愿来同等对待卖出. 9. 在多头市场的初期阶段, 应该扮演投资者的角色. 在多头市场的后期与空头市场中, 应该扮演投机的角色. 10. 不可摊平亏损–亏损头寸不可加码. 11. 不可仅因价格偏低而买进, 不可仅因价格偏高而卖出. 12. 只在流动性高的市场中交易. 13. 在价格变动迅速时, 不可建立头寸.

取代netcat的瑞士军刀socat

desc

  • netcat++
  • Multipurpose relay (SOcket CAT)
  • http://www.dest-unreach.org/socat/
  • 曾经一直纠结netcat这么好用的测试用具怎么就好久不更新了呢, 原来是有更好的取代者.
  • socat相比netcat功能更加强大, 同时也相对复杂了一些.

包含的工具

  • socat: establishes two bidirectional byte streams and transfers data between them.
  • filan: prints information about its active file descriptors to stdout.
  • procan: prints information about process parameters to stdout

工作原理 - life cycle of a socat instance (4 phases)

  1. init phase(初始化阶段), the command line options are parsed and logging is initialized. (解析命令行以及初始化日志系统.)
  2. open phase(打开连接阶段), opens the first address and afterwards the second address. These steps are usually blocking; thus, especially for complex address types like socks, connection requests or authentication dialogs must be completed before the next step is started. (先打开第一个连接, 再打开第二个连接. 这个单步执行的. 如果第一个连接失败, 则会直接退出.)
  3. transfer phase(数据转发阶段), socat watches both streams’ read and write file descriptors via select() , and, when data is available on one side and can be written to the other side, socat reads it, performs newline character conversions if required, and writes the data to the write file descriptor of the other stream, then continues waiting for more data in both directions. (谁有数据就转发到另外一个连接上, read/write互换.)
  4. closing phase(关闭阶段), one of the streams effectively reaches EOF. Socat transfers the EOF condition to the other stream, i.e. tries to shutdown only its write stream, giving it a chance to terminate gracefully. For a defined time socat continues to transfer data in the other direction, but then closes all remaining channels and terminates. (其中一个连接断开, 执行处理另外一个连接.)

Options

  • 命令行参数用来修改程序的行为. 他们与所谓的作为address specifications的一部分的address options无关.
  • 详见: socat -h

Address specifications

  • 一个address specification通常包含一个address type关键字, 0或者更多必要的address parameters与keyword用’:‘分隔以及他们相互之间, 和0或者更多address options用’,‘分隔.
  • keyword指定address type(如: TCP4, OPEN, EXEC). 对于有些keywords有同义词(‘-‘与STDIO, TCP与TCP4). 关键字是大小写敏感的. 对于一些特殊的address type, keyword可以被忽略: Address specifications以数字开头的被认为是FD address(raw file descriptor). 如果一个’/‘被发现, 在第一个’:‘或者’,‘前, GOPEN(generic file open)被认定.
  • 0或者更多address options可以被给到每一个地址上. 他们在一些方式上影响地址. Options由一个option keyword组成或者一个option keyword=value组成. Option keywords是大小写不敏感的.

Address Types

  • socat -h

Address Options

  • socat -h

Data Values

Refs