GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/stream_base.h Lines: 77 80 96.3 %
Date: 2017-11-19 Branches: 17 26 65.4 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async_wrap.h"
8
#include "req_wrap-inl.h"
9
#include "node.h"
10
#include "util.h"
11
12
#include "v8.h"
13
14
namespace node {
15
16
// Forward declarations
17
class StreamBase;
18
19
template <class Req>
20
class StreamReq {
21
 public:
22
  typedef void (*DoneCb)(Req* req, int status);
23
24
21038
  explicit StreamReq(DoneCb cb) : cb_(cb) {
25
21038
  }
26
27
21003
  inline void Done(int status, const char* error_str = nullptr) {
28
21003
    Req* req = static_cast<Req*>(this);
29
21003
    Environment* env = req->env();
30

21003
    if (error_str != nullptr) {
31
24
      req->object()->Set(env->error_string(),
32
                         OneByteString(env->isolate(), error_str));
33
    }
34
35
21003
    cb_(req, status);
36
21001
  }
37
38
 private:
39
  DoneCb cb_;
40
};
41
42
// Request wrapper for a stream shutdown. It is a ReqWrap around
// uv_shutdown_t that also carries a StreamReq completion callback and the
// StreamBase pointer it was constructed with.
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
                     public StreamReq<ShutdownWrap> {
 public:
  // env, req_wrap_obj: forwarded to ReqWrap together with the
  //                    PROVIDER_SHUTDOWNWRAP provider type.
  // wrap:              stream pointer returned later by wrap().
  // cb:                completion callback handed to StreamReq.
  ShutdownWrap(Environment* env,
               v8::Local<v8::Object> req_wrap_obj,
               StreamBase* wrap,
               DoneCb cb)
      : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
        StreamReq<ShutdownWrap>(cb),
        wrap_(wrap) {
    // Paired with the ClearWrap() call in the destructor.
    Wrap(req_wrap_obj, this);
  }

  ~ShutdownWrap() { ClearWrap(object()); }

  // Recovers the ShutdownWrap that contains `req` as its req_ member.
  static ShutdownWrap* from_req(uv_shutdown_t* req) {
    return ContainerOf(&ShutdownWrap::req_, req);
  }

  // The stream pointer passed to the constructor.
  inline StreamBase* wrap() const { return wrap_; }

  // Size of this object in bytes.
  size_t self_size() const override { return sizeof(*this); }

 private:
  StreamBase* const wrap_;
};
69
70
// Request wrapper for a stream write. It is a ReqWrap around uv_write_t that
// also carries a StreamReq completion callback, the StreamBase pointer it was
// constructed with, and a recorded storage size.
//
// Ordinary `new` is deleted for this type; only the placement form that takes
// a caller-provided `char*` is available. New(), Dispose(), Extra() and
// ExtraSize() are declared here and defined outside this header section.
class WriteWrap : public ReqWrap<uv_write_t>,
                  public StreamReq<WriteWrap> {
 public:
  static inline WriteWrap* New(Environment* env,
                               v8::Local<v8::Object> obj,
                               StreamBase* wrap,
                               DoneCb cb,
                               size_t extra = 0);
  inline void Dispose();
  inline char* Extra(size_t offset = 0);
  inline size_t ExtraSize() const;

  // The stream pointer passed to the constructor.
  inline StreamBase* wrap() const { return wrap_; }

  // Reports the storage size recorded at construction (0 for the public
  // constructor), not sizeof(*this).
  size_t self_size() const override { return storage_size_; }

  // Recovers the WriteWrap that contains `req` as its req_ member.
  static WriteWrap* from_req(uv_write_t* req) {
    return ContainerOf(&WriteWrap::req_, req);
  }

  // NOTE(review): presumably the alignment applied to the extra storage; its
  // use is not visible in this section - confirm against the inline header.
  static const size_t kAlignSize = 16;

  // Constructs a WriteWrap whose recorded storage size is zero.
  WriteWrap(Environment* env,
            v8::Local<v8::Object> obj,
            StreamBase* wrap,
            DoneCb cb)
      : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
        StreamReq<WriteWrap>(cb),
        wrap_(wrap),
        storage_size_(0) {
    Wrap(obj, this);
  }

 protected:
  // Same as the public constructor, but records `storage_size` so that
  // self_size() can report it.
  WriteWrap(Environment* env,
            v8::Local<v8::Object> obj,
            StreamBase* wrap,
            DoneCb cb,
            size_t storage_size)
      : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
        StreamReq<WriteWrap>(cb),
        wrap_(wrap),
        storage_size_(storage_size) {
    Wrap(obj, this);
  }

  ~WriteWrap() { ClearWrap(object()); }

  // Non-placement allocation is disallowed.
  void* operator new(size_t size) = delete;

  // Placement form: constructs the object in the caller-provided buffer.
  void* operator new(size_t size, char* storage) { return storage; }

  // Matching placement delete. It exists only to satisfy the compiler and
  // must never run, because node does not use exceptions.
  void operator delete(void* ptr, char* storage) { UNREACHABLE(); }

 private:
  // Nobody should use the non-placement delete operator on a WriteWrap;
  // reaching it is treated as a fatal error.
  void operator delete(void* ptr) { UNREACHABLE(); }

  StreamBase* const wrap_;
  const size_t storage_size_;
};
135
136
class StreamResource {
137
 public:
138
  template <class T>
139
  struct Callback {
140
52454
    Callback() : fn(nullptr), ctx(nullptr) {}
141
40551
    Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
142
    Callback(const Callback&) = default;
143
144
90772
    inline bool is_empty() { return fn == nullptr; }
145
3936
    inline void clear() {
146
3936
      fn = nullptr;
147
3936
      ctx = nullptr;
148
3936
    }
149
150
    T fn;
151
    void* ctx;
152
  };
153
154
  typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
155
  typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
156
  typedef void (*ReadCb)(ssize_t nread,
157
                         const uv_buf_t* buf,
158
                         uv_handle_type pending,
159
                         void* ctx);
160
  typedef void (*DestructCb)(void* ctx);
161
162
11883
  StreamResource() : bytes_read_(0) {
163
11883
  }
164
9146
  virtual ~StreamResource() {
165
9146
    if (!destruct_cb_.is_empty())
166
768
      destruct_cb_.fn(destruct_cb_.ctx);
167
9146
  }
168
169
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
170
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
171
  virtual int DoWrite(WriteWrap* w,
172
                      uv_buf_t* bufs,
173
                      size_t count,
174
                      uv_stream_t* send_handle) = 0;
175
  virtual const char* Error() const;
176
  virtual void ClearError();
177
178
  // Events
179
14233
  inline void OnAfterWrite(WriteWrap* w) {
180
14233
    if (!after_write_cb_.is_empty())
181
11371
      after_write_cb_.fn(w, after_write_cb_.ctx);
182
14233
  }
183
184
32344
  inline void OnAlloc(size_t size, uv_buf_t* buf) {
185
32344
    if (!alloc_cb_.is_empty())
186
32344
      alloc_cb_.fn(size, buf, alloc_cb_.ctx);
187
32344
  }
188
189
33081
  inline void OnRead(ssize_t nread,
190
                     const uv_buf_t* buf,
191
                     uv_handle_type pending = UV_UNKNOWN_HANDLE) {
192
33081
    if (nread > 0)
193
28703
      bytes_read_ += static_cast<uint64_t>(nread);
194
33081
    if (!read_cb_.is_empty())
195
33081
      read_cb_.fn(nread, buf, pending, read_cb_.ctx);
196
33006
  }
197
198
11388
  inline void set_after_write_cb(Callback<AfterWriteCb> c) {
199
11388
    after_write_cb_ = c;
200
11388
  }
201
202
15674
  inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
203
15674
  inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
204
795
  inline void set_destruct_cb(Callback<DestructCb> c) { destruct_cb_ = c; }
205
206
  inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
207
1981
  inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
208
1981
  inline Callback<ReadCb> read_cb() { return read_cb_; }
209
  inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
210
211
 private:
212
  Callback<AfterWriteCb> after_write_cb_;
213
  Callback<AllocCb> alloc_cb_;
214
  Callback<ReadCb> read_cb_;
215
  Callback<DestructCb> destruct_cb_;
216
  uint64_t bytes_read_;
217
218
  friend class StreamBase;
219
};
220
221
class StreamBase : public StreamResource {
222
 public:
223
  enum Flags {
224
    kFlagNone = 0x0,
225
    kFlagHasWritev = 0x1,
226
    kFlagNoShutdown = 0x2
227
  };
228
229
  template <class Base>
230
  static inline void AddMethods(Environment* env,
231
                                v8::Local<v8::FunctionTemplate> target,
232
                                int flags = kFlagNone);
233
234
  virtual void* Cast() = 0;
235
  virtual bool IsAlive() = 0;
236
  virtual bool IsClosing() = 0;
237
  virtual bool IsIPCPipe();
238
  virtual int GetFD();
239
240
  virtual int ReadStart() = 0;
241
  virtual int ReadStop() = 0;
242
243
2776
  inline void Consume() {
244
2776
    CHECK_EQ(consumed_, false);
245
2776
    consumed_ = true;
246
2776
  }
247
248
1016
  inline void Unconsume() {
249
1016
    CHECK_EQ(consumed_, true);
250
1016
    consumed_ = false;
251
1016
  }
252
253
  template <class Outer>
254
2964
  inline Outer* Cast() { return static_cast<Outer*>(Cast()); }
255
256
  void EmitData(ssize_t nread,
257
                v8::Local<v8::Object> buf,
258
                v8::Local<v8::Object> handle);
259
260
 protected:
261
11883
  explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
262
11883
  }
263
264
9146
  virtual ~StreamBase() = default;
265
266
  // One of these must be implemented
267
  virtual AsyncWrap* GetAsyncWrap() = 0;
268
  virtual v8::Local<v8::Object> GetObject();
269
270
  // Libuv callbacks
271
  static void AfterShutdown(ShutdownWrap* req, int status);
272
  static void AfterWrite(WriteWrap* req, int status);
273
274
  // JS Methods
275
  int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
276
  int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
277
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
278
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
279
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
280
  template <enum encoding enc>
281
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
282
283
  template <class Base>
284
  static void GetFD(v8::Local<v8::String> key,
285
                    const v8::PropertyCallbackInfo<v8::Value>& args);
286
287
  template <class Base>
288
  static void GetExternal(v8::Local<v8::String> key,
289
                          const v8::PropertyCallbackInfo<v8::Value>& args);
290
291
  template <class Base>
292
  static void GetBytesRead(v8::Local<v8::String> key,
293
                           const v8::PropertyCallbackInfo<v8::Value>& args);
294
295
  template <class Base,
296
            int (StreamBase::*Method)(
297
      const v8::FunctionCallbackInfo<v8::Value>& args)>
298
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
299
300
 private:
301
  Environment* env_;
302
  bool consumed_;
303
};
304
305
}  // namespace node
306
307
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
308
309
#endif  // SRC_STREAM_BASE_H_