设计优先的 Restful API 服务开发与服务解耦实践

最近公司需要一个 API 服务提供给用户用来查询直播系统的一些流相关数据接口。正好我们公司应用组的同事 CatTail 分享了他们在这个季度将他们的 API 服务进行了重构, 使用了 swagger 这套 OpenAPI 标准工具。正好借此机会, 学来用用。

Swagger 框架的选择

应用组这边用的是 Node.js 开发的 API 服务, 我这边涉及到数据的上报与高并发处理, 使用 Go 开发。所以, 需要选择一个 Go 语言版的 Swagger API 框架

经过简单的调研比较后, 我选择了 go-swagger 这个框架。这个开源项目是由 VMware 赞助并维护的, 支持最新的 Swagger 2.0 标准。项目的文档还不算太详细, 有的地方需要自己来摸索下。

我选择这个框架基于如下几点考虑:

  • 支持标准的 http middleware, 方便集成 Alice 插件管理工具, 可以直接使用大量与 net/http 兼容的各种 http middleware。
  • 代码侵入性不大, 如果后期想改用其他框架, 迁移起来也不太麻烦。
  • 自带 validator 功能, API spec 设计好后, 无需在代码中自己去写繁琐的边界校验功能。
  • 可以通过代码生成 API 文档, 这样能 100% 的保证代码与文档保持一致。
  • 由于使用 Swagger 的标准, 方便使用兼容这一标准的一整套工具链, 保证 API 监控, spec render 等, 也能复用应用组那边的资源, 哈哈。
  • 项目也在发展中, 目前支持的 scheme 有 http 和 https, 后续会支持 ws 和 wss。我将来计划开发的 WebRTC 信令服务也可以考虑用这个框架来做。
  • go-swagger 提供很多基础的 helper function 库实现: https://github.com/go-openapi

这个框架不太好的地方, 或者对于 Go 这种强类型语言不太好的地方, 就是 Swagger 基于 spec 定义的 definitions 生成的 model 与 数据库的 model 不能使用一套 struct, 因为涉及到一些 struct tag 的添加没法添加, 区分开后, 就涉及到两种结构之间的数据拷贝的问题, 代码显得有点冗长。暂时我还没想到更好的办法。

直播 API 服务架构设计

首先是整理需求, 和抽象需求的模型, 我这边收到几个类似的业务需求有 禁播/踢流 服务, 触发/定时截图, 触发/定时录制, 流事件回调与查询。都涉及到配置变更和推流与断流事件触发相应的业务逻辑。所以, 我抽象出了如下服务架构。

origin_admin

这个架构同时解决了两个问题:

  • 与应用解耦, 在过去应用配置数据是通过 redis 数据结构与底层服务进行对接的, 当数据结构变更与扩展时, 都会发生各种各样繁琐的数据兼容与繁琐的流程。现在底层服务直接提供 Restful API 风格的 CRUD 操作, 底层服务自身来进行数据的持久化, 当需要存储结构调优和变更时, 就可以自己内部来调整, 而不用去动对外的接口。这在互联网环境, 需求与架构不断变化, 快速迭代开发的情况相匹配。
  • 由于 CRUD 接口都在我的服务内部, 配置变更触发事件, 这个逻辑变得更加容易实现和控制。

统一的异步消息处理机制与事件回调机制:

  • 配置变更 与 流事件触发, 可以用类似的异步逻辑来处理, 这里目前是通过 NSQ 消息队列, 来异步处理消费这些事件。
  • 推流断流事件与配置变更事件可以联动触发其他服务的业务逻辑。这里我设计了统一的事件回调机制, 可以回调到录制服务, 截图服务, 踢流服务甚至客户自己的服务等。

说说技术选型

  • swagger: OpenAPI 标准, 可以利用 swagger 一整套工具, 方便后续做 API 统计与监控。
  • mq: NSQ, 简单, 够用, 稳定。
  • db: mongodb, 文档存储, 支持大数据, 支持单个 field 的 CRUD 操作, 方便 scale, 够用的读写性能, 够用的查询功能。

通过 Swagger spec 来设计 API

这里截取一小部分来说明

swagger: "2.0"
info:
  contact:
    email: akagi201@gmail.com
    name: Akagi201
    url: http://akagi201.org
  description: UPYUN live streaming API service based on go-swagger
  title: UPYUN live streaming API service
  version: 0.1.0
# during dev, should point to your local machine
host: localhost:2201
# basePath will be prefixed to all paths
basePath: /api/v1
produces:
  - application/json
consumes:
  - application/json
schemes:
  - http
  - https
tags:
  - name: system
    description: 系统信息
  - name: stream
    description: 流信息
  - name: event
    description: 对内 事件 接口
  - name: config
    description: 对内 应用配置 接口
paths:
  /system/version:
    get:
      tags:
        - system
      summary: 获取版本信息
      operationId: getVersion
      responses:
        200:
          description: get versions
          schema:
            $ref: "#/definitions/version"
        default:
          description: error
          schema:
            $ref: "#/definitions/error_response"
definitions:
  version:
    type: object
    title: version_info
    required:
      - version
      - signature
    properties:
      version:
        type: string
        minLength: 1
      signature:
        type: string
        minLength: 1
  error_response:
    type: object
    title: error_response
    required:
      - code
      - data
    properties:
      code:
        type: integer
        format: int32
        description: 所有错误码统一定义
        example: 201
      data:
        type: string
        minLength: 1
        description: 详细错误信息
        example: Operator already exists

限制条件与文档说明, 例子都可以写到 spec 里面。完整的 spec 文档, 请参考 http://swagger.io/specification/

可以使用开源的 swagger-ui 来 render 这份文档, 当然如果你觉得他太丑了, 可以自己写个 render。

uplive_api

使用 go-swagger 框架

目录结构

❯ tree -L 3
.
├── README.md
├── client // client SDK
│   ├── config
│   │   ├── config_client.go
│   │   ├── del_stream_config_parameters.go
│   │   ├── del_stream_config_responses.go
│   │   ├── get_stream_config_parameters.go
│   │   ├── get_stream_config_responses.go
│   │   ├── set_stream_config_parameters.go
│   │   └── set_stream_config_responses.go
│   ├── event
│   │   ├── event_client.go
│   │   ├── get_stream_events_parameters.go
│   │   ├── get_stream_events_responses.go
│   │   ├── stream_event_collector_parameters.go
│   │   └── stream_event_collector_responses.go
│   ├── origin_admin_client.go
│   ├── stream
│   │   ├── get_stream_status_parameters.go
│   │   ├── get_stream_status_responses.go
│   │   └── stream_client.go
│   └── system
│       ├── get_version_parameters.go
│       ├── get_version_responses.go
│       └── system_client.go
├── cmd // 最终生成的二进制
│   └── origin-admin-server
│       └── main.go
├── models // 根据 spec definition 生成的 model
│   ├── error_response.go
│   ├── result_response.go
│   ├── stream_action.go
│   ├── stream_event.go
│   ├── stream_events.go
│   ├── stream_info.go
│   ├── stream_status.go
│   └── version.go
├── restapi // api 处理逻辑
│   ├── configure_origin_admin.go // 自定义 API 逻辑入口文件
│   ├── controller.go // 我添加的针对每个 API 的具体处理 Handler 逻辑实现, 后期可以拆分成目录和多个文件的结构
│   ├── doc.go
│   ├── embedded_spec.go
│   ├── operations
│   │   ├── config
│   │   ├── event
│   │   ├── origin_admin_api.go
│   │   ├── stream
│   │   └── system
│   └── server.go
├── store // 与 mgo 交互的 model 定义
│   └── mongo
│       ├── event.go
│       └── stream_config.go
└── swagger.yml // spec 定义的地方

总结与体会

  • 对于新人来说, 会有一定的学习成本, 包括开发理念的传递。不过 design first 这个理念是非常好的。
  • 整体来说使用这套框架还是比较顺利, 没有遇到什么无法解决的问题, 也希望 go-swagger 能够不断完善起来, 另外, 大家用的多了, 自然功能也就完善了, 所以, 在此, 还是向大家隆重推荐一下。

Reading WebRTCBook

webrtcbook-cover

5 月 29 号在美国亚马逊上买了 这本书. 邮件上预计是 7 月 5 日到. 结果今天( 6 月 13 日)就到了, 真是开心. BTW, 这算是我买过的最贵的书了. 购买地址

webrtcbook-order

书里面的图片都是彩印的. 不算太厚, 才 295 页.

Reading ng-book 2

哈哈, 最近在同时学两个东西, 都是 ngxxx 一个是 nginx, 另一个就是 angular.

同样地, 这是一篇无聊的文章, 只是给我个人用来记录进度的.(不记录, 坚持不下来啊)

学习动机

  • 目前工作一个项目用的 angular 1.x
  • data 驱动 view
  • 前端太 tmd 的乱, 框架一大堆, 看别人分析, 都希望出现一个统一的框架. 我希望是 angular.
  • 想摆脱手写html, 手写js, 手写 css 的低端模式, 学个框架, 以后可以装逼说自己会前端了.

文档地址

进度

