一、我的收获

部署CRM

这次有个比较大的收获就是自己部署CRM系统,CRM系统很复杂,光部署的用户就有10多个。这次的系统部署中,部署的用户有deploy, web, app, cache, search, sec(权限缓存redis), sna(Session缓存redis),kafka, zk, proxy, nginx

要启动系统,这些都要启动。比如:不启动nginx。就获取不到静态资源,登陆的js就会报错,点击登录按钮没反应;没有启动proxy,CRM的页面就不能访问,这个代理程序可以配置端口;还有redis也需要启动,不然登陆出现错误;

其次学会了很多linux 命令操作,尤其是vi的操作。比如vi文件后,假如要删除里面某些内容,我以前通过退格键一个一个数据的删除,后来才知道 :dd --删除当前行ndd --删除n行数据,dG --删除当前后之后的全部行ESC+u --撤销操作

学会了简单的python脚本,shell脚本的编写

shell脚本举例:linux 软件安装脚本

python脚本举例: 远程发布文件 —— 远程发布文件脚本

python比较文件异同: 文件异同比较

Log4x部署学习
1
2
log4x@test186 ~/support $ ls
elasticsearch hadoop hbase jdk jstorm kafka redis-4.0.9 zk

log4x需要安装 以上的软件,部署本身没啥难度,只要把配置文件的地址,路径配对。系统就可以启起来。

工作流程:app,web的日志发送到kafka里,jstorm会从这里面取数据(实时计算),把处理后的数据存储到hbase,oracle,elasticsearch中。zk相当于一个公告板,告诉这些软件有哪些信息。

Amber部署学习

进行中…..

二、我对技术的一些了解

关于redis

Redis是一种key/value型数据库,我们存值时采用如下命令:

1
redis>SET message "hello redis"

那么keymessagevalue“hello redis”。这里的valuestring类型的。redis的值还有其他类型。

类型常量 对象的名称
REDIS_STRING 字符串对象
REDIS_LIST 列表对象
REDIS_HASH 哈希对象
REDIS_SET 集合对象
REDIS_ZSET 有序集合对象
直接启动

进入redis根目录,执行命令:

1
./redis-server &

在工作中遇到过一个问题,就是当机器非正常断电关机时,无法直接启动redis。直接启动会报错,说进程已经存在。但是进程明明没有了。

指定配置文件启动

这时候可以同过配置文件启动,假如我们redis的配置为:etc/redis-7001.conf
进入redis根目录,输入命令:

1
./redis-server etc/redis-7001.conf

这样就能启动redis。

如果给redis配置了自定义的端口,使用redis-cli客户端连接时,也需要指定端口,例如:

1
./redis-cli -p 7001
关于redis消息订阅

redis的发布订阅系统有点类似于我们生活中的电台,电台可以在某一个频率上发送广播,而我们可以接收任何一个频率的广播,这种消息订阅没有kafka高效。

订阅消息的方式如下:

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:12001> SUBSCRIBE c1 c2 c3
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "c1"
3) (integer) 1
1) "subscribe"
2) "c2"
3) (integer) 2
1) "subscribe"
2) "c3"
3) (integer) 3

这个表示接收c1,c2,c3三个频道传来的消息,发送消息的方式如下:

1
2
127.0.0.1:12001> PUBLISH c1 "I am lvshen"
(integer) 1

当c1这个频道上有消息发出时,此时在消息订阅控制台可以看到如下输出:

1
2
3
1) "message"
2) "c1"
3) "I am lvshen"

在redis中,我们也可以使用模式匹配订阅,如下:

1
2
3
4
5
127.0.0.1:12001> PSUBSCRIBE c*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c*"
3) (integer) 1

此时可以接收到所有以c开头的频道发来的消息。

创建c1频道
创建c1频道
另一边在这个频道上发布一个消息
另一边在这个频道上发布一个消息
这里接收到我发布的消息
这里接收到我发布的消息
Redis3.0集群搭建
1
2
amber@test185 ~/support/redis/etc $ ls
redis-7001.conf redis-7002.conf redis-7003.conf redis-7004.conf redis-7005.conf redis-7006.conf redis-7007.conf

假设我们要把redis-7007.conf,添加到之前的集群中,并作为7001的从属节点。

