GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: stream_pipe.cc Lines: 136 186 73.1 %
Date: 2022-05-23 04:15:47 Branches: 51 108 47.2 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "stream_base-inl.h"
3
#include "node_buffer.h"
4
#include "util-inl.h"
5
6
namespace node {
7
8
using v8::BackingStore;
9
using v8::Context;
10
using v8::Function;
11
using v8::FunctionCallbackInfo;
12
using v8::FunctionTemplate;
13
using v8::HandleScope;
14
using v8::Local;
15
using v8::Object;
16
using v8::Value;
17
18
15
// Constructs a pipe that forwards data from `source` to `sink`, with `obj`
// as the wrapped JS object (passed to the AsyncWrap base together with
// PROVIDER_STREAMPIPE).
//
// Steps performed:
//   1. MakeWeak() is called on this object.
//   2. `readable_listener_` is pushed onto `source` and `writable_listener_`
//      onto `sink`.
//   3. `uses_wants_write_` caches `sink->HasWantsWrite()`.
//   4. Four JS properties are set to cross-link the objects:
//        obj[source]         = source object
//        source[pipeTarget]  = obj
//        obj[sink]           = sink object
//        sink[pipeSource]    = obj
//      If any `Set()` reports failure, the constructor returns immediately
//      and the remaining links are not established.
//
// NOTE(review): `source->stream_env()` in the initializer list dereferences
// `source` before `CHECK_NOT_NULL(source)` in the body runs, so that check
// cannot catch a null `source` — confirm callers never pass null.
StreamPipe::StreamPipe(StreamBase* source,
19
                       StreamBase* sink,
20
15
                       Local<Object> obj)
21
15
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
22
15
  MakeWeak();
23
24
15
  CHECK_NOT_NULL(sink);
25
15
  CHECK_NOT_NULL(source);
26
27
15
  source->PushStreamListener(&readable_listener_);
28
15
  sink->PushStreamListener(&writable_listener_);
29
30
15
  uses_wants_write_ = sink->HasWantsWrite();
31
32
  // Set up links between this object and the source/sink objects.
33
  // In particular, this makes sure that they are garbage collected as a group,
34
  // if that applies to the given streams (for example, Http2Streams use
35
  // weak references).
36
30
  if (obj->Set(env()->context(),
37
               env()->source_string(),
38
60
               source->GetObject()).IsNothing()) {
39
    return;
40
  }
41
15
  if (source->GetObject()->Set(env()->context(),
42
                               env()->pipe_target_string(),
43
45
                               obj).IsNothing()) {
44
      return;
45
  }
46
30
  if (obj->Set(env()->context(),
47
               env()->sink_string(),
48
60
               sink->GetObject()).IsNothing()) {
49
    return;
50
  }
51
15
  if (sink->GetObject()->Set(env()->context(),
52
                             env()->pipe_source_string(),
53
45
                             obj).IsNothing()) {
54
    return;
55
  }
56
}
57
58
60
// Destructor: tears the pipe down via Unpipe(true). Passing `true` makes
// Unpipe() return before it schedules the JS-facing `onunpipe` work.
StreamPipe::~StreamPipe() {
59
30
  Unpipe(true);
60
60
}
61
62
640
// Returns the stream that `readable_listener_` is currently attached to,
// cast to StreamBase*. This is the stream data is read from.
StreamBase* StreamPipe::source() {
63
640
  return static_cast<StreamBase*>(readable_listener_.stream());
64
}
65
66
231
// Returns the stream that `writable_listener_` is currently attached to,
// cast to StreamBase*. This is the stream data is written to.
StreamBase* StreamPipe::sink() {
67
231
  return static_cast<StreamBase*>(writable_listener_.stream());
68
}
69
70
29
// Tears the pipe down. Does nothing if the pipe is already closed.
//
// Synchronous part:
//   - calls ReadStop() on the source unless `source_destroyed_` is set;
//   - sets `is_closed_` and clears `is_reading_`;
//   - removes `readable_listener_` from the source;
//   - removes `writable_listener_` from the sink only when no writes are
//     pending (with writes still pending the listener stays attached).
//
// If `is_in_deletion` is true (the destructor passes true) the function
// stops there. Otherwise it schedules an immediate callback that:
//   - invokes the JS `onunpipe` property if it is a function, and
//   - sets the four link properties created by the constructor to `null`.
// Each step in the callback returns early on failure (empty/Nothing result
// from V8), and the link reset is skipped if the stored source/sink values
// are not objects.
//
// The closure captures `strong_ref`, a BaseObjectPtr to this object —
// presumably to keep the pipe alive until the immediate has run; confirm
// against BaseObjectPtr semantics.
void StreamPipe::Unpipe(bool is_in_deletion) {
71
29
  if (is_closed_)
72
15
    return;
73
74
  // Note that we possibly cannot use virtual methods on `source` and `sink`
75
  // here, because this function can be called from their destructors via
76
  // `OnStreamDestroy()`.
77
15
  if (!source_destroyed_)
78
15
    source()->ReadStop();
79
80
15
  is_closed_ = true;
81
15
  is_reading_ = false;
82
15
  source()->RemoveStreamListener(&readable_listener_);
83
15
  if (pending_writes_ == 0)
84
14
    sink()->RemoveStreamListener(&writable_listener_);
85
86
15
  if (is_in_deletion) return;
87
88
  // Delay the JS-facing part with SetImmediate, because this might be from
89
  // inside the garbage collector, so we can’t run JS here.
90
28
  HandleScope handle_scope(env()->isolate());
91
14
  BaseObjectPtr<StreamPipe> strong_ref{this};
92
14
  env()->SetImmediate([this, strong_ref](Environment* env) {
93
14
    HandleScope handle_scope(env->isolate());
94
14
    Context::Scope context_scope(env->context());
95
14
    Local<Object> object = this->object();
96
97
    Local<Value> onunpipe;
98
42
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
99
      return;
100
28
    if (onunpipe->IsFunction() &&
101

56
        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
102
      return;
103
    }
104
105
    // Set all the links established in the constructor to `null`.
106
28
    Local<Value> null = Null(env->isolate());
107
108
    Local<Value> source_v;
109
    Local<Value> sink_v;
110
28
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
111
42
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
112


42
        !source_v->IsObject() || !sink_v->IsObject()) {
113
      return;
114
    }
115
116
28
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
117
42
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
118
14
        source_v.As<Object>()
119
28
            ->Set(env->context(), env->pipe_target_string(), null)
120

42
            .IsNothing() ||
121
14
        sink_v.As<Object>()
122
42
            ->Set(env->context(), env->pipe_source_string(), null)
123
14
            .IsNothing()) {
124
      return;
125
    }
126
  });