2016-02-17

  • 当前的 angular 2 状态是 beta.
  • angular: https://angular.io/
  • typescript: http://www.typescriptlang.org/
  • 简单的看了下网上各路人马的评价, 在typescript跟es6中选择了typescript.
  • TypeScript is a superset of JavaScript ES6 that adds types.
  • ES5 == normal JavaScript
  • ES6 == ES2015
  • Angular 2 is written in TypeScript and generally that’s what everyone is using.
  • Angular 2 itself is a javascript file.
  • Shim. A shim is a library that brings a new API to an older environment, using only the means of that environment.
  • Polyfill. A polyfill is a piece of code (or plugin) that provides the technology that you, the developer, expect the browser to provide natively. Flattening the API landscape if you will.

2016-05-18

Angular 2 项目基础

  • package.json
  • tsconfig.json
  • tslint.json

Angular 2 依赖

  • ES6 Shim
  • Zones
  • Reflect Metadata
  • SystemJS

notes

  • CSS 使用 Semantic-UI.
  • reference(in .d.ts) 语句指定 typing 文件的路径.
  • import 语句是来自 ES6, 叫做 destructuring.
  • Component 是 Angular 1里的 directives 的新版本.
  • 一个基础的 Component 包含两个部分: Component annotation 和 component definition class.
  • 把 annotation 看做 metadata added to your code. (? 跟 python 的 docorator 类似吧)
  • selector: angular自己的selector mix, 类似 CSS selector, XPath, JQuery selector.
  • npm run tsc 编译.
  • npm run tsc:w 编译并监听.
  • tsc --watch 每次修改都编译.
  • 读到 Page 16

2016-05-19

  • npm run go 监听修改并serve, 注意 8080 端口不能被占用.
  • array: Angular1 的 ng-repeat 在 Angular2 中类似的指令是 NgFor.
  • #newtitle 语法叫做一个 resolve. 效果是让这个变量在这个 view 范围的的表达式有效.
  • 绑定 input 到 value, newtitle 是一个 object, 代表这个 input DOM 元素, 类型是 HTMLInputElement. 可以访问 newtitle.value.
  • 绑定 actions 到 events, 组件类的一个函数赋值给 (click) 属性.
  • 反引号: ES6 语法, 反引号的字符串将会展开模板变量.
  • 可以在 attribute values 里面使用模板.
  • 在 Angular 1, directives 全局 match, 在 Angular 2, 你需要显式指定你用哪个组件(因此, 哪个 selector).
  • 在 js 里, 默认传播 click 事件给所有的父组件.
  • 一个好的实践是当写 Angular 代码时, 尽量将你使用的数据结构与组件代码隔离.
  • 封装的原则: LoD, https://en.wikipedia.org/wiki/Law_of_Demeter
  • train-wreck: 小心 long-method chaining foo.bar.baz.bam
  • 读到 Page 42

2016-05-20

  • MVC guideline: Skinny Controller, Fat Model
  • 核心理念是: 将大多数我们的 domain logic 移动到我们的 models, 因此我们的 components 尽可能做最少的工作.
  • 数组的两种表示: 1. 普通: Article[]; 2. generics: Array
    .
  • Component 的 attribute: inputs
  • 创建 coimponent 的核心不仅仅是封装, 而且是为了重用.

总结编写 Angular 2 应用步骤

  1. 将你的应用分解成 components.
  2. 创建 view.
  3. 定义你的 model.
  4. 显示你的 model.
  5. 添加互动.

typescript

  • ES6 = ES5 + classes + modules
  • TypeScript = ES6 + types + annotations
  • transpiler / transcompiler : TypeScript -> ES5.
  • TypeScript to ES5 有一个单一的 transpiler, 由核心 TypeScript team 开发.
  • ES6 to ES5 有两个 transpiler: traceur (by google), babel (by js community).
  • REPL: ts-node, tsun
  • number: 在TS中, 所有的number都是浮点数.
  • any: any 是默认类型, 如果我们忽略给一个变量指定类型,
  • classes: 在 ES5 中, OO是通过 prototype-based objects 实现的.
  • 最佳实践, 关于补充js中没有class: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide
  • oo in js: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Introduction_to_Object-Oriented_JavaScript
  • class 有 properties, methods, constructors.
  • A void value is also a valid any value.
  • constructor: 必须被命名为: constructor. 每个 class 只能有一个 constructor.
  • inheritance in ES5: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Inheritance_and_the_prototype_chain
  • inheritance: 使用 extends 关键字.
  • Utilities(语法糖): ES6 提供一些语法糖: 1. fat arrow function syntax; 2. template strings;
  • Fat Arrow Function (=>): 是一个缩写来写函数.
  • 在 ES5 中, 如果我们想要使用一个函数作为参数, 我们需要使用 function 关键字和 {}.
  • => 语法的一个重要特性是, 他和包围他的代码使用相同的 this. 这与你在 JS 中创建一个普通函数不同.
  • 通常当你在 JS 中写一个函数时, 这个函数被分配他自己的 this.
  • => 函数是一个很好的方法来清理你的 inline 函数. 他使在JS中使用高阶函数更简单.
  • Template Strings: 在 ES6 中, 新的 template strings 被引入. 两个好的特性: 1. Varaibles within strings. 2. Multi-line strings.
  • Variables in strings: 也叫 string interpolation. 要使用反引号, 不能使用单引号或者双引号.
  • Angualr 2应用由 Component 组成(a tree of components), 一种理解 Component 的方式是教会浏览器新 tag.
  • Angular 2 的 Component 和 Angular 1 的 directive 类似. 同时, Angular 2 也有 directive.
  • component 是 composable.
  • Angular 2 没有指定一个 model library.
  • 读到 Page 76.
