GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 238 249 95.6 %
Date: 2019-07-28 22:34:34 Branches: 187 294 63.6 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "node_errors.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util-inl.h"
12
#include "v8.h"
13
14
#include <climits>  // INT_MAX
15
16
namespace node {
17
18
using v8::Array;
19
using v8::ArrayBuffer;
20
using v8::Context;
21
using v8::DontDelete;
22
using v8::DontEnum;
23
using v8::External;
24
using v8::Function;
25
using v8::FunctionCallbackInfo;
26
using v8::HandleScope;
27
using v8::Integer;
28
using v8::Local;
29
using v8::Object;
30
using v8::ReadOnly;
31
using v8::String;
32
using v8::Value;
33
34
template int StreamBase::WriteString<ASCII>(
35
    const FunctionCallbackInfo<Value>& args);
36
template int StreamBase::WriteString<UTF8>(
37
    const FunctionCallbackInfo<Value>& args);
38
template int StreamBase::WriteString<UCS2>(
39
    const FunctionCallbackInfo<Value>& args);
40
template int StreamBase::WriteString<LATIN1>(
41
    const FunctionCallbackInfo<Value>& args);
42
43
44
60387
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
45
60387
  return ReadStart();
46
}
47
48
49
2067
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
50
2067
  return ReadStop();
51
}
52
53
54
42843
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
55
85686
  CHECK(args[0]->IsObject());
56
85686
  Local<Object> req_wrap_obj = args[0].As<Object>();
57
58
42843
  return Shutdown(req_wrap_obj);
59
}
60
61
844827
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
62
844827
  env_->stream_base_state()[kBytesWritten] = res.bytes;
63
844827
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
64
844827
}
65
66
68894
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
67
68894
  Environment* env = Environment::GetCurrent(args);
68
69
137788
  CHECK(args[0]->IsObject());
70
137788
  CHECK(args[1]->IsArray());
71
72
137788
  Local<Object> req_wrap_obj = args[0].As<Object>();
73
137788
  Local<Array> chunks = args[1].As<Array>();
74
137788
  bool all_buffers = args[2]->IsTrue();
75
76
  size_t count;
77
68894
  if (all_buffers)
78
284
    count = chunks->Length();
79
  else
80
68610
    count = chunks->Length() >> 1;
81
82
68894
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
83
84
68894
  size_t storage_size = 0;
85
  size_t offset;
