GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 238 249 95.6 %
Date: 2019-08-17 22:35:23 Branches: 187 294 63.6 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "node_errors.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util-inl.h"
12
#include "v8.h"
13
14
#include <climits>  // INT_MAX
15
16
namespace node {
17
18
using v8::Array;
19
using v8::ArrayBuffer;
20
using v8::Context;
21
using v8::DontDelete;
22
using v8::DontEnum;
23
using v8::External;
24
using v8::Function;
25
using v8::FunctionCallbackInfo;
26
using v8::HandleScope;
27
using v8::Integer;
28
using v8::Local;
29
using v8::Object;
30
using v8::ReadOnly;
31
using v8::String;
32
using v8::Value;
33
34
// Explicit instantiations of StreamBase::WriteString for the four string
// encodings (ASCII, UTF8, UCS2, LATIN1) that AddMethods() exposes as
// separate JS write methods.
template int StreamBase::WriteString<ASCII>(
35
    const FunctionCallbackInfo<Value>& args);
36
template int StreamBase::WriteString<UTF8>(
37
    const FunctionCallbackInfo<Value>& args);
38
template int StreamBase::WriteString<UCS2>(
39
    const FunctionCallbackInfo<Value>& args);
40
template int StreamBase::WriteString<LATIN1>(
41
    const FunctionCallbackInfo<Value>& args);
42
43
44
75481
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
45
75481
  return ReadStart();
46
}
47
48
49
2046
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
50
2046
  return ReadStop();
51
}
52
53
54
56682
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
55
113364
  CHECK(args[0]->IsObject());
56
113364
  Local<Object> req_wrap_obj = args[0].As<Object>();
57
58
56682
  return Shutdown(req_wrap_obj);
59
}
60
61
896882
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
62
896882
  env_->stream_base_state()[kBytesWritten] = res.bytes;
63
896882
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
64
896882
}
65
66
126934
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
67
126934
  Environment* env = Environment::GetCurrent(args);
68
69
253868
  CHECK(args[0]->IsObject());
70
253868
  CHECK(args[1]->IsArray());
71
72
253868
  Local<Object> req_wrap_obj = args[0].As<Object>();
73
253868
  Local<Array> chunks = args[1].As<Array>();
74
253868
  bool all_buffers = args[2]->IsTrue();
75
76
  size_t count;
77
126934
  if (all_buffers)
78
297
    count = chunks->Length();
79
  else
80
126637
    count = chunks->Length() >> 1;
81
82
126934
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
83
84
126934
  size_t storage_size = 0;
85
  size_t offset;
86
87
126934
  if (!all_buffers) {
88
    // Determine storage size first
89
665572
    for (size_t i = 0; i < count; i++) {
90
1616805
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
91
92
538935
      if (Buffer::HasInstance(chunk))
93
43477
        continue;
94
        // Buffer chunk, no additional storage required
95
96
      // String chunk
97
1486374
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
98
      enum encoding encoding = ParseEncoding(env->isolate(),
99
1486374
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
100
      size_t chunk_size;
101


1779412
      if (encoding == UTF8 && string->Length() > 65535 &&
102

495470
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
103
        return 0;
104
1486374
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
105
1486374
                    .To(&chunk_size))
106
        return 0;
107
495458
      storage_size += chunk_size;
108
    }
109
110
126637
    if (storage_size > INT_MAX)
111
      return UV_ENOBUFS;
112
  } else {
113
162914
    for (size_t i = 0; i < count; i++) {
114
487851
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
115
162617
      bufs[i].base = Buffer::Data(chunk);
116
162617
      bufs[i].len = Buffer::Length(chunk);
117
    }
118
  }
119
120
253868
  AllocatedBuffer storage;
121
126934
  if (storage_size > 0)
122
126591
    storage = env->AllocateManaged(storage_size);
123
124
126934
  offset = 0;
125
126934
  if (!all_buffers) {
126
665572
    for (size_t i = 0; i < count; i++) {
127
1616805
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
128
129
      // Write buffer
130
538935
      if (Buffer::HasInstance(chunk)) {
131
43477
        bufs[i].base = Buffer::Data(chunk);
132
43477
        bufs[i].len = Buffer::Length(chunk);
133
43477
        continue;
134
      }
135
136
      // Write string
137
495458
      CHECK_LE(offset, storage_size);
138
495458
      char* str_storage = storage.data() + offset;
139
495458
      size_t str_size = storage.size() - offset;
140
141
1486374
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
142
      enum encoding encoding = ParseEncoding(env->isolate(),
143
1486374
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
144
      str_size = StringBytes::Write(env->isolate(),
145
                                    str_storage,
146
                                    str_size,
147
                                    string,
148
495458
                                    encoding);
149
495458
      bufs[i].base = str_storage;
150
495458
      bufs[i].len = str_size;
151
495458
      offset += str_size;
152
    }
153
  }
154
155
126934
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
156
126934
  SetWriteResult(res);
157

126934
  if (res.wrap != nullptr && storage_size > 0) {
158
315
    res.wrap->SetAllocatedStorage(std::move(storage));
159
  }
160
253868
  return res.err;
161
}
162
163
164
41320
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
165
82640
  CHECK(args[0]->IsObject());
166
167
41320
  Environment* env = Environment::GetCurrent(args);
168
169
82640
  if (!args[1]->IsUint8Array()) {
170
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
171
1
    return 0;
172
  }
173
174
82638
  Local<Object> req_wrap_obj = args[0].As<Object>();
175
176
  uv_buf_t buf;
177
41319
  buf.base = Buffer::Data(args[1]);
178
41319
  buf.len = Buffer::Length(args[1]);
179
180
41319
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
181
41319
  SetWriteResult(res);
182
183
41319
  return res.err;
184
}
185
186
187
template <enum encoding enc>
188
728629
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
189
728629
  Environment* env = Environment::GetCurrent(args);