TypeScript 比 ES5 多的特性
  • types
  • classes
  • annotations
  • imports
  • language utilities (e.g. desctructuring)

2016-05-26

  • componet decorator: 包含一个 selector, 一个 template.
  • component controller: 由一个 class 定义.
  • component selector: 两种写法. <inventory-app></inventory-app>
<div inventory-app></div>
  • 添加实例到组件里来显示子组件.
  • template binding: {{...}}, 里面不仅仅是一个变量, 是一个表达式.
  • Inputs 和 Outputs: [squareBrackets] 传入 inputs, (parenthesis) 处理 outputs.
  • 数据流入你的组件, 通过 input binding, 事件流出你的组件, 通过 output binding.
  • 可以把 input + output bindings 看做你的组件的 public API.
  • component inputs:
  • Observer pattern: https://en.wikipedia.org/wiki/Observer_pattern

Reading Nginx tutorial

这是一篇无聊的文章, 用于记录自己的阅读进度而已.

文档地址

进度

2016-02-16

2016-02-17

2016-02-19

  • ngx_http_proxy_module: http://nginx.org/en/docs/http/ngx_http_proxy_module.html
  • 不是所有的 Nginx 变量都拥有存放值的容器。拥有值容器的变量在 Nginx 核心中被称为“被索引的”(indexed);反之,则被称为“未索引的”(non-indexed)。
  • 读完 Nginx 变量漫谈(三)
  • 读完 Nginx 变量漫谈 (四)
  • Nginx 变量值容器的生命期是与当前请求相关联的。每个请求都有所有变量值容器的独立副本,只不过当前请求既可以是“主请求”,也可以是“子请求”。即便是父子请求之间,同名变量一般也不会相互干扰。
  • Module ngx_http_auth_request_module: http://nginx.org/en/docs/http/ngx_http_auth_request_module.html
  • Nginx 变量漫谈(五)

2016-02-22

*

如何在 Go 语言中使用 Redis 连接池

redis_go

关于连接池

一个数据库服务器只拥有有限的资源,并且如果你没有充分使用这些资源,你可以通过使用更多的连接来提高吞吐量。一旦所有的资源都在使用,那么你就不能通过增加更多的连接来提高吞吐量。事实上,吞吐量在连接负载较大时就开始下降了。通常可以通过限制与可用的资源相匹配的数据库连接的数量来提高延迟和吞吐量。

如果不使用连接池,那么,每次传输数据,我们都需要进行创建连接,收发数据,关闭连接。在并发量不高的场景,基本上不会有什么问题,一旦并发量上去了,那么,一般就会遇到下面几个常见问题:

  • 性能普遍上不去
  • CPU 大量资源被系统消耗
  • 网络一旦抖动,会有大量 TIME_WAIT 产生,不得不定期重启服务或定期重启机器
  • 服务器工作不稳定,QPS 忽高忽低

要想解决这些问题,我们就要用到连接池了。连接池的思路很简单,在初始化时,创建一定数量的连接,先把所有长连接存起来,然后,谁需要使用,从这里取走,干完活立马放回来。 如果请求数超出连接池容量,那么就排队等待、退化成短连接或者直接丢弃掉。

使用连接池遇到的坑

最近在一个项目中,需要实现一个简单的 Web Server 提供 Redis 的 HTTP interface,提供 JSON 形式的返回结果。考虑用 Go 来实现。

首先,去看一下 Redis 官方推荐的 Go Redis driver。官方 Star 的项目有两个:Radix.v2 和 Redigo。经过简单的比较后,选择了更加轻量级和实现更加优雅的 Radix.v2