86
87
68894
  if (!all_buffers) {
88
    // Determine storage size first
89
467062
    for (size_t i = 0; i < count; i++) {
90
1195356
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
91
92
398452
      if (Buffer::HasInstance(chunk))
93
43413
        continue;
94
        // Buffer chunk, no additional storage required
95
96
      // String chunk
97
1065117
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
98
      enum encoding encoding = ParseEncoding(env->isolate(),
99
1065117
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
100
      size_t chunk_size;
101


1300372
      if (encoding == UTF8 && string->Length() > 65535 &&
102

355051
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
103
        return 0;
104
1065117
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
105
1065117
                    .To(&chunk_size))
106
        return 0;
107
355039
      storage_size += chunk_size;
108
    }
109
110
68610
    if (storage_size > INT_MAX)
111
      return UV_ENOBUFS;
112
  } else {
113
61332
    for (size_t i = 0; i < count; i++) {
114
183144
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
115
61048
      bufs[i].base = Buffer::Data(chunk);
116
61048
      bufs[i].len = Buffer::Length(chunk);
117
    }
118
  }
119
120
137788
  AllocatedBuffer storage;
121
68894
  if (storage_size > 0)
122
68564
    storage = env->AllocateManaged(storage_size);
123
124
68894
  offset = 0;
125
68894
  if (!all_buffers) {
126
467062
    for (size_t i = 0; i < count; i++) {
127
1195356
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
128
129
      // Write buffer
130
398452
      if (Buffer::HasInstance(chunk)) {
131
43413
        bufs[i].base = Buffer::Data(chunk);
132
43413
        bufs[i].len = Buffer::Length(chunk);
133
43413
        continue;
134
      }
135
136
      // Write string
137
355039
      CHECK_LE(offset, storage_size);
138
355039
      char* str_storage = storage.data() + offset;
139
355039
      size_t str_size = storage.size() - offset;
140
141
1065117
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
142
      enum encoding encoding = ParseEncoding(env->isolate(),
143
1065117
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
144
      str_size = StringBytes::Write(env->isolate(),
145
                                    str_storage,
146
                                    str_size,
147
                                    string,
148
355039
                                    encoding);
149
355039
      bufs[i].base = str_storage;
150
355039
      bufs[i].len = str_size;
151
355039
      offset += str_size;
152
    }
153
  }
154
155
68894
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
156
68894
  SetWriteResult(res);
157

68894
  if (res.wrap != nullptr && storage_size > 0) {
158
326
    res.wrap->SetAllocatedStorage(std::move(storage));
159
  }
160
137788
  return res.err;
161
}
162
163
164
59726
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
165
119452
  CHECK(args[0]->IsObject());
166
167
59726
  Environment* env = Environment::GetCurrent(args);
168
169
119452
  if (!args[1]->IsUint8Array()) {
170
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
171
1
    return 0;
172
  }
173
174
119450
  Local<Object> req_wrap_obj = args[0].As<Object>();
175
176
  uv_buf_t buf;
177
59725
  buf.base = Buffer::Data(args[1]);
178
59725
  buf.len = Buffer::Length(args[1]);
179
180
59725
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
181
59725
  SetWriteResult(res);
182
183
59725
  return res.err;
184
}
185
186
187
template <enum encoding enc>
188
716208
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
189
716208
  Environment* env = Environment::GetCurrent(args);
190


1432416
  CHECK(args[0]->IsObject());
191


2148624
  CHECK(args[1]->IsString());
192
193
1432416
  Local<Object> req_wrap_obj = args[0].As<Object>();
194
1432416
  Local<String> string = args[1].As<String>();
195
  Local<Object> send_handle_obj;
196


1432416
  if (args[2]->IsObject())
197
204
    send_handle_obj = args[2].As<Object>();
198
199
  // Compute the size of the storage that the string will be flattened into.
200
  // For UTF8 strings that are very long, go ahead and take the hit for
201
  // computing their actual size, rather than tripling the storage.
202
  size_t storage_size;
203

2813622
  if (enc == UTF8 && string->Length() > 65535 &&
204

703477
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
205
    return 0;
206


2148624
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
207
2148624
                .To(&storage_size))
208
    return 0;
209
210


716208
  if (storage_size > INT_MAX)
211
    return UV_ENOBUFS;
212
213
  // Try writing immediately if write size isn't too big
214
  char stack_storage[16384];  // 16kb
215
  size_t data_size;
216
716208
  size_t synchronously_written = 0;
217
  uv_buf_t buf;
218
219
  bool try_write = storage_size <= sizeof(stack_storage) &&
220






718047
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
221


716208
  if (try_write) {
222
714914
    data_size = StringBytes::Write(env->isolate(),
223
                                   stack_storage,
224
                                   storage_size,
225
                                   string,
226
1429828
                                   enc);
227
714914
    buf = uv_buf_init(stack_storage, data_size);
228
229
714914
    uv_buf_t* bufs = &buf;
230
714914
    size_t count = 1;
231
714914
    const int err = DoTryWrite(&bufs, &count);
232
    // Keep track of the bytes written here, because we're taking a shortcut
233
    // by using `DoTryWrite()` directly instead of using the utilities
234
    // provided by `Write()`.
235


714914
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
236
714914
    bytes_written_ += synchronously_written;
237
238
    // Immediate failure or success
239




714914
    if (err != 0 || count == 0) {
240
713140
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
241
713140
      return err;
242
    }
243
244
    // Partial write
245


1774
    CHECK_EQ(count, 1);
246
  }
247
248
3068
  AllocatedBuffer data;
