GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: stream_base.cc Lines: 322 345 93.3 %
Date: 2022-05-07 04:15:18 Branches: 147 210 70.0 %

Line Branch Exec Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
#include "allocated_buffer-inl.h"
5
6
#include "env-inl.h"
7
#include "js_stream.h"
8
#include "node.h"
9
#include "node_buffer.h"
10
#include "node_errors.h"
11
#include "node_external_reference.h"
12
#include "string_bytes.h"
13
#include "util-inl.h"
14
#include "v8.h"
15
16
#include <climits>  // INT_MAX
17
18
namespace node {
19
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::BackingStore;
23
using v8::ConstructorBehavior;
24
using v8::Context;
25
using v8::DontDelete;
26
using v8::DontEnum;
27
using v8::External;
28
using v8::Function;
29
using v8::FunctionCallbackInfo;
30
using v8::FunctionTemplate;
31
using v8::HandleScope;
32
using v8::Integer;
33
using v8::Isolate;
34
using v8::Local;
35
using v8::MaybeLocal;
36
using v8::Object;
37
using v8::PropertyAttribute;
38
using v8::ReadOnly;
39
using v8::SideEffectType;
40
using v8::Signature;
41
using v8::String;
42
using v8::Value;
43
44
template int StreamBase::WriteString<ASCII>(
45
    const FunctionCallbackInfo<Value>& args);
46
template int StreamBase::WriteString<UTF8>(
47
    const FunctionCallbackInfo<Value>& args);
48
template int StreamBase::WriteString<UCS2>(
49
    const FunctionCallbackInfo<Value>& args);
50
template int StreamBase::WriteString<LATIN1>(
51
    const FunctionCallbackInfo<Value>& args);
52
53
54
55133
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
55
55133
  return ReadStart();
56
}
57
58
59
13279
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
60
13279
  return ReadStop();
61
}
62
63
12
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
64
12
  CHECK(Buffer::HasInstance(args[0]));
65
66
24
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
67
12
  PushStreamListener(new CustomBufferJSListener(buf));
68
12
  return 0;
69
}
70
71
35519
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
72
35519
  CHECK(args[0]->IsObject());
73
35519
  Local<Object> req_wrap_obj = args[0].As<Object>();
74
75
35519
  return Shutdown(req_wrap_obj);
76
}
77
78
89360
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
79
89360
  env_->stream_base_state()[kBytesWritten] = res.bytes;
80
89360
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
81
89360
}
82
83
13719
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
84
13719
  Environment* env = Environment::GetCurrent(args);
85
13719
  Isolate* isolate = env->isolate();
86
13719
  Local<Context> context = env->context();
87
88
13719
  CHECK(args[0]->IsObject());
89
13719
  CHECK(args[1]->IsArray());
90
91
27438
  Local<Object> req_wrap_obj = args[0].As<Object>();
92
27438
  Local<Array> chunks = args[1].As<Array>();
93
13719
  bool all_buffers = args[2]->IsTrue();
94
95
  size_t count;
96
13719
  if (all_buffers)
97
319
    count = chunks->Length();
98
  else
99
13400
    count = chunks->Length() >> 1;
100
101
27438
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
102
103
13719
  size_t storage_size = 0;
104
  size_t offset;
105
106
13719
  if (!all_buffers) {
107
    // Determine storage size first
108
89263
    for (size_t i = 0; i < count; i++) {
109
      Local<Value> chunk;
110
151726
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
111
        return -1;
112
113
75863
      if (Buffer::HasInstance(chunk))
114
33239
        continue;
115
        // Buffer chunk, no additional storage required
116
117
      // String chunk
118
      Local<String> string;
119
85248
      if (!chunk->ToString(context).ToLocal(&string))
120
        return -1;
121
      Local<Value> next_chunk;
122
85248
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
123
        return -1;
124
42624
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
125
      size_t chunk_size;
126
8263
      if ((encoding == UTF8 &&
127
8263
             string->Length() > 65535 &&
128

85276
             !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
129
85248
              !StringBytes::StorageSize(isolate, string, encoding)
130
42624
                  .To(&chunk_size)) {
131
        return -1;
132
      }
133
42624
      storage_size += chunk_size;
134
    }
135
136
13400
    if (storage_size > INT_MAX)
137
      return UV_ENOBUFS;
138
  } else {
139
27295
    for (size_t i = 0; i < count; i++) {
140
      Local<Value> chunk;
141
53952
      if (!chunks->Get(context, i).ToLocal(&chunk))
142
        return -1;
143
26976
      bufs[i].base = Buffer::Data(chunk);
144
26976
      bufs[i].len = Buffer::Length(chunk);
145
    }
146
  }
147
148
13719
  std::unique_ptr<BackingStore> bs;
149
13719
  if (storage_size > 0) {
150
13354
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
151
13354
    bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
152
  }
153
154
13719
  offset = 0;
155
13719
  if (!all_buffers) {
156
89263
    for (size_t i = 0; i < count; i++) {
157
      Local<Value> chunk;
158
151726
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
159
        return -1;
160
161
      // Write buffer
162
75863
      if (Buffer::HasInstance(chunk)) {
163
33239
        bufs[i].base = Buffer::Data(chunk);
164
33239
        bufs[i].len = Buffer::Length(chunk);
165
33239
        continue;
166
      }
167
168
      // Write string
169
42624
      CHECK_LE(offset, storage_size);
170
      char* str_storage =
171
42624
          static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
172
42624
      size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
173
174
      Local<String> string;
175
85248
      if (!chunk->ToString(context).ToLocal(&string))
176
        return -1;
177
      Local<Value> next_chunk;
178
85248
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
179
        return -1;
180
42624
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
181
42624
      str_size = StringBytes::Write(isolate,
182
                                    str_storage,
183
                                    str_size,
184
                                    string,
185
                                    encoding);
186
42624
      bufs[i].base = str_storage;
187
42624
      bufs[i].len = str_size;
188
42624
      offset += str_size;
189
    }
190
  }
191
192
13719
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
193
13719
  SetWriteResult(res);
194

13719
  if (res.wrap != nullptr && storage_size > 0)
195
359
    res.wrap->SetBackingStore(std::move(bs));
196
13719
  return res.err;
197
}
198
199
200
46382
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
201
46382
  CHECK(args[0]->IsObject());
202
203
46382
  Environment* env = Environment::GetCurrent(args);
204
205
46382
  if (!args[1]->IsUint8Array()) {
206
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
207
1
    return 0;
208
  }
209
210
92762
  Local<Object> req_wrap_obj = args[0].As<Object>();
211
  uv_buf_t buf;
212
46381
  buf.base = Buffer::Data(args[1]);
213
46381
  buf.len = Buffer::Length(args[1]);
214
215
46381
  uv_stream_t* send_handle = nullptr;
216
217

46381
  if (args[2]->IsObject() && IsIPCPipe()) {
218
    Local<Object> send_handle_obj = args[2].As<Object>();
219
220
    HandleWrap* wrap;
221
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
222
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
223
    // Reference LibuvStreamWrap instance to prevent it from being garbage
224
    // collected before `AfterWrite` is called.
225
    if (req_wrap_obj->Set(env->context(),
226
                          env->handle_string(),
227
                          send_handle_obj).IsNothing()) {
228
      return -1;
229
    }
230
  }
231
232
46381
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
233
46381
  SetWriteResult(res);
234
235
46381
  return res.err;
236
}
237
238
239
template <enum encoding enc>
240
58520
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
241
58520
  Environment* env = Environment::GetCurrent(args);
242
58520
  Isolate* isolate = env->isolate();
243
58520
  CHECK(args[0]->IsObject());
244
117040
  CHECK(args[1]->IsString());
245
246
117040
  Local<Object> req_wrap_obj = args[0].As<Object>();
247
117040
  Local<String> string = args[1].As<String>();
248
  Local<Object> send_handle_obj;
249
58520
  if (args[2]->IsObject())
250
424
    send_handle_obj = args[2].As<Object>();
251
252
  // Compute the size of the storage that the string will be flattened into.
253
  // For UTF8 strings that are very long, go ahead and take the hit for
254
  // computing their actual size, rather than tripling the storage.
255
  size_t storage_size;