Radix.v2 包是根据功能划分成一个个的 sub package,每一个 sub package 在一个独立的子目录中,结构非常清晰。我的项目中会用到的 sub package 有 redis 和 pool。

由于我想让这种被 fork 的进程最好简单点,做的事情单一一些,所以,在没有深入去看 Radix.v2 的 pool 的实现之前,我选择了自己实现一个 Redis pool。(这里,就不贴代码了。后来发现自己实现的 Redis pool 与 Radix.v2 实现的 Redis pool 的原理是一样的,都是基于 channel 实现的, 遇到的问题也是一样的。)

不过在测试过程中,发现了一个诡异的问题。在请求过程中经常会报 EOF 错误。而且是概率性出现,一会有问题,一会又好了。通过反复的测试,发现 bug 是有规律的,当程序空闲一会后,再进行连续请求,会发生3次失败,然后之后的请求都能成功,而我的连接池大小设置的是3。再进一步分析,程序空闲300秒后,再请求就会失败,发现我的 Redis server 配置了 timeout 300,至此,问题就清楚了。是连接超时 Redis server 主动断开了连接。客户端这边从一个超时的连接请求就会得到 EOF 错误。

然后我看了一下 Radix.v2 的 pool 包的源码,发现这个库本身并没有检测坏的连接,并替换为新的连接的机制。也就是说我每次从连接池里面 Get 的连接有可能是坏的连接。所以,我当时临时的解决方案是通过增加失败后自动重试来解决了。不过,这样的处理方案,连接池的作用好像就没有了。技术债能早点还的还是早点还上。

使用连接池的正确姿势

想到我们的 ngx_lua 项目里面也大量使用 redis 连接池,他们怎么没有遇到这个问题呢。只能去看看源码了。

经过抽象分离, ngx_lua 里面使用 redis 连接池部分的代码大致是这样的

server {
    location /pool {
        content_by_lua_block {
            local redis = require "resty.redis"
            local red = redis:new()

            local ok, err = red:connect("127.0.0.1", 6379)
            if not ok then
                ngx.say("failed to connect: ", err)
                return
            end

            ok, err = red:set("hello", "world")
            if not ok then
                return
            end

            red:set_keepalive(10000, 100)
        }
    }
}

发现有个 set_keepalive 的方法,查了一下官方文档,方法的原型是 syntax: ok, err = red:set_keepalive(max_idle_timeout, pool_size) 貌似 max_idle_timeout 这个参数,就是我们所缺少的东西,然后进一步跟踪源码,看看里面是怎么保证连接有效的。

function _M.set_keepalive(self, ...)
    local sock = self.sock
    if not sock then
        return nil, "not initialized"
    end

    if self.subscribed then
        return nil, "subscribed state"
    end

    return sock:setkeepalive(...)
end

至此,已经清楚了,使用了 tcp 的 keepalive 心跳机制。

于是,通过与 Radix.v2 的作者一些讨论,选择自己在 redis 这层使用心跳机制,来解决这个问题。

最后的解决方案

在创建连接池之后,起一个 goroutine,每隔一段 idleTime 发送一个 PING 到 Redis server。其中,idleTime 略小于 Redis server 的 timeout 配置。

连接池初始化部分代码如下:

p, err := pool.New("tcp", u.Host, concurrency)
errHndlr(err)
go func() {
    for {
        p.Cmd("PING")
        time.Sleep(idelTime * time.Second)
    }
}()

使用 redis 传输数据部分代码如下:

func redisDo(p *pool.Pool, cmd string, args ...interface{}) (reply *redis.Resp, err error) {
	reply = p.Cmd(cmd, args...)
	if err = reply.Err; err != nil {
		if err != io.EOF {
			Fatal.Println("redis", cmd, args, "err is", err)
		}
	}

	return
}

其中,Radix.v2 连接池内部进行了连接池内连接的获取和放回,代码如下:

// Cmd automatically gets one client from the pool, executes the given command
// (returning its result), and puts the client back in the pool
func (p *Pool) Cmd(cmd string, args ...interface{}) *redis.Resp {
	c, err := p.Get()
	if err != nil {
		return redis.NewResp(err)
	}
	defer p.Put(c)

	return c.Cmd(cmd, args...)
}

这样,我们就有了 keepalive 的机制,不会出现 timeout 的连接了,从 redis 连接池里面取出的连接都是可用的连接了。看似简单的代码,却完美的解决了连接池里面超时连接的问题。同时,就算 Redis server 重启等情况,也能保证连接自动重连。

References