GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/stream_base.cc Lines: 212 231 91.8 %
Date: 2017-11-19 Branches: 143 248 57.7 %

Line Branch Exec Source
1
#include "stream_base-inl.h"
2
#include "stream_wrap.h"
3
4
#include "node.h"
5
#include "node_buffer.h"
6
#include "env-inl.h"
7
#include "js_stream.h"
8
#include "string_bytes.h"
9
#include "util-inl.h"
10
#include "v8.h"
11
12
#include <limits.h>  // INT_MAX
13
14
namespace node {
15
16
using v8::Array;
17
using v8::Context;
18
using v8::FunctionCallbackInfo;
19
using v8::HandleScope;
20
using v8::Integer;
21
using v8::Local;
22
using v8::Number;
23
using v8::Object;
24
using v8::String;
25
using v8::Value;
26
27
template int StreamBase::WriteString<ASCII>(
28
    const FunctionCallbackInfo<Value>& args);
29
template int StreamBase::WriteString<UTF8>(
30
    const FunctionCallbackInfo<Value>& args);
31
template int StreamBase::WriteString<UCS2>(
32
    const FunctionCallbackInfo<Value>& args);
33
template int StreamBase::WriteString<LATIN1>(
34
    const FunctionCallbackInfo<Value>& args);
35
36
37
8214
int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
38
8214
  return ReadStart();
39
}
40
41
42
260
int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
43
260
  return ReadStop();
44
}
45
46
47
2130
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
48
2130
  Environment* env = Environment::GetCurrent(args);
49
50
4260
  CHECK(args[0]->IsObject());
51
4260
  Local<Object> req_wrap_obj = args[0].As<Object>();
52
53
2130
  AsyncWrap* wrap = GetAsyncWrap();
54
2130
  CHECK_NE(wrap, nullptr);
55
2130
  env->set_init_trigger_async_id(wrap->get_async_id());
56
  ShutdownWrap* req_wrap = new ShutdownWrap(env,
57
                                            req_wrap_obj,
58
                                            this,
59
2130
                                            AfterShutdown);
60
61
2130
  int err = DoShutdown(req_wrap);
62
2130
  if (err)
63
    delete req_wrap;
64
2130
  return err;
65
}
66
67
68
2124
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
69
2124
  StreamBase* wrap = req_wrap->wrap();
70
2124
  Environment* env = req_wrap->env();
71
72
  // The wrap and request objects should still be there.
73
4248
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
74
75
2124
  HandleScope handle_scope(env->isolate());
76
2124
  Context::Scope context_scope(env->context());
77
78
2124
  Local<Object> req_wrap_obj = req_wrap->object();
79
  Local<Value> argv[3] = {
80
    Integer::New(env->isolate(), status),
81
2124
    wrap->GetObject(),
82
    req_wrap_obj
83
10620
  };
84
85
8496
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
86
2124
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
87
88
4248
  delete req_wrap;
89
2124
}
90
91
92
11800
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
93
11800
  Environment* env = Environment::GetCurrent(args);
94
95
23600
  CHECK(args[0]->IsObject());
96
23600
  CHECK(args[1]->IsArray());
97
98
23600
  Local<Object> req_wrap_obj = args[0].As<Object>();
99
23600
  Local<Array> chunks = args[1].As<Array>();
100
23600
  bool all_buffers = args[2]->IsTrue();
101
102
  size_t count;
103
11800
  if (all_buffers)
104
433
    count = chunks->Length();
105
  else
106
11367
    count = chunks->Length() >> 1;
107
108
11800
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
109
11800
  uv_buf_t* buf_list = *bufs;
110
111
11800
  size_t storage_size = 0;
112
11800
  uint32_t bytes = 0;
113
  size_t offset;
114
  AsyncWrap* wrap;
115
  WriteWrap* req_wrap;
116
  int err;
117
118
11800
  if (!all_buffers) {
119
    // Determine storage size first
120
62898
    for (size_t i = 0; i < count; i++) {
121
51531
      storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
122
123
103062
      Local<Value> chunk = chunks->Get(i * 2);
124
125
51531
      if (Buffer::HasInstance(chunk))
126
21639
        continue;
127
        // Buffer chunk, no additional storage required
128
129
      // String chunk
130
59784
      Local<String> string = chunk->ToString(env->isolate());
131
      enum encoding encoding = ParseEncoding(env->isolate(),
132
59784
                                             chunks->Get(i * 2 + 1));
133
      size_t chunk_size;
134

32588
      if (encoding == UTF8 && string->Length() > 65535)
135
        chunk_size = StringBytes::Size(env->isolate(), string, encoding);
136
      else
137
29892
        chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
138
139
29892
      storage_size += chunk_size;
140
    }
141
142
11367
    if (storage_size > INT_MAX)
143
      return UV_ENOBUFS;
144
  } else {
145
5866
    for (size_t i = 0; i < count; i++) {
146
10866
      Local<Value> chunk = chunks->Get(i);
147
5433
      bufs[i].base = Buffer::Data(chunk);
148
5433
      bufs[i].len = Buffer::Length(chunk);
149
5433
      bytes += bufs[i].len;
150
    }
151
152
    // Try writing immediately without allocation
153
433
    err = DoTryWrite(&buf_list, &count);
154

433
    if (err != 0 || count == 0)
155
      goto done;
156
  }
157
158
11536
  wrap = GetAsyncWrap();
159
11536
  CHECK_NE(wrap, nullptr);
160
11536
  env->set_init_trigger_async_id(wrap->get_async_id());
161
11536
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
162
163
11536
  offset = 0;
164
11536
  if (!all_buffers) {
165
62898
    for (size_t i = 0; i < count; i++) {
166
103062
      Local<Value> chunk = chunks->Get(i * 2);
167
168
      // Write buffer
169
51531
      if (Buffer::HasInstance(chunk)) {
170
21639
        bufs[i].base = Buffer::Data(chunk);
171
21639
        bufs[i].len = Buffer::Length(chunk);
172
21639
        bytes += bufs[i].len;
173
21639
        continue;
174
      }
175
176
      // Write string
177
29892
      offset = ROUND_UP(offset, WriteWrap::kAlignSize);
178
29892
      CHECK_LE(offset, storage_size);
179
29892
      char* str_storage = req_wrap->Extra(offset);
180
29892
      size_t str_size = storage_size - offset;
181
182
59784
      Local<String> string = chunk->ToString(env->isolate());
183
      enum encoding encoding = ParseEncoding(env->isolate(),
184
59784
                                             chunks->Get(i * 2 + 1));
185
      str_size = StringBytes::Write(env->isolate(),
186
                                    str_storage,
187
                                    str_size,
188
                                    string,
189
29892
                                    encoding);
190
29892
      bufs[i].base = str_storage;
191
29892
      bufs[i].len = str_size;
192
29892
      offset += str_size;
193
29892
      bytes += str_size;
194
    }
195
  }
196
197
11536
  err = DoWrite(req_wrap, buf_list, count, nullptr);
198
34608
  req_wrap_obj->Set(env->async(), True(env->isolate()));
199
200
11536
  if (err)
201
    req_wrap->Dispose();
202
203
 done:
204
11800
  const char* msg = Error();
205
11800
  if (msg != nullptr) {
206
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
207
    ClearError();
208
  }
209
35400
  req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
210
211
11800
  return err;
212
}
213
214
215
216
217
19444
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
218
38888
  CHECK(args[0]->IsObject());
219
220
19444
  Environment* env = Environment::GetCurrent(args);
221
222
38888
  if (!args[1]->IsUint8Array()) {
223
1
    env->ThrowTypeError("Second argument must be a buffer");
224
1
    return 0;
225
  }
226
227
38886
  Local<Object> req_wrap_obj = args[0].As<Object>();
228
19443
  const char* data = Buffer::Data(args[1]);
229
19443
  size_t length = Buffer::Length(args[1]);
230
231
  AsyncWrap* wrap;
232
  WriteWrap* req_wrap;
233
  uv_buf_t buf;
234
19443
  buf.base = const_cast<char*>(data);
235
19443
  buf.len = length;
236
237
  // Try writing immediately without allocation
238
19443
  uv_buf_t* bufs = &buf;
239
19443
  size_t count = 1;
240
19443
  int err = DoTryWrite(&bufs, &count);
241
19443
  if (err != 0)
242
    goto done;
243
19443
  if (count == 0)
244
18225
    goto done;
245
1218
  CHECK_EQ(count, 1);
246
247
1218
  wrap = GetAsyncWrap();
248
1218
  if (wrap != nullptr)
249
1218
    env->set_init_trigger_async_id(wrap->get_async_id());
250
  // Allocate, or write rest
251
1218
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
252
253
1218
  err = DoWrite(req_wrap, bufs, count, nullptr);
254
3654
  req_wrap_obj->Set(env->async(), True(env->isolate()));
255
2436
  req_wrap_obj->Set(env->buffer_string(), args[1]);
256
257
1218
  if (err)
258
    req_wrap->Dispose();
259
260
 done:
261
19443
  const char* msg = Error();
262
19443
  if (msg != nullptr) {
263
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
264
    ClearError();
265
  }
266
  req_wrap_obj->Set(env->bytes_string(),
267
58329
                    Integer::NewFromUnsigned(env->isolate(), length));
268
19443
  return err;
269
}
270
271
272
template <enum encoding enc>
273
34933
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
274
34933
  Environment* env = Environment::GetCurrent(args);
275


69866
  CHECK(args[0]->IsObject());
276


104799
  CHECK(args[1]->IsString());
277
278
69866
  Local<Object> req_wrap_obj = args[0].As<Object>();
279
69866
  Local<String> string = args[1].As<String>();
280
  Local<Object> send_handle_obj;
281
  AsyncWrap* wrap;
282


69866
  if (args[2]->IsObject())
283
190
    send_handle_obj = args[2].As<Object>();
284
285
  int err;
286
287
  // Compute the size of the storage that the string will be flattened into.
288
  // For UTF8 strings that are very long, go ahead and take the hit for
289
  // computing their actual size, rather than tripling the storage.
290
  size_t storage_size;
291
33751
  if (enc == UTF8 && string->Length() > 65535)
292
7
    storage_size = StringBytes::Size(env->isolate(), string, enc);
293
  else
294
34926
    storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
295
296


34933
  if (storage_size > INT_MAX)
297
    return UV_ENOBUFS;
298
299
  // Try writing immediately if write size isn't too big
300
  WriteWrap* req_wrap;
301
  char* data;
302
  char stack_storage[16384];  // 16kb
303
  size_t data_size;
304
  uv_buf_t buf;
305
306
  bool try_write = storage_size <= sizeof(stack_storage) &&
307






36462
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
308


34933
  if (try_write) {
309
34825
    data_size = StringBytes::Write(env->isolate(),
310
                                   stack_storage,
311
                                   storage_size,
312
                                   string,
313
69650
                                   enc);
314
34825
    buf = uv_buf_init(stack_storage, data_size);
315
316
34825
    uv_buf_t* bufs = &buf;
317
34825
    size_t count = 1;
318
34825
    err = DoTryWrite(&bufs, &count);
319
320
    // Failure
321


34825
    if (err != 0)
322
33466
      goto done;
323
324
    // Success
325


34824
    if (count == 0)
326
33464
      goto done;
327
328
    // Partial write
329


1360
    CHECK_EQ(count, 1);
330
  }
331
332
1468
  wrap = GetAsyncWrap();
333


1468
  if (wrap != nullptr)
334
1468
    env->set_init_trigger_async_id(wrap->get_async_id());
335
1468
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
336
337
1468
  data = req_wrap->Extra();
338
339


1468
  if (try_write) {
340
    // Copy partial data
341
1360
    memcpy(data, buf.base, buf.len);
342
1360
    data_size = buf.len;
343
  } else {
344
    // Write it
345
108
    data_size = StringBytes::Write(env->isolate(),
346
                                   data,
347
                                   storage_size,
348
                                   string,
349
216
                                   enc);
350
  }
351
352


1468
  CHECK_LE(data_size, storage_size);
353
354
1468
  buf = uv_buf_init(data, data_size);
355
356


1468
  if (!IsIPCPipe()) {
357
1367
    err = DoWrite(req_wrap, &buf, 1, nullptr);
358
  } else {
359
101
    uv_handle_t* send_handle = nullptr;
360
361


101
    if (!send_handle_obj.IsEmpty()) {
362
      HandleWrap* wrap;
363


95
      ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
364
95
      send_handle = wrap->GetHandle();
365
      // Reference LibuvStreamWrap instance to prevent it from being garbage
366
      // collected before `AfterWrite` is called.
367


190
      CHECK_EQ(false, req_wrap->persistent().IsEmpty());
368
190
      req_wrap_obj->Set(env->handle_string(), send_handle_obj);
369
    }
370
371
101
    err = DoWrite(
372
        req_wrap,
373
        &buf,
374
        1,
375
101
        reinterpret_cast<uv_stream_t*>(send_handle));
376
  }
377
378
4404
  req_wrap_obj->Set(env->async(), True(env->isolate()));
379
380


1468
  if (err)
381
    req_wrap->Dispose();
382
383
 done:
384
34933
  const char* msg = Error();
385


34933
  if (msg != nullptr) {
386
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
387
    ClearError();
388
  }
389
34933
  req_wrap_obj->Set(env->bytes_string(),
390
139732
                    Integer::NewFromUnsigned(env->isolate(), data_size));
391
34933
  return err;
392
}
393
394
395
14211
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
396
14211
  StreamBase* wrap = req_wrap->wrap();
397
14211
  Environment* env = req_wrap->env();
398
399
14211
  HandleScope handle_scope(env->isolate());
400
14211
  Context::Scope context_scope(env->context());
401
402
  // The wrap and request objects should still be there.
403
28422
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
404
405
  // Unref handle property
406
14211
  Local<Object> req_wrap_obj = req_wrap->object();
407
56844
  req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
408
14211
  wrap->OnAfterWrite(req_wrap);
409
410
  Local<Value> argv[] = {
411
    Integer::New(env->isolate(), status),
412
14211
    wrap->GetObject(),
413
    req_wrap_obj,
414
    Undefined(env->isolate())
415
85266
  };
416
417
14211
  const char* msg = wrap->Error();
418
14211
  if (msg != nullptr) {
419
    argv[3] = OneByteString(env->isolate(), msg);
420
    wrap->ClearError();
421
  }
422
423
56844
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
424
14211
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
425
426
28418
  req_wrap->Dispose();
427
14209
}
428
429
430
28345
void StreamBase::EmitData(ssize_t nread,
431
                          Local<Object> buf,
432
                          Local<Object> handle) {
433
28345
  Environment* env = env_;
434
435
  Local<Value> argv[] = {
436
    Integer::New(env->isolate(), nread),
437
    buf,
438
    handle
439
113380
  };
440
441
28345
  if (argv[1].IsEmpty())
442
9016
    argv[1] = Undefined(env->isolate());
443
444
28345
  if (argv[2].IsEmpty())
445
50126
    argv[2] = Undefined(env->isolate());
446
447
28345
  AsyncWrap* wrap = GetAsyncWrap();
448
28345
  CHECK_NE(wrap, nullptr);
449
28345
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
450
28277
}
451
452
453
2062
bool StreamBase::IsIPCPipe() {
454
2062
  return false;
455
}
456
457
458
int StreamBase::GetFD() {
459
  return -1;
460
}
461
462
463
16335
Local<Object> StreamBase::GetObject() {
464
16335
  return GetAsyncWrap()->object();
465
}
466
467
468
2648
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
469
  // No TryWrite by default
470
2648
  return 0;
471
}
472
473
474
78648
const char* StreamResource::Error() const {
475
78648
  return nullptr;
476
}
477
478
479
void StreamResource::ClearError() {
480
  // No-op
481
}
482
483
}  // namespace node