Skip to content

Commit

Permalink
feat(admin): remove topics
Browse files Browse the repository at this point in the history
  • Loading branch information
JoranVanBelle committed Jan 10, 2025
1 parent 3b4d0c8 commit 50f001f
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 37 deletions.
57 changes: 48 additions & 9 deletions src/Kafka/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@ module Kafka.Admin(
module X
, newKAdmin
, createTopic
, deleteTopic
, closeKAdmin
) where

import Control.Monad
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Control.Monad.IO.Class
import Data.Text
import Data.Maybe
import Data.Bifunctor
import Data.List.NonEmpty
import qualified Data.List.NonEmpty as NEL
import qualified Data.Text as T
import qualified Data.Set as S

import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
Expand Down Expand Up @@ -53,31 +49,74 @@ createTopic kAdmin topic = liftIO $ do
Right _ -> do
pure $ Right $ topicName topic

--- DELETE TOPIC ---
deleteTopic :: KAdmin
-> TopicName
-> IO (Either KafkaError TopicName)
deleteTopic kAdmin topic = liftIO $ do
let kafkaPtr = getRdKafka kAdmin
queue <- newRdKafkaQueue kafkaPtr
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny

topicRes <- withOldTopic topic $ \topic' -> rdKafkaDeleteTopics kafkaPtr [topic'] opts queue
case topicRes of
Left err -> do
pure $ Left (NEL.head err)
Right _ -> do
pure $ Right topic

withNewTopic :: NewTopic
-> (RdKafkaNewTopicTPtr -> IO a)
-> IO (Either (NonEmpty KafkaError) a)
withNewTopic t transform = do
mkNewTopicRes <- mkNewTopic t topicPtr
mkNewTopicRes <- mkNewTopic t newTopicPtr
case mkNewTopicRes of
Left err -> do
return $ Left err
Right topic -> do
res <- transform topic
return $ Right res

topicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
topicPtr topic = do
withOldTopic :: TopicName
-> (RdKafkaDeleteTopicTPtr -> IO a)
-> IO (Either (NonEmpty KafkaError) a)
withOldTopic tName transform = do
rmOldTopicRes <- rmOldTopic tName oldTopicPtr
case rmOldTopicRes of
Left err -> do
return $ Left err
Right topic -> do
res <- transform topic
return $ Right res

newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
newTopicPtr topic = do
ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
case ptrRes of
Left str -> pure $ Left (KafkaError $ T.pack str)
Right ptr -> pure $ Right ptr

oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr)
oldTopicPtr tName = do
res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName
case res of
Left str -> pure $ Left (KafkaError $ T.pack str)
Right ptr -> pure $ Right ptr

mkNewTopic :: NewTopic
-> (NewTopic -> IO (Either KafkaError a))
-> IO (Either (NonEmpty KafkaError) a)
mkNewTopic topic create = do
res <- create topic
case res of
Left err -> pure $ Left (Data.List.NonEmpty.singleton err)
Left err -> pure $ Left (NEL.singleton err)
Right resource -> pure $ Right resource

rmOldTopic :: TopicName
-> (TopicName -> IO (Either KafkaError a))
-> IO (Either (NonEmpty KafkaError) a)
rmOldTopic tName remove = do
res <- remove tName
case res of
Left err -> pure $ Left (NEL.singleton err)
Right resource -> pure $ Right resource
56 changes: 28 additions & 28 deletions src/Kafka/Internal/RdKafka.chs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Foreign.Storable (Storable(..))
import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr)
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr, ForeignPtr, newForeignPtr)
import Foreign.C.Error (Errno(..), getErrno)
import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString)
import Foreign.C.String (CString, newCString, withCString, withCAString, peekCAString, peekCString)
import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong)
import System.IO (Handle, stdin, stdout, stderr)
import System.Posix.IO (handleToFd)
Expand Down Expand Up @@ -1203,6 +1203,7 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
then peekCString ptr >>= pure . Left
else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res)

--- Create topic
rdKafkaCreateTopic :: RdKafkaTPtr
-> RdKafkaNewTopicTPtr
-> RdKafkaAdminOptionsTPtr
Expand All @@ -1214,34 +1215,33 @@ rdKafkaCreateTopic kafkaPtr topic opts queue = do
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
{#call rd_kafka_CreateTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr

rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaTopicResultTPtr)
rdKafkaEventCreateTopicsResult evtPtr =
withForeignPtr evtPtr $ \evtPtr' -> do
res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr')
--- Delete topic
foreign import ccall unsafe "rdkafka.h &rd_kafka_DeleteTopic_destroy"
rdKafkaDeleteTopicDestroy :: FinalizerPtr RdKafkaDeleteTopicT

data RdKafkaDeleteTopicT
{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #}

data RdKafkaDeleteTopicsResultT
{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #}

newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr)
newRdKafkaDeleteTopic topicNameStr =
withCString topicNameStr $ \topicNameStrPtr -> do
res <- {#call rd_kafka_DeleteTopic_new#} topicNameStrPtr
if (res == nullPtr)
then pure Nothing
else Just <$> newForeignPtr_ (castPtr res)

rdKafkaCreateTopicsResultTopics :: RdKafkaTopicResultTPtr
-> IO [Either (String, RdKafkaRespErrT, String) String]
rdKafkaCreateTopicsResultTopics tRes =
withForeignPtr tRes $ \tRes' ->
alloca $ \sPtr -> do
res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr
size <- peekIntConv sPtr
array <- peekArray size res
traverse unpackRdKafkaTopicResult array

unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT
-> IO (Either (String, RdKafkaRespErrT, String) String)
unpackRdKafkaTopicResult resPtr = do
name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString
err <- {#call rd_kafka_topic_result_error#} resPtr
case cIntToEnum err of
respErr -> do
errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString
pure $ Left (name, respErr, errMsg)
RdKafkaRespErrNoError -> pure $ Right name
then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr
else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res

rdKafkaDeleteTopics :: RdKafkaTPtr
-> [RdKafkaDeleteTopicTPtr]
-> RdKafkaAdminOptionsTPtr
-> RdKafkaQueueTPtr
-> IO ()
rdKafkaDeleteTopics kafkaPtr topics opts queue = do
withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr ->
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
{#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr

-- Marshall / Unmarshall
enumToCInt :: Enum a => a -> CInt
Expand Down
33 changes: 33 additions & 0 deletions tests-it/Kafka/IntegrationSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,46 @@ spec = do
describe "Kafka.Admin.Spec" $ do
let topicName = addRandomChars "admin.topic.created." 5

topicsMVar <- runIO newEmptyMVar

specWithAdmin "Create topic" $ do

it "should create a new topic" $ \(admin :: KAdmin) -> do
tName <- topicName
let newTopic = mkNewTopic (TopicName ( T.pack(tName) ))
result <- createTopic admin newTopic
result `shouldSatisfy` isRight

specWithConsumer "Read all topics" consumerProps $ do

it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do
res <- allTopicsMetadata consumer (Timeout 1000)
res `shouldSatisfy` isRight
let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) }
let res' = fmap filterUserTopics res
length . kmBrokers <$> res' `shouldBe` Right 1

putStrLn $ either
(const "Nothing to show")
(unlines . map (T.unpack . unTopicName . tmTopicName) . kmTopics)
res'

let topics = either (const []) (map tmTopicName . kmTopics) res'
putMVar topicsMVar topics

let topicsLen = either (const 0) (length . kmTopics) res'
let hasTopic = either (const False) (any (\t -> tmTopicName t == testTopic) . kmTopics) res'

topicsLen `shouldSatisfy` (>0)
hasTopic `shouldBe` True

specWithAdmin "Remove topics" $ do

it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do
topics <- takeMVar topicsMVar
forM_ topics $ \topic -> do
result <- deleteTopic admin topic
result `shouldSatisfy` isRight
----------------------------------------------------------------------------------------------------------------

data ReadState = Skip | Read
Expand Down

0 comments on commit 50f001f

Please sign in to comment.