先启动redis-7007,sbin目录下:./redis-server ../etc/redis-7007.conf

将7007添加到7001上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
amber@test185 ~/support/redis/sbin $ ./redis-trib.rb add-node 10.174.26.185:7007 10.174.26.185:7001
>>> Adding node 10.174.26.185:7007 to cluster 10.174.26.185:7001
>>> Performing Cluster Check (using node 10.174.26.185:7001)
M: b43c58a7052cbe85e4b1118780874556ba70894c 10.174.26.185:7001
slots:0-5460 (5461 slots) master
1 additional replica(s)
M: d30c4cde028c4480a4ab69c8b30d103377d36777 10.174.26.185:7002
slots:5461-10922 (5462 slots) master
1 additional replica(s)
S: ed81aa0569744c8bb78d92480e69500d77536c6d 10.174.26.185:7004
slots: (0 slots) slave
replicates 0c42214d66597845cc58da5fed9e17018adba246
M: 0c42214d66597845cc58da5fed9e17018adba246 10.174.26.185:7003
slots:10923-16383 (5461 slots) master
1 additional replica(s)
S: 79b4f97b36e62849028915752d2c5c117095a0d0 10.174.26.185:7006
slots: (0 slots) slave
replicates d30c4cde028c4480a4ab69c8b30d103377d36777
S: 6e5eb750cae44b638cdc3372bc2a9146aff56df5 10.174.26.185:7005
slots: (0 slots) slave
replicates b43c58a7052cbe85e4b1118780874556ba70894c
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.
>>> Send CLUSTER MEET to node 10.174.26.185:7007 to make it join the cluster.
[OK] New node added correctly.

查看集群信息:

1
2
3
4
5
6
7
amber@test185 ~/support/redis/sbin $ ./redis-trib.rb info 10.174.26.185:7007
10.174.26.185:7007 (ad0f484f...) -> 0 keys | 0 slots | 0 slaves.
10.174.26.185:7001 (b43c58a7...) -> 0 keys | 5461 slots | 1 slaves.
10.174.26.185:7003 (0c42214d...) -> 0 keys | 5461 slots | 1 slaves.
10.174.26.185:7002 (d30c4cde...) -> 0 keys | 5462 slots | 1 slaves.
[OK] 0 keys in 4 masters.
0.00 keys per slot on average.

7007并没有挂在7001上。

挂载从属节点:

1
2
3
4
5
6
7
8
9
10
11
12
./redis-cli -h 10.174.26.185 -c -p 7007
10.174.26.185:7007> cluster nodes
b43c58a7052cbe85e4b1118780874556ba70894c 10.174.26.185:7001@17001 master - 0 1536119538737 1 connected 0-5460
ed81aa0569744c8bb78d92480e69500d77536c6d 10.174.26.185:7004@17004 slave 0c42214d66597845cc58da5fed9e17018adba246 0 1536119541000 3 connected
6e5eb750cae44b638cdc3372bc2a9146aff56df5 10.174.26.185:7005@17005 slave b43c58a7052cbe85e4b1118780874556ba70894c 0 1536119540800 1 connected
0c42214d66597845cc58da5fed9e17018adba246 10.174.26.185:7003@17003 master - 0 1536119541833 3 connected 10923-16383
79b4f97b36e62849028915752d2c5c117095a0d0 10.174.26.185:7006@17006 slave d30c4cde028c4480a4ab69c8b30d103377d36777 0 1536119539789 2 connected
ad0f484fc7b6399e0aa9e3b6d67d5771554a91aa 10.174.26.185:7007@17007 myself,master - 0 1536119539000 0 connected
d30c4cde028c4480a4ab69c8b30d103377d36777 10.174.26.185:7002@17002 master - 0 1536119539000 2 connected 5461-10922
10.174.26.185:7007> cluster replicate b43c58a7052cbe85e4b1118780874556ba70894c
OK
10.174.26.185:7007> quit

查看结果:

1
2
3
4
5
6
amber@test185 ~/support/redis/sbin $ ./redis-trib.rb info 10.174.26.185:7007
10.174.26.185:7001 (b43c58a7...) -> 0 keys | 5461 slots | 2 slaves.
10.174.26.185:7003 (0c42214d...) -> 0 keys | 5461 slots | 1 slaves.
10.174.26.185:7002 (d30c4cde...) -> 0 keys | 5462 slots | 1 slaves.
[OK] 0 keys in 3 masters.
0.00 keys per slot on average.

查看nodes-7001.conf文件

1
2
3
4
5
6
7
8
amber@test185 ~/data/redis-data $ more nodes-7001.conf
d30c4cde028c4480a4ab69c8b30d103377d36777 10.174.26.185:7002@17002 master - 0 1536130505000 2 connected 5461-10922
ed81aa0569744c8bb78d92480e69500d77536c6d 10.174.26.185:7004@17004 slave 0c42214d66597845cc58da5fed9e17018adba246 0 1536130507000 4 connected
0c42214d66597845cc58da5fed9e17018adba246 10.174.26.185:7003@17003 master - 0 1536130507099 3 connected 10923-16383
79b4f97b36e62849028915752d2c5c117095a0d0 10.174.26.185:7006@17006 slave d30c4cde028c4480a4ab69c8b30d103377d36777 0 1536130507000 6 connected
b43c58a7052cbe85e4b1118780874556ba70894c 10.174.26.185:7001@17001 myself,master - 0 1536130504000 1 connected 0-5460
6e5eb750cae44b638cdc3372bc2a9146aff56df5 10.174.26.185:7005@17005 slave b43c58a7052cbe85e4b1118780874556ba70894c 0 1536130507000 5 connected
ad0f484fc7b6399e0aa9e3b6d67d5771554a91aa 10.174.26.185:7007@17007 slave b43c58a7052cbe85e4b1118780874556ba70894c 0 1536130507912 1 connected
关于Jstorm

Jstorm是阿里巴巴在storm上改造的软件,特点:24小时时时计算。用于数据分析,如分析日志。

Jstorm用于海量数据计算,除此之外还有Hadoop的MapReduce编程模型。这个理论模型是由Google发表的。

MapReduce可以分成MapReduce两部分理解。

1.Map:映射过程,把一组数据按照某种Map函数映射成新的数据。

2.Reduce:归约过程,把若干组映射结果进行汇总并输出。

MapReduce
MapReduce

但是hadoop的这中MapReduce job是执行完就结束,进程退出。而jstorm是执行一遍,又执行一边,24小时永久执行。JStorm之所以能源源不断的运行,是由于当Worker失效或机器出现故障时, 自动分配新的Worker替换失效Worker。使得JStorm具有良好的健壮性。

JStorm组件
spout

spout代表输入的数据源(kafka,DB,HBase,HDFS)

bolt

bolt代表处理逻辑,bolt收到消息之后,对消息做处理(即执行用户的业务逻辑),处理完以后,既可以将处理后的消息继续发送到下游的bolt,这样会形成一个处理流水线(pipeline,不过更精确的应该是个有向图);也可以直接结束。

通常一个流水线的最后一个bolt,会做一些数据的存储工作,比如将实时计算出来的数据写入DB、HBase等,以供前台业务进行查询和展现。

tuple

在JStorm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意JStorm在建模事件流时,把流中的事件抽象为tuple即元组。

一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。

可以抽象为:spout是一个水龙头,tuple就是水龙头流出的水,bolt就是一个水处理器。spout源头接收其他中间件吐出的数据,数据装配为tuple,流向bolt进行数据处理。处理后的数据存入数据库了。

基于消息的流水线系统
流水线系统
流水线系统
JVM中的jstorm进程
JVM中的JStorm进程
JVM中的JStorm进程
Jstorm相关角色
1
2
3
4
5
–Nimbus:资源调度角色
–Supervisor:接受nimubs 任务安排,启动任务
–Worker:进程
–Executor:执行线程
–Task:执行逻辑单元
Storm原理
Storm原理
Storm原理
组件接口

JStorm框架对spout组件定义了一个接口:nextTuple,顾名思义,就是获取下一条消息。执行时,可以理解成JStorm框架会不停地调这个接口,以从数据源拉取数据并往bolt发送数据。

同时,bolt组件定义了一个接口:execute,这个接口就是用户用来处理业务逻辑的地方。

