module Network.Wai.Handler.Warp.HTTP2.Types where
import Data.ByteString.Builder (Builder)
#if __GLASGOW_HASKELL__ < 709
import Control.Applicative ((<$>), (<*>), pure)
#endif
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar)
import Control.Concurrent.STM
import Control.Exception (SomeException)
import Control.Monad (void)
import Control.Reaper
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.IntMap.Strict (IntMap, IntMap)
import qualified Data.IntMap.Strict as M
import qualified Network.HTTP.Types as H
import Network.Wai (Request, FilePart)
import Network.Wai.HTTP2 (PushPromise, Trailers)
import Network.Wai.Handler.Warp.IORef
import Network.Wai.Handler.Warp.Types
import Network.HTTP2
import Network.HTTP2.Priority
import Network.HPACK
-- | The HTTP version value with major number 2 and minor number 0.
http2ver :: H.HttpVersion
http2ver = H.HttpVersion { H.httpMajor = 2, H.httpMinor = 0 }
-- | Decide whether a transport should be treated as HTTP/2.
--
-- A plain 'TCP' transport is never HTTP/2.  For any other transport the
-- negotiated protocol is inspected: the answer is 'True' when it is exactly
-- @"h2"@ or begins with @"h2-"@, and 'False' when no protocol was negotiated.
isHTTP2 :: Transport -> Bool
isHTTP2 TCP = False
isHTTP2 tls = maybe False isH2 (tlsNegotiatedProtocol tls)
  where
    -- Accept the final protocol identifier as well as "h2-" prefixed ones.
    isH2 proto = proto == "h2" || BS.isPrefixOf "h2-" proto
-- | A 'Stream' paired with a 'Request'; this is the element type of the
-- 'inputQ' queue in 'Context'.
data Input
    = Input
        Stream
        Request
-- | A three-way control value, parameterised over the payload of 'CNext'.
data Control a
    = CFinish Trailers -- carries trailers
    | CNext a          -- carries a value of the parameter type
    | CNone            -- carries nothing
-- | Shows only the constructor name; payloads are never rendered (so no
-- 'Show' constraint is needed on the type parameter).
instance Show (Control a) where
    show ctl = case ctl of
        CFinish{} -> "CFinish"
        CNext{}   -> "CNext"
        CNone     -> "CNone"
-- | A function from a 'WindowSize' to an action that produces a 'Next'.
type DynaNext =
    WindowSize -> IO Next

-- | Alias for 'Int'.
-- NOTE(review): presumably a count of bytes written into a buffer -- confirm
-- against the producers of 'Next'.
type BytesFilled =
    Int

-- | A 'BytesFilled' count together with a 'Control' whose 'CNext' payload is
-- a 'DynaNext'.
data Next
    = Next
        BytesFilled
        (Control DynaNext)
-- | Items placed on the output priority tree ('outputQ' in 'Context').
--
-- Only 'OResponse', 'OPush' and 'ONext' have a stream that 'outputStream'
-- can extract (it takes their first 'Stream' field).
data Output
    = OFinish
    | OGoaway ByteString
    | OSettings ByteString SettingsList
    | OFrame ByteString
    | OResponse Stream H.Status H.ResponseHeaders Aux
    | OPush
        Stream
        PushPromise
        (MVar Bool)
        Stream
        H.Status
        H.ResponseHeaders
        Aux
    | ONext Stream DynaNext
-- | Extract the first 'Stream' field of an 'OResponse', 'ONext' or 'OPush'.
--
-- Partial: every other constructor calls 'error' with the message
-- @"outputStream"@.
outputStream :: Output -> Stream
outputStream out = case out of
    OResponse strm _ _ _   -> strm
    ONext strm _           -> strm
    OPush strm _ _ _ _ _ _ -> strm
    _                      -> error "outputStream"
-- | Element type of the queue held in 'Aux'.
data Sequence
    = SFinish Trailers        -- carries trailers
    | SFlush                  -- carries nothing
    | SBuilder Builder        -- carries a bytestring 'Builder'
    | SFile FilePath FilePart -- carries a file path and a 'FilePart'
