Skip to content

Commit 872a352

Browse files
author
Shimin Guo
committed
Merge branch 'master' of https://github.com/kubernetes-client/haskell into kubeconfig
2 parents 28e567d + dee2a32 commit 872a352

File tree

6 files changed

+218
-22
lines changed

6 files changed

+218
-22
lines changed

kubernetes-client-helper/package.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ dependencies:
1515
- x509-validation
1616
- http-client >=0.5 && <0.6
1717
- http-client-tls
18+
- microlens >= 0.4.3 && <0.5
1819
- bytestring >=0.10.0 && <0.11
1920
- text >=0.11 && <1.3
2021
- safe-exceptions <0.2

kubernetes-client-helper/src/Kubernetes/ClientHelper.hs

+35-20
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,25 @@ import qualified Data.ByteString as B
1111
import qualified Data.ByteString.Lazy as LazyB
1212
import Data.Default.Class (def)
1313
import Data.Either (rights)
14+
import Data.Monoid ((<>))
1415
import Data.PEM (pemContent, pemParseBS)
1516
import qualified Data.Text as T
1617
import qualified Data.Text.Encoding as T
18+
import qualified Data.Text.IO as T
1719
import Data.Typeable (Typeable)
1820
import Data.X509 (SignedCertificate,
1921
decodeSignedCertificate)
2022
import qualified Data.X509 as X509
21-
import Data.X509.CertificateStore (makeCertificateStore)
23+
import Data.X509.CertificateStore (CertificateStore, makeCertificateStore)
2224
import qualified Data.X509.Validation as X509
25+
import Lens.Micro (Lens', lens, set)
2326
import Network.Connection (TLSSettings (..))
2427
import qualified Network.HTTP.Client as NH
2528
import Network.HTTP.Client.TLS (mkManagerSettings)
2629
import Network.TLS (Credential, defaultParamsClient)
2730
import qualified Network.TLS as TLS
2831
import qualified Network.TLS.Extra as TLS
32+
import System.Environment (getEnv)
2933
import System.X509 (getSystemCertificateStore)
3034

3135
-- |Sets the master URI in the 'K.KubernetesConfig'.
@@ -46,7 +50,7 @@ setTokenAuth
4650
-> K.KubernetesConfig
4751
-> K.KubernetesConfig
4852
setTokenAuth token kcfg = kcfg
49-
{ K.configAuthMethods = [K.AnyAuthMethod (K.AuthApiKeyBearerToken token)]
53+
{ K.configAuthMethods = [K.AnyAuthMethod (K.AuthApiKeyBearerToken $ "Bearer " <> token)]
5054
}
5155

5256
-- |Creates a 'NH.Manager' that can handle TLS.
@@ -67,25 +71,22 @@ defaultTLSClientParams = do
6771
}
6872
}
6973

74+
clientHooksL :: Lens' TLS.ClientParams TLS.ClientHooks
75+
clientHooksL = lens TLS.clientHooks (\cp ch -> cp { TLS.clientHooks = ch })
76+
77+
onServerCertificateL :: Lens' TLS.ClientParams (CertificateStore -> TLS.ValidationCache -> X509.ServiceID -> X509.CertificateChain -> IO [X509.FailedReason])
78+
onServerCertificateL =
79+
clientHooksL . lens TLS.onServerCertificate (\ch osc -> ch { TLS.onServerCertificate = osc })
80+
7081
-- |Don't check whether the cert presented by the server matches the name of the server you are connecting to.
7182
-- This is necessary if you specify the server host by its IP address.
7283
disableServerNameValidation :: TLS.ClientParams -> TLS.ClientParams
73-
disableServerNameValidation cp = cp
74-
{ TLS.clientHooks = (TLS.clientHooks cp)
75-
{ TLS.onServerCertificate = X509.validate
76-
X509.HashSHA256
77-
def
78-
def { X509.checkFQHN = False }
79-
}
80-
}
84+
disableServerNameValidation =
85+
set onServerCertificateL (X509.validate X509.HashSHA256 def (def { X509.checkFQHN = False }))
8186

8287
-- |Insecure mode. The client will not validate the server cert at all.
8388
disableServerCertValidation :: TLS.ClientParams -> TLS.ClientParams
84-
disableServerCertValidation cp = cp
85-
{ TLS.clientHooks = (TLS.clientHooks cp)
86-
{ TLS.onServerCertificate = (\_ _ _ _ -> return [])
87-
}
88-
}
89+
disableServerCertValidation = set onServerCertificateL (\_ _ _ _ -> return [])
8990

9091
-- |Use a custom CA store.
9192
setCAStore :: [SignedCertificate] -> TLS.ClientParams -> TLS.ClientParams
@@ -95,13 +96,13 @@ setCAStore certs cp = cp
9596
}
9697
}
9798

99+
onCertificateRequestL :: Lens' TLS.ClientParams (([TLS.CertificateType], Maybe [TLS.HashAndSignatureAlgorithm], [X509.DistinguishedName]) -> IO (Maybe (X509.CertificateChain, TLS.PrivKey)))
100+
onCertificateRequestL =
101+
clientHooksL . lens TLS.onCertificateRequest (\ch ocr -> ch { TLS.onCertificateRequest = ocr })
102+
98103
-- |Use a client cert for authentication.
99104
setClientCert :: Credential -> TLS.ClientParams -> TLS.ClientParams
100-
setClientCert cred cp = cp
101-
{ TLS.clientHooks = (TLS.clientHooks cp)
102-
{ TLS.onCertificateRequest = (\_ -> return (Just cred))
103-
}
104-
}
105+
setClientCert cred = set onCertificateRequestL (\_ -> return $ Just cred)
105106

106107
-- |Parses a PEM-encoded @ByteString@ into a list of certificates.
107108
parsePEMCerts :: B.ByteString -> Either String [SignedCertificate]
@@ -119,3 +120,17 @@ loadPEMCerts p = do
119120
liftIO (B.readFile p)
120121
>>= either (throwM . ParsePEMCertsException) return
121122
. parsePEMCerts
123+
124+
serviceAccountDir :: FilePath
125+
serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount"
126+
127+
cluster :: (MonadIO m, MonadThrow m) => m (NH.Manager, K.KubernetesConfig)
128+
cluster = do
129+
caStore <- loadPEMCerts $ serviceAccountDir ++ "/ca.crt"
130+
defTlsParams <- liftIO defaultTLSClientParams
131+
mgr <- liftIO . newManager . setCAStore caStore $ disableServerNameValidation defTlsParams
132+
tok <- liftIO . T.readFile $ serviceAccountDir ++ "/token"
133+
host <- liftIO $ getEnv "KUBERNETES_SERVICE_HOST"
134+
port <- liftIO $ getEnv "KUBERNETES_SERVICE_PORT"
135+
config <- setTokenAuth tok . setMasterURI (T.pack $ "https://" ++ host ++ ":" ++ port) <$> liftIO K.newConfig
136+
return (mgr, config)

kubernetes-watch/README.md

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# kubernetes-watch-client
2+
3+
Client for streaming events from watch enabled endpoints.
4+
5+
## Example
6+
Following is a simple example which
7+
just streams to stdout. First some setup - this assumes kubernetes is accessible
8+
at http://localhost:8001, e.g. after running `kubectl proxy`:
9+
10+
```haskell
11+
> import qualified Data.ByteString.Streaming.Char8 as Q
12+
13+
> manager <- newManager defaultManagerSettings
14+
> defaultConfig <- newConfig
15+
> config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
16+
> request = listEndpointsForAllNamespaces (Accept MimeJSON)
17+
```
18+
19+
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
20+
21+
```haskell
22+
> dispatchWatch manager config request Q.stdout
23+
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
24+
```
25+
26+
A more complex example involving some ggprocessing of the stream, the following
27+
prints out the event types of each event. First, define functions to allow us apply
28+
a parser to a stream:
29+
30+
31+
```haskell
32+
import Data.Aeson
33+
import qualified Data.ByteString.Streaming.Char8 as Q
34+
import Data.JsonStream.Parser
35+
import qualified Streaming.Prelude as S
36+
37+
-- | Parse the stream using the given parser.
38+
streamParse ::
39+
FromJSON a =>
40+
Parser a
41+
-> Q.ByteString IO r
42+
-> Stream (Of [a]) IO r
43+
streamParse parser byteStream = do
44+
byteStream & Q.lines & parseEvent parser
45+
46+
-- | Parse a single event from the stream.
47+
parseEvent ::
48+
(FromJSON a, Monad m) =>
49+
Parser a
50+
-> Stream (Q.ByteString m) m r
51+
-> Stream (Of [a]) m r
52+
parseEvent parser byteStream = S.map (parseByteString parser) (S.mapped Q.toStrict byteStream)
53+
```
54+
55+
Next, define the parser and apply it to the stream:
56+
57+
```haskell
58+
> eventParser = value :: Parser (WatchEvent V1Endpoints)
59+
> withResponseBody body = streamParse eventParser body & S.map (map eventType)
60+
> dispatchWatch manager config request (S.print . withResponseBody)
61+
[\"ADDED\"]
62+
[\"ADDED\"]
63+
[\"MODIFIED\"]
64+
...
65+
```
66+
67+
Packages in this example:
68+
* Data.Aeson -- from [aeson](https://hackage.haskell.org/package/aeson)
69+
* Data.ByteString.Streaming.Char8 from [streaming-bytestring](https://hackage.haskell.org/package/streaming-bytestring-0.1.5/docs/Data-ByteString-Streaming-Char8.html)
70+
* Data.JsonStream.Parser from [json-stream](https://hackage.haskell.org/package/json-stream-0.4.1.5/docs/Data-JsonStream-Parser.html)
71+
* Streaming.Prelude from [streaming](https://hackage.haskell.org/package/streaming-0.2.0.0/docs/Streaming-Prelude.html)

kubernetes-watch/package.yaml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
name: kubernetes-watch
2+
version: 0.1.0.0
3+
library:
4+
source-dirs: src
5+
dependencies:
6+
- base >=4.7 && <5.0
7+
- aeson >=1.0 && <2.0
8+
- bytestring >=0.10.0 && <0.11
9+
- http-client >=0.5 && <0.6
10+
- mtl >=2.2.1
11+
- streaming-bytestring >= 0.1.5 && < 0.2.0
12+
- text >=0.11 && <1.3
13+
- kubernetes == 0.1.0.0
14+
15+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
module Kubernetes.Watch.Client
4+
( WatchEvent
5+
, eventType
6+
, eventObject
7+
, dispatchWatch
8+
) where
9+
10+
import Control.Monad
11+
import Control.Monad.Trans (lift)
12+
import Data.Aeson
13+
import qualified Data.ByteString as B
14+
import qualified Data.ByteString.Streaming.Char8 as Q
15+
import qualified Data.Text as T
16+
import Kubernetes.Core
17+
import Kubernetes.Client
18+
import Kubernetes.MimeTypes
19+
import Kubernetes.Model (Watch(..))
20+
import Network.HTTP.Client
21+
22+
data WatchEvent a = WatchEvent
23+
{ _eventType :: T.Text
24+
, _eventObject :: a
25+
} deriving (Eq, Show)
26+
27+
instance FromJSON a => FromJSON (WatchEvent a) where
28+
parseJSON (Object x) = WatchEvent <$> x .: "type" <*> x .: "object"
29+
parseJSON _ = fail "Expected an object"
30+
31+
instance ToJSON a => ToJSON (WatchEvent a) where
32+
toJSON x = object
33+
[ "type" .= _eventType x
34+
, "object" .= _eventObject x
35+
]
36+
37+
-- | Type of the 'WatchEvent'.
38+
eventType :: WatchEvent a -> T.Text
39+
eventType = _eventType
40+
41+
-- | Object within the 'WatchEvent'.
42+
eventObject :: WatchEvent a -> a
43+
eventObject = _eventObject
44+
45+
{-| Dispatch a request setting watch to true. Takes a consumer function
46+
which consumes the 'Q.ByteString' stream. Following is a simple example which
47+
just streams to stdout. First some setup - this assumes kubernetes is accessible
48+
at http://localhost:8001, e.g. after running /kubectl proxy/:
49+
50+
@
51+
import qualified Data.ByteString.Streaming.Char8 as Q
52+
53+
manager <- newManager defaultManagerSettings
54+
defaultConfig <- newConfig
55+
config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
56+
request = listEndpointsForAllNamespaces (Accept MimeJSON)
57+
@
58+
59+
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
60+
61+
@
62+
> dispatchWatch manager config request Q.stdout
63+
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
64+
@
65+
-}
66+
dispatchWatch ::
67+
(HasOptionalParam req Watch, MimeType accept, MimeType contentType) =>
68+
Manager
69+
-> KubernetesConfig
70+
-> KubernetesRequest req contentType resp accept
71+
-> (Q.ByteString IO () -> IO a)
72+
-> IO a
73+
dispatchWatch manager config request apply = do
74+
let watchRequest = applyOptionalParam request (Watch True)
75+
(InitRequest req) <- _toInitRequest config watchRequest
76+
withHTTP req manager $ \resp -> apply $ responseBody resp
77+
78+
withHTTP ::
79+
Request
80+
-> Manager
81+
-> (Response (Q.ByteString IO ()) -> IO a)
82+
-> IO a
83+
withHTTP request manager f = withResponse request manager f'
84+
where
85+
f' resp = do
86+
let p = (from . brRead . responseBody) resp
87+
f (resp {responseBody = p})
88+
from :: IO B.ByteString -> Q.ByteString IO ()
89+
from io = go
90+
where
91+
go = do
92+
bs <- lift io
93+
unless (B.null bs) $ do
94+
Q.chunk bs
95+
go

stack.yaml

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,5 @@ extra-deps:
33
packages:
44
- kubernetes
55
- kubernetes-client-helper
6+
- kubernetes-watch
67
- kubeconfig
7-
nix:
8-
enable: true

0 commit comments

Comments
 (0)