平时的开发中线程是个少不了的东西,比如tomcat里的servlet就是线程,没有线程我们如何提供多用户访问呢?不过很多刚开始接触线程的开发攻城师却在这个上面吃了不少苦头。怎么做一套简便的线程开发模式框架让大家从单线程开发快速转入多线程开发,这确实是个比较难搞的工程。

那具体什么是线程呢?首先看看进程是什么,进程就是系统中执行的一个程序,这个程序可以使用内存、处理器、文件系统等相关资源。例如 QQ软件、eclipse、tomcat等就是一个exe程序,运行启动起来就是一个进程。为什么需要多线程?如果每个进程都是单独处理一件事情不能多个任务同时处理,比如我们打开qq只能和一个人聊天,我们用eclipse开发代码的时候不能编译代码,我们请求tomcat服务时只能服务一个用户请求,那我想我们还在原始社会。多线程的目的就是让一个进程能够同时处理多件事情或者请求。比如现在我们使用的QQ软件可以同时和多个人聊天,我们用eclipse开发代码时还可以编译代码,tomcat可以同时服务多个用户请求。

线程这么多好处,怎么把单进程程序变成多线程程序呢?不同的语言有不同的实现,这里说下java语言的实现多线程的两种方式:扩展java.lang.Thread类、实现java.lang.Runnable接口。
先看个例子,假设有100个数据需要分发并且计算。看下单线程的处理速度:

package thread;

import java.util.Vector;

public class OneMain {
       public static void main(String[] args) throws InterruptedException {
            Vector<Integer> list = new Vector<Integer>(100);

             for (int i = 0; i < 100; i++) {
                  list.add(i);
            }

             long start = System.currentTimeMillis();
             while (list.size() > 0) {
                   int val = list.remove(0);
                  Thread. sleep(100);//模拟处理
                  System. out.println(val);
            }
             long end = System.currentTimeMillis();

            System. out.println("消耗 " + (end - start) + " ms");

      }

       // 消耗 10063 ms
}

再看一下多线程的处理速度,采用了10个线程分别处理:

package thread;

import java.util.Vector;
import java.util.concurrent.CountDownLatch;

public class MultiThread extends Thread {
     static Vector<Integer> list = new Vector<Integer>(100);
     static CountDownLatch count = new CountDownLatch(10);

     public void run() {

          while (list.size() > 0) {
               try {
                    int val = list.remove(0);
                    System.out.println(val);
                    Thread.sleep(100);//模拟处理
               } catch (Exception e) {
                    // 可能数组越界,这个地方只是为了说明问题,忽略错误
               }

          }
         
          count.countDown(); // 删除成功减一

     }

     public static void main(String[] args) throws InterruptedException {
         
          for (int i = 0; i < 100; i++) {
               list.add(i);
          }
         
          long start = System.currentTimeMillis();

          for (int i = 0; i < 10; i++) {
               new MultiThread().start();
          }

         

          count.await();
          long end = System.currentTimeMillis();
          System.out.println("消耗 " + (end - start) + " ms");

     }

     // 消耗 1001 ms
}

大家看到了线程的好处了吧!单线程需要10S,10个线程只需要1S。充分利用了系统资源实现并行计算。也许这里会产生一个误解,是不是增加的线程个数越多效率越高。线程越多处理性能越高这个是错误的,范式都要合适,过了就不好了。需要普及一下计算机硬件的一些知识。我们的cpu是个运算器,线程执行就需要这个运算器来运行。不过这个资源只有一个,大家就会争抢。一般通过以下几种算法实现争抢cpu的调度:

1、队列方式,先来先服务。不管是什么任务来了都要按照队列排队先来后到。
2、时间片轮转,这也是最古老的cpu调度算法。设定一个时间片,每个任务使用cpu的时间不能超过这个时间。如果超过了这个时间就把任务暂停保存状态,放到队列尾部继续等待执行。
3、优先级方式:给任务设定优先级,有优先级的先执行,没有优先级的就等待执行。

这三种算法都有优缺点,实际操作系统是结合多种算法,保证优先级的能够先处理,但是也不能一直处理优先级的任务。硬件方面为了提高效率也有多核cpu、多线程cpu等解决方案。目前看得出来线程增多了会带来cpu调度的负载增加,cpu需要调度大量的线程,包括创建线程、销毁线程、线程是否需要换出cpu、是否需要分配到cpu。这些都是需要消耗系统资源的,由此,我们需要一个机制来统一管理这一堆线程资源。线程池的理念提出解决了频繁创建、销毁线程的代价。线程池指预先创建好一定大小的线程等待随时服务用户的任务处理,不必等到用户需要的时候再去创建。特别是在java开发中,尽量减少垃圾回收机制的消耗就要减少对象的频繁创建和销毁。

之前我们都是自己实现的线程池,不过随之jdk1.5的推出,jdk自带了 java.util.concurrent并发开发框架,解决了我们大部分线程池框架的重复工作。可以使用Executors来建立线程池,列出以下大概的,后面再介绍。
newCachedThreadPool 建立具有缓存功能线程池
newFixedThreadPool 建立固定数量的线程
newScheduledThreadPool 建立具有时间调度的线程

有了线程池后有以下几个问题需要考虑:
1、线程怎么管理,比如新建任务线程。
2、线程如何停止、启动。
3、线程除了scheduled模式的间隔时间定时外能否实现精确时间启动。比如晚上1点启动。
4、线程如何监控,如果线程执行过程中死掉了,异常终止我们怎么知道。

考虑到这几点,我们需要把线程集中管理起来,用java.util.concurrent是做不到的。需要做以下几点:
1、将线程和业务分离,业务的配置单独做成一个表。
2、构建基于concurrent的线程调度框架,包括可以管理线程的状态、停止线程的接口、线程存活心跳机制、线程异常日志记录模块。
3、构建灵活的timer组件,添加quartz定时组件实现精准定时系统。
4、和业务配置信息结合构建线程池任务调度系统。可以通过配置管理、添加线程任务、监控、定时、管理等操作。
组件图为:
分布式调度框架-lanceyan.com

构建好线程调度框架是不是就可以应对大量计算的需求了呢?答案是否定的。因为一个机器的资源是有限的,上面也提到了cpu是时间周期的,任务一多了也会排队,就算增加cpu,一个机器能承载的cpu也是有限的。所以需要把整个线程池框架做成分布式的任务调度框架才能应对横向扩展,比如一个机器上的资源呢达到瓶颈了,马上增加一台机器部署调度框架和业务就可以增加计算能力了。好了,如何搭建?如下图:
分布式调度框架-lanceyan.com

基于jeeframework我们封装spring、ibatis、数据库等操作,并且可以调用业务方法完成业务处理。主要组件为:
1、任务集中存储到数据库服务器
2、控制中心负责管理集群中的节点状态,任务分发
3、线程池调度集群负责控制中心分发的任务执行
4、web服务器通过可视化操作任务的分派、管理、监控。

一般这个架构可以应对常用的分布式处理需求了,不过有个缺陷就是随着开发人员的增多和业务模型的增多,单线程的编程模型也会变得复杂。比如需要对1000w数据进行分词,如果这个放到一个线程里来执行,不算计算时间消耗光是查询数据库就需要耗费不少时间。有人说,那我把1000w数据打散放到不同机器去运算,然后再合并不就行了吗?因为这是个特例的模式,专为了这个需求去开发相应的程序没有问题,但是以后又有其他的海量需求如何办?比如把倒退3年的所有用户发的帖子中发帖子最多的粉丝转发的最高的用户作息时间取出来。又得编一套程序实现,太麻烦!分布式云计算架构要解决的就是这些问题,减少开发复杂度并且要高性能,大家会不会想到一个最近很热的一个框架,hadoop,没错就是这个玩意。hadoop解决的就是这个问题,把大的计算任务分解、计算、合并,这不就是我们要的东西吗?不过玩过这个的人都知道他是一个单独的进程。不是!他是一堆进程,怎么和我们的调度框架结合起来?看图说话:
task31

基本前面的分布式调度框架组件不变,增加如下组件和功能:
1、改造分布式调度框架,可以把本身线程任务变成mapreduce任务并提交到hadoop集群。
2、hadoop集群能够调用业务接口的spring、ibatis处理业务逻辑访问数据库。
3、hadoop需要的数据能够通过hive查询。
4、hadoop可以访问hdfs/hbase读写操作。
5、业务数据要及时加入hive仓库。
6、hive处理离线型数据、hbase处理经常更新的数据、hdfs是hive和hbase的底层结构也可以存放常规文件。

