登录
注册
node.js 学习社区
让node.js与其它语言进行通信(thrift),并使用zookeeper做服务协同

赵科科

2014-12-15 04:53

目标

需要node.js实现系统某一模块,其它模块之间通过thrift进行通信,为了统一也使用thrift在node.js的部分接口间与其它模块进行通讯(一部分接口通过http方式暴露)。为了便于管理服务,使用zookeeper做各个服务间协同处理。

下面主要以java和node.js为例叙述如何实现。

选用模块

  • 其它模块使用的是thrift0.6(如java,不使用高版本是因为有我们避免不了的Bug未修复)
  • java端使用thrift0.6,node.js端使用thrift0.7(注意在thrift0.8及以下版本中,node.js的lib不能抛出自定义异常,可以通过返回特殊变量值来绕过这个问题),分别用两边的thrift生成各自代码即可。
  • zookeeper的node.js client使用node-zookeeper的最新版本

主要代码

  • 首先定义thrift文件,nodis.thrift
namespace gen.service

struct User {
  1: i64 uid,
  2: string username,
  3: string info
}

service Nodis {
  /*void index(1: User user) throws (1: NodisException nodisException) -- can not catch exception <= thrift version 0.8*/
  string index(1: User user)
  string remove(1: string username)
}
  • 分别生成java代码
 thrift --gen java nodis.thrift

和node.js代码

thrift --gen js:node nodis.thrift
  • 实现node.js编写的server端代码, NodisServer.js
var thrift = require('thrift');
var ttransport = require('thrift/lib/thrift/transport');
var Nodis = require('./thrift/gen-nodejs/Nodis.js');
var NodisTypes = require('./thrift/gen-nodejs/nodis_types.js');
var zkUtils = require('./zk.js');

var index = function(user, fn){
    console.log(user);
    fn('ok');
    return;
};

var remove = function(username, fn){
    console.log(username);
    fn('ok');
    return;
};

var server_framed = thrift.createServer(Nodis, {
    index : index,
    remove : remove,

});

server_framed.listen(9998);

console.log('NodisServer is running now...');

zkUtils.registerService('/thrift_services/nodis', '192.168.1.126:9998');//在zookeeper上注册server服务

//add shutdown hook: remove service from zookeeper
process.on('SIGTERM', function () {
  console.log('Got SIGTERM.  Removing Zookeeper Registry.');
  zkUtils.removeServiceThenExit('/thrift_services/nodis', '192.168.126:9998', function(){
      //zkUtils.close();
      process.exit();//put this in 'close' callback later, now unsupported by node-zookeeper
  });
});
  • 实现java客户端调用代码,NodisClient.java:node.js端是异步处理的,所以java端选用TFramedTransport传输,node.js的thrift客户端也支持适用它的两种方式而已。
public class NodisProxy {
    private final static String SERVICE_NAME = "nodis";
    private static final Logger logger = LoggerFactory.getLogger(NodisProxy.class);

    public static Nodis.Client getClient() {
        List<String> hostAndPortList = ClusterLocation.getService(SERVICE_NAME);//取得zookeeper上已注册的nodis服务的所有节点信息
        Collections.shuffle(hostAndPortList);
        String hostAndPort = "";
        for (int i = 0; i < hostAndPortList.size(); i++) {
            try {
                hostAndPort = hostAndPortList.get(i);
                TTransport transport = new TFramedTransport(new TSocket(hostAndPort.split(":")[0], Integer.valueOf(hostAndPort.split(":")[1])));
                TProtocol protocol = new TBinaryProtocol(transport);
                transport.open();
                return new Nodis.Client.Factory().getClient(protocol);
            }
            catch (Exception e) {
                logger.error(" fail times : " + i + " and begin retry ");
            }
        }

        logger.error("failed to get zk nodis servive" );
        throw new Exception();
    }

    public static void release(final Nodis.Client client) {
        if (client == null) {
            return;
        }
        close(client);
    }

    protected static final void close(TServiceClient client) {
        client.getInputProtocol().getTransport().close();
        client.getOutputProtocol().getTransport().close();
    }
}

public class NodisProxyTest {
    public static void main(String[] args) {
        Nodis.Client client = NodisProxy.getClient();
        try {
            System.out.println(client.index(new User(2323, "sumory", "just test...")));
            System.out.println(client.remove("sumory"));
        }
        catch (TException e) {
            e.printStackTrace();
        }
        NodisProxy.release(client);
    }
}
  • 实现zookeeper基本功能的node.js代码,zk.js
var ZOOKEEPER = require('zookeeper');
var path = require('path');
var dateFormat = require('./lib/date_format.js');
var hosts='192.168.1.118:2181,192.168.1.118:2182,192.168.1.118:2183';

//var servers = this.connect.split(',');
//var serverNum = this.servers.length;
//var sleepTime = this.timeout/this.serverNum;

function ZkClient(hosts){
    this.zk = newZk(hosts, this);//
}

function newZk(hosts, zkClient){
    var zk = new ZOOKEEPER();
    var timeout = 60000;
    var options = {connect:hosts, timeout:timeout, debug_level:ZOOKEEPER.ZOO_LOG_LEVEL_INFO, host_order_deterministic:false};
    zk.init (options);
    zk.on('connect', function(zkk){
        console.log(new Date().format('yyyy-mm-dd HH:MM:ss'), ' zk session established, id = %s', zkk.client_id);
    });
    zk.on('close',function(zkk){
        console.log(new Date().format('yyyy-mm-dd HH:MM:ss'), ' zk session close...');
        zkClient.zk = newZk(hosts, zkClient);
    });
    return zk;
}

var zkclient = new ZkClient(hosts);

/**
  * @path /thrift_services/nodis
  * @node 192.168.126:9998
  */
exports.registerService = function(path, node){
    // EPHEMERAL:创建临时节点,ZooKeeper在感知连接机器宕机后会清除它创建的瞬节点
    zkclient.zk.a_create(path + '/' + node, '', ZOOKEEPER.ZOO_EPHEMERAL, function (rc, error, path)  {
        if (rc != 0){//error occurs
            console.log('node create result: %d, error: "%s", path: %s', rc, error, path);
        }
        else{
            console.log('node create result: ok, path: %s', path);
        }
    });
};

exports.removeServiceThenExit = function(path, node, fn){
    zkclient.zk.a_delete_(path + '/' + node, null, function(rc, err){
        if(rc!=0){
            console.log('delete error: ', rc, err);
        }
        else{
            console.log('delete ok');
        }  
        fn();
    });
};

exports.close = function(){
    zkclient.zk.close();
};
测试及注意点
  • zookeeper的timeout参数设置,不能太小,否则会很快loss connection,试验过60000比较合适
  • 退出server端的时候应注销zookeeper上的节点,否则下次在过期时间内再重启时,信息有变客户端就会出现问题
  • node-zookeeper使用的是c的zookeeper lib,目前版本仍有bug存在,遇到问题注意查阅它的issues列表
  • thrift的node.js包存在不能throw异常的bug(除非用trunk版本;查看生成的代码也很容易理解为什么不能throw,success函数的处理有问题),需要绕过这个问题,建议不要使用自定义的异常,使用自定义的异常或者错误code

原文引自:http://cnodejs.org/topic/501280dcf767cc9a51419013

回复 · 0

发表回复

你可以在回复中 @ 其他人