GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 321 337 95.3 %
Date: 2021-04-26 04:12:24 Branches: 211 336 62.8 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
#include "allocated_buffer-inl.h"
5
6
#include "env-inl.h"
7
#include "js_stream.h"
8
#include "node.h"
9
#include "node_buffer.h"
10
#include "node_errors.h"
11
#include "node_external_reference.h"
12
#include "string_bytes.h"
13
#include "util-inl.h"
14
#include "v8.h"
15
16
#include <climits>  // INT_MAX
17
18
namespace node {
19
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::ConstructorBehavior;
23
using v8::Context;
24
using v8::DontDelete;
25
using v8::DontEnum;
26
using v8::External;
27
using v8::Function;
28
using v8::FunctionCallbackInfo;
29
using v8::FunctionTemplate;
30
using v8::HandleScope;
31
using v8::Integer;
32
using v8::Local;
33
using v8::MaybeLocal;
34
using v8::Object;
35
using v8::PropertyAttribute;
36
using v8::ReadOnly;
37
using v8::SideEffectType;
38
using v8::Signature;
39
using v8::String;
40
using v8::Value;
41
42
template int StreamBase::WriteString<ASCII>(
43
    const FunctionCallbackInfo<Value>& args);
44
template int StreamBase::WriteString<UTF8>(
45
    const FunctionCallbackInfo<Value>& args);
46
template int StreamBase::WriteString<UCS2>(
47
    const FunctionCallbackInfo<Value>& args);
48
template int StreamBase::WriteString<LATIN1>(
49
    const FunctionCallbackInfo<Value>& args);
50
51
52
54497
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
53
54497
  return ReadStart();
54
}
55
56
57
32474
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
58
32474
  return ReadStop();
59
}
60
61
12
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
62
12
  CHECK(Buffer::HasInstance(args[0]));
63
64
24
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
65
12
  PushStreamListener(new CustomBufferJSListener(buf));
66
12
  return 0;
67
}
68
69
30884
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
70
61768
  CHECK(args[0]->IsObject());
71
61768
  Local<Object> req_wrap_obj = args[0].As<Object>();
72
73
30884
  return Shutdown(req_wrap_obj);
74
}
75
76
335995
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
77
335995
  env_->stream_base_state()[kBytesWritten] = res.bytes;
78
335995
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
79
335995
}
80
81
13103
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
82
13103
  Environment* env = Environment::GetCurrent(args);
83
84
26206
  CHECK(args[0]->IsObject());
85
26206
  CHECK(args[1]->IsArray());
86
87
26206
  Local<Object> req_wrap_obj = args[0].As<Object>();
88
26206
  Local<Array> chunks = args[1].As<Array>();
89
26206
  bool all_buffers = args[2]->IsTrue();
90
91
  size_t count;
92
13103
  if (all_buffers)
93
317
    count = chunks->Length();
94
  else
95
12786
    count = chunks->Length() >> 1;
96
97
26206
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
98
99
13103
  size_t storage_size = 0;
100
  size_t offset;
