HTML5 直播协议之 WebSocket 和 MSE

当前为了满足比较火热的移动 Web 端直播需求, 一系列的 HTML5 直播技术迅速的发展了起来.

常见的可用于 HTML5 的直播技术有 HLS, WebSocket 与 WebRTC. 今天我要向大家介绍一下 WebSocket 与 MSE 相关的内容, 并在最后通过一个实际的例子, 来展示其具体的用法.

大纲

  • WebSocket 协议介绍.
  • WebSocket Client/Server API 介绍.
  • MSE 介绍.
  • fMP4 介绍.
  • Demo 展示.

WebSocket

通常的 Web 应用都是围绕着 HTTP 的请求/响应模型而构建的. 所有的 HTTP 通信都是通过客户端来控制的, 都是由客户端向服务器发出一个请求, 服务器接收和处理完毕后再返回结果给客户端, 客户端再将数据展现出来. 这种模式不能满足实时应用的需求, 于是出现了 SSE, Comet 等 “服务器推” 的长连接技术.

WebSocket 是直接基于 TCP 连接之上的通信协议, 可以在单个 TCP 连接上进行全双工的通信. WebSocket 在 2011 年被 IETF 定为标准 RFC 6455, 并被 RFC 7936 所补充规范, WebSocket API 被 W3C 定为标准.

WebSocket 是独立的创建在 TCP 上的协议, HTTP 协议中的那些概念都不复存在, 和 HTTP 的唯一关联是使用 HTTP 协议的 101 状态码进行协议切换, 使用的 TCP 端口是 80, 可以用于绕过大多数防火墙的限制.

websocket_protocol

WebSocket 握手

为了更方便地部署新协议,HTTP/1.1 引入了 Upgrade 机制, 它使得客户端和服务端之间可以借助已有的 HTTP 语法升级到其它协议. 这个机制在 RFC7230 的 6.7 Upgrade) 一节中有详细描述.

要发起 HTTP/1.1 协议升级,客户端必须在请求头部中指定这两个字段:

Connection: Upgrade
Upgrade: protocol-name[/protocol-version]

如果服务端同意升级, 那么需要这样响应:

HTTP/1.1 101 Switching Protocols
Connection: upgrade
Upgrade: protocol-name[/protocol-version]

[... data defined by new protocol ...]

可以看到, HTTP Upgrade 响应的状态码是 101, 并且响应正文可以使用新协议定义的数据格式.

WebSocket 握手就利用了这种 HTTP Upgrade 机制. 一旦握手完成,后续数据传输就直接在 TCP 上完成.

WebSocket JavaScript API

目前主流的浏览器提供了 WebSocket 的 API 接口, 可以发送消息(文本或者二进制)给服务器, 并且接收事件驱动的响应数据.

Step1 检查浏览器是否支持 WebSocket.

if(window.WebSocket) {
	// WebSocket代码
}

Step2 建立连接

var ws = new WebSocket('ws://localhost:8327');

Step3 注册回调函数以及收发数据

分别注册 WebSocket 对象的 onopen, onclose, onerror 以及 onmessage 回调函数.

通过 ws.send() 来进行发送数据, 这里不仅可以发送字符串, 也可以发送 Blob 或 ArrayBuffer 类型的数据.

如果接收的是二进制数据,需要将连接对象的格式设为 blob 或 arraybuffer.

ws.binaryType = 'arraybuffer';

WebSocket Golang API

服务器端 WebSocket 库我推荐使用 Google 自己的 golang.org/x/net/websocket, 可以非常方便的与 net/http 一起使用.

可以将 websocket 的 handler function 通过 websocket.Handler 转换成 http.Handler, 这样就可以跟 net/http 库一起使用了.

然后通过 websocket.Message.Receive 来接收数据, 通过 websocket.Message.Send 来发送数据.

具体代码可以看下面的 Demo 部分.

MSE

在介绍 MSE 之前, 我们先看看 HTML5 <audio><video> 有哪些限制.

HTML5

  • 不支持流.
  • 不支持 DRM 和加密.
  • 很难自定义控制, 以及保持跨浏览器的一致性.
  • 编解码和封装在不同浏览器支持不同.

MSE 是解决 HTML5 的流问题.

Media Source Extensions (MSE) 是一个主流浏览器支持的新的 Web API. MSE 是一个 W3C 标准, 允许 JavaScript 动态的构建 <video><audio> 的媒体流. 他定义了对象, 允许 JavaScript 传输媒体流片段到一个 HTMLMediaElement.

通过使用 MSE, 你可以动态地修改媒体流而不需要任何的插件. 这让前端 JavaScript 可以做更多的事情, 我们可以在 JavaScript 进行转封装, 处理, 甚至转码.

虽然 MSE 不能让流直接传输到 media tags 上, 但是 MSE 提供了构建跨浏览器播放器的核心技术, 让浏览器通过 JavaScript API 来推音视频到 media tags 上.

现在每个客户端平台都开始逐步开放流媒体相关的 API: Flash 平台有 Netstream, Android 平台有 Media Codec API, 而 Web 上对应的就是标准的 MSE. 由此可以看出, 未来的趋势是在客户端可以做越来越多的事情.

