compute special remote mostly implemented
authorJoey Hess <joeyh@joeyh.name>
Fri, 21 Feb 2025 19:02:53 +0000 (15:02 -0400)
committerJoey Hess <joeyh@joeyh.name>
Fri, 21 Feb 2025 19:02:53 +0000 (15:02 -0400)
Except for some of the hard parts: progress displays, incremental
verification, and getting inputs before running a computation.

Untested! In order to test this, git-annex addcomputed needs to be
implemented.

Remote/Compute.hs

index 18ebe950f74d729f1dacc8f248d31f11c15f95d1..3012337f3db395df5b549e851080777a9692aca5 100644 (file)
@@ -5,7 +5,19 @@
  - Licensed under the GNU AGPL version 3 or higher.
  -}
 
-module Remote.Compute (remote) where
+{-# LANGUAGE OverloadedStrings #-}
+
+module Remote.Compute (
+       remote,
+       Interface,
+       ComputeState(..),
+       setComputeState,
+       getComputeStates,
+       InterfaceEnv,
+       interfaceEnv,
+       getComputeProgram,
+       runComputeProgram,
+) where
 
 import Annex.Common
 import Types.Remote
@@ -18,21 +30,23 @@ import Remote.Helper.Special
 import Remote.Helper.ExportImport
 import Annex.SpecialRemote.Config
 import Annex.UUID
+import Annex.Content
+import Annex.Tmp
 import Logs.MetaData
 import Utility.Metered
-import Utility.Hash
 import Utility.TimeStamp
-import Git.FilePath
+import Utility.Env
 import qualified Git
 import qualified Utility.SimpleProtocol as Proto
 
+import Network.HTTP.Types.URI
 import Control.Concurrent.STM
 import Data.Time.Clock
 import Data.Either
-import Data.Char
-import Data.Ord
+import Text.Read
 import qualified Data.Map as M
 import qualified Data.Set as S
+import qualified Data.ByteString as B
 import qualified Data.Text as T
 import qualified Data.Text.Encoding as T
 
@@ -106,19 +120,20 @@ computeConfigParser :: RemoteConfig -> Annex RemoteConfigParser
 computeConfigParser rc = do
        Interface interface <- case getComputeProgram rc of
                Left _ -> pure $ Interface []
-               Right program -> liftIO (getInterfaceUncached program) >>= return . \case
+               Right program -> liftIO (getInterface program) >>= return . \case
                        Left _ -> Interface []
                        Right interface -> interface
        let m = M.fromList $ mapMaybe collectfields interface
-       let ininterface f = case toField (fromProposedAccepted f) of
-               Just f' -> M.member f' m
-               Nothing -> False
+       let ininterface f = M.member (Field (fromProposedAccepted f)) m
        return $ RemoteConfigParser
                { remoteConfigFieldParsers = 
                        [ optionalStringParser programField
                                (FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")")
                        ]
-               , remoteConfigRestPassthrough = Just (ininterface, M.toList $ M.mapKeys fromField m)
+               , remoteConfigRestPassthrough = Just
+                       ( ininterface
+                       , M.toList $ M.mapKeys fromField m
+                       )
                }
   where
        collectfields (InterfaceInput f d) = Just (f, FieldDesc d)
@@ -150,7 +165,7 @@ programField = Accepted "program"
 
 type Description = String
 
-newtype Field = Field MetaField
+newtype Field = Field { fromField :: String }
        deriving (Show, Eq, Ord)
 
 data InterfaceItem
@@ -175,23 +190,33 @@ instance Proto.Receivable InterfaceItem where
        parseCommand "VALUE?" = Proto.parse2 InterfaceOptionalValue
        parseCommand "OUTPUT" = Proto.parse2 InterfaceOutput
        parseCommand "REPRODUCIBLE" = Proto.parse0 InterfaceReproducible
+       parseCommand _ = Proto.parseFail
+
+data ProcessOutput
+       = Computing Field FilePath
+       | Progress PercentFloat
+       deriving (Show, Eq)
+
+instance Proto.Receivable ProcessOutput where
+       parseCommand "COMPUTING" = Proto.parse2 Computing
+       parseCommand "PROGRESS" = Proto.parse1 Progress
+       parseCommand _ = Proto.parseFail
 
 instance Proto.Serializable Field where
        serialize = fromField
-       deserialize = toField
+       deserialize = Just . Field
 
--- While MetaField is case insensitive, environment variable names are not,
--- so make Field always lower cased.
-toField :: String -> Maybe Field
-toField f = Field <$> toMetaField (T.pack (map toLower f))
+newtype PercentFloat = PercentFloat Float
+       deriving (Show, Eq)
 
-fromField :: Field -> String
-fromField (Field f) = T.unpack (fromMetaField f) 
+instance Proto.Serializable PercentFloat where
+       serialize (PercentFloat p) = show p
+       deserialize s = PercentFloat <$> readMaybe s
 
-getInterface :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface)
-getInterface program iv =
+getInterfaceCached :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface)
+getInterfaceCached program iv =
        atomically (takeTMVar iv) >>= \case