101
102
13103
  if (!all_buffers) {
103
    // Determine storage size first
104
83381
    for (size_t i = 0; i < count; i++) {
105
211785
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
106
107
70595
      if (Buffer::HasInstance(chunk))
108
30086
        continue;
109
        // Buffer chunk, no additional storage required
110
111
      // String chunk
112
121527
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
113
40509
      enum encoding encoding = ParseEncoding(env->isolate(),
114
121527
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
115
      size_t chunk_size;
116


88755
      if (encoding == UTF8 && string->Length() > 65535 &&
117
40535
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
118
        return 0;
119
81018
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
120
40509
                    .To(&chunk_size))
121
        return 0;
122
40509
      storage_size += chunk_size;
123
    }
124
125
12786
    if (storage_size > INT_MAX)
126
      return UV_ENOBUFS;
127
  } else {
128
27288
    for (size_t i = 0; i < count; i++) {
129
80913
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
130
26971
      bufs[i].base = Buffer::Data(chunk);
131
26971
      bufs[i].len = Buffer::Length(chunk);
132
    }
133
  }
134
135
26206
  AllocatedBuffer storage;
136
13103
  if (storage_size > 0)
137
12740
    storage = AllocatedBuffer::AllocateManaged(env, storage_size);
138
139
13103
  offset = 0;
140
13103
  if (!all_buffers) {
141
83381
    for (size_t i = 0; i < count; i++) {
142
211785
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
143
144
      // Write buffer
145
70595
      if (Buffer::HasInstance(chunk)) {
146
30086
        bufs[i].base = Buffer::Data(chunk);
147
30086
        bufs[i].len = Buffer::Length(chunk);
148
30086
        continue;
149
      }
150
151
      // Write string
152
40509
      CHECK_LE(offset, storage_size);
153
40509
      char* str_storage = storage.data() + offset;
154
40509
      size_t str_size = storage.size() - offset;
155
156
121527
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
157
40509
      enum encoding encoding = ParseEncoding(env->isolate(),
158
121527
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
159
81018
      str_size = StringBytes::Write(env->isolate(),
160
                                    str_storage,
161
                                    str_size,
162
                                    string,
163
40509
                                    encoding);
164
40509
      bufs[i].base = str_storage;
165
40509
      bufs[i].len = str_size;
166
40509
      offset += str_size;
167
    }
168
  }
169
170
26206
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
171
13103
  SetWriteResult(res);
172

13103
  if (res.wrap != nullptr && storage_size > 0) {
173
312
    res.wrap->SetAllocatedStorage(std::move(storage));
174
  }
175
13103
  return res.err;
176
}
177
178
179
46014
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
180
92028
  CHECK(args[0]->IsObject());
181
182
46014
  Environment* env = Environment::GetCurrent(args);
183
184
92028
  if (!args[1]->IsUint8Array()) {
185
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
186
1
    return 0;
187
  }
188
189
92026
  Local<Object> req_wrap_obj = args[0].As<Object>();
190
  uv_buf_t buf;
191
46013
  buf.base = Buffer::Data(args[1]);
192
46013
  buf.len = Buffer::Length(args[1]);
193
194
46013
  uv_stream_t* send_handle = nullptr;
195
196

92026
  if (args[2]->IsObject() && IsIPCPipe()) {
197
    Local<Object> send_handle_obj = args[2].As<Object>();
198
199
    HandleWrap* wrap;
200
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
201
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
202
    // Reference LibuvStreamWrap instance to prevent it from being garbage
203
    // collected before `AfterWrite` is called.
204
    req_wrap_obj->Set(env->context(),
205
                      env->handle_string(),
206
                      send_handle_obj).Check();
207
  }
208
209
92026
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
210
46013
  SetWriteResult(res);
211
212
46013
  return res.err;
213
}
214
215
216
template <enum encoding enc>
217
276879
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
218
276879
  Environment* env = Environment::GetCurrent(args);
219


553758
  CHECK(args[0]->IsObject());
220


830637
  CHECK(args[1]->IsString());
221
222
553758
  Local<Object> req_wrap_obj = args[0].As<Object>();
223
553758
  Local<String> string = args[1].As<String>();
224
  Local<Object> send_handle_obj;
225


553758
  if (args[2]->IsObject())
226
212
    send_handle_obj = args[2].As<Object>();
227
228
  // Compute the size of the storage that the string will be flattened into.
229
  // For UTF8 strings that are very long, go ahead and take the hit for
230
  // computing their actual size, rather than tripling the storage.
231
  size_t storage_size;
232



550733
  if (enc == UTF8 && string->Length() > 65535 &&
233
273925
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
234
    return 0;
235


553758
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
236
276879
                .To(&storage_size))
237
    return 0;
238
239


276879
  if (storage_size > INT_MAX)
240
    return UV_ENOBUFS;
241
242
  // Try writing immediately if write size isn't too big
243
  char stack_storage[16384];  // 16kb
244
  size_t data_size;
245
276879
  size_t synchronously_written = 0;
246
  uv_buf_t buf;
247
248




828333
  bool try_write = storage_size <= sizeof(stack_storage) &&
249


558034
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
250


276879
  if (try_write) {
251
275675
    data_size = StringBytes::Write(env->isolate(),
252
                                   stack_storage,
253
                                   storage_size,
254
                                   string,
255
                                   enc);
256
275675
    buf = uv_buf_init(stack_storage, data_size);
257
258
275675
    uv_buf_t* bufs = &buf;
259
275675
    size_t count = 1;
260
275675
    const int err = DoTryWrite(&bufs, &count);
261
    // Keep track of the bytes written here, because we're taking a shortcut
262
    // by using `DoTryWrite()` directly instead of using the utilities
263
    // provided by `Write()`.
264


275675
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
265
275675
    bytes_written_ += synchronously_written;
266
267
    // Immediate failure or success
268




275675
    if (err != 0 || count == 0) {
269
273899
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
270
273899
      return err;
271
    }
272
273
    // Partial write
274


1776
    CHECK_EQ(count, 1);
275
  }
276
277
5960
  AllocatedBuffer data;
278
279


2980
  if (try_write) {
280
    // Copy partial data
281
1776
    data = AllocatedBuffer::AllocateManaged(env, buf.len);
282
1776
    memcpy(data.data(), buf.base, buf.len);
283
1776
    data_size = buf.len;
284
  } else {
285
    // Write it
286
1204
    data = AllocatedBuffer::AllocateManaged(env, storage_size);
287
1204
    data_size = StringBytes::Write(env->isolate(),
288
                                   data.data(),
289
                                   storage_size,
290
                                   string,
291
                                   enc);
292
  }
293
294


2980
  CHECK_LE(data_size, storage_size);
295
296
2980
  buf = uv_buf_init(data.data(), data_size);
297
298
2980
  uv_stream_t* send_handle = nullptr;
299
300






3088
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
301
    HandleWrap* wrap;
302


106
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
303
106
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
304
    // Reference LibuvStreamWrap instance to prevent it from being garbage
305
    // collected before `AfterWrite` is called.
306
424
    req_wrap_obj->Set(env->context(),
307
                      env->handle_string(),
308
                      send_handle_obj).Check();
309
  }
310
311
5960
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
312
2980
  res.bytes += synchronously_written;
313
314
2980
  SetWriteResult(res);
315


2980
  if (res.wrap != nullptr) {
316
2959
    res.wrap->SetAllocatedStorage(std::move(data));
317
  }
318
319
2980
  return res.err;
320
}
321
322
323
312994
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
324
                                                 Local<ArrayBuffer> ab,
325
                                                 size_t offset,
326
                                                 StreamBaseJSChecks checks) {
327
312994
  Environment* env = env_;
328
329
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
330
  DCHECK_LE(offset, INT32_MAX);
331
332
312994
  if (checks == DONT_SKIP_NREAD_CHECKS) {
333
310904
    if (ab.IsEmpty()) {
334
      DCHECK_EQ(offset, 0);
335
      DCHECK_LE(nread, 0);
336
    } else {
337
      DCHECK_GE(nread, 0);
338
    }
339
  }
340
341
312994
  env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
342
312994
  env->stream_base_state()[kArrayBufferOffset] = offset;
343
344
  Local<Value> argv[] = {
345
670324
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
346
1251976
  };
347
348
312994
  AsyncWrap* wrap = GetAsyncWrap();
349
312994
  CHECK_NOT_NULL(wrap);
350
625988
  Local<Value> onread = wrap->object()->GetInternalField(
351
625988
      StreamBase::kOnReadFunctionField);
352
312994
  CHECK(onread->IsFunction());
353
625988
  return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
354
}
355
356
357
3495
bool StreamBase::IsIPCPipe() {
358
3495
  return false;
359
}
360
361
362
int StreamBase::GetFD() {
363
  return -1;
364
}
365
366
367
49643
Local<Object> StreamBase::GetObject() {
368
49643
  return GetAsyncWrap()->object();
369
}
370
371
25588
void StreamBase::AddMethod(Environment* env,
372
                           Local<Signature> signature,
373
                           enum PropertyAttribute attributes,
374
                           Local<FunctionTemplate> t,
375
                           JSMethodFunction* stream_method,
376
                           Local<String> string) {
377
  Local<FunctionTemplate> templ =
378
      env->NewFunctionTemplate(stream_method,
379
                               signature,
380
                               ConstructorBehavior::kThrow,
381
25588
                               SideEffectType::kHasNoSideEffect);
382
76764
  t->PrototypeTemplate()->SetAccessorProperty(
383
25588
      string, templ, Local<FunctionTemplate>(), attributes);
384
25588
}
385
386
6397
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
387
12794
  HandleScope scope(env->isolate());
388
389
  enum PropertyAttribute attributes =
390
6397
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
391
6397
  Local<Signature> sig = Signature::New(env->isolate(), t);