256
58520
  if ((enc == UTF8 &&
257
51842
         string->Length() > 65535 &&
258

110646
         !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
259

168882
          !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
260
    return -1;
261
  }
262
263
58520
  if (storage_size > INT_MAX)
264
    return UV_ENOBUFS;
265
266
  // Try writing immediately if write size isn't too big
267
  char stack_storage[16384];  // 16kb
268
  size_t data_size;
269
58520
  size_t synchronously_written = 0;
270
  uv_buf_t buf;
271
272
114834
  bool try_write = storage_size <= sizeof(stack_storage) &&
273

67062
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
274
58520
  if (try_write) {
275
56106
    data_size = StringBytes::Write(isolate,
276
                                   stack_storage,
277
                                   storage_size,
278
                                   string,
279
                                   enc);
280
56106
    buf = uv_buf_init(stack_storage, data_size);
281
282
56106
    uv_buf_t* bufs = &buf;
283
56106
    size_t count = 1;
284
56106
    const int err = DoTryWrite(&bufs, &count);
285
    // Keep track of the bytes written here, because we're taking a shortcut
286
    // by using `DoTryWrite()` directly instead of using the utilities
287
    // provided by `Write()`.
288
56106
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
289
56106
    bytes_written_ += synchronously_written;
290
291
    // Immediate failure or success
292

56106
    if (err != 0 || count == 0) {
293
51604
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
294
51604
      return err;
295
    }
296
297
    // Partial write
298
4502
    CHECK_EQ(count, 1);
299
  }
300
301
6916
  std::unique_ptr<BackingStore> bs;
302
303
6916
  if (try_write) {
304
    // Copy partial data
305
4502
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
306
4502
    bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
307
4502
    memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
308
4502
    data_size = buf.len;
309
  } else {
310
    // Write it
311
2414
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
312
2414
    bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
313
2414
    data_size = StringBytes::Write(isolate,
314
2414
                                   static_cast<char*>(bs->Data()),
315
                                   storage_size,
316
                                   string,
317
                                   enc);
318
  }
319
320
6916
  CHECK_LE(data_size, storage_size);
321
322
6916
  buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
323
324
6916
  uv_stream_t* send_handle = nullptr;
325
326

7132
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
327
    HandleWrap* wrap;
328
212
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
329
212
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
330
    // Reference LibuvStreamWrap instance to prevent it from being garbage
331
    // collected before `AfterWrite` is called.
332
424
    if (req_wrap_obj->Set(env->context(),
333
                          env->handle_string(),
334
212
                          send_handle_obj).IsNothing()) {
335
      return -1;
336
    }
337
  }
338
339
6916
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
340
6916
  res.bytes += synchronously_written;
341
342
6916
  SetWriteResult(res);
343
6916
  if (res.wrap != nullptr)
344
6870
    res.wrap->SetBackingStore(std::move(bs));
345
346
6916
  return res.err;
347
}
348
349
350
65715
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
351
                                                 Local<ArrayBuffer> ab,
352
                                                 size_t offset,
353
                                                 StreamBaseJSChecks checks) {
354
65715
  Environment* env = env_;
355
356
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
357
  DCHECK_LE(offset, INT32_MAX);
358
359
65715
  if (checks == DONT_SKIP_NREAD_CHECKS) {
360
63625
    if (ab.IsEmpty()) {
361
      DCHECK_EQ(offset, 0);
362
      DCHECK_LE(nread, 0);
363
    } else {
364
      DCHECK_GE(nread, 0);
365
    }
366
  }
367
368
65715
  env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
369
65715
  env->stream_base_state()[kArrayBufferOffset] = offset;
370
371
  Local<Value> argv[] = {
372
110275
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
373
65715
  };
374
375
65715
  AsyncWrap* wrap = GetAsyncWrap();
376
65715
  CHECK_NOT_NULL(wrap);
377
65715
  Local<Value> onread = wrap->object()->GetInternalField(
378
131430
      StreamBase::kOnReadFunctionField);
379
65715
  CHECK(onread->IsFunction());
380
131430
  return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
381
}
382
383
384
4455
bool StreamBase::IsIPCPipe() {
385
4455
  return false;
386
}
387
388
389
int StreamBase::GetFD() {
390
  return -1;
391
}
392
393
394
56173
Local<Object> StreamBase::GetObject() {
395
56173
  return GetAsyncWrap()->object();
396
}
397
398
35296
void StreamBase::AddMethod(Environment* env,
399
                           Local<Signature> signature,
400
                           enum PropertyAttribute attributes,
401
                           Local<FunctionTemplate> t,
402
                           JSMethodFunction* stream_method,
403
                           Local<String> string) {
404
  Local<FunctionTemplate> templ =
405
      env->NewFunctionTemplate(stream_method,
406
                               signature,
407
                               ConstructorBehavior::kThrow,
408
35296
                               SideEffectType::kHasNoSideEffect);
409
70592
  t->PrototypeTemplate()->SetAccessorProperty(
410
      string, templ, Local<FunctionTemplate>(), attributes);
411
35296
}
412
413
8824
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
414
8824
  HandleScope scope(env->isolate());
415
416
8824
  enum PropertyAttribute attributes =
417
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
418
8824
  Local<Signature> sig = Signature::New(env->isolate(), t);
419
420
8824
  AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
421
8824
  AddMethod(
422
      env, sig, attributes, t, GetExternal, env->external_stream_string());
423
8824
  AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
424
8824
  AddMethod(
425
      env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
426
8824
  env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
427
8824
  env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
428
8824
  env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
429
8824
  env->SetProtoMethod(t,
430
                      "useUserBuffer",
431
                      JSMethod<&StreamBase::UseUserBuffer>);
432
8824
  env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
433
8824
  env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
434
8824
  env->SetProtoMethod(
435
      t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
436
8824
  env->SetProtoMethod(
437
      t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
438
8824
  env->SetProtoMethod(
439
      t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
440
8824
  env->SetProtoMethod(
441
      t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
442
35296
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
443
                                                    "isStreamBase"),
444
                              True(env->isolate()));
445
26472
  t->PrototypeTemplate()->SetAccessor(
446
      FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
447
      BaseObject::InternalFieldGet<
448
          StreamBase::kOnReadFunctionField>,
449
      BaseObject::InternalFieldSet<
450
          StreamBase::kOnReadFunctionField,
451
          &Value::IsFunction>);
452
8824
}
453
454
5175
void StreamBase::RegisterExternalReferences(
455
    ExternalReferenceRegistry* registry) {
456
5175
  registry->Register(GetFD);
457
5175
  registry->Register(GetExternal);
458
5175
  registry->Register(GetBytesRead);
459
5175
  registry->Register(GetBytesWritten);
460
5175
  registry->Register(JSMethod<&StreamBase::ReadStartJS>);
461
5175
  registry->Register(JSMethod<&StreamBase::ReadStopJS>);
462
5175
  registry->Register(JSMethod<&StreamBase::Shutdown>);
463
5175
  registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
464
5175
  registry->Register(JSMethod<&StreamBase::Writev>);
465
5175
  registry->Register(JSMethod<&StreamBase::WriteBuffer>);
466
5175
  registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
467
5175
  registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
468
5175
  registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
469
5175
  registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
470
5175
  registry->Register(
471
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
472
5175
  registry->Register(
473
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
474
                                   &Value::IsFunction>);
475
5175
}
476
477
1308
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
478
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
479
2616
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
480
1308
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
481
482
1308
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
483
484
2616
  args.GetReturnValue().Set(wrap->GetFD());
485
}
486
487
36231
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
488
72462
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
489
36233
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
490
491
  // uint64_t -> double. 53bits is enough for all real cases.
492
72460
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
493
}
494
495
36265
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
496
72530
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
497
36265
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
498
499
  // uint64_t -> double. 53bits is enough for all real cases.
500
72530
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
501
}
502
503
2
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
504
4
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
505
2
  if (wrap == nullptr) return;
506
507
2
  Local<External> ext = External::New(args.GetIsolate(), wrap);
508
4
  args.GetReturnValue().Set(ext);
509
}
510
511
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
512
386612
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
513
773224
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
514
386616
  if (wrap == nullptr) return;
515
516
386620
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
517
518
386608
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
519
773216
  args.GetReturnValue().Set((wrap->*Method)(args));
520
}
521
522
11444
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
523
  // No TryWrite by default
524
11444
  return 0;
525
}
526
527
528
57451
const char* StreamResource::Error() const {
529
57451
  return nullptr;
530
}
531
532
533
void StreamResource::ClearError() {
534
  // No-op
535
}
536
537
538
35222
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
539
35222
  CHECK_NOT_NULL(stream_);
540
35222
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
541
35222
  return env->allocate_managed_buffer(suggested_size);
542
}
543
544
49847
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
545
49847
  CHECK_NOT_NULL(stream_);
546
49847
  StreamBase* stream = static_cast<StreamBase*>(stream_);
547
49847
  Environment* env = stream->stream_env();
548
49847
  Isolate* isolate = env->isolate();
549
49847
  HandleScope handle_scope(isolate);
550
49847
  Context::Scope context_scope(env->context());
551
49847
  std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
552
553
49847
  if (nread <= 0)  {
554
20203
    if (nread < 0)
555
20184
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
556
20147
    return;
557
  }
558
559
29644
  CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
560
29644
  bs = BackingStore::Reallocate(isolate, std::move(bs), nread);
