GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/stream_wrap.cc Lines: 172 177 97.2 %
Date: 2017-12-18 Branches: 64 88 72.7 %

Line Branch Exec Source
1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22
#include "stream_wrap.h"
23
#include "stream_base-inl.h"
24
25
#include "env-inl.h"
26
#include "handle_wrap.h"
27
#include "node_buffer.h"
28
#include "node_counters.h"
29
#include "pipe_wrap.h"
30
#include "req_wrap-inl.h"
31
#include "tcp_wrap.h"
32
#include "udp_wrap.h"
33
#include "util-inl.h"
34
35
#include <stdlib.h>  // abort()
36
#include <string.h>  // memcpy()
37
#include <limits.h>  // INT_MAX
38
39
40
namespace node {
41
42
using v8::Context;
43
using v8::DontDelete;
44
using v8::EscapableHandleScope;
45
using v8::FunctionCallbackInfo;
46
using v8::FunctionTemplate;
47
using v8::HandleScope;
48
using v8::Local;
49
using v8::Object;
50
using v8::ReadOnly;
51
using v8::Signature;
52
using v8::Value;
53
54
55
3297
void LibuvStreamWrap::Initialize(Local<Object> target,
56
                                 Local<Value> unused,
57
                                 Local<Context> context) {
58
3297
  Environment* env = Environment::GetCurrent(context);
59
60
  auto is_construct_call_callback =
61
171644
      [](const FunctionCallbackInfo<Value>& args) {
62
82525
    CHECK(args.IsConstructCall());
63
82525
    ClearWrap(args.This());
64
171644
  };
65
  Local<FunctionTemplate> sw =
66
3297
      FunctionTemplate::New(env->isolate(), is_construct_call_callback);
67
6594
  sw->InstanceTemplate()->SetInternalFieldCount(1);
68
  Local<String> wrapString =
69
3297
      FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap");
70
3297
  sw->SetClassName(wrapString);
71
3297
  AsyncWrap::AddWrapMethods(env, sw);
72
6594
  target->Set(wrapString, sw->GetFunction());
73
74
  Local<FunctionTemplate> ww =
75
3297
      FunctionTemplate::New(env->isolate(), is_construct_call_callback);
76
6594
  ww->InstanceTemplate()->SetInternalFieldCount(1);
77
  Local<String> writeWrapString =
78
3297
      FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
79
3297
  ww->SetClassName(writeWrapString);
80
3297
  AsyncWrap::AddWrapMethods(env, ww);
81
6594
  target->Set(writeWrapString, ww->GetFunction());
82
3297
  env->set_write_wrap_constructor_function(ww->GetFunction());
83
3297
}
84
85
86
10601
LibuvStreamWrap::LibuvStreamWrap(Environment* env,
87
                                 Local<Object> object,
88
                                 uv_stream_t* stream,
89
                                 AsyncWrap::ProviderType provider)
90
    : HandleWrap(env,
91
                 object,
92
                 reinterpret_cast<uv_handle_t*>(stream),
93
                 provider),
94
      StreamBase(env),
95
10601
      stream_(stream) {
96
10601
  set_alloc_cb({ OnAllocImpl, this });
97
10601
  set_read_cb({ OnReadImpl, this });
98
10601
}
99
100
101
9942
void LibuvStreamWrap::AddMethods(Environment* env,
102
                                 v8::Local<v8::FunctionTemplate> target,
103
                                 int flags) {
104
  Local<FunctionTemplate> get_write_queue_size =
105
      FunctionTemplate::New(env->isolate(),
106
                            GetWriteQueueSize,
107
                            env->as_external(),
108
19884
                            Signature::New(env->isolate(), target));
109
29826
  target->PrototypeTemplate()->SetAccessorProperty(
110
      env->write_queue_size_string(),
111
      get_write_queue_size,
112
      Local<FunctionTemplate>(),
113
29826
      static_cast<PropertyAttribute>(ReadOnly | DontDelete));
114
9942
  env->SetProtoMethod(target, "setBlocking", SetBlocking);
115
9942
  StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
116
9942
}
117
118
119
96
int LibuvStreamWrap::GetFD() {
120
96
  int fd = -1;
121
#if !defined(_WIN32)
122
96
  if (stream() != nullptr)
123
96
    uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
124
#endif
125
96
  return fd;
126
}
127
128
129
83143
bool LibuvStreamWrap::IsAlive() {
130
83143
  return HandleWrap::IsAlive(this);
131
}
132
133
134
3
bool LibuvStreamWrap::IsClosing() {
135
3
  return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
136
}
137
138
139
54411
AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
140
54411
  return static_cast<AsyncWrap*>(this);
141
}
142
143
144
42415
bool LibuvStreamWrap::IsIPCPipe() {
145
42415
  return is_named_pipe_ipc();
146
}
147
148
149
8166
int LibuvStreamWrap::ReadStart() {
150
8166
  return uv_read_start(stream(), OnAlloc, OnRead);
151
}
152
153
154
654
int LibuvStreamWrap::ReadStop() {
155
654
  return uv_read_stop(stream());
156
}
157
158
159
32832
void LibuvStreamWrap::OnAlloc(uv_handle_t* handle,
160
                         size_t suggested_size,
161
                         uv_buf_t* buf) {
162
32832
  LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
163
32832
  HandleScope scope(wrap->env()->isolate());
164
32832
  Context::Scope context_scope(wrap->env()->context());
165
166
32832
  CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
167
168
65664
  return wrap->EmitAlloc(suggested_size, buf);
169
}
170
171
172
23104
void LibuvStreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
173
23104
  buf->base = node::Malloc(size);
174
23104
  buf->len = size;
175
23104
}
176
177
178
template <class WrapType, class UVType>
179
152
static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
180
152
  EscapableHandleScope scope(env->isolate());
181
  Local<Object> wrap_obj;
182
  UVType* handle;
183
184
152
  wrap_obj = WrapType::Instantiate(env, parent, WrapType::SOCKET);
185

152
  if (wrap_obj.IsEmpty())
186
    return Local<Object>();
187
188
  WrapType* wrap;
189

152
  ASSIGN_OR_RETURN_UNWRAP(&wrap, wrap_obj, Local<Object>());
190
152
  handle = wrap->UVHandle();
191
192

152
  if (uv_accept(parent->stream(), reinterpret_cast<uv_stream_t*>(handle)))
193
    ABORT();
194
195
152
  return scope.Escape(wrap_obj);
196
}
197
198
199
23595
void LibuvStreamWrap::OnReadImpl(ssize_t nread,
200
                            const uv_buf_t* buf,
201
                            uv_handle_type pending,
202
                            void* ctx) {
203
23595
  LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(ctx);
204
23595
  Environment* env = wrap->env();
205
23595
  HandleScope handle_scope(env->isolate());
206
43572
  Context::Scope context_scope(env->context());
207
208
  Local<Object> pending_obj;
209
210
23595
  if (nread < 0)  {
211
3567
    if (buf->base != nullptr)
212
3076
      free(buf->base);
213
3567
    wrap->EmitData(nread, Local<Object>(), pending_obj);
214
3546
    return;
215
  }
216
217
20028
  if (nread == 0) {
218
9
    if (buf->base != nullptr)
219
9
      free(buf->base);
220
9
    return;
221
  }
222
223
20019
  CHECK_LE(static_cast<size_t>(nread), buf->len);
224
20019
  char* base = node::Realloc(buf->base, nread);
225
226
20019
  if (pending == UV_TCP) {
227
132
    pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, wrap);
228
19887
  } else if (pending == UV_NAMED_PIPE) {
229
1
    pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, wrap);
230
19886
  } else if (pending == UV_UDP) {
231
19
    pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, wrap);
232
  } else {
233
19867
    CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
234
  }
235
236
40038
  Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
237
39996
  wrap->EmitData(nread, obj, pending_obj);
238
}
239
240
241
33022
void LibuvStreamWrap::OnRead(uv_stream_t* handle,
242
                        ssize_t nread,
243
                        const uv_buf_t* buf) {
244
33022
  LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
245
33022
  HandleScope scope(wrap->env()->isolate());
246
33022
  Context::Scope context_scope(wrap->env()->context());
247
33022
  uv_handle_type type = UV_UNKNOWN_HANDLE;
248
249

35148
  if (wrap->is_named_pipe_ipc() &&
250
2126
      uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
251
152
    type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
252
  }
253
254
  // We should not be getting this callback if someone as already called
255
  // uv_close() on the handle.
256
66044
  CHECK_EQ(wrap->persistent().IsEmpty(), false);
257
258
33022
  if (nread > 0) {
259
29195
    if (wrap->is_tcp()) {
260
      NODE_COUNT_NET_BYTES_RECV(nread);
261
5472
    } else if (wrap->is_named_pipe()) {
262
      NODE_COUNT_PIPE_BYTES_RECV(nread);
263
    }
264
  }
265
266
65976
  wrap->EmitRead(nread, buf, type);
267
32954
}
268
269
270
12441
void LibuvStreamWrap::GetWriteQueueSize(
271
    const FunctionCallbackInfo<Value>& info) {
272
  LibuvStreamWrap* wrap;
273
12441
  ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
274
275
12441
  if (wrap->stream() == nullptr) {
276
    info.GetReturnValue().Set(0);
277
    return;
278
  }
279
280
12441
  uint32_t write_queue_size = wrap->stream()->write_queue_size;
281
24882
  info.GetReturnValue().Set(write_queue_size);
282
}
283
284
285
29
void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
286
  LibuvStreamWrap* wrap;
287
29
  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
288
289
29
  CHECK_GT(args.Length(), 0);
290
29
  if (!wrap->IsAlive())
291
    return args.GetReturnValue().Set(UV_EINVAL);
292
293
58
  bool enable = args[0]->IsTrue();
294
87
  args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
295
}
296
297
298
2333
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
299
  int err;
300
2333
  err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown);
301
2333
  req_wrap->Dispatched();
302
2333
  return err;
303
}
304
305
306
2327
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
307
2327
  ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
308
2327
  CHECK_NE(req_wrap, nullptr);
309
2327
  HandleScope scope(req_wrap->env()->isolate());
310
2327
  Context::Scope context_scope(req_wrap->env()->context());
311
4654
  req_wrap->Done(status);
312
2327
}
313
314
315
// NOTE: Call to this function could change both `buf`'s and `count`'s
316
// values, shifting their base and decrementing their length. This is
317
// required in order to skip the data that was successfully written via
318
// uv_try_write().
319
60150
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
320
  int err;
321
  size_t written;
322
60150
  uv_buf_t* vbufs = *bufs;
323
60150
  size_t vcount = *count;
324
325
60150
  err = uv_try_write(stream(), vbufs, vcount);
326

60150
  if (err == UV_ENOSYS || err == UV_EAGAIN)
327
11
    return 0;
328
60139
  if (err < 0)
329
1
    return err;
330
331
  // Slice off the buffers: skip all written buffers and slice the one that
332
  // was partially written.
333
60138
  written = err;
334
122800
  for (; vcount > 0; vbufs++, vcount--) {
335
    // Slice
336
62726
    if (vbufs[0].len > written) {
337
64
      vbufs[0].base += written;
338
64
      vbufs[0].len -= written;
339
64
      written = 0;
340
64
      break;
341
342
    // Discard
343
    } else {
344
62662
      written -= vbufs[0].len;
345
    }
346
  }
347
348
60138
  *bufs = vbufs;
349
60138
  *count = vcount;
350
351
60138
  return 0;
352
}
353
354
355
15554
int LibuvStreamWrap::DoWrite(WriteWrap* w,
356
                        uv_buf_t* bufs,
357
                        size_t count,
358
                        uv_stream_t* send_handle) {
359
  int r;
360
15554
  if (send_handle == nullptr) {
361
15461
    r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite);
362
  } else {
363
93
    r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterUvWrite);
364
  }
365
366
15554
  if (!r) {
367
15532
    size_t bytes = 0;
368
73217
    for (size_t i = 0; i < count; i++)
369
57685
      bytes += bufs[i].len;
370
15532
    if (stream()->type == UV_TCP) {
371
      NODE_COUNT_NET_BYTES_SENT(bytes);
372
132
    } else if (stream()->type == UV_NAMED_PIPE) {
373
      NODE_COUNT_PIPE_BYTES_SENT(bytes);
374
    }
375
  }
376
377
15554
  w->Dispatched();
378
379
15554
  return r;
380
}
381
382
383
11328
bool LibuvStreamWrap::HasWriteQueue() {
384
11328
  return stream()->write_queue_size > 0;
385
}
386
387
388
15521
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
389
15521
  WriteWrap* req_wrap = WriteWrap::from_req(req);
390
15521
  CHECK_NE(req_wrap, nullptr);
391
15521
  HandleScope scope(req_wrap->env()->isolate());
392
15521
  Context::Scope context_scope(req_wrap->env()->context());
393
31040
  req_wrap->Done(status);
394
15519
}
395
396
397
15519
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
398
15519
  StreamBase::AfterWrite(w, status);
399
15517
}
400
401
}  // namespace node
402
403
3391
NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap,
404
                                  node::LibuvStreamWrap::Initialize)