新書推薦:
《
少女映像室 唯美人像摄影从入门到实战
》
售價:HK$
110.9
《
詹姆斯·伍德系列:不负责任的自我:论笑与小说(“美国图书评论奖”入围作品 当代重要文学批评家詹姆斯·伍德对“文学中的笑与喜剧”的精湛研究)
》
售價:HK$
87.4
《
武当内家散手
》
售價:HK$
50.4
《
诛吕:“诸吕之乱”的真相与吕太后时期的权力结构
》
售價:HK$
99.7
《
炙野(全2册)
》
售價:HK$
78.2
《
女人的胜利
》
售價:HK$
55.9
《
数据有道:数据分析+图论与网络+微课+Python编程(鸢尾花数学大系:从加减乘除到机器学习)
》
售價:HK$
266.6
《
500万次倾听:陪伤心的人聊聊
》
售價:HK$
53.8
|
編輯推薦: |
本书是作者理论研究与实践操作的成果,较深入的探讨了Storm的理论基础与实践应用,语言通俗易懂,适用于各类大数据、Storm开发者,爱好者。使读者在较短的时间内系统掌握Storm的理论基础,面向Linux平台,搭建与研发自己的基于Storm的大数据处理平台。
|
內容簡介: |
本书由基础知识、安装与部署、研发与维护、进阶知识、企业应用5个模块构成,并细分为20个章节,其中基础知识6章、安装与部署4章、研发与维护4章、进阶知识5章、企业应用1章,分别介绍了Storm的基本原理、Topology组件、Spout组件、Bolt组件、ZooKeeper集群、Storm的安装与配置、实战环节等内容,包括理论基础、环境搭建、研发准备、企业应用等。
本书理论联系实际,通过大量实例分析,让读者在较短的时间内掌握Storm的使用,搭建并研发出自己的基于Storm的大数据处理平台。
本书适合所有大数据处理、实时流数据处理、Storm的开发者或爱好者,也适合高等院校和培训学校相关专业的师生参考使用。
|
關於作者: |
赵必厦,硕士,具有多年的面向对象语言研发经验,熟练掌握C 、Java、C#等面向对象语言,专注于云计算、大数据、互联网等方面的研究与研发工作,积累了丰富的经验。程丽明,硕士,目前从事服务器虚拟化与云计算构建、信息化安全的研究和产品开发工作。有多年的项目开发经验,研究兴趣包括虚拟化、云计算、SDN、移动办公安全等。
|
目錄:
|
目 录
第1章 Storm简介
1.1 什么是Storm 1
1.2 Storm的诞生 3
1.2.1 从Twitter说起 3
1.2.2 Twitter需要处理大批实时性要求高的大数据业务 3
1.2.3 Storm帮助Twitter解决实时海量大数据处理问题 4
1.3 Storm的成长 5
1.3.1 Storm正式开源
5
1.3.2 Apache接管并孵化Storm 5
1.3.3 Storm的核心技术和基本组成 6
1.3.4 Storm的项目小组
7
1.3.5 Storm的技术支持网站 10
1.4 Storm的优势 13
1.4.1 集成多种技术 13
1.4.2 简单的API 13
1.4.3 可扩展的 14
1.4.4 容错的 14
1.4.5 保证数据处理 14
1.4.6 可以使用任何语言
14
1.4.7 部署和操作简单 15
1.4.8 自由开源 15
1.5 Storm的应用现状和发展趋势 15
1.5.1 应用现状 16
1.5.2 发展趋势 18
1.6 如何学习Storm 20
1.7 本书的章节安排及学习建议
21
1.7.1 本书的章节安排 21
1.7.2 关于如何阅读本书的建议 22
1.8 本章小结 23
第 2 章 Storm的基本知识
2.1 概念 24
2.1.1 元组(Tuple) 24
2.1.2 流(Stream) 25
2.1.3 龙卷(Spout) 26
2.1.4 闪电(Bolt) 27
2.1.5 拓扑(Topology) 27
2.1.6 主控节点与工作节点
28
2.1.7 Nimbus进程与Supervisor进程 28
2.1.8 流分组(Stream
grouping) 28
2.1.9 工作进程(Worker) 28
2.1.10 任务(Task) 28
2.1.11 执行器(Executor) 28
2.1.12 可靠性(Reliability) 29
2.2 Storm的配置 29
2.2.1 Storm的配置类型
29
2.2.2 defaults.yaml文件 30
2.2.3 storm.yaml文件 33
2.2.4 Config类 34
2.3 序列化(Serialization) 35
2.3.1 动态类型 36
2.3.2 自定义序列化 36
2.3.3 Java序列化 37
2.3.4 特定组件序列化注册
37
2.4 容错机制 37
2.4.1 Worker进程死亡
37
2.4.2 节点死亡 38
2.4.3 Nimbus或者Supervisor守护进程死亡 38
2.4.4 Nimbus是否是单点故障 38
2.5 可靠性机制保证消息处理 38
2.5.1 消息被完全处理的含义 38
2.5.2 如果一个消息被完全处理或完全处理失败会发生什么 39
2.5.3 Storm如何保证可靠性 40
2.5.4 Storm如何实现可靠性 43
2.5.5 调节可靠性 44
2.6 消息传输机制 45
2.6.1 ZeroMQ 45
2.6.2 Netty 45
2.6.3 自定义消息通信机制
45
2.7 Storm的开发环境与生产环境 46
2.7.1 开发环境与本地模式
46
2.7.2 生产环境与远程模式
46
2.7.3 开发环境与生产环境的对比 47
2.8 Storm拓扑的并行度(parallelism) 48
2.8.1 工作进程、执行器和任务 48
2.8.2 配置拓扑的并行度
49
2.8.3 拓扑示例 50
2.8.4 如何改变运行中拓扑的并行度 51
2.9 Storm命令行客户端
52
2.10 Javadoc文档
56
2.11 本章小结 56
第 3 章 拓扑详解
3.1 什么是拓扑 57
3.2 TopologyBuilder 57
3.3 流分组 59
3.3.1 什么是流分组 59
3.3.2 不同的流分组方式
60
3.4 一个简单的拓扑 64
3.5 在本地模式下运行拓扑
67
3.6 在生产集群上运行拓扑
68
3.6.1 常见的配置 70
3.6.2 杀死拓扑 70
3.6.3 更新运行中的拓扑
71
3.6.4 监控拓扑 71
3.7 拓扑的常见模式 71
3.7.1 流连接(Stream
Join) 71
3.7.2 批处理(Batching) 72
3.7.3 BasicBolt 72
3.7.4 内存中缓存与字段的组合 72
3.7.5 流的top N 72
3.7.6 高效保存最近更新缓存对象的TimeCacheMap(已弃用) 74
3.7.7 分布式RPC的CoordinatedBolt与KeyedFairBolt 75
3.8 本地模式与StormSubmitter的对比 75
3.9 多语言协议(Multi-Language
Protocol) 77
3.10 使用非JVM语言操作Storm 81
3.10.1 支持的非Java语言 81
3.10.2 对Storm使用非Java语言 81
3.10.3 实现非Java
DSL的笔记 82
3.11 Hook 82
3.12 本章小结 83
第 4 章 组件详解
4.1 基本接口 84
4.1.1 IComponent接口 84
4.1.2 ISpout接口
85
4.1.3 IBolt接口 86
4.1.4 IRichSpout与IRichBolt接口 88
4.1.5 IBasicBolt接口 88
4.1.6 IStateSpout与IRichStateSpout接口 89
4.2 基本抽象类 90
4.2.1 BaseComponent抽象类 90
4.2.2 BaseRichSpout抽象类 90
4.2.3 BaseRichBolt抽象类 91
4.2.4 BaseBasicBolt抽象类 92
4.3 事务接口 92
4.3.1 IPartitionedTransactionalSpout 92
4.3.2
IOpaquePartitionedTransactionalSpout 94
4.3.3 ITransactionalSpout 95
4.3.4 ICommitterTransactionalSpout 96
4.3.5 IBatchBolt 97
4.4 组件之间的相互关系 97
4.5 本章小结 98
第 5 章 Spout详解
5.1 可靠的与不可靠的消息
99
5.2 Spout获取数据的方式
102
5.2.1 直接连接(Direct
Connection) 102
5.2.2 消息队列(Enqueued
Messages) 103
5.2.3 DRPC(分布式RPC) 104
5.3 常用的Spout 104
5.3.1 Kestrel作为Spout的数据源 104
5.3.2 AMQP作为Spout的数据源 104
5.3.3 JMS作为Spout的数据源 105
5.3.4 Redis作为Spout的数据源 105
5.3.5 beanstalkd作为Spout的数据源 105
5.4 学习编写Spout类 105
5.5 本章小结 106
第 6 章 Bolt详解
6.1 Bolt概述 107
6.2 可靠的与不可靠的Bolt
108
6.2.1 使用Anchoring机制实现可靠的Bolt 108
6.2.2 使用IBasicBolt接口实现自动确认 109
6.3 复合流与复合Anchoring
110
6.3.1 复合流 110
6.3.2 复合Anchoring
110
6.4 使用其他语言定义Bolt
111
6.5 学习编写Bolt类 111
6.5.1 可靠的Bolt
111
6.5.2 不可靠的Bolt
112
6.6 本章小结 113
第 7 章 ZooKeeper详解
7.1 ZooKeeper简介
114
7.2 ZooKeeper的下载和部署 114
7.2.1 ZooKeeper的下载 114
7.2.2 ZooKeeper的部署 115
7.3 ZooKeeper的配置
117
7.4 ZooKeeper的运行
119
7.5 ZooKeeper的本地模式实例 120
7.6 ZooKeeper的数据模型 121
7.6.1 ZNode 122
7.6.2 ZooKeeper中的时间 123
7.6.3 ZooKeeper的Stat结构 123
7.7 ZooKeeper的命令行操作范例 124
7.8 Storm在ZooKeeper中的目录结构 127
7.9 本章小结 128
第 8 章 基础软件的安装与使用
8.1 Linux的基本操作
129
8.1.1 环境变量 129
8.1.2 常用命令 130
8.2 JDK的下载与配置
134
8.2.1 Sun JDK的下载
134
8.2.2 在Linux下安装JDK 135
8.2.3 在Windows下安装JDK 136
8.3 GitHub托管项目的下载 141
8.4 Maven的下载与配置
143
8.4.1 Maven的下载
143
8.4.2 在Linux下部署Maven 144
8.4.3 在Windows下部署Maven 145
8.5 其他软件Notepad
146
8.6 本章小结 147
第 9 章 Storm的安装与配置
9.1 Storm集群的安装步骤与准备工作 148
9.1.1 搭建ZooKeeper集群 148
9.1.2 安装Storm的本地依赖 148
9.1.3 下载并解压Storm发行版本 151
9.1.4 配置storm.yaml文件 153
9.1.5 启动Storm的守护进程 154
9.2 本地模式的Storm完整的配置命令 157
9.3 本章小结 159
第 10 章 Storm集群搭建实践
10.1 准备工作 160
10.1.1 概述 160
10.1.2 配置hosts文件 161
10.1.3 配置静态IP 161
10.1.4 集群SSH无密码 163
10.1.5 修改主机名 164
10.1.6 关闭防火墙 164
10.1.7 同步时间 164
10.1.8 安装JDK 165
10.2 ZooKeeper集群的搭建 166
10.2.1 部署第一个节点
166
10.2.2 部署第i个节点 167
10.2.3 启动ZooKeeper集群 167
10.2.4 查看ZooKeeper状态 168
10.2.5 关闭ZooKeeper集群 168
10.2.6 清理ZooKeeper集群 168
10.3 Storm集群的搭建
168
10.3.1 安装Storm依赖(每个Storm节点) 168
10.3.2 部署第一个节点
169
10.3.3 部署第i个节点 171
10.3.4 启动Storm守护进程 171
10.4 本章小结 172
第 11 章 准备Storm的开发环境
11.1 Storm的开发环境
173
11.1.1 什么是Storm的开发环境 173
11.1.2 如何管理Storm
173
11.1.3 如何提交拓扑到集群
176
11.2 Eclipse的下载与配置 176
11.2.1 Eclipse的下载 176
11.2.2 Eclipse的配置与运行 177
11.2.3 Eclipse插件的安装 178
11.3 使用Maven管理项目 180
11.3.1 Maven的下载与配置 180
11.3.2 配置pom.xml文件 180
11.3.3 运行Maven命令 182
11.4 使用Nexus搭建本地Maven私服 183
11.4.1 下载Nexus
183
11.4.2 运行Nexus
184
11.4.3 登录Nexus后台 184
11.4.4 配置Repositories
185
11.4.5 配置setting.xml文件 187
11.4.6 修改Eclipse的Maven插件的配置 189
11.5 使用SVN管理代码版本 189
11.5.1 在Windows下搭建SVN服务器 189
11.5.2 在Linux下搭建SVN服务器 191
11.5.3 安装SVN客户端 191
11.6 部署单节点的Storm集群 192
11.6.1 部署伪分布的ZooKeeper
192
11.6.2 部署伪分布的Storm集群 192
11.7 本章小结 194
第 12 章 开发自己的Storm应用
12.1 新建Maven项目 195
12.2 修改为适合Storm开发的项目 198
12.2.1 对包名进行分类管理
198
12.2.2 修改pom.xml文件 199
12.3 编写代码 201
12.3.1 编写Spout类 201
12.3.2 编写Bolt类 202
12.3.3 编写Topology类 203
12.4 本地测试运行 204
12.5 提交到Storm集群运行 205
12.5.1 使用Maven打包 205
12.5.2 提交jar包到集群 205
12.6 本章小结 206
第 13 章 storm-starter详解
13.1 storm-starter项目概述 207
13.2 storm-starter的下载 209
13.3 使用Maven进行管理 211
13.3.1 使用Maven打包storm-starter 211
13.3.2 使用Maven直接运行ExclamationTopology 211
13.3.3 使用Maven运行单元测试 211
13.4 在Eclipse中运行 212
13.4.1 新建Maven项目的方式 212
13.4.2 导入已存在的项目的方式 214
13.5 storm-starter的入门例子 214
13.5.1 ExclamationTopology 214
13.5.2 WordCountTopology 216
13.5.3 ReachTopology 219
13.6 storm-starter的其他例子 225
13.6.1 BasicDRPCTopology 225
13.6.2 ManualDRPC 226
13.6.3 PrintSampleStream 226
13.6.4 RollingTopWords 227
13.6.5 SkewedRollingTopWords 228
13.6.6 SingleJoinExample 229
13.6.7 TransactionalGlobalCount 230
13.6.8 TransactionalWords 230
13.6.9 WordCountTopologyNode 231
13.7 本章小结 232
第 14 章 研发与集群管理技巧
14.1 使用daemontools监控Storm进程 233
14.1.1 daemontools简介 233
14.1.2 安装daemontools
234
14.1.3 编写监控脚本
234
14.2 使用Monit监控Storm 236
14.2.1 Monit简介
236
14.2.2 安装Monit
237
14.2.3 配置Monit
238
14.2.4 启动Monit
240
14.2.5 获取Monit帮助信息 241
14.3 常用的集群操作命令
242
14.4 使用Storm的经验与建议 243
14.5 本章小结 244
第 15 章 DRPC详解
15.1 概述 245
15.2 DRPCTopologyBuilder 246
15.2.1 LinearDRPCTopologyBuilder 246
15.2.2 LinearDRPCTopologyBuilder提供的方法 246
15.2.3 LinearDRPCTopologyBuilder使用范例 248
15.2.4 LinearDRPCTopologyBuilder的工作原理 249
15.2.5 LinearDRPCTopologyBuilder目前已弃用 249
15.3 本地模式的DRPC
249
15.4 远程模式的DRPC
250
15.5 一个复杂的DRPC例子(计算reach值) 250
15.6 非线性DRPC 253
15.7 本章小结 253
第 16 章 事务拓扑详解
16.1 什么是事务拓扑 254
16.1.1 设计1 254
16.1.2 设计2 255
16.1.3 设计3(Storm的设计 256
16.2 事务拓扑的设计细节
256
16.3 事务拓扑的实现细节
257
16.3.1 事务Spout的工作原理 257
16.3.2 对于给定的事务id不能发射相同的Batch的处理 258
16.3.3 更多的细节 260
16.4 事务拓扑API 260
16.4.1 Bolt 260
16.4.2 事务Spout
261
16.4.3 配置 262
16.5 TransactionalTopologyBuilder 262
16.5.1 TransactionalTopologyBuilder提供的方法 262
16.5.2 TransactionalTopologyBuilder类已弃用 266
16.6 一个简单的例子 266
16.7 本章小结 269
第 17 章 Trident详解
17.1 Trident概述
270
17.1.1 简单的例子单词统计(TridentWordCount) 270
17.1.2 另一个例子计算Reach值(TridentReach) 274
17.1.3 字段和元组 275
17.1.4 状态(State) 276
17.1.5 Trident拓扑的执行 277
17.2 Trident API 279
17.2.1 概述 279
17.2.2 本地分区操作
279
17.2.3 重新分区操作
283
17.2.4 聚合操作 284
17.2.5 流分组操作 284
17.2.6 合并与连接 285
17.3 Trident的状态
285
17.3.1 Trident状态分类 286
17.3.2 事务Spout(Transactional Spout) 286
17.3.3 不透明事务Spout(Opaque Transactional Spout) 288
17.3.4 非事务Spout(Non-transactional Spout) 289
17.3.5 Spout与State之间的联系 289
17.3.6 State API 290
17.3.7 persistentAggregate方法 294
17.3.8 实现
MapStates 294
17.4 Trident Spout 295
17.4.1 流水线(Pipelining) 296
17.4.2 Trident Spout的类型 296
17.5 本章小结 296
第 18 章 Storm的内部实现
18.1 文件系统分析 297
18.2 数据目录结构 298
18.2.1 Nimbus节点的目录结构 299
18.2.2 Supervisor节点的目录结构 299
18.3 代码库的结构 300
18.3.1 storm.thrift 301
18.3.2 Java接口
316
18.3.3 实现 316
18.4 拓扑的生命周期 318
18.4.1 启动拓扑 319
18.4.2 监控拓扑 321
18.4.3 杀死拓扑 321
18.5 Acking框架的实现
322
18.5.1 异或计算的基本原理
322
18.5.2 Acking框架的实现原理 322
18.5.3 Acker的execute方法 323
18.5.4 待定元组(pending
tuple)和RotatingMap 323
18.6 Metric 324
18.7 本章小结 329
第 19 章 Storm相关的其他项目
19.1 JStorm项目
330
19.1.1 项目简介 330
19.1.2 下载与部署 331
19.1.3 源代码编译 332
19.2 storm-deploy项目 332
19.3 Storm与Kafka
333
19.3.1 Kafka简介
333
19.3.2 Kafka的安装
333
19.3.3 启动服务 334
19.3.4 测试运行 335
19.3.5 Storm与Kafka的项目 337
19.4 storm-kestrel项目 338
19.4.1 storm-kestrel项目简介 338
19.4.2 使用storm-kestrel项目 338
19.4.3 Kestrel服务器和队列 339
19.4.4 添加元素到kestrel
339
19.4.5 从Kestrel中移除元素 340
19.4.6 持续添加元素到Kestrel
341
19.4.7 使用KestrelSpout
342
19.4.8 执行 342
19.5 本章小结 343
第 20 章 企业应用案例
20.1 Storm席卷众多互联网企业 344
20.1.1 Storm的典型应用场景 344
20.1.2 Storm的三大基本应用 345
20.2 Storm在Twitter中的应用 345
20.2.1 Twitter公司简介 345
20.2.2 Storm帮助Twitter提升产品性能 346
20.2.3 MapR在Twitter中的应用简介 346
20.3 Storm在阿里巴巴集团的应用 348
20.3.1 阿里巴巴集团简介
348
20.3.2 Storm在阿里巴巴的应用 348
20.3.3 Storm在淘宝公司的应用 350
20.3.4 Storm在支付宝公司的应用 350
20.4 其他应用Storm的知名企业和项目 351
20.5 本章小结 367
参考资料 368
|
內容試閱:
|
第 5 章
? Spout详解 ?
Spout是Storm数据流的入口。在这一章,将学习Storm数据流的入口点Spout。通过本章,应该掌握Storm获取数据的方式和方法,了解常用的Spout,学会如何编写Spout类。
5.1 可靠的与不可靠的消息
设计拓扑时,一件很重要的事情是要考虑消息的可靠性。如果消息不能被处理而丢失是很严重的问题,我们需要决定如何处理丢失的消息,如何与拓扑作为一个整体处理。例如,处理银行存款的时候,事务一致性是很重要的,不能失去任何消息,任何消息都要被处理。又例如,在数以百万计的数据中计算某些统计指标,但是丢失了少量的数据,最终的计算指标仍然可以假设是相当准确的。
在Storm中,根据每个拓扑的需要,保证信息可靠性,这涉及一个平衡:一个可靠的拓扑必须处理丢失的消息,这就需要更多的资源;一个不可靠的拓扑可能会丢失一些消息,但不占用资源。不管你选择哪一种可靠性策略,Storm都可以提供工具来实现它。
为了管理Spout的可靠性,可以在发射元组的时候,在元组里面包含一个消息ID(collector.emitnew Values,tupleId)。
当元组处理成功时调用ack方法,当元组处理失败时调用
fail方法。当元组被所有的目标Bolt和所有的锚定Bolt所处理时,认为元组处理成功。当如下情况发生时,元组处理会失败:
l collector.failtuple方法被目标Spout调用。
l 处理时间超过配置的超时时间。
下面,让我们来看Getting Started with Storm一书中的例子,关于其完整的代码,可以参考如下网址:
https:github.comstorm-bookexamples-ch04-spoutstreemastersrcmain
javabanktransactions
假设要处理银行事务,有以下的要求:
l 如果一个事务失败,则重发消息。
l 如果事务失败了多次,则终止拓扑。
在拓扑中,有1个Spout和1个Bolt。Spout会发送100个随机的事务ID,Bolt在接收一个元组时有80%的可能性会失败。Bolt使用Map来发射事务消息元组,因此很容易对消息进行重发。
Spout的主要成员变量定义如下:
private static final Integer MAX_FAILS = 2;
最大失败次数
MapInteger,String messages;
全部的消息Map
MapInteger,Integer transactionFailureCount; 消息的失败次数计数Map
MapInteger,String toSend;
发送的消息Map
private SpoutOutputCollector collector;
SpoutOutputCollector对象
由于是说明性的例子,所以,在open方法中进行了一系列的初始化。
public void openMap conf, TopologyContext context, SpoutOutputCollector
collector {
Random random = new Random;
messages = new HashMapInteger,
String;
toSend = new HashMapInteger,
String;
transactionFailureCount = new
HashMapInteger, Integer;
forint i = 0; i 100; i {
messages.puti,
"transaction_" random.nextInt;
transactionFailureCount.puti, 0;
}
toSend.putAllmessages;
this.collector = collector;
}
nextTuple方法定义如下:
public void nextTuple {
if!toSend.isEmpty{
forMap.EntryInteger, String
transactionEntry : toSend.entrySet{
Integer transactionId =
transactionEntry.getKey;
String transactionMessage =
transactionEntry.getValue;
collector.emitnew
ValuestransactionMessage,transactionId;
}
toSend.clear;
}
try {
Thread.sleep1;
} catch InterruptedException e {}
}
如果toSend消息队列Map不为空,即存在消息等待发送,则把每个事务的ID与事务消息作为一个元组发送,然后清空toSend消息队列。在nextTuple中调用Map的clear方法是安全的,因为nextTuple、fail、ack方法是修改Map的方法,它们都运行在相同的线程中。
messages和failCounterMessages两个Map用来跟踪等待发送的事务消息以及每笔交易已经失败的次数。
ack方法通过msgId来删除每个列表中的事务消息。
public void ackObject msgId {
messages.removemsgId;
failCounterMessages.removemsgId;
}
fail方法决定是否重发一个事务消息,或者事务消息已经失败了太多次而最终是失败的。如果在拓扑中使用广播分组,那么任何Bolt实例失败,Spout的fail方法都会被调用。
public void failObject msgId {
获取事务id
Integer transactionId = Integer msgId;
获取msgId的事务的失败次数,并加1
Integer failures =
transactionFailureCount.gettransactionId 1;
判断事务的失败次数是否大于最大允许失败次数
iffails = MAX_FAILS{
如果失败次数大于或者等于最大允许失败次数,终止拓扑并抛出异常
throw new RuntimeException"Error,
transaction id ["
transactionId "] has had too many errors [" failures "]";
}
如果失败次数小于最大允许失败次数,保存失败计数并把消息放入发送消息队列中
transactionFailureCount.puttransactionId,
failures;
toSend.puttransactionId,messages.gettransactionId;
LOG.info"Re-sending message
[" msgId "]";
}
fail方法会检查已经失败的事务的次数。如果一个事务失败了很多次,超过最大允许失败次数,会抛出一个RuntimeException异常并终止运行中的Worker进程。否则,保存失败计数,把事务消息放到toSend队列,当nextTuple方法被调用的时候会重发消息。
Storm节点不维护状态。如果在内存中存储信息,并且节点又宕机了,将会失去所有的信息。所以,Storm节点的状态由外部的ZooKeeper集群所维护。
Storm是一种快速失败系统。如果抛出一个异常,拓扑将会失败。但Storm会在一个一致性状态中重启进程,然后正常恢复进程的执行。
5.2 Spout获取数据的方式
在Storm中,主要有3种Spout获取数据的模式:直接连接、消息队列和DRPC。
5.2.1 直接连接(Direct
Connection)
在Spout直接连接的架构中,Spout直接与数据源相连接,如图5.1所示。
图5.1 Spout直接连接的架构图
这种架构实现起来很容易,特别是当消息发射器是一个已知的设备或一个已知的设备组时。一个已知的设备是指在拓扑启动时是已知的,在整个拓扑生命周期中保持不变的设备。一个未知的设备是拓扑已经运行后添加进来的设备。一个已知的设备组是指组中的所有设备在开始时都是已知的。
可以使用多个Spout从多个消息发射器中获取消息,如图5.2所示。使用这种技术,可以均匀地分发收集器访问数据源,例如,从Web服务器中收集日志文件。
在前面介绍了连接Spout到一个已知设备,可以使用同样的方法连接到未知设备,并使用协调系统来维护设备列表,如图5.3所示。如果协调器检测到列表有变更,就创建或者删除连接。例如,从Web服务器收集日志文件,Web服务器的列表可能会随时间而变化。当一个Web服务器被添加时,协调器检测到变化,为它创建一个新的Spout。
图5.2 多个Spout直接连接的架构图
图5.3 基于协调器的Spout直接连接的架构图
建议创建从Spout到消息发射器的连接,而不是反过来。因为,如果运行Spout的主机宕机,Storm会在另一台主机上重新启动它,Spout定位消息发射器比消息发射器跟踪主机上的Spout要容易些。
5.2.2 消息队列(Enqueued
Messages)
第二种方法是Spout连接到消息队列系统,如图5.4所示,消息发射器把消息发送到消息队列系统,Spout从消息队列系统获取消息。
图5.4 Spout连接到消息队列系统的架构图
使用消息队列系统的优势是,它可以作为Spout和数据源之间的中间件。这意味着Spout不需要知道关于消息发射器的任何东西,添加和删除发射器的过程将比直接连接更容易。这种架构的问题是,消息队列系统会成为故障点,并且在处理流程中添加了一个新层。
5.2.3 DRPC(分布式RPC)
DRPCSpout是一个Spout实现,从DRPC服务器接收函数调用流并处理它。通常情况下,使用backtype.storm.drpc.DRPCSpout就足够了,也可以使用DRPC类来创建自己的实现。
5.3 常用的Spout
5.3.1 Kestrel作为Spout的数据源
通过storm-kestrel项目可以使用Kestrel作为Spout的输入数据源。项目主页如下:
https:github.comnathanmarzstorm-kestrel
storm-kestrel项目组已经对Kestrel 2.2进行过测试,该项目对Kestrel 2.2版本是有效的。
5.3.2 AMQP作为Spout的数据源
通过storm-amqp-spout项目,可以使用AMQP(Advanced Message Queuing Protocol,高级消息队列协议)作为Spout的输入数据源。项目主页如下:
https:github.comrapportive-ossstorm-amqp-spout
该版本的storm-amqp-spout已经过时,与最新的Storm可能会存在兼容性问题,在RabbitMQ 2.3.1、2.6.1、2.7.0下测试通过,可能会兼容其他版本的AMQP。
新版的storm-amqp-spout有两个不同的分支。
其中一个分支在RabbitMQ 2.3.1、2.6.1、2.7.0、2.8.2、3.0.2下测试通过,可能会兼容其他版本的AMQP。
https:github.comstorm-amqpstorm-amqp-spout
另一个分支在RabbitMQ 2.3.1、2.6.1、2.7.0下测试通过,可能会兼容其他版本的AMQP。
https:github.comdkincaidstorm-amqp-spout
5.3.3 JMS作为Spout的数据源
通过storm-jms项目,可以使用JMS作为Spout的输入数据源。项目主页如下:
https:github.comptgoetzstorm-jms
Storm JMS是一种通用的框架,在Storm框架中集成JMS消息。
Storm JMS允许你使用一个通用的JMS Spout将数据写入Storm中,使用一个通用的JMS Bolt从Storm中读取数据。
要使用JMS Spout和JMS Bolt,需要提供一个简单的Java类,连接JMS和Storm
API,封装特定领域的逻辑。
5.3.4 Redis作为Spout的数据源
通过storm-pubsub项目,可以使用Redis作为Spout的输入数据源。项目主页如下:
https:github.comsorenmacbethstorm-redis-pubsub
5.3.5 beanstalkd作为Spout的数据源
通过storm-beanstalkd-spout项目,可以使用beanstalkd作为Spout的输入数据源。项目主页如下:
https:github.comhaitaoyaostorm-beanstalkd-spout
beanstalkd是一个简单快速的工作队列。项目主页如下:
http:kr.github.combeanstalkd
5.4 学习编写Spout类
一般来说,通过继承BaseRichSpout抽象类来实现Spout。
下面,编写一个MySpout类,通过继承BaseRichSpout类的方式来实现。
public class MySpout extends BaseRichSpout {
@Override
public void
declareOutputFieldsOutputFieldsDeclarer declarer {
TODO Auto-generated method stub
}
@Override
public void openMap conf,
TopologyContext context, SpoutOutputCollector collector {
TODO Auto-generated method stub
}
@Override
public void nextTuple {
TODO Auto-generated method stub
}
}
继承BaseRichSpout的Spout类,还需要重写declareOutputFields、open和nextTuple 3个方法,编码步骤如下:
在declareOutputFields方法中声明字段。
在open方法中初始化参数和变量。
在nextTuple方法中接收产生元组,处理逻辑,然后发射出去。
另外,还可以通过实现IRichSpout接口或者ISpout接口,定义逻辑更复杂的Spout。
5.5 本章小结
本章对Spout进行了详细的讲解,包括Spout获取数据的方式、常用的Spout、学习如何编写Spout类等内容。
Spout是Storm的数据来源,必须掌握和灵活运用它才能进行更深入的Storm开发。
|
|