561
562
29644
  stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
563
}
564
565
566
2090
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
567
2090
  return buffer_;
568
}
569
570
571
2096
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
572
2096
  CHECK_NOT_NULL(stream_);
573
574
2096
  StreamBase* stream = static_cast<StreamBase*>(stream_);
575
2096
  Environment* env = stream->stream_env();
576
2096
  HandleScope handle_scope(env->isolate());
577
2096
  Context::Scope context_scope(env->context());
578
579
  // To deal with the case where POLLHUP is received and UV_EOF is returned, as
580
  // libuv returns an empty buffer (on unices only).
581

2096
  if (nread == UV_EOF && buf.base == nullptr) {
582
6
    stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
583
6
    return;
584
  }
585
586
2090
  CHECK_EQ(buf.base, buffer_.base);
587
588
  MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
589
                             Local<ArrayBuffer>(),
590
                             0,
591
2090
                             StreamBase::SKIP_NREAD_CHECKS);
592
  Local<Value> next_buf_v;
593

6270
  if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
594
24
    buffer_.base = Buffer::Data(next_buf_v);
595
24
    buffer_.len = Buffer::Length(next_buf_v);
596
  }
597
}
598
599
600
15948
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
601
    StreamReq* req_wrap, int status) {
602
15948
  StreamBase* stream = static_cast<StreamBase*>(stream_);
603
15948
  Environment* env = stream->stream_env();
604
15948
  if (env->is_stopping()) return;
605
15501
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
606
31001
  HandleScope handle_scope(env->isolate());
607
15501
  Context::Scope context_scope(env->context());
608
15501
  CHECK(!async_wrap->persistent().IsEmpty());
609
15501
  Local<Object> req_wrap_obj = async_wrap->object();
610
611
  Local<Value> argv[] = {
612
    Integer::New(env->isolate(), status),
613
15501
    stream->GetObject(),
614
    Undefined(env->isolate())
615
46503
  };
616
617
15501
  const char* msg = stream->Error();
618
15501
  if (msg != nullptr) {
619
    argv[2] = OneByteString(env->isolate(), msg);
620
    stream->ClearError();
621
  }
622
623
46503
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
624
15500
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
625
}
626
627
5945
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
628
    WriteWrap* req_wrap, int status) {
629
5945
  OnStreamAfterReqFinished(req_wrap, status);
630
5944
}
631
632
10003
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
633
    ShutdownWrap* req_wrap, int status) {
634
10003
  OnStreamAfterReqFinished(req_wrap, status);
635
10003
}
636
637
10003
void ShutdownWrap::OnDone(int status) {
638
10003
  stream()->EmitAfterShutdown(this, status);
639
10003
  Dispose();
640
10003
}
641
642
9085
void WriteWrap::OnDone(int status) {
643
9085
  stream()->EmitAfterWrite(this, status);
644
9084
  Dispose();
645
9084
}
646
647
462204
StreamListener::~StreamListener() {
648
231102
  if (stream_ != nullptr)
649
194868
    stream_->RemoveStreamListener(this);
650
}
651
652
3924
void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
653
3924
  CHECK_NOT_NULL(previous_listener_);
654
3924
  previous_listener_->OnStreamAfterShutdown(w, status);
655
3924
}
656
657
4552
void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
658
4552
  CHECK_NOT_NULL(previous_listener_);
659
4552
  previous_listener_->OnStreamAfterWrite(w, status);
660
4552
}
661
662
139818
StreamResource::~StreamResource() {
663
141068
  while (listener_ != nullptr) {
664
1250
    StreamListener* listener = listener_;
665
1250
    listener->OnStreamDestroy();
666
    // Remove the listener if it didn’t remove itself. This makes the logic
667
    // in `OnStreamDestroy()` implementations easier, because they
668
    // may call generic cleanup functions which can just remove the
669
    // listener unconditionally.
670
1250
    if (listener == listener_)
671
1226
      RemoveStreamListener(listener_);
672
  }
673
}
674
675
13
ShutdownWrap* StreamBase::CreateShutdownWrap(
676
    Local<Object> object) {
677
13
  auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
678
13
  wrap->MakeWeak();
679
13
  return wrap;
680
}
681
682
9190
WriteWrap* StreamBase::CreateWriteWrap(
683
    Local<Object> object) {
684
9190
  auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
685
9190
  wrap->MakeWeak();
686
9190
  return wrap;
687
}
688
689
}  // namespace node