249
250


3068
  if (try_write) {
251
    // Copy partial data
252
1774
    data = env->AllocateManaged(buf.len);
253
1774
    memcpy(data.data(), buf.base, buf.len);
254
1774
    data_size = buf.len;
255
  } else {
256
    // Write it
257
1294
    data = env->AllocateManaged(storage_size);
258
1294
    data_size = StringBytes::Write(env->isolate(),
259
                                   data.data(),
260
                                   storage_size,
261
                                   string,
262
2588
                                   enc);
263
  }
264
265


3068
  CHECK_LE(data_size, storage_size);
266
267
3068
  buf = uv_buf_init(data.data(), data_size);
268
269
3068
  uv_stream_t* send_handle = nullptr;
270
271






3171
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
272
    HandleWrap* wrap;
273


102
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
274
102
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
275
    // Reference LibuvStreamWrap instance to prevent it from being garbage
276
    // collected before `AfterWrite` is called.
277
    req_wrap_obj->Set(env->context(),
278
                      env->handle_string(),
279
408
                      send_handle_obj).Check();
280
  }
281
282
3068
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
283
3068
  res.bytes += synchronously_written;
284
285
3068
  SetWriteResult(res);
286


3068
  if (res.wrap != nullptr) {
287
2895
    res.wrap->SetAllocatedStorage(std::move(data));
288
  }
289
290
3068
  return res.err;
291
}
292
293
294
349948
void StreamBase::CallJSOnreadMethod(ssize_t nread,
295
                                    Local<ArrayBuffer> ab,
296
                                    size_t offset) {
297
349948
  Environment* env = env_;
298
299
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
300
  DCHECK_LE(offset, INT32_MAX);
301
302
349948
  if (ab.IsEmpty()) {
303
    DCHECK_EQ(offset, 0);
304
    DCHECK_LE(nread, 0);
305
  } else {
306
    DCHECK_GE(nread, 0);
307
  }
308
309
349948
  env->stream_base_state()[kReadBytesOrError] = nread;
310
349948
  env->stream_base_state()[kArrayBufferOffset] = offset;
311
312
  Local<Value> argv[] = {
313
805295
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
314
1749740
  };
315
316
349948
  AsyncWrap* wrap = GetAsyncWrap();
317
349948
  CHECK_NOT_NULL(wrap);
318
1049844
  Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
319
349948
  CHECK(onread->IsFunction());
320
699896
  wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
321
349874
}
322
323
324
3317
bool StreamBase::IsIPCPipe() {
325
3317
  return false;
326
}
327
328
329
int StreamBase::GetFD() {
330
  return -1;
331
}
332
333
334
60051
Local<Object> StreamBase::GetObject() {
335
60051
  return GetAsyncWrap()->object();
336
}
337
338
43760
void StreamBase::AddMethod(Environment* env,
339
                           Local<Signature> signature,
340
                           enum PropertyAttribute attributes,
341
                           Local<FunctionTemplate> t,
342
                           JSMethodFunction* stream_method,
343
                           Local<String> string) {
344
  Local<FunctionTemplate> templ =
345
      env->NewFunctionTemplate(stream_method,
346
                               signature,
347
                               v8::ConstructorBehavior::kThrow,
348
43760
                               v8::SideEffectType::kHasNoSideEffect);
349
131280
  t->PrototypeTemplate()->SetAccessorProperty(
350
87520
      string, templ, Local<FunctionTemplate>(), attributes);
351
43760
}
352
353
10940
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
354
10940
  HandleScope scope(env->isolate());
355
356
  enum PropertyAttribute attributes =
357
10940
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
358
10940
  Local<Signature> sig = Signature::New(env->isolate(), t);
359
360
10940
  AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
361
  AddMethod(
362
10940
      env, sig, attributes, t, GetExternal, env->external_stream_string());
363
10940
  AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
