印迹

大数据

1. 是个什么东西?

  1. 核心有点?
  2. 出现的问题?
  3. 优化
  4. 单独优化过?那些点

Hadoop(HDFS)

Hadoop是一个根据google三大论文实现的一套海量数据存储、计算、查询的开源大数据框架。它包含三个基础组建 HDFS、Mapreduce、Hbase

为什么HDFS的blocksize是128M?

机架位置优化
磁盘的块大小为512byte,HDFS的块大小为128Mb,主要原因是为了减少磁盘的寻道时间。一次寻道大约需要10ms,目前市面上的千兆网卡速度为100Mb/s,寻道时间应该只占传输时间的1%,我们需要设置一块的大小为100M,因此设置块的大小为128M。

HDFS不适合的场景

1)低延迟的访问 HDFS在高吞吐量的数据传输上面做了很多优化,而这是以牺牲低延迟为代价的。  
2)太多小文件  因为NameNode是在内存中保存文件的元数据的,因此能够存储的文件个数受内存大小的限制。每一个文件、目录和块都占用大约150byte,因此假设有100W个文件,每一个文件占一个块,那就需要大约300M的内存。

partition

我们对于错综复杂的数据归类。比如在动物园里有牛羊鸡鸭鹅,他们都是混在一起的,但是到了晚上他们就各自牛回牛棚,羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。只不过在写程序的时候,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也可以自定义。

HDFS优化项

1. dfs.block.size 块越大,元数据越少,但是影响分片时间
2. mapred.local.dir 多盘提高IO
3. yarn的容器最大内存、最小内存 对应java的最大内存最小内存。
4. nodemanager 所管理的每个节点的最大内存(留出10%~20%的余量给系统和其他)
5. CPU资源,nodemanager管理的cpu量(流出20%给系统) 虚拟cpu数量(系统一共10个cpu 核数,你可以把这个cpu个数当成20个用。这20个cpu就是虚拟出来的)

Hbase

rowkey设计
越短越好

(1)数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;
(2)MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。
(3)目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。

rowkey离散

如果Rowkey是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个 RegionServer上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer,降低查询效率。

hbase 获取数据的方式:

(1) 通过单个Rowkey访问,即按照某个Rowkey键值进行get操作,这样获取唯一一条记录;
(2) 通过Rowkey的range进行scan,即通过设置startRowKey和endRowKey,在这个范围内进行扫描。这样可以按指定的条件获取一批记录;
(3) 全表扫描,即直接扫描整张表中所有行记录。

多条件查询

(1)scan可以通过setCaching与setBatch方法提高速度(以空间换时间);
(2)scan可以通过setStartRow与setEndRow来限定范围。范围越小,性能越高。
通过巧妙的RowKey设计使我们批量获取记录集合中的元素挨在一起(应该在同一个Region下),可以在遍历结果时获得很好的性能。
(3)scan可以通过setFilter方法添加过滤器,这也是分页、多条件查询的基础。
在满足长度、三列、唯一原则后,我们需要考虑如何通过巧妙设计RowKey以利用scan方法的范围功能,使得获取一批记录的查询速度能提高。下例就描述如何将多个列组合成一个RowKey,使用scan的range来达到较快查询速度。

hbase内部机制

