GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: stream_pipe.cc Lines: 143 193 74.1 %
Date: 2022-08-14 04:19:53 Branches: 52 110 47.3 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "stream_base-inl.h"
3
#include "node_buffer.h"
4
#include "util-inl.h"
5
6
namespace node {
7
8
using v8::BackingStore;
9
using v8::Context;
10
using v8::Function;
11
using v8::FunctionCallbackInfo;
12
using v8::FunctionTemplate;
13
using v8::HandleScope;
14
using v8::Isolate;
15
using v8::Just;
16
using v8::Local;
17
using v8::Maybe;
18
using v8::Nothing;
19
using v8::Object;
20
using v8::Value;
21
22
15
StreamPipe::StreamPipe(StreamBase* source,
23
                       StreamBase* sink,
24
15
                       Local<Object> obj)
25
15
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
26
15
  MakeWeak();
27
28
15
  CHECK_NOT_NULL(sink);
29
15
  CHECK_NOT_NULL(source);
30
31
15
  source->PushStreamListener(&readable_listener_);
32
15
  sink->PushStreamListener(&writable_listener_);
33
34
15
  uses_wants_write_ = sink->HasWantsWrite();
35
15
}
36
37
60
StreamPipe::~StreamPipe() {
38
30
  Unpipe(true);
39
60
}
40
41
640
StreamBase* StreamPipe::source() {
42
640
  return static_cast<StreamBase*>(readable_listener_.stream());
43
}
44
45
231
StreamBase* StreamPipe::sink() {
46
231
  return static_cast<StreamBase*>(writable_listener_.stream());
47
}
48
49
29
void StreamPipe::Unpipe(bool is_in_deletion) {
50
29
  if (is_closed_)
51
15
    return;
52
53
  // Note that we possibly cannot use virtual methods on `source` and `sink`
54
  // here, because this function can be called from their destructors via
55
  // `OnStreamDestroy()`.
56
15
  if (!source_destroyed_)
57
15
    source()->ReadStop();
58
59
15
  is_closed_ = true;
60
15
  is_reading_ = false;
61
15
  source()->RemoveStreamListener(&readable_listener_);
62
15
  if (pending_writes_ == 0)
63
14
    sink()->RemoveStreamListener(&writable_listener_);
64
65
15
  if (is_in_deletion) return;
66
67
  // Delay the JS-facing part with SetImmediate, because this might be from
68
  // inside the garbage collector, so we can’t run JS here.
69
28
  HandleScope handle_scope(env()->isolate());
70
14
  BaseObjectPtr<StreamPipe> strong_ref{this};
71
14
  env()->SetImmediate([this, strong_ref](Environment* env) {
72
14
    HandleScope handle_scope(env->isolate());
73
14
    Context::Scope context_scope(env->context());
74
14
    Local<Object> object = this->object();
75
76
    Local<Value> onunpipe;
77
42
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
78
      return;
79
28
    if (onunpipe->IsFunction() &&
80

56
        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
81
      return;
82
    }
83
84
    // Set all the links established in the constructor to `null`.
85
28
    Local<Value> null = Null(env->isolate());
86
87
    Local<Value> source_v;
88
    Local<Value> sink_v;
89
28
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
90
42
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
91


42
        !source_v->IsObject() || !sink_v->IsObject()) {
92
      return;
93
    }
94
95
28
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
96
42
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
97
14
        source_v.As<Object>()
98
28
            ->Set(env->context(), env->pipe_target_string(), null)
99

42
            .IsNothing() ||
100
14
        sink_v.As<Object>()
101
42
            ->Set(env->context(), env->pipe_source_string(), null)
102
14
            .IsNothing()) {
103
      return;
104
    }
105
  });
106
}
107
108
207
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
109
207
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
110
207
  size_t size = std::min(suggested_size, pipe->wanted_data_);
111
207
  CHECK_GT(size, 0);
112
207
  return pipe->env()->allocate_managed_buffer(size);
113
}
114
115
212
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
116
                                                const uv_buf_t& buf_) {
117
212
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
118
212
  std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
119
212
  if (nread < 0) {
120
    // EOF or error; stop reading and pass the error to the previous listener
121
    // (which might end up in JS).
122
13
    pipe->is_eof_ = true;
123
    // Cache `sink()` here because the previous listener might do things
124
    // that eventually lead to an `Unpipe()` call.
125
13
    StreamBase* sink = pipe->sink();
126
13
    stream()->ReadStop();
127
13
    CHECK_NOT_NULL(previous_listener_);
128
13
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
129
    // If we’re not writing, close now. Otherwise, we’ll do that in
130
    // `OnStreamAfterWrite()`.
131
13
    if (pipe->pending_writes_ == 0) {
132
8
      sink->Shutdown();
133
8
      pipe->Unpipe();
134
    }
135
13
    return;
136
  }
137
138
199
  pipe->ProcessData(nread, std::move(bs));
139
}
140
141
199
void StreamPipe::ProcessData(size_t nread,
142
                             std::unique_ptr<BackingStore> bs) {
143

199
  CHECK(uses_wants_write_ || pending_writes_ == 0);
144
199
  uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
145
398
  StreamWriteResult res = sink()->Write(&buffer, 1);
146
199
  pending_writes_++;
147
199
  if (!res.async) {
148
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
149
  } else {
150
199
    is_reading_ = false;
151
199
    res.wrap->SetBackingStore(std::move(bs));
152
199
    if (source() != nullptr)
153
199
      source()->ReadStop();
154
  }
155
199
}
156
157
198
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
158
                                                      int status) {
159
198
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
160
198
  pipe->pending_writes_--;
161
198
  if (pipe->is_closed_) {
162
    if (pipe->pending_writes_ == 0) {
163
      Environment* env = pipe->env();
164
      HandleScope handle_scope(env->isolate());
165
      Context::Scope context_scope(env->context());
166
      if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
167
        return;
168
      stream()->RemoveStreamListener(this);
169
    }
170
    return;
171
  }
172
173
198
  if (pipe->is_eof_) {
174
10
    HandleScope handle_scope(pipe->env()->isolate());
175
    InternalCallbackScope callback_scope(pipe,
176
5
        InternalCallbackScope::kSkipTaskQueues);
177
5
    pipe->sink()->Shutdown();
178
5
    pipe->Unpipe();
179
5
    return;
180
  }
181
182
193
  if (status != 0) {
183
1
    CHECK_NOT_NULL(previous_listener_);
184
1
    StreamListener* prev = previous_listener_;
185
1
    pipe->Unpipe();
186
1
    prev->OnStreamAfterWrite(w, status);
187
1
    return;
188
  }
189
190
192
  if (!pipe->uses_wants_write_) {
191
    OnStreamWantsWrite(65536);
192
  }
193
}
194
195
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
196
                                                         int status) {
197
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
198
  CHECK_NOT_NULL(previous_listener_);
199
  StreamListener* prev = previous_listener_;
200
  pipe->Unpipe();
201
  prev->OnStreamAfterShutdown(w, status);
202
}
203
204
void StreamPipe::ReadableListener::OnStreamDestroy() {
205
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
206
  pipe->source_destroyed_ = true;
207
  if (!pipe->is_eof_) {
208
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
209
  }
210
}
211
212
void StreamPipe::WritableListener::OnStreamDestroy() {
213
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
214
  pipe->sink_destroyed_ = true;
215
  pipe->is_eof_ = true;
216
  pipe->pending_writes_ = 0;
217
  pipe->Unpipe();
218
}
219
220
227
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
221
227
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
222
227
  pipe->wanted_data_ = suggested_size;
223

227
  if (pipe->is_reading_ || pipe->is_closed_)
224
15
    return;
225
424
  HandleScope handle_scope(pipe->env()->isolate());
226
  InternalCallbackScope callback_scope(pipe,
227
424
      InternalCallbackScope::kSkipTaskQueues);
228
212
  pipe->is_reading_ = true;
229
212
  pipe->source()->ReadStart();
230
}
231
232
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
233
  CHECK_NOT_NULL(previous_listener_);
234
  return previous_listener_->OnStreamAlloc(suggested_size);
235
}
236
237
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
238
                                                const uv_buf_t& buf) {
239
  CHECK_NOT_NULL(previous_listener_);
240
  return previous_listener_->OnStreamRead(nread, buf);
241
}
242
243
15
Maybe<StreamPipe*> StreamPipe::New(StreamBase* source,
244
                                   StreamBase* sink,
245
                                   Local<Object> obj) {
246
30
  std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj));
247
248
  // Set up links between this object and the source/sink objects.
249
  // In particular, this makes sure that they are garbage collected as a group,
250
  // if that applies to the given streams (for example, Http2Streams use
251
  // weak references).
252
15
  Environment* env = source->stream_env();
253
45
  if (obj->Set(env->context(), env->source_string(), source->GetObject())
254
15
          .IsNothing()) {
255
    return Nothing<StreamPipe*>();
256
  }
257
15
  if (source->GetObject()
258
30
          ->Set(env->context(), env->pipe_target_string(), obj)
259
15
          .IsNothing()) {
260
    return Nothing<StreamPipe*>();
261
  }
262
45
  if (obj->Set(env->context(), env->sink_string(), sink->GetObject())
263
15
          .IsNothing()) {
264
    return Nothing<StreamPipe*>();
265
  }
266
15
  if (sink->GetObject()
267
30
          ->Set(env->context(), env->pipe_source_string(), obj)
268
15
          .IsNothing()) {
269
    return Nothing<StreamPipe*>();
270
  }
271
272
15
  return Just(stream_pipe.release());
273
}
274
275
15
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
276
15
  CHECK(args.IsConstructCall());
277
15
  CHECK(args[0]->IsObject());
278
15
  CHECK(args[1]->IsObject());
279
30
  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
280
30
  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
281
282
30
  if (StreamPipe::New(source, sink, args.This()).IsNothing()) return;
283
}
284
285
15
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
286
  StreamPipe* pipe;
287
15
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
288
15
  pipe->is_closed_ = false;
289
15
  pipe->writable_listener_.OnStreamWantsWrite(65536);
290
}
291
292
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
293
  StreamPipe* pipe;
294
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
295
  pipe->Unpipe();
296
}
297
298
void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
299
  StreamPipe* pipe;
300
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
301
  args.GetReturnValue().Set(pipe->is_closed_);
302
}
303
304
void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
305
  StreamPipe* pipe;
306
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
307
  args.GetReturnValue().Set(pipe->pending_writes_);
308
}
309
310
namespace {
311
312
289
void InitializeStreamPipe(Local<Object> target,
313
                          Local<Value> unused,
314
                          Local<Context> context,
315
                          void* priv) {
316
289
  Environment* env = Environment::GetCurrent(context);
317
289
  Isolate* isolate = env->isolate();
318
319
  // Create FunctionTemplate for FileHandle::CloseReq
320
289
  Local<FunctionTemplate> pipe = NewFunctionTemplate(isolate, StreamPipe::New);
321
289
  SetProtoMethod(isolate, pipe, "unpipe", StreamPipe::Unpipe);
322
289
  SetProtoMethod(isolate, pipe, "start", StreamPipe::Start);
323
289
  SetProtoMethod(isolate, pipe, "isClosed", StreamPipe::IsClosed);
324
289
  SetProtoMethod(isolate, pipe, "pendingWrites", StreamPipe::PendingWrites);
325
289
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
326
578
  pipe->InstanceTemplate()->SetInternalFieldCount(
327
      StreamPipe::kInternalFieldCount);
328
289
  SetConstructorFunction(context, target, "StreamPipe", pipe);
329
289
}
330
331
}  // anonymous namespace
332
333
}  // namespace node
334
335
5321
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
336
                                   node::InitializeStreamPipe)