这样,整个改造基本完成。不过需要注意的是架构设计一定要减少开发程序的复杂度。这里虽然引入了hadoop模型,但是框架上开发者还是隐藏的。业务处理类既可以在单机模式下运行也可以在hadoop上运行,并且可以调用spring、ibatis。减少了开发的学习成本,在实战中慢慢体会就学会了一项新技能。

界面截图:
task4

原创文章,转载请注明: 转载自LANCEYAN.COM

本文链接地址: JAVA线程池管理及分布式HADOOP调度框架搭建

互联网创业中大部分人都是草根创业,这个时候没有强劲的服务器,也没有钱去买很昂贵的海量数据库。在这样严峻的条件下,一批又一批的创业者从创业中获得成功,这个和当前的开源技术、海量数据架构有着必不可分的关系。比如我们使用mysql、nginx等开源软件,通过架构和低成本服务器也可以搭建千万级用户访问量的系统。新浪微博、淘宝网、腾讯等大型互联网公司都使用了很多开源免费系统搭建了他们的平台。所以,用什么没关系,只要能够在合理的情况下采用合理的解决方案。

那怎么搭建一个好的系统架构呢?这个话题太大,这里主要说一下数据分流的方式。比如我们的数据库服务器只能存储200个数据,突然要搞一个活动预估达到600个数据。
可以采用两种方式:横向扩展或者纵向扩展。
纵向扩展是升级服务器的硬件资源。但是随着机器的性能配置越高,价格越高,这个代价对于一般的小公司是承担不起的。
横向扩展是采用多个廉价的机器提供服务。这样一个机器只能处理200个数据、3个机器就可以处理600个数据了,如果以后业务量增加还可以快速配置增加。在大多数情况都选择横向扩展的方式。如下图:
图1

图2

现在有个问题了,这600个数据如何路由到对应的机器。需要考虑如果均衡分配,假设我们600个数据都是统一的自增id数据,从1~600,分成3堆可以采用 id mod 3的方式。其实在真实环境可能不是这种id是字符串。需要把字符串转变为hashcode再进行取模。

目前看起来是不是解决我们的问题了,所有数据都很好的分发并且没有达到系统的负载。但如果我们的数据需要存储、需要读取就没有这么容易了。业务增多怎么办,大家按照上面的横向扩展知道需要增加一台服务器。但是就是因为增加这一台服务器带来了一些问题。看下面这个例子,一共9个数,需要放到2台机器(1、2)上。各个机器存放为:1号机器存放1、3、5、7、9 ,2号机器存放 2、4、6、8。如果扩展一台机器3如何,数据就要发生大迁移,1号机器存放1、4、7, 2号机器存放2、5、8, 3号机器存放3、6、9。如图:

图3
从图中可以看出 1号机器的3、5、9迁移出去了、2好机器的4、6迁移出去了,按照新的秩序再重新分配了一遍。数据量小的话重新分配一遍代价并不大,但如果我们拥有上亿、上T级的数据这个操作成本是相当的高,少则几个小时多则数天。并且迁移的时候原数据库机器负载比较高,那大家就有疑问了,是不是这种水平扩展的架构方式不太合理?

—————————–华丽分割线—————————————

一致性hash就是在这种应用背景提出来的,现在被广泛应用于分布式缓存,比如memcached。下面简单介绍下一致性hash的基本原理。最早的版本 http://dl.acm.org/citation.cfm?id=258660。国内网上有很多文章都写的比较好。如: http://blog.csdn.net/x15594/article/details/6270242

下面简单举个例子来说明一致性hash。

准备:1、2、3 三台机器
还有待分配的9个数 1、2、3、4、5、6、7、8、9
一致性hash算法架构