127
}
128
129
207
// Allocates the buffer for the next read from the source.
//
// The owning StreamPipe is recovered from this listener via ContainerOf().
// The buffer size is the smaller of `suggested_size` and the pipe's
// `wanted_data_` (the value recorded by OnStreamWantsWrite()); the size is
// CHECKed to be greater than zero. The buffer is obtained from the
// environment's allocate_managed_buffer().
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
130
207
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
131
207
  size_t size = std::min(suggested_size, pipe->wanted_data_);
132
207
  CHECK_GT(size, 0);
133
207
  return pipe->env()->allocate_managed_buffer(size);
134
}
135
136
212
// Handles the result of a read on the source stream.
//
// `nread` is the byte count, or a negative value for EOF/error; `buf_` is
// the buffer handed out by OnStreamAlloc(). The backing store for `buf_` is
// taken back from the environment first, in both the error and data paths.
//
// nread < 0: marks the pipe as EOF, stops reading, forwards the status to
// the previous listener with an empty buffer, and — if no writes are
// pending — shuts the sink down and unpipes. With writes pending, that
// shutdown is left to OnStreamAfterWrite().
//
// nread >= 0: passes the data on to ProcessData().
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
137
                                                const uv_buf_t& buf_) {
138
212
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
139
212
  std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
140
212
  if (nread < 0) {
141
    // EOF or error; stop reading and pass the error to the previous listener
142
    // (which might end up in JS).
143
13
    pipe->is_eof_ = true;
144
    // Cache `sink()` here because the previous listener might do things
145
    // that eventually lead to an `Unpipe()` call.
146
13
    StreamBase* sink = pipe->sink();
147
13
    stream()->ReadStop();
148
13
    CHECK_NOT_NULL(previous_listener_);
149
13
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
150
    // If we’re not writing, close now. Otherwise, we’ll do that in
151
    // `OnStreamAfterWrite()`.
152
13
    if (pipe->pending_writes_ == 0) {
153
8
      sink->Shutdown();
154
8
      pipe->Unpipe();
155
    }
156
13
    return;
157
  }
158
159
199
  pipe->ProcessData(nread, std::move(bs));
160
}
161
162
199
// Writes `nread` bytes from the backing store `bs` to the sink.
//
// Precondition (CHECKed): either the sink uses wants-write signalling or no
// write is currently pending.
//
// `pending_writes_` is incremented after the write is issued. Then:
//   - synchronous completion (`!res.async`): OnStreamAfterWrite() is invoked
//     directly with a null WriteWrap and `res.err`;
//   - asynchronous completion: `is_reading_` is cleared, ownership of `bs`
//     moves to the write wrap, and reading from the source is stopped.
//
// NOTE(review): `bs->Data()` is dereferenced without a null check — confirm
// that release_managed_buffer() in the caller cannot yield an empty pointer
// on this path (e.g. for a zero-length read).
void StreamPipe::ProcessData(size_t nread,
163
                             std::unique_ptr<BackingStore> bs) {
164

199
  CHECK(uses_wants_write_ || pending_writes_ == 0);
165
199
  uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
166
398
  StreamWriteResult res = sink()->Write(&buffer, 1);
167
199
  pending_writes_++;
168
199
  if (!res.async) {
169
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
170
  } else {
171
199
    is_reading_ = false;
172
199
    res.wrap->SetBackingStore(std::move(bs));
173
199
    if (source() != nullptr)
174
199
      source()->ReadStop();
175
  }
176
199
}
177
178
198
// Called when a write to the sink has finished; `status` is non-zero on
// failure. Decrements the pipe's `pending_writes_` and then takes the first
// matching path:
//
//   1. Pipe already closed: once the last pending write has drained, invoke
//      the JS `oncomplete` callback and detach this listener from the sink.
//      NOTE(review): if that callback returns an empty result the function
//      returns before RemoveStreamListener() — confirm this is intended.
//   2. Source hit EOF: shut the sink down and unpipe, inside an
//      InternalCallbackScope created with kSkipTaskQueues.
//   3. Write failed: unpipe, then forward the failure to the listener that
//      was previously installed. `previous_listener_` is copied to a local
//      before Unpipe() is called.
//   4. Success: if the sink does not use wants-write signalling, request the
//      next chunk directly via OnStreamWantsWrite(65536).
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
179
                                                      int status) {
180
198
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
181
198
  pipe->pending_writes_--;
182
198
  if (pipe->is_closed_) {
183
    if (pipe->pending_writes_ == 0) {
184
      Environment* env = pipe->env();
185
      HandleScope handle_scope(env->isolate());
186
      Context::Scope context_scope(env->context());
187
      if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
188
        return;
189
      stream()->RemoveStreamListener(this);
190
    }
191
    return;
192
  }
193
194
198
  if (pipe->is_eof_) {
195
10
    HandleScope handle_scope(pipe->env()->isolate());
196
    InternalCallbackScope callback_scope(pipe,
197
5
        InternalCallbackScope::kSkipTaskQueues);
198
5
    pipe->sink()->Shutdown();
199
5
    pipe->Unpipe();
200
5
    return;
201
  }
202
203
193
  if (status != 0) {
204
1
    CHECK_NOT_NULL(previous_listener_);
205
1
    StreamListener* prev = previous_listener_;
206
1
    pipe->Unpipe();
207
1
    prev->OnStreamAfterWrite(w, status);
208
1
    return;
209
  }
210
211
192
  if (!pipe->uses_wants_write_) {
212
    OnStreamWantsWrite(65536);
213
  }
214
}
215
216
// Called when a shutdown of the sink has finished. Unpipes and then forwards
// the notification to the previously installed listener.
//
// `previous_listener_` is copied to a local before Unpipe() is called —
// presumably because unpiping can detach this listener and invalidate the
// member; confirm against StreamListener.
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
217
                                                         int status) {
218
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
219
  CHECK_NOT_NULL(previous_listener_);
220
  StreamListener* prev = previous_listener_;
221
  pipe->Unpipe();
222
  prev->OnStreamAfterShutdown(w, status);
223
}
224
225
// Called when the source stream is being destroyed. Records that fact in
// `source_destroyed_` and, unless EOF was already seen, reports the
// destruction through OnStreamRead() as a UV_EPIPE error with an empty
// buffer.
void StreamPipe::ReadableListener::OnStreamDestroy() {
226
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
227
  pipe->source_destroyed_ = true;
228
  if (!pipe->is_eof_) {
229
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
230
  }
231
}
232
233
// Called when the sink stream is being destroyed. Marks the sink as
// destroyed, flags EOF, resets `pending_writes_` to zero and unpipes.
// With the counter at zero, Unpipe() also removes `writable_listener_`.
void StreamPipe::WritableListener::OnStreamDestroy() {
234
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
235
  pipe->sink_destroyed_ = true;
236
  pipe->is_eof_ = true;
237
  pipe->pending_writes_ = 0;
238
  pipe->Unpipe();
239
}
240
241
227
// Signals that the sink is ready for up to `suggested_size` more bytes.
//
// The size is always stored in `wanted_data_` (which later caps the read
// buffer in ReadableListener::OnStreamAlloc()). If the pipe is already
// reading or has been closed, nothing else happens. Otherwise `is_reading_`
// is set and reading is started on the source, inside an
// InternalCallbackScope created with kSkipTaskQueues.
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
242
227
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
243
227
  pipe->wanted_data_ = suggested_size;
244

227
  if (pipe->is_reading_ || pipe->is_closed_)
245
15
    return;
246
424
  HandleScope handle_scope(pipe->env()->isolate());
247
  InternalCallbackScope callback_scope(pipe,
248
424
      InternalCallbackScope::kSkipTaskQueues);
249
212
  pipe->is_reading_ = true;
250
212
  pipe->source()->ReadStart();
251
}
252
253
// Allocation requests arriving on the sink-side listener are not handled by
// the pipe; they are delegated unchanged to the previous listener, which
// must exist (CHECKed).
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
254
  CHECK_NOT_NULL(previous_listener_);
255
  return previous_listener_->OnStreamAlloc(suggested_size);
256
}
257
258
// Read results arriving on the sink-side listener are not handled by the
// pipe; they are delegated unchanged to the previous listener, which must
// exist (CHECKed).
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
259
                                                const uv_buf_t& buf) {
260
  CHECK_NOT_NULL(previous_listener_);
261
  return previous_listener_->OnStreamRead(nread, buf);
262
}
263
264
15
// JS constructor binding: `new StreamPipe(source, sink)`.
//
// CHECKs that this is a construct call and that both arguments are objects,
// resolves each argument to its StreamBase, and creates the native
// StreamPipe wrapping `args.This()`.
//
// The pointer returned by `new` is not stored here; the StreamPipe
// constructor calls MakeWeak() — presumably the object's lifetime is then
// governed by the JS wrapper; confirm against BaseObject.
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
265
15
  CHECK(args.IsConstructCall());
266
15
  CHECK(args[0]->IsObject());
267
15
  CHECK(args[1]->IsObject());
268
30
  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
269
30
  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
270
271
15
  new StreamPipe(source, sink, args.This());
272
15
}
273
274
15
// JS method `start()`: unwraps the native pipe from the holder, marks it as
// open (`is_closed_ = false`) and kicks off the first read by signalling
// that the sink wants 65536 bytes.
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
275
  StreamPipe* pipe;
276
15
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
277
15
  pipe->is_closed_ = false;
278
15
  pipe->writable_listener_.OnStreamWantsWrite(65536);
279
}
280
281
// JS method `unpipe()`: unwraps the native pipe from the holder and calls
// the native Unpipe() with no argument, i.e. with whatever default
// `is_in_deletion` has in the class declaration (not visible here;
// presumably false).
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
282
  StreamPipe* pipe;
283
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
284
  pipe->Unpipe();
285
}
286
287
// JS method `isClosed()`: returns the pipe's `is_closed_` flag to the
// caller.
void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
288
  StreamPipe* pipe;
289
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
290
  args.GetReturnValue().Set(pipe->is_closed_);
291
}
292
293
// JS method `pendingWrites()`: returns the pipe's `pending_writes_` counter
// to the caller.
void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
294
  StreamPipe* pipe;
295
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
296
  args.GetReturnValue().Set(pipe->pending_writes_);
297
}
298
299
namespace {
300
301
463
// Binding initializer for the `stream_pipe` internal module.
//
// Builds the `StreamPipe` constructor template (backed by StreamPipe::New),
// attaches the prototype methods `unpipe`, `start`, `isClosed` and
// `pendingWrites`, makes it inherit from the AsyncWrap constructor template,
// reserves StreamPipe::kInternalFieldCount internal fields, and exposes the
// constructor on `target` under the name "StreamPipe".
// `unused` and `priv` are not read.
void InitializeStreamPipe(Local<Object> target,
302
                          Local<Value> unused,
303
                          Local<Context> context,
304
                          void* priv) {
305
463
  Environment* env = Environment::GetCurrent(context);
306
307
  // Create FunctionTemplate for StreamPipe
308
463
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
309
463
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
310
463
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
311
463
  env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
312
463
  env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
313
463
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
314
926
  pipe->InstanceTemplate()->SetInternalFieldCount(
315
      StreamPipe::kInternalFieldCount);
316
463
  env->SetConstructorFunction(target, "StreamPipe", pipe);
317
463
}
318
319
}  // anonymous namespace
320
321
}  // namespace node
322
323
5255
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
324
                                   node::InitializeStreamPipe)