392
393
6397
  AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
394
6397
  AddMethod(
395
6397
      env, sig, attributes, t, GetExternal, env->external_stream_string());
396
6397
  AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
397
6397
  AddMethod(
398
6397
      env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
399
6397
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
400
6397
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
401
6397
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
402
  env->SetProtoMethod(t,
403
                      "useUserBuffer",
404
6397
                      JSMethod<&StreamBase::UseUserBuffer>);
405
6397
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
406
6397
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
407
  env->SetProtoMethod(
408
6397
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
409
  env->SetProtoMethod(
410
6397
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
411
  env->SetProtoMethod(
412
6397
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
413
  env->SetProtoMethod(
414
6397
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
415
31985
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
416
                                                    "isStreamBase"),
417
6397
                              True(env->isolate()));
418
25588
  t->PrototypeTemplate()->SetAccessor(
419
      FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
420
      BaseObject::InternalFieldGet<
421
          StreamBase::kOnReadFunctionField>,
422
      BaseObject::InternalFieldSet<
423
          StreamBase::kOnReadFunctionField,
424
6397
          &Value::IsFunction>);
425
6397
}
426
427
4758
void StreamBase::RegisterExternalReferences(
428
    ExternalReferenceRegistry* registry) {
429
4758
  registry->Register(GetFD);
430
4758
  registry->Register(GetExternal);
431
4758
  registry->Register(GetBytesRead);
432
4758
  registry->Register(GetBytesWritten);
433
4758
  registry->Register(JSMethod<&StreamBase::ReadStartJS>);
434
4758
  registry->Register(JSMethod<&StreamBase::ReadStopJS>);
435
4758
  registry->Register(JSMethod<&StreamBase::Shutdown>);
436
4758
  registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
437
4758
  registry->Register(JSMethod<&StreamBase::Writev>);
438
4758
  registry->Register(JSMethod<&StreamBase::WriteBuffer>);
439
4758
  registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
440
4758
  registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
441
4758
  registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
442
4758
  registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
443
  registry->Register(
444
4758
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
445
  registry->Register(
446
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
447
4758
                                   &Value::IsFunction>);
448
4758
}
449
450
811
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
451
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
452
1622
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
453
811
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
454
455
811
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
456
457
2433
  args.GetReturnValue().Set(wrap->GetFD());
458
}
459
460
35807
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
461
71614
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
462
35809
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
463
464
  // uint64_t -> double. 53bits is enough for all real cases.
465
107418
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
466
}
467
468
35831
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
469
71662
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
470
35831
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
471
472
  // uint64_t -> double. 53bits is enough for all real cases.
473
107493
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
474
}
475
476
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
477
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
478
2
  if (wrap == nullptr) return;
479
480
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
481
4
  args.GetReturnValue().Set(ext);
482
}
483
484
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
485
453865
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
486
907730
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
487





453867
  if (wrap == nullptr) return;
488
489





453869
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
490
491
907726
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
492
1361589
  args.GetReturnValue().Set((wrap->*Method)(args));
493
}
494
495
9294
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
496
  // No TryWrite by default
497
9294
  return 0;
498
}
499
500
501
46535
const char* StreamResource::Error() const {
502
46535
  return nullptr;
503
}
504
505
506
void StreamResource::ClearError() {
507
  // No-op
508
}
509
510
511
282579
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
512
282579
  CHECK_NOT_NULL(stream_);
513
282579
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
514
282579
  return AllocatedBuffer::AllocateManaged(env, suggested_size).release();
515
}
516
517
297090
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
518
297090
  CHECK_NOT_NULL(stream_);
519
297090
  StreamBase* stream = static_cast<StreamBase*>(stream_);
520
297090
  Environment* env = stream->stream_env();
521
573957
  HandleScope handle_scope(env->isolate());
522
573957
  Context::Scope context_scope(env->context());
523
573957
  AllocatedBuffer buf(env, buf_);
524
525
297090
  if (nread <= 0)  {
526
20082
    if (nread < 0)
527
20072
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
528
20031
    return;
529
  }
530
531
277008
  CHECK_LE(static_cast<size_t>(nread), buf.size());
532
277008
  buf.Resize(nread);
533
534
277008
  stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
535
}
536
537
538
2090
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
539
2090
  return buffer_;
540
}
541
542
543
2096
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
544
2096
  CHECK_NOT_NULL(stream_);
545
546
2096
  StreamBase* stream = static_cast<StreamBase*>(stream_);
547
2096
  Environment* env = stream->stream_env();
548
4186
  HandleScope handle_scope(env->isolate());
549
4186
  Context::Scope context_scope(env->context());
550
551
  // To deal with the case where POLLHUP is received and UV_EOF is returned, as
552
  // libuv returns an empty buffer (on unices only).
553

2096
  if (nread == UV_EOF && buf.base == nullptr) {
554
6
    stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
555
6
    return;
556
  }
557
558
2090
  CHECK_EQ(buf.base, buffer_.base);
559
560
  MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
561
                             Local<ArrayBuffer>(),
562
                             0,
563
2090
                             StreamBase::SKIP_NREAD_CHECKS);
564
  Local<Value> next_buf_v;
565

6270
  if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
566
24
    buffer_.base = Buffer::Data(next_buf_v);
567
24
    buffer_.len = Buffer::Length(next_buf_v);
568
  }
569
}
570
571
572
12849
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
573
    StreamReq* req_wrap, int status) {
574
12849
  StreamBase* stream = static_cast<StreamBase*>(stream_);
575
12849
  Environment* env = stream->stream_env();
576
12849
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
577
25697
  HandleScope handle_scope(env->isolate());
578
12849
  Context::Scope context_scope(env->context());
579
25698
  CHECK(!async_wrap->persistent().IsEmpty());
580
12849
  Local<Object> req_wrap_obj = async_wrap->object();
581
582
  Local<Value> argv[] = {
583
    Integer::New(env->isolate(), status),
584
12849
    stream->GetObject(),
585
    Undefined(env->isolate())
586
64245
  };
587
588
12849
  const char* msg = stream->Error();
589
12849
  if (msg != nullptr) {
590
    argv[2] = OneByteString(env->isolate(), msg);
591
    stream->ClearError();
592
  }
593
594
51396
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
595
12848
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
596
12848
}
597
598
5434
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
599
    WriteWrap* req_wrap, int status) {
600
5434
  OnStreamAfterReqFinished(req_wrap, status);
601
5433
}
602
603
7415
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
604
    ShutdownWrap* req_wrap, int status) {
605
7415
  OnStreamAfterReqFinished(req_wrap, status);
606
7415
}
607
608
7415
void ShutdownWrap::OnDone(int status) {
609
7415
  stream()->EmitAfterShutdown(this, status);
610
7415
  Dispose();
611
7415
}
612
613
7759
void WriteWrap::OnDone(int status) {
614
7759
  stream()->EmitAfterWrite(this, status);
615
7758
  Dispose();
616
7758
}
617
618
210216
StreamListener::~StreamListener() {
619
105108
  if (stream_ != nullptr)
620
88672
    stream_->RemoveStreamListener(this);
621
105108
}
622
623
3488
void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
624
3488
  CHECK_NOT_NULL(previous_listener_);
625
3488
  previous_listener_->OnStreamAfterShutdown(w, status);
626
3488
}
627
628
4100
void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
629
4100
  CHECK_NOT_NULL(previous_listener_);
630
4100
  previous_listener_->OnStreamAfterWrite(w, status);
631
4100
}
632
633
129622
StreamResource::~StreamResource() {
634
66029
  while (listener_ != nullptr) {
635
609
    StreamListener* listener = listener_;
636
609
    listener->OnStreamDestroy();
637
    // Remove the listener if it didn’t remove itself. This makes the logic
638
    // in `OnStreamDestroy()` implementations easier, because they
639
    // may call generic cleanup functions which can just remove the
640
    // listener unconditionally.
641
609
    if (listener == listener_)
642
597
      RemoveStreamListener(listener_);
643
  }
644
64811
}
645
646
12
ShutdownWrap* StreamBase::CreateShutdownWrap(
647
    Local<Object> object) {
648
12
  auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
649
12
  wrap->MakeWeak();
650
12
  return wrap;
651
}
652
653
7518
WriteWrap* StreamBase::CreateWriteWrap(
654
    Local<Object> object) {
655
7518
  auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
656
7518
  wrap->MakeWeak();
657
7518
  return wrap;
658
}
659
660

14475
}  // namespace node