GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 259 270 95.9 %
Date: 2019-09-24 22:36:24 Branches: 199 312 63.8 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "node_errors.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util-inl.h"
12
#include "v8.h"
13
14
#include <climits>  // INT_MAX
15
16
namespace node {
17
18
using v8::Array;
19
using v8::ArrayBuffer;
20
using v8::Context;
21
using v8::DontDelete;
22
using v8::DontEnum;
23
using v8::External;
24
using v8::Function;
25
using v8::FunctionCallbackInfo;
26
using v8::HandleScope;
27
using v8::Integer;
28
using v8::Local;
29
using v8::MaybeLocal;
30
using v8::Object;
31
using v8::ReadOnly;
32
using v8::String;
33
using v8::Value;
34
35
template int StreamBase::WriteString<ASCII>(
36
    const FunctionCallbackInfo<Value>& args);
37
template int StreamBase::WriteString<UTF8>(
38
    const FunctionCallbackInfo<Value>& args);
39
template int StreamBase::WriteString<UCS2>(
40
    const FunctionCallbackInfo<Value>& args);
41
template int StreamBase::WriteString<LATIN1>(
42
    const FunctionCallbackInfo<Value>& args);
43
44
45
74892
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
46
74892
  return ReadStart();
47
}
48
49
50
2053
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
51
2053
  return ReadStop();
52
}
53
54
6
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
55
6
  CHECK(Buffer::HasInstance(args[0]));
56
57
12
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
58
6
  PushStreamListener(new CustomBufferJSListener(buf));
59
6
  return 0;
60
}
61
62
56203
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
63
112406
  CHECK(args[0]->IsObject());
64
112406
  Local<Object> req_wrap_obj = args[0].As<Object>();
65
66
56203
  return Shutdown(req_wrap_obj);
67
}
68
69
878658
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
70
878658
  env_->stream_base_state()[kBytesWritten] = res.bytes;
71
878658
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
72
878658
}
73
74
113812
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
75
113812
  Environment* env = Environment::GetCurrent(args);
76
77
227624
  CHECK(args[0]->IsObject());
78
227624
  CHECK(args[1]->IsArray());
79
80
227624
  Local<Object> req_wrap_obj = args[0].As<Object>();
81
227624
  Local<Array> chunks = args[1].As<Array>();
82
227624
  bool all_buffers = args[2]->IsTrue();
83
84
  size_t count;
85
113812
  if (all_buffers)
86
295
    count = chunks->Length();
87
  else
88
113517
    count = chunks->Length() >> 1;
89
90
113812
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
91
92
113812
  size_t storage_size = 0;
93
  size_t offset;
94
95
113812
  if (!all_buffers) {
96
    // Determine storage size first
97
635381
    for (size_t i = 0; i < count; i++) {
98
1565592
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
99
100
521864
      if (Buffer::HasInstance(chunk))
101
43251
        continue;
102
        // Buffer chunk, no additional storage required
103
104
      // String chunk
105
1435839
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
106
      enum encoding encoding = ParseEncoding(env->isolate(),
107
1435839
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
108
      size_t chunk_size;
109


1717099
      if (encoding == UTF8 && string->Length() > 65535 &&
110

478625
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
111
        return 0;
112
1435839
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
113
1435839
                    .To(&chunk_size))
114
        return 0;
115
478613
      storage_size += chunk_size;
116
    }
117
118
113517
    if (storage_size > INT_MAX)
119
      return UV_ENOBUFS;
120
  } else {
121
136700
    for (size_t i = 0; i < count; i++) {
122
409215
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
123
136405
      bufs[i].base = Buffer::Data(chunk);
124
136405
      bufs[i].len = Buffer::Length(chunk);
125
    }
126
  }
127
128
227624
  AllocatedBuffer storage;
129
113812
  if (storage_size > 0)
130
113471
    storage = env->AllocateManaged(storage_size);
131
132
113812
  offset = 0;
133
113812
  if (!all_buffers) {
134
635381
    for (size_t i = 0; i < count; i++) {
135
1565592
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
136
137
      // Write buffer
138
521864
      if (Buffer::HasInstance(chunk)) {
139
43251
        bufs[i].base = Buffer::Data(chunk);
140
43251
        bufs[i].len = Buffer::Length(chunk);
141
43251
        continue;
142
      }
143
144
      // Write string
145
478613
      CHECK_LE(offset, storage_size);
146
478613
      char* str_storage = storage.data() + offset;
147
478613
      size_t str_size = storage.size() - offset;
148
149
1435839
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
150
      enum encoding encoding = ParseEncoding(env->isolate(),
151
1435839
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
152
      str_size = StringBytes::Write(env->isolate(),
153
                                    str_storage,
154
                                    str_size,
155
                                    string,
156
478613
                                    encoding);
157
478613
      bufs[i].base = str_storage;
158
478613
      bufs[i].len = str_size;
159
478613
      offset += str_size;
160
    }
161
  }
162
163
113812
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
164
113812
  SetWriteResult(res);
165

113812
  if (res.wrap != nullptr && storage_size > 0) {
166
348
    res.wrap->SetAllocatedStorage(std::move(storage));
167
  }
168
227624
  return res.err;
169
}
170
171
172
51052
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
173
102104
  CHECK(args[0]->IsObject());
174
175
51052
  Environment* env = Environment::GetCurrent(args);
176
177
102104
  if (!args[1]->IsUint8Array()) {
178
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
179
1
    return 0;
180
  }
181
182
102102
  Local<Object> req_wrap_obj = args[0].As<Object>();
183
184
  uv_buf_t buf;
185
51051
  buf.base = Buffer::Data(args[1]);
186
51051
  buf.len = Buffer::Length(args[1]);
187
188
51051
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
189
51051
  SetWriteResult(res);
190
191
51051
  return res.err;
192
}
193
194
195
template <enum encoding enc>
196
713795
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
197
713795
  Environment* env = Environment::GetCurrent(args);
198


1427590
  CHECK(args[0]->IsObject());
199


2141385
  CHECK(args[1]->IsString());
200
201
1427590
  Local<Object> req_wrap_obj = args[0].As<Object>();
202
1427590
  Local<String> string = args[1].As<String>();
203
  Local<Object> send_handle_obj;
204


1427590
  if (args[2]->IsObject())
205
206
    send_handle_obj = args[2].As<Object>();
206
207
  // Compute the size of the storage that the string will be flattened into.
208
  // For UTF8 strings that are very long, go ahead and take the hit for
209
  // computing their actual size, rather than tripling the storage.
210
  size_t storage_size;
211

2804314
  if (enc == UTF8 && string->Length() > 65535 &&
212

701150
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
213
    return 0;
214


2141385
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
215
2141385
                .To(&storage_size))
216
    return 0;
217
218


713795
  if (storage_size > INT_MAX)
219
    return UV_ENOBUFS;
220
221
  // Try writing immediately if write size isn't too big
222
  char stack_storage[16384];  // 16kb
223
  size_t data_size;
224
713795
  size_t synchronously_written = 0;
225
  uv_buf_t buf;
226
227
  bool try_write = storage_size <= sizeof(stack_storage) &&
228






715645
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
229


713795
  if (try_write) {
230
712640
    data_size = StringBytes::Write(env->isolate(),
231
                                   stack_storage,
232
                                   storage_size,
233
                                   string,
234
1425280
                                   enc);
235
712640
    buf = uv_buf_init(stack_storage, data_size);
236
237
712640
    uv_buf_t* bufs = &buf;
238
712640
    size_t count = 1;
239
712640
    const int err = DoTryWrite(&bufs, &count);
240
    // Keep track of the bytes written here, because we're taking a shortcut
241
    // by using `DoTryWrite()` directly instead of using the utilities
242
    // provided by `Write()`.
243


712640
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
244
712640
    bytes_written_ += synchronously_written;
245
246
    // Immediate failure or success
247




712640
    if (err != 0 || count == 0) {
248
710807
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
249
710807
      return err;
250
    }
251
252
    // Partial write
253


1833
    CHECK_EQ(count, 1);
254
  }
255
256
2988
  AllocatedBuffer data;
257
258


2988
  if (try_write) {
259
    // Copy partial data
260
1833
    data = env->AllocateManaged(buf.len);
261
1833
    memcpy(data.data(), buf.base, buf.len);
262
1833
    data_size = buf.len;
263
  } else {
264
    // Write it
265
1155
    data = env->AllocateManaged(storage_size);
266
1155
    data_size = StringBytes::Write(env->isolate(),
267
                                   data.data(),
268
                                   storage_size,
269
                                   string,
270
2310
                                   enc);
271
  }
