JStorm运用场景

JStorm处理数据的方式是基于消息的流水线处理, 因此特别适合无状态计算,也就是计算单元的依赖的数据全部在接受的消息中可以找到, 并且最好一个数据流不依赖另外一个数据流。

因此,常常用于

  • 日志分析,从日志中分析出特定的数据,并将分析的结果存入外部存储器如数据库。目前,主流日志分析技术就使用JStorm或Storm
  • 管道系统, 将一个数据从一个系统传输到另外一个系统, 比如将数据库同步到Hadoop
  • 消息转化器, 将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件
  • 统计分析器, 从日志或消息中,提炼出某个字段,然后做count或sum计算,最后将统计值存入外部存储器。中间处理过程可能更复杂。

Hadoop与JStorm的区别

hadoop的MR,提交到hadoop的MR job,执行完就结束了,进程就退出了,而一个JStorm任务(JStorm中称为topology),是7*24小时永远在运行的,除非用户主动kill。JStorm之所以能源源不断的运行,是由于当Worker失效或机器出现故障时, 自动分配新的Worker替换失效Worker。使得JStorm具有良好的健壮性。

JStorm组件

spout

spout代表输入的数据源(kafka,DB,HBase,HDFS)

bolt

bolt代表处理逻辑,bolt收到消息之后,对消息做处理(即执行用户的业务逻辑),处理完以后,既可以将处理后的消息继续发送到下游的bolt,这样会形成一个处理流水线(pipeline,不过更精确的应该是个有向图);也可以直接结束。

通常一个流水线的最后一个bolt,会做一些数据的存储工作,比如将实时计算出来的数据写入DB、HBase等,以供前台业务进行查询和展现。

tuple

在JStorm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意JStorm在建模事件流时,把流中的事件抽象为tuple即元组。

一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。

可以抽象为:spout是一个水龙头,tuple就是水龙头流出的水,bolt就是一个水处理器。spout源头接收其他中间件吐出的数据,数据装配为tuple,流向bolt进行数据处理。处理后的数据存入数据库了。

基于消息的流水线系统
流水线系统
流水线系统
JVM中的jstorm进程
JVM中的JStorm进程
JVM中的JStorm进程

Jstorm相关角色

1
2
3
4
5
–Nimbus:资源调度角色
–Supervisor:接受nimubs 任务安排,启动任务
–Worker:进程
–Executor:执行线程
–Task:执行逻辑单元
Storm原理
Storm原理
Storm原理
组件接口

JStorm框架对spout组件定义了一个接口:nextTuple,顾名思义,就是获取下一条消息。执行时,可以理解成JStorm框架会不停地调这个接口,以从数据源拉取数据并往bolt发送数据。

同时,bolt组件定义了一个接口:execute,这个接口就是用户用来处理业务逻辑的地方。

每一个topology,既可以有多个spout,代表同时从多个数据源接收消息,也可以多个bolt,来执行不同的业务逻辑。

调度执行

接下来就是topology的调度和执行原理,对一个topology,JStorm最终会调度成一个或多个worker,每个worker即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。

而每个worker中,又可以有多个task,分别代表一个执行线程。每个task就是上面提到的组件(component)的实现,要么是spout要么是bolt。

一个topology对应多个worker;每个worker,对应多个task;每个task,需要靠spout或bolt实现

消息之间的通信

spout与bolt,bolt与bolt之间是怎么通信的?

首先,从spout发送消息的时候,JStorm会计算出消息要发送的目标task id列表,然后看目标task id是在本进程中,还是其他进程中,如果是本进程中,那么就可以直接走进程内部通信(如直接将这个消息放入本进程中目标task的执行队列中);如果是跨进程,那么JStorm会使用netty来将消息发送到目标task中。

ack机制

用于判断spout发出的消息是否被成功处理,或失败处理。

1
2
3
在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
在规定的时间内,没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,
或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作

源码分析

源码围绕Spout和Bolt构建

Spout
1
2
3
4
5
6
7
8
9
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}

其中注意:

  • spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
  • spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
  • open是当task起来后执行的初始化动作
  • close是当task被shutdown后执行的动作
  • activate 是当task被激活时,触发的动作
  • deactivate 是task被deactive时,触发的动作
  • nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
  • ack, 当spout收到一条ack消息时,触发的动作
  • fail, 当spout收到一条fail消息时,触发的动作

Bolt

1
2
3
4
5
public interface IBolt extends Serializable {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
void execute(Tuple input);
void cleanup();
}

注意:

  • prepare是当task起来后执行的初始化动作
  • cleanup是当task被shutdown后执行的动作
  • execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail。