GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: stream_wrap.cc Lines: 178 184 96.7 %
Date: 2022-07-28 04:16:57 Branches: 51 76 67.1 %

Line Branch Exec Source
1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22
#include "stream_wrap.h"
23
#include "stream_base-inl.h"
24
25
#include "env-inl.h"
26
#include "handle_wrap.h"
27
#include "node_buffer.h"
28
#include "node_errors.h"
29
#include "node_external_reference.h"
30
#include "pipe_wrap.h"
31
#include "req_wrap-inl.h"
32
#include "tcp_wrap.h"
33
#include "udp_wrap.h"
34
#include "util-inl.h"
35
36
#include <cstring>  // memcpy()
37
#include <climits>  // INT_MAX
38
39
40
namespace node {
41
42
using errors::TryCatchScope;
43
using v8::Context;
44
using v8::DontDelete;
45
using v8::EscapableHandleScope;
46
using v8::FunctionCallbackInfo;
47
using v8::FunctionTemplate;
48
using v8::HandleScope;
49
using v8::JustVoid;
50
using v8::Local;
51
using v8::Maybe;
52
using v8::MaybeLocal;
53
using v8::Nothing;
54
using v8::Object;
55
using v8::PropertyAttribute;
56
using v8::ReadOnly;
57
using v8::Signature;
58
using v8::Value;
59
60
123155
// Constructor callback passed to FunctionTemplate::New() for both the
// ShutdownWrap and the WriteWrap templates in LibuvStreamWrap::Initialize().
// CHECKs that it was invoked as a construct call, then passes the receiver
// to StreamReq::ResetObject().
void IsConstructCallCallback(const FunctionCallbackInfo<Value>& args) {
61
123155
  CHECK(args.IsConstructCall());
62
123155
  StreamReq::ResetObject(args.This());
63
123155
}
64
65
1303
// Initializer for the `stream_wrap` module (see the
// NODE_MODULE_CONTEXT_AWARE_INTERNAL line at the end of the file).
//
// Builds two constructor templates, "ShutdownWrap" and "WriteWrap", that both
// use IsConstructCallCallback, reserve StreamReq::kInternalFieldCount internal
// fields and inherit from AsyncWrap's constructor template. Each constructor
// is exposed on `target` and its instance template is stored on the
// Environment. Finally defines the kReadBytesOrError, kArrayBufferOffset,
// kBytesWritten and kLastWriteWasAsync constants and the `streamBaseState`
// array on `target`.
//
// `unused` and `priv` are not referenced in the body.
void LibuvStreamWrap::Initialize(Local<Object> target,
66
                                 Local<Value> unused,
67
                                 Local<Context> context,
68
                                 void* priv) {
69
1303
  Environment* env = Environment::GetCurrent(context);
70
71
  Local<FunctionTemplate> sw =
72
1303
      FunctionTemplate::New(env->isolate(), IsConstructCallCallback);
73
2606
  sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
74
75
  // we need to set handle and callback to null,
76
  // so that those fields are created and functions
77
  // do not become megamorphic
78
  // Fields:
79
  // - oncomplete
80
  // - callback
81
  // - handle
82
5212
  sw->InstanceTemplate()->Set(
83
      env->oncomplete_string(),
84
      v8::Null(env->isolate()));
85
5212
  sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "callback"),
86
      v8::Null(env->isolate()));
87
5212
  sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "handle"),
88
      v8::Null(env->isolate()));
89
90
1303
  sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
91
92
1303
  env->SetConstructorFunction(target, "ShutdownWrap", sw);
93
1303
  env->set_shutdown_wrap_template(sw->InstanceTemplate());
94
95
  // Unlike the ShutdownWrap template above, no null-valued fields are
  // pre-set on the WriteWrap instance template.
  Local<FunctionTemplate> ww =
96
1303
      FunctionTemplate::New(env->isolate(), IsConstructCallCallback);
97
2606
  ww->InstanceTemplate()->SetInternalFieldCount(
98
      StreamReq::kInternalFieldCount);
99
1303
  ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
100
1303
  env->SetConstructorFunction(target, "WriteWrap", ww);
101
1303
  env->set_write_wrap_template(ww->InstanceTemplate());
102
103
3909
  NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
104
3909
  NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
105
3909
  NODE_DEFINE_CONSTANT(target, kBytesWritten);
106
3909
  NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
107
1303
  target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
108
3909
              env->stream_base_state().GetJSArray()).Check();
109
1303
}
110
111
5287
// Registers the native callbacks this file hands to V8
// (IsConstructCallCallback, GetWriteQueueSize, SetBlocking) with `registry`,
// then delegates to StreamBase::RegisterExternalReferences() for the rest.
void LibuvStreamWrap::RegisterExternalReferences(
112
    ExternalReferenceRegistry* registry) {
113
5287
  registry->Register(IsConstructCallCallback);
114
5287
  registry->Register(GetWriteQueueSize);
115
5287
  registry->Register(SetBlocking);
116
5287
  StreamBase::RegisterExternalReferences(registry);
117
5287
}
118
119
18825
// Constructs the wrap around an existing libuv stream.
//
// `stream` is forwarded to the HandleWrap base as a uv_handle_t* together
// with `env`, `object` and `provider`; StreamBase is initialized with `env`;
// the stream pointer itself is kept in stream_. The body then attaches the
// StreamBase part to `object` via StreamBase::AttachToObject().
LibuvStreamWrap::LibuvStreamWrap(Environment* env,
120
                                 Local<Object> object,
121
                                 uv_stream_t* stream,
122
18825
                                 AsyncWrap::ProviderType provider)
123
    : HandleWrap(env,
124
                 object,
125
                 reinterpret_cast<uv_handle_t*>(stream),
126
                 provider),
127
      StreamBase(env),
128
18825
      stream_(stream) {
129
18825
  StreamBase::AttachToObject(object);
130
18825
}
131
132
133
18342
// Returns the FunctionTemplate for LibuvStreamWrap, building it on first use
// and caching it on `env` (libuv_stream_wrap_ctor_template).
//
// When built, the template:
//  - is named "LibuvStreamWrap" and inherits from HandleWrap's template;
//  - reserves StreamBase::kInternalFieldCount internal fields;
//  - gets a prototype accessor keyed by env->write_queue_size_string() whose
//    getter is GetWriteQueueSize, with no setter, marked ReadOnly | DontDelete;
//  - gets the "setBlocking" prototype method and the StreamBase methods.
Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
134
    Environment* env) {
135
18342
  Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
136
18342
  if (tmpl.IsEmpty()) {
137
6546
    tmpl = env->NewFunctionTemplate(nullptr);
138
6546
    tmpl->SetClassName(
139
        FIXED_ONE_BYTE_STRING(env->isolate(), "LibuvStreamWrap"));
140
6546
    tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
141
13092
    tmpl->InstanceTemplate()->SetInternalFieldCount(
142
        StreamBase::kInternalFieldCount);
143
    // The getter is created with a Signature tied to `tmpl`.
    Local<FunctionTemplate> get_write_queue_size =
144
        FunctionTemplate::New(env->isolate(),
145
                              GetWriteQueueSize,
146
                              Local<Value>(),
147
6546
                              Signature::New(env->isolate(), tmpl));
148
26184
    tmpl->PrototypeTemplate()->SetAccessorProperty(
149
        env->write_queue_size_string(),
150
        get_write_queue_size,
151
        Local<FunctionTemplate>(),
152
        static_cast<PropertyAttribute>(ReadOnly | DontDelete));
153
6546
    env->SetProtoMethod(tmpl, "setBlocking", SetBlocking);
154
6546
    StreamBase::AddMethods(env, tmpl);
155
6546
    env->set_libuv_stream_wrap_ctor_template(tmpl);
156
  }
157
18342
  return tmpl;
158
}
159
160
161
3979
// Returns the LibuvStreamWrap unwrapped from `object`.
// CHECKs that the cached constructor template on `env` exists and that
// `object` is an instance of it before unwrapping.
LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
162
3979
  Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
163

7958
  CHECK(!sw.IsEmpty() && sw->HasInstance(object));
164
3979
  return Unwrap<LibuvStreamWrap>(object);
165
}
166
167
168
120
// Returns a file descriptor for the stream.
// When _WIN32 is defined this returns the fd_ member. Otherwise it asks
// uv_fileno() for the descriptor; the result stays -1 when stream() is null.
// The return value of uv_fileno() is not checked, so `fd` also stays -1
// unless uv_fileno() writes to it.
int LibuvStreamWrap::GetFD() {
169
#ifdef _WIN32
170
  return fd_;
171
#else
172
120
  int fd = -1;
173
120
  if (stream() != nullptr)
174
120
    uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
175
120
  return fd;
176
#endif
177
}
178
179
180
121135
// Returns HandleWrap::IsAlive() evaluated for this wrap.
bool LibuvStreamWrap::IsAlive() {
181
121135
  return HandleWrap::IsAlive(this);
182
}
183
184
185
3
// Returns the result of uv_is_closing() on the underlying stream, viewed as
// a uv_handle_t*. stream() is not null-checked here.
bool LibuvStreamWrap::IsClosing() {
186
3
  return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
187
}
188
189
190
163790
// Returns `this` converted to AsyncWrap* with a static_cast.
AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
191
163790
  return static_cast<AsyncWrap*>(this);
192
}
193
194
195
28225
// Returns is_named_pipe_ipc() for this wrap.
bool LibuvStreamWrap::IsIPCPipe() {
196
28225
  return is_named_pipe_ipc();
197
}
198
199
21883
// Starts reading on the stream and returns the result of uv_read_start().
//
// Both callbacks recover the LibuvStreamWrap from the handle's `data`
// pointer. The alloc callback forwards to OnUvAlloc(). The read callback
// forwards to OnUvRead() inside a TryCatchScope set to verbose; the
// Maybe<void> returned by OnUvRead() is not inspected here.
int LibuvStreamWrap::ReadStart() {
200
43766
  return uv_read_start(
201
      stream(),
202
72402
      [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
203
72402
        static_cast<LibuvStreamWrap*>(handle->data)
204
72402
            ->OnUvAlloc(suggested_size, buf);
205
72402
      },
206
72619
      [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
207
72619
        LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(stream->data);
208
145118
        TryCatchScope try_catch(wrap->env());
209
72619
        try_catch.SetVerbose(true);
210
72619
        wrap->OnUvRead(nread, buf);
211
21883
      });
212
}
213
214
215
9277
// Stops reading on the stream and returns the result of uv_read_stop().
int LibuvStreamWrap::ReadStop() {
216
9277
  return uv_read_stop(stream());
217
}
218
219
220
72402
// Body of the alloc callback installed by ReadStart().
// Opens a HandleScope and enters the environment's context, then stores the
// buffer returned by EmitAlloc(suggested_size) into *buf.
void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
221
144804
  HandleScope scope(env()->isolate());
222
72402
  Context::Scope context_scope(env()->context());
223
224
72402
  *buf = EmitAlloc(suggested_size);
225
72402
}
226
227
// Creates a new WrapType object and accepts a handle from `parent` into it.
//
// WrapType must derive from LibuvStreamWrap or UDPWrap (enforced by the
// static_assert). The object is created with
// WrapType::Instantiate(env, parent, WrapType::SOCKET); if that yields no
// object, an empty result is returned. Otherwise the new wrap's handle is
// passed to uv_accept() together with parent->stream(); a non-zero result
// from uv_accept() calls ABORT(). On success the new object is escaped from
// the local EscapableHandleScope and returned.
template <class WrapType>
228
346
static MaybeLocal<Object> AcceptHandle(Environment* env,
229
                                       LibuvStreamWrap* parent) {
230
  static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
231
                std::is_base_of<UDPWrap, WrapType>::value,
232
                "Can only accept stream handles");
233
234
346
  EscapableHandleScope scope(env->isolate());
235
  Local<Object> wrap_obj;
236
237
692
  if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
238
    return Local<Object>();
239
240
346
  HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
241
346
  CHECK_NOT_NULL(wrap);
242
  // The handle is reinterpreted as a uv_stream_t* even when WrapType is
  // UDPWrap, because uv_accept() takes stream pointers.
346
  uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
243
346
  CHECK_NOT_NULL(stream);
244
245
346
  if (uv_accept(parent->stream(), stream))
246
    ABORT();
247
248
346
  return scope.Escape(wrap_obj);
249
}
250
251
72619
// Body of the read callback installed by ReadStart().
//
// Steps:
//  1. If this wrap is an IPC named pipe and the pipe reports at least one
//     pending handle, fetch that handle's type; otherwise the type stays
//     UV_UNKNOWN_HANDLE.
//  2. CHECK that the persistent handle is still set.
//  3. When nread > 0 and a pending handle type was found, accept it as a
//     TCPWrap, PipeWrap or UDPWrap and store the resulting object on this
//     wrap's JS object under env()->pending_handle_string(). Any other
//     non-unknown type trips the CHECK_EQ in the final else branch.
//  4. Call EmitRead(nread, *buf).
//
// Returns Nothing<void>() when accepting produced no object or the property
// Set failed (EmitRead() is skipped in that case); returns JustVoid()
// otherwise.
Maybe<void> LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
252
145118
  HandleScope scope(env()->isolate());
253
72619
  Context::Scope context_scope(env()->context());
254
72619
  uv_handle_type type = UV_UNKNOWN_HANDLE;
255
256

76665
  if (is_named_pipe_ipc() &&
257
4046
      uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
258
173
    type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
259
  }
260
261
  // We should not be getting this callback if someone has already called
262
  // uv_close() on the handle.
263
72619
  CHECK_EQ(persistent().IsEmpty(), false);
264
265
72619
  if (nread > 0) {
266
    MaybeLocal<Object> pending_obj;
267
268
66692
    if (type == UV_TCP) {
269
129
      pending_obj = AcceptHandle<TCPWrap>(env(), this);
270
66563
    } else if (type == UV_NAMED_PIPE) {
271
11
      pending_obj = AcceptHandle<PipeWrap>(env(), this);
272
66552
    } else if (type == UV_UDP) {
273
33
      pending_obj = AcceptHandle<UDPWrap>(env(), this);
274
    } else {
275
66519
      CHECK_EQ(type, UV_UNKNOWN_HANDLE);
276
    }
277
278
    Local<Object> local_pending_obj;
279
    // Only attempt the Set when a handle was actually accepted above.
66865
    if (type != UV_UNKNOWN_HANDLE &&
280
173
        (!pending_obj.ToLocal(&local_pending_obj) ||
281
173
         object()
282
66692
             ->Set(env()->context(),
283
                   env()->pending_handle_string(),
284
67038
                   local_pending_obj)
285
173
             .IsNothing())) {
286
      return Nothing<void>();
287
    }
288
  }
289
290
72619
  EmitRead(nread, *buf);
291
72499
  return JustVoid();
292
}
293
294
5489
// Getter installed by GetConstructorTemplate() for the write-queue-size
// accessor. Sets the return value to the stream's write_queue_size, or to 0
// when the wrap has no stream.
//
// NOTE(review): the value is copied into a uint32_t before being returned;
// the declared type of write_queue_size is not visible here, so confirm that
// this narrowing is intended.
// NOTE(review): this unwraps info.This(), whereas SetBlocking() unwraps
// args.Holder(); confirm the difference is deliberate.
void LibuvStreamWrap::GetWriteQueueSize(
295
    const FunctionCallbackInfo<Value>& info) {
296
  LibuvStreamWrap* wrap;
297
5489
  ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
298
299
5489
  if (wrap->stream() == nullptr) {
300
    info.GetReturnValue().Set(0);
301
    return;
302
  }
303
304
5489
  uint32_t write_queue_size = wrap->stream()->write_queue_size;
305
10978
  info.GetReturnValue().Set(write_queue_size);
306
}
307
308
309
32
// Native implementation of the "setBlocking" prototype method.
// CHECKs that at least one argument was passed. If the wrap is not alive the
// return value is UV_EINVAL; otherwise it is the result of
// uv_stream_set_blocking(), with blocking enabled only when args[0] is
// exactly `true` (IsTrue()).
void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
310
  LibuvStreamWrap* wrap;
311
32
  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
312
313
32
  CHECK_GT(args.Length(), 0);
314
32
  if (!wrap->IsAlive())
315
    return args.GetReturnValue().Set(UV_EINVAL);
316
317
32
  bool enable = args[0]->IsTrue();
318
64
  args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
319
}
320
321
// Concrete request-wrap types used below: LibuvShutdownWrap carries a
// uv_shutdown_t (see DoShutdown/AfterUvShutdown) and LibuvWriteWrap carries a
// uv_write_t (see DoWrite/AfterUvWrite).
typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
322
typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
323
324
7417
// Heap-allocates a LibuvShutdownWrap for this stream and `object` and returns
// it as a raw pointer.
// NOTE(review): who frees the returned wrap is not visible in this file.
ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
325
7417
  return new LibuvShutdownWrap(this, object);
326
}
327
328
260
// Heap-allocates a LibuvWriteWrap for this stream and `object` and returns it
// as a raw pointer.
// NOTE(review): who frees the returned wrap is not visible in this file.
WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
329
260
  return new LibuvWriteWrap(this, object);
330
}
331
332
333
7417
// Issues uv_shutdown() on the stream through the request wrap's Dispatch(),
// with AfterUvShutdown as the completion callback, and returns Dispatch()'s
// result. `req_wrap_` is downcast to LibuvShutdownWrap with an unchecked
// static_cast, so it is assumed to have come from CreateShutdownWrap().
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
334
7417
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
335
7417
  return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
336
}
337
338
339
7414
// Completion callback passed to uv_shutdown() by DoShutdown().
// Recovers the LibuvShutdownWrap from `req` (CHECKed non-null), opens a
// HandleScope and enters the wrap's context, then reports `status` through
// req_wrap->Done().
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
340
7414
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
341
7414
      LibuvShutdownWrap::from_req(req));
342
7414
  CHECK_NOT_NULL(req_wrap);
343
14828
  HandleScope scope(req_wrap->env()->isolate());
344
7414
  Context::Scope context_scope(req_wrap->env()->context());
345
7414
  req_wrap->Done(status);
346
7414
}
347
348
349
// NOTE: Call to this function could change both `buf`'s and `count`'s
350
// values, shifting their base and decrementing their length. This is
351
// required in order to skip the data that was successfully written via
352
// uv_try_write().
353
121551
// Attempts a write with uv_try_write() and reports what is left to write.
//
// Return value and out-parameters:
//  - UV_ENOSYS or UV_EAGAIN from uv_try_write(): returns 0 and leaves *bufs
//    and *count untouched.
//  - any other negative result: returned as-is, *bufs and *count untouched.
//  - otherwise the result is the number of bytes written: *bufs and *count
//    are advanced past every fully written buffer, the first partially
//    written buffer has its base/len adjusted in place, and 0 is returned.
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
354
  int err;
355
  size_t written;
356
121551
  uv_buf_t* vbufs = *bufs;
357
121551
  size_t vcount = *count;
358
359
121551
  err = uv_try_write(stream(), vbufs, vcount);
360

121551
  if (err == UV_ENOSYS || err == UV_EAGAIN)
361
5
    return 0;
362
121546
  if (err < 0)
363
2472
    return err;
364
365
  // Slice off the buffers: skip all written buffers and slice the one that
366
  // was partially written.
367
119074
  written = err;
368
315928
  for (; vcount > 0; vbufs++, vcount--) {
369
    // Slice
370
    // A buffer whose length equals the remaining `written` is not sliced; it
    // falls through to the Discard branch and is skipped entirely.
196990
    if (vbufs[0].len > written) {
371
136
      vbufs[0].base += written;
372
136
      vbufs[0].len -= written;
373
136
      written = 0;
374
136
      break;
375
376
    // Discard
377
    } else {
378
196854
      written -= vbufs[0].len;
379
    }
380
  }
381
382
119074
  *bufs = vbufs;
383
119074
  *count = vcount;
384
385
119074
  return 0;
386
}
387
388
389
260
// Issues uv_write2() on the stream through the request wrap's Dispatch(),
// passing `bufs`, `count` and `send_handle` through unchanged and using
// AfterUvWrite as the completion callback. Returns Dispatch()'s result.
// `req_wrap` is downcast to LibuvWriteWrap with an unchecked static_cast, so
// it is assumed to have come from CreateWriteWrap().
int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
390
                             uv_buf_t* bufs,
391
                             size_t count,
392
                             uv_stream_t* send_handle) {
393
260
  LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
394
260
  return w->Dispatch(uv_write2,
395
                     stream(),
396
                     bufs,
397
                     count,
398
                     send_handle,
399
260
                     AfterUvWrite);
400
}
401
402
403
404
258
// Completion callback passed to uv_write2() by DoWrite().
// Recovers the LibuvWriteWrap from `req` (CHECKed non-null), opens a
// HandleScope and enters the wrap's context, then reports `status` through
// req_wrap->Done().
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
405
258
  LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
406
258
      LibuvWriteWrap::from_req(req));
407
258
  CHECK_NOT_NULL(req_wrap);
408
515
  HandleScope scope(req_wrap->env()->isolate());
409
258
  Context::Scope context_scope(req_wrap->env()->context());
410
258
  req_wrap->Done(status);
411
257
}
412
413
}  // namespace node
414
415
5355
// Module registration: associates the name `stream_wrap` with
// LibuvStreamWrap::Initialize and with
// LibuvStreamWrap::RegisterExternalReferences.
// NOTE(review): the macro definitions are not visible here; their exact
// expansion should be checked against the headers that declare them.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_wrap,
416
                                   node::LibuvStreamWrap::Initialize)
417
5287
NODE_MODULE_EXTERNAL_REFERENCE(
418
    stream_wrap, node::LibuvStreamWrap::RegisterExternalReferences)