272
273


2988
  CHECK_LE(data_size, storage_size);
274
275
2988
  buf = uv_buf_init(data.data(), data_size);
276
277
2988
  uv_stream_t* send_handle = nullptr;
278
279






3092
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
280
    HandleWrap* wrap;
281


103
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
282
103
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
283
    // Reference LibuvStreamWrap instance to prevent it from being garbage
284
    // collected before `AfterWrite` is called.
285
    req_wrap_obj->Set(env->context(),
286
                      env->handle_string(),
287
412
                      send_handle_obj).Check();
288
  }
289
290
2988
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
291
2988
  res.bytes += synchronously_written;
292
293
2988
  SetWriteResult(res);
294


2988
  if (res.wrap != nullptr) {
295
2965
    res.wrap->SetAllocatedStorage(std::move(data));
296
  }
297
298
2988
  return res.err;
299
}
300
301
302
373370
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
303
                                                 Local<ArrayBuffer> ab,
304
                                                 size_t offset,
305
                                                 StreamBaseJSChecks checks) {
306
373370
  Environment* env = env_;
307
308
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
309
  DCHECK_LE(offset, INT32_MAX);
310
311
373370
  if (checks == DONT_SKIP_NREAD_CHECKS) {
312
373344
    if (ab.IsEmpty()) {
313
      DCHECK_EQ(offset, 0);
314
      DCHECK_LE(nread, 0);
315
    } else {
316
      DCHECK_GE(nread, 0);
317
    }
318
  }
319
320
373370
  env->stream_base_state()[kReadBytesOrError] = nread;
321
373370
  env->stream_base_state()[kArrayBufferOffset] = offset;
322
323
  Local<Value> argv[] = {
324
891082
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
325
1866850
  };
326
327
373370
  AsyncWrap* wrap = GetAsyncWrap();
328
373370
  CHECK_NOT_NULL(wrap);
329
1120110
  Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
330
373370
  CHECK(onread->IsFunction());
331
746740
  return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
332
}
333
334
335
3277
bool StreamBase::IsIPCPipe() {
336
3277
  return false;
337
}
338
339
340
int StreamBase::GetFD() {
341
  return -1;
342
}
343
344
345
74583
Local<Object> StreamBase::GetObject() {
346
74583
  return GetAsyncWrap()->object();
347
}
348
349
44960
void StreamBase::AddMethod(Environment* env,
350
                           Local<Signature> signature,
351
                           enum PropertyAttribute attributes,
352
                           Local<FunctionTemplate> t,
353
                           JSMethodFunction* stream_method,
354
                           Local<String> string) {
355
  Local<FunctionTemplate> templ =
356
      env->NewFunctionTemplate(stream_method,
357
                               signature,
358
                               v8::ConstructorBehavior::kThrow,
359
44960
                               v8::SideEffectType::kHasNoSideEffect);
360
134880
  t->PrototypeTemplate()->SetAccessorProperty(
361
89920
      string, templ, Local<FunctionTemplate>(), attributes);
362
44960
}
363
364
11240
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
365
11240
  HandleScope scope(env->isolate());
366
367
  enum PropertyAttribute attributes =
368
11240
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
369
11240
  Local<Signature> sig = Signature::New(env->isolate(), t);
370
371
11240
  AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
372
  AddMethod(
373
11240
      env, sig, attributes, t, GetExternal, env->external_stream_string());
374
11240
  AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
375
  AddMethod(
376
11240
      env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
377
11240
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
378
11240
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
379
11240
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
380
  env->SetProtoMethod(t,
381
                      "useUserBuffer",
382
11240
                      JSMethod<&StreamBase::UseUserBuffer>);
383
11240
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
384
11240
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
385
  env->SetProtoMethod(
386
11240
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
387
  env->SetProtoMethod(
388
11240
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
389
  env->SetProtoMethod(
390
11240
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
391
  env->SetProtoMethod(
392
11240
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
393
33720
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
394
                                                    "isStreamBase"),
395
44960
                              True(env->isolate()));
396
11240
  t->PrototypeTemplate()->SetAccessor(
397
      FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
398
      BaseObject::InternalFieldGet<kOnReadFunctionField>,
399
33720
      BaseObject::InternalFieldSet<kOnReadFunctionField, &Value::IsFunction>);
400
11240
}
401
402
195
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
403
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
404
390
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
405
195
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
406
407
195
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
408
409
585
  args.GetReturnValue().Set(wrap->GetFD());
410
}
411
412
63228
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
413
126456
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
414
126458
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
415
416
  // uint64_t -> double. 53bits is enough for all real cases.
417
189681
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
418
}
419
420
63228
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
421
126456
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
422
126456
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
423
424
  // uint64_t -> double. 53bits is enough for all real cases.