190


1457258
  CHECK(args[0]->IsObject());
191


2185887
  CHECK(args[1]->IsString());
192
193
1457258
  Local<Object> req_wrap_obj = args[0].As<Object>();
194
1457258
  Local<String> string = args[1].As<String>();
195
  Local<Object> send_handle_obj;
196


1457258
  if (args[2]->IsObject())
197
208
    send_handle_obj = args[2].As<Object>();
198
199
  // Compute the size of the storage that the string will be flattened into.
200
  // For UTF8 strings that are very long, go ahead and take the hit for
201
  // computing their actual size, rather than tripling the storage.
202
  size_t storage_size;
203

2863246
  if (enc == UTF8 && string->Length() > 65535 &&
204

715883
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
205
    return 0;
206


2185887
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
207
2185887
                .To(&storage_size))
208
    return 0;
209
210


728629
  if (storage_size > INT_MAX)
211
    return UV_ENOBUFS;
212
213
  // Try writing immediately if write size isn't too big
214
  char stack_storage[16384];  // 16kb
215
  size_t data_size;
216
728629
  size_t synchronously_written = 0;
217
  uv_buf_t buf;
218
219
  bool try_write = storage_size <= sizeof(stack_storage) &&
220






730478
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
221


728629
  if (try_write) {
222
727483
    data_size = StringBytes::Write(env->isolate(),
223
                                   stack_storage,
224
                                   storage_size,
225
                                   string,
226
1454966
                                   enc);
227
727483
    buf = uv_buf_init(stack_storage, data_size);
228
229
727483
    uv_buf_t* bufs = &buf;
230
727483
    size_t count = 1;
231
727483
    const int err = DoTryWrite(&bufs, &count);
232
    // Keep track of the bytes written here, because we're taking a shortcut
233
    // by using `DoTryWrite()` directly instead of using the utilities
234
    // provided by `Write()`.
235


727483
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
236
727483
    bytes_written_ += synchronously_written;
237
238
    // Immediate failure or success
239




727483
    if (err != 0 || count == 0) {
240
725579
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
241
725579
      return err;
242
    }
243
244
    // Partial write
245


1904
    CHECK_EQ(count, 1);
246
  }
247
248
3050
  AllocatedBuffer data;
249
250


3050
  if (try_write) {
251
    // Copy partial data
252
1904
    data = env->AllocateManaged(buf.len);
253
1904
    memcpy(data.data(), buf.base, buf.len);
254
1904
    data_size = buf.len;
255
  } else {
256
    // Write it
257
1146
    data = env->AllocateManaged(storage_size);
258
1146
    data_size = StringBytes::Write(env->isolate(),
259
                                   data.data(),
260
                                   storage_size,
261
                                   string,
262
2292
                                   enc);
263
  }
264
265


3050
  CHECK_LE(data_size, storage_size);
266
267
3050
  buf = uv_buf_init(data.data(), data_size);
268
269
3050
  uv_stream_t* send_handle = nullptr;
270
271






3155
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
272
    HandleWrap* wrap;
273


104
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
274
104
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
275
    // Reference LibuvStreamWrap instance to prevent it from being garbage
276
    // collected before `AfterWrite` is called.
277
    req_wrap_obj->Set(env->context(),
278
                      env->handle_string(),
279
416
                      send_handle_obj).Check();
280
  }
281
282
3050
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
283
3050
  res.bytes += synchronously_written;
284
285
3050
  SetWriteResult(res);
286


3050
  if (res.wrap != nullptr) {
287
3026
    res.wrap->SetAllocatedStorage(std::move(data));
288
  }
289
290
3050
  return res.err;
291
}
292
293
294
372787
void StreamBase::CallJSOnreadMethod(ssize_t nread,
295
                                    Local<ArrayBuffer> ab,
296
                                    size_t offset) {
297
372787
  Environment* env = env_;
298
299
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
300
  DCHECK_LE(offset, INT32_MAX);
301
302
372787
  if (ab.IsEmpty()) {
303
    DCHECK_EQ(offset, 0);
304
    DCHECK_LE(nread, 0);
305
  } else {
306
    DCHECK_GE(nread, 0);
307
  }
308
309
372787
  env->stream_base_state()[kReadBytesOrError] = nread;
310
372787
  env->stream_base_state()[kArrayBufferOffset] = offset;
311
312
  Local<Value> argv[] = {
313
889973
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
314
1863935
  };
315
316
372787
  AsyncWrap* wrap = GetAsyncWrap();
317
372787
  CHECK_NOT_NULL(wrap);
318
1118361
  Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
319
372787
  CHECK(onread->IsFunction());
320
745574
  wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
321
372712
}
322
323
324
3397
// Base implementation: always reports that this stream is not an IPC pipe.
bool StreamBase::IsIPCPipe() {
  constexpr bool kIsIpcPipe = false;
  return kIsIpcPipe;
}
327
328
329
// Base implementation: always returns -1 as the file descriptor.
int StreamBase::GetFD() {
  constexpr int kNoFd = -1;
  return kNoFd;
}
332
333
334
75179
// Returns the JS object of the AsyncWrap reported by GetAsyncWrap().
// The AsyncWrap pointer is dereferenced without a null check.
Local<Object> StreamBase::GetObject() {
  AsyncWrap* async_wrap = GetAsyncWrap();
  return async_wrap->object();
}
337
338
45304
void StreamBase::AddMethod(Environment* env,
339
                           Local<Signature> signature,
340
                           enum PropertyAttribute attributes,
341
                           Local<FunctionTemplate> t,
342
                           JSMethodFunction* stream_method,
343
                           Local<String> string) {
344
  Local<FunctionTemplate> templ =
345
      env->NewFunctionTemplate(stream_method,
346
                               signature,
347
                               v8::ConstructorBehavior::kThrow,
348
45304
                               v8::SideEffectType::kHasNoSideEffect);
349
135912
  t->PrototypeTemplate()->SetAccessorProperty(
350
90608
      string, templ, Local<FunctionTemplate>(), attributes);
351
45304
}
352
353
11326
// Populates the function template `t` with the StreamBase JS surface:
//   - read-only, non-deletable, non-enumerable accessor properties for the
//     fd, external stream, bytes-read and bytes-written values;
//   - prototype methods readStart, readStop, shutdown, writev, writeBuffer
//     and one write*String method per supported encoding;
//   - an `isStreamBase` prototype property set to true;
//   - an `onread` accessor backed by the kOnReadFunctionField internal field,
//     whose setter is instantiated with &Value::IsFunction.
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
  auto* isolate = env->isolate();
  HandleScope scope(isolate);

  const auto attributes =
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
  Local<Signature> sig = Signature::New(isolate, t);

  // Accessor properties.
  AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
  AddMethod(
      env, sig, attributes, t, GetExternal, env->external_stream_string());
  AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
  AddMethod(
      env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());

  // Prototype methods; each one is dispatched through JSMethod<>.
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
  env->SetProtoMethod(
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
  env->SetProtoMethod(
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
  env->SetProtoMethod(
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
  env->SetProtoMethod(
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);

  // Marker property and the `onread` accessor.
  auto prototype = t->PrototypeTemplate();
  prototype->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"),
                 True(isolate));
  prototype->SetAccessor(
      FIXED_ONE_BYTE_STRING(isolate, "onread"),
      BaseObject::InternalFieldGet<kOnReadFunctionField>,
      BaseObject::InternalFieldSet<kOnReadFunctionField, &Value::IsFunction>);
}
387
388
195
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
389
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
390
390
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
391
195
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
392
393
195
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
394
395
585
  args.GetReturnValue().Set(wrap->GetFD());
396
}
397
398
63626
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
399
127252
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
400
127254
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
401
402
  // uint64_t -> double. 53bits is enough for all real cases.
