Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagepy
titleMinio Python
from keycloak import KeycloakOpenID
import requests
import xmltodict
from minio import Minio
from minio.error import (ResponseError, BucketAlreadyOwnedByYou, BucketAlreadyExists)
import uuid
import io

def get_token():
   # Configure client
   keycloak_openid = KeycloakOpenID(server_url="http://keycloak:8080",
                                    client_id="minio-cli",
                                    realm_name="minio",
                                    client_secret_key="YVZGoUdprHmqexlX30stzGWwZtT2SNll")
   return keycloak_openid.token(grant_type='client_credentials')

def get_credentials():
   token = get_token()
   r = requests.post("http://minio:9000",
                     data={
                         'Action': "AssumeRoleWithWebIdentity",
                         'Version': "2011-06-15",
                         'WebIdentityToken': token['id_token'],
                         'DurationSeconds': token['expires_in']
                     })
   tree = xmltodict.parse(r.content)
   return dict(tree['AssumeRoleWithWebIdentityResponse']['AssumeRoleWithWebIdentityResult']['Credentials'])

# Retrieve credenstials
credenstials = get_credentials()

# Initialize Minio client
client = Minio(
        "minio:9000",
        access_key=credenstials['AccessKeyId'],
        secret_key=credenstials['SecretAccessKey'],
        session_token=credenstials['SessionToken'],
        secure=False,
        region=None,
        http_client=None,
        credentials=None
)

# Create bucket if it doesn't already exist
print("Create Bucket")
print("-------------")
bucket_name = "py-bucket"
found = client.bucket_exists(bucket_name)
if not found:
    client.make_bucket(bucket_name)
    print(f"Bucket '{bucket_name}' created")
else:
    print(f"Bucket '{bucket_name}' already exists")

buckets = client.list_buckets()

# Retrieve list of buckets
print("\nBucket List")
print("-----------")
for bucket in buckets:
    print(f"{bucket.name} created on {bucket.creation_date}")

# Upload an object to a bucket
data = "I want to stream some data to minio"
object_bytes = data.encode('utf-8')
object_stream = io.BytesIO(object_bytes)
object_name =  f"{uuid.uuid1()}.txt"

print("\nUpload To Bucket")
print("----------------")
try:
    client.put_object(bucket_name, object_name, object_stream , len(object_bytes))
    print(f"{object_name} uploaded to {bucket_name}")
except Exception as ex:
    raise ex

Sample output:

Image Added


A similar program written in go is available here: Minio Go ClientThis program

Links

Configure MinIO for Authentication using OpenID

...