GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.cc Lines: 184 195 94.4 %
Date: 2019-02-23 22:23:05 Branches: 158 242 65.3 %

Line Branch Exec Source
1
#include "stream_base-inl.h"
2
#include "stream_wrap.h"
3
4
#include "node.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "env-inl.h"
8
#include "js_stream.h"
9
#include "string_bytes.h"
10
#include "util-inl.h"
11
#include "v8.h"
12
13
#include <limits.h>  // INT_MAX
14
15
namespace node {
16
17
using v8::Array;
18
using v8::ArrayBuffer;
19
using v8::Context;
20
using v8::FunctionCallbackInfo;
21
using v8::HandleScope;
22
using v8::Integer;
23
using v8::Local;
24
using v8::Object;
25
using v8::String;
26
using v8::Value;
27
28
// Explicit instantiations of StreamBase::WriteString<enc>() for the ASCII,
// UTF8, UCS2 and LATIN1 encodings. The template itself is defined further
// down in this file.
template int StreamBase::WriteString<ASCII>(
29
    const FunctionCallbackInfo<Value>& args);
30
template int StreamBase::WriteString<UTF8>(
31
    const FunctionCallbackInfo<Value>& args);
32
template int StreamBase::WriteString<UCS2>(
33
    const FunctionCallbackInfo<Value>& args);
34
template int StreamBase::WriteString<LATIN1>(
35
    const FunctionCallbackInfo<Value>& args);
36
37
38
61339
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
39
61339
  return ReadStart();
40
}
41
42
43
603
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
44
603
  return ReadStop();
45
}
46
47
48
42702
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
49
85404
  CHECK(args[0]->IsObject());
50
85404
  Local<Object> req_wrap_obj = args[0].As<Object>();
51
52
42702
  return Shutdown(req_wrap_obj);
53
}
54
55
756466
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
56
756466
  env_->stream_base_state()[kBytesWritten] = res.bytes;
57
756466
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
58
756466
}
59
60
22465
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
61
22465
  Environment* env = Environment::GetCurrent(args);
62
63
44930
  CHECK(args[0]->IsObject());
64
44930
  CHECK(args[1]->IsArray());
65
66
44930
  Local<Object> req_wrap_obj = args[0].As<Object>();
67
44930
  Local<Array> chunks = args[1].As<Array>();
68
44930
  bool all_buffers = args[2]->IsTrue();
69
70
  size_t count;
71
22465
  if (all_buffers)
72
280
    count = chunks->Length();
73
  else
74
22185
    count = chunks->Length() >> 1;
75
76
22465
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
77
78
22465
  size_t storage_size = 0;
79
  size_t offset;
80
81
22465
  if (!all_buffers) {
82
    // Determine storage size first
83
310092
    for (size_t i = 0; i < count; i++) {
84
863721
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
85
86
287907
      if (Buffer::HasInstance(chunk))
87
43401
        continue;
88
        // Buffer chunk, no additional storage required
89
90
      // String chunk
91
733518
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
92
      enum encoding encoding = ParseEncoding(env->isolate(),
93
733518
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
94
      size_t chunk_size;
95


921040
      if (encoding == UTF8 && string->Length() > 65535 &&
96

244518
          !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
97
        return 0;
98
733518
      else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
99
733518
                    .To(&chunk_size))
100
        return 0;
101
244506
      storage_size += chunk_size;
102
    }
103
104
22185
    if (storage_size > INT_MAX)
105
      return UV_ENOBUFS;
106
  } else {
107
28564
    for (size_t i = 0; i < count; i++) {
108
84852
      Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
109
28284
      bufs[i].base = Buffer::Data(chunk);
110
28284
      bufs[i].len = Buffer::Length(chunk);
111
    }
112
  }
113
114
44930
  MallocedBuffer<char> storage;
115
22465
  if (storage_size > 0)
116
22139
    storage = MallocedBuffer<char>(storage_size);
117
118
22465
  offset = 0;
119
22465
  if (!all_buffers) {
120
310092
    for (size_t i = 0; i < count; i++) {
121
863721
      Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
122
123
      // Write buffer
124
287907
      if (Buffer::HasInstance(chunk)) {
125
43401
        bufs[i].base = Buffer::Data(chunk);
126
43401
        bufs[i].len = Buffer::Length(chunk);
127
43401
        continue;
128
      }
129
130
      // Write string
131
244506
      CHECK_LE(offset, storage_size);
132
244506
      char* str_storage = storage.data + offset;
133
244506
      size_t str_size = storage_size - offset;
134
135
733518
      Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
136
      enum encoding encoding = ParseEncoding(env->isolate(),
137
733518
          chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
138
      str_size = StringBytes::Write(env->isolate(),
139
                                    str_storage,
140
                                    str_size,
141
                                    string,
142
244506
                                    encoding);
143
244506
      bufs[i].base = str_storage;
144
244506
      bufs[i].len = str_size;
145
244506
      offset += str_size;
146
    }
147
  }
148
149
22465
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
150
22465
  SetWriteResult(res);
151

22465
  if (res.wrap != nullptr && storage_size > 0) {
152
256
    res.wrap->SetAllocatedStorage(storage.release(), storage_size);
153
  }
154
44930
  return res.err;
155
}
156
157
158
67914
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
159
135828
  CHECK(args[0]->IsObject());
160
161
67914
  Environment* env = Environment::GetCurrent(args);
162
163
135828
  if (!args[1]->IsUint8Array()) {
164
1
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
165
1
    return 0;
166
  }
167
168
135826
  Local<Object> req_wrap_obj = args[0].As<Object>();
169
170
  uv_buf_t buf;
171
67913
  buf.base = Buffer::Data(args[1]);
172
67913
  buf.len = Buffer::Length(args[1]);
173
174
67913
  StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
175
67913
  SetWriteResult(res);
176
177
67913
  return res.err;
178
}
179
180
181
template <enum encoding enc>
182
666088
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
183
666088
  Environment* env = Environment::GetCurrent(args);
184


1332176
  CHECK(args[0]->IsObject());
185


1998264
  CHECK(args[1]->IsString());
186
187
1332176
  Local<Object> req_wrap_obj = args[0].As<Object>();
188
1332176
  Local<String> string = args[1].As<String>();
189
  Local<Object> send_handle_obj;
190


1332176
  if (args[2]->IsObject())
191
198
    send_handle_obj = args[2].As<Object>();
192
193
  // Compute the size of the storage that the string will be flattened into.
194
  // For UTF8 strings that are very long, go ahead and take the hit for
195
  // computing their actual size, rather than tripling the storage.
196
  size_t storage_size;
197

2613822
  if (enc == UTF8 && string->Length() > 65535 &&
198

653505
      !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
199
    return 0;
200


1998264
  else if (!StringBytes::StorageSize(env->isolate(), string, enc)
201
1998264
                .To(&storage_size))
202
    return 0;
203
204


666088
  if (storage_size > INT_MAX)
205
    return UV_ENOBUFS;
206
207
  // Try writing immediately if write size isn't too big
208
  char stack_storage[16384];  // 16kb
209
  size_t data_size;
210
666088
  size_t synchronously_written = 0;
211
  uv_buf_t buf;
212
213
  bool try_write = storage_size <= sizeof(stack_storage) &&
214






667899
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
215


666088
  if (try_write) {
216
665805
    data_size = StringBytes::Write(env->isolate(),
217
                                   stack_storage,
218
                                   storage_size,
219
                                   string,
220
1331610
                                   enc);
221
665805
    buf = uv_buf_init(stack_storage, data_size);
222
223
665805
    uv_buf_t* bufs = &buf;
224
665805
    size_t count = 1;
225
665805
    const int err = DoTryWrite(&bufs, &count);
226
    // Keep track of the bytes written here, because we're taking a shortcut
227
    // by using `DoTryWrite()` directly instead of using the utilities
228
    // provided by `Write()`.
229


665805
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
230
665805
    bytes_written_ += synchronously_written;
231
232
    // Immediate failure or success
233




665805
    if (err != 0 || count == 0) {
234
664082
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
235
664082
      return err;
236
    }
237
238
    // Partial write
239


1723
    CHECK_EQ(count, 1);
240
  }
241
242
2006
  MallocedBuffer<char> data;
243
244


2006
  if (try_write) {
245
    // Copy partial data
246
1723
    data = MallocedBuffer<char>(buf.len);
247
1723
    memcpy(data.data, buf.base, buf.len);
248
1723
    data_size = buf.len;
249
  } else {
250
    // Write it
251
283
    data = MallocedBuffer<char>(storage_size);
252
283
    data_size = StringBytes::Write(env->isolate(),
253
                                   data.data,
254
                                   storage_size,
255
                                   string,
256
566
                                   enc);
257
  }
258
259


2006
  CHECK_LE(data_size, storage_size);
260
261
2006
  buf = uv_buf_init(data.data, data_size);
262
263
2006
  uv_stream_t* send_handle = nullptr;
264
265






2109
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
266
    HandleWrap* wrap;
267


99
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
268
99
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
269
    // Reference LibuvStreamWrap instance to prevent it from being garbage
270
    // collected before `AfterWrite` is called.
271
    req_wrap_obj->Set(env->context(),
272
                      env->handle_string(),
273
396
                      send_handle_obj).FromJust();
274
  }
275
276
2006
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
277
2006
  res.bytes += synchronously_written;
278
279
2006
  SetWriteResult(res);
280


2006
  if (res.wrap != nullptr) {
281
1832
    res.wrap->SetAllocatedStorage(data.release(), data_size);
282
  }
283
284
2006
  return res.err;
285
}
286
287
288
389591
void StreamBase::CallJSOnreadMethod(ssize_t nread,
289
                                    Local<ArrayBuffer> ab,
290
                                    size_t offset) {
291
389591
  Environment* env = env_;
292
293
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
294
  DCHECK_LE(offset, INT32_MAX);
295
296
389591
  if (ab.IsEmpty()) {
297
    DCHECK_EQ(offset, 0);
298
    DCHECK_LE(nread, 0);
299
  } else {
300
    DCHECK_GE(nread, 0);
301
  }
302
303
389591
  env->stream_base_state()[kReadBytesOrError] = nread;
304
389591
  env->stream_base_state()[kArrayBufferOffset] = offset;
305
306
  Local<Value> argv[] = {
307
831166
    ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
308
1947955
  };
309
310
389591
  AsyncWrap* wrap = GetAsyncWrap();
311
389591
  CHECK_NOT_NULL(wrap);
312
389591
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
313
389525
}
314
315
316
2413
// Base implementation: always reports that this stream is not an IPC pipe.
bool StreamBase::IsIPCPipe() {
  const bool is_ipc_pipe = false;
  return is_ipc_pipe;
}
319
320
321
// Base implementation: always returns -1.
int StreamBase::GetFD() {
  const int fd = -1;
  return fd;
}
324
325
326
25091
// Returns the object held by this stream's AsyncWrap. The wrap pointer is
// dereferenced without a null check.
Local<Object> StreamBase::GetObject() {
  AsyncWrap* async_wrap = GetAsyncWrap();
  return async_wrap->object();
}
329
330
331
6300
// Default implementation: there is no TryWrite support here, so `bufs` and
// `count` are left untouched and 0 is returned.
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  const int status = 0;
  return status;
}
335
336
337
70331
// Default implementation: always returns nullptr.
const char* StreamResource::Error() const {
  const char* const message = nullptr;
  return message;
}
340
341
342
// Default implementation: intentionally does nothing.
void StreamResource::ClearError() {
}
345
346
347
395322
// Default allocation hook: obtains `suggested_size` bytes from Malloc() and
// returns them wrapped in a uv_buf_t of that length.
uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
  char* base = Malloc(suggested_size);
  return uv_buf_init(base, suggested_size);
}
350
351
352
379350
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
353
379350
  CHECK_NOT_NULL(stream_);
354
379350
  StreamBase* stream = static_cast<StreamBase*>(stream_);
355
379350
  Environment* env = stream->stream_env();
356
379350
  HandleScope handle_scope(env->isolate());
357
741313
  Context::Scope context_scope(env->context());
358
359
379350
  if (nread <= 0)  {
360
17345
    free(buf.base);
361
17345
    if (nread < 0)
362
17328
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
363
396605
    return;
364
  }
365
366
362005
  CHECK_LE(static_cast<size_t>(nread), buf.len);
367
362005
  char* base = Realloc(buf.base, nread);
368
369
  Local<ArrayBuffer> obj = ArrayBuffer::New(
370
      env->isolate(),
371
      base,
372
      nread,
373
362005
      v8::ArrayBufferCreationMode::kInternalized);  // Transfer ownership to V8.
374
723968
  stream->CallJSOnreadMethod(nread, obj);
375
}
376
377
378
25035
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
379
    StreamReq* req_wrap, int status) {
380
25035
  StreamBase* stream = static_cast<StreamBase*>(stream_);
381
25035
  Environment* env = stream->stream_env();
382
25035
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
383
25035
  HandleScope handle_scope(env->isolate());
384
25035
  Context::Scope context_scope(env->context());
385
50070
  CHECK(!async_wrap->persistent().IsEmpty());
386
25035
  Local<Object> req_wrap_obj = async_wrap->object();
387
388
  Local<Value> argv[] = {
389
    Integer::New(env->isolate(), status),
390
25035
    stream->GetObject(),
391
    Undefined(env->isolate())
392
125175
  };
393
394
25035
  const char* msg = stream->Error();
395
25035
  if (msg != nullptr) {
396
    argv[2] = OneByteString(env->isolate(), msg);
397
    stream->ClearError();
398
  }
399
400
100140
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
401
50065
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
402
25032
}
403
404
3481
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
405
    WriteWrap* req_wrap, int status) {
406
3481
  OnStreamAfterReqFinished(req_wrap, status);
407
3478
}
408
409
21554
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
410
    ShutdownWrap* req_wrap, int status) {
411
21554
  OnStreamAfterReqFinished(req_wrap, status);
412
21554
}
413
414
415
}  // namespace node