在HBase 中无论是增加新行还是修改已有的行,其内部流程都是相同的。HBase 接到命令后存下变化信息,或者写入失败抛出异常。默认情况下,执行写入时会写到两个地方:预写式日志(write-ahead log,也称HLog)和MemStore(见图2-1)。HBase 的默认方式是把写入动作记录在这两个地方,以保证数据持久化。只有当这两个地方的变化信息都写入并确认后,才认为写动作完成。
MemStore 是内存里的写入缓冲区,HBase 中数据在永久写入硬盘之前在这里累积。当MemStore 填满后,其中的数据会刷写到硬盘,生成一个HFile。HFile 是HBase 使用的底层存储格式。HFile 对应于列族,一个列族可以有多个HFile,但一个HFile 不能存储多个列族的数据。在集群的每个节点上,每个列族有一个MemStore。
大型分布式系统中硬件故障很常见,HBase 也不例外。设想一下,如果MemStore还没有刷写,服务器就崩溃了,内存中没有写入硬盘的数据就会丢失。HBase 的应对办法是在写动作完成之前先写入WAL。HBase 集群中每台服务器维护一个WAL 来记录发生的变化。WAL 是底层文件系统上的一个文件。直到WAL 新记录成功写入后,写动作才被认为成功完成。这可以保证HBase 和支撑它的文件系统满足持久性。大多数情况下,HBase 使用Hadoop 分布式文件系统(HDFS)来作为底层文件系统。
如果HBase 服务器宕机,没有从MemStore 里刷写到HFile 的数据将可以通过回放WAL 来恢复。你不需要手工执行。Hbase 的内部机制中有恢复流程部分来处理。每台HBase 服务器有一个WAL,这台服务器上的所有表(和它们的列族)共享这个WAL。
你可能想到,写入时跳过WAL 应该会提升写性能。但我们不建议禁用WAL,除非你愿意在出问题时丢失数据。如果你想测试一下,如下代码可以禁用WAL:
注意:不写入WAL 会在RegionServer 故障时增加丢失数据的风险。关闭WAL,出现故障时HBase 可能无法恢复数据,没有刷写到硬盘的所有写入数据都会丢失。

Hive

Hive和Hbase有各自不同的特征:hive是高延迟、结构化和面向分析的,hbase是低延迟、非结构化和面向编程的。Hive数据仓库在hadoop上是高延迟的。Hive集成Hbase就是为了使用hbase的一些特性

Spark

Spark 是一个基于MapReduce模式的一个分布式并行模型计算框架,他的存储是基于内存的。

集群

Spark 集群的核心是专注于集中调度服务器上的资源进行分布式作协计算(ClusterMaster),Spark 的集群模式分为三种一种是Standalone 、Yarn、Mesos模式, Spark的资源管理一般分为Master-Worker模式。

Application

就是Spark程序入口,他指定DAG的运行流程

SparkContext

他是Spark的最核心的部分,负责用户逻辑的处理API的调用,所有交互的接口处,包括想Cluster交互申请资源等。

Driver

每个Spark Job在运行的时候都会启动一个Driver和若干Executor, Driver 将Job转化为若干Task,再将Task 提交到各个Executor上运行,
1517985514301_3

Executor

Executor是运行具体Task的地方,我认为Task的下面的小任务可以认为是一个Stage。

RDD

Rdd是Resilient Distributed Dataset 的缩写,意思是弹性分布式数据集,他是一个抽象话的概念,我们可以简单理解成程序获取了一个文件的句柄,不同文件有read/write/seek等操作,而RDD也同样有自己的操作API,他的API分为三种

  1. 创建(read) 加载数据 比如从HDFS,HBASE等其他数据源。
  2. **Transformation(转换) ** 对已有的RDD做转换操作,从而产生新的RDD,Transformation操作有懒惰机制是不会立刻运行的,只有遇到Action的时候才会进行计算。 如Map、Filter等做操
  3. Action 就是运行时调用,产生计算结果。如save操作等。 ##Partition
    Partition是RDD在物理上的分区,partition 分布在各个节点上,spark就是利用多partition实现分布式并行计算的。
    ##Dependency
    对RDD进行Transform和Action操作就会产生RDD之间的依赖,这个依赖关系可以想象成一条链条,这个链条就是DAG(有向无环图)
    这里的依赖关系分为两种:宽依赖(wide dependency)和窄依赖(norrow dependency)
    窄依赖:窄依赖是parent 的每个partition 和 children partition 是一一对应的依赖关系。
    宽依赖: 宽依赖是parent的每个分区都可能被children的多个partition所依赖,可以理解为一对多依赖关系,在这个过程中一般会有记录的互相重组,所以这个过程也叫shuffle Dependency(shuffle依赖) 。

Job

一个正常的spark任务就是一个Job,一个Job包含N个Transformation和最少1个Action。

Shuffle

Spark的架构核心和hadoop 的mapreduce是一样,所以先说下mapreduce的shuffle,通常我们把map->reduce 的中间这个过程叫做shuffle我,shuffle就是数据打诨重组的过程。再看看spark,数据打诨重组的过程只发生在宽依赖中,所以宽依赖也叫Shuffle Dependency。