Browser Support

通过 caniuse 来检查是否浏览器支持情况.

mse-support

通过 MediaSource.isTypeSupported() 可以进一步地检查 codec MIME 类型是否支持.

比较常用的视频封装格式有 webm 和 fMP4.

WebM 和 WebP 是两个姊妹项目, 都是由 Google 赞助的. 由于 WebM 是基于 Matroska 的容器格式, 所以天生是流式的, 很适合用在流媒体领域里.

下面着重介绍一些 fMP4 格式.

我们都知道 MP4 是由一系列的 Boxes 组成的. 普通的 MP4 的是嵌套结构的, 客户端必须要从头加载一个 MP4 文件, 才能够完整播放, 不能从中间一段开始播放.

而 fMP4 由一系列的片段组成, 如果你的服务器支持 byte-range 请求, 那么, 这些片段可以独立的进行请求到客户端进行播放, 而不需要加载整个文件.

为了更加形象的说明这一点, 下面我介绍几个常用的分析 MP4 文件的工具.

  • gpac 原名 mp4box, 是一个媒体开发框架, 在其源码下有大量的媒体分析工具可以使用, testapps
  • mp4box.js 是 mp4box 的 Javascript 版本.
  • bento4 一个专门用于 MP4 的分析工具.
  • mp4parser 在线 MP4 文件分析工具.

fragment mp4 vs non-fragment mp4

下面一个 fragment mp4 文件通过 mp4parser 分析后的截图

fmp4

下面一个 non-fragment mp4 文件通过 mp4parser 分析后的截图

nfmp4

Apple 在今年的 WWDC 大会上宣布会在 iOS 10, tvOS, macOS 的 HLS 中支持 fMP4.

值得一提的是, fMP4, CMAF, ISOBMFF 其实都是类似的东西.

MSE JavaScript API

从高层次上看, MSE 提供了 * 一套 JavaScript API 来构建 media streams. * 一个拼接和缓存模型. * 识别一些 byte 流类型: * WebM * ISO Base Media File Format * MPEG-2 Transport Streams

MSE 内部结构

mse_arch

MSE 本身的设计是不依赖任务特定的编解码和容器格式的, 但是不同的浏览器支持程度是不一样的. 可以通过传递一个 MIME 类型的字符串到静态方法: MediaSource.isTypeSupported 来检查.

比如:

MediaSource.isTypeSupported('audio/mp3'); // false
MediaSource.isTypeSupported('video/mp4'); // true
MediaSource.isTypeSupported('video/mp4; codecs="avc1.4D4028, mp4a.40.2"'); // true

获取 Codec MIME string 的方法可以通过在线的 mp4info 或者使用命令行 mp4info test.mp4 | grep Codecs

可以得到类似如下结果:

❯ mp4info fmp4.mp4| grep Codec
    Codecs String: mp4a.40.2
    Codecs String: avc1.42E01E

当前, H.264 + AAC 的 MP4 容器在所有的浏览器都支持.

普通的 MP4 文件是不能和 MSE 一起使用的, 需要将 MP4 进行 fragment 化.

检查一个 MP4 是否已经 fragment 的方法

mp4dump test.mp4 | grep "\[m"

如果是 non-fragment 会显示类似信息.

❯ mp4dump nfmp4.mp4 | grep "\[m"
[mdat] size=8+50873
[moov] size=8+7804
  [mvhd] size=12+96
    [mdia] size=8+3335
      [mdhd] size=12+20
      [minf] size=8+3250
    [mdia] size=8+3975
      [mdhd] size=12+20
      [minf] size=8+3890
            [mp4a] size=8+82
    [meta] size=12+78

如果已经 fragment, 会显示如下类似信息.

❯ mp4dump fmp4.mp4 | grep "\[m" | head -n 30
[moov] size=8+1871
  [mvhd] size=12+96
    [mdia] size=8+312
      [mdhd] size=12+20
      [minf] size=8+219
            [mp4a] size=8+67
    [mdia] size=8+371
      [mdhd] size=12+20
      [minf] size=8+278
    [mdia] size=8+248
      [mdhd] size=12+20
      [minf] size=8+156
    [mdia] size=8+248
      [mdhd] size=12+20
      [minf] size=8+156
  [mvex] size=8+144
    [mehd] size=12+4
[moof] size=8+600
  [mfhd] size=12+4
[mdat] size=8+138679
[moof] size=8+536
  [mfhd] size=12+4
[mdat] size=8+24490
[moof] size=8+592
  [mfhd] size=12+4
[mdat] size=8+14444
[moof] size=8+312
  [mfhd] size=12+4
[mdat] size=8+1840
[moof] size=8+600

把一个 non-fragment MP4 转换成 fragment MP4.

可以使用 FFmpeg 的 -movflags 来转换

对于原始文件为非 MP4 文件

ffmpeg -i trailer_1080p.mov -c:v copy -c:a copy -movflags frag_keyframe+empty_moov bunny_fragmented.mp4

对于原始文件已经是 MP4 文件

ffmpeg -i non_fragmented.mp4 -movflags frag_keyframe+empty_moov fragmented.mp4

或者使用 mp4fragment

mp4fragment input.mp4 output.mp4