-               Nothing -> getInterfaceUncached program >>= \case
+               Nothing -> getInterface program >>= \case
                        Left err -> do
                                atomically $ putTMVar iv Nothing
                                return (Left err)
@@ -202,8 +227,8 @@ getInterface program iv =
                atomically $ putTMVar iv (Just interface)
                return (Right interface)
 
-getInterfaceUncached :: ComputeProgram -> IO (Either String Interface)
-getInterfaceUncached (ComputeProgram program) = 
+getInterface :: ComputeProgram -> IO (Either String Interface)
+getInterface (ComputeProgram program) = 
        catchMaybeIO (readProcess program ["interface"]) >>= \case
                Nothing -> return $ Left $ "Failed to run " ++ program
                Just output -> return $ case parseInterface output of
@@ -235,148 +260,294 @@ data ComputeState = ComputeState
        { computeInputs :: M.Map Field ComputeInput
        , computeValues :: M.Map Field ComputeValue
        , computeOutputs :: M.Map Field ComputeOutput
-       , computeTimeEstimate :: NominalDiffTime
        }
        deriving (Show, Eq)
 
--- Generates a hash of a ComputeState.
---
--- This is used as a short unique identifier in the metadata fields,
--- since more than one ComputeState may be stored in the compute remote's
--- metadata for a given Key.
---
--- A md5 is fine for this. It does not need to protect against intentional
--- collisions. And 2^64 is a sufficiently small chance of accidental
--- collision.
-hashComputeState :: ComputeState -> String
-hashComputeState state = show $ md5s $
-       mconcat (map (go goi) (M.toAscList (computeInputs state)))
-       <>
-       mconcat (map (go gov) (M.toAscList (computeValues state)))
-       <>
-       mconcat (map (go goo) (M.toAscList (computeOutputs state)))
-       <>
-       encodeBS (show (computeTimeEstimate state))
+{- Formats a ComputeState as an URL query string.
+ -
+ - Prefixes fields with "k" and "f" for computeInputs, with
+ - "v" for computeValues and "o" for computeOutputs.
+ -
+ - When the passed Key is an output, rather than duplicate it
+ - in the query string, that output has no value.
+ -
+ - Fields in the query string are sorted. This is in order to ensure
+ - that the same ComputeState is always formatted the same way.
+ -
+ - Example: "ffoo=somefile&kfoo=WORM--foo&oresult&vbar=11"
+ -}
+formatComputeState :: Key -> ComputeState -> B.ByteString
+formatComputeState k st = renderQuery False $ sortOn fst $ concat
+       [ concatMap formatinput $ M.toList (computeInputs st)
+       , map formatvalue $ M.toList (computeValues st)
+       , map formatoutput $ M.toList (computeOutputs st)
+       ]
   where
-       go c (Field f, v) = T.encodeUtf8 (fromMetaField f) <> c v
-       goi (ComputeInput k f) = serializeKey' k <> encodeBS f
-       gov (ComputeValue s) = encodeBS s
-       goo (ComputeOutput k) = serializeKey' k
-
-computeStateMetaData :: ComputeState -> MetaData
-computeStateMetaData = undefined
-
--- FIXME: Need to unswizzle the mixed up metadata based on hash prefixes.
-metaDataComputeStates :: MetaData -> [ComputeState]
-metaDataComputeStates (MetaData m) =
-       go (ComputeState mempty mempty mempty 0) (M.toList m)
+       formatinput (f, ComputeInput key file) =
+               [ ("k" <> fb, Just (serializeKey' key))
+               , ("f" <> fb, Just (toRawFilePath file))
+               ]
+         where
+               fb = encodeBS (fromField f)
+       formatvalue (f, ComputeValue v) =
+               ("v" <> encodeBS (fromField f), Just (encodeBS v))
+       formatoutput (f, ComputeOutput key) = 
+               ("o" <> encodeBS (fromField f),
+                       if key == k
+                               then Nothing
+                               else Just (serializeKey' key)
+               )
+
+parseComputeState :: Key -> B.ByteString -> Maybe ComputeState
+parseComputeState k b =
+       let q = parseQuery b
+           st = go emptycomputestate (M.fromList q) q
+       in if st == emptycomputestate then Nothing else Just st
   where
-       go c ((f,v):rest) = 
-               let c' = case T.unpack (fromMetaField f) of
-                       ('i':'n':'p':'u':'t':'-':f') -> case M.lookup m =<< toMetaField (T.pack ("key-" ++ f')) of
-                               Nothing -> c
-                               Just kv -> case deserializeKey' (fromMetaValue kv) of
-                                       Just k -> c
-                                               { computeInputs =
-                                                       M.insert (toField f)
-                                                               (ComputeInput k (decodeBS (fromMetaValue v)))
-                                                               (computeOutputs c)
-                                               }
-                                       Nothing -> c
-                       ('v':'a':'l':'u':'e':'-':f') -> c 
-                               { computeValues = 
-                                       M.insert (toField f)
-                                               (ComputeValue (decodeBS (fromMetaValue v)))
-                                               (computeValues c)
-                               }
-                       ('o':'u':'t':'p':'u':'t':'-':f') ->
-                               case deserializeKey' (fromMetaValue v) of
-                                       Just k -> c
+       emptycomputestate = ComputeState mempty mempty mempty
+       go c _ [] = c
+       go c m ((f, v):rest) = 
+               let c' = fromMaybe c $ case decodeBS f of
+                       ('f':f') -> do
+                               file <- fromRawFilePath <$> v
+                               kv <- M.lookup (encodeBS ('k':f')) m
+                               key <- deserializeKey' =<< kv
+                               Just $ c
+                                       { computeInputs =
+                                               M.insert (Field f')
+                                                       (ComputeInput key file)
+                                                       (computeInputs c)
+                                       }
+                       ('v':f') -> do
+                               val <- decodeBS <$> v
+                               Just $ c
+                                       { computeValues = 
+                                               M.insert (Field f')
+                                                       (ComputeValue val)
+                                                       (computeValues c)
+                                       }
+                       ('o':f') -> case v of
+                               Just kv -> do
+                                       key <- deserializeKey' kv
+                                       Just $ c
                                                { computeOutputs = 
-                                                       M.insert (toField f)
-                                                               (ComputeOutput k)
+                                                       M.insert (Field f')
+                                                               (ComputeOutput key)
                                                                (computeOutputs c)
                                                }
-                                       Nothing -> c
-                       ('t':'i':'m':'e':'-':f') ->
-                               case parsePOSIXTime (fromMetaValue v) of
-                                       Just t -> c { computeTimeEstimate = t }
-                                       Nothing -> c
-                       _ -> c
-               in go c' rest
-
-getComputeStates :: RemoteStateHandle -> Key -> Annex [ComputeState]
+                               Nothing -> Just $ c
+                                       { computeOutputs =
+                                               M.insert (Field f')
+                                                       (ComputeOutput k)
+                                                       (computeOutputs c)
+                                       }
+                       _ -> Nothing
+               in go c' m rest
+
+{- The per remote metadata is used to store ComputeState. This allows
+ - recording multiple ComputeStates that generate the same key.
+ -
+ - The metadata fields are numbers (prefixed with "t" to make them legal
+ - field names), which are estimates of how long it might take to run
+ - the computation (in seconds).
+ -}
+setComputeState :: RemoteStateHandle -> Key -> NominalDiffTime -> ComputeState -> Annex ()
+setComputeState rs k ts st = addRemoteMetaData k rs $ MetaData $ M.singleton
+       (mkMetaFieldUnchecked $ T.pack ('t':show (truncateResolution 1 ts)))
+       (S.singleton (MetaValue (CurrentlySet True) (formatComputeState k st)))
+
+getComputeStates :: RemoteStateHandle -> Key -> Annex [(NominalDiffTime, ComputeState)]
 getComputeStates rs k = do
-       RemoteMetaData _ m <- getCurrentRemoteMetaData rs k
-       return (metaDataComputeStates m)
+       RemoteMetaData _ (MetaData m) <- getCurrentRemoteMetaData rs k
+       return $ go [] (M.toList m)
+  where
+       go c [] = concat c
+       go c ((f, s) : rest) =
+               let sts = mapMaybe (parseComputeState k . fromMetaValue)
+                       (S.toList s)
+               in case parsePOSIXTime (T.encodeUtf8 (T.drop 1 (fromMetaField f))) of
+                       Just ts -> go (zip (repeat ts) sts : c) rest
+                       Nothing -> go c rest
+
+data InterfaceEnv = InterfaceEnv [(String, Either Key String)]
 
-setComputeState :: RemoteStateHandle -> Key -> ComputeState -> Annex ()
-setComputeState rs k st = addRemoteMetaData k rs (computeStateMetaData st)
+data InterfaceOutputs = InterfaceOutputs (M.Map Field Key)
 
 {- Finds the first compute state that provides everything required by the
  - interface, and returns a list of what should be provided to the program
- - in its environment.
+ - in its environment, and what outputs the program is expected to make.
  -}
-interfaceEnv :: [ComputeState] -> Interface -> Either String [(String, Either Key String)]
+interfaceEnv :: [ComputeState] -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
 interfaceEnv states interface = go Nothing states
   where
        go (Just firsterr) [] = Left firsterr
-       go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty 0) interface
+       go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty) interface
        go firsterr (state:rest) = case interfaceEnv' state interface of
                Right v -> Right v
                Left e
                        | null rest -> Left (fromMaybe e firsterr)
                        | otherwise -> go (firsterr <|> Just e) rest
 
-interfaceEnv' :: ComputeState -> Interface -> Either String [(String, Either Key String)]
-interfaceEnv' state (Interface interface) = 
-       case partitionEithers (mapMaybe go interface) of
-               ([], env) -> Right $
-                       map (\(f, v) -> (fromField f, v)) env
+interfaceEnv' :: ComputeState -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
+interfaceEnv' state interface@(Interface i) = 
+       case partitionEithers (mapMaybe go i) of
+               ([], r) -> Right 
+                       ( InterfaceEnv (map (\(f, v) -> (fromField f, v)) r)
+                       , interfaceOutputs state interface
+                       )
                (problems, _) -> Left $ unlines problems
   where
-       go (InterfaceInput name desc) =
-               case M.lookup name (computeInputs state) of
+       go (InterfaceInput field desc) =
+               case M.lookup field (computeInputs state) of
                        Just (ComputeInput key _file) -> Just $
-                               Right (name, Left key)
+                               Right (field, Left key)
                        Nothing -> Just $
-                               Left $ "Missing required input \"" ++ fromField name ++ "\" -- " ++ desc
-       go (InterfaceOptionalInput name desc) =
-               case M.lookup name (computeInputs state) of
+                               Left $ "Missing required input \"" ++ fromField field ++ "\" -- " ++ desc
+       go (InterfaceOptionalInput field _desc) =
+               case M.lookup field (computeInputs state) of
                        Just (ComputeInput key _file) -> Just $
-                               Right (name, Left key)
+                               Right (field, Left key)
                        Nothing -> Nothing
-       go (InterfaceValue name desc) =
-               case M.lookup name (computeValues state) of
+       go (InterfaceValue field desc) =
+               case M.lookup field (computeValues state) of
                        Just (ComputeValue v) -> Just $
-                               Right (name, Right v)
-                       nothing -> Just $
-                               Left $ "Missing required value \"" ++ fromField name ++ "\" -- " ++ desc
-       go (InterfaceOptionalValue name desc) =
-               case M.lookup name (computeValues state) of
+                               Right (field, Right v)
+                       Nothing -> Just $
+                               Left $ "Missing required value \"" ++ fromField field ++ "\" -- " ++ desc
+       go (InterfaceOptionalValue field _desc) =
+               case M.lookup field (computeValues state) of
                        Just (ComputeValue v) -> Just $
-                               Right (name, Right v)
+                               Right (field, Right v)
                        Nothing -> Nothing
        go (InterfaceOutput _ _) = Nothing
        go InterfaceReproducible = Nothing
 
+interfaceOutputs :: ComputeState -> Interface -> InterfaceOutputs
+interfaceOutputs state (Interface interface) =
+       InterfaceOutputs $ M.fromList $ mapMaybe go interface
+  where
+       go (InterfaceOutput field _) = do
+               ComputeOutput key <- M.lookup field (computeOutputs state)
+               Just (field, key)
+       go _ = Nothing
+
+computeProgramEnvironment :: InterfaceEnv -> Annex [(String, String)]
+computeProgramEnvironment (InterfaceEnv ienv) = do
+       environ <- filter (caninherit . fst) <$> liftIO getEnvironment
+       interfaceenv <- mapM go ienv
+       return $ environ ++ interfaceenv
+  where
+       envprefix = "ANNEX_COMPUTE_"
+       caninherit v = not (envprefix `isPrefixOf` v)
+       go (f, Right v) = return (envprefix ++ f, v)
+       go (f, Left k) =
+               ifM (inAnnex k)
+                       ( do
+                               objloc <- calcRepo (gitAnnexLocation k)
+                               return (envprefix ++ f, fromOsPath objloc)
+                       , giveup "missing an input to the computation"
+                       )
+
+runComputeProgram
+       :: ComputeProgram
+       -> Key
+       -> AssociatedFile
+       -> OsPath
+       -> MeterUpdate
+       -> VerifyConfig
+       -> (InterfaceEnv, InterfaceOutputs)
+       -> Annex Verification
+runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutputs iout) = do
+       environ <- computeProgramEnvironment ienv
+       withOtherTmp $ \tmpdir -> 
+               go environ tmpdir
+                       `finally` liftIO (removeDirectoryRecursive tmpdir)
+  where
+       go environ tmpdir = do
+               let pr = (proc program [])
+                        { cwd = Just $ fromOsPath tmpdir
+                        , std_out = CreatePipe
+                        , env = Just environ
+                        }
+               computing <- liftIO $ withCreateProcess pr $
+                       processoutput mempty tmpdir
+               finish computing tmpdir
+       
+       processoutput computing tmpdir _ (Just h) _ pid =
+               hGetLineUntilExitOrEOF pid h >>= \case
+                       Just l
+                               | null l -> processoutput computing tmpdir Nothing (Just h) Nothing pid
+                               | otherwise -> parseoutput computing l >>= \case
+                                       Just computing' -> 
+                                               processoutput computing' tmpdir Nothing (Just h) Nothing pid
+                                       Nothing -> do
+                                               hClose h
+                                               ifM (checkSuccessProcess pid)
+                                                       ( giveup $ program ++ " output included an unparseable line: \"" ++ l ++ "\""
+                                                       , giveup $ program ++ " exited unsuccessfully"
+                                                       )
+                       Nothing -> do
+                               hClose h
+                               unlessM (checkSuccessProcess pid) $
+                                       giveup $ program ++ " exited unsuccessfully"
+                               return computing
+       processoutput _ _ _ _ _ _ = error "internal"
+       
+       parseoutput computing l = case Proto.parseMessage l of
+               Just (Computing field file) ->
+                       case M.lookup field iout of
+                               Just key -> do
+                                       when (key == k) $
+                                               -- XXX can start watching the file and updating progess now
+                                               return ()
+                                       return $ Just $
+                                               M.insert key (toRawFilePath file) computing
+                               Nothing -> return (Just computing)
+               Just (Progress percent) -> do
+                       -- XXX
+                       return Nothing
+               Nothing -> return Nothing
+       
+       finish computing tmpdir = do
+               case M.lookup k computing of
+                       Nothing -> giveup $ program ++ " exited successfully, but failed to output a filename"
+                       Just file -> do
+                               let file' = tmpdir </> file
+                               unlessM (liftIO $ doesFileExist file') $
+                                       giveup $ program ++ " exited sucessfully, but failed to write the computed file"
+                               catchNonAsync (liftIO $ moveFile file' dest)
+                                       (\err -> giveup $ "failed to move the computed file: " ++ show err)
+               
+               -- Try to move any other computed object files into the annex.
+               forM_ (M.toList computing) $ \(key, file) ->
+                       when (k /= key) $ do
+                               let file' = tmpdir </> file
+                               whenM (liftIO $ doesFileExist file') $
+                                       whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification k file') $
+                                               void $ tryNonAsync $ moveAnnex k file'
+
+               return verification
+       
+       -- The program might not be reproducible, so require strong
+       -- verification.
+       verification = MustVerify
+
 computeKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
-computeKey rs program iv k _af dest p vc =
-       liftIO (getInterface program iv) >>= \case
+computeKey rs program iv k af dest p vc =
+       liftIO (getInterfaceCached program iv) >>= \case
                Left err -> giveup err
                Right interface -> do
-                       states <- sortBy (comparing computeTimeEstimate)
+                       states <- map snd . sortOn fst
                                <$> getComputeStates rs k
-                       case interfaceEnv states interface of
-                               Left err -> giveup err
-                               Right ienv -> undefined -- TODO
+                       either giveup (runComputeProgram program k af dest p vc)
+                               (interfaceEnv states interface)
 
 -- Make sure that the compute state has everything needed by
 -- the program's current interface.
 checkKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> Annex Bool
 checkKey rs program iv k = do
-       states <- getComputeStates rs k
-       liftIO (getInterface program iv) >>= \case
+       states <- map snd <$> getComputeStates rs k
+       liftIO (getInterfaceCached program iv) >>= \case
                Left err -> giveup err
                Right interface ->
                        case interfaceEnv states interface of