-- | Value stored in the 'TVar' held in 'Aux'.
data Sync
    = SyncNone
    | SyncFinish
    | SyncNext Output -- carries an 'Output'
-- | Auxiliary data attached to 'OResponse' and 'OPush': a bounded queue of
-- 'Sequence' values and a transactional variable holding a 'Sync'.
data Aux
    = Persist
        (TBQueue Sequence)
        (TVar Sync)
-- | Bundle of mutable state built by 'newContext'.
--
-- NOTE(review): the per-field remarks are limited to what the types and the
-- initial values in 'newContext' show; confirm intended semantics against the
-- code that reads and writes each field.
data Context = Context
    { http2settings      :: IORef Settings
    , streamTable        :: StreamTable
    , concurrency        :: IORef Int              -- starts at 0
    , pushConcurrency    :: IORef Int              -- starts at 0
    , priorityTreeSize   :: IORef Int              -- starts at 0
    , continued          :: IORef (Maybe StreamId) -- starts as Nothing
    , currentStreamId    :: IORef StreamId         -- starts at 0
    , nextPushStreamId   :: IORef StreamId         -- starts at 2
    , inputQ             :: TQueue Input
    , outputQ            :: PriorityTree Output
    , encodeDynamicTable :: IORef DynamicTable
    , decodeDynamicTable :: IORef DynamicTable
    , connectionWindow   :: TVar WindowSize
    }
-- | Allocate a fresh 'Context'.
--
-- Every mutable cell is created in field order: settings start at
-- 'defaultSettings', the stream table is built with @'initialize' 10@, the
-- counters and the current stream id start at 0, 'continued' starts as
-- 'Nothing', 'nextPushStreamId' starts at 2, both HPACK dynamic tables use
-- 'defaultDynamicTableSize', and the connection window starts at
-- 'defaultInitialWindowSize'.
newContext :: IO Context
newContext = do
    settingsRef  <- newIORef defaultSettings
    table        <- initialize 10
    concRef      <- newIORef 0
    pushConcRef  <- newIORef 0
    treeSizeRef  <- newIORef 0
    continuedRef <- newIORef Nothing
    currentIdRef <- newIORef 0
    nextPushRef  <- newIORef 2
    inQ          <- newTQueueIO
    outQ         <- newPriorityTree
    encTable     <- newDynamicTableForEncoding defaultDynamicTableSize >>= newIORef
    decTable     <- newDynamicTableForDecoding defaultDynamicTableSize >>= newIORef
    connWindow   <- newTVarIO defaultInitialWindowSize
    return Context
        { http2settings      = settingsRef
        , streamTable        = table
        , concurrency        = concRef
        , pushConcurrency    = pushConcRef
        , priorityTreeSize   = treeSizeRef
        , continued          = continuedRef
        , currentStreamId    = currentIdRef
        , nextPushStreamId   = nextPushRef
        , inputQ             = inQ
        , outputQ            = outQ
        , encodeDynamicTable = encTable
        , decodeDynamicTable = decTable
        , connectionWindow   = connWindow
        }
-- | Stop the reaper that backs the context's stream table, discarding
-- whatever 'reaperStop' returns.
clearContext :: Context -> IO ()
clearContext ctx = do
    _ <- reaperStop (streamTable ctx)
    return ()
-- | Sub-states carried by the 'Open' constructor of 'StreamState'.
--
-- 'opened' puts a stream into @'Open' 'JustOpened'@.
--
-- NOTE(review): the meaning of the two 'Int's and the 'Bool' in 'Continued'
-- is not visible here -- confirm against the code that builds them.
data OpenState
    = JustOpened
    | Continued
        [HeaderBlockFragment]
        Int
        Int
        Bool
        Priority
    | NoBody HeaderList Priority
    | HasBody HeaderList Priority
    | Body (TQueue ByteString)
-- | Payload of the 'Closed' constructor of 'StreamState'.
data ClosedCode
    = Finished
    | Killed
    | Reset ErrorCodeId         -- carries an HTTP/2 error code
    | ResetByMe SomeException   -- carries an exception
    deriving Show
-- | State of a stream, as stored in the 'streamState' field of 'Stream'.
--
-- A stream made by 'newStream' starts as 'Idle'; 'opened' sets it to
-- @'Open' 'JustOpened'@ and 'closed' sets it to @'Closed' code@.
data StreamState
    = Idle
    | Open OpenState
    | HalfClosed
    | Closed ClosedCode
-- | 'True' exactly when the state is 'Idle'.
isIdle :: StreamState -> Bool
isIdle st = case st of
    Idle -> True
    _    -> False
-- | 'True' exactly when the state is 'Open', whatever its 'OpenState'.
isOpen :: StreamState -> Bool
isOpen st = case st of
    Open _ -> True
    _      -> False
-- | 'True' exactly when the state is 'HalfClosed'.
isHalfClosed :: StreamState -> Bool
isHalfClosed st = case st of
    HalfClosed -> True
    _          -> False
-- | 'True' exactly when the state is 'Closed', whatever its 'ClosedCode'.
isClosed :: StreamState -> Bool
isClosed st = case st of
    Closed _ -> True
    _        -> False
-- | Renders the constructor name.  The 'OpenState' inside 'Open' is not
-- shown; for 'Closed' the 'ClosedCode' is appended after @"Closed: "@.
instance Show StreamState where
    show st = case st of
        Idle        -> "Idle"
        Open _      -> "Open"
        HalfClosed  -> "HalfClosed"
        Closed code -> "Closed: " ++ show code
-- | A single stream: its identifier plus mutable per-stream cells.
--
-- 'concurrencyRef' is supplied by the caller of 'newStream' rather than
-- allocated per stream; 'opened' increments it and 'closed' updates it again.
data Stream = Stream
    { streamNumber        :: StreamId
    , streamState         :: IORef StreamState   -- starts as Idle
    , streamContentLength :: IORef (Maybe Int)   -- starts as Nothing
    , streamBodyLength    :: IORef Int           -- starts at 0
    , streamWindow        :: TVar WindowSize     -- starts at the size given to newStream
    , streamPrecedence    :: IORef Precedence    -- starts as defaultPrecedence
    , concurrencyRef      :: IORef Int
    }
-- | A stream is shown as its stream number only.
instance Show Stream where
    show = show . streamNumber
-- | Build a new 'Stream' in the 'Idle' state.
--
-- Arguments, in order: the counter stored as-is in 'concurrencyRef', the
-- stream identifier, and the initial value of the stream's window.  The
-- content length starts as 'Nothing', the body length as 0 and the
-- precedence as 'defaultPrecedence'.
newStream :: IORef Int -> StreamId -> WindowSize -> IO Stream
newStream ref sid win = do
    stateRef  <- newIORef Idle
    clenRef   <- newIORef Nothing
    blenRef   <- newIORef 0
    windowVar <- newTVarIO win
    precRef   <- newIORef defaultPrecedence
    return Stream
        { streamNumber        = sid
        , streamState         = stateRef
        , streamContentLength = clenRef
        , streamBodyLength    = blenRef
        , streamWindow        = windowVar
        , streamPrecedence    = precRef
        , concurrencyRef      = ref
        }
-- | Mark a stream as opened: atomically add one to the counter in
-- 'concurrencyRef', then set the stream's state to @'Open' 'JustOpened'@.
opened :: Stream -> IO ()
opened strm = do
    atomicModifyIORef' (concurrencyRef strm) bump
    writeIORef (streamState strm) (Open JustOpened)
  where
    bump n = (n + 1, ())
-- | Mark a stream as closed.
--
-- Atomically subtracts one from the counter held in 'concurrencyRef' and
-- then overwrites the stream's state with @'Closed' cc@.
closed :: Stream -> ClosedCode -> IO ()
closed Stream{concurrencyRef,streamState} cc = do
    -- Undo the increment performed by 'opened' on the same counter.
    atomicModifyIORef' concurrencyRef (\x -> (x-1,()))
    writeIORef streamState (Closed cc)
-- | The stream table: a 'Reaper' whose workload is an 'IntMap' of streams
-- and whose individual items are (key, stream) pairs.
type StreamTable =
    Reaper (IntMap Stream) (M.Key, Stream)
-- | Create a 'StreamTable' backed by a reaper.
--
-- The argument is multiplied by 1,000,000 to obtain 'reaperDelay'
-- (presumably seconds converted to microseconds -- confirm against the
-- "Control.Reaper" documentation).  The reaper runs 'clean' as its action,
-- adds items with 'M.insert', and uses an empty 'IntMap' as its empty
-- workload.
initialize :: Int -> IO StreamTable
initialize duration = mkReaper defaultReaperSettings
    { reaperAction = clean
    , reaperDelay  = duration * scale
    , reaperCons   = uncurry M.insert
    , reaperNull   = M.null
    , reaperEmpty  = M.empty
    }
  where
    scale = 1000000
-- | Reaper action: drop every stream whose state is currently 'Closed'.
--
-- Reads the state of each stream in the given map and keeps only those that
-- are not closed.  The result is a function that unions the surviving
-- entries (left-biased) with whatever map it is applied to.
clean :: IntMap Stream -> IO (IntMap Stream -> IntMap Stream)
clean old = do
    kept <- go [] (M.toDescList old)
    return $ M.union (M.fromAscList kept)
  where
    -- Walks the entries in descending key order and prepends survivors, so
    -- the accumulated list ends up in ascending order for 'M.fromAscList'.
    go acc [] = return acc
    go acc (kv : rest) = do
        st <- readIORef . streamState $ snd kv
        case st of
            Closed{} -> go acc rest
            _        -> go (kv : acc) rest
-- | Add a stream to the table under the given key, via 'reaperAdd'.
insert :: StreamTable -> M.Key -> Stream -> IO ()
insert strmtbl = curry (reaperAdd strmtbl)
-- | Look a key up in the table's current contents (as returned by
-- 'reaperRead'); 'Nothing' when the key is absent.
search :: StreamTable -> M.Key -> IO (Maybe Stream)
search strmtbl k = do
    table <- reaperRead strmtbl
    return (M.lookup k table)
-- | Block until the window of the output's stream is strictly positive, then
-- enqueue the output on the priority tree using the stream's number and its
-- current precedence.
--
-- The stream is obtained with 'outputStream', so this must only be given an
-- 'Output' for which 'outputStream' is defined.
enqueueWhenWindowIsOpen :: PriorityTree Output -> Output -> IO ()
enqueueWhenWindowIsOpen outQ out = do
    -- 'check' retries the transaction until the window becomes positive.
    atomically $ readTVar (streamWindow strm) >>= \w -> check (w > 0)
    pre <- readIORef (streamPrecedence strm)
    enqueue outQ (streamNumber strm) pre out
  where
    strm = outputStream out
-- | Enqueue an output immediately when the stream's window is open;
-- otherwise fork a thread that waits for it to open and enqueues then.
--
-- The window counts as closed when it is zero /or negative/.  This matches
-- 'enqueueWhenWindowIsOpen', which waits for a strictly positive window; a
-- flow-control window can go negative when the peer lowers its initial
-- window size.
enqueueOrSpawnTemporaryWaiter :: Stream -> PriorityTree Output -> Output -> IO ()
enqueueOrSpawnTemporaryWaiter Stream{..} outQ out = do
    sw <- atomically $ readTVar streamWindow
    if sw <= 0 then
        -- This waiter waits only for the stream window.
        void $ forkIO $ enqueueWhenWindowIsOpen outQ out
      else do
        pre <- readIORef streamPrecedence
        enqueue outQ streamNumber pre out