GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: stream_wrap.cc Lines: 168 174 96.6 %
Date: 2022-05-21 04:15:56 Branches: 50 74 67.6 %

Line Branch Exec Source
1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22
#include "stream_wrap.h"
23
#include "stream_base-inl.h"
24
25
#include "env-inl.h"
26
#include "handle_wrap.h"
27
#include "node_buffer.h"
28
#include "node_external_reference.h"
29
#include "pipe_wrap.h"
30
#include "req_wrap-inl.h"
31
#include "tcp_wrap.h"
32
#include "udp_wrap.h"
33
#include "util-inl.h"
34
35
#include <cstring>  // memcpy()
36
#include <climits>  // INT_MAX
37
38
39
namespace node {
40
41
using v8::Context;
42
using v8::DontDelete;
43
using v8::EscapableHandleScope;
44
using v8::FunctionCallbackInfo;
45
using v8::FunctionTemplate;
46
using v8::HandleScope;
47
using v8::Local;
48
using v8::MaybeLocal;
49
using v8::Object;
50
using v8::PropertyAttribute;
51
using v8::ReadOnly;
52
using v8::Signature;
53
using v8::Value;
54
55
124657
void IsConstructCallCallback(const FunctionCallbackInfo<Value>& args) {
56
124657
  CHECK(args.IsConstructCall());
57
124657
  StreamReq::ResetObject(args.This());
58
124657
}
59
60
856
void LibuvStreamWrap::Initialize(Local<Object> target,
61
                                 Local<Value> unused,
62
                                 Local<Context> context,
63
                                 void* priv) {
64
856
  Environment* env = Environment::GetCurrent(context);
65
66
  Local<FunctionTemplate> sw =
67
856
      FunctionTemplate::New(env->isolate(), IsConstructCallCallback);
68
1712
  sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
69
70
  // we need to set handle and callback to null,
71
  // so that those fields are created and functions
72
  // do not become megamorphic
73
  // Fields:
74
  // - oncomplete
75
  // - callback
76
  // - handle
77
3424
  sw->InstanceTemplate()->Set(
78
      env->oncomplete_string(),
79
      v8::Null(env->isolate()));
80
3424
  sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "callback"),
81
      v8::Null(env->isolate()));
82
3424
  sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "handle"),
83
      v8::Null(env->isolate()));
84
85
856
  sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
86
87
856
  env->SetConstructorFunction(target, "ShutdownWrap", sw);
88
856
  env->set_shutdown_wrap_template(sw->InstanceTemplate());
89
90
  Local<FunctionTemplate> ww =
91
856
      FunctionTemplate::New(env->isolate(), IsConstructCallCallback);
92
1712
  ww->InstanceTemplate()->SetInternalFieldCount(
93
      StreamReq::kInternalFieldCount);
94
856
  ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
95
856
  env->SetConstructorFunction(target, "WriteWrap", ww);
96
856
  env->set_write_wrap_template(ww->InstanceTemplate());
97
98
2568
  NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
99
2568
  NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
100
2568
  NODE_DEFINE_CONSTANT(target, kBytesWritten);
101
2568
  NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
102
856
  target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
103
2568
              env->stream_base_state().GetJSArray()).Check();
104
856
}
105
106
5184
void LibuvStreamWrap::RegisterExternalReferences(
107
    ExternalReferenceRegistry* registry) {
108
5184
  registry->Register(IsConstructCallCallback);
109
5184
  registry->Register(GetWriteQueueSize);
110
5184
  registry->Register(SetBlocking);
111
  // TODO(joyee): StreamBase::RegisterExternalReferences() is called somewhere
112
  // else but we may want to do it here too and guard it with a static flag.
113
5184
}
114
115
18869
LibuvStreamWrap::LibuvStreamWrap(Environment* env,
116
                                 Local<Object> object,
117
                                 uv_stream_t* stream,
118
18869
                                 AsyncWrap::ProviderType provider)
119
    : HandleWrap(env,
120
                 object,
121
                 reinterpret_cast<uv_handle_t*>(stream),
122
                 provider),
123
      StreamBase(env),
124
18869
      stream_(stream) {
125
18869
  StreamBase::AttachToObject(object);
126
18869
}
127
128
129
16730
Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
130
    Environment* env) {
131
16730
  Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
132
16730
  if (tmpl.IsEmpty()) {
133
5997
    tmpl = env->NewFunctionTemplate(nullptr);
134
5997
    tmpl->SetClassName(
135
        FIXED_ONE_BYTE_STRING(env->isolate(), "LibuvStreamWrap"));
136
5997
    tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
137
11994
    tmpl->InstanceTemplate()->SetInternalFieldCount(
138
        StreamBase::kInternalFieldCount);
139
    Local<FunctionTemplate> get_write_queue_size =
140
        FunctionTemplate::New(env->isolate(),
141
                              GetWriteQueueSize,
142
                              Local<Value>(),
143
5997
                              Signature::New(env->isolate(), tmpl));
144
23988
    tmpl->PrototypeTemplate()->SetAccessorProperty(
145
        env->write_queue_size_string(),
146
        get_write_queue_size,
147
        Local<FunctionTemplate>(),
148
        static_cast<PropertyAttribute>(ReadOnly | DontDelete));
149
5997
    env->SetProtoMethod(tmpl, "setBlocking", SetBlocking);
150
5997
    StreamBase::AddMethods(env, tmpl);
151
5997
    env->set_libuv_stream_wrap_ctor_template(tmpl);
152
  }
153
16730
  return tmpl;
154
}
155
156
157
3903
LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
158
3903
  Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
159

11709
  CHECK(!sw.IsEmpty() && sw->HasInstance(object));
160
3903
  return Unwrap<LibuvStreamWrap>(object);
161
}
162
163
164
120
int LibuvStreamWrap::GetFD() {
165
#ifdef _WIN32
166
  return fd_;
167
#else
168
120
  int fd = -1;
169
120
  if (stream() != nullptr)
170
120
    uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
171
120
  return fd;
172
#endif
173
}
174
175
176
122518
bool LibuvStreamWrap::IsAlive() {
177
122518
  return HandleWrap::IsAlive(this);
178
}
179
180
181
3
bool LibuvStreamWrap::IsClosing() {
182
3
  return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
183
}
184
185
186
171033
AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
187
171033
  return static_cast<AsyncWrap*>(this);
188
}
189
190
191
26800
bool LibuvStreamWrap::IsIPCPipe() {
192
26800
  return is_named_pipe_ipc();
193
}
194
195
196
22033
int LibuvStreamWrap::ReadStart() {
197
44066
  return uv_read_start(stream(), [](uv_handle_t* handle,
198
                                    size_t suggested_size,
199
73226
                                    uv_buf_t* buf) {
200
73226
    static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
201
146675
  }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
202
73449
    static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
203
22033
  });
204
}
205
206
207
9254
int LibuvStreamWrap::ReadStop() {
208
9254
  return uv_read_stop(stream());
209
}
210
211
212
73226
void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
213
146452
  HandleScope scope(env()->isolate());
214
73226
  Context::Scope context_scope(env()->context());
215
216
73226
  *buf = EmitAlloc(suggested_size);
217
73226
}
218
219
template <class WrapType>
220
314
static MaybeLocal<Object> AcceptHandle(Environment* env,
221
                                       LibuvStreamWrap* parent) {
222
  static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
223
                std::is_base_of<UDPWrap, WrapType>::value,
224
                "Can only accept stream handles");
225
226
314
  EscapableHandleScope scope(env->isolate());
227
  Local<Object> wrap_obj;
228
229
628
  if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
230
    return Local<Object>();
231
232
314
  HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
233
314
  CHECK_NOT_NULL(wrap);
234
314
  uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
235
314
  CHECK_NOT_NULL(stream);
236
237
314
  if (uv_accept(parent->stream(), stream))
238
    ABORT();
239
240
314
  return scope.Escape(wrap_obj);
241
}
242
243
244
73449
void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
245
73449
  HandleScope scope(env()->isolate());
246
73449
  Context::Scope context_scope(env()->context());
247
73449
  uv_handle_type type = UV_UNKNOWN_HANDLE;
248
249

77523
  if (is_named_pipe_ipc() &&
250
4074
      uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
251
157
    type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
252
  }
253
254
  // We should not be getting this callback if someone has already called
255
  // uv_close() on the handle.
256
73449
  CHECK_EQ(persistent().IsEmpty(), false);
257
258
73449
  if (nread > 0) {
259
    MaybeLocal<Object> pending_obj;
260
261
66545
    if (type == UV_TCP) {
262
127
      pending_obj = AcceptHandle<TCPWrap>(env(), this);
263
66418
    } else if (type == UV_NAMED_PIPE) {
264
1
      pending_obj = AcceptHandle<PipeWrap>(env(), this);
265
66417
    } else if (type == UV_UDP) {
266
29
      pending_obj = AcceptHandle<UDPWrap>(env(), this);
267
    } else {
268
66388
      CHECK_EQ(type, UV_UNKNOWN_HANDLE);
269
    }
270
271
    Local<Object> local_pending_obj;
272
66702
    if (pending_obj.ToLocal(&local_pending_obj) &&
273
157
          object()->Set(env()->context(),
274
                        env()->pending_handle_string(),
275

67016
                        local_pending_obj).IsNothing()) {
276
      return;
277
    }
278
  }
279
280
73449
  EmitRead(nread, *buf);
281
}
282
283
284
5402
void LibuvStreamWrap::GetWriteQueueSize(
285
    const FunctionCallbackInfo<Value>& info) {
286
  LibuvStreamWrap* wrap;
287
5402
  ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
288
289
5402
  if (wrap->stream() == nullptr) {
290
    info.GetReturnValue().Set(0);
291
    return;
292
  }
293
294
5402
  uint32_t write_queue_size = wrap->stream()->write_queue_size;
295
10804
  info.GetReturnValue().Set(write_queue_size);
296
}
297
298
299
32
void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
300
  LibuvStreamWrap* wrap;
301
32
  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
302
303
32
  CHECK_GT(args.Length(), 0);
304
32
  if (!wrap->IsAlive())
305
    return args.GetReturnValue().Set(UV_EINVAL);
306
307
32
  bool enable = args[0]->IsTrue();
308
64
  args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
309
}
310
311
typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
312
typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
313
314
10002
ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
315
10002
  return new LibuvShutdownWrap(this, object);
316
}
317
318
245
WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
319
245
  return new LibuvWriteWrap(this, object);
320
}
321
322
323
10002
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
324
10002
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
325
10002
  return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
326
}
327
328
329
9998
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
330
9998
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
331
9998
      LibuvShutdownWrap::from_req(req));
332
9998
  CHECK_NOT_NULL(req_wrap);
333
19996
  HandleScope scope(req_wrap->env()->isolate());
334
9998
  Context::Scope context_scope(req_wrap->env()->context());
335
9998
  req_wrap->Done(status);
336
9998
}
337
338
339
// NOTE: Call to this function could change both `buf`'s and `count`'s
340
// values, shifting their base and decrementing their length. This is
341
// required in order to skip the data that was successfully written via
342
// uv_try_write().
343
120283
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
344
  int err;
345
  size_t written;
346
120283
  uv_buf_t* vbufs = *bufs;
347
120283
  size_t vcount = *count;
348
349
120283
  err = uv_try_write(stream(), vbufs, vcount);
350

120283
  if (err == UV_ENOSYS || err == UV_EAGAIN)
351
5
    return 0;
352
120278
  if (err < 0)
353
2414
    return err;
354
355
  // Slice off the buffers: skip all written buffers and slice the one that
356
  // was partially written.
357
117864
  written = err;
358
314045
  for (; vcount > 0; vbufs++, vcount--) {
359
    // Slice
360
196318
    if (vbufs[0].len > written) {
361
137
      vbufs[0].base += written;
362
137
      vbufs[0].len -= written;
363
137
      written = 0;
364
137
      break;
365
366
    // Discard
367
    } else {
368
196181
      written -= vbufs[0].len;
369
    }
370
  }
371
372
117864
  *bufs = vbufs;
373
117864
  *count = vcount;
374
375
117864
  return 0;
376
}
377
378
379
245
int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
380
                             uv_buf_t* bufs,
381
                             size_t count,
382
                             uv_stream_t* send_handle) {
383
245
  LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
384
245
  return w->Dispatch(uv_write2,
385
                     stream(),
386
                     bufs,
387
                     count,
388
                     send_handle,
389
245
                     AfterUvWrite);
390
}
391
392
393
394
244
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
395
244
  LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
396
244
      LibuvWriteWrap::from_req(req));
397
244
  CHECK_NOT_NULL(req_wrap);
398
487
  HandleScope scope(req_wrap->env()->isolate());
399
244
  Context::Scope context_scope(req_wrap->env()->context());
400
244
  req_wrap->Done(status);
401
243
}
402
403
}  // namespace node
404
405
5252
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_wrap,
406
                                   node::LibuvStreamWrap::Initialize)
407
5184
NODE_MODULE_EXTERNAL_REFERENCE(
408
    stream_wrap, node::LibuvStreamWrap::RegisterExternalReferences)