GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_pipe.cc Lines: 132 163 81.0 %
Date: 2019-05-05 22:32:45 Branches: 61 118 51.7 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "stream_base-inl.h"
3
#include "node_buffer.h"
4
5
using v8::Context;
6
using v8::Function;
7
using v8::FunctionCallbackInfo;
8
using v8::FunctionTemplate;
9
using v8::Local;
10
using v8::Object;
11
using v8::Value;
12
13
namespace node {
14
15
15
// Constructs a pipe that forwards data read from `source` into `sink`.
// Installs the readable/writable listeners on the two streams and links the
// JS wrapper `obj` with the stream objects in both directions.
// NOTE(review): the AsyncWrap base initializer below dereferences `source`
// (`source->stream_env()`) before CHECK_NOT_NULL(source) runs in the body, so
// a null `source` would crash before ever reaching that check.
StreamPipe::StreamPipe(StreamBase* source,
16
                       StreamBase* sink,
17
                       Local<Object> obj)
18
15
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
19
15
  MakeWeak();
20
21
15
  CHECK_NOT_NULL(sink);
22
15
  CHECK_NOT_NULL(source);
23
24
15
  source->PushStreamListener(&readable_listener_);
25
15
  sink->PushStreamListener(&writable_listener_);
26
27
15
  // Reading is started from OnStreamWantsWrite(), so the sink must support
  // that notification.
  CHECK(sink->HasWantsWrite());
28
29
  // Set up links between this object and the source/sink objects.
30
  // In particular, this makes sure that they are garbage collected as a group,
31
  // if that applies to the given streams (for example, Http2Streams use
32
  // weak references).
33
75
  obj->Set(env()->context(), env()->source_string(), source->GetObject())
34
30
      .Check();
35
60
  source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
36
30
      .Check();
37
75
  obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
38
30
      .Check();
39
60
  sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
40
30
      .Check();
41
15
}
42
43
45
// Destructor: makes sure the pipe is torn down. Unpipe() returns early if
// the pipe has already been closed.
StreamPipe::~StreamPipe() {
44
15
  Unpipe();
45
30
}
46
47
685
// Returns the stream the readable listener is attached to, cast to
// StreamBase*. ProcessData() null-checks this result, so it may be nullptr.
StreamBase* StreamPipe::source() {
48
685
  return static_cast<StreamBase*>(readable_listener_.stream());
49
}
50
51
242
// Returns the stream the writable listener is attached to, cast to
// StreamBase*.
StreamBase* StreamPipe::sink() {
52
242
  return static_cast<StreamBase*>(writable_listener_.stream());
53
}
54
55
29
// Tears the pipe down. Stops reading from the source (unless the source has
// already been destroyed), marks the pipe closed, detaches both listeners and
// schedules an immediate callback that invokes the JS `onunpipe` handler and
// resets the source/sink/pipe link properties to null.
// Calling this on an already closed pipe is a no-op.
void StreamPipe::Unpipe() {
56
29
  if (is_closed_)
57
43
    return;
58
59
  // Note that we possibly cannot use virtual methods on `source` and `sink`
60
  // here, because this function can be called from their destructors via
61
  // `OnStreamDestroy()`.
62
15
  if (!source_destroyed_)
63
15
    source()->ReadStop();
64
65
15
  is_closed_ = true;
66
15
  is_reading_ = false;
67
15
  source()->RemoveStreamListener(&readable_listener_);
68
15
  sink()->RemoveStreamListener(&writable_listener_);
69
70
  // Delay the JS-facing part with SetImmediate, because this might be from
71
  // inside the garbage collector, so we can’t run JS here.
72
15
  // A handle scope is opened because `object()` below yields a Local handle.
  HandleScope handle_scope(env()->isolate());
73
43
  env()->SetImmediate([](Environment* env, void* data) {
74
14
    StreamPipe* pipe = static_cast<StreamPipe*>(data);
75
76
14
    HandleScope handle_scope(env->isolate());
77
28
    Context::Scope context_scope(env->context());
78
14
    Local<Object> object = pipe->object();
79
80
    Local<Value> onunpipe;
81
56
    // Every early return below leaves the remaining links untouched.
    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
82
      return;
83

70
    // Only call `onunpipe` if it is a function; an empty MakeCallback result
    // aborts the cleanup.
    if (onunpipe->IsFunction() &&
84
56
        pipe->MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
85
      return;
86
    }
87
88
    // Set all the links established in the constructor to `null`.
89
14
    Local<Value> null = Null(env->isolate());
90
91
    Local<Value> source_v;
92
    Local<Value> sink_v;
93


112
    // Read the current source/sink objects first; both must still be objects.
    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
94

84
        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
95

70
        !source_v->IsObject() || !sink_v->IsObject()) {
96
      return;
97
    }
98
99


112
    // Clear the four properties in order; `||` stops at the first failed Set.
    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
100

98
        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
101
        source_v.As<Object>()
102

70
            ->Set(env->context(), env->pipe_target_string(), null)
103

140
            .IsNothing() ||
104
        sink_v.As<Object>()
105

70
            ->Set(env->context(), env->pipe_source_string(), null)
106
70
            .IsNothing()) {
107
      return;
108
    }
109
58
  // NOTE(review): the trailing `object()` argument presumably keeps the
  // wrapper object alive until the callback has run -- confirm against
  // Environment::SetImmediate().
  }, static_cast<void*>(this), object());
110
}
111
112
221
// Allocates the buffer for the next read from the source. The size is capped
// at `wanted_data_`, the amount the sink last requested through
// OnStreamWantsWrite(), and must be non-zero.
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
113
221
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
114
221
  size_t size = std::min(suggested_size, pipe->wanted_data_);
115
221
  CHECK_GT(size, 0);
116
221
  // release() hands the raw uv_buf_t to the caller; OnStreamRead() wraps it
  // in an AllocatedBuffer again.
  return pipe->env()->AllocateManaged(size).release();
117
}
118
119
227
// Handles a read result from the source stream.
//   nread < 0  : EOF or error. Marks the pipe as EOF, stops reading, forwards
//                the result to the previous listener and, if no write is in
//                flight, shuts down the sink and unpipes.
//   nread >= 0 : passes the data on to ProcessData() for writing to the sink.
// NOTE(review): ReadableListener::OnStreamDestroy() calls this with UV_EPIPE,
// so `stream()->ReadStop()` below is reached while the source is being
// destroyed, although Unpipe() warns that virtual methods may be unusable
// then -- confirm this is safe.
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
120
                                                const uv_buf_t& buf_) {
121
227
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
122
227
  // NOTE(review): presumably takes ownership of `buf_` so that it is released
  // on every path -- confirm against AllocatedBuffer.
  AllocatedBuffer buf(pipe->env(), buf_);
123
441
  AsyncScope async_scope(pipe);
124
227
  if (nread < 0) {
125
    // EOF or error; stop reading and pass the error to the previous listener
126
    // (which might end up in JS).
127
13
    pipe->is_eof_ = true;
128
13
    stream()->ReadStop();
129
13
    CHECK_NOT_NULL(previous_listener_);
130
13
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
131
    // If we’re not writing, close now. Otherwise, we’ll do that in
132
    // `OnStreamAfterWrite()`.
133
13
    if (!pipe->is_writing_) {
134
7
      pipe->ShutdownWritable();
135
7
      pipe->Unpipe();
136
    }
137
240
    return;
138
  }
139
140
428
  pipe->ProcessData(nread, std::move(buf));
141
}
142
143
214
// Writes the first `nread` bytes of `buf` to the sink.
// If the write finished synchronously, the after-write handler is invoked
// directly. Otherwise the buffer is handed to the write request, the pipe is
// marked as writing, and reading from the source is paused.
void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
144
214
  uv_buf_t buffer = uv_buf_init(buf.data(), nread);
145
214
  StreamWriteResult res = sink()->Write(&buffer, 1);
146
214
  if (!res.async) {
147
    // NOTE(review): the WriteWrap* passed here is nullptr, and on
    // status != 0 OnStreamAfterWrite() forwards it to the previous listener
    // -- verify that listener tolerates a null wrap.
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
148
  } else {
149
214
    is_writing_ = true;
150
214
    is_reading_ = false;
151
214
    // Store the buffer on the write request.
    res.wrap->SetAllocatedStorage(std::move(buf));
152
214
    if (source() != nullptr)
153
214
      source()->ReadStop();
154
  }
155
214
}
156
157
13
// Shuts down the sink stream. The value returned by Shutdown(), if any, is
// ignored.
void StreamPipe::ShutdownWritable() {
158
13
  sink()->Shutdown();
159
13
}
160
161
211
// Called when a write to the sink has completed.
//   - If the source already hit EOF, shuts down the sink and unpipes; `status`
//     is not inspected in that case.
//   - If the write failed (status != 0), unpipes and forwards the failure to
//     the listener that was installed before this one.
//   - Otherwise only clears `is_writing_`.
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
162
                                                      int status) {
163
211
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
164
211
  pipe->is_writing_ = false;
165
211
  if (pipe->is_eof_) {
166
6
    AsyncScope async_scope(pipe);
167
6
    pipe->ShutdownWritable();
168
6
    pipe->Unpipe();
169
6
    return;
170
  }
171
172
205
  if (status != 0) {
173
1
    CHECK_NOT_NULL(previous_listener_);
174
1
    // Save the previous listener first: Unpipe() removes this listener from
    // the sink. NOTE(review): presumably that resets `previous_listener_`.
    StreamListener* prev = previous_listener_;
175
1
    pipe->Unpipe();
176
1
    prev->OnStreamAfterWrite(w, status);
177
1
    return;
178
  }
179
}
180
181
// Called when shutting down the sink has completed: unpipes and then forwards
// the notification to the listener that was installed before this one.
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
182
                                                         int status) {
183
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
184
  CHECK_NOT_NULL(previous_listener_);
185
  // Saved before Unpipe(), which removes this listener from the sink.
  StreamListener* prev = previous_listener_;
186
  pipe->Unpipe();
187
  prev->OnStreamAfterShutdown(w, status);
188
}
189
190
// Called when the source stream is being destroyed. Records that fact and,
// if EOF has not been seen yet, simulates a read error (UV_EPIPE) so that the
// regular end-of-stream handling in OnStreamRead() runs.
void StreamPipe::ReadableListener::OnStreamDestroy() {
191
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
192
  pipe->source_destroyed_ = true;
193
  if (!pipe->is_eof_) {
194
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
195
  }
196
}
197
198
// Called when the sink stream is being destroyed. Marks the sink as destroyed
// and the pipe as EOF, then unpipes.
void StreamPipe::WritableListener::OnStreamDestroy() {
199
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
200
  pipe->sink_destroyed_ = true;
201
  pipe->is_eof_ = true;
202
  pipe->Unpipe();
203
}
204
205
227
// Called when the sink is ready to accept up to `suggested_size` more bytes.
// Records the requested amount and starts reading from the source, unless a
// read is already in progress or the pipe is closed.
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
206
227
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
207
227
  // Recorded even if the pipe is closed; Start() re-issues the request from
  // this value.
  pipe->wanted_data_ = suggested_size;
208

227
  if (pipe->is_reading_ || pipe->is_closed_)
209
227
    return;
210
227
  AsyncScope async_scope(pipe);
211
227
  pipe->is_reading_ = true;
212
227
  pipe->source()->ReadStart();
213
}
214
215
// The writable listener does not handle reads on the sink itself; allocation
// requests are delegated to the previously installed listener.
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
216
  CHECK_NOT_NULL(previous_listener_);
217
  return previous_listener_->OnStreamAlloc(suggested_size);
218
}
219
220
// Read results on the sink are passed unchanged to the previously installed
// listener.
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
221
                                                const uv_buf_t& buf) {
222
  CHECK_NOT_NULL(previous_listener_);
223
  return previous_listener_->OnStreamRead(nread, buf);
224
}
225
226
15
// JS constructor binding: `new StreamPipe(source, sink)`.
// Both arguments must be objects; the StreamBase instances are looked up from
// them and a native StreamPipe is attached to `this`.
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
227
15
  CHECK(args.IsConstructCall());
228
30
  CHECK(args[0]->IsObject());
229
30
  CHECK(args[1]->IsObject());
230
30
  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
231
30
  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
232
233
15
  // The pointer is intentionally not stored here. NOTE(review): presumably
  // the instance is owned by the wrapper object (the constructor calls
  // MakeWeak()) -- confirm.
  new StreamPipe(source, sink, args.This());
234
15
}
235
236
15
// JS method binding `start()`: marks the pipe as open and, if the sink has
// already asked for data (`wanted_data_ > 0`), re-issues that request so that
// reading from the source begins.
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
237
  StreamPipe* pipe;
238
30
  // NOTE(review): presumably returns from this function if the holder cannot
  // be unwrapped -- per the macro name.
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
239
15
  pipe->is_closed_ = false;
240
15
  if (pipe->wanted_data_ > 0)
241
    pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
242
}
243
244
// JS method binding `unpipe()`: unwraps the native StreamPipe from the holder
// and tears the pipe down via the parameterless Unpipe().
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
245
  StreamPipe* pipe;
246
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
247
  pipe->Unpipe();
248
}
249
250
namespace {
251
252
217
// Binding initializer: creates the `StreamPipe` constructor (with `unpipe`
// and `start` prototype methods, inheriting from the AsyncWrap constructor
// template) and exports it on `target`. `unused` and `priv` are not read.
void InitializeStreamPipe(Local<Object> target,
253
                          Local<Value> unused,
254
                          Local<Context> context,
255
                          void* priv) {
256
217
  Environment* env = Environment::GetCurrent(context);
257
258
  // Create FunctionTemplate for StreamPipe
259
217
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
260
  // NOTE(review): `String` is not among the v8 using-declarations at the top
  // of this file; presumably it comes in through an included header.
  Local<String> stream_pipe_string =
261
217
      FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
262
217
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
263
217
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
264
434
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
265
217
  pipe->SetClassName(stream_pipe_string);
266
434
  // NOTE(review): the single internal field presumably holds the native
  // StreamPipe pointer -- confirm.
  pipe->InstanceTemplate()->SetInternalFieldCount(1);
267
  target
268
      ->Set(context, stream_pipe_string,
269
651
            pipe->GetFunction(context).ToLocalChecked())
270
434
      .Check();
271
217
}
272
273
}  // anonymous namespace
274
275
}  // namespace node
276
277
4524
// NOTE(review): presumably registers InitializeStreamPipe as the initializer
// of the internal `stream_pipe` binding -- confirm against the macro.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
278
                                   node::InitializeStreamPipe)