GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.h Lines: 29 33 87.9 %
Date: 2019-02-23 22:23:05 Branches: 10 20 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async_wrap-inl.h"
8
#include "node.h"
9
#include "util.h"
10
11
#include "v8.h"
12
13
namespace node {
14
15
// Forward declarations
16
class ShutdownWrap;
17
class WriteWrap;
18
class StreamBase;
19
class StreamResource;
20
21
struct StreamWriteResult {
22
  bool async;
23
  int err;
24
  WriteWrap* wrap;
25
  size_t bytes;
26
};
27
28
29
class StreamReq {
30
 public:
31
  static constexpr int kStreamReqField = 1;
32
33
48093
  explicit StreamReq(StreamBase* stream,
34
48093
                     v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
35
48093
    AttachToObject(req_wrap_obj);
36
48093
  }
37
38
48075
  virtual ~StreamReq() {}
39
  virtual AsyncWrap* GetAsyncWrap() = 0;
40
  v8::Local<v8::Object> object();
41
42
  void Done(int status, const char* error_str = nullptr);
43
  void Dispose();
44
45
26466
  inline StreamBase* stream() const { return stream_; }
46
47
  static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
48
49
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
50
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
51
  // and what we use in C++ after creating these objects from their
52
  // v8::ObjectTemplates, to avoid the overhead of calling the
53
  // constructor explicitly.
54
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
55
56
 protected:
57
  virtual void OnDone(int status) = 0;
58
59
  void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
60
61
 private:
62
  StreamBase* const stream_;
63
};
64
65
43161
class ShutdownWrap : public StreamReq {
66
 public:
67
43163
  ShutdownWrap(StreamBase* stream,
68
               v8::Local<v8::Object> req_wrap_obj)
69
43163
    : StreamReq(stream, req_wrap_obj) { }
70
71
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
72
  void OnDone(int status) override;
73
};
74
75
class WriteWrap : public StreamReq {
76
 public:
77
  char* Storage();
78
  size_t StorageSize() const;
79
  void SetAllocatedStorage(char* data, size_t size);
80
81
4930
  WriteWrap(StreamBase* stream,
82
            v8::Local<v8::Object> req_wrap_obj)
83
4930
    : StreamReq(stream, req_wrap_obj) { }
84
85
9828
  ~WriteWrap() override {
86
4914
    free(storage_);
87
4914
  }
88
89
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
90
  void OnDone(int status) override;
91
92
 private:
93
  char* storage_ = nullptr;
94
  size_t storage_size_ = 0;
95
};
96
97
98
// This is the generic interface for objects that control Node.js' C++ streams.
99
// For example, the default `EmitToJSStreamListener` emits a stream's data
100
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
101
113682
class StreamListener {
102
 public:
103
  virtual ~StreamListener();
104
105
  // This is called when a stream wants to allocate memory before
106
  // reading data into the freshly allocated buffer (i.e. it is always followed
107
  // by an `OnStreamRead()` call).
108
  // This memory may be statically or dynamically allocated; for example,
109
  // a protocol parser may want to read data into a static buffer if it knows
110
  // that all data is going to be fully handled during the next
111
  // `OnStreamRead()` call.
112
  // The returned buffer does not need to contain `suggested_size` bytes.
113
  // The default implementation of this method returns a buffer that has exactly
114
  // the suggested size and is allocated using malloc().
115
  // It is not valid to return a zero-length buffer from this method.
116
  // It is not guaranteed that the corresponding `OnStreamRead()` call
117
  // happens in the same event loop turn as this call.
118
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size);
119
120
  // `OnStreamRead()` is called when data is available on the socket and has
121
  // been read into the buffer provided by `OnStreamAlloc()`.
122
  // The `buf` argument is the buffer returned by `OnStreamAlloc()`, or may be
123
  // a buffer with base nullptr in case of an error.
124
  // `nread` is the number of read bytes (which is at most the buffer length),
125
  // or, if negative, a libuv error code.
126
  virtual void OnStreamRead(ssize_t nread,
127
                            const uv_buf_t& buf) = 0;
128
129
  // This is called once a write has finished. `status` may be 0 or,
130
  // if negative, a libuv error code.
131
  // By default, this is simply passed on to the previous listener
132
  // (and raises an assertion if there is none).
133
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
134
135
  // This is called once a shutdown has finished. `status` may be 0 or,
136
  // if negative, a libuv error code.
137
  // By default, this is simply passed on to the previous listener
138
  // (and raises an assertion if there is none).
139
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
140
141
  // This is called by the stream if it determines that it wants more data
142
  // to be written to it. Not all streams support this.
143
  // This callback will not be called as long as there are active writes.
144
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
145
  // true if it is supported by a stream.
146
2413
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
147
148
  // This is called immediately before the stream is destroyed.
149
521
  virtual void OnStreamDestroy() {}
150
151
  // The stream this is currently associated with, or nullptr if there is none.
152
929
  inline StreamResource* stream() { return stream_; }
153
154
 protected:
155
  // Pass along a read error to the `StreamListener` instance that was active
156
  // before this one. For example, a protocol parser does not care about read
157
  // errors and may instead want to let the original handler
158
  // (e.g. the JS handler) take care of the situation.
159
  void PassReadErrorToPreviousListener(ssize_t nread);
160
161
  StreamResource* stream_ = nullptr;
162
  StreamListener* previous_listener_ = nullptr;
163
164
  friend class StreamResource;
165
};
166
167
168
// An (incomplete) stream listener class that calls the `.oncomplete()`
169
// method of the JS objects associated with the wrap objects.
170
154100
class ReportWritesToJSStreamListener : public StreamListener {
171
 public:
172
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
173
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
174
175
 private:
176
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
177
};
178
179
180
// A default emitter that just pushes data chunks as Buffer instances to
181
// JS land via the handle’s .ondata method.
182
154100
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
183
 public:
184
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
185
};
186
187
188
// A generic stream, comparable to JS land’s `Duplex` streams.
189
// A stream is always controlled through one `StreamListener` instance.
190
77288
class StreamResource {
191
 public:
192
  virtual ~StreamResource();
193
194
  // These need to be implemented on the readable side of this stream:
195
196
  // Start reading from the underlying resource. This is called by the consumer
197
  // when more data is desired. Use `EmitAlloc()` and `EmitRead()` to
198
  // pass data along to the consumer.
199
  virtual int ReadStart() = 0;
200
  // Stop reading from the underlying resource. This is called by the
201
  // consumer when its buffers are full and no more data can be handled.
202
  virtual int ReadStop() = 0;
203
204
  // These need to be implemented on the writable side of this stream:
205
  // All of these methods may return an error code synchronously.
206
  // In that case, the finish callback should *not* be called.
207
208
  // Perform a shutdown operation, and either call req_wrap->Done() when
209
  // finished and return 0, return 1 for synchronous success, or
210
  // a libuv error code for synchronous failures.
211
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
212
  // Try to write as much data as possible synchronously, and modify
213
  // `*bufs` and `*count` accordingly. This is a no-op by default.
214
  // Return 0 for success and a libuv error code for failures.
215
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
216
  // Perform a write of data, and either call req_wrap->Done() when finished
217
  // and return 0, or return a libuv error code for synchronous failures.
218
  virtual int DoWrite(WriteWrap* w,
219
                      uv_buf_t* bufs,
220
                      size_t count,
221
                      uv_stream_t* send_handle) = 0;
222
223
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
224
  virtual bool HasWantsWrite() const { return false; }
225
226
  // Optionally, this may provide an error message to be used for
227
  // failing writes.
228
  virtual const char* Error() const;
229
  // Clear the current error (i.e. that would be returned by Error()).
230
  virtual void ClearError();
231
232
  // Transfer ownership of this stream to `listener`. The previous listener
233
  // will not receive any more callbacks while the new listener is active.
234
  void PushStreamListener(StreamListener* listener);
235
  // Remove a listener, and, if this was the currently active one,
236
  // transfer ownership back to the previous listener.
237
  void RemoveStreamListener(StreamListener* listener);
238
239
 protected:
240
  // Call the current listener's OnStreamAlloc() method.
241
  uv_buf_t EmitAlloc(size_t suggested_size);
242
  // Call the current listener's OnStreamRead() method and update the
243
  // stream's read byte counter.
244
  void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
245
  // Call the current listener's OnStreamAfterWrite() method.
246
  void EmitAfterWrite(WriteWrap* w, int status);
247
  // Call the current listener's OnStreamAfterShutdown() method.
248
  void EmitAfterShutdown(ShutdownWrap* w, int status);
249
  // Call the current listener's OnStreamWantsWrite() method.
250
  void EmitWantsWrite(size_t suggested_size);
251
252
  StreamListener* listener_ = nullptr;
253
  uint64_t bytes_read_ = 0;
254
  uint64_t bytes_written_ = 0;
255
256
  friend class StreamListener;
257
};
258
259
260
76812
class StreamBase : public StreamResource {
261
 public:
262
  template <class Base>
263
  static inline void AddMethods(Environment* env,
264
                                v8::Local<v8::FunctionTemplate> target);
265
266
  virtual bool IsAlive() = 0;
267
  virtual bool IsClosing() = 0;
268
  virtual bool IsIPCPipe();
269
  virtual int GetFD();
270
271
  void CallJSOnreadMethod(ssize_t nread,
272
                          v8::Local<v8::ArrayBuffer> ab,
273
                          size_t offset = 0);
274
275
  // This is named `stream_env` to avoid name clashes, because a lot of
276
  // subclasses are also `BaseObject`s.
277
  Environment* stream_env() const;
278
279
  // Shut down the current stream. This request can use an existing
280
  // ShutdownWrap object (that was created in JS), or a new one will be created.
281
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
282
  // completion, and a libuv error code in case of synchronous failure.
283
  int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
284
285
  // Write data to the current stream. This request can use an existing
286
  // WriteWrap object (that was created in JS), or a new one will be created.
287
  // This will first try to write synchronously using `DoTryWrite()`, then
288
  // asynchronously using `DoWrite()`.
289
  // If the return value indicates a synchronous completion, no callback will
290
  // be invoked.
291
  StreamWriteResult Write(
292
      uv_buf_t* bufs,
293
      size_t count,
294
      uv_stream_t* send_handle = nullptr,
295
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
296
297
  // These can be overridden by subclasses to get more specific wrap instances.
298
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
299
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
300
  // an associated libuv request.
301
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
302
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
303
304
  // One of these must be implemented
305
  virtual AsyncWrap* GetAsyncWrap() = 0;
306
  virtual v8::Local<v8::Object> GetObject();
307
308
 protected:
309
  explicit StreamBase(Environment* env);
310
311
  // JS Methods
312
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
313
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
314
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
315
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
316
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
317
  template <enum encoding enc>
318
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
319
320
  template <class Base>
321
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
322
323
  template <class Base>
324
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
325
326
  template <class Base>
327
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
328
329
  template <class Base>
330
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
331
332
  template <class Base,
333
            int (StreamBase::*Method)(
334
      const v8::FunctionCallbackInfo<v8::Value>& args)>
335
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
336
337
  // Internal, used only in StreamBase methods + env.cc.
338
  enum StreamBaseStateFields {
339
    kReadBytesOrError,
340
    kArrayBufferOffset,
341
    kBytesWritten,
342
    kLastWriteWasAsync,
343
    kNumStreamBaseStateFields
344
  };
345
346
 private:
347
  Environment* env_;
348
  EmitToJSStreamListener default_listener_;
349
350
  void SetWriteResult(const StreamWriteResult& res);
351
352
  friend class WriteWrap;
353
  friend class ShutdownWrap;
354
  friend class Environment;  // For kNumStreamBaseStateFields.
355
};
356
357
358
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
359
// `OtherBase` must have a constructor that matches the `AsyncWrap`
360
// constructor’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
361
// and be a subclass of `AsyncWrap`.
362
template <typename OtherBase>
363

86322
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
364
 public:
365
  SimpleShutdownWrap(StreamBase* stream,
366
                     v8::Local<v8::Object> req_wrap_obj);
367
368
86275
  AsyncWrap* GetAsyncWrap() override { return this; }
369
370
  SET_NO_MEMORY_INFO()
371
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
372
  SET_SELF_SIZE(SimpleShutdownWrap)
373
};
374
375
template <typename OtherBase>
376

9828
class SimpleWriteWrap : public WriteWrap, public OtherBase {
377
 public:
378
  SimpleWriteWrap(StreamBase* stream,
379
                  v8::Local<v8::Object> req_wrap_obj);
380
381
13895
  AsyncWrap* GetAsyncWrap() override { return this; }
382
383
2
  SET_NO_MEMORY_INFO()
384
2
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
385
2
  SET_SELF_SIZE(SimpleWriteWrap)
386
};
387
388
}  // namespace node
389
390
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
391
392
#endif  // SRC_STREAM_BASE_H_