demo

  • MSE Vod Demo 展示利用 MSE 和 WebSocket 实现一个点播服务.
  • MSE Live Demo 展示利用 MSE 和 WebSocket 实现一个直播服务.

MSE VOD Demo

MSE Live Demo

Refs

WebSocket

MSE

直播协议 HTTP-FLV 详解

传统的直播协议要么使用 Adobe 的基于 TCP 的 RTMP 协议, 要么使用 Apple 的基于 HTTP 的 HLS 协议.

今天我要向大家介绍另外一种结合了 RTMP 的低延时, 以及可以复用现有 HTTP 分发资源的流式协议 HTTP-FLV.

FLV

首先, 一定要先介绍一下 FLV 文件格式的细节.

FLV adobe 官方标准

FLV 文件格式标准是写在 F4V/FLV file format spec v10.1 的附录 E 里面的 FLV File Format.

单位说明

类型 说明
Unit data types
SI8 Signed 8-bit integer
SI16 Signed 16-bit integer
SI24 Signed 24-bit integer
SI32 Signed 32-bit integer
SI64 Signed 32-bit integer
UI8 Unsigned 8-bit integer
UI16 Unsigned 16-bit integer
UI24 Unsigned 24-bit integer
UI32 Unsigned 32-bit integer
UI64 Unsigned 64-bit integer
xxx[] Slice of type xxx
xxx[n] Array of type xxx
STRING Sequence of Unicode 8-bit characters (UTF-8), terminated with 0x00

FLV 文件头和文件体 (E.2, E.3)

从整个文件上看, FLV = FLV File Header + FLV File Body

字段 类型 说明
FLV File Header
Signature UI8[3] 签名, 总是 ‘FLV’ (0x464C56)
Version UI8 版本, 总是 0x01, 表示 FLV version 1
TypeFlagsReserved UB [5] 全 0
TypeFlagsAudio UB[1] 1 = 有音频
TypeFlagsReserved UB[1] 全 0
TypeFlagsVideo UB[1] 1 = 有视频
DataOffset UI32 整个文件头长度, 对于FLV v1, 总是 9
FLV File Body
PreviousTagSize0 UI32 总是 0
Tag1 FLVTAG 第一个 tag
PreviousTagSize1 UI32 前一个 tag 的大小, 包括他的 header, 即: 11 + 前一个 tag 的大小
Tag2 FLVTAG 第二个 tag
PreviousTagSizeN-1 UI32 前一个 tag 大小
TagN FLVTAG 最后一个 tag
PreviousTagSizeN UI32 最后一个 tag 大小, 包括他的 header

通常, FLV 的前 13 个字节(flv header + PreviousTagSize0)完全相同, 所以, 程序中会单独定义一个常量来指定.

FLV Tag (E.4)

字段 类型 说明
FLV Tag
Reserved UB[2] 保留给FMS, 应为 0
Filter UB[1] 0 = unencrypted tags, 1 = encrypted tags
TagType UB [5] 类型, 0x08 = audio, 0x09 = video, 0x12 = script data
DataSize UI24 message 长度, 从 StreamID 到 tag 结束(len(tag) - 11)
Timestamp UI24 相对于第一个 tag 的时间戳(unit: ms), 第一个 tag 总是 0
TimestampExtended UI8 Timestamp 的高 8 位. 扩展 Timestamp 为 SI32 类型
StreamID UI24 总是 0, 至此为 11 bytes
AudioTagHeader IF TagType == 0x08
VideoTagHeader IF TagType == 0x09
EncryptionHeader IF Filter == 1
FilterParams IF Filter == 1
Data AUDIODATA 或者 VIDEODATA 或者 SCRIPTDATA

Timestamp 和 TimestampExtended 组成了这个 TAG 包数据的 PTS 信息, PTS = Timestamp | TimestampExtended << 24.

AudioTag (E.4.2)

由于 AAC 编码的特殊性, 这里着重说明了 AAC 编码的 Tag 格式.

字段 类型 说明
Audio Tag
AudioTagHeader
SoundFormat UB[4] 音频编码格式. 2 = MP3, 10 = AAC, 11 = Speex
SoundRate UB[2] 采样率. 0 = 5.5 kHz, 1 = 11 kHz, 2 = 22 kHz, 3 = 44 kHz
SoundSize UB[1] 采样大小. 0 = 8-bit, 1 = 16-bit
SoundType UB[1] 音频声道数. 0 = Mono, 1 = Stereo
AACPacketType UI8 只有当 SoundFormat 为 10 时, 才有该字段. 0 = AAC sequence header, 1 = AAC raw
AACAUDIODATA
Data AudioSpecificConfig IF AACPacketType == 0, 包含着一些更加详细音频的信息
Data Raw AAC frame data in UI8 [n] IF AACPacketType == 1, audio payload, n = [AAC Raw data length] - ([has CRC] ? 9 : 7)

AudioTagHeader 的第一个字节, 也就是接跟着 StreamID 的 1 个字节包含了音频类型, 采样率等的基本信息.

AudioTagHeader 之后跟着的就是 AUDIODATA 部分了. 但是, 这里有个特例, 如果音频格式(SoundFormat)是 AAC, AudioTagHeader 中会多出 1 个字节的数据 AACPacketType, 这个字段来表示 AACAUDIODATA 的类型: 0 = AAC sequence header, 1 = AAC raw.

AudioSpecificConfig 结构描述非常复杂, 在标准文档中是用伪代码描述的, 这里先假定要编码的音频格式, 做一下简化.

音频编码为: AAC-LC, 音频采样率为 44100.

字段 类型 说明
AudioSpecificConfig
audioObjectType UB[5] 编码结构类型, AAC-LC 为 2
samplingFrequencyIndex UB[4] 音频采样率索引值, 44100 对应值 4
channelConfiguration UB[4] 音频输出声道, 2
GASpecificConfig
frameLengthFlag UB[1] 标志位, 用于表明 IMDCT 窗口长度, 0
dependsOnCoreCoder UB[1] 标志位, 表明是否依赖于 corecoder, 0
extensionFlag UB[1] 选择了 AAC-LC, 这里必须为 0

在 FLV 的文件中, 一般情况下 AAC sequence header 这种包只出现1次, 而且是第一个 audio tag, 为什么需要这种 tag, 因为在做 FLV demux 的时候, 如果是 AAC 的音频, 需要在每帧 AAC ES 流前边添加 7 个字节 ADST 头, ADST 是解码器通用的格式, 也就是说 AAC 的纯 ES 流要打包成 ADST 格式的 AAC 文件, 解码器才能正常播放. 就是在打包 ADST 的时候, 需要 samplingFrequencyIndex 这个信息, samplingFrequencyIndex 最准确的信息是在 AudioSpecificConfig 中, 这样, 你就完全可以把 FLV 文件中的音频信息及数据提取出来, 送给音频解码器正常播放了.

VideoTag (E.4.3)

由于 AVC(H.264) 编码的特殊性, 这里着重说明了 AVC(H.264) 编码的 Tag 格式.

字段 类型 说明
Video Tag
VideoTagHeader
FrameType UB[4] 1 = key frame, 2 = inter frame
CodecID UB[4] 7 = AVC
AVCPacketType UI8 IF CodecID == 7, 0 = AVC sequence header(AVCDecoderConfigurationRecord), 1 = One or more AVC NALUs (Full frames are required), 2 = AVC end of sequence
CompositionTime SI24 IF AVCPacketType == 1 Composition time offset ELSE 0

VideoTagHeader 的第一个字节, 也就是接跟着 StreamID 的 1 个字节包含着视频帧类型及视频 CodecID 等最基本信息.

VideoTagHeader 之后跟着的就是 VIDEODATA 部分了. 但是, 这里有个特例, 如果视频格式(CodecID)是 AVC, VideoTagHeader 会多出 4 个字节的信息.

AVCDecoderConfigurationRecord 包含着是 H.264 解码相关比较重要的 SPS 和 PPS 信息, 在给 AVC 解码器送数据流之前一定要把 SPS 和 PPS 信息送出,否则的话, 解码器不能正常解码. 而且在解码器 stop 之后再次 start 之前, 如 seek, 快进快退状态切换等, 都需要重新送一遍 SPS 和 PPS 的信息. AVCDecoderConfigurationRecord 在 FLV 文件中一般情况也只出现 1 次, 也就是第一个 video tag.

AVCDecoderConfigurationRecord 长度为 sizeof(UI8) * (11 + sps_size + pps_size)

字段 类型 说明
AVCDecoderConfigurationRecord
configurationVersion UI8 版本号, 1
AVCProfileIndication UI8 SPS[1]
profileCompatibility UI8 SPS[2]
AVCLevelIndication UI8 SPS[3]
reserved UB[6] 111111
lengthSizeMinusOne UB[2] NALUnitLength - 1, 一般为 3
reserved UB[3] 111
numberOfSequenceParameterSets UB[5] SPS 个数, 一般为 1
sequenceParameterSetNALUnits UI8[sps_size + 2] sps_size(16bits) + sps(UI8[sps_size])
numberOfPictureParameterSets UI8 PPS 个数, 一般为 1
pictureParameterSetNALUnits UI8[pps_size + 2] pps_size(16bits) + pps(UI8[pps_size])

SCRIPTDATA (E.4.4)

ScriptTagBody 内容用 AMF 编码

字段 类型 说明
SCRIPTDATA
ScriptTagBody
Name SCRIPTDATAVALUE Method or object name. SCRIPTDATAVALUE.Type = 2 (String)
Vale SCRIPTDATAVALUE AMF arguments or object properties.
SCRIPTDATAVALUE
Type UI8 ScriptDataValue 的类型
ScriptDataValue 各种类型 Script data 值

一个 SCRIPTDATAVALUE 记录包含一个有类型的 ActionScript 值.

onMetadata (E.5)

FLV metadata object 保存在 SCRIPTDATA 中, 叫 onMetaData. 不同的软件生成的 FLV 的 properties 不同.

