This is the place where I share my ideas.You can also visit me on @CSDN or @oschina.
Give Chapter and Verse and Author for Transshipment, copy or other using. Thanks!
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持) 累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
``` package com.Streaming;
import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import scala.Tuple2;
import java.util.*;
/**
? */ public class BroadcastAccumulator {
/**
/**
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[2]").
setAppName("WordCountOnlieBroadcast");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
/**
* 没有action的话,广播并不会发出去!
*
* 使用broadcast广播黑名单到每个Executor中!
*/
broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
/**
* 全局计数器!用于统计在线过滤了多少个黑名单!
*/
accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);
/**
* 这里省去flatmap因为名单是一个个的!
*/
JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}
});
/**
* Funtion里面 前几个参数是 入参。
* 后面的出参。
* 体现在call方法里面!
*
* 这里直接基于RDD进行操作了!
*/
wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
if (broadcastList.value().contains(wordPair._1)) {
/**
* accumulator不应该仅仅用来计数。
* 可以同时写进数据库或者redis中!
*/
accumulator.add(wordPair._2);
return false;
}else {
return true;
}
};
/**
* 这里真的希望 广播和计数器执行的话。要进行一个action操作!
*/
}).collect();
System.out.println("广播器里面的值"+broadcastList.value());
System.out.println("计时器里面的值"+accumulator.value());
return null;
}
});
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
task处理到一半,失败了。可是数据 已经在输出存储在数据库了。如果第二次继续重试task,数据是不是就重复输出了?
Spark Streaming 的任务失败,讲会自动进行重试, 数据会被多次写入到存储程序中。
1.能够处理且处理一次。 2.能够输出且只输出一次。
Exactly one
举例:A 给B转账,A一定会被扣除,且被扣除一次。B能接受到A转账的数据,且只接受到一次。
问:会不会完全失败呢? 答:可能性不大,一般会被处理。 除非硬件 全 部崩溃。
数据不断流进Executor。Receiver。
Executor 中只有 具体该怎么算 这个事情。
WAL: Write-Ahead Logging[1] 预写日志系统 数据库中一种高效的日志算法,对于非内存数据库而言,磁盘I/O操作是数据库效率的一大瓶颈。在相同的数据量下,采用WAL日志的数据库系统在事务提交时,磁盘写操作只有传统的回滚日志的一半左右,大大提高了数据库磁盘I/O操作的效率,从而提高了数据库的性能。
1.数据零丢失:必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且通过WAL来保证数据安全。 2.Spark Streaming 1.3 的时候 为了避免WAL的性能 损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统!!!(兼具了流的优势和文件系统的优势。Kafka消耗磁盘和HDFS消耗磁盘是一样的。直接操作它的offset) 至此,Spark Streaming + Kafka 就构建了完美的流处理世界。 为什么完美: 1,数据不用拷贝副本。2.不用WAL 而性能损耗。3.Kafka比HDFS搞笑很多。 又是怎么解决重复消费问题呢? 直接管理offset,所以也不好重复消费数据。自己处理了哪些东西,自己知道。 所有的Executor 通过Kafka API直接消费数据。 事务 实现啦!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Receiver在接受数据 的时候崩溃,并没有完成副本的复制。Receiver恢复之后要重新接受【前提:Kafka】 接受数据 会在zookeeper里面管理。 确认接受到信息,才会读取下面的信息。
如果Driver 突然崩溃。则此时,Executor会被Kill掉,那么Executor中的数据就会丢失。(不做WAL的情况下)
此时就必须通过,例如WAL的方式(先把数据保存,再谈其他事情),让所有的数据都通过例如HDFS的方式首先进行安全性容错处理。 此时如果Executor 中的数据 丢失的话,就可以通过WAL恢复回来。
Kafka是毫无疑问的第一选择。必须精通。做Spark ,Kafka的重要性绝对比HDFS重要。 在Receiver 收到数据且保存到了HDFS等持久化引擎 但是没有来得及进行updateOffsets,此时Receiver崩溃后重新启动就会通过管理Kafka的Zookeeper中元数据再次重复读取数据,但是此时Spark Streaming 认为是成功的。 但是,Kafka认为是失败的(因为没有更新offset到zookeeper中),此时就会导致数据重复消费的情况。 怎么解决:处理数据的时候可以访问到元数据信息,那么可以把元数据信息 写入到内存数据库。(MemReset) 查询一下 元数据信息是否被处理过,处理过就跳过。 每次处理都查。 一个很简单的内存数据结构。SQLite。
1.通过WAL的方式弊端 :会极大的损伤Spark Streaming中Receivers 接受数据的性能。实际生产环境下,用Receivers 的情况并不多,而是用Kafka的Direct API。 2.如果通过Kafka作为数据来源的话,Kafka中有数据,然后Receiver接受的时候又会有数据副本,这个时候其实是存储资源的浪费。
因为Spark Streaming 在计算的时候基于Spark Core ,而Spark Core 天生会做以下事情导致Spark Streaming 的结果(部分)重复输出; 1.1Task重试。 1.2Stage重试。 1.3Job重试。
2.1 设置Spark.task.maxFailures (最大失败次数)次数为一。 2.2 设置spark.speculation为关闭状态(因为慢任务推测非常消耗性能,所以关闭后可以显著的提高Spark的处理性能),就不会出现两个相同的任务在运行了。
Spark Streaming on Kafka的话,Job失败后,可以设置auto.offset.reset为largest的方式。这样就自动的进行恢复。
最后再次强调,可以通过transform和foreachRDD基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复! 这两个方式类似于Spark Streaming的后门,可以做任意想象的控制操作!
最近做项目时,碰到一些情况,需要修改一小部分代码。而tomcat每次修改完代码都要重启才能生效。浪费了时间。
在网上看到一些方法,都要改配置文件啊,或者加 JavaRebel插件啊才能实现热部署,比较麻烦。
这里有个简单的小方法,实现不用重启tomcat服务器,修改方法里的内容(不包括类加方法,改结构喔),就能生效。达到快速调试的目的。
两步OK!轻松加愉快喔! 注意此方法 也有局限~增减方法还是要重启的~感谢阅读~
##Redis介绍:
redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set –有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。 Redis 是一个高性能的key-value数据库。 redis的出现,很大程度补偿了memcached这类key/value存储的不足,在部 分场合可以对关系数据库起到很好的补充作用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客户端,使用很方便。[1] Redis支持主从同步。数据可以从主服务器向任意数量的从服务器上同步,从服务器可以是关联其他从服务器的主服务器。这使得Redis可执行单层树复制。存盘可以有意无意的对数据进行写操作。由于完全实现了发布/订阅机制,使得从数据库在任何地方同步树时,可订阅一个频道并接收主服务器完整的消息发布记录。同步对读取操作的可扩展性和数据冗余很有帮助。
##Redis安装:
在linux执行以下命令
sudo apt-get install redis-server
启动 Redis:
redis-server
连接redis:
redis-cli
输入:
redis 127.0.0.1:6379> ping
显示:
PONG
说明你已经成功地安装Redis在您的机器上。
首先要修改/etc/redis的redis配置。讲ip绑定为127.0.0.1注释掉。 否则无法在另外一台电脑上远程连接redis!(例如在windows里远程连接虚拟机linux里的redis)
jedis-2.7.2.jar
commons-pool2-2.4.2.jar
public class RedisJava {
public static void main(String[] args) {
// Connecting to Redis server on localhost
Jedis jedis = new Jedis("10.137.12.12", 6379);
System.out.println("Connection to server sucessfully");
// check whether server is running or not
System.out.println("Server is running: " + jedis.ping());//成功会输出PONG
jedis.set("tch", "sb");
// Get the stored data and print it
System.out.println("tch:: " + jedis.get("tch"));
}
}
附录:
RDD(Resilient Distributed Datasets) ,弹性分布式数据集, 是分布式内存的一个抽象概念,指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。
RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。
1.自动进行内存和磁盘切换 2.基于lineage的高效容错 3.task如果失败会特定次数的重试 4.stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片 5.checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化) 6.数据调度弹性:DAG TASK 和资源管理无关 7.数据分片的高度弹性repartion