Stage

Spark 里边一个Job 他会由若干个任务组成的有向五环图,Spark 以shuffle为边界,将不同的任务分配到一个stage中去。也就是从头开始只要遇到一个宽依赖就分为一个stage。一直到结束。当然stage的运行也是前后依赖的关系。这样分配的意义在于再依赖是可以通过串行计算去完成的,他会放在一个Task组里边去执行,而这些Task组也会分配到同一个Executor中去,减少网络消耗,提高运算效率。

Task

spark 的Stage 里边有很多任务,也就是Task ,这些Task会放入到一个Taskset里边去,再将TaskSet作用到每个Partition上去执行。Task就是一系列的执行动作。

Persist&Cache

Persist是讲数据加载到内存或者磁盘上,他是可以指定参数加载,而Cache就是只是将数据加载到内存中,以MEMORY_ONLY的方式进行缓存,其实cache函数就是调动persist(MEMORY_ONLY)进行缓存的
Persist 有两种触发方式,一种是执行Action的时候,另一个是在直接执行persist或者cache的时候。

Checkpoint

为什么有CheckPoint 机制,是因为spark 是内存计算模型,内存是有限的,数据是无限的,在一串任务中,如果内存不够了,那么spark就会清理掉老的RDD数据,比如: 如果你的Job任务有100个Stage,第100个stag需要依赖第2个,那么中间从第3个到99个计算中完全可能把第二个Rdd的数据清理掉,到第100Stage进行计算的时候,就需要在计算一次第二个Stage的结果。为了避免这种情况Spark,引入了checkpoint机制,就是spark需要依赖一个stage数据时候他会先check缓存-> 如果没有 -> check checkpoint ->如果没有->重新计算。这是整个命中过程。

SparkStreaming

优化

  • join操作是否可以使用map广播的方式替代。
  • 使用reduceByKey/aggregateByKey来代替groupByKey,因为前者可以进行combiner操作,减少网络IO;
  • 使用foreachPartition代替foreach操作
  • 使用foreachPartition代替foreach操作
  • 使用Repartition操作可以有效增加任务的处理并行度

参数的优化

  • 根据资源情况,可以添加Executor的个数来有效,参数为 spark.executor.instances
  • 调整每个Executor的使用内核数, 参数为spark.executor.cores
  • 调整每个Executor的内存, 参数为spark.executor.memory
  • shuffle write task的buffer大小, 参数为spark.shuffle.file.buffer
  • shuffle read task的buffer大小, 参数为spark.reducer.maxSizeInFlight
  • 每一个stage的task的默认并行度, 默认为200, 建议修改为1000左右, 参数 spark.default.parallelism
  • 用于RDD的持久化使用的内存比例,默认0.6, 参数 ** spark.storage.memoryFraction**
  • 用户shuffle使用的内存比例, 默认为0.2, 参数 ** spark.shuffle.memoryFraction**
  • 限制读取Kafka数据的速率,参数 spark.streaming.kafka.maxRatePerPartition

问题

  • 内存太小OOM
  • 块丢失
  • 数据丢失
  • 计算问题

Elasticsearch

概述

ElasticSearch是一个基于Lucene构建的开源,分布式,RESTful搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。支持通过HTTP使用JSON进行数据索引。

选用主要原因有:

实时性能优越;`安装、配置、维护、使用简单`;
`RESTful API` 和 `JSON格式的文档型数据`,降低开发调试的难度。 
`ES自带了中文分词`,支持中文搜索,但是,可以换用更高效精确的分词插件。 
他有自己`成熟的软件生态ELK`,
`强大的插件群`。
`分布式灵活扩展`
`文档齐全`
`社区强大`
 特点优势 
(1)Open Source(开源) 
(2)Apache Lucene(基于 Lucene) 
(3)Schema Free(模式自由) 
(4)Document Oriented(面向文档型的设计) 
(5)Real Time Data & Analytics(实时索引数据) 
(6)Distributed(分布式) 
(7)High Availability(高可靠性) 
(8)其他特性:RESTful API;JSON format;multi-tenancy;full text search;conflict management;per-operation persistence 

es的一些术语

cluster
  代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。
shards
  代表索引分片,es可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引拆分成多个,分布到不同的节点上。构成分布式搜索。分片的数量只能在索引创建前指定,并且索引创建后不能更改。
replicas
  代表索引副本,es可以设置多个索引的副本,副本的作用一是提高系统的容错性,当个某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高es的查询效率,es会自动对搜索请求进行负载均衡。
recovery
  代表数据恢复或叫数据重新分布,es在有节点加入或退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动时也会进行数据恢复。
river
  代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的。
gateway

  代表es索引快照的存储方式,es默认是先把索引存放到内存中,当内存满了时再持久化到本地硬盘。gateway对索引快照进行存储,当这个es集群关闭再重新启动时就会从gateway中读取索引备份数据。es支持多种类型的gateway,有本地文件系统(默认),分布式文件系统,Hadoop的HDFS和amazon的s3云存储服务。

discovery.zen
  代表es的自动发现节点机制,es是一个基于p2p的系统,它先通过广播寻找存在的节点,再通过多播协议来进行节点之间的通信,同时也支持点对点的交互。
Transport
  代表es内部节点或集群与客户端的交互方式,默认内部是使用tcp协议进行交互,同时它支持http协议(json格式)、thrift、servlet、memcached、zeroMQ等的传输协议(通过插件方式集成)。
Mapping
mapping就是ES在索引的过程中讲字段推断为那种类型,也可以做一些设置,Mapping 可以分为settingmappings 两大类, settings主要设置一些index上的参数,如副本数,分片数;而mappings 主要是对文档结构进行描述。
mappings又可以分为三类:

我的话

优化方式有很多:

从系统大类上有ulimit 修改,如`句柄数`、`线程数`、`ES的JVM 内存调整`等;

进入ES内部优化包含也很多,举个例子就说mapping ,mapping是描述索引结构的一个术语,它主要分为两类:
    1. 一类呢:是settings,里边主要的配置项就是分片和副本数,当然也有其他的,如refresh time等。
    2. 第二类呢:mappings 它主要描述文档结构:
    它里边呢也有主要有三大配置:_all、_source、properites
        1. _all 主要是将文档中所有的字段全部包含进来进行索引了,在没有指定字段的情况下进行检索,它默认会检索这个字段,但是在我们业务中,往往都是对字段进行精确检索,我们需要关闭这个功能来提高索引速度、而且同时也会提高磁盘IO。"_all" : {"enabled" : false} 
        2. _source 主要是讲文档的所有字段数据保存在source document中,你可以理解为es不仅保存一份索引数据,而且还将原始数据进行保存下来。所以source字段在我们检索查看数据的时候很有用,如果这个字段{"enabled" : false} 那么检索的时候只返回ID,es就需要通过字段取索引中获取数据,效率就很低了,如果{"enabled" : true} 就会产生另外一个问题,那就是索引膨胀的速度就非常的快,我们就可以通过Compress来设置的是否压缩。
        2.1 当然,也有少数场景是可以关闭 _source 的:
        - 把 ES 作为时间序列数据库使用,只要聚合统计结果,不要源数据内容。
        - 把 ES 作为纯检索工具使用,_id 对应的内容在 HDFS 上另外存储,搜索后使用所得 _id 去 HDFS 上读取内容。


        3. propertis 就是最主要的指定文档结构了,它包括字段类型、父子关系,也就是主要的数据建模部分。这个根据业务的不同优化点也不同。比如: 他有很多数据类型,除了int string float之外还有数组类型、对象类型,如果只是geo的话还有geopoint类型、嵌套类型等。
        
    **优化篇:**
    ES作为一个快速搜索引擎最核心的功能就是,索引和检索,如何提高他的索引效率就是ES优化的核心问题:
    "index.translog.flush_threshold_ops": "100000" 
    "index.refresh_interval": "-1", 
    1. translog主要是让ES集群各个节点之间进行数据平衡,ES 做translog的这个过程比较消耗时间和资源,第一个参数是达到多少条进行translog操作,这个参数默认是5000, 可以将它调大些。 第二个参数 是集群每隔多少秒进行一次translog操作,模式120s ,这个频率太高设置大些(需要根据业务)或者直接设置为-1 我们手动做同步。
    
    2. 分片数量的选取
    测试分片数量=数据总量/单片数据总量
    需要计算单片数据总量达到多少时候可以满足要求:
    经验:单片不要超过1.88亿条,每个机器不要超过200个分片。
    
    3. 副本的选取
    es的副本机制会保障es在某一数据片失败的情况下进行数据恢复,保证数据的安全性。而es的副本机制还有一个特性就是,在高效检索的情况下做负载均衡,以提高检索效率;事实证明副本数不是越多或越少都不好,只有根据集群配置选取适中的副本数量就可以了。建议副本数为1~3个。副本优化案例:
        3.1 主片体现了索引,副片没有体现索引,是应为settings里边translog没有到达同步时间。
        3.2 在进行大规模数据导入的时候,进行讲副本数设置为0 ,因为在导入的过程中,副本也同时在复制,导致效率下降,同步完成之后将副本设置为1,他会自动复制副本。从而达到高效数据导入。
    4. 索引段segment
    es的底层是lucence,lucence 的索引保存在磁盘上的文件就是segment,segment 越多检索速度就越慢,es可以设置max_num_segments=1 也就是不让segment分裂,这样就可以保障快速检索。但是如果我们的数据量巨大,name这个segment就会特别大,检索起来也会有问题,而且大大提高索引错误率。所以这个优化需要慎重。
    5. 删除文档
    
    
    
    
    