403
190875
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
404
}
405
406
63626
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
407
127252
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
408
127252
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
409
410
  // uint64_t -> double. 53bits is enough for all real cases.
411
190878
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
412
}
413
414
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
415
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
416
4
  if (wrap == nullptr) return;
417
418
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
419
4
  args.GetReturnValue().Set(ext);
420
}
421
422
// Generic trampoline used for every prototype method: resolves the StreamBase
// from the holder object and forwards the call to the member function
// `Method`.
//   - No StreamBase available: returns without setting a return value.
//   - Stream not alive: the return value is set to UV_EINVAL.
//   - Otherwise: `Method` runs inside a DefaultTriggerAsyncIdScope built from
//     the stream's AsyncWrap, and its int result becomes the return value.
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
  if (wrap == nullptr) return;

  if (!wrap->IsAlive()) {
    args.GetReturnValue().Set(UV_EINVAL);
    return;
  }

  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
  const int result = (wrap->*Method)(args);
  args.GetReturnValue().Set(result);
}
432
433
7869
// Default synchronous-write hook: performs no write. `bufs` and `count` are
// left untouched and 0 is returned.
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  // No TryWrite by default
  constexpr int kNoError = 0;
  return kNoError;
}
437
438
439
98236
// Default implementation: there is never an error message, so nullptr is
// returned.
const char* StreamResource::Error() const {
  const char* const no_message = nullptr;
  return no_message;
}
442
443
444
// Default implementation: intentionally does nothing.
void StreamResource::ClearError() {
  // No-op
  return;
}
447
448
317432
// Allocates a read buffer of `suggested_size` bytes through the stream's
// Environment (AllocateManaged) and returns it as a released uv_buf_t.
// stream_ must be non-null (CHECKed).
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
  CHECK_NOT_NULL(stream_);
  StreamBase* stream = static_cast<StreamBase*>(stream_);
  Environment* env = stream->stream_env();
  AllocatedBuffer buffer = env->AllocateManaged(suggested_size);
  return buffer.release();
}
453
454
360075
// Forwards a read result to the JS `onread` callback.
//   nread < 0   the callback is invoked with nread and an empty ArrayBuffer
//               handle.
//   nread == 0  nothing is reported.
//   nread > 0   the buffer is resized to nread bytes and passed to the
//               callback as an ArrayBuffer.
// `buf_` is wrapped in an AllocatedBuffer on entry in all three cases.
// stream_ must be non-null (CHECKed).
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
  CHECK_NOT_NULL(stream_);
  StreamBase* stream = static_cast<StreamBase*>(stream_);
  Environment* env = stream->stream_env();
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());
  AllocatedBuffer buf(env, buf_);

  if (nread < 0) {
    stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
    return;
  }

  if (nread == 0)
    return;

  const size_t bytes_read = static_cast<size_t>(nread);
  CHECK_LE(bytes_read, buf.size());
  buf.Resize(nread);

  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
}
473
474
475
38234
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
476
    StreamReq* req_wrap, int status) {
477
38234
  StreamBase* stream = static_cast<StreamBase*>(stream_);
478
38234
  Environment* env = stream->stream_env();
479
38234
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
480
38234
  HandleScope handle_scope(env->isolate());
481
38234
  Context::Scope context_scope(env->context());
482
76468
  CHECK(!async_wrap->persistent().IsEmpty());
483
38234
  Local<Object> req_wrap_obj = async_wrap->object();
484
485
  Local<Value> argv[] = {
486
    Integer::New(env->isolate(), status),
487
38234
    stream->GetObject(),
488
    Undefined(env->isolate())
489
191170
  };
490
491
38234
  const char* msg = stream->Error();
492
38234
  if (msg != nullptr) {
493
    argv[2] = OneByteString(env->isolate(), msg);
494
    stream->ClearError();
495
  }
496
497
152936
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
498
76465
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
499
38233
}
500
501
4962
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
502
    WriteWrap* req_wrap, int status) {
503
4962
  OnStreamAfterReqFinished(req_wrap, status);
504
4961
}
505
506
33272
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
507
    ShutdownWrap* req_wrap, int status) {
508
33272
  OnStreamAfterReqFinished(req_wrap, status);
509
33272
}
510
511
512
}  // namespace node