步骤
一、构造出来 2的32次方 个虚拟节点出来,因为计算机里面是01的世界,进行划分时采用2的次方数据容易分配均衡。另 2的32次方是42亿,我们就算有超大量的服务器也不可能超过42亿台吧,扩展和均衡性都保证了。
一致性hash
二、将三台机器分别取IP进行hashcode计算(这里也可以取hostname,只要能够唯一区别各个机器就可以了),然后映射到2的32次方上去。比如1号机器算出来的hashcode并且mod (2^32)为 123(这个是虚构的),2号机器算出来的值为 2300420,3号机器算出来为 90203920。这样三台机器就映射到了这个虚拟的42亿环形结构的节点上了。
图5
三、将数据(1-9)也用同样的方法算出hashcode并对42亿取模将其配置到环形节点上。假设这几个节点算出来的值为 1:10,2:23564,3:57,4:6984,5:5689632,6:86546845,7:122,8:3300689,9:135468。可以看出 1、3、7小于123, 2、4、9 小于 2300420 大于 123, 5、6、8 大于 2300420 小于90203920。从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个Cache节点上。如果超过2^32仍然找不到Cache节点,就会保存到第一个Cache节点上。也就是1、3、7将分配到1号机器,2、4、9将分配到2号机器,5、6、8将分配到3号机器。
图6
这个时候大家可能会问,我到现在没有看见一致性hash带来任何好处,比传统的取模还增加了复杂度。现在马上来做一些关键性的处理,比如我们增加一台机器。按照原来我们需要把所有的数据重新分配到四台机器。一致性hash怎么做呢?现在4号机器加进来,他的hash值算出来取模后是12302012。 5、8 大于2300420 小于12302012 ,6 大于 12302012 小于90203920 。这样调整的只是把5、8从3号机器删除,4号机器中加入 5、8。
图7
同理,删除机器怎么做呢,假设2号机器挂掉,受影响的也只是2号机器上的数据被迁移到离它节点,上图为4号机器。
图8
大家应该明白一致性hash的基本原理了吧。不过这种算法还是有缺陷,比如在机器节点比较少、数据量大的时候,数据的分布可能不是很均衡,就会导致其中一台服务器的数据比其他机器多很多。为了解决这个问题,需要引入虚拟服务器节点的机制。如我们一共有只有三台机器,1、2、3。但是实际又不可能有这么多机器怎么解决呢?把 这些机器各自虚拟化出来3台机器,也就是 1a 1b 1c 2a 2b 2c 3a 3b 3c,这样就变成了9台机器。实际 1a 1b 1c 还是对应1。但是实际分布到环形节点就变成了9台机器。数据分布也就能够更分散一点。如图:
图91

写了这么多一致性hash,这个和分布式搜索有什么半点关系?我们现在使用solr4搭建了分布式搜索,测试了基于solrcloud的分布式平台提交20条数据居然需要几十秒,所以就废弃了solrcloud。采用自己hack solr平台,不用zookeeper做分布式一致性管理平台,自己管理数据的分发机制。既然需要自己管理数据的分发,就需要考虑到索引的创建,索引的更新。这样我们的一致性hash也就用上了。整体架构如下图:

图10
建立和更新需要维持机器的位置,能够根据数据的key找到对应的数据分发并更新。这里需要考虑的是如何高效、可靠的把数据建立、更新到索引里。
备份服务器防止建立服务器挂掉,可以根据备份服务器快速恢复。
读服务器主要做读写分离使用,防止写索引影响查询数据。
集群管理服务器管理整个集群内的服务器状态、告警。

整个集群随着业务增多还可以按照数据的类型划分,比如用户、微博等。每个类型按照上图架构搭建,就可以满足一般性能的分布式搜索。对于solr和分布式搜索的话题后续再聊。

扩展阅读:
java的hashmap随着数据量的增加也会出现map调整的问题,必要的时候就初始化足够大的size以防止容量不足对已有数据进行重新hash计算。

疫苗:Java HashMap的死循环 http://coolshell.cn/articles/9606.html
一致性哈希算法的优化—-关于如何保正在环中增加新节点时,命中率不受影响 (原拍拍同事scott)http://scottina.iteye.com/blog/650380

语言实现:
http://weblogs.java.net/blog/2007/11/27/consistent-hashing java 版本的例子
http://blog.csdn.net/mayongzhan/archive/2009/06/25/4298834.aspx PHP 版的例子
http://www.codeproject.com/KB/recipes/lib-conhash.aspx C语言版本例子

原创文章,转载请注明: 转载自LANCEYAN.COM

本文链接地址: 一致性hash和solr千万级数据分布式搜索引擎中的应用