字段 类型 说明
onMetaData
audiocodecid Number Audio codec ID used in the file
audiodatarate Number Audio bit rate in kilobits per second
audiodelay Number Delay introduced by the audio codec in seconds
audiosamplerate Number Frequency at which the audio stream is replayed
audiosamplesize Number Resolution of a single audio sample
canSeekToEnd Boolean Indicating the last video frame is a key frame
creationdate String Creation date and time
duration Number Total duration of the file in seconds
filesize Number Total size of the file in bytes
framerate Number Number of frames per second
height Number Height of the video in pixels
stereo Boolean Indicating stereo audio
videocodecid Number Video codec ID used in the file (see E.4.3.1 for available CodecID values)
videodatarate Number Video bit rate in kilobits per second
width Number Width of the video in pixels

keyframes 索引信息

官方的文档中并没有对 keyframes index 做描述, 但是, flv 的这种结构每个 tag 又不像 TS 有同步头, 如果没有 keyframes index 的话, seek 及快进快退的效果会非常差, 因为需要一个 tag 一个 tag 的顺序读取. 后来在做 flv 文件合成的时候, 发现网上有的 flv 文件将 keyframes 信息隐藏在 Script Tag 中. keyframes 几乎是一个非官方的标准, 也就是民间标准.

两个常用的操作 metadata 的工具是 flvtool2 和 FLVMDI, 都是把 keyframes 作为一个默认的元信息项目. 在 FLVMDI 的主页上有描述:

  keyframes: (Object) This object is added only if you specify the /k switch. 'keyframes' is known to FLVMDI and if /k switch is not specified, 'keyframes' object will be deleted.

  'keyframes' object has 2 arrays: 'filepositions' and 'times'. Both arrays have the same number of elements, which is equal to the number of key frames in the FLV. Values in times array are in 'seconds'. Each correspond to the timestamp of the n'th key frame. Values in filepositions array are in 'bytes'. Each correspond to the fileposition of the nth key frame video tag (which starts with byte tag type 9).

也就是说 keyframes 中包含着 2 个内容 ‘filepositions’ 和 ‘times’ 分别指的是关键帧的文件位置和关键帧的 PTS. 通过 keyframes 可以建立起自己的 Index, 然后在 seek 和快进快退的操作中, 快速有效地跳转到你想要找的关键帧位置进行处理.

FLV 分析工具

HTTP-FLV

HTTP-FLV, 即将音视频数据封装成 FLV, 然后通过 HTTP 协议传输给客户端.

这里首先要说一下, HLS 其实是一个 “文本协议”, 而并不是一个流媒体协议. 那么, 什么样的协议才能称之为流媒体协议呢?

流(stream): 数据在网络上按时间先后次序传输和播放的连续音/视频数据流. 之所以可以按照顺序传输和播放连续是因为在类似 RTMP, FLV 协议中, 每一个音视频数据都被封装成了包含时间戳信息头的数据包. 而当播放器拿到这些数据包解包的时候能够根据时间戳信息把这些音视频数据和之前到达的音视频数据连续起来播放. MP4, MKV 等等类似这种封装, 必须拿到完整的音视频文件才能播放, 因为里面的单个音视频数据块不带有时间戳信息, 播放器不能将这些没有时间戳信息数据块连续起来, 所以就不能实时的解码播放.

延迟分析

理论上(除去网络延迟外), FLV 可以做到仅仅一个音视频 tag 的延迟.

相比 RTMP 的优点:

  • 可以在一定程度上避免防火墙的干扰 (例如, 有的机房只允许 80 端口通过).
  • 可以很好的兼容 HTTP 302 跳转, 做到灵活调度.
  • 可以使用 HTTPS 做加密通道.
  • 很好的支持移动端(Android, IOS).

抓包分析

打开网宿的 HTTP-FLV 流:

http://175.25.168.16/pl3.live.panda.tv/live_panda/d4e0a83a7e0b0c6e4c5d03774169fa3e.flv?wshc_tag=0&wsts_tag=57e233b1&wsid_tag=6a27c14e&wsiphost=ipdbm

HTTP/1.1 200 OK
Expires: Wed, 21 Sep 2016 07:16:02 GMT
Cache-Control: no-cache
Content-Type: video/x-flv
Pragma: no-cache
Via: 1.1 yc16:3 (Cdn Cache Server V2.0)
Connection: close

发现响应头中出现 Connection: close 的字段, 表示网宿采用的是短连接, 则直接可以通过服务器关闭连接来确定消息的传输长度.

如果 HTTP Header 中有 Content-Length, 那么这个 Content-Length 既表示实体长度, 又表示传输长度. 而 HTTP-FLV 这种流, 服务器是不可能预先知道内容大小的, 这时就可以使用 Transfer-Encoding: chunked 模式来传输数据了.

如下的响应就是采用的Chunked的方式进行的传输的响应头:

HTTP/1.1 200 OK
Server: openresty
Date: Wed, 21 Sep 2016 07:38:01 GMT
Content-Type: video/x-flv
Transfer-Encoding: chunked
Connection: close
Expires: Wed, 21 Sep 2016 07:38:00 GMT
Cache-Control: no-cache

mqkv 一个通用的基于分布式消息队列的分布式存储

公司直播系统的源站集群需要一个中心存储服务来提供流的元数据信息的存储和查询功能。由于源站集群是部署在全国各地的多个机房, 多个节点上的。所以, 这里是一个典型的分布式存储的应用场景。这个服务的稳定性非常重要, 也直接影响到直播系统整个服务整体的可用性。

下面, 我来与大家分享交流一下, 我们源站集群共享存储方案经历了哪些变化, 最后, 介绍下我们的下一代共享存储方案 mqkv, 也欢迎熟悉分布式与存储的小伙伴提提建议。

由于, 我们公司的 CDN 系统是使用 Redis 的主从同步机制来进行配置元数据的同步和分发到全国各个 CDN 节点的, 我们对 Redis 的各种特性比较熟。所以, 这里, 我们最先考虑的也是 Redis 的方案。

应用场景

分布式存储也是一个很宽泛的概念, 可以选择的技术很多。首先, 要分析好我们的应用场景是怎样的, 哪些是我们 care 的, 哪些是我们不太 care 的。

推流源站会对流的元数据信息进行写操作, 而拉流源站会进行流的元数据的读操作。而直播是个典型的一对多提供服务的场景, 所以, 相应的, 我们的共享存储方案也是写少读多。同时, 我们可以允许少量的数据丢失, 我们更看重的是服务无SPOF, 稳定性与读写性能。

方案一: Redis Cluster

Redis 3.0 之后推出了自己的 Redis Cluster 集群方案, 所以, 在最开始, 我们还是优先去尝试官方的集群方案。但是, 我们发现 Redis Cluster 仍然不能实现跨机房容灾, 跨机房高可用的功能还是需要自己来实现。所以, 我们源站集群共享存储的最初版本是每台源站的读写都是去操作部署在一个 BGP 机房的 Redis Cluster。但这个方案会导致读写性能都不理想, 所以, 后面我们考虑了在程序中引入缓存, 来减少读压力。

方案二: Redis Cluster + TTL MemoryCache

为了优化读性能, 我们首先考虑在程序中加入缓存, 结合我们的业务场景, 我们开发引入了基于 groupcache 的超时缓存方案。我们比较专注最新的数据, 过期的数据没有意义, 反而会影响我们的业务逻辑, 所以, 元数据的每个 key 都可以配置一个 TTL 过期时间, 当时间到达时, 这个 key 就会 expire 掉。在一个 key expire 的同时有大量访问这个 key 的请求这个临界点时, groupcache 的内部有锁机制保障, 不会出现大量的回源请求, 给中心存储造成压力, 这个方案在我们的测试环境中, 测试结果跟我们预期基本一致。

方案三: Redis Master/Slave 读写分离

而其实我们上面的方案并没有上线, 就有人提出了 Redis 读写分离的方案。写到一个中心 Redis Master 节点, 然后每台源站只去读本机的 Redis Slave 节点, 通过 Redis 的主从同步机制来确保数据一致性。最开始没有用这套方案是因为 Redis 的主从同步机制与 Redis Cluster/Sentinel 有冲突, 不能共存。后来, 运维提供了通过 keepalived 来保证 redis 的高可用, 所以, 我们线上采用了这套方案, 通过运行实际效果来看, 比较理想。

方案四: etcd

我们也在调研一些其他的分布式 kv 存储的方案, 下面是 etcd 的 benchmark 结果。

etcd_test

etcd 并发量10, 100, 1000 分别测试 PUT, GET, DELETE 连续三个操作。结果平均响应时间也跟着上去了。这个结果我们是无法接受的, 所以, 这个方案没有再继续深入研究了。

方案五: mqkv

在对现有的一些分布式存储以及集群方案测试结果非常失望后, 我们开始考虑自研适用于我们这种业务场景的分布式存储方案。

我们预期要达到的效果:

  • 每台源站读写操作都在本地, 要有较好的读写性能。写操作可以异步化, 尽快返回。
  • 无中心节点, 无 SPOF。(写在一个中心的方案, 写的这个主 Redis 还是一个单节点, 一旦机房断网或者断电, 那么, 整个直播服务就不可用了)
  • 允许出现少量的写数据失败的情况。

基于以上几点我设计了 mqkv: * 支持跨机房部署, 避免的单点问题。 * 读写操作都在同一个源站的本机, 读写性能均达到最佳。 * mqkv 提供给应用的接口使用的协议是 Redis 协议, 兼容大量的 redis client driver。

mqkv 架构图

mqkv_arch

实现过程

首先, 我实现了一个 Go 语言版的 redis server 的 api 框架 RedFace。设计主要参考了 net/http 的接口, 由于, 目前的业务逻辑还比较简单, 所以, 太复杂的代码并不多。

比较巧的是, 在我实现了这个包的那个周末, 我看到了 hacker news 上有个跟我的项目功能非常类似的一个项目上了头条, 叫 redcon。不过从接口可以明显的看出, 我实现的版本接口更加简洁, 友好。具体地, 可以对比下 redcon 的 example 和 redface 的 example

不过, redcon 的 benchmark 性能确实比我实现的要好, 这里, 我暂时还没有找到具体的原因, 哈哈, 欢迎高手帮忙分析下。

