Commit 8b965348 authored by rui.zheng's avatar rui.zheng

update vendor

parent b244ce00
package gost
import (
"crypto/rand"
"crypto/tls"
"encoding/base64"
"errors"
"github.com/golang/glog"
"golang.org/x/net/http2"
"io"
//"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"
"time"
)
// Proxy chain holds a list of proxy nodes
......@@ -134,7 +136,15 @@ func (c *ProxyChain) initHttp2Client(config *tls.Config, nodes ...ProxyNode) {
if err != nil {
return conn, err
}
return tls.Client(conn, cfg), nil
conn = tls.Client(conn, cfg)
// enable HTTP2 ping-pong
pingIntvl, _ := strconv.Atoi(http2Node.Get("ping"))
if pingIntvl > 0 {
enablePing(conn, time.Duration(pingIntvl)*time.Second)
}
return conn, nil
},
}
c.http2Client = &http.Client{Transport: &tr}
......@@ -142,6 +152,37 @@ func (c *ProxyChain) initHttp2Client(config *tls.Config, nodes ...ProxyNode) {
}
func enablePing(conn net.Conn, interval time.Duration) {
if conn == nil || interval == 0 {
return
}
glog.V(LINFO).Infoln("[http2] ping enabled, interval:", interval)
go func() {
t := time.NewTicker(interval)
var framer *http2.Framer
for {
select {
case <-t.C:
if framer == nil {
framer = http2.NewFramer(conn, conn)
}
var p [8]byte
rand.Read(p[:])
err := framer.WritePing(false, p)
if err != nil {
t.Stop()
framer = nil
glog.V(LWARNING).Infoln("[http2] ping:", err)
return
}
glog.V(LINFO).Infoln("[http2] ping OK")
}
}
}()
}
// Connect to addr through proxy chain
func (c *ProxyChain) Dial(addr string) (net.Conn, error) {
if !strings.Contains(addr, ":") {
......
......@@ -11,7 +11,7 @@ import (
)
const (
Version = "2.3-rc1"
Version = "2.3-rc2"
)
// Log level for glog
......
......@@ -317,10 +317,12 @@ type Framer struct {
// non-Continuation or Continuation on a different stream is
// attempted to be written.
logReads bool
logReads, logWrites bool
debugFramer *Framer // only use for logging written writes
debugFramerBuf *bytes.Buffer
debugFramer *Framer // only use for logging written writes
debugFramerBuf *bytes.Buffer
debugReadLoggerf func(string, ...interface{})
debugWriteLoggerf func(string, ...interface{})
}
func (fr *Framer) maxHeaderListSize() uint32 {
......@@ -355,7 +357,7 @@ func (f *Framer) endWrite() error {
byte(length>>16),
byte(length>>8),
byte(length))
if logFrameWrites {
if f.logWrites {
f.logWrite()
}
......@@ -378,10 +380,10 @@ func (f *Framer) logWrite() {
f.debugFramerBuf.Write(f.wbuf)
fr, err := f.debugFramer.ReadFrame()
if err != nil {
log.Printf("http2: Framer %p: failed to decode just-written frame", f)
f.debugWriteLoggerf("http2: Framer %p: failed to decode just-written frame", f)
return
}
log.Printf("http2: Framer %p: wrote %v", f, summarizeFrame(fr))
f.debugWriteLoggerf("http2: Framer %p: wrote %v", f, summarizeFrame(fr))
}
func (f *Framer) writeByte(v byte) { f.wbuf = append(f.wbuf, v) }
......@@ -399,9 +401,12 @@ const (
// NewFramer returns a Framer that writes frames to w and reads them from r.
func NewFramer(w io.Writer, r io.Reader) *Framer {
fr := &Framer{
w: w,
r: r,
logReads: logFrameReads,
w: w,
r: r,
logReads: logFrameReads,
logWrites: logFrameWrites,
debugReadLoggerf: log.Printf,
debugWriteLoggerf: log.Printf,
}
fr.getReadBuf = func(size uint32) []byte {
if cap(fr.readBuf) >= int(size) {
......@@ -483,7 +488,7 @@ func (fr *Framer) ReadFrame() (Frame, error) {
return nil, err
}
if fr.logReads {
log.Printf("http2: Framer %p: read %v", fr, summarizeFrame(f))
fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
}
if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
return fr.readMetaFrame(f.(*HeadersFrame))
......@@ -1419,8 +1424,8 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
hdec.SetEmitEnabled(true)
hdec.SetMaxStringLength(fr.maxHeaderStringLen())
hdec.SetEmitFunc(func(hf hpack.HeaderField) {
if VerboseLogs && logFrameReads {
log.Printf("http2: decoded hpack field %+v", hf)
if VerboseLogs && fr.logReads {
fr.debugReadLoggerf("http2: decoded hpack field %+v", hf)
}
if !httplex.ValidHeaderFieldValue(hf.Value) {
invalid = headerFieldValueError(hf.Value)
......
......@@ -6,6 +6,45 @@
package http2
import "crypto/tls"
import (
"crypto/tls"
"io"
"net/http"
)
func cloneTLSConfig(c *tls.Config) *tls.Config { return c.Clone() }
var _ http.Pusher = (*responseWriter)(nil)
// Push implements http.Pusher.
func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
internalOpts := pushOptions{}
if opts != nil {
internalOpts.Method = opts.Method
internalOpts.Header = opts.Header
}
return w.push(target, internalOpts)
}
func configureServer18(h1 *http.Server, h2 *Server) error {
if h2.IdleTimeout == 0 {
if h1.IdleTimeout != 0 {
h2.IdleTimeout = h1.IdleTimeout
} else {
h2.IdleTimeout = h1.ReadTimeout
}
}
return nil
}
func shouldLogPanic(panicValue interface{}) bool {
return panicValue != nil && panicValue != http.ErrAbortHandler
}
func reqGetBody(req *http.Request) func() (io.ReadCloser, error) {
return req.GetBody
}
func reqBodyIsNoBody(body io.ReadCloser) bool {
return body == http.NoBody
}
......@@ -78,13 +78,23 @@ var (
type streamState int
// HTTP/2 stream states.
//
// See http://tools.ietf.org/html/rfc7540#section-5.1.
//
// For simplicity, the server code merges "reserved (local)" into
// "half-closed (remote)". This is one less state transition to track.
// The only downside is that we send PUSH_PROMISEs slightly less
// liberally than allowable. More discussion here:
// https://lists.w3.org/Archives/Public/ietf-http-wg/2016JulSep/0599.html
//
// "reserved (remote)" is omitted since the client code does not
// support server push.
const (
stateIdle streamState = iota
stateOpen
stateHalfClosedLocal
stateHalfClosedRemote
stateResvLocal
stateResvRemote
stateClosed
)
......@@ -93,8 +103,6 @@ var stateName = [...]string{
stateOpen: "Open",
stateHalfClosedLocal: "HalfClosedLocal",
stateHalfClosedRemote: "HalfClosedRemote",
stateResvLocal: "ResvLocal",
stateResvRemote: "ResvRemote",
stateClosed: "Closed",
}
......
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.8
package http2
import (
"io"
"net/http"
)
func configureServer18(h1 *http.Server, h2 *Server) error {
// No IdleTimeout to sync prior to Go 1.8.
return nil
}
func shouldLogPanic(panicValue interface{}) bool {
return panicValue != nil
}
func reqGetBody(req *http.Request) func() (io.ReadCloser, error) {
return nil
}
func reqBodyIsNoBody(io.ReadCloser) bool { return false }
......@@ -33,6 +33,7 @@ import (
"fmt"
"io"
"log"
"math"
"net"
"net/http"
"net/textproto"
......@@ -134,9 +135,15 @@ func (s *Server) maxConcurrentStreams() uint32 {
//
// ConfigureServer must be called before s begins serving.
func ConfigureServer(s *http.Server, conf *Server) error {
if s == nil {
panic("nil *http.Server")
}
if conf == nil {
conf = new(Server)
}
if err := configureServer18(s, conf); err != nil {
return err
}
if s.TLSConfig == nil {
s.TLSConfig = new(tls.Config)
......@@ -181,9 +188,6 @@ func ConfigureServer(s *http.Server, conf *Server) error {
if !haveNPN {
s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
}
// h2-14 is temporary (as of 2015-03-05) while we wait for all browsers
// to switch to "h2".
s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, "h2-14")
if s.TLSNextProto == nil {
s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
......@@ -198,7 +202,6 @@ func ConfigureServer(s *http.Server, conf *Server) error {
})
}
s.TLSNextProto[NextProtoTLS] = protoHandler
s.TLSNextProto["h2-14"] = protoHandler // temporary; see above.
return nil
}
......@@ -262,9 +265,11 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
wantWriteFrameCh: make(chan FrameWriteRequest, 8),
wantStartPushCh: make(chan startPushRequest, 8),
wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
advMaxStreams: s.maxConcurrentStreams(),
initialWindowSize: initialWindowSize,
maxFrameSize: initialMaxFrameSize,
......@@ -273,6 +278,16 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
pushEnabled: true,
}
// The net/http package sets the write deadline from the
// http.Server.WriteTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already
// set. Disarm it here so that it is not applied to additional
// streams opened on this connection.
// TODO: implement WriteTimeout fully. See Issue 18437.
if sc.hs.WriteTimeout != 0 {
sc.conn.SetWriteDeadline(time.Time{})
}
if s.NewWriteScheduler != nil {
sc.writeSched = s.NewWriteScheduler()
} else {
......@@ -361,6 +376,7 @@ type serverConn struct {
doneServing chan struct{} // closed when serverConn.serve ends
readFrameCh chan readFrameResult // written by serverConn.readFrames
wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
wantStartPushCh chan startPushRequest // from handlers -> serve
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
testHookCh chan func(int) // code to run on the serve loop
......@@ -378,17 +394,21 @@ type serverConn struct {
unackedSettings int // how many SETTINGS have we sent without ACKs?
clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
curOpenStreams uint32 // client's number of open streams
maxStreamID uint32 // max ever seen
curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map[uint32]*stream
initialWindowSize int32
maxFrameSize int32
headerTableSize uint32
peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh
writingFrame bool // started writing a frame (on serve goroutine or separate)
writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
needsFrameFlush bool // last frame write wasn't a flush
inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode ErrCode
shutdownTimerCh <-chan time.Time // nil until used
......@@ -413,6 +433,11 @@ func (sc *serverConn) maxHeaderListSize() uint32 {
return uint32(n + typicalHeaders*perFieldOverhead)
}
func (sc *serverConn) curOpenStreams() uint32 {
sc.serveG.check()
return sc.curClientStreams + sc.curPushedStreams
}
// stream represents a stream. This is the minimal metadata needed by
// the serve goroutine. Most of the actual stream state is owned by
// the http.Handler's goroutine in the responseWriter. Because the
......@@ -438,8 +463,7 @@ type stream struct {
numTrailerValues int64
weight uint8
state streamState
sentReset bool // only true once detached from streams map
gotReset bool // only true once detacted from streams map
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
reqBuf []byte // if non-nil, body pipe buffer to return later at EOF
......@@ -457,7 +481,7 @@ func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
sc.serveG.check()
// http://http2.github.io/http2-spec/#rfc.section.5.1
// http://tools.ietf.org/html/rfc7540#section-5.1
if st, ok := sc.streams[streamID]; ok {
return st.state, st
}
......@@ -467,8 +491,14 @@ func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
// a client sends a HEADERS frame on stream 7 without ever sending a
// frame on stream 5, then stream 5 transitions to the "closed"
// state when the first frame for stream 7 is sent or received."
if streamID <= sc.maxStreamID {
return stateClosed, nil
if streamID%2 == 1 {
if streamID <= sc.maxClientStreamID {
return stateClosed, nil
}
} else {
if streamID <= sc.maxPushPromiseID {
return stateClosed, nil
}
}
return stateIdle, nil
}
......@@ -692,6 +722,11 @@ func (sc *serverConn) serve() {
sc.idleTimerCh = sc.idleTimer.C
}
var gracefulShutdownCh <-chan struct{}
if sc.hs != nil {
gracefulShutdownCh = h1ServerShutdownChan(sc.hs)
}
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.NewTimer(firstSettingsTimeout)
......@@ -701,6 +736,8 @@ func (sc *serverConn) serve() {
select {
case wr := <-sc.wantWriteFrameCh:
sc.writeFrame(wr)
case spr := <-sc.wantStartPushCh:
sc.startPush(spr)
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
......@@ -717,6 +754,9 @@ func (sc *serverConn) serve() {
case <-settingsTimer.C:
sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
return
case <-gracefulShutdownCh:
gracefulShutdownCh = nil
sc.startGracefulShutdown()
case <-sc.shutdownTimerCh:
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
......@@ -726,6 +766,10 @@ func (sc *serverConn) serve() {
case fn := <-sc.testHookCh:
fn(loopNum)
}
if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame {
return
}
}
}
......@@ -839,8 +883,34 @@ func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
sc.serveG.check()
// If true, wr will not be written and wr.done will not be signaled.
var ignoreWrite bool
// We are not allowed to write frames on closed streams. RFC 7540 Section
// 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
// a closed stream." Our server never sends PRIORITY, so that exception
// does not apply.
//
// The serverConn might close an open stream while the stream's handler
// is still running. For example, the server might close a stream when it
// receives bad data from the client. If this happens, the handler might
// attempt to write a frame after the stream has been closed (since the
// handler hasn't yet been notified of the close). In this case, we simply
// ignore the frame. The handler will notice that the stream is closed when
// it waits for the frame to be written.
//
// As an exception to this rule, we allow sending RST_STREAM after close.
// This allows us to immediately reject new streams without tracking any
// state for those streams (except for the queued RST_STREAM frame). This
// may result in duplicate RST_STREAMs in some cases, but the client should
// ignore those.
if wr.StreamID() != 0 {
_, isReset := wr.write.(StreamError)
if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
ignoreWrite = true
}
}
// Don't send a 100-continue response if we've already sent headers.
// See golang.org/issue/14030.
switch wr.write.(type) {
......@@ -848,6 +918,11 @@ func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
wr.stream.wroteHeaders = true
case write100ContinueHeadersFrame:
if wr.stream.wroteHeaders {
// We do not need to notify wr.done because this frame is
// never written with wr.done != nil.
if wr.done != nil {
panic("wr.done != nil for write100ContinueHeadersFrame")
}
ignoreWrite = true
}
}
......@@ -871,23 +946,35 @@ func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
if st != nil {
switch st.state {
case stateHalfClosedLocal:
panic("internal error: attempt to send frame on half-closed-local stream")
case stateClosed:
if st.sentReset || st.gotReset {
// Skip this frame.
sc.scheduleFrameWrite()
return
switch wr.write.(type) {
case StreamError, handlerPanicRST, writeWindowUpdate:
// RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
// in this state. (We never send PRIORITY from the server, so that is not checked.)
default:
panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
}
panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wr))
case stateClosed:
panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
}
}
if wpp, ok := wr.write.(*writePushPromise); ok {
var err error
wpp.promisedID, err = wpp.allocatePromisedID()
if err != nil {
sc.writingFrameAsync = false
wr.replyToWriter(err)
return
}
}
sc.writingFrame = true
sc.needsFrameFlush = true
if wr.write.staysWithinBuffer(sc.bw.Available()) {
sc.writingFrameAsync = false
err := wr.write.writeFrame(sc)
sc.wroteFrame(frameWriteResult{wr, err})
} else {
sc.writingFrameAsync = true
go sc.writeFrameAsync(wr)
}
}
......@@ -905,27 +992,12 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
panic("internal error: expected to be already writing a frame")
}
sc.writingFrame = false
sc.writingFrameAsync = false
wr := res.wr
st := wr.stream
closeStream := endsStream(wr.write)
if _, ok := wr.write.(handlerPanicRST); ok {
sc.closeStream(st, errHandlerPanicked)
}
// Reply (if requested) to the blocked ServeHTTP goroutine.
if ch := wr.done; ch != nil {
select {
case ch <- res.err:
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
}
wr.write = nil // prevent use (assume it's tainted after wr.done send)
if closeStream {
if writeEndsStream(wr.write) {
st := wr.stream
if st == nil {
panic("internal error: expecting non-nil stream")
}
......@@ -938,15 +1010,29 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// reading data (see possible TODO at top of
// this file), we go into closed state here
// anyway, after telling the peer we're
// hanging up on them.
st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
errCancel := streamError(st.id, ErrCodeCancel)
sc.resetStream(errCancel)
// hanging up on them. We'll transition to
// stateClosed after the RST_STREAM frame is
// written.
st.state = stateHalfClosedLocal
sc.resetStream(streamError(st.id, ErrCodeCancel))
case stateHalfClosedRemote:
sc.closeStream(st, errHandlerComplete)
}
} else {
switch v := wr.write.(type) {
case StreamError:
// st may be unknown if the RST_STREAM was generated to reject bad input.
if st, ok := sc.streams[v.StreamID]; ok {
sc.closeStream(st, v)
}
case handlerPanicRST:
sc.closeStream(wr.stream, errHandlerPanicked)
}
}
// Reply (if requested) to unblock the ServeHTTP goroutine.
wr.replyToWriter(res.err)
sc.scheduleFrameWrite()
}
......@@ -964,47 +1050,68 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// flush the write buffer.
func (sc *serverConn) scheduleFrameWrite() {
sc.serveG.check()
if sc.writingFrame {
return
}
if sc.needToSendGoAway {
sc.needToSendGoAway = false
sc.startFrameWrite(FrameWriteRequest{
write: &writeGoAway{
maxStreamID: sc.maxStreamID,
code: sc.goAwayCode,
},
})
if sc.writingFrame || sc.inFrameScheduleLoop {
return
}
if sc.needToSendSettingsAck {
sc.needToSendSettingsAck = false
sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
return
}
if !sc.inGoAway {
if wr, ok := sc.writeSched.Pop(); ok {
sc.startFrameWrite(wr)
return
sc.inFrameScheduleLoop = true
for !sc.writingFrameAsync {
if sc.needToSendGoAway {
sc.needToSendGoAway = false
sc.startFrameWrite(FrameWriteRequest{
write: &writeGoAway{
maxStreamID: sc.maxClientStreamID,
code: sc.goAwayCode,
},
})
continue
}
if sc.needToSendSettingsAck {
sc.needToSendSettingsAck = false
sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
continue
}
if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
if wr, ok := sc.writeSched.Pop(); ok {
sc.startFrameWrite(wr)
continue
}
}
if sc.needsFrameFlush {
sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
continue
}
break
}
if sc.needsFrameFlush {
sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
return
}
sc.inFrameScheduleLoop = false
}
// startGracefulShutdown sends a GOAWAY with ErrCodeNo to tell the
// client we're gracefully shutting down. The connection isn't closed
// until all current streams are done.
func (sc *serverConn) startGracefulShutdown() {
sc.goAwayIn(ErrCodeNo, 0)
}
func (sc *serverConn) goAway(code ErrCode) {
sc.serveG.check()
if sc.inGoAway {
return
}
var forceCloseIn time.Duration
if code != ErrCodeNo {
sc.shutDownIn(250 * time.Millisecond)
forceCloseIn = 250 * time.Millisecond
} else {
// TODO: configurable
sc.shutDownIn(1 * time.Second)
forceCloseIn = 1 * time.Second
}
sc.goAwayIn(code, forceCloseIn)
}
func (sc *serverConn) goAwayIn(code ErrCode, forceCloseIn time.Duration) {
sc.serveG.check()
if sc.inGoAway {
return
}
if forceCloseIn != 0 {
sc.shutDownIn(forceCloseIn)
}
sc.inGoAway = true
sc.needToSendGoAway = true
......@@ -1022,8 +1129,7 @@ func (sc *serverConn) resetStream(se StreamError) {
sc.serveG.check()
sc.writeFrame(FrameWriteRequest{write: se})
if st, ok := sc.streams[se.StreamID]; ok {
st.sentReset = true
sc.closeStream(st, se)
st.resetQueued = true
}
}
......@@ -1108,6 +1214,8 @@ func (sc *serverConn) processFrame(f Frame) error {
return sc.processResetStream(f)
case *PriorityFrame:
return sc.processPriority(f)
case *GoAwayFrame:
return sc.processGoAway(f)
case *PushPromiseFrame:
// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
......@@ -1133,7 +1241,7 @@ func (sc *serverConn) processPing(f *PingFrame) error {
// PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol)
}
if sc.inGoAway {
if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
return nil
}
sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
......@@ -1142,9 +1250,6 @@ func (sc *serverConn) processPing(f *PingFrame) error {
func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
sc.serveG.check()
if sc.inGoAway {
return nil
}
switch {
case f.StreamID != 0: // stream-level flow control
state, st := sc.state(f.StreamID)
......@@ -1177,9 +1282,6 @@ func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
sc.serveG.check()
if sc.inGoAway {
return nil
}
state, st := sc.state(f.StreamID)
if state == stateIdle {
......@@ -1191,7 +1293,6 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
return ConnectionError(ErrCodeProtocol)
}
if st != nil {
st.gotReset = true
st.cancelCtx()
sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
}
......@@ -1204,13 +1305,20 @@ func (sc *serverConn) closeStream(st *stream, err error) {
panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
}
st.state = stateClosed
sc.curOpenStreams--
if sc.curOpenStreams == 0 {
sc.setConnState(http.StateIdle)
if st.isPushed() {
sc.curPushedStreams--
} else {
sc.curClientStreams--
}
delete(sc.streams, st.id)
if len(sc.streams) == 0 && sc.srv.IdleTimeout != 0 {
sc.idleTimer.Reset(sc.srv.IdleTimeout)
if len(sc.streams) == 0 {
sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout != 0 {
sc.idleTimer.Reset(sc.srv.IdleTimeout)
}
if h1ServerKeepAlivesDisabled(sc.hs) {
sc.startGracefulShutdown()
}
}
if p := st.body; p != nil {
// Return any buffered unread bytes worth of conn-level flow control.
......@@ -1235,9 +1343,6 @@ func (sc *serverConn) processSettings(f *SettingsFrame) error {
}
return nil
}
if sc.inGoAway {
return nil
}
if err := f.ForeachSetting(sc.processSetting); err != nil {
return err
}
......@@ -1309,7 +1414,7 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
func (sc *serverConn) processData(f *DataFrame) error {
sc.serveG.check()
if sc.inGoAway {
if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
return nil
}
data := f.Data()
......@@ -1326,7 +1431,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
// type PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol)
}
if st == nil || state != stateOpen || st.gotTrailerHeader {
if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
// This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading &
......@@ -1346,6 +1451,10 @@ func (sc *serverConn) processData(f *DataFrame) error {
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
if st != nil && st.resetQueued {
// Already have a stream error in flight. Don't send another.
return nil
}
return streamError(id, ErrCodeStreamClosed)
}
if st.body == nil {
......@@ -1388,6 +1497,25 @@ func (sc *serverConn) processData(f *DataFrame) error {
return nil
}
func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
sc.serveG.check()
if f.ErrCode != ErrCodeNo {
sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
} else {
sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
}
sc.startGracefulShutdown()
// http://tools.ietf.org/html/rfc7540#section-6.8
// We should not create any new streams, which means we should disable push.
sc.pushEnabled = false
return nil
}
// isPushed reports whether the stream is server-initiated.
func (st *stream) isPushed() bool {
return st.id%2 == 0
}
// endStream closes a Request.Body's pipe. It is called when a DATA
// frame says a request body is over (or after trailers).
func (st *stream) endStream() {
......@@ -1417,12 +1545,12 @@ func (st *stream) copyTrailersToHandlerRequest() {
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
sc.serveG.check()
id := f.Header().StreamID
id := f.StreamID
if sc.inGoAway {
// Ignore.
return nil
}
// http://http2.github.io/http2-spec/#rfc.section.5.1.1
// http://tools.ietf.org/html/rfc7540#section-5.1.1
// Streams initiated by a client MUST use odd-numbered stream
// identifiers. [...] An endpoint that receives an unexpected
// stream identifier MUST respond with a connection error
......@@ -1434,8 +1562,12 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// send a trailer for an open one. If we already have a stream
// open, let it process its own HEADERS frame (trailers at this
// point, if it's valid).
st := sc.streams[f.Header().StreamID]
if st != nil {
if st := sc.streams[f.StreamID]; st != nil {
if st.resetQueued {
// We're sending RST_STREAM to close the stream, so don't bother
// processing this frame.
return nil
}
return st.processTrailerHeaders(f)
}
......@@ -1444,57 +1576,40 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// endpoint has opened or reserved. [...] An endpoint that
// receives an unexpected stream identifier MUST respond with
// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
if id <= sc.maxStreamID {
if id <= sc.maxClientStreamID {
return ConnectionError(ErrCodeProtocol)
}
sc.maxStreamID = id
sc.maxClientStreamID = id
if sc.idleTimer != nil {
sc.idleTimer.Stop()
}
ctx, cancelCtx := contextWithCancel(sc.baseCtx)
st = &stream{
sc: sc,
id: id,
state: stateOpen,
ctx: ctx,
cancelCtx: cancelCtx,
}
if f.StreamEnded() {
st.state = stateHalfClosedRemote
}
st.cw.Init()
st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialWindowSize)
st.inflow.conn = &sc.inflow // link to conn-level counter
st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
sc.streams[id] = st
sc.writeSched.OpenStream(st.id, OpenStreamOptions{})
sc.curOpenStreams++
if sc.curOpenStreams == 1 {
sc.setConnState(http.StateActive)
}
if sc.curOpenStreams > sc.advMaxStreams {
// "Endpoints MUST NOT exceed the limit set by their
// peer. An endpoint that receives a HEADERS frame
// that causes their advertised concurrent stream
// limit to be exceeded MUST treat this as a stream
// error (Section 5.4.2) of type PROTOCOL_ERROR or
// REFUSED_STREAM."
// http://tools.ietf.org/html/rfc7540#section-5.1.2
// [...] Endpoints MUST NOT exceed the limit set by their peer. An
// endpoint that receives a HEADERS frame that causes their
// advertised concurrent stream limit to be exceeded MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
// or REFUSED_STREAM.
if sc.curClientStreams+1 > sc.advMaxStreams {
if sc.unackedSettings == 0 {
// They should know better.
return streamError(st.id, ErrCodeProtocol)
return streamError(id, ErrCodeProtocol)
}
// Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
return streamError(st.id, ErrCodeRefusedStream)
return streamError(id, ErrCodeRefusedStream)
}
initialState := stateOpen
if f.StreamEnded() {
initialState = stateHalfClosedRemote
}
st := sc.newStream(id, 0, initialState)
if f.HasPriority() {
if err := checkPriority(f.StreamID, f.Priority); err != nil {
return err
......@@ -1517,19 +1632,17 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
if f.Truncated {
// Their header list was too long. Send a 431 error.
handler = handleHeaderListTooLong
} else if err := checkValidHTTP2Request(req); err != nil {
} else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
handler = new400Handler(err)
}
// The net/http package sets the read deadline from the
// http.Server.ReadTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already
// set. Disarm it here after the request headers are read, similar
// to how the http1 server works.
// Unlike http1, though, we never re-arm it yet, though.
// TODO(bradfitz): figure out golang.org/issue/14204
// (IdleTimeout) and how this relates. Maybe the default
// IdleTimeout is ReadTimeout.
// set. Disarm it here after the request headers are read,
// similar to how the http1 server works. Here it's
// technically more like the http1 Server's ReadHeaderTimeout
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout != 0 {
sc.conn.SetReadDeadline(time.Time{})
}
......@@ -1590,21 +1703,56 @@ func (sc *serverConn) processPriority(f *PriorityFrame) error {
return nil
}
func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
sc.serveG.check()
if id == 0 {
panic("internal error: cannot create stream with id 0")
}
ctx, cancelCtx := contextWithCancel(sc.baseCtx)
st := &stream{
sc: sc,
id: id,
state: state,
ctx: ctx,
cancelCtx: cancelCtx,
}
st.cw.Init()
st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialWindowSize)
st.inflow.conn = &sc.inflow // link to conn-level counter
st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
sc.streams[id] = st
sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
if st.isPushed() {
sc.curPushedStreams++
} else {
sc.curClientStreams++
}
if sc.curOpenStreams() == 1 {
sc.setConnState(http.StateActive)
}
return st
}
func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
sc.serveG.check()
method := f.PseudoValue("method")
path := f.PseudoValue("path")
scheme := f.PseudoValue("scheme")
authority := f.PseudoValue("authority")
rp := requestParam{
method: f.PseudoValue("method"),
scheme: f.PseudoValue("scheme"),
authority: f.PseudoValue("authority"),
path: f.PseudoValue("path"),
}
isConnect := method == "CONNECT"
isConnect := rp.method == "CONNECT"
if isConnect {
if path != "" || scheme != "" || authority == "" {
if rp.path != "" || rp.scheme != "" || rp.authority == "" {
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
} else if method == "" || path == "" ||
(scheme != "https" && scheme != "http") {
} else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
// See 8.1.2.6 Malformed Requests and Responses:
//
// Malformed requests or responses that are detected
......@@ -1619,36 +1767,64 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
bodyOpen := !f.StreamEnded()
if method == "HEAD" && bodyOpen {
if rp.method == "HEAD" && bodyOpen {
// HEAD requests can't have bodies
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
var tlsState *tls.ConnectionState // nil if not scheme https
if scheme == "https" {
tlsState = sc.tlsState
rp.header = make(http.Header)
for _, hf := range f.RegularFields() {
rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
}
if rp.authority == "" {
rp.authority = rp.header.Get("Host")
}
header := make(http.Header)
for _, hf := range f.RegularFields() {
header.Add(sc.canonicalHeader(hf.Name), hf.Value)
rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
if err != nil {
return nil, nil, err
}
if bodyOpen {
st.reqBuf = getRequestBodyBuf()
req.Body.(*requestBody).pipe = &pipe{
b: &fixedBuffer{buf: st.reqBuf},
}
if authority == "" {
authority = header.Get("Host")
if vv, ok := rp.header["Content-Length"]; ok {
req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
} else {
req.ContentLength = -1
}
}
return rw, req, nil
}
type requestParam struct {
method string
scheme, authority, path string
header http.Header
}
func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
sc.serveG.check()
var tlsState *tls.ConnectionState // nil if not scheme https
if rp.scheme == "https" {
tlsState = sc.tlsState
}
needsContinue := header.Get("Expect") == "100-continue"
needsContinue := rp.header.Get("Expect") == "100-continue"
if needsContinue {
header.Del("Expect")
rp.header.Del("Expect")
}
// Merge Cookie headers into one "; "-delimited value.
if cookies := header["Cookie"]; len(cookies) > 1 {
header.Set("Cookie", strings.Join(cookies, "; "))
if cookies := rp.header["Cookie"]; len(cookies) > 1 {
rp.header.Set("Cookie", strings.Join(cookies, "; "))
}
// Setup Trailers
var trailer http.Header
for _, v := range header["Trailer"] {
for _, v := range rp.header["Trailer"] {
for _, key := range strings.Split(v, ",") {
key = http.CanonicalHeaderKey(strings.TrimSpace(key))
switch key {
......@@ -1663,53 +1839,42 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
}
}
delete(header, "Trailer")
delete(rp.header, "Trailer")
body := &requestBody{
conn: sc,
stream: st,
needsContinue: needsContinue,
}
var url_ *url.URL
var requestURI string
if isConnect {
url_ = &url.URL{Host: authority}
requestURI = authority // mimic HTTP/1 server behavior
if rp.method == "CONNECT" {
url_ = &url.URL{Host: rp.authority}
requestURI = rp.authority // mimic HTTP/1 server behavior
} else {
var err error
url_, err = url.ParseRequestURI(path)
url_, err = url.ParseRequestURI(rp.path)
if err != nil {
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
return nil, nil, streamError(st.id, ErrCodeProtocol)
}
requestURI = path
requestURI = rp.path
}
body := &requestBody{
conn: sc,
stream: st,
needsContinue: needsContinue,
}
req := &http.Request{
Method: method,
Method: rp.method,
URL: url_,
RemoteAddr: sc.remoteAddrStr,
Header: header,
Header: rp.header,
RequestURI: requestURI,
Proto: "HTTP/2.0",
ProtoMajor: 2,
ProtoMinor: 0,
TLS: tlsState,
Host: authority,
Host: rp.authority,
Body: body,
Trailer: trailer,
}
req = requestWithContext(req, st.ctx)
if bodyOpen {
st.reqBuf = getRequestBodyBuf()
body.pipe = &pipe{
b: &fixedBuffer{buf: st.reqBuf},
}
if vv, ok := header["Content-Length"]; ok {
req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
} else {
req.ContentLength = -1
}
}
rws := responseWriterStatePool.Get().(*responseWriterState)
bwSave := rws.bw
......@@ -1750,15 +1915,17 @@ func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler
rw.rws.stream.cancelCtx()
if didPanic {
e := recover()
// Same as net/http:
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
sc.writeFrameFromHandler(FrameWriteRequest{
write: handlerPanicRST{rw.rws.stream.id},
stream: rw.rws.stream,
})
sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
// Same as net/http:
if shouldLogPanic(e) {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
}
return
}
rw.handlerDone()
......@@ -2170,8 +2337,9 @@ func (w *responseWriter) CloseNotify() <-chan bool {
if ch == nil {
ch = make(chan bool, 1)
rws.closeNotifierCh = ch
cw := rws.stream.cw
go func() {
rws.stream.cw.Wait() // wait for close
cw.Wait() // wait for close
ch <- true
}()
}
......@@ -2267,6 +2435,200 @@ func (w *responseWriter) handlerDone() {
responseWriterStatePool.Put(rws)
}
// Push errors.
var (
ErrRecursivePush = errors.New("http2: recursive push not allowed")
ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
)
// pushOptions is the internal version of http.PushOptions, which we
// cannot include here because it's only defined in Go 1.8 and later.
type pushOptions struct {
Method string
Header http.Header
}
func (w *responseWriter) push(target string, opts pushOptions) error {
st := w.rws.stream
sc := st.sc
sc.serveG.checkNotOn()
// No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
// http://tools.ietf.org/html/rfc7540#section-6.6
if st.isPushed() {
return ErrRecursivePush
}
// Default options.
if opts.Method == "" {
opts.Method = "GET"
}
if opts.Header == nil {
opts.Header = http.Header{}
}
wantScheme := "http"
if w.rws.req.TLS != nil {
wantScheme = "https"
}
// Validate the request.
u, err := url.Parse(target)
if err != nil {
return err
}
if u.Scheme == "" {
if !strings.HasPrefix(target, "/") {
return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
}
u.Scheme = wantScheme
u.Host = w.rws.req.Host
} else {
if u.Scheme != wantScheme {
return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
}
if u.Host == "" {
return errors.New("URL must have a host")
}
}
for k := range opts.Header {
if strings.HasPrefix(k, ":") {
return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
}
// These headers are meaningful only if the request has a body,
// but PUSH_PROMISE requests cannot have a body.
// http://tools.ietf.org/html/rfc7540#section-8.2
// Also disallow Host, since the promised URL must be absolute.
switch strings.ToLower(k) {
case "content-length", "content-encoding", "trailer", "te", "expect", "host":
return fmt.Errorf("promised request headers cannot include %q", k)
}
}
if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
return err
}
// The RFC effectively limits promised requests to GET and HEAD:
// "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
// http://tools.ietf.org/html/rfc7540#section-8.2
if opts.Method != "GET" && opts.Method != "HEAD" {
return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
}
msg := startPushRequest{
parent: st,
method: opts.Method,
url: u,
header: cloneHeader(opts.Header),
done: errChanPool.Get().(chan error),
}
select {
case <-sc.doneServing:
return errClientDisconnected
case <-st.cw:
return errStreamClosed
case sc.wantStartPushCh <- msg:
}
select {
case <-sc.doneServing:
return errClientDisconnected
case <-st.cw:
return errStreamClosed
case err := <-msg.done:
errChanPool.Put(msg.done)
return err
}
}
type startPushRequest struct {
parent *stream
method string
url *url.URL
header http.Header
done chan error
}
func (sc *serverConn) startPush(msg startPushRequest) {
sc.serveG.check()
// http://tools.ietf.org/html/rfc7540#section-6.6.
// PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
// is in either the "open" or "half-closed (remote)" state.
if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
// responseWriter.Push checks that the stream is peer-initiaed.
msg.done <- errStreamClosed
return
}
// http://tools.ietf.org/html/rfc7540#section-6.6.
if !sc.pushEnabled {
msg.done <- http.ErrNotSupported
return
}
// PUSH_PROMISE frames must be sent in increasing order by stream ID, so
// we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
// is written. Once the ID is allocated, we start the request handler.
allocatePromisedID := func() (uint32, error) {
sc.serveG.check()
// Check this again, just in case. Technically, we might have received
// an updated SETTINGS by the time we got around to writing this frame.
if !sc.pushEnabled {
return 0, http.ErrNotSupported
}
// http://tools.ietf.org/html/rfc7540#section-6.5.2.
if sc.curPushedStreams+1 > sc.clientMaxStreams {
return 0, ErrPushLimitReached
}
// http://tools.ietf.org/html/rfc7540#section-5.1.1.
// Streams initiated by the server MUST use even-numbered identifiers.
// A server that is unable to establish a new stream identifier can send a GOAWAY
// frame so that the client is forced to open a new connection for new streams.
if sc.maxPushPromiseID+2 >= 1<<31 {
sc.startGracefulShutdown()
return 0, ErrPushLimitReached
}
sc.maxPushPromiseID += 2
promisedID := sc.maxPushPromiseID
// http://tools.ietf.org/html/rfc7540#section-8.2.
// Strictly speaking, the new stream should start in "reserved (local)", then
// transition to "half closed (remote)" after sending the initial HEADERS, but
// we start in "half closed (remote)" for simplicity.
// See further comments at the definition of stateHalfClosedRemote.
promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
method: msg.method,
scheme: msg.url.Scheme,
authority: msg.url.Host,
path: msg.url.RequestURI(),
header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
})
if err != nil {
// Should not happen, since we've already validated msg.url.
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
}
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
return promisedID, nil
}
sc.writeFrame(FrameWriteRequest{
write: &writePushPromise{
streamID: msg.parent.id,
method: msg.method,
url: msg.url,
h: msg.header,
allocatePromisedID: allocatePromisedID,
},
stream: msg.parent,
done: msg.done,
})
}
// foreachHeaderElement splits v according to the "#rule" construction
// in RFC 2616 section 2.1 and calls fn for each non-empty element.
func foreachHeaderElement(v string, fn func(string)) {
......@@ -2294,16 +2656,16 @@ var connHeaders = []string{
"Upgrade",
}
// checkValidHTTP2Request checks whether req is a valid HTTP/2 request,
// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
// per RFC 7540 Section 8.1.2.2.
// The returned error is reported to users.
func checkValidHTTP2Request(req *http.Request) error {
for _, h := range connHeaders {
if _, ok := req.Header[h]; ok {
return fmt.Errorf("request header %q is not valid in HTTP/2", h)
func checkValidHTTP2RequestHeaders(h http.Header) error {
for _, k := range connHeaders {
if _, ok := h[k]; ok {
return fmt.Errorf("request header %q is not valid in HTTP/2", k)
}
}
te := req.Header["Te"]
te := h["Te"]
if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
}
......@@ -2350,3 +2712,42 @@ var badTrailer = map[string]bool{
"Transfer-Encoding": true,
"Www-Authenticate": true,
}
// h1ServerShutdownChan returns a channel that will be closed when the
// provided *http.Server wants to shut down.
//
// This is a somewhat hacky way to get at http1 innards. It works
// when the http2 code is bundled into the net/http package in the
// standard library. The alternatives ended up making the cmd/go tool
// depend on http Servers. This is the lightest option for now.
// This is tested via the TestServeShutdown* tests in net/http.
func h1ServerShutdownChan(hs *http.Server) <-chan struct{} {
if fn := testh1ServerShutdownChan; fn != nil {
return fn(hs)
}
var x interface{} = hs
type I interface {
getDoneChan() <-chan struct{}
}
if hs, ok := x.(I); ok {
return hs.getDoneChan()
}
return nil
}
// optional test hook for h1ServerShutdownChan.
var testh1ServerShutdownChan func(hs *http.Server) <-chan struct{}
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled. See comments on h1ServerShutdownChan above for why
// the code is written this way.
func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
var x interface{} = hs
type I interface {
doKeepAlives() bool
}
if hs, ok := x.(I); ok {
return !hs.doKeepAlives()
}
return false
}
......@@ -191,6 +191,7 @@ type clientStream struct {
ID uint32
resc chan resAndError
bufPipe pipe // buffered pipe with the flow-controlled response payload
startedWrite bool // started request body write; guarded by cc.mu
requestedGzip bool
on100 func() // optional code to run if get a 100 continue response
......@@ -199,6 +200,7 @@ type clientStream struct {
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu
peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed
......@@ -226,15 +228,26 @@ func (cs *clientStream) awaitRequestCancel(req *http.Request) {
}
select {
case <-req.Cancel:
cs.cancelStream()
cs.bufPipe.CloseWithError(errRequestCanceled)
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
case <-ctx.Done():
cs.cancelStream()
cs.bufPipe.CloseWithError(ctx.Err())
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
case <-cs.done:
}
}
func (cs *clientStream) cancelStream() {
cs.cc.mu.Lock()
didReset := cs.didReset
cs.didReset = true
cs.cc.mu.Unlock()
if !didReset {
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
}
}
// checkResetOrDone reports any error sent in a RST_STREAM frame by the
// server, or errStreamClosed if the stream is complete.
func (cs *clientStream) checkResetOrDone() error {
......@@ -302,6 +315,10 @@ func authorityAddr(scheme string, authority string) (addr string) {
if a, err := idna.ToASCII(host); err == nil {
host = a
}
// IPv6 address literal, without a port:
if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
return host + ":" + port
}
return net.JoinHostPort(host, port)
}
......@@ -320,8 +337,10 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
}
traceGotConn(req, cc)
res, err := cc.RoundTrip(req)
if shouldRetryRequest(req, err) {
continue
if err != nil {
if req, err = shouldRetryRequest(req, err); err == nil {
continue
}
}
if err != nil {
t.vlogf("RoundTrip failure: %v", err)
......@@ -343,12 +362,41 @@ func (t *Transport) CloseIdleConnections() {
var (
errClientConnClosed = errors.New("http2: client conn is closed")
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
)
func shouldRetryRequest(req *http.Request, err error) bool {
// TODO: retry GET requests (no bodies) more aggressively, if shutdown
// before response.
return err == errClientConnUnusable
// shouldRetryRequest is called by RoundTrip when a request fails to get
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
switch err {
default:
return nil, err
case errClientConnUnusable, errClientConnGotGoAway:
return req, nil
case errClientConnGotGoAwayAfterSomeReqBody:
// If the Body is nil (or http.NoBody), it's safe to reuse
// this request and its Body.
if req.Body == nil || reqBodyIsNoBody(req.Body) {
return req, nil
}
// Otherwise we depend on the Request having its GetBody
// func defined.
getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody
if getBody == nil {
return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
}
body, err := getBody()
if err != nil {
return nil, err
}
newReq := *req
newReq.Body = body
return &newReq, nil
}
}
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
......@@ -501,6 +549,15 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
if old != nil && old.ErrCode != ErrCodeNo {
cc.goAway.ErrCode = old.ErrCode
}
last := f.LastStreamID
for streamID, cs := range cc.streams {
if streamID > last {
select {
case cs.resc <- resAndError{err: errClientConnGotGoAway}:
default:
}
}
}
}
func (cc *ClientConn) CanTakeNewRequest() bool {
......@@ -601,8 +658,6 @@ func commaSeparatedTrailers(req *http.Request) (string, error) {
}
if len(keys) > 0 {
sort.Strings(keys)
// TODO: could do better allocation-wise here, but trailers are rare,
// so being lazy for now.
return strings.Join(keys, ","), nil
}
return "", nil
......@@ -761,6 +816,13 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
cs.abortRequestBodyWrite(errStopReqBodyWrite)
}
if re.err != nil {
if re.err == errClientConnGotGoAway {
cc.mu.Lock()
if cs.startedWrite {
re.err = errClientConnGotGoAwayAfterSomeReqBody
}
cc.mu.Unlock()
}
cc.forgetStreamID(cs.ID)
return nil, re.err
}
......@@ -1666,9 +1728,10 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
cc.bw.Flush()
cc.wmu.Unlock()
}
didReset := cs.didReset
cc.mu.Unlock()
if len(data) > 0 {
if len(data) > 0 && !didReset {
if _, err := cs.bufPipe.Write(data); err != nil {
rl.endStreamError(cs, err)
return err
......@@ -2000,6 +2063,9 @@ func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s body
resc := make(chan error, 1)
s.resc = resc
s.fn = func() {
cs.cc.mu.Lock()
cs.startedWrite = true
cs.cc.mu.Unlock()
resc <- cs.writeRequestBody(body, cs.req.Body)
}
s.delay = t.expectContinueTimeout()
......
......@@ -9,6 +9,7 @@ import (
"fmt"
"log"
"net/http"
"net/url"
"time"
"golang.org/x/net/http2/hpack"
......@@ -44,9 +45,10 @@ type writeContext interface {
HeaderEncoder() (*hpack.Encoder, *bytes.Buffer)
}
// endsStream reports whether the given frame writer w will locally
// close the stream.
func endsStream(w writeFramer) bool {
// writeEndsStream reports whether w writes a frame that will transition
// the stream to a half-closed local state. This returns false for RST_STREAM,
// which closes the entire stream (not just the local half).
func writeEndsStream(w writeFramer) bool {
switch v := w.(type) {
case *writeData:
return v.endStream
......@@ -56,7 +58,7 @@ func endsStream(w writeFramer) bool {
// This can only happen if the caller reuses w after it's
// been intentionally nil'ed out to prevent use. Keep this
// here to catch future refactoring breaking it.
panic("endsStream called on nil writeFramer")
panic("writeEndsStream called on nil writeFramer")
}
return false
}
......@@ -150,6 +152,33 @@ func (writeSettingsAck) writeFrame(ctx writeContext) error {
func (writeSettingsAck) staysWithinBuffer(max int) bool { return frameHeaderLen <= max }
// splitHeaderBlock splits headerBlock into fragments so that each fragment fits
// in a single frame, then calls fn for each fragment. firstFrag/lastFrag are true
// for the first/last fragment, respectively.
func splitHeaderBlock(ctx writeContext, headerBlock []byte, fn func(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error) error {
// For now we're lazy and just pick the minimum MAX_FRAME_SIZE
// that all peers must support (16KB). Later we could care
// more and send larger frames if the peer advertised it, but
// there's little point. Most headers are small anyway (so we
// generally won't have CONTINUATION frames), and extra frames
// only waste 9 bytes anyway.
const maxFrameSize = 16384
first := true
for len(headerBlock) > 0 {
frag := headerBlock
if len(frag) > maxFrameSize {
frag = frag[:maxFrameSize]
}
headerBlock = headerBlock[len(frag):]
if err := fn(ctx, frag, first, len(headerBlock) == 0); err != nil {
return err
}
first = false
}
return nil
}
// writeResHeaders is a request to write a HEADERS and 0+ CONTINUATION frames
// for HTTP response headers or trailers from a server handler.
type writeResHeaders struct {
......@@ -207,39 +236,69 @@ func (w *writeResHeaders) writeFrame(ctx writeContext) error {
panic("unexpected empty hpack")
}
// For now we're lazy and just pick the minimum MAX_FRAME_SIZE
// that all peers must support (16KB). Later we could care
// more and send larger frames if the peer advertised it, but
// there's little point. Most headers are small anyway (so we
// generally won't have CONTINUATION frames), and extra frames
// only waste 9 bytes anyway.
const maxFrameSize = 16384
return splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock)
}
first := true
for len(headerBlock) > 0 {
frag := headerBlock
if len(frag) > maxFrameSize {
frag = frag[:maxFrameSize]
}
headerBlock = headerBlock[len(frag):]
endHeaders := len(headerBlock) == 0
var err error
if first {
first = false
err = ctx.Framer().WriteHeaders(HeadersFrameParam{
StreamID: w.streamID,
BlockFragment: frag,
EndStream: w.endStream,
EndHeaders: endHeaders,
})
} else {
err = ctx.Framer().WriteContinuation(w.streamID, endHeaders, frag)
}
if err != nil {
return err
}
func (w *writeResHeaders) writeHeaderBlock(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error {
if firstFrag {
return ctx.Framer().WriteHeaders(HeadersFrameParam{
StreamID: w.streamID,
BlockFragment: frag,
EndStream: w.endStream,
EndHeaders: lastFrag,
})
} else {
return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag)
}
}
// writePushPromise is a request to write a PUSH_PROMISE and 0+ CONTINUATION frames.
type writePushPromise struct {
streamID uint32 // pusher stream
method string // for :method
url *url.URL // for :scheme, :authority, :path
h http.Header
// Creates an ID for a pushed stream. This runs on serveG just before
// the frame is written. The returned ID is copied to promisedID.
allocatePromisedID func() (uint32, error)
promisedID uint32
}
func (w *writePushPromise) staysWithinBuffer(max int) bool {
// TODO: see writeResHeaders.staysWithinBuffer
return false
}
func (w *writePushPromise) writeFrame(ctx writeContext) error {
enc, buf := ctx.HeaderEncoder()
buf.Reset()
encKV(enc, ":method", w.method)
encKV(enc, ":scheme", w.url.Scheme)
encKV(enc, ":authority", w.url.Host)
encKV(enc, ":path", w.url.RequestURI())
encodeHeaders(enc, w.h, nil)
headerBlock := buf.Bytes()
if len(headerBlock) == 0 {
panic("unexpected empty hpack")
}
return splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock)
}
func (w *writePushPromise) writeHeaderBlock(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error {
if firstFrag {
return ctx.Framer().WritePushPromise(PushPromiseParam{
StreamID: w.streamID,
PromiseID: w.promisedID,
BlockFragment: frag,
EndHeaders: lastFrag,
})
} else {
return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag)
}
return nil
}
type write100ContinueHeadersFrame struct {
......@@ -274,6 +333,8 @@ func (wu writeWindowUpdate) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteWindowUpdate(wu.streamID, wu.n)
}
// encodeHeaders encodes an http.Header. If keys is not nil, then (k, h[k])
// is encoded only only if k is in keys.
func encodeHeaders(enc *hpack.Encoder, h http.Header, keys []string) {
if keys == nil {
sorter := sorterPool.Get().(*sorter)
......
......@@ -25,7 +25,9 @@ type WriteScheduler interface {
// https://tools.ietf.org/html/rfc7540#section-5.1
AdjustStream(streamID uint32, priority PriorityParam)
// Push queues a frame in the scheduler.
// Push queues a frame in the scheduler. In most cases, this will not be
// called with wr.StreamID()!=0 unless that stream is currently open. The one
// exception is RST_STREAM frames, which may be sent on idle or closed streams.
Push(wr FrameWriteRequest)
// Pop dequeues the next frame to write. Returns false if no frames can
......@@ -62,6 +64,13 @@ type FrameWriteRequest struct {
// 0 is used for non-stream frames such as PING and SETTINGS.
func (wr FrameWriteRequest) StreamID() uint32 {
if wr.stream == nil {
if se, ok := wr.write.(StreamError); ok {
// (*serverConn).resetStream doesn't set
// stream because it doesn't necessarily have
// one. So special case this type of write
// message.
return se.StreamID
}
return 0
}
return wr.stream.id
......@@ -142,17 +151,27 @@ func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteReque
// String is for debugging only.
func (wr FrameWriteRequest) String() string {
var streamID uint32
if wr.stream != nil {
streamID = wr.stream.id
}
var des string
if s, ok := wr.write.(fmt.Stringer); ok {
des = s.String()
} else {
des = fmt.Sprintf("%T", wr.write)
}
return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", streamID, wr.done != nil, des)
return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
}
// replyToWriter sends err to wr.done and panics if the send must block
// This does nothing if wr.done is nil.
func (wr *FrameWriteRequest) replyToWriter(err error) {
if wr.done == nil {
return
}
select {
case wr.done <- err:
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
wr.write = nil // prevent use (assume it's tainted after wr.done send)
}
// writeQueue is used by implementations of WriteScheduler.
......
......@@ -388,7 +388,15 @@ func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
} else {
n = ws.nodes[id]
if n == nil {
panic("add on non-open stream")
// id is an idle or closed stream. wr should not be a HEADERS or
// DATA frame. However, wr can be a RST_STREAM. In this case, we
// push wr onto the root, rather than creating a new priorityNode,
// since RST_STREAM is tiny and the stream's priority is unknown
// anyway. See issue #17919.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
n = &ws.root
}
}
n.q.push(wr)
......
......@@ -15,10 +15,10 @@
"revisionTime": "2016-09-03T01:06:34Z"
},
{
"checksumSHA1": "QooLgOqsl0ivXxYygSepzSRAtjQ=",
"checksumSHA1": "omTjLIMmJwoaAJ99BOLRn7yWgtc=",
"path": "github.com/ginuerzh/gost",
"revision": "15a5d74b563e644a471fa42aab0a2876da6b1bb0",
"revisionTime": "2017-01-09T03:39:18Z"
"revision": "b244ce00c9cb4fa0051b5827b85e45241ab75935",
"revisionTime": "2017-01-13T07:27:51Z"
},
{
"checksumSHA1": "URsJa4y/sUUw/STmbeYx9EKqaYE=",
......@@ -225,10 +225,10 @@
"revisionTime": "2016-10-24T22:38:16Z"
},
{
"checksumSHA1": "UovsbmfW33+DGfUh1geZpGzIoVo=",
"checksumSHA1": "N1akwAdrHVfPPrsFOhG2ouP21VA=",
"path": "golang.org/x/net/http2",
"revision": "65dfc08770ce66f74becfdff5f8ab01caef4e946",
"revisionTime": "2016-10-24T22:38:16Z"
"revision": "60c41d1de8da134c05b7b40154a9a82bf5b7edb9",
"revisionTime": "2017-01-10T03:16:11Z"
},
{
"checksumSHA1": "HzuGD7AwgC0p1az1WAQnEFnEk98=",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment