GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 238 249 95.6 %
Date: 2019-05-05 22:32:45 Branches: 187 294 63.6 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "node_errors.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util-inl.h"
12
#include "v8.h"
13
14
#include <climits>  // INT_MAX
15
16
namespace node {
17
18
using v8::Array;
19
using v8::ArrayBuffer;
20
using v8::Context;
21
using v8::DontDelete;
22
using v8::DontEnum;
23
using v8::External;
24
using v8::Function;
25
using v8::FunctionCallbackInfo;
26
using v8::HandleScope;
27
using v8::Integer;
28
using v8::Local;
29
using v8::Object;
30
using v8::ReadOnly;
31
using v8::String;
32
using v8::Value;
33
34
template int StreamBase::WriteString<ASCII>(
35
    const FunctionCallbackInfo<Value>& args);
36
template int StreamBase::WriteString<UTF8>(
37
    const FunctionCallbackInfo<Value>& args);
38
template int StreamBase::WriteString<UCS2>(
39
    const FunctionCallbackInfo<Value>& args);
40
template int StreamBase::WriteString<LATIN1>(
41
    const FunctionCallbackInfo<Value>& args);
42
43
44
60708
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
45
60708
  return ReadStart();
46
}
47
48
49
1533
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
50
1533
  return ReadStop();
51
}
52
53
54
41577
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
55
83154
  CHECK(args[0]->IsObject());
56
83154
  Local<Object> req_wrap_obj = args[0].As<Object>();
57
58
41577
  return Shutdown(req_wrap_obj);
59
}
60
61
771243
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
62
771243
  env_->stream_base_state()[kBytesWritten] = res.bytes;
63
771243
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
64
771243
}
65
66
22477
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
67
22477
  Environment* env = Environment::GetCurrent(args);
68
69
44954
  CHECK(args[0]->IsObject());
70
44954
  CHECK(args[1]->IsArray());
71
72
44954
  Local<Object> req_wrap_obj = args[0].As<Object>();
73
44954
  Local<Array> chunks = args[1].As<Array>();
74
44954
  bool all_buffers = args[2]->IsTrue();
75
76
  size_t count;
77
22477
  if (all_buffers)
78
280
    count = chunks->Length();
79
  else
80
22197
    count = chunks->Length() >> 1;
81
82
22477
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
83
84
22477
  size_t storage_size = 0;
85
  size_t offset;
86
87
22477
  if (!all_buffers) {
88
    // Determine storage size first
89
310106
    for (size_t i = 0; i < count; i++) {
90
863727
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
91
92
287909
      if (Buffer::HasInstance(chunk))
93
43399
        continue;
94
        // Buffer chunk, no additional storage required
95
96
      // String chunk
97
733530
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
98
      enum encoding encoding = ParseEncoding(env->isolate(),
99
733530
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
100
      size_t chunk_size;
101


921080
      if (encoding == UTF8 && string->Length() > 65535 &&
102

244522
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
103
        return 0;
104
733530
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
105
733530
                    .To(&chunk_size))
106
        return 0;
107
244510
      storage_size += chunk_size;
108
    }
109
110
22197
    if (storage_size > INT_MAX)
111
      return UV_ENOBUFS;
112
  } else {
113
28564
    for (size_t i = 0; i < count; i++) {
114
84852
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
115
28284
      bufs[i].base = Buffer::Data(chunk);
116
28284
      bufs[i].len = Buffer::Length(chunk);
117
    }
118
  }
119
120
44954
  AllocatedBuffer storage;
121
22477
  if (storage_size > 0)
122
22151
    storage = env->AllocateManaged(storage_size);
123
124
22477
  offset = 0;
125
22477
  if (!all_buffers) {
126
310106
    for (size_t i = 0; i < count; i++) {
127
863727
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
128
129
      // Write buffer
130
287909
      if (Buffer::HasInstance(chunk)) {
131
43399
        bufs[i].base = Buffer::Data(chunk);
132
43399
        bufs[i].len = Buffer::Length(chunk);
133
43399
        continue;
134
      }
135
136
      // Write string
137
244510
      CHECK_LE(offset, storage_size);
138
244510
      char* str_storage = storage.data() + offset;
139
244510
      size_t str_size = storage.size() - offset;
140
141
733530
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
142
      enum encoding encoding = ParseEncoding(env->isolate(),
143
733530
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
144
      str_size = StringBytes::Write(env->isolate(),
145
                                    str_storage,
146
                                    str_size,
147
                                    string,
148
244510
                                    encoding);
149
244510
      bufs[i].base = str_storage;
150
244510
      bufs[i].len = str_size;
151
244510
      offset += str_size;
152
    }
153
  }
154
155
22477
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
156
22477
  SetWriteResult(res);
157

22477
  if (res.wrap != nullptr && storage_size > 0) {
158
266
    res.wrap->SetAllocatedStorage(std::move(storage));
159
  }
160
44954
  return res.err;
161
}
162
163
164
67903
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
165
135806
  CHECK(args[0]->IsObject());
166
167
67903
  Environment* env = Environment::GetCurrent(args);
168
169
135806
  if (!args[1]->IsUint8Array()) {
170
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
171
1
    return 0;
172
  }
173
174
135804
  Local<Object> req_wrap_obj = args[0].As<Object>();
175
176
  uv_buf_t buf;
177
67902
  buf.base = Buffer::Data(args[1]);
178
67902
  buf.len = Buffer::Length(args[1]);
179
180
67902
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
181
67902
  SetWriteResult(res);
182
183
67902
  return res.err;
184
}
185
186
187
template <enum encoding enc>
188
680864
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
189
680864
  Environment* env = Environment::GetCurrent(args);
190


1361728
  CHECK(args[0]->IsObject());
191


2042592
  CHECK(args[1]->IsString());
192
193
1361728
  Local<Object> req_wrap_obj = args[0].As<Object>();
194
1361728
  Local<String> string = args[1].As<String>();
195
  Local<Object> send_handle_obj;
196


1361728
  if (args[2]->IsObject())
197
206
    send_handle_obj = args[2].As<Object>();
198
199
  // Compute the size of the storage that the string will be flattened into.
200
  // For UTF8 strings that are very long, go ahead and take the hit for
201
  // computing their actual size, rather than tripling the storage.
202
  size_t storage_size;
203

2672502
  if (enc == UTF8 && string->Length() > 65535 &&
204

668197
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
205
    return 0;
206


2042592
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
207
2042592
                .To(&storage_size))
208
    return 0;
209
210


680864
  if (storage_size > INT_MAX)
211
    return UV_ENOBUFS;
212
213
  // Try writing immediately if write size isn't too big
214
  char stack_storage[16384];  // 16kb
215
  size_t data_size;
216
680864
  size_t synchronously_written = 0;
217
  uv_buf_t buf;
218
219
  bool try_write = storage_size <= sizeof(stack_storage) &&
220






682705
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
221


680864
  if (try_write) {
222
680569
    data_size = StringBytes::Write(env->isolate(),
223
                                   stack_storage,
224
                                   storage_size,
225
                                   string,
226
1361138
                                   enc);
227
680569
    buf = uv_buf_init(stack_storage, data_size);
228
229
680569
    uv_buf_t* bufs = &buf;
230
680569
    size_t count = 1;
231
680569
    const int err = DoTryWrite(&bufs, &count);
232
    // Keep track of the bytes written here, because we're taking a shortcut
233
    // by using `DoTryWrite()` directly instead of using the utilities
234
    // provided by `Write()`.
235


680569
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
236
680569
    bytes_written_ += synchronously_written;
237
238
    // Immediate failure or success
239




680569
    if (err != 0 || count == 0) {
240
678757
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
241
678757
      return err;
242
    }
243
244
    // Partial write
245


1812
    CHECK_EQ(count, 1);
246
  }
247
248
2107
  AllocatedBuffer data;
249
250


2107
  if (try_write) {
251
    // Copy partial data
252
1812
    data = env->AllocateManaged(buf.len);
253
1812
    memcpy(data.data(), buf.base, buf.len);
254
1812
    data_size = buf.len;
255
  } else {
256
    // Write it
257
295
    data = env->AllocateManaged(storage_size);
258
295
    data_size = StringBytes::Write(env->isolate(),
259
                                   data.data(),
260
                                   storage_size,
261
                                   string,
262
590
                                   enc);
263
  }
264
265


2107
  CHECK_LE(data_size, storage_size);
266
267
2107
  buf = uv_buf_init(data.data(), data_size);
268
269
2107
  uv_stream_t* send_handle = nullptr;
270
271






2212
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
272
    HandleWrap* wrap;
273


103
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
274
103
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
275
    // Reference LibuvStreamWrap instance to prevent it from being garbage
276
    // collected before `AfterWrite` is called.
277
    req_wrap_obj->Set(env->context(),
278
                      env->handle_string(),
279
412
                      send_handle_obj).Check();
280
  }
281
282
2107
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
283
2107
  res.bytes += synchronously_written;
284
285
2107
  SetWriteResult(res);
286


2107
  if (res.wrap != nullptr) {
287
1934
    res.wrap->SetAllocatedStorage(std::move(data));
288
  }
289
290
2107
  return res.err;
291
}
292
293
294
352513
void StreamBase::CallJSOnreadMethod(ssize_t nread,
295
                                    Local<ArrayBuffer> ab,
296
                                    size_t offset) {
297
352513
  Environment* env = env_;
298
299
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
300
  DCHECK_LE(offset, INT32_MAX);
301
302
352513
  if (ab.IsEmpty()) {
303
    DCHECK_EQ(offset, 0);
304
    DCHECK_LE(nread, 0);
305
  } else {
306
    DCHECK_GE(nread, 0);
307
  }
308
309
352513
  env->stream_base_state()[kReadBytesOrError] = nread;
310
352513
  env->stream_base_state()[kArrayBufferOffset] = offset;
311
312
  Local<Value> argv[] = {
313
758798
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
314
1762565
  };
315
316
352513
  AsyncWrap* wrap = GetAsyncWrap();
317
352513
  CHECK_NOT_NULL(wrap);
318
1057539
  Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
319
352513
  CHECK(onread->IsFunction());
320
705026
  wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
321
352442
}
322
323
324
2353
bool StreamBase::IsIPCPipe() {
325
2353
  return false;
326
}
327
328
329
int StreamBase::GetFD() {
330
  return -1;
331
}
332
333
334
57732
Local<Object> StreamBase::GetObject() {
335
57732
  return GetAsyncWrap()->object();
336
}
337
338
40508
void StreamBase::AddMethod(Environment* env,
339
                           Local<Signature> signature,
340
                           enum PropertyAttribute attributes,
341
                           Local<FunctionTemplate> t,
342
                           JSMethodFunction* stream_method,
343
                           Local<String> string) {
344
  Local<FunctionTemplate> templ =
345
      env->NewFunctionTemplate(stream_method,
346
                               signature,
347
                               v8::ConstructorBehavior::kThrow,
348
40508
                               v8::SideEffectType::kHasNoSideEffect);
349
121524
  t->PrototypeTemplate()->SetAccessorProperty(
350
81016
      string, templ, Local<FunctionTemplate>(), attributes);
351
40508
}
352
353
10127
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
354
10127
  HandleScope scope(env->isolate());
355
356
  enum PropertyAttribute attributes =
357
10127
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
358
10127
  Local<Signature> sig = Signature::New(env->isolate(), t);
359
360
10127
  AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
361
  AddMethod(
362
10127
      env, sig, attributes, t, GetExternal, env->external_stream_string());
363
10127
  AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
364
  AddMethod(
365
10127
      env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
366
10127
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
367
10127
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
368
10127
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
369
10127
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
370
10127
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
371
  env->SetProtoMethod(
372
10127
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
373
  env->SetProtoMethod(
374
10127
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
375
  env->SetProtoMethod(
376
10127
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
377
  env->SetProtoMethod(
378
10127
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
379
30381
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
380
                                                    "isStreamBase"),
381
40508
                              True(env->isolate()));
382
10127
  t->PrototypeTemplate()->SetAccessor(
383
      FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
384
      BaseObject::InternalFieldGet<kOnReadFunctionField>,
385
30381
      BaseObject::InternalFieldSet<kOnReadFunctionField, &Value::IsFunction>);
386
10127
}
387
388
195
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
389
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
390
390
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
391
195
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
392
393
195
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
394
395
585
  args.GetReturnValue().Set(wrap->GetFD());
396
}
397
398
50648
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
399
101296
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
400
101298
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
401
402
  // uint64_t -> double. 53bits is enough for all real cases.
403
151941
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
404
}
405
406
50648
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
407
101296
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
408
101296
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
409
410
  // uint64_t -> double. 53bits is enough for all real cases.
411
151944
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
412
}
413
414
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
415
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
416
4
  if (wrap == nullptr) return;
417
418
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
419
4
  args.GetReturnValue().Set(ext);
420
}
421
422
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
423
875063
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
424
1750126
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
425




875064
  if (wrap == nullptr) return;
426
427




875065
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
428
429
875062
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
430
2625186
  args.GetReturnValue().Set((wrap->*Method)(args));
431
}
432
433
6477
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
434
  // No TryWrite by default
435
6477
  return 0;
436
}
437
438
439
68048
const char* StreamResource::Error() const {
440
68048
  return nullptr;
441
}
442
443
444
void StreamResource::ClearError() {
445
  // No-op
446
}
447
448
330056
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
449
330056
  CHECK_NOT_NULL(stream_);
450
330056
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
451
330056
  return env->AllocateManaged(suggested_size).release();
452
}
453
454
342348
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
455
342348
  CHECK_NOT_NULL(stream_);
456
342348
  StreamBase* stream = static_cast<StreamBase*>(stream_);
457
342348
  Environment* env = stream->stream_env();
458
342348
  HandleScope handle_scope(env->isolate());
459
666667
  Context::Scope context_scope(env->context());
460
666667
  AllocatedBuffer buf(env, buf_);
461
462
342348
  if (nread <= 0)  {
463
17986
    if (nread < 0)
464
17924
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
465
360235
    return;
466
  }
467
468
324362
  CHECK_LE(static_cast<size_t>(nread), buf.size());
469
324362
  buf.Resize(nread);
470
471
648681
  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
472
}
473
474
475
24168
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
476
    StreamReq* req_wrap, int status) {
477
24168
  StreamBase* stream = static_cast<StreamBase*>(stream_);
478
24168
  Environment* env = stream->stream_env();
479
24168
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
480
24168
  HandleScope handle_scope(env->isolate());
481
24168
  Context::Scope context_scope(env->context());
482
48336
  CHECK(!async_wrap->persistent().IsEmpty());
483
24168
  Local<Object> req_wrap_obj = async_wrap->object();
484
485
  Local<Value> argv[] = {
486
    Integer::New(env->isolate(), status),
487
24168
    stream->GetObject(),
488
    Undefined(env->isolate())
489
120840
  };
490
491
24168
  const char* msg = stream->Error();
492
24168
  if (msg != nullptr) {
493
    argv[2] = OneByteString(env->isolate(), msg);
494
    stream->ClearError();
495
  }
496
497
96672
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
498
48331
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
499
24165
}
500
501
3680
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
502
    WriteWrap* req_wrap, int status) {
503
3680
  OnStreamAfterReqFinished(req_wrap, status);
504
3677
}
505
506
20488
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
507
    ShutdownWrap* req_wrap, int status) {
508
20488
  OnStreamAfterReqFinished(req_wrap, status);
509
20488
}
510
511
512
}  // namespace node