百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程文章 > 正文

格物致知-记一次Nodejs源码分析的经历

qiyuwang 2024-10-31 15:51 15 浏览 0 评论

作者: theanarkh 来源:编程杂技

昨天分析http模块相关的代码时,遇到了一个晦涩的逻辑,看了想,想了看还是没看懂。百度、谷歌了很多帖子也没看到合适的答案。突然看到一个题目有点相似的搜索结果,点进去是Stack Overflow上的帖子,但是已经404,最后还是通过快照功能成功看到内容。这个帖子[1]和我的疑惑不相关,但是突然给了我一些灵感。沿着这个灵感去看了代码,最后下载nodejs源码,加了一些log,编译了一夜(太久了,等不及编译完成,得睡觉了)。上午起来验证,终于解开了疑惑。这个问题源于下面这段代码。

function connectionListenerInternal(server, socket) {

socket.server = server;

// 分配一个http解析器

const parser = parsers.alloc();

// 解析请求报文

parser.initialize(

HTTPParser.REQUEST,

new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket),

server.maxHeaderSize || 0,

server.insecureHTTPParser === undefined ?

isLenient() : server.insecureHTTPParser,

);

parser.socket = socket;

// 开始解析头部的开始时间

parser.parsingHeadersStart = nowDate();

socket.parser = parser;

const state = {

onData: null,

onEnd: null,

onClose: null,

onDrain: null,

// 同一tcp连接上,请求和响应的的队列

outgoing: [],

incoming: [],

outgoingData: 0,

keepAliveTimeoutSet: false

};

state.onData = socketOnData.bind(undefined, server, socket, parser, state);

socket.on('data', state.onData);


if (socket._handle && socket._handle.isStreamBase &&

!socket._handle._consumed) {

parser._consumed = true;

socket._handle._consumed = true;

parser.consume(socket._handle);

}

parser[kOnExecute] =

onParserExecute.bind(undefined, server, socket, parser, state);


socket._paused = false;

}

这段代码看起来很多,这是启动http服务器后,有新的tcp连接建立时执行的回调。问题在于tcp上有数据到来时,是怎么处理的,上面代码中nodejs监听了socket的data事件,同时注册了钩子kOnExecute。data事件我们都知道是流上有数据到来时触发的事件。我们看一下socketOnData做了什么事情。

function socketOnData(server, socket, parser, state, d) {

// 交给http解析器处理,返回已经解析的字节数

const ret = parser.execute(d);

onParserExecuteCommon(server, socket, parser, state, ret, d);

}

这看起来没有问题,socket上有数据,然后交给http解析器处理。几乎所有http模块源码解析的文章也是这样分析的,我第一反应也觉得这个没问题,那kOnExecute是做什么的呢?kOnExecute钩子函数的值是onParserExecute,这个看起来也是解析tcp上的数据的,看起来和onSocketData是一样的作用,难道tcp上的数据有两个消费者?我们看一下kOnExecute什么时候被回调的。

void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {


Local<Value> ret = Execute(buf.base, nread);

Local<Value> cb =

object()->Get(env()->context(), kOnExecute).ToLocalChecked();

MakeCallback(cb.As<Function>(), 1, &ret);

}

在node_http_parser.cc中的OnStreamRead中被回调,那么OnStreamRead又是什么时候被回调的呢?OnStreamRead是nodejs中c++层流操作的通用函数,当流有数据的时候就会执行该回调。而且OnStreamRead中也会把数据交给http解析器解析。这看起来真的有两个消费者?这就很奇怪,为什么一份数据会交给http解析器处理两次?这时候我的想法就是这两个地方肯定是互斥的。但是我一直没有找到是哪里做了处理。最后在connectionListenerInternal的一段代码中找到了答案。

if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) {

parser._consumed = true;

socket._handle._consumed = true;

parser.consume(socket._handle);

}

因为tcp流是继承StreamBase类的,所以if成立(后面会具体分析)。我们看一下consume的实现。

static void Consume(const FunctionCallbackInfo<Value>& args) {

Parser* parser;

ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());

CHECK(args[0]->IsObject());

StreamBase* stream = StreamBase::FromObjject(args[0].As<Object>());

CHECK_NOT_NULL(stream);

stream->PushStreamListener(parser);

}

http解析器把自己注册为tcp stream的一个listener。这里涉及到了c++层对流的设计。我们从头开始。看一下PushStreamListener做了什么事情。c++层中,流的操作由类StreamResource进行了封装。

class StreamResource {

public:

virtual ~StreamResource();

virtual int ReadStart() = 0;

virtual int ReadStop() = 0;

virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;

virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);

virtual int DoWrite(WriteWrap* w,

uv_buf_t* bufs,

size_t count,

uv_stream_t* send_handle) = 0;

void PushStreamListener(StreamListener* listener);

void RemoveStreamListener(StreamListener* listener);


protected:

uv_buf_t EmitAlloc(size_t suggested_size);

void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));


StreamListener* listener_ = nullptr;

uint64_t bytes_read_ = 0;

uint64_t bytes_written_ = 0;

friend class StreamListener;

};

我们看到StreamResource是一个基类,定义了操作流的公共方法。其中有一个成员是StreamListener类的实例。我们看看StreamListener的实现。

class StreamListener {

public:

virtual ~StreamListener();

virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;

virtual void OnStreamRead(ssize_t nread,

const uv_buf_t& buf) = 0;

virtual void OnStreamDestroy() {}

inline StreamResource* stream() { return stream_; }


protected:

void PassReadErrorToPreviousListener(ssize_t nread);


StreamResource* stream_ = nullptr;

StreamListener* previous_listener_ = nullptr;

friend class StreamResource;

};

StreamListener是一个负责消费流数据的类。StreamListener 和StreamResource类的关系如下。

null我们看到一个流可以注册多个listener,多个listener形成一个链表。接着我们看一下创建一个c++层的tcp对象是怎样的。下面是TCPWrap的继承关系。

class TCPWrap : public ConnectionWrap<TCPWrap, uv_tcp_t>{}

class ConnectionWrap : public LibuvStreamWrap{}

class LibuvStreamWrap : public HandleWrap, public StreamBase{}

class StreamBase : public StreamResource {}

我们看到tcp流是继承于StreamResource的。新建一个tcp的c++的对象时(tcp_wrap.cc),会不断往上调用父类的构造函数,其中在StreamBase中有一个关键的操作。

inline StreamBase::StreamBase(Environment* env) : env_(env) {

PushStreamListener(&default_listener_);

}


EmitToJSStreamListener default_listener_;

StreamBase会默认给流注册一个listener。我们看下EmitToJSStreamListener 具体的定义。

class ReportWritesToJSStreamListener : public StreamListener {

public:

void OnStreamAfterWrite(WriteWrap* w, int status) override;

void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;


private:

void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);

};


class EmitToJSStreamListener : public ReportWritesToJSStreamListener {

public:

uv_buf_t OnStreamAlloc(size_t suggested_size) override;

void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;

};

EmitToJSStreamListener继承StreamListener ,定义了分配内存和读取接收数据的函数。接着我们看一下PushStreamListener做了什么事情。

inline void StreamResource::PushStreamListener(StreamListener* listener) {

// 头插法

listener->previous_listener_ = listener_;

listener->stream_ = this;

listener_ = listener;

}

PushStreamListener就是构造出上图的结构。对应到创建一个c++层的tcp对象中,如下图。

然后我们看一下对于流来说,读取数据的整个链路。首先是js层调用readStart

function tryReadStart(socket) {

socket._handle.reading = true;

const err = socket._handle.readStart();

if (err)

socket.destroy(errnoException(err, 'read'));

}


// 注册等待读事件

Socket.prototype._read = function(n) {

tryReadStart(this);

};

我们看看readStart

int LibuvStreamWrap::ReadStart() {

return uv_read_start(stream(), [](uv_handle_t* handle,

size_t suggested_size,

uv_buf_t* buf) {

static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);

}, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {

static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);

});

}

ReadStart调用libuv的uv_read_start注册等待可读事件,并且注册了两个回调函数OnUvAlloc和OnUvRead。

void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {

EmitRead(nread, *buf);

}


inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {

// bytes_read_表示已读的字节数

if (nread > 0)

bytes_read_ += static_cast<uint64_t>(nread);

listener_->OnStreamRead(nread, buf);

}

通过层层调用最后会调用listener_的OnStreamRead。我们看看tcp的OnStreamRead

void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {

StreamBase* stream = static_cast<StreamBase*>(stream_);

Environment* env = stream->stream_env();

HandleScope handle_scope(env->isolate());

Context::Scope context_scope(env->context());

AllocatedBuffer buf(env, buf_);

stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());

}

继续回调CallJSOnreadMethod

MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,

Local<ArrayBuffer> ab,

size_t offset,

StreamBaseJSChecks checks) {

Environment* env = env_;

// ...

AsyncWrap* wrap = GetAsyncWrap();

CHECK_NOT_NULL(wrap);

Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);

CHECK(onread->IsFunction());

return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);

}

CallJSOnreadMethod会回调js层的onread回调函数。onread会把数据push到流中,然后触发data事件。这是tcp里默认的数据读取过程。而文章开头讲到的parser.consume打破了这个默认行为。stream->PushStreamListener(parser);修改了tcp流的listener链,http parser把自己作为数据的接收者。所以这时候tcp流上的数据是直接由node_http_parser.cc的OnStreamRead消费的。而不是触发socket的data事件,最后通过在nodejs源码中加log,重新编译验证的确如文中所述。最后提一个这个过程中还有一个关键的地方是调用consume函数的前提是socket._handle.isStreamBase为true。isStreamBase是在StreamBase::AddMethods中定义为true的,而tcp对象创建的过程中,调用了这个方法,所以tcp的isStreamBase是true,才会执行consume,才会执行kOnExecute回调。

相关推荐

windows开启telnet服务,检测远程服务端口是否可以连通

本文介绍windwos开启telnet服务,telnet服务一般可以用于检测远程主机的某个端口服务是否可以连通,在日常的工作中,我们经常会遇到在本地的windows检测远程服务端口是否可以连通。win...

仅在Web登录新华三交换机条件下启用设备Telnet登录方式

概述Web登录新华三交换机可以在“网络-服务”页面中启用设备Telnet服务或SSH服务,也可以在“设备-管理员”设置管理员用户的可用服务,然而,在设备Web页面中,无法设置lineVTY用户线【l...

思科交换机,路由器如何关闭telnet 开启ssh服务

SSH为建立在应用层基础上的安全协议。SSH是目前较可靠,专为远程登录会话和其他网络服务提供安全性的协议。利用SSH协议可以有效防止远程管理过程中的信息泄露问题。今天我们就来说说思科交换机,路...

智能化弱电行业常用的DOS命令,掌握了你也能成为...

前言在做智能化弱电项目时,前端摄像头设备安装结束后,我们会对网络摄像头进行调试,调试过程中会遇到前端摄像头没有图像或者图像出来了画面卡顿的现象。我们会采用ping命令来测试网络的连通性和网络承载能力。...

「干货」eNSP模拟器之配置Telnet登录

配置说明:配置Telnet,使R2(模拟PC)通过SW1登录到R1进行管理和配置。操作步骤:system-view##进入系统视图[Huawei]sysnameR1##改名为R1[R1]int...

win11开启telnet服务怎么操作 win11打开telent指令是什么

telnet服务是我们在进行远程连接的时候,必须要打开的一项功能。但是有不少用户们不清楚在windows11系统中怎么开启telnet服务。今天小编就使用详细的图文教程,来给大家说明一下打开telen...

华三(H3C)交换机Telnet的远程登陆

一,配置交换机管理IP[SW1]vlan20//创建管理vlan[SW1]interfacevlan20//进入vlan接口[SW1-Vlanif20]ipaddress192.168....

win10 telnet命令怎么查看端口是否打开

可能大家也会遇到这个问题,win10telnet命令查看端口是否打开的步骤是什么?具体方法如下:1、键盘输入快捷键WIN+R,打开运行窗口。2、输入cmd,点击确定按钮。3、弹出cmd命令行窗...

Windows 7如何打开Telnet功能(win7系统打开telnet)

Windows7默认安装后是没有开启telnet客户端功能的,例如,我们在开始菜单中输入cmd,然后使用telnet命令,会弹出下图提示:‘telnet’不是内部或外部命令,也不是可运行程序或批处理文...

为锐捷路由器交换机开启web和telnet,实现轻松管理

笔者上一篇文章写了关于锐捷二层交换机配置教程,那么接下来讲一下锐捷的路由交换设备配置web、telnet技巧。同样,今天的教程也是基于命令行,比较简单,适合新手小白进行学习。准备工作配置前准备:con...

一文学会telnet命令的用途和使用方法

Telnet是一个古老的远程登录协议,可以让本地计算机获得远程计算机的工作能力。它采用了TCP的可靠连接方式,可以连接任何网络互通的远程计算机。不过由于它采用了明文传输方式,存在安全风险,目前已经很少...

Telnet命令是什么?如何使用?(telnet命令在哪里开启)

telnet命令是一个常用的远程登陆工具,使用它,我们可以快捷地登陆远程服务器进行操作。那么如何使用telnet命令呢?首先,我们需要打开telnet功能,任何电脑默认是关闭此功能的,开启方式如下:打...

win11系统如何开启telnet服务(拷贝版本)

  我们要知道,Telnet协议是Internet远程登陆服务的标准协议,可以使用户在本地计算机上完成远程主机的工作,不过对于一些刚接触win11中文版系统的用户来说,可能还不知道telnet服务在哪...

如何开启telnet客户端(如何开启telnet服务)

Telnet协议是TCP/IP协议家族中的一员,是Internet远程登陆服务的标准协议和主要方式,Telnet是常用的远程控制Web服务器的方法。工作中经常用到telnet客户端,但在windows...

Telnet 是什么,如何启用它?(telnet有什么用)

对于Internet等TCP/IP网络,Telnet是一个终端仿真程序。Telnet软件在您的系统上运行并将您的个人计算机链接到网络服务器。它将所有数据转换为纯文本这一事实被认为是易受...

取消回复欢迎 发表评论: