一、我的收获
部署CRM
这次有个比较大的收获就是自己部署CRM系统,CRM系统很复杂,光部署的用户就有10多个。这次的系统部署中,部署的用户有deploy, web, app, cache, search, sec(权限缓存redis), sna(Session缓存redis),kafka, zk, proxy, nginx
。
要启动系统,这些都要启动。比如:不启动nginx
。就获取不到静态资源,登陆的js就会报错,点击登录按钮没反应;没有启动proxy,CRM的页面就不能访问,这个代理程序可以配置端口;还有redis也需要启动,不然登陆出现错误;
其次学会了很多linux 命令操作,尤其是vi的操作。比如vi文件后,假如要删除里面某些内容,我以前通过退格键一个一个数据的删除,后来才知道 :dd --删除当前行
,ndd --删除n行数据,dG --删除当前后之后的全部行
。ESC+u --撤销操作
。
学会了简单的python脚本,shell脚本的编写
shell脚本举例:linux 软件安装脚本
python脚本举例: 远程发布文件 —— 远程发布文件脚本
python比较文件异同: 文件异同比较
Log4x部署学习
1 2
| log4x@test186 ~/support $ ls elasticsearch hadoop hbase jdk jstorm kafka redis-4.0.9 zk
|
log4x需要安装 以上的软件,部署本身没啥难度,只要把配置文件的地址,路径配对。系统就可以启起来。
工作流程:app,web的日志发送到kafka里,jstorm会从这里面取数据(实时计算),把处理后的数据存储到hbase,oracle,elasticsearch中。zk相当于一个公告板,告诉这些软件有哪些信息。
Amber部署学习
进行中…..
二、我对技术的一些了解
关于redis
Redis是一种key/value
型数据库,我们存值时采用如下命令:
1
| redis>SET message "hello redis"
|
那么key
为message
,value
为“hello redis”
。这里的value
为string
类型的。redis的值还有其他类型。
类型常量 |
对象的名称 |
REDIS_STRING |
字符串对象 |
REDIS_LIST |
列表对象 |
REDIS_HASH |
哈希对象 |
REDIS_SET |
集合对象 |
REDIS_ZSET |
有序集合对象 |
直接启动
进入redis根目录,执行命令:
在工作中遇到过一个问题,就是当机器非正常断电关机时,无法直接启动redis。直接启动会报错,说进程已经存在。但是进程明明没有了。
指定配置文件启动
这时候可以同过配置文件启动,假如我们redis的配置为:etc/redis-7001.conf
进入redis根目录,输入命令:
1
| ./redis-server etc/redis-7001.conf
|
这样就能启动redis。
如果给redis配置了自定义的端口,使用redis-cli
客户端连接时,也需要指定端口,例如:
关于redis消息订阅
redis的发布订阅系统有点类似于我们生活中的电台,电台可以在某一个频率上发送广播,而我们可以接收任何一个频率的广播,这种消息订阅没有kafka高效。
订阅消息的方式如下:
1 2 3 4 5 6 7 8 9 10 11
| 127.0.0.1:12001> SUBSCRIBE c1 c2 c3 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "c1" 3) (integer) 1 1) "subscribe" 2) "c2" 3) (integer) 2 1) "subscribe" 2) "c3" 3) (integer) 3
|
这个表示接收c1,c2,c3三个频道传来的消息,发送消息的方式如下:
1 2
| 127.0.0.1:12001> PUBLISH c1 "I am lvshen" (integer) 1
|
当c1这个频道上有消息发出时,此时在消息订阅控制台可以看到如下输出:
1 2 3
| 1) "message" 2) "c1" 3) "I am lvshen"
|
在redis中,我们也可以使用模式匹配订阅,如下:
1 2 3 4 5
| 127.0.0.1:12001> PSUBSCRIBE c* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "c*" 3) (integer) 1
|
此时可以接收到所有以c开头的频道发来的消息。
Redis3.0集群搭建
1 2
| amber@test185 ~/support/redis/etc $ ls redis-7001.conf redis-7002.conf redis-7003.conf redis-7004.conf redis-7005.conf redis-7006.conf redis-7007.conf
|
假设我们要把redis-7007.conf
,添加到之前的集群中,并作为7001
的从属节点。
先启动redis-7007,sbin
目录下:./redis-server ../etc/redis-7007.conf
将7007添加到7001上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| amber@test185 ~/support/redis/sbin $ ./redis-trib.rb add-node 10.174.26.185:7007 10.174.26.185:7001 >>> Adding node 10.174.26.185:7007 to cluster 10.174.26.185:7001 >>> Performing Cluster Check (using node 10.174.26.185:7001) M: b43c58a7052cbe85e4b1118780874556ba70894c 10.174.26.185:7001 slots:0-5460 (5461 slots) master 1 additional replica(s) M: d30c4cde028c4480a4ab69c8b30d103377d36777 10.174.26.185:7002 slots:5461-10922 (5462 slots) master 1 additional replica(s) S: ed81aa0569744c8bb78d92480e69500d77536c6d 10.174.26.185:7004 slots: (0 slots) slave replicates 0c42214d66597845cc58da5fed9e17018adba246 M: 0c42214d66597845cc58da5fed9e17018adba246 10.174.26.185:7003 slots:10923-16383 (5461 slots) master 1 additional replica(s) S: 79b4f97b36e62849028915752d2c5c117095a0d0 10.174.26.185:7006 slots: (0 slots) slave replicates d30c4cde028c4480a4ab69c8b30d103377d36777 S: 6e5eb750cae44b638cdc3372bc2a9146aff56df5 10.174.26.185:7005 slots: (0 slots) slave replicates b43c58a7052cbe85e4b1118780874556ba70894c [OK] All nodes agree about slots configuration. >>> Check for open slots... >>> Check slots coverage... [OK] All 16384 slots covered. >>> Send CLUSTER MEET to node 10.174.26.185:7007 to make it join the cluster. [OK] New node added correctly.
|
查看集群信息:
1 2 3 4 5 6 7
| amber@test185 ~/support/redis/sbin $ ./redis-trib.rb info 10.174.26.185:7007 10.174.26.185:7007 (ad0f484f...) -> 0 keys | 0 slots | 0 slaves. 10.174.26.185:7001 (b43c58a7...) -> 0 keys | 5461 slots | 1 slaves. 10.174.26.185:7003 (0c42214d...) -> 0 keys | 5461 slots | 1 slaves. 10.174.26.185:7002 (d30c4cde...) -> 0 keys | 5462 slots | 1 slaves. [OK] 0 keys in 4 masters. 0.00 keys per slot on average.
|
7007
并没有挂在7001
上。
挂载从属节点:
1 2 3 4 5 6 7 8 9 10 11 12
| ./redis-cli -h 10.174.26.185 -c -p 7007 10.174.26.185:7007> cluster nodes b43c58a7052cbe85e4b1118780874556ba70894c 10.174.26.185:7001@17001 master - 0 1536119538737 1 connected 0-5460 ed81aa0569744c8bb78d92480e69500d77536c6d 10.174.26.185:7004@17004 slave 0c42214d66597845cc58da5fed9e17018adba246 0 1536119541000 3 connected 6e5eb750cae44b638cdc3372bc2a9146aff56df5 10.174.26.185:7005@17005 slave b43c58a7052cbe85e4b1118780874556ba70894c 0 1536119540800 1 connected 0c42214d66597845cc58da5fed9e17018adba246 10.174.26.185:7003@17003 master - 0 1536119541833 3 connected 10923-16383 79b4f97b36e62849028915752d2c5c117095a0d0 10.174.26.185:7006@17006 slave d30c4cde028c4480a4ab69c8b30d103377d36777 0 1536119539789 2 connected ad0f484fc7b6399e0aa9e3b6d67d5771554a91aa 10.174.26.185:7007@17007 myself,master - 0 1536119539000 0 connected d30c4cde028c4480a4ab69c8b30d103377d36777 10.174.26.185:7002@17002 master - 0 1536119539000 2 connected 5461-10922 10.174.26.185:7007> cluster replicate b43c58a7052cbe85e4b1118780874556ba70894c OK 10.174.26.185:7007> quit
|
查看结果:
1 2 3 4 5 6
| amber@test185 ~/support/redis/sbin $ ./redis-trib.rb info 10.174.26.185:7007 10.174.26.185:7001 (b43c58a7...) -> 0 keys | 5461 slots | 2 slaves. 10.174.26.185:7003 (0c42214d...) -> 0 keys | 5461 slots | 1 slaves. 10.174.26.185:7002 (d30c4cde...) -> 0 keys | 5462 slots | 1 slaves. [OK] 0 keys in 3 masters. 0.00 keys per slot on average.
|
查看nodes-7001.conf
文件
1 2 3 4 5 6 7 8
| amber@test185 ~/data/redis-data $ more nodes-7001.conf d30c4cde028c4480a4ab69c8b30d103377d36777 10.174.26.185:7002@17002 master - 0 1536130505000 2 connected 5461-10922 ed81aa0569744c8bb78d92480e69500d77536c6d 10.174.26.185:7004@17004 slave 0c42214d66597845cc58da5fed9e17018adba246 0 1536130507000 4 connected 0c42214d66597845cc58da5fed9e17018adba246 10.174.26.185:7003@17003 master - 0 1536130507099 3 connected 10923-16383 79b4f97b36e62849028915752d2c5c117095a0d0 10.174.26.185:7006@17006 slave d30c4cde028c4480a4ab69c8b30d103377d36777 0 1536130507000 6 connected b43c58a7052cbe85e4b1118780874556ba70894c 10.174.26.185:7001@17001 myself,master - 0 1536130504000 1 connected 0-5460 6e5eb750cae44b638cdc3372bc2a9146aff56df5 10.174.26.185:7005@17005 slave b43c58a7052cbe85e4b1118780874556ba70894c 0 1536130507000 5 connected ad0f484fc7b6399e0aa9e3b6d67d5771554a91aa 10.174.26.185:7007@17007 slave b43c58a7052cbe85e4b1118780874556ba70894c 0 1536130507912 1 connected
|
关于Jstorm
Jstorm是阿里巴巴在storm上改造的软件,特点:24小时时时计算。用于数据分析,如分析日志。
Jstorm用于海量数据计算,除此之外还有Hadoop的MapReduce编程模型。这个理论模型是由Google发表的。
MapReduce可以分成Map和Reduce两部分理解。
1.Map:映射过程,把一组数据按照某种Map函数映射成新的数据。
2.Reduce:归约过程,把若干组映射结果进行汇总并输出。
但是hadoop的这中MapReduce job是执行完就结束,进程退出。而jstorm是执行一遍,又执行一边,24小时永久执行。JStorm之所以能源源不断的运行,是由于当Worker失效或机器出现故障时, 自动分配新的Worker替换失效Worker。使得JStorm具有良好的健壮性。
JStorm组件
spout
spout代表输入的数据源(kafka,DB,HBase,HDFS)
bolt
bolt代表处理逻辑,bolt收到消息之后,对消息做处理(即执行用户的业务逻辑),处理完以后,既可以将处理后的消息继续发送到下游的bolt,这样会形成一个处理流水线(pipeline,不过更精确的应该是个有向图);也可以直接结束。
通常一个流水线的最后一个bolt,会做一些数据的存储工作,比如将实时计算出来的数据写入DB、HBase等,以供前台业务进行查询和展现。
tuple
在JStorm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意JStorm在建模事件流时,把流中的事件抽象为tuple即元组。
一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。
可以抽象为:spout是一个水龙头,tuple就是水龙头流出的水,bolt就是一个水处理器。spout源头接收其他中间件吐出的数据,数据装配为tuple,流向bolt进行数据处理。处理后的数据存入数据库了。
基于消息的流水线系统
JVM中的jstorm进程
Jstorm相关角色
1 2 3 4 5
| –Nimbus:资源调度角色 –Supervisor:接受nimubs 任务安排,启动任务 –Worker:进程 –Executor:执行线程 –Task:执行逻辑单元
|
Storm原理
组件接口
JStorm框架对spout组件定义了一个接口:nextTuple
,顾名思义,就是获取下一条消息。执行时,可以理解成JStorm框架会不停地调这个接口,以从数据源拉取数据并往bolt发送数据。
同时,bolt组件定义了一个接口:execute
,这个接口就是用户用来处理业务逻辑的地方。
每一个topology,既可以有多个spout,代表同时从多个数据源接收消息,也可以多个bolt,来执行不同的业务逻辑。
调度执行
接下来就是topology的调度和执行原理,对一个topology,JStorm最终会调度成一个或多个worker,每个worker即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。
而每个worker中,又可以有多个task,分别代表一个执行线程。每个task就是上面提到的组件(component)的实现,要么是spout要么是bolt。
一个topology对应多个worker;每个worker,对应多个task;每个task,需要靠spout或bolt实现
消息之间的通信
spout与bolt,bolt与bolt之间是怎么通信的?
首先,从spout发送消息的时候,JStorm会计算出消息要发送的目标task id列表,然后看目标task id是在本进程中,还是其他进程中,如果是本进程中,那么就可以直接走进程内部通信(如直接将这个消息放入本进程中目标task的执行队列中);如果是跨进程,那么JStorm会使用netty来将消息发送到目标task中。
ack机制
用于判断spout发出的消息是否被成功处理,或失败处理。
1 2 3
| 在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理 在规定的时间内,没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败, 或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
|
源码分析
源码围绕Spout和Bolt构建
Spout
1 2 3 4 5 6 7 8 9
| public interface ISpout extends Serializable { void open(Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void activate(); void deactivate(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
|
其中注意:
- spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
- spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
- open是当task起来后执行的初始化动作
- close是当task被shutdown后执行的动作
- activate 是当task被激活时,触发的动作
- deactivate 是task被deactive时,触发的动作
- nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
- ack, 当spout收到一条ack消息时,触发的动作
- fail, 当spout收到一条fail消息时,触发的动作
Bolt
1 2 3 4 5
| public interface IBolt extends Serializable { void prepare(Map stormConf, TopologyContext context, OutputCollector collector); void execute(Tuple input); void cleanup(); }
|
注意:
- prepare是当task起来后执行的初始化动作
- cleanup是当task被shutdown后执行的动作
- execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。在executor中,当程序处理一条消息时,需要执行collector.ack, 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail。
额外学习:《Java8实战》读书笔记
Java8的两个新特性:Lambda表达式和stream()的使用,简化了我们的开发。举个例子:
Lambda-对苹果按重量排序
1 2 3 4 5 6 7 8 9 10 11
| Collections.sort(inventory, new Comparator<Apple>() { public int compare(Apple a1, Apple a2){ return a1.getWeight().compareTo(a2.getWeight()); } }); inventory.sort(comparing(Apple::getWeight)); Comparator<Apple> byWeight = (Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight());
|
Stream-筛选金额较高的交易
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>(); for (Transaction transaction : transactions) { if(transaction.getPrice() > 1000){ Currency currency = transaction.getCurrency(); List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency); if (transactionsForCurrency == null) { transactionsForCurrency = new ArrayList<>(); transactionsByCurrencies.put(currency,transactionsForCurrency); } transactionsForCurrency.add(transaction); } } import static java.util.stream.Collectors.toList; Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().filter((Transaction t) -> t.getPrice() > 1000).collect(groupingBy(Transaction::getCurrency));
|
Java8中的default关键字
用于在接口中扩充方法,而不影响子接口,或子类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public interface DefaultTest { default void foo(){ System.out.println("Calling A.foo()"); } } public class DefaultTestImpl implements DefaultTest { public static void main(String[] args){ DefaultTestImpl defaultTest = new DefaultTestImpl(); defaultTest.foo(); } }
|
三、下一步学习方向
linux系统方面的,python编程,大数据(Hbase,kafka,zk等)尽量了解源码,了解他们的代码风格。学习学习。
具体步骤:一看技术文章/书籍,二请教大佬,三动手实践操作。
Python Study方案:
大数据学习:
扒源码,最近在看Jstorm的源码。
Linux系统学习:
命令,shell脚本等等 红帽RHCE认证
四、现有工作建议
- 自己工作主动性缺乏,需要改正
- 工作内容不明确,有时候不知道我们的工作内容,哪些是需要我们做的。