364
  AddMethod(
365
10940
      env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
366
10940
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
367
10940
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
368
10940
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
369
10940
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
370
10940
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
371
  env->SetProtoMethod(
372
10940
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
373
  env->SetProtoMethod(
374
10940
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
375
  env->SetProtoMethod(
376
10940
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
377
  env->SetProtoMethod(
378
10940
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
379
32820
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
380
                                                    "isStreamBase"),
381
43760
                              True(env->isolate()));
382
10940
  t->PrototypeTemplate()->SetAccessor(
383
      FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
384
      BaseObject::InternalFieldGet<kOnReadFunctionField>,
385
32820
      BaseObject::InternalFieldSet<kOnReadFunctionField, &Value::IsFunction>);
386
10940
}
387
388
195
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
389
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
390
390
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
391
195
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
392
393
195
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
394
395
585
  args.GetReturnValue().Set(wrap->GetFD());
396
}
397
398
49931
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
399
99862
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
400
99864
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
401
402
  // uint64_t -> double. 53bits is enough for all real cases.
403
149790
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
404
}
405
406
49931
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
407
99862
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
408
99862
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
409
410
  // uint64_t -> double. 53bits is enough for all real cases.
411
149793
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
412
}
413
414
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
415
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
416
4
  if (wrap == nullptr) return;
417
418
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
419
4
  args.GetReturnValue().Set(ext);
420
}
421
422
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
423
950126
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
424
1900252
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
425




950127
  if (wrap == nullptr) return;
426
427




950128
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
428
429
950125
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
430
2850375
  args.GetReturnValue().Set((wrap->*Method)(args));
431
}
432
433
7473
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
434
  // No TryWrite by default
435
7473
  return 0;
436
}
437
438
439
70373
const char* StreamResource::Error() const {
440
70373
  return nullptr;
441
}
442
443
444
void StreamResource::ClearError() {
445
  // No-op
446
}
447
448
308906
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
449
308906
  CHECK_NOT_NULL(stream_);
450
308906
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
451
308906
  return env->AllocateManaged(suggested_size).release();
452
}
453
454
338360
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
455
338360
  CHECK_NOT_NULL(stream_);
456
338360
  StreamBase* stream = static_cast<StreamBase*>(stream_);
457
338360
  Environment* env = stream->stream_env();
458
338360
  HandleScope handle_scope(env->isolate());
459
641536
  Context::Scope context_scope(env->context());
460
641536
  AllocatedBuffer buf(env, buf_);
461
462
338360
  if (nread <= 0)  {
463
35139
    if (nread < 0)
464
35133
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
465
373396
    return;
466
  }
467
468
303221
  CHECK_LE(static_cast<size_t>(nread), buf.size());
469
303221
  buf.Resize(nread);
470
471
606397
  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
472
}
473
474
475
24375
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
476
    StreamReq* req_wrap, int status) {
477
24375
  StreamBase* stream = static_cast<StreamBase*>(stream_);
478
24375
  Environment* env = stream->stream_env();
479
24375
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
480
24375
  HandleScope handle_scope(env->isolate());
481
24375
  Context::Scope context_scope(env->context());
482
48750
  CHECK(!async_wrap->persistent().IsEmpty());
483
24375
  Local<Object> req_wrap_obj = async_wrap->object();
484
485
  Local<Value> argv[] = {
486
    Integer::New(env->isolate(), status),
487
24375
    stream->GetObject(),
488
    Undefined(env->isolate())
489
121875
  };
490
491
24375
  const char* msg = stream->Error();
492
24375
  if (msg != nullptr) {
493
    argv[2] = OneByteString(env->isolate(), msg);
494
    stream->ClearError();
495
  }
496
497
97500
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
498
48746
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
499
24373
}
500
501
4687
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
502
    WriteWrap* req_wrap, int status) {
503
4687
  OnStreamAfterReqFinished(req_wrap, status);
504
4685
}
505
506
19688
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
507
    ShutdownWrap* req_wrap, int status) {
508
19688
  OnStreamAfterReqFinished(req_wrap, status);
509
19688
}
510
511
512
}  // namespace node