GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/stream_pipe.cc Lines: 139 161 86.3 %
Date: 2019-01-07 12:15:22 Branches: 34 64 53.1 %

Line Branch Exec Source
1
#include "stream_pipe.h"
2
#include "stream_base-inl.h"
3
#include "node_buffer.h"
4
#include "node_internals.h"
5
6
using v8::Context;
7
using v8::External;
8
using v8::FunctionCallbackInfo;
9
using v8::FunctionTemplate;
10
using v8::Local;
11
using v8::Object;
12
using v8::Value;
13
14
namespace node {
15
16
13
StreamPipe::StreamPipe(StreamBase* source,
17
                       StreamBase* sink,
18
                       Local<Object> obj)
19
13
    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
20
13
  MakeWeak();
21
22
13
  CHECK_NOT_NULL(sink);
23
13
  CHECK_NOT_NULL(source);
24
25
13
  source->PushStreamListener(&readable_listener_);
26
13
  sink->PushStreamListener(&writable_listener_);
27
28
13
  CHECK(sink->HasWantsWrite());
29
30
  // Set up links between this object and the source/sink objects.
31
  // In particular, this makes sure that they are garbage collected as a group,
32
  // if that applies to the given streams (for example, Http2Streams use
33
  // weak references).
34
65
  obj->Set(env()->context(), env()->source_string(), source->GetObject())
35
26
      .FromJust();
36
52
  source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
37
26
      .FromJust();
38
65
  obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
39
26
      .FromJust();
40
52
  sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
41
26
      .FromJust();
42
13
}
43
44
39
StreamPipe::~StreamPipe() {
45
13
  CHECK(is_closed_);
46
26
}
47
48
644
StreamBase* StreamPipe::source() {
49
644
  return static_cast<StreamBase*>(readable_listener_.stream());
50
}
51
52
227
StreamBase* StreamPipe::sink() {
53
227
  return static_cast<StreamBase*>(writable_listener_.stream());
54
}
55
56
25
void StreamPipe::Unpipe() {
57
25
  if (is_closed_)
58
37
    return;
59
60
  // Note that we possibly cannot use virtual methods on `source` and `sink`
61
  // here, because this function can be called from their destructors via
62
  // `OnStreamDestroy()`.
63
13
  if (!source_destroyed_)
64
13
    source()->ReadStop();
65
66
13
  is_closed_ = true;
67
13
  is_reading_ = false;
68
13
  source()->RemoveStreamListener(&readable_listener_);
69
13
  sink()->RemoveStreamListener(&writable_listener_);
70
71
  // Delay the JS-facing part with SetImmediate, because this might be from
72
  // inside the garbage collector, so we can’t run JS here.
73
13
  HandleScope handle_scope(env()->isolate());
74
39
  env()->SetImmediate([](Environment* env, void* data) {
75
13
    StreamPipe* pipe = static_cast<StreamPipe*>(data);
76
77
13
    HandleScope handle_scope(env->isolate());
78
13
    Context::Scope context_scope(env->context());
79
13
    Local<Object> object = pipe->object();
80
81
52
    if (object->Has(env->context(), env->onunpipe_string()).FromJust()) {
82
26
      pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked();
83
    }
84
85
    // Set all the links established in the constructor to `null`.
86
13
    Local<Value> null = Null(env->isolate());
87
88
    Local<Value> source_v;
89
    Local<Value> sink_v;
90
52
    source_v = object->Get(env->context(), env->source_string())
91
26
        .ToLocalChecked();
92
52
    sink_v = object->Get(env->context(), env->sink_string())
93
26
        .ToLocalChecked();
94
13
    CHECK(source_v->IsObject());
95
13
    CHECK(sink_v->IsObject());
96
97
52
    object->Set(env->context(), env->source_string(), null).FromJust();
98
52
    object->Set(env->context(), env->sink_string(), null).FromJust();
99
    source_v.As<Object>()->Set(env->context(),
100
                               env->pipe_target_string(),
101
65
                               null).FromJust();
102
    sink_v.As<Object>()->Set(env->context(),
103
                             env->pipe_source_string(),
104
65
                             null).FromJust();
105
52
  }, static_cast<void*>(this), object());
106
}
107
108
208
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
109
208
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
110
208
  size_t size = std::min(suggested_size, pipe->wanted_data_);
111
208
  CHECK_GT(size, 0);
112
208
  return uv_buf_init(Malloc(size), size);
113
}
114
115
214
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
116
                                                const uv_buf_t& buf) {
117
214
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
118
214
  AsyncScope async_scope(pipe);
119
214
  if (nread < 0) {
120
    // EOF or error; stop reading and pass the error to the previous listener
121
    // (which might end up in JS).
122
12
    free(buf.base);
123
12
    pipe->is_eof_ = true;
124
12
    stream()->ReadStop();
125
12
    CHECK_NOT_NULL(previous_listener_);
126
12
    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
127
    // If we’re not writing, close now. Otherwise, we’ll do that in
128
    // `OnStreamAfterWrite()`.
129
12
    if (!pipe->is_writing_) {
130
6
      pipe->ShutdownWritable();
131
6
      pipe->Unpipe();
132
    }
133
226
    return;
134
  }
135
136
202
  pipe->ProcessData(nread, buf);
137
}
138
139
202
void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) {
140
202
  uv_buf_t buffer = uv_buf_init(buf.base, nread);
141
202
  StreamWriteResult res = sink()->Write(&buffer, 1);
142
202
  if (!res.async) {
143
    free(buf.base);
144
    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
145
  } else {
146
202
    is_writing_ = true;
147
202
    is_reading_ = false;
148
202
    res.wrap->SetAllocatedStorage(buf.base, buf.len);
149
202
    if (source() != nullptr)
150
202
      source()->ReadStop();
151
  }
152
202
}
153
154
12
void StreamPipe::ShutdownWritable() {
155
12
  sink()->Shutdown();
156
12
}
157
158
201
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
159
                                                      int status) {
160
201
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
161
201
  pipe->is_writing_ = false;
162
201
  if (pipe->is_eof_) {
163
6
    AsyncScope async_scope(pipe);
164
6
    pipe->ShutdownWritable();
165
6
    pipe->Unpipe();
166
6
    return;
167
  }
168
169
195
  if (status != 0) {
170
1
    CHECK_NOT_NULL(previous_listener_);
171
1
    StreamListener* prev = previous_listener_;
172
1
    pipe->Unpipe();
173
1
    prev->OnStreamAfterWrite(w, status);
174
1
    return;
175
  }
176
}
177
178
12
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
179
                                                         int status) {
180
12
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
181
12
  CHECK_NOT_NULL(previous_listener_);
182
12
  StreamListener* prev = previous_listener_;
183
12
  pipe->Unpipe();
184
12
  prev->OnStreamAfterShutdown(w, status);
185
12
}
186
187
void StreamPipe::ReadableListener::OnStreamDestroy() {
188
  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
189
  pipe->source_destroyed_ = true;
190
  if (!pipe->is_eof_) {
191
    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
192
  }
193
}
194
195
void StreamPipe::WritableListener::OnStreamDestroy() {
196
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
197
  pipe->sink_destroyed_ = true;
198
  pipe->is_eof_ = true;
199
  pipe->Unpipe();
200
}
201
202
214
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
203
214
  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
204
214
  pipe->wanted_data_ = suggested_size;
205

214
  if (pipe->is_reading_ || pipe->is_closed_)
206
214
    return;
207
214
  AsyncScope async_scope(pipe);
208
214
  pipe->is_reading_ = true;
209
214
  pipe->source()->ReadStart();
210
}
211
212
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
213
  CHECK_NOT_NULL(previous_listener_);
214
  return previous_listener_->OnStreamAlloc(suggested_size);
215
}
216
217
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
218
                                                const uv_buf_t& buf) {
219
  CHECK_NOT_NULL(previous_listener_);
220
  return previous_listener_->OnStreamRead(nread, buf);
221
}
222
223
13
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
224
13
  CHECK(args.IsConstructCall());
225
26
  CHECK(args[0]->IsExternal());
226
26
  CHECK(args[1]->IsExternal());
227
39
  auto source = static_cast<StreamBase*>(args[0].As<External>()->Value());
228
39
  auto sink = static_cast<StreamBase*>(args[1].As<External>()->Value());
229
230
13
  new StreamPipe(source, sink, args.This());
231
13
}
232
233
13
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
234
  StreamPipe* pipe;
235
26
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
236
13
  pipe->is_closed_ = false;
237
13
  if (pipe->wanted_data_ > 0)
238
    pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
239
}
240
241
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
242
  StreamPipe* pipe;
243
  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
244
  pipe->Unpipe();
245
}
246
247
namespace {
248
249
202
void InitializeStreamPipe(Local<Object> target,
250
                          Local<Value> unused,
251
                          Local<Context> context,
252
                          void* priv) {
253
202
  Environment* env = Environment::GetCurrent(context);
254
255
  // Create FunctionTemplate for FileHandle::CloseReq
256
202
  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
257
  Local<String> stream_pipe_string =
258
202
      FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
259
202
  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
260
202
  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
261
404
  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
262
202
  pipe->SetClassName(stream_pipe_string);
263
404
  pipe->InstanceTemplate()->SetInternalFieldCount(1);
264
  target
265
      ->Set(context, stream_pipe_string,
266
606
            pipe->GetFunction(context).ToLocalChecked())
267
404
      .FromJust();
268
202
}
269
270
}  // anonymous namespace
271
272
}  // namespace node
273
274
3596
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
275
                                   node::InitializeStreamPipe)