每一个topology,既可以有多个spout,代表同时从多个数据源接收消息,也可以多个bolt,来执行不同的业务逻辑。

调度执行

接下来就是topology的调度和执行原理,对一个topology,JStorm最终会调度成一个或多个worker,每个worker即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。

而每个worker中,又可以有多个task,分别代表一个执行线程。每个task就是上面提到的组件(component)的实现,要么是spout要么是bolt。

一个topology对应多个worker;每个worker,对应多个task;每个task,需要靠spout或bolt实现

消息之间的通信

spout与bolt,bolt与bolt之间是怎么通信的?

首先,从spout发送消息的时候,JStorm会计算出消息要发送的目标task id列表,然后看目标task id是在本进程中,还是其他进程中,如果是本进程中,那么就可以直接走进程内部通信(如直接将这个消息放入本进程中目标task的执行队列中);如果是跨进程,那么JStorm会使用netty来将消息发送到目标task中。

ack机制

用于判断spout发出的消息是否被成功处理,或失败处理。

1
2
3
在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
在规定的时间内,没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,
或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
源码分析

源码围绕Spout和Bolt构建

Spout
1
2
3
4
5
6
7
8
9
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}

其中注意:

  • spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
  • spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
  • open是当task起来后执行的初始化动作
  • close是当task被shutdown后执行的动作
  • activate 是当task被激活时,触发的动作
  • deactivate 是task被deactive时,触发的动作
  • nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
  • ack, 当spout收到一条ack消息时,触发的动作
  • fail, 当spout收到一条fail消息时,触发的动作
Bolt
1
2
3
4
5
public interface IBolt extends Serializable {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
void execute(Tuple input);
void cleanup();
}

注意:

  • prepare是当task起来后执行的初始化动作
  • cleanup是当task被shutdown后执行的动作
  • execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。在executor中,当程序处理一条消息时,需要执行collector.ack, 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail。
额外学习:《Java8实战》读书笔记

Java8的两个新特性:Lambda表达式和stream()的使用,简化了我们的开发。举个例子:

Lambda-对苹果按重量排序
1
2
3
4
5
6
7
8
9
10
11
//Java8之前
Collections.sort(inventory, new Comparator<Apple>() {
public int compare(Apple a1, Apple a2){
return a1.getWeight().compareTo(a2.getWeight());
}
});
//Java8特性(方法引用)
inventory.sort(comparing(Apple::getWeight));
//Lambda表达式
Comparator<Apple> byWeight =
(Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight());
Stream-筛选金额较高的交易
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//未使用流
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
for (Transaction transaction : transactions) {
if(transaction.getPrice() > 1000){
Currency currency = transaction.getCurrency();
List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
if (transactionsForCurrency == null) {
transactionsForCurrency = new ArrayList<>();
transactionsByCurrencies.put(currency,transactionsForCurrency);
}
transactionsForCurrency.add(transaction);
}
}
//使用流
import static java.util.stream.Collectors.toList;
Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().filter((Transaction t) -> t.getPrice() > 1000).collect(groupingBy(Transaction::getCurrency));
Java8中的default关键字

用于在接口中扩充方法,而不影响子接口,或子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//接口中的方法用default修饰后,可以有结构体
public interface DefaultTest {
default void foo(){
System.out.println("Calling A.foo()");
}
}
//该类实现了DefaultTest接口,并不用实现foo(),因为foo()被default关键字修饰
public class DefaultTestImpl implements DefaultTest {
public static void main(String[] args){
DefaultTestImpl defaultTest = new DefaultTestImpl();
defaultTest.foo();
}
}

三、下一步学习方向

linux系统方面的,python编程,大数据(Hbase,kafka,zk等)尽量了解源码,了解他们的代码风格。学习学习。

具体步骤:一看技术文章/书籍,二请教大佬,三动手实践操作。

Python Study方案:

Python Study
Python Study

大数据学习:

扒源码,最近在看Jstorm的源码。

Linux系统学习:

命令,shell脚本等等 红帽RHCE认证

四、现有工作建议

  1. 自己工作主动性缺乏,需要改正
  2. 工作内容不明确,有时候不知道我们的工作内容,哪些是需要我们做的。