遇到的问题

比如索引速度慢,查询速度慢。
索引速度调整索引 refresh_interval 、translog时间、副本数量等。
查询速度慢, 查询语句复杂,join过程优化。
分片复制错误等。 ulimit太小导致不能写入。

Data too large : indices.fielddata.cache.size ,在该数值达到一个百分比的时候会自动清空缓存,默认是无限制 就会产生该错误。设置为30%

Solr和ES对比

首先solr和ES都是基于lucence的开发的,有人ES的性能要优于solr我觉得其实不然,首先两者都是基于lucence、JVM ,虽然底层相同单他们各自有不用feature,不同的feature他的 test case (测试场景) 就不同,所以做benchmark 的意义就不是大。
我觉得ES能流行的原因是可能是, es 出现的时候比较晚,无论在设计上还是实现上都比较考究,新式吧,也就是比较接地气,更容易上手。加上Elastic 的生态也比较完善,扩展性更强,我们随便扩展自己的插件,说到这里es的插件生态也非常强大,几乎覆盖大数据生态相关组件,在这方面solr就逊色很多。

大面上的配置

索引分片数 index.number_of_shards: 5 
 
索引副本数  index.number_of_replicas: 1 

机架配置   node.rack_id:rack_one

内存配置  ES_MIN_MEM=27G  ES_MAX_MEM=27G 

优化配置

bootstrap.mlockall: true #这样可以elasticsearch确保使用物理内存,不使用linux swap 。

Mapping

Kafka

概述

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
他的结构是: 它分为 topic->broker->partion-> 副本
他的使用模式很简单就是生产者消费者模式,生产者可以有多个,而他的消费者最好和他的partition数量相同,如果大于partition个数那么多余的consumer,

Kakfa Broker Leader的选举: 抢占式做Leader,谁第一个到就是谁。同时已有一个Broker可以做leader。
Consumergroup

  • 每个消费者都所属一个分组,分组的多少是可以有多个的。
  • 每个组的消费情况使用offset来控制,
  • 无论consumer 的thread有多少,都会讲所有partition消费完。
  • Consumer的增加或者减少都会进行balence操作

Partition leader与follower
partition和副本的关系是leader和follower的关系,producer写kafka的时候先写partition leader,再由partition leader push给其他的partition follower

遇到的问题

1.一个partition只能被一个消费者消费(一个消费者可以同时消费多个partition)
2.分区数据量不均衡:
Topic上设置了四个分区,压测过程中,发现每个分区的数据量差别挺大的,极端的时候,只有一个分区有数据,其余三个分区空闲。
解决方法,在用生产者生产数据的时候,send方法需要指定key。Kafka会根据key的值,通过一定的算法,如hash,将数据平均的发送到不同的分区上。

  1. kafka的副本问题。
    副本不参与消费。
    参考:http://blog.csdn.net/ychenfeng/article/details/74980531