接下来, 就是 mqkv 的实现了, 其实在架构与逻辑确定好了, 轮子也造好了之后, 写代码就变成很简单的事情了。简单的说, 我就是将应用的 write 操作都异步化, 通过分布式消息队列将消息发送出去, read 操作直接 proxy 本地的 kv 存储。其中利用了 nsq 的 PUB/SUB 模型, 所有 write 操作都 produce 到 mqkv_topic 这个 topic 下, 同时, 每个 mqkv 也作为消费者注册消费 topic 为 mqkv_topic, channel 为本机 hostname 的消息。这样, 就实现一写多读的消息分发模型了。每个 mqkv 在本地的 redis 进行全量的 kv 存储, 这里的 Redis 连接, 我也是用了 Redis 中间件 来兼容 normal redis/redis sentinel/redis cluster 各种集群与高可用 redis 方案。

这样, mqkv 就完成了, 是不是很简单。

benchmark

  • 当 mqkv 启动后, 其实, 对于应用来说, 他本身就化身成为了一个 normal redis, 所以, 可用 redis-benchmark 进行压测。
❯ redis-benchmark -p 6389 -t set,get -n 1000000 -q -P 512 -c 512

SET: 28659.04 requests per second

GET: 23171.21 requests per second

以上是在我的 macbook pro 上性能测试结果。

监控

  • 支持 pprof 性能监控: GET /debug/pprof/profile
  • 支持 stats channel 信息 api: GET /api/v1/consumer_stats 可以参看当前 mqkv 自己所连 channel 的消息消费情况。

其他的一些还需解决的问题

当然 mqkv 还存在很多不完美的地方。

  • nsq 部署依赖 dns, 需要将 机器的 hostname 与 ip 关系写到 /etc/hosts 里面。不知道有没有更简单的方法。
  • 机器扩容, 源站宕机一段时间后恢复, 元数据如果恢复, 如何保证数据一致性。
  • 也在考虑开发一个 kafka 版本进行对照, 看是否能保证更好的数据一致性。

设计优先的 Restful API 服务开发与服务解耦实践

最近公司需要一个 API 服务提供给用户用来查询直播系统的一些流相关数据接口。正好我们公司应用组的同事 CatTail 分享了他们在这个季度将他们的 API 服务进行了重构, 使用了 swagger 这套 OpenAPI 标准工具。正好借此机会, 学来用用。

Swagger 框架的选择

应用组这边用的是 Node.js 开发的 API 服务, 我这边涉及到数据的上报与高并发处理, 使用 Go 开发。所以, 需要选择一个 Go 语言版的 Swagger API 框架

经过简单的调研比较后, 我选择了 go-swagger 这个框架。这个开源项目是由 VMware 赞助并维护的, 支持最新的 Swagger 2.0 标准。项目的文档还不算太详细, 有的地方需要自己来摸索下。

我选择这个框架基于如下几点考虑:

  • 支持标准的 http middleware, 方便集成 Alice 插件管理工具, 可以直接使用大量与 net/http 兼容的各种 http middleware。
  • 代码侵入性不大, 如果后期想改用其他框架, 迁移起来也不太麻烦。
  • 自带 validator 功能, API spec 设计好后, 无需在代码中自己去写繁琐的边界校验功能。
  • 可以通过代码生成 API 文档, 这样能 100% 的保证代码与文档保持一致。
  • 由于使用 Swagger 的标准, 方便使用兼容这一标准的一整套工具链, 保证 API 监控, spec render 等, 也能复用应用组那边的资源, 哈哈。
  • 项目也在发展中, 目前支持的 scheme 有 http 和 https, 后续会支持 ws 和 wss。我将来计划开发的 WebRTC 信令服务也可以考虑用这个框架来做。
  • go-swagger 提供很多基础的 helper function 库实现: https://github.com/go-openapi

这个框架不太好的地方, 或者对于 Go 这种强类型语言不太好的地方, 就是 Swagger 基于 spec 定义的 definitions 生成的 model 与 数据库的 model 不能使用一套 struct, 因为涉及到一些 struct tag 的添加没法添加, 区分开后, 就涉及到两种结构之间的数据拷贝的问题, 代码显得有点冗长。暂时我还没想到更好的办法。

直播 API 服务架构设计

首先是整理需求, 和抽象需求的模型, 我这边收到几个类似的业务需求有 禁播/踢流 服务, 触发/定时截图, 触发/定时录制, 流事件回调与查询。都涉及到配置变更和推流与断流事件触发相应的业务逻辑。所以, 我抽象出了如下服务架构。

origin_admin

这个架构同时解决了两个问题:

  • 与应用解耦, 在过去应用配置数据是通过 redis 数据结构与底层服务进行对接的, 当数据结构变更与扩展时, 都会发生各种各样繁琐的数据兼容与繁琐的流程。现在底层服务直接提供 Restful API 风格的 CRUD 操作, 底层服务自身来进行数据的持久化, 当需要存储结构调优和变更时, 就可以自己内部来调整, 而不用去动对外的接口。这在互联网环境, 需求与架构不断变化, 快速迭代开发的情况相匹配。
  • 由于 CRUD 接口都在我的服务内部, 配置变更触发事件, 这个逻辑变得更加容易实现和控制。

统一的异步消息处理机制与事件回调机制:

  • 配置变更 与 流事件触发, 可以用类似的异步逻辑来处理, 这里目前是通过 NSQ 消息队列, 来异步处理消费这些事件。
  • 推流断流事件与配置变更事件可以联动触发其他服务的业务逻辑。这里我设计了统一的事件回调机制, 可以回调到录制服务, 截图服务, 踢流服务甚至客户自己的服务等。

说说技术选型

  • swagger: OpenAPI 标准, 可以利用 swagger 一整套工具, 方便后续做 API 统计与监控。
  • mq: NSQ, 简单, 够用, 稳定。
  • db: mongodb, 文档存储, 支持大数据, 支持单个 field 的 CRUD 操作, 方便 scale, 够用的读写性能, 够用的查询功能。

通过 Swagger spec 来设计 API

这里截取一小部分来说明

swagger: "2.0"
info:
  contact:
    email: akagi201@gmail.com
    name: Akagi201
    url: http://akagi201.org
  description: UPYUN live streaming API service based on go-swagger
  title: UPYUN live streaming API service
  version: 0.1.0
# during dev, should point to your local machine
host: localhost:2201
# basePath will be prefixed to all paths
basePath: /api/v1
produces:
  - application/json
consumes:
  - application/json
schemes:
  - http
  - https
tags:
  - name: system
    description: 系统信息
  - name: stream
    description: 流信息
  - name: event
    description: 对内 事件 接口
  - name: config
    description: 对内 应用配置 接口
paths:
  /system/version:
    get:
      tags:
        - system
      summary: 获取版本信息
      operationId: getVersion
      responses:
        200:
          description: get versions
          schema:
            $ref: "#/definitions/version"
        default:
          description: error
          schema:
            $ref: "#/definitions/error_response"
definitions:
  version:
    type: object
    title: version_info
    required:
      - version
      - signature
    properties:
      version:
        type: string
        minLength: 1
      signature:
        type: string
        minLength: 1
  error_response:
    type: object
    title: error_response
    required:
      - code
      - data
    properties:
      code:
        type: integer
        format: int32
        description: 所有错误码统一定义
        example: 201
      data:
        type: string
        minLength: 1
        description: 详细错误信息
        example: Operator already exists

限制条件与文档说明, 例子都可以写到 spec 里面。完整的 spec 文档, 请参考 http://swagger.io/specification/

可以使用开源的 swagger-ui 来 render 这份文档, 当然如果你觉得他太丑了, 可以自己写个 render。

uplive_api

使用 go-swagger 框架

目录结构

❯ tree -L 3
.
├── README.md
├── client // client SDK
│   ├── config
│   │   ├── config_client.go
│   │   ├── del_stream_config_parameters.go
│   │   ├── del_stream_config_responses.go
│   │   ├── get_stream_config_parameters.go
│   │   ├── get_stream_config_responses.go
│   │   ├── set_stream_config_parameters.go
│   │   └── set_stream_config_responses.go
│   ├── event
│   │   ├── event_client.go
│   │   ├── get_stream_events_parameters.go
│   │   ├── get_stream_events_responses.go
│   │   ├── stream_event_collector_parameters.go
│   │   └── stream_event_collector_responses.go
│   ├── origin_admin_client.go
│   ├── stream
│   │   ├── get_stream_status_parameters.go
│   │   ├── get_stream_status_responses.go
│   │   └── stream_client.go
│   └── system
│       ├── get_version_parameters.go
│       ├── get_version_responses.go
│       └── system_client.go
├── cmd // 最终生成的二进制
│   └── origin-admin-server
│       └── main.go
├── models // 根据 spec definition 生成的 model
│   ├── error_response.go
│   ├── result_response.go
│   ├── stream_action.go
│   ├── stream_event.go
│   ├── stream_events.go
│   ├── stream_info.go
│   ├── stream_status.go
│   └── version.go
├── restapi // api 处理逻辑
│   ├── configure_origin_admin.go // 自定义 API 逻辑入口文件
│   ├── controller.go // 我添加的针对每个 API 的具体处理 Handler 逻辑实现, 后期可以拆分成目录和多个文件的结构
│   ├── doc.go
│   ├── embedded_spec.go
│   ├── operations
│   │   ├── config
│   │   ├── event
│   │   ├── origin_admin_api.go
│   │   ├── stream
│   │   └── system
│   └── server.go
├── store // 与 mgo 交互的 model 定义
│   └── mongo
│       ├── event.go
│       └── stream_config.go
└── swagger.yml // spec 定义的地方

总结与体会

  • 对于新人来说, 会有一定的学习成本, 包括开发理念的传递。不过 design first 这个理念是非常好的。
  • 整体来说使用这套框架还是比较顺利, 没有遇到什么无法解决的问题, 也希望 go-swagger 能够不断完善起来, 另外, 大家用的多了, 自然功能也就完善了, 所以, 在此, 还是向大家隆重推荐一下。

Reading WebRTCBook

webrtcbook-cover

5 月 29 号在美国亚马逊上买了 这本书. 邮件上预计是 7 月 5 日到. 结果今天( 6 月 13 日)就到了, 真是开心. BTW, 这算是我买过的最贵的书了. 购买地址

webrtcbook-order

书里面的图片都是彩印的. 不算太厚, 才 295 页.