425
189684
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
426
}
427
428
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
429
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
430
4
  if (wrap == nullptr) return;
431
432
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
433
4
  args.GetReturnValue().Set(ext);
434
}
435
436
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
437
1011813
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
438
2023626
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
439





1011815
  if (wrap == nullptr) return;
440
441





1011816
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
442
443
1011813
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
444
3035438
  args.GetReturnValue().Set((wrap->*Method)(args));
445
}
446
447
7750
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
448
  // No TryWrite by default
449
7750
  return 0;
450
}
451
452
453
97154
const char* StreamResource::Error() const {
454
97154
  return nullptr;
455
}
456
457
458
void StreamResource::ClearError() {
459
  // No-op
460
}
461
462
463
318066
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
464
318066
  CHECK_NOT_NULL(stream_);
465
318066
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
466
318066
  return env->AllocateManaged(suggested_size).release();
467
}
468
469
360678
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
470
360678
  CHECK_NOT_NULL(stream_);
471
360678
  StreamBase* stream = static_cast<StreamBase*>(stream_);
472
360678
  Environment* env = stream->stream_env();
473
360678
  HandleScope handle_scope(env->isolate());
474
673208
  Context::Scope context_scope(env->context());
475
673208
  AllocatedBuffer buf(env, buf_);
476
477
360678
  if (nread <= 0)  {
478
48102
    if (nread < 0)
479
48088
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
480
408676
    return;
481
  }
482
483
312576
  CHECK_LE(static_cast<size_t>(nread), buf.size());
484
312576
  buf.Resize(nread);
485
486
625106
  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
487
}
488
489
490
26
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
491
26
  return buffer_;
492
}
493
494
495
26
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
496
26
  CHECK_NOT_NULL(stream_);
497
26
  CHECK_EQ(buf.base, buffer_.base);
498
499
26
  StreamBase* stream = static_cast<StreamBase*>(stream_);
500
26
  Environment* env = stream->stream_env();
501
26
  HandleScope handle_scope(env->isolate());
502
26
  Context::Scope context_scope(env->context());
503
504
  MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
505
                             Local<ArrayBuffer>(),
506
                             0,
507
26
                             StreamBase::SKIP_NREAD_CHECKS);
508
  Local<Value> next_buf_v;
509

78
  if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
510
12
    buffer_.base = Buffer::Data(next_buf_v);
511
12
    buffer_.len = Buffer::Length(next_buf_v);
512
26
  }
513
26
}
514
515
516
37747
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
517
    StreamReq* req_wrap, int status) {
518
37747
  StreamBase* stream = static_cast<StreamBase*>(stream_);
519
37747
  Environment* env = stream->stream_env();
520
37747
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
521
37747
  HandleScope handle_scope(env->isolate());
522
37747
  Context::Scope context_scope(env->context());
523
75494
  CHECK(!async_wrap->persistent().IsEmpty());
524
37747
  Local<Object> req_wrap_obj = async_wrap->object();
525
526
  Local<Value> argv[] = {
527
    Integer::New(env->isolate(), status),
528
37747
    stream->GetObject(),
529
    Undefined(env->isolate())
530
188735
  };
531
532
37747
  const char* msg = stream->Error();
533
37747
  if (msg != nullptr) {
534
    argv[2] = OneByteString(env->isolate(), msg);
535
    stream->ClearError();
536
  }
537
538
150988
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
539
75490
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
540
37745
}
541
542
4856
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
543
    WriteWrap* req_wrap, int status) {
544
4856
  OnStreamAfterReqFinished(req_wrap, status);
545
4854
}
546
547
32891
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
548
    ShutdownWrap* req_wrap, int status) {
549
32891
  OnStreamAfterReqFinished(req_wrap, status);
550
32891
}
551
552
553
}  // namespace node