GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 184 195 94.4 %
Date: 2019-02-13 22:28:58 Branches: 158 242 65.3 %

Line Branch Exec Source
1
#include "stream_base-inl.h"
2
#include "stream_wrap.h"
3
4
#include "node.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "env-inl.h"
8
#include "js_stream.h"
9
#include "string_bytes.h"
10
#include "util-inl.h"
11
#include "v8.h"
12
13
#include <limits.h>  // INT_MAX
14
15
namespace node {
16
17
using v8::Array;
18
using v8::ArrayBuffer;
19
using v8::Context;
20
using v8::FunctionCallbackInfo;
21
using v8::HandleScope;
22
using v8::Integer;
23
using v8::Local;
24
using v8::Object;
25
using v8::String;
26
using v8::Value;
27
28
// Explicit instantiations of StreamBase::WriteString for the ASCII, UTF8,
// UCS2 and LATIN1 encodings. The template body is defined later in this
// translation unit.
template int StreamBase::WriteString<ASCII>(
29
    const FunctionCallbackInfo<Value>& args);
30
template int StreamBase::WriteString<UTF8>(
31
    const FunctionCallbackInfo<Value>& args);
32
template int StreamBase::WriteString<UCS2>(
33
    const FunctionCallbackInfo<Value>& args);
34
template int StreamBase::WriteString<LATIN1>(
35
    const FunctionCallbackInfo<Value>& args);
36
37
38
62034
// Variant of ReadStart() that accepts V8 function-callback arguments.
// `args` is not inspected; the result of ReadStart() is returned unchanged.
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
39
62034
  return ReadStart();
40
}
41
42
43
695
// Variant of ReadStop() that accepts V8 function-callback arguments.
// `args` is not inspected; the result of ReadStop() is returned unchanged.
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
44
695
  return ReadStop();
45
}
46
47
48
43263
// Shutdown overload taking V8 function-callback arguments.
//   args[0]: request object; the process aborts via CHECK if it is not an
//            object.
// Returns whatever the Shutdown overload that accepts the request object
// returns.
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
49
86526
  CHECK(args[0]->IsObject());
50
86526
  Local<Object> req_wrap_obj = args[0].As<Object>();
51
52
43263
  return Shutdown(req_wrap_obj);
53
}
54
55
766526
// Publishes the outcome of a write into the environment's stream_base_state:
// res.bytes is stored in the kBytesWritten slot and res.async in the
// kLastWriteWasAsync slot. res.err and res.wrap are not used here.
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
56
766526
  env_->stream_base_state()[kBytesWritten] = res.bytes;
57
766526
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
58
766526
}
59
60
22486
// Vectored write of several chunks in one Write() call.
//   args[0]: request object (CHECKed to be an object).
//   args[1]: array of chunks (CHECKed to be an array).
//   args[2]: when it is the value `true`, every element of args[1] is treated
//            as a buffer; otherwise args[1] is read as alternating
//            (chunk, encoding) entries, where a chunk is either a buffer or a
//            value converted to a string.
// Returns res.err from Write(); UV_ENOBUFS if the combined string storage
// exceeds INT_MAX; or 0 when a string's size could not be obtained
// (presumably with a JS exception pending -- TODO confirm).
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
61
22486
  Environment* env = Environment::GetCurrent(args);
62
63
44972
  CHECK(args[0]->IsObject());
64
44972
  CHECK(args[1]->IsArray());
65
66
44972
  Local<Object> req_wrap_obj = args[0].As<Object>();
67
44972
  Local<Array> chunks = args[1].As<Array>();
68
44972
  bool all_buffers = args[2]->IsTrue();
69
70
  size_t count;
71
22486
  if (all_buffers)
72
280
    count = chunks->Length();
73
  else
74
22206
    // Two array slots (chunk, encoding) per logical chunk.
    count = chunks->Length() >> 1;
75
76
22486
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
77
78
22486
  size_t storage_size = 0;
79
  size_t offset;
80
81
22486
  if (!all_buffers) {
82
    // Determine storage size first
83
310521
    for (size_t i = 0; i < count; i++) {
84
864945
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
85
86
288315
      if (Buffer::HasInstance(chunk))
87
43413
        continue;
88
        // Buffer chunk, no additional storage required
89
90
      // String chunk
91
734706
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
92
      enum encoding encoding = ParseEncoding(env->isolate(),
93
734706
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
94
      size_t chunk_size;
95


922614
      // NOTE(review): when the UTF8 branch calls Size() and it succeeds, the
      // whole condition is false, so control falls through to the `else if`
      // and StorageSize() overwrites chunk_size. Confirm this is intended.
      if (encoding == UTF8 && string->Length() > 65535 &&
96

244914
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
97
        return 0;
98
734706
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
99
734706
                    .To(&chunk_size))
100
        return 0;
101
244902
      storage_size += chunk_size;
102
    }
103
104
22206
    if (storage_size > INT_MAX)
105
      return UV_ENOBUFS;
106
  } else {
107
28564
    // All-buffers mode: point each uv_buf_t directly at the buffer's data.
    for (size_t i = 0; i < count; i++) {
108
84852
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
109
28284
      bufs[i].base = Buffer::Data(chunk);
110
28284
      bufs[i].len = Buffer::Length(chunk);
111
    }
112
  }
113
114
44972
  MallocedBuffer<char> storage;
115
22486
  if (storage_size > 0)
116
22160
    storage = MallocedBuffer<char>(storage_size);
117
118
22486
  offset = 0;
119
22486
  // Second pass (mixed mode only): fill `bufs`. Strings are encoded one after
  // another into `storage`, with `offset` tracking the next free byte.
  if (!all_buffers) {
120
310521
    for (size_t i = 0; i < count; i++) {
121
864945
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
122
123
      // Write buffer
124
288315
      if (Buffer::HasInstance(chunk)) {
125
43413
        bufs[i].base = Buffer::Data(chunk);
126
43413
        bufs[i].len = Buffer::Length(chunk);
127
43413
        continue;
128
      }
129
130
      // Write string
131
244902
      CHECK_LE(offset, storage_size);
132
244902
      char* str_storage = storage.data + offset;
133
244902
      size_t str_size = storage_size - offset;
134
135
734706
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
136
      enum encoding encoding = ParseEncoding(env->isolate(),
137
734706
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
138
      str_size = StringBytes::Write(env->isolate(),
139
                                    str_storage,
140
                                    str_size,
141
                                    string,
142
244902
                                    encoding);
143
244902
      bufs[i].base = str_storage;
144
244902
      bufs[i].len = str_size;
145
244902
      offset += str_size;
146
    }
147
  }
148
149
22486
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
150
22486
  SetWriteResult(res);
151

22486
  // If Write() produced a request wrap and string storage was allocated,
  // pass the released storage pointer to that wrap.
  if (res.wrap != nullptr && storage_size > 0) {
152
270
    res.wrap->SetAllocatedStorage(storage.release(), storage_size);
153
  }
154
44972
  return res.err;
155
}
156
157
158
67897
// Writes a single buffer.
//   args[0]: request object (CHECKed to be an object).
//   args[1]: must be a Uint8Array; otherwise ERR_INVALID_ARG_TYPE is thrown
//            and 0 is returned without writing.
// On the normal path the write result is recorded via SetWriteResult() and
// res.err from Write() is returned.
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
159
135794
  CHECK(args[0]->IsObject());
160
161
67897
  Environment* env = Environment::GetCurrent(args);
162
163
135794
  if (!args[1]->IsUint8Array()) {
164
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
165
1
    return 0;
166
  }
167
168
135792
  Local<Object> req_wrap_obj = args[0].As<Object>();
169
170
  uv_buf_t buf;
171
67896
  buf.base = Buffer::Data(args[1]);
172
67896
  buf.len = Buffer::Length(args[1]);
173
174
67896
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
175
67896
  SetWriteResult(res);
176
177
67896
  return res.err;
178
}
179
180
181
// Writes a string encoded as `enc`.
//   args[0]: request object (CHECKed to be an object).
//   args[1]: the string to write (CHECKed to be a string).
//   args[2]: optional handle object; it is only used when IsIPCPipe() is true.
// Flow:
//   1. Compute the storage needed for the encoded string; return UV_ENOBUFS
//      if it exceeds INT_MAX, or 0 if the size could not be obtained.
//   2. If the storage fits in the 16384-byte stack buffer (and no handle is
//      being sent over an IPC pipe), encode there and call DoTryWrite().
//      Return right away on error or when nothing is left to write.
//   3. Otherwise place the remaining bytes (or the freshly encoded string) in
//      heap storage and issue a regular Write(), passing the storage to the
//      request wrap if one was created.
// Returns the error code of the last write attempt, or UV_EINVAL if the send
// handle object cannot be unwrapped.
template <enum encoding enc>
182
676144
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
183
676144
  Environment* env = Environment::GetCurrent(args);
184


1352288
  CHECK(args[0]->IsObject());
185


2028432
  CHECK(args[1]->IsString());
186
187
1352288
  Local<Object> req_wrap_obj = args[0].As<Object>();
188
1352288
  Local<String> string = args[1].As<String>();
189
  Local<Object> send_handle_obj;
190


1352288
  if (args[2]->IsObject())
191
208
    send_handle_obj = args[2].As<Object>();
192
193
  // Compute the size of the storage that the string will be flattened into.
194
  // For UTF8 strings that are very long, go ahead and take the hit for
195
  // computing their actual size, rather than tripling the storage.
196
  size_t storage_size;
197

2654010
  // NOTE(review): if the UTF8 Size() call succeeds, this condition is false
  // and the `else if` below still runs, so StorageSize() overwrites
  // storage_size. Confirm whether that matches the comment above.
  if (enc == UTF8 && string->Length() > 65535 &&
198

663552
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
199
    return 0;
200


2028432
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
201
2028432
                .To(&storage_size))
202
    return 0;
203
204


676144
  if (storage_size > INT_MAX)
205
    return UV_ENOBUFS;
206
207
  // Try writing immediately if write size isn't too big
208
  char stack_storage[16384];  // 16kb
209
  size_t data_size;
210
676144
  size_t synchronously_written = 0;
211
  uv_buf_t buf;
212
213
  bool try_write = storage_size <= sizeof(stack_storage) &&
214






677973
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
215


676144
  if (try_write) {
216
675855
    data_size = StringBytes::Write(env->isolate(),
217
                                   stack_storage,
218
                                   storage_size,
219
                                   string,
220
1351710
                                   enc);
221
675855
    buf = uv_buf_init(stack_storage, data_size);
222
223
675855
    uv_buf_t* bufs = &buf;
224
675855
    size_t count = 1;
225
675855
    const int err = DoTryWrite(&bufs, &count);
226
    // Keep track of the bytes written here, because we're taking a shortcut
227
    // by using `DoTryWrite()` directly instead of using the utilities
228
    // provided by `Write()`.
229


675855
    // Presumably DoTryWrite() updates `buf`/`count` to describe what is still
    // unwritten, so buf.len is the remainder -- TODO confirm.
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
230
675855
    bytes_written_ += synchronously_written;
231
232
    // Immediate failure or success
233




675855
    if (err != 0 || count == 0) {
234
674213
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
235
674213
      return err;
236
    }
237
238
    // Partial write
239


1642
    CHECK_EQ(count, 1);
240
  }
241
242
1931
  MallocedBuffer<char> data;
243
244


1931
  if (try_write) {
245
    // Copy partial data
246
1642
    data = MallocedBuffer<char>(buf.len);
247
1642
    memcpy(data.data, buf.base, buf.len);
248
1642
    data_size = buf.len;
249
  } else {
250
    // Write it
251
289
    data = MallocedBuffer<char>(storage_size);
252
289
    data_size = StringBytes::Write(env->isolate(),
253
                                   data.data,
254
                                   storage_size,
255
                                   string,
256
578
                                   enc);
257
  }
258
259


1931
  CHECK_LE(data_size, storage_size);
260
261
1931
  buf = uv_buf_init(data.data, data_size);
262
263
1931
  uv_stream_t* send_handle = nullptr;
264
265






2038
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
266
    HandleWrap* wrap;
267


104
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
268
104
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
269
    // Reference LibuvStreamWrap instance to prevent it from being garbage
270
    // collected before `AfterWrite` is called.
271
    req_wrap_obj->Set(env->context(),
272
                      env->handle_string(),
273
416
                      send_handle_obj).FromJust();
274
  }
275
276
1931
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
277
1931
  // Include the bytes already sent by DoTryWrite() in the reported total.
  res.bytes += synchronously_written;
278
279
1931
  SetWriteResult(res);
280


1931
  if (res.wrap != nullptr) {
281
1757
    res.wrap->SetAllocatedStorage(data.release(), data_size);
282
  }
283
284
1931
  return res.err;
285
}
286
287
288
373662
// Invokes, through this stream's AsyncWrap, the callback named by
// env->onread_string().
//   nread:  stored in the kReadBytesOrError slot of stream_base_state.
//   ab:     passed as the single callback argument; `undefined` is passed
//           instead when `ab` is empty.
//   offset: stored in the kArrayBufferOffset slot of stream_base_state.
// The DCHECKs state the expected invariants: nread fits in int32, offset is at
// most INT32_MAX, an empty `ab` comes with offset == 0 and nread <= 0, and a
// non-empty `ab` comes with nread >= 0.
void StreamBase::CallJSOnreadMethod(ssize_t nread,
289
                                    Local<ArrayBuffer> ab,
290
                                    size_t offset) {
291
373662
  Environment* env = env_;
292
293
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
294
  DCHECK_LE(offset, INT32_MAX);
295
296
373662
  if (ab.IsEmpty()) {
297
    DCHECK_EQ(offset, 0);
298
    DCHECK_LE(nread, 0);
299
  } else {
300
    DCHECK_GE(nread, 0);
301
  }
302
303
373662
  env->stream_base_state()[kReadBytesOrError] = nread;
304
373662
  env->stream_base_state()[kArrayBufferOffset] = offset;
305
306
  Local<Value> argv[] = {
307
799467
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
308
1868310
  };
309
310
373662
  AsyncWrap* wrap = GetAsyncWrap();
311
373662
  CHECK_NOT_NULL(wrap);
312
373662
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
313
373597
}
314
315
316
2353
// This implementation always returns false.
bool StreamBase::IsIPCPipe() {
317
2353
  return false;
318
}
319
320
321
// This implementation always returns -1.
int StreamBase::GetFD() {
322
  return -1;
323
}
324
325
326
25700
// Returns the object() of the AsyncWrap obtained from GetAsyncWrap().
// The AsyncWrap pointer is dereferenced without a null check here.
Local<Object> StreamBase::GetObject() {
327
25700
  return GetAsyncWrap()->object();
328
}
329
330
331
6136
// Default DoTryWrite: writes nothing, leaves `bufs` and `count` untouched and
// returns 0.
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
332
  // No TryWrite by default
333
6136
  return 0;
334
}
335
336
337
71607
// This implementation always returns nullptr.
const char* StreamResource::Error() const {
338
71607
  return nullptr;
339
}
340
341
342
// This implementation does nothing.
void StreamResource::ClearError() {
343
  // No-op
344
}
345
346
347
379358
// Allocates `suggested_size` bytes with Malloc() and returns them as a
// uv_buf_t whose length is `suggested_size`.
uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
348
379358
  return uv_buf_init(Malloc(suggested_size), suggested_size);
349
}
350
351
352
363440
// Forwards the result of a read to the stream's CallJSOnreadMethod().
//   nread <= 0: buf.base is freed. For nread < 0, CallJSOnreadMethod() is
//               called with an empty ArrayBuffer; for nread == 0 nothing more
//               is done.
//   nread > 0:  the buffer is shrunk to nread bytes with Realloc(), wrapped in
//               an ArrayBuffer created in kInternalized mode, and passed to
//               CallJSOnreadMethod().
// stream_ must be non-null (CHECKed) and nread must not exceed buf.len
// (CHECKed).
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
353
363440
  CHECK_NOT_NULL(stream_);
354
363440
  StreamBase* stream = static_cast<StreamBase*>(stream_);
355
363440
  Environment* env = stream->stream_env();
356
363440
  HandleScope handle_scope(env->isolate());
357
709452
  Context::Scope context_scope(env->context());
358
359
363440
  if (nread <= 0)  {
360
17387
    free(buf.base);
361
17387
    if (nread < 0)
362
17381
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
363
380738
    return;
364
  }
365
366
346053
  CHECK_LE(static_cast<size_t>(nread), buf.len);
367
346053
  char* base = Realloc(buf.base, nread);
368
369
  Local<ArrayBuffer> obj = ArrayBuffer::New(
370
      env->isolate(),
371
      base,
372
      nread,
373
346053
      v8::ArrayBufferCreationMode::kInternalized);  // Transfer ownership to V8.
374
692065
  stream->CallJSOnreadMethod(nread, obj);
375
}
376
377
378
25644
// Reports a finished stream request.
// Builds three callback arguments: `status` as an Integer, the stream's
// object, and either `undefined` or the stream's error message. When
// stream->Error() is non-null the message is used and the error is cleared.
// If the request's object has the property keyed by env->oncomplete_string(),
// that callback is invoked through the request's AsyncWrap.
// The request's AsyncWrap must have a non-empty persistent handle (CHECKed).
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
379
    StreamReq* req_wrap, int status) {
380
25644
  StreamBase* stream = static_cast<StreamBase*>(stream_);
381
25644
  Environment* env = stream->stream_env();
382
25644
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
383
25644
  HandleScope handle_scope(env->isolate());
384
25644
  Context::Scope context_scope(env->context());
385
51288
  CHECK(!async_wrap->persistent().IsEmpty());
386
25644
  Local<Object> req_wrap_obj = async_wrap->object();
387
388
  Local<Value> argv[] = {
389
    Integer::New(env->isolate(), status),
390
25644
    stream->GetObject(),
391
    Undefined(env->isolate())
392
128220
  };
393
394
25644
  const char* msg = stream->Error();
395
25644
  if (msg != nullptr) {
396
    argv[2] = OneByteString(env->isolate(), msg);
397
    stream->ClearError();
398
  }
399
400
102576
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
401
51283
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
402
25641
}
403
404
3501
// Forwards a finished write request and its status to
// OnStreamAfterReqFinished().
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
405
    WriteWrap* req_wrap, int status) {
406
3501
  OnStreamAfterReqFinished(req_wrap, status);
407
3498
}
408
409
22143
// Forwards a finished shutdown request and its status to
// OnStreamAfterReqFinished().
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
410
    ShutdownWrap* req_wrap, int status) {
411
22143
  OnStreamAfterReqFinished(req_wrap, status);
412
22